From 92dbc3e6fc38a493ac7fe5cb45ace6e7897cbb53 Mon Sep 17 00:00:00 2001 From: gammazero Date: Thu, 15 Jul 2021 02:40:08 -0700 Subject: [PATCH 1/9] Sync pinner on every pin operation by default This PR builds on PR #13 to sync the pinner on every pin operation. This PR does the following that #13 does not: - Sync's dag service separately from pin data - Does not release and immediately reacquire lock to sync, syncs while still holding pinner lock. - Syncs only pin data for most operations In addition to sync of pin data, this PR also revises how indexes are rebuilt. Instead of reading through all pins and loading all previous index data, now only a single pass through the pins is needed to rebuild missing indexes resulting from incomplete add or delete operations. This is operationally much simpler, but also does not require storing entire pin sets or index sets in memory as did the previous solution. --- dsindex/indexer.go | 32 ++-- dspinner/pin.go | 344 +++++++++++++++++++++++++----------------- dspinner/pin_test.go | 68 +++++++-- dspinner/sync_test.go | 88 +++++++++++ go.mod | 1 + go.sum | 42 +++++- 6 files changed, 402 insertions(+), 173 deletions(-) create mode 100644 dspinner/sync_test.go diff --git a/dsindex/indexer.go b/dsindex/indexer.go index e1119ac..884cd80 100644 --- a/dsindex/indexer.go +++ b/dsindex/indexer.go @@ -112,40 +112,30 @@ func (x *indexer) ForEach(ctx context.Context, key string, fn func(key, value st if err != nil { return err } + defer results.Close() - for { - r, ok := results.NextSync() - if !ok { - break + for r := range results.Next() { + if ctx.Err() != nil { + return ctx.Err() } if r.Error != nil { - err = r.Error - break - } - if ctx.Err() != nil { - err = ctx.Err() - break + return fmt.Errorf("cannot read index: %v", r.Error) } ent := r.Entry - var decIdx string - decIdx, err = decode(path.Base(path.Dir(ent.Key))) + decIdx, err := decode(path.Base(path.Dir(ent.Key))) if err != nil { - err = fmt.Errorf("cannot decode index: %v", err) - break + return fmt.Errorf("cannot decode index: %v", err) } - var decKey string - decKey, err = decode(path.Base(ent.Key)) + decKey, err := decode(path.Base(ent.Key)) if err != nil { - err = fmt.Errorf("cannot decode key: %v", err) - break + return fmt.Errorf("cannot decode key: %v", err) } if !fn(decIdx, decKey) { - break + return nil } } - results.Close() - return err + return nil } func (x *indexer) HasValue(ctx context.Context, key, value string) (bool, error) { diff --git a/dspinner/pin.go b/dspinner/pin.go index b793cf2..3fb15dc 100644 --- a/dspinner/pin.go +++ b/dspinner/pin.go @@ -28,6 +28,9 @@ const ( pinKeyPath = "/pins/pin" indexKeyPath = "/pins/index" dirtyKeyPath = "/pins/state/dirty" + + addOp = 1 + delOp = 2 ) var ( @@ -84,7 +87,8 @@ func init() { // pinner implements the Pinner interface type pinner struct { - lock sync.RWMutex + autoSync bool + lock sync.RWMutex dserv ipld.DAGService dstore ds.Datastore @@ -127,8 +131,13 @@ type syncDAGService interface { // New creates a new pinner and loads its keysets from the given datastore. If // there is no data present in the datastore, then an empty pinner is returned. -func New(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService) (ipfspinner.Pinner, error) { +// +// By default, changes are automatically flushed to the datastore. This can be +// disabled by calling SetAutosync(false), which will require that Flush be +// called explicitly. +func New(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService) (*pinner, error) { p := &pinner{ + autoSync: true, cidDIndex: dsindex.New(dstore, ds.NewKey(pinCidDIndexPath)), cidRIndex: dsindex.New(dstore, ds.NewKey(pinCidRIndexPath)), nameIndex: dsindex.New(dstore, ds.NewKey(pinNameIndexPath)), @@ -146,12 +155,7 @@ func New(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService) (ipfsp if data[0] == 1 { p.dirty = 1 - pins, err := p.loadAllPins(ctx) - if err != nil { - return nil, fmt.Errorf("cannot load pins: %v", err) - } - - err = p.rebuildIndexes(ctx, pins) + err = p.rebuildIndexes(ctx) if err != nil { return nil, fmt.Errorf("cannot rebuild indexes: %v", err) } @@ -160,6 +164,17 @@ func New(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService) (ipfsp return p, nil } +// SetAutosync allows auto-syncing to be enabled or disabled during runtime. +// This may be used to turn off autosync before doing many repeated pinning +// operations, and then turn it on after. Returns the previous value. +func (p *pinner) SetAutosync(auto bool) bool { + p.lock.Lock() + defer p.lock.Unlock() + + p.autoSync, auto = auto, p.autoSync + return auto +} + // Pin the given node, optionally recursive func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error { err := p.dserv.Add(ctx, node) @@ -193,6 +208,9 @@ func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error { return err } + // If autosyncing, sync dag service before making any change to pins + err = p.flushDagService(ctx, false) + // Only look again if something has changed. if p.dirty != dirtyBefore { found, err = p.cidRIndex.HasAny(ctx, cidKey) @@ -231,7 +249,7 @@ func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error { return err } } - return nil + return p.flushPins(ctx, false) } func (p *pinner) addPin(ctx context.Context, c cid.Cid, mode ipfspinner.Mode, name string) (string, error) { @@ -244,9 +262,24 @@ func (p *pinner) addPin(ctx context.Context, c cid.Cid, mode ipfspinner.Mode, na return "", fmt.Errorf("could not encode pin: %v", err) } - p.setDirty(ctx, true) + p.setDirty(ctx) - // Store CID index + // Store the pin. Pin is stored first so that an incomplete add is + // detected by a pin that does not have an index. + err = p.dstore.Put(pp.dsKey(), pinData) + if err != nil { + if mode == ipfspinner.Recursive { + p.cidRIndex.Delete(ctx, c.KeyString(), pp.Id) + } else { + p.cidDIndex.Delete(ctx, c.KeyString(), pp.Id) + } + if name != "" { + p.nameIndex.Delete(ctx, name, pp.Id) + } + return "", err + } + + // Store CID index. switch mode { case ipfspinner.Recursive: err = p.cidRIndex.Add(ctx, c.KeyString(), pp.Id) @@ -263,36 +296,22 @@ func (p *pinner) addPin(ctx context.Context, c cid.Cid, mode ipfspinner.Mode, na // Store name index err = p.nameIndex.Add(ctx, name, pp.Id) if err != nil { + if mode == ipfspinner.Recursive { + p.cidRIndex.Delete(ctx, c.KeyString(), pp.Id) + } else { + p.cidDIndex.Delete(ctx, c.KeyString(), pp.Id) + } return "", fmt.Errorf("could not add pin name index: %v", err) } } - // Store the pin. Pin must be stored after index for recovery to work. - err = p.dstore.Put(pp.dsKey(), pinData) - if err != nil { - if mode == ipfspinner.Recursive { - p.cidRIndex.Delete(ctx, c.KeyString(), pp.Id) - } else { - p.cidDIndex.Delete(ctx, c.KeyString(), pp.Id) - } - if name != "" { - p.nameIndex.Delete(ctx, name, pp.Id) - } - return "", err - } - return pp.Id, nil } func (p *pinner) removePin(ctx context.Context, pp *pin) error { - p.setDirty(ctx, true) + p.setDirty(ctx) + var err error - // Remove pin from datastore. Pin must be removed before index for - // recovery to work. - err := p.dstore.Delete(pp.dsKey()) - if err != nil { - return err - } // Remove cid index from datastore if pp.Mode == ipfspinner.Recursive { err = p.cidRIndex.Delete(ctx, pp.Cid.KeyString(), pp.Id) @@ -311,6 +330,13 @@ func (p *pinner) removePin(ctx context.Context, pp *pin) error { } } + // The pin is removed last so that an incomplete remove is detected by a + // pin that has a missing index. + err = p.dstore.Delete(pp.dsKey()) + if err != nil { + return err + } + return nil } @@ -347,12 +373,15 @@ func (p *pinner) Unpin(ctx context.Context, c cid.Cid, recursive bool) error { } } - _, err = p.removePinsForCid(ctx, c, ipfspinner.Any) + removed, err := p.removePinsForCid(ctx, c, ipfspinner.Any) if err != nil { return err } + if !removed { + return nil + } - return nil + return p.flushPins(ctx, false) } // IsPinned returns whether or not the given key is pinned @@ -542,7 +571,17 @@ func (p *pinner) RemovePinWithMode(c cid.Cid, mode ipfspinner.Mode) { p.lock.Lock() defer p.lock.Unlock() - p.removePinsForCid(ctx, c, mode) + removed, err := p.removePinsForCid(ctx, c, mode) + if err != nil { + log.Error("cound not remove pins: %s", err) + return + } + if !removed { + return + } + if err = p.flushPins(ctx, false); err != nil { + log.Error("cound not remove pins: %s", err) + } } // removePinsForCid removes all pins for a cid that has the specified mode. @@ -583,7 +622,7 @@ func (p *pinner) removePinsForCid(ctx context.Context, c cid.Cid, mode ipfspinne pp, err = p.loadPin(ctx, pid) if err != nil { if err == ds.ErrNotFound { - p.setDirty(ctx, true) + p.setDirty(ctx) // Fix index; remove index for pin that does not exist switch mode { case ipfspinner.Recursive: @@ -594,6 +633,12 @@ func (p *pinner) removePinsForCid(ctx context.Context, c cid.Cid, mode ipfspinne p.cidRIndex.DeleteKey(ctx, cidKey) p.cidDIndex.DeleteKey(ctx, cidKey) } + if err = p.flushPins(ctx, true); err != nil { + return false, err + } + // Mark this as removed since it removed an index, which is + // what prevents determines if an item is pinned. + removed = true log.Error("found CID index with missing pin") continue } @@ -619,93 +664,91 @@ func (p *pinner) loadPin(ctx context.Context, pid string) (*pin, error) { return decodePin(pid, pinData) } -// loadAllPins loads all pins from the datastore. -func (p *pinner) loadAllPins(ctx context.Context) ([]*pin, error) { +// rebuildIndexes uses the stored pins to rebuild secondary indexes. This +// resolves any discrepancy between secondary indexes and pins that could +// result from a program termination between saving the two. +func (p *pinner) rebuildIndexes(ctx context.Context) error { + checkIncomplete := func(pp *pin, indexer dsindex.Indexer) (bool, error) { + var repaired bool + // Check that the indexer indexes this pin + indexKey := pp.Cid.KeyString() + ok, err := indexer.HasValue(ctx, indexKey, pp.Id) + if err != nil { + return false, err + } + if !ok { + // There was no index found for this pin. This was wither an + // incomplete add or and incomplete delete of a pin. Either way, + // restore the index to complete the add or to undo the incomplete + // delete. + if err = indexer.Add(ctx, indexKey, pp.Id); err != nil { + return false, err + } + repaired = true + } + // Check for missing name index + if pp.Name != "" { + ok, err = p.nameIndex.HasValue(ctx, pp.Name, pp.Id) + if err != nil { + return false, err + } + if !ok { + if err = p.nameIndex.Add(ctx, pp.Name, pp.Id); err != nil { + return false, err + } + } + repaired = true + } + return repaired, nil + } + + // Load all pins from the datastore. q := query.Query{ Prefix: pinKeyPath, } results, err := p.dstore.Query(q) if err != nil { - return nil, err - } - ents, err := results.Rest() - if err != nil { - return nil, err - } - if len(ents) == 0 { - return nil, nil + return err } + defer results.Close() + + var repairedCount int - pins := make([]*pin, len(ents)) - for i := range ents { + // Iterate all pins and check if the corresponding recursive or direct + // index is missing. If the index is missing then create the index. + for r := range results.Next() { if ctx.Err() != nil { - return nil, ctx.Err() + return ctx.Err() } - var p *pin - p, err = decodePin(path.Base(ents[i].Key), ents[i].Value) + if r.Error != nil { + return fmt.Errorf("cannot read index: %v", r.Error) + } + ent := r.Entry + pp, err := decodePin(path.Base(ent.Key), ent.Value) if err != nil { - return nil, err + return err } - pins[i] = p - } - return pins, nil -} -// rebuildIndexes uses the stored pins to rebuild secondary indexes. This -// resolves any discrepancy between secondary indexes and pins that could -// result from a program termination between saving the two. -func (p *pinner) rebuildIndexes(ctx context.Context, pins []*pin) error { - // Build temporary in-memory CID index from pins - dstoreMem := ds.NewMapDatastore() - tmpCidDIndex := dsindex.New(dstoreMem, ds.NewKey(pinCidDIndexPath)) - tmpCidRIndex := dsindex.New(dstoreMem, ds.NewKey(pinCidRIndexPath)) - tmpNameIndex := dsindex.New(dstoreMem, ds.NewKey(pinNameIndexPath)) - var hasNames bool - for _, pp := range pins { - if ctx.Err() != nil { - return ctx.Err() - } + var repaired bool if pp.Mode == ipfspinner.Recursive { - tmpCidRIndex.Add(ctx, pp.Cid.KeyString(), pp.Id) + repaired, err = checkIncomplete(pp, p.cidRIndex) + if err != nil { + return err + } } else if pp.Mode == ipfspinner.Direct { - tmpCidDIndex.Add(ctx, pp.Cid.KeyString(), pp.Id) - } - if pp.Name != "" { - tmpNameIndex.Add(ctx, pp.Name, pp.Id) - hasNames = true + repaired, err = checkIncomplete(pp, p.cidDIndex) + if err != nil { + return err + } } - } - - // Sync the CID index to what was build from pins. This fixes any invalid - // indexes, which could happen if ipfs was terminated between writing pin - // and writing secondary index. - changed, err := dsindex.SyncIndex(ctx, tmpCidRIndex, p.cidRIndex) - if err != nil { - return fmt.Errorf("cannot sync indexes: %v", err) - } - if changed { - log.Info("invalid recursive indexes detected - rebuilt") - } - - changed, err = dsindex.SyncIndex(ctx, tmpCidDIndex, p.cidDIndex) - if err != nil { - return fmt.Errorf("cannot sync indexes: %v", err) - } - if changed { - log.Info("invalid direct indexes detected - rebuilt") - } - if hasNames { - changed, err = dsindex.SyncIndex(ctx, tmpNameIndex, p.nameIndex) - if err != nil { - return fmt.Errorf("cannot sync name indexes: %v", err) - } - if changed { - log.Info("invalid name indexes detected - rebuilt") + if repaired { + repairedCount++ } } - return p.Flush(ctx) + log.Infof("repaired invalid indexes for %d pins", repairedCount) + return p.flushPins(ctx, true) } // DirectKeys returns a slice containing the directly pinned keys @@ -810,37 +853,55 @@ func (p *pinner) Update(ctx context.Context, from, to cid.Cid, unpin bool) error return err } - if !unpin { - return nil - } - - _, err = p.removePinsForCid(ctx, from, ipfspinner.Recursive) - if err != nil { - return err + if unpin { + _, err = p.removePinsForCid(ctx, from, ipfspinner.Recursive) + if err != nil { + return err + } } - return nil + return p.flushPins(ctx, false) } -// Flush encodes and writes pinner keysets to the datastore -func (p *pinner) Flush(ctx context.Context) error { - p.lock.Lock() - defer p.lock.Unlock() - +func (p *pinner) flushDagService(ctx context.Context, force bool) error { + if !p.autoSync && !force { + return nil + } if syncDServ, ok := p.dserv.(syncDAGService); ok { if err := syncDServ.Sync(); err != nil { return fmt.Errorf("cannot sync pinned data: %v", err) } } + return nil +} - // Sync pins and indexes +func (p *pinner) flushPins(ctx context.Context, force bool) error { + if !p.autoSync && !force { + return nil + } if err := p.dstore.Sync(ds.NewKey(basePath)); err != nil { return fmt.Errorf("cannot sync pin state: %v", err) } + p.setClean(ctx) + return nil +} - p.setDirty(ctx, false) +// Flush encodes and writes pinner keysets to the datastore +func (p *pinner) Flush(ctx context.Context) error { + // If autosync is enabled, then this is not needed. + if p.autoSync { + return nil + } - return nil + p.lock.Lock() + defer p.lock.Unlock() + + err := p.flushDagService(ctx, true) + if err != nil { + return err + } + + return p.flushPins(ctx, true) } // PinWithMode allows the user to have fine grained control over pin @@ -869,6 +930,9 @@ func (p *pinner) PinWithMode(c cid.Cid, mode ipfspinner.Mode) { if err != nil { return } + if err = p.flushPins(ctx, false); err != nil { + log.Errorf("failed to create %s pin: %s", mode, err) + } } // hasChild recursively looks for a Cid among the children of a root Cid. @@ -914,26 +978,32 @@ func decodePin(pid string, data []byte) (*pin, error) { return p, nil } -// setDirty saves a boolean dirty flag in the datastore whenever there is a -// transition between a dirty (counter > 0) and non-dirty (counter == 0) state. -func (p *pinner) setDirty(ctx context.Context, dirty bool) { - isClean := p.dirty == p.clean - if dirty { - p.dirty++ - if !isClean { - return // do not save; was already dirty - } - } else if isClean { +// setDirty updates the dirty counter and saves a dirty state in the datastore +// if the state was previously clean +func (p *pinner) setDirty(ctx context.Context) { + wasClean := p.dirty == p.clean + p.dirty++ + + if !wasClean { + return // do not save; was already dirty + } + + data := []byte{1} + p.dstore.Put(dirtyKey, data) + p.dstore.Sync(dirtyKey) +} + +// setClean saves a clean state value in the datastore if the state was +// previously dirty +func (p *pinner) setClean(ctx context.Context) { + if p.dirty == p.clean { return // already clean - } else { - p.clean = p.dirty // set clean } - // Do edge-triggered write to datastore data := []byte{0} - if dirty { - data[0] = 1 - } p.dstore.Put(dirtyKey, data) - p.dstore.Sync(dirtyKey) + if err := p.dstore.Sync(dirtyKey); err != nil { + return + } + p.clean = p.dirty // set clean } diff --git a/dspinner/pin_test.go b/dspinner/pin_test.go index d8c4e95..d78ace7 100644 --- a/dspinner/pin_test.go +++ b/dspinner/pin_test.go @@ -281,15 +281,14 @@ func TestPinnerBasic(t *testing.T) { assertPinned(t, p, bk, "could not find recursively pinned node") // Remove the pin but not the index to simulate corruption - dsp := p.(*pinner) - ids, err := dsp.cidDIndex.Search(ctx, ak.KeyString()) + ids, err := p.cidDIndex.Search(ctx, ak.KeyString()) if err != nil { t.Fatal(err) } if len(ids) == 0 { t.Fatal("did not find pin for cid", ak.String()) } - pp, err := dsp.loadPin(ctx, ids[0]) + pp, err := p.loadPin(ctx, ids[0]) if err != nil { t.Fatal(err) } @@ -299,7 +298,7 @@ func TestPinnerBasic(t *testing.T) { if pp.Cid != ak { t.Error("loaded pin has wrong cid") } - err = dsp.dstore.Delete(pp.dsKey()) + err = p.dstore.Delete(pp.dsKey()) if err != nil { t.Fatal(err) } @@ -331,13 +330,11 @@ func TestAddLoadPin(t *testing.T) { dserv := mdag.NewDAGService(bserv) - ipfsPin, err := New(ctx, dstore, dserv) + p, err := New(ctx, dstore, dserv) if err != nil { t.Fatal(err) } - p := ipfsPin.(*pinner) - a, ak := randNode() dserv.Add(ctx, a) @@ -384,7 +381,7 @@ func TestRemovePinWithMode(t *testing.T) { p.Pin(ctx, a, false) - ok, err := p.(*pinner).removePinsForCid(ctx, ak, ipfspin.Recursive) + ok, err := p.removePinsForCid(ctx, ak, ipfspin.Recursive) if err != nil { t.Fatal(err) } @@ -642,6 +639,18 @@ func TestLoadDirty(t *testing.T) { if err != nil { t.Fatal(err) } + prev := p.SetAutosync(false) + if !prev { + t.Fatal("expected previous autosync to be true") + } + prev = p.SetAutosync(false) + if prev { + t.Fatal("expected previous autosync to be false") + } + prev = p.SetAutosync(true) + if prev { + t.Fatal("expected previous autosync to be false") + } a, ak := randNode() err = dserv.Add(ctx, a) @@ -660,10 +669,13 @@ func TestLoadDirty(t *testing.T) { cidBKey := bk.KeyString() // Corrupt index - cidRIndex := p.(*pinner).cidRIndex + cidRIndex := p.cidRIndex cidRIndex.DeleteKey(ctx, cidAKey) cidRIndex.Add(ctx, cidBKey, "not-a-pin-id") + // Force dirty, since Pin syncs automatically + p.setDirty(ctx) + // Verify dirty data, err := dstore.Get(dirtyKey) if err != nil { @@ -681,7 +693,8 @@ func TestLoadDirty(t *testing.T) { t.Fatal("index should be deleted") } - // Create new pinner on same datastore that was never flushed. + // Create new pinner on same datastore that was never flushed. This should + // detect the dirty flag and repair the indexes. p, err = New(ctx, dstore, dserv) if err != nil { t.Fatal(err) @@ -697,7 +710,7 @@ func TestLoadDirty(t *testing.T) { } // Verify index rebuilt - cidRIndex = p.(*pinner).cidRIndex + cidRIndex = p.cidRIndex has, err = cidRIndex.HasAny(ctx, cidAKey) if err != nil { t.Fatal(err) @@ -706,12 +719,12 @@ func TestLoadDirty(t *testing.T) { t.Fatal("index should have been rebuilt") } - has, err = cidRIndex.HasAny(ctx, cidBKey) + has, err = p.removePinsForCid(ctx, bk, ipfspin.Any) if err != nil { t.Fatal(err) } - if has { - t.Fatal("index should have been removed by rebuild") + if !has { + t.Fatal("expected Unpin to return true since index removed") } } @@ -939,6 +952,33 @@ func BenchmarkLoadRebuild(b *testing.B) { }) } +func BenchmarkRebuild(b *testing.B) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dstore, dserv := makeStore() + + for pins := 4096; pins <= 65536; pins += 4096 { + pinner, err := New(ctx, dstore, dserv) + if err != nil { + panic(err.Error()) + } + nodes := makeNodes(4096, dserv) + pinNodes(nodes, pinner, true) + + b.Run(fmt.Sprintf("Rebuild %d", pins), func(b *testing.B) { + for i := 0; i < b.N; i++ { + dstore.Put(dirtyKey, []byte{1}) + + _, err = New(ctx, dstore, dserv) + if err != nil { + panic(err.Error()) + } + } + }) + } +} + // BenchmarkNthPins shows the time it takes to create/save 1 pin when a number // of other pins already exist. Each run in the series shows performance for // creating a pin in a larger number of existing pins. diff --git a/dspinner/sync_test.go b/dspinner/sync_test.go new file mode 100644 index 0000000..0f3c81a --- /dev/null +++ b/dspinner/sync_test.go @@ -0,0 +1,88 @@ +package dspinner + +import ( + "context" + "os" + "testing" + + bs "github.com/ipfs/go-blockservice" + ds "github.com/ipfs/go-datastore" + bds "github.com/ipfs/go-ds-badger" + lds "github.com/ipfs/go-ds-leveldb" + blockstore "github.com/ipfs/go-ipfs-blockstore" + offline "github.com/ipfs/go-ipfs-exchange-offline" + ipld "github.com/ipfs/go-ipld-format" + mdag "github.com/ipfs/go-merkledag" +) + +func makeStoreLevelDB(dir string) (ds.Datastore, ipld.DAGService) { + ldstore, err := lds.NewDatastore(dir, nil) + if err != nil { + panic(err) + } + // dstore := &batchWrap{ldstore} + dstore := ldstore + bstore := blockstore.NewBlockstore(dstore) + bserv := bs.New(bstore, offline.Exchange(bstore)) + dserv := mdag.NewDAGService(bserv) + return dstore, dserv +} + +func makeStoreBadger(dir string) (ds.Datastore, ipld.DAGService) { + bdstore, err := bds.NewDatastore(dir, nil) + if err != nil { + panic(err) + } + dstore := &batchWrap{bdstore} + bstore := blockstore.NewBlockstore(dstore) + bserv := bs.New(bstore, offline.Exchange(bstore)) + dserv := mdag.NewDAGService(bserv) + return dstore, dserv +} + +func benchAutoSync(b *testing.B, N int, auto bool, dstore ds.Datastore, dserv ipld.DAGService) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + pinner, err := New(ctx, dstore, dserv) + if err != nil { + panic(err.Error()) + } + + nodes := makeNodes(N, dserv) + + pinner.SetAutosync(auto) + pinNodes(nodes, pinner, true) +} + +func BenchmarkSyncOnceBadger(b *testing.B) { + const dsDir = "b-once" + dstoreB1, dservB1 := makeStoreBadger(dsDir) + defer os.RemoveAll(dsDir) + benchAutoSync(b, b.N, false, dstoreB1, dservB1) + dstoreB1.Close() +} + +func BenchmarkSyncEveryBadger(b *testing.B) { + const dsDir = "b-every" + dstoreB2, dservB2 := makeStoreBadger(dsDir) + defer os.RemoveAll(dsDir) + benchAutoSync(b, b.N, true, dstoreB2, dservB2) + dstoreB2.Close() +} + +func BenchmarkSyncOnceLevelDB(b *testing.B) { + const dsDir = "l-once" + dstoreL1, dservL1 := makeStoreLevelDB(dsDir) + defer os.RemoveAll(dsDir) + benchAutoSync(b, b.N, false, dstoreL1, dservL1) + dstoreL1.Close() +} + +func BenchmarkSyncEveryLevelDB(b *testing.B) { + const dsDir = "l-every" + dstoreL2, dservL2 := makeStoreLevelDB(dsDir) + defer os.RemoveAll(dsDir) + benchAutoSync(b, b.N, true, dstoreL2, dservL2) + dstoreL2.Close() +} diff --git a/go.mod b/go.mod index dc042bc..8cd8b13 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/ipfs/go-blockservice v0.1.4 github.com/ipfs/go-cid v0.0.7 github.com/ipfs/go-datastore v0.4.5 + github.com/ipfs/go-ds-badger v0.2.6 github.com/ipfs/go-ds-leveldb v0.4.2 github.com/ipfs/go-ipfs-blockstore v0.1.4 github.com/ipfs/go-ipfs-exchange-offline v0.0.1 diff --git a/go.sum b/go.sum index 5616985..f0d5c9b 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,13 @@ github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= +github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 h1:cTp8I5+VIoKjsnZuH8vjyaysT/ses3EvZeaV/1UkF2M= +github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= +github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= +github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8= github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI= github.com/btcsuite/btcd v0.0.0-20190605094302-a0d1e3e36d50 h1:4i3KsuVA0o0KoBxAC5x+MY7RbteiMK1V7gf/G08NGIQ= @@ -15,8 +20,14 @@ github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVa github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc= github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= +github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0= github.com/cskr/pubsub v1.0.2/go.mod h1:/8MzYXk/NJAz782G8RPkFzXTZVu63VotefPnR9TIRis= github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -24,7 +35,14 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgraph-io/badger v1.5.5-0.20190226225317-8115aed38f8f/go.mod h1:VZxzAIRPHRVNRKRo6AXrX9BJegn6il06VMTZVJYCIjQ= +github.com/dgraph-io/badger v1.6.2 h1:mNw0qs90GVgGGWylh0umH5iag1j6n/PeJtNvL6KY/x8= +github.com/dgraph-io/badger v1.6.2/go.mod h1:JW2yswe3V058sS0kZ2h/AXeDSqFjxnZcRrVH//y2UQE= +github.com/dgraph-io/ristretto v0.0.2 h1:a5WaUrDa0qm0YrAAS1tUykT5El3kt62KNZZeMxQn3po= +github.com/dgraph-io/ristretto v0.0.2/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -49,11 +67,13 @@ github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmv github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huin/goupnp v1.0.0 h1:wg75sLpL6DZqwHQN6E1Cfk6mtfzS45z8OV+ic+DtHRo= github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc= github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o= +github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/ipfs/bbloom v0.0.1/go.mod h1:oqo8CVWsJFMOZqTglBG4wydCE4IQA/G2/SEofB0rjUI= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= @@ -82,6 +102,8 @@ github.com/ipfs/go-datastore v0.4.5/go.mod h1:eXTcaaiN6uOlVCLS9GjJUJtlvJfM3xk23w github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= github.com/ipfs/go-ds-badger v0.0.2/go.mod h1:Y3QpeSFWQf6MopLTiZD+VT6IC1yZqaGmjvRcKeSGij8= +github.com/ipfs/go-ds-badger v0.2.6 h1:Hy8jw4rifxtRDrqpvC1yh36oIyE37KDzsUzlHUPOFiU= +github.com/ipfs/go-ds-badger v0.2.6/go.mod h1:02rnztVKA4aZwDuaRPTf8mpqcKmXP7mLl6JPxd14JHA= github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc= github.com/ipfs/go-ds-leveldb v0.4.2 h1:QmQoAJ9WkPMUfBLnu1sBVy0xWWlJPg0m4kRAiJL9iaw= github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= @@ -231,6 +253,7 @@ github.com/libp2p/go-ws-transport v0.1.0/go.mod h1:rjw1MG1LU9YDC6gzmwObkPd/Sqwhw github.com/libp2p/go-yamux v1.2.2/go.mod h1:FGTiPvoV/3DVdgWpX+tM0OW3tsM+W5bSE3gZwqQTcow= github.com/libp2p/go-yamux v1.2.3 h1:xX8A36vpXb59frIzWFdEgptLMsOANMFq2K7fPRlunYI= github.com/libp2p/go-yamux v1.2.3/go.mod h1:FGTiPvoV/3DVdgWpX+tM0OW3tsM+W5bSE3gZwqQTcow= +github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= @@ -243,6 +266,8 @@ github.com/minio/sha256-simd v0.0.0-20190328051042-05b4dd3047e5/go.mod h1:2FMWW+ github.com/minio/sha256-simd v0.1.0/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U= github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771 h1:MHkK1uRtFbVqvAgvWxafZe54+5uBxLluGylDiKgdhwo= github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= github.com/mr-tron/base58 v1.1.1/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= @@ -285,6 +310,7 @@ github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -293,6 +319,7 @@ github.com/polydawn/refmt v0.0.0-20190221155625-df39d6c2d992/go.mod h1:uIp+gprXx github.com/polydawn/refmt v0.0.0-20190807091052-3d65705ee9f1 h1:CskT+S6Ay54OwxBGB0R3Rsx4Muto6UnEYTyKJbyRIAI= github.com/polydawn/refmt v0.0.0-20190807091052-3d65705ee9f1/go.mod h1:uIp+gprXxxrWSjjklXD+mN4wed/tMfjMMmN/9+JsA9o= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v0.0.0-20190222223459-a17d461953aa h1:E+gaaifzi2xF65PbDmuKI3PhLWY6G5opMLniFq8vmXA= @@ -301,14 +328,23 @@ github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a h1:/eS3yfGjQ github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a/go.mod h1:7AyxJNCJ7SBZ1MfVQCWD6Uqo2oubI2Eq2y2eqf+A5r0= github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+1+HkWaX/Yh71Ee5ZHaHYt7ZP4sQgUrm6cDU= github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= +github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436 h1:qOpVTI+BrstcjTZLm2Yz/3sOnqkzj3FQoh0g+E5s3Gc= github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc= @@ -320,6 +356,7 @@ github.com/whyrusleeping/mafmt v1.2.8/go.mod h1:faQJFPbLSxzD9xpA02ttW/tS9vZykNvX github.com/whyrusleeping/mdns v0.0.0-20180901202407-ef14215e6b30/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI= +github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= @@ -329,6 +366,7 @@ go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9E go.uber.org/zap v1.14.1 h1:nYDKopTbvAPq/NrUVZwT15y2lpROBiLLyoRTbXOYWOo= go.uber.org/zap v1.14.1/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -353,14 +391,16 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190219092855-153ac476189d/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190524122548-abf6ff778158/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190610200419-93c9922d18ae h1:xiXzMMEQdQcric9hXtr1QU98MHunKK7OTtsoU6bYWs4= golang.org/x/sys v0.0.0-20190610200419-93c9922d18ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb h1:fgwFCsaw9buMuxNd6+DQfAuSFqbNiQZpcgJQAgJsK6k= +golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= From f4578ef90ebfd7e9ae431dde77641627b8c549a8 Mon Sep 17 00:00:00 2001 From: gammazero Date: Sun, 18 Jul 2021 17:17:35 -0700 Subject: [PATCH 2/9] Fix error rebuilding pin indexes when dirty flag detected Fixes an error that would cause all indexes to be rebuilt whether needed or not when dirty flag detected. This would greatly prolong the rebuilding process as every index for every pin would be rewritten. No other problems resulted, othen than the unnesessary rebuild of all indexes and large amount of memory used during rebuild for large pin sets. Tests have been added to verify the correct functioning of the indexer and rebuilding process. --- dspinner/pin.go | 172 ++++++++++++++--------------- dspinner/pin_test.go | 253 ++++++++++++++++++++++++++++++++++++++----- 2 files changed, 309 insertions(+), 116 deletions(-) diff --git a/dspinner/pin.go b/dspinner/pin.go index 3fb15dc..0c7f488 100644 --- a/dspinner/pin.go +++ b/dspinner/pin.go @@ -117,7 +117,7 @@ func (p *pin) dsKey() ds.Key { func newPin(c cid.Cid, mode ipfspinner.Mode, name string) *pin { return &pin{ - Id: ds.RandomKey().String(), + Id: path.Base(ds.RandomKey().String()), Cid: c, Name: name, Mode: mode, @@ -664,93 +664,6 @@ func (p *pinner) loadPin(ctx context.Context, pid string) (*pin, error) { return decodePin(pid, pinData) } -// rebuildIndexes uses the stored pins to rebuild secondary indexes. This -// resolves any discrepancy between secondary indexes and pins that could -// result from a program termination between saving the two. -func (p *pinner) rebuildIndexes(ctx context.Context) error { - checkIncomplete := func(pp *pin, indexer dsindex.Indexer) (bool, error) { - var repaired bool - // Check that the indexer indexes this pin - indexKey := pp.Cid.KeyString() - ok, err := indexer.HasValue(ctx, indexKey, pp.Id) - if err != nil { - return false, err - } - if !ok { - // There was no index found for this pin. This was wither an - // incomplete add or and incomplete delete of a pin. Either way, - // restore the index to complete the add or to undo the incomplete - // delete. - if err = indexer.Add(ctx, indexKey, pp.Id); err != nil { - return false, err - } - repaired = true - } - // Check for missing name index - if pp.Name != "" { - ok, err = p.nameIndex.HasValue(ctx, pp.Name, pp.Id) - if err != nil { - return false, err - } - if !ok { - if err = p.nameIndex.Add(ctx, pp.Name, pp.Id); err != nil { - return false, err - } - } - repaired = true - } - return repaired, nil - } - - // Load all pins from the datastore. - q := query.Query{ - Prefix: pinKeyPath, - } - results, err := p.dstore.Query(q) - if err != nil { - return err - } - defer results.Close() - - var repairedCount int - - // Iterate all pins and check if the corresponding recursive or direct - // index is missing. If the index is missing then create the index. - for r := range results.Next() { - if ctx.Err() != nil { - return ctx.Err() - } - if r.Error != nil { - return fmt.Errorf("cannot read index: %v", r.Error) - } - ent := r.Entry - pp, err := decodePin(path.Base(ent.Key), ent.Value) - if err != nil { - return err - } - - var repaired bool - if pp.Mode == ipfspinner.Recursive { - repaired, err = checkIncomplete(pp, p.cidRIndex) - if err != nil { - return err - } - } else if pp.Mode == ipfspinner.Direct { - repaired, err = checkIncomplete(pp, p.cidDIndex) - if err != nil { - return err - } - } - - if repaired { - repairedCount++ - } - } - - log.Infof("repaired invalid indexes for %d pins", repairedCount) - return p.flushPins(ctx, true) -} - // DirectKeys returns a slice containing the directly pinned keys func (p *pinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) { p.lock.RLock() @@ -1007,3 +920,86 @@ func (p *pinner) setClean(ctx context.Context) { } p.clean = p.dirty // set clean } + +// rebuildIndexes uses the stored pins to rebuild secondary indexes. This +// resolves any discrepancy between secondary indexes and pins that could +// result from a program termination between saving the two. +func (p *pinner) rebuildIndexes(ctx context.Context) error { + // Load all pins from the datastore. + q := query.Query{ + Prefix: pinKeyPath, + } + results, err := p.dstore.Query(q) + if err != nil { + return err + } + defer results.Close() + + var checkedCount, repairedCount int + + // Iterate all pins and check if the corresponding recursive or direct + // index is missing. If the index is missing then create the index. + for r := range results.Next() { + if ctx.Err() != nil { + return ctx.Err() + } + if r.Error != nil { + return fmt.Errorf("cannot read index: %v", r.Error) + } + ent := r.Entry + pp, err := decodePin(path.Base(ent.Key), ent.Value) + if err != nil { + return err + } + + var indexer dsindex.Indexer + if pp.Mode == ipfspinner.Recursive { + indexer = p.cidRIndex + } else if pp.Mode == ipfspinner.Direct { + indexer = p.cidDIndex + } else { + log.Error("unrecognized pin mode:", pp.Mode) + continue + } + + // Check that the indexer indexes this pin + indexKey := pp.Cid.KeyString() + ok, err := indexer.HasValue(ctx, indexKey, pp.Id) + if err != nil { + return err + } + + var repaired bool + if !ok { + log.Errorf("missing index for cid: %s", pp.Cid.String()) + // There was no index found for this pin. This was wither an + // incomplete add or and incomplete delete of a pin. Either way, + // restore the index to complete the add or to undo the incomplete + // delete. + if err = indexer.Add(ctx, indexKey, pp.Id); err != nil { + return err + } + repaired = true + } + // Check for missing name index + if pp.Name != "" { + ok, err = p.nameIndex.HasValue(ctx, pp.Name, pp.Id) + if err != nil { + return err + } + if !ok { + if err = p.nameIndex.Add(ctx, pp.Name, pp.Id); err != nil { + return err + } + } + repaired = true + } + if repaired { + repairedCount++ + } + checkedCount++ + } + + log.Infof("checked %d pins for invalid indexes, repaired %d pins", checkedCount, repairedCount) + return p.flushPins(ctx, true) +} diff --git a/dspinner/pin_test.go b/dspinner/pin_test.go index d78ace7..ebb72ea 100644 --- a/dspinner/pin_test.go +++ b/dspinner/pin_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "path" "testing" "time" @@ -13,6 +14,7 @@ import ( cid "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/query" dssync "github.com/ipfs/go-datastore/sync" lds "github.com/ipfs/go-ds-leveldb" blockstore "github.com/ipfs/go-ipfs-blockstore" @@ -916,7 +918,7 @@ func makeStore() (ds.Datastore, ipld.DAGService) { // BenchmarkLoadRebuild loads a pinner that has some number of saved pins, and // compares the load time when rebuilding indexes to loading without rebuilding // indexes. -func BenchmarkLoadRebuild(b *testing.B) { +func BenchmarkLoad(b *testing.B) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -952,33 +954,6 @@ func BenchmarkLoadRebuild(b *testing.B) { }) } -func BenchmarkRebuild(b *testing.B) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dstore, dserv := makeStore() - - for pins := 4096; pins <= 65536; pins += 4096 { - pinner, err := New(ctx, dstore, dserv) - if err != nil { - panic(err.Error()) - } - nodes := makeNodes(4096, dserv) - pinNodes(nodes, pinner, true) - - b.Run(fmt.Sprintf("Rebuild %d", pins), func(b *testing.B) { - for i := 0; i < b.N; i++ { - dstore.Put(dirtyKey, []byte{1}) - - _, err = New(ctx, dstore, dserv) - if err != nil { - panic(err.Error()) - } - } - }) - } -} - // BenchmarkNthPins shows the time it takes to create/save 1 pin when a number // of other pins already exist. Each run in the series shows performance for // creating a pin in a larger number of existing pins. @@ -1137,3 +1112,225 @@ func benchmarkPinAll(b *testing.B, count int, pinner ipfspin.Pinner, dserv ipld. b.StartTimer() } } + +func BenchmarkRebuild(b *testing.B) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dstore, dserv := makeStore() + pinIncr := 32768 + + for pins := pinIncr; pins <= pinIncr*5; pins += pinIncr { + pinner, err := New(ctx, dstore, dserv) + if err != nil { + panic(err.Error()) + } + nodes := makeNodes(pinIncr, dserv) + pinNodes(nodes, pinner, true) + + b.Run(fmt.Sprintf("Rebuild %d", pins), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + dstore.Put(dirtyKey, []byte{1}) + + _, err = New(ctx, dstore, dserv) + if err != nil { + panic(err.Error()) + } + } + }) + } +} + +func TestCidIndex(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dstore, dserv := makeStore() + pinner, err := New(ctx, dstore, dserv) + if err != nil { + t.Fatal(err) + } + nodes := makeNodes(1, dserv) + node := nodes[0] + + c := node.Cid() + cidKey := c.KeyString() + + // Pin the cid + pid, err := pinner.addPin(ctx, c, ipfspin.Recursive, "") + if err != nil { + t.Fatal(err) + } + + t.Log("Added pin:", pid) + t.Log("CID index:", c.String(), "-->", pid) + + // Check that the index exists + ok, err := pinner.cidRIndex.HasAny(ctx, cidKey) + if err != nil { + t.Fatal(err) + } + if !ok { + t.Fatal("R-index has no value for", cidKey) + } + + // Check that searching for the cid returns a value + values, err := pinner.cidRIndex.Search(ctx, cidKey) + if err != nil { + t.Fatal(err) + } + if len(values) != 1 { + t.Fatal("expect index to return one value") + } + if values[0] != pid { + t.Fatal("indexer should have has value", cidKey, "-->", pid) + } + + // Check that index has specific value + ok, err = pinner.cidRIndex.HasValue(ctx, cidKey, pid) + if err != nil { + t.Fatal(err) + } + if !ok { + t.Fatal("indexer should have has value", cidKey, "-->", pid) + } + + // Iterate values of index + var seen bool + err = pinner.cidRIndex.ForEach(ctx, "", func(key, value string) bool { + if seen { + t.Fatal("expected one key-value pair") + } + if key != cidKey { + t.Fatal("unexpected key:", key) + } + if value != pid { + t.Fatal("unexpected value:", value) + } + seen = true + return true + }) + if err != nil { + t.Fatal(err) + } + + // Load all pins from the datastore. + q := query.Query{ + Prefix: pinKeyPath, + } + results, err := pinner.dstore.Query(q) + if err != nil { + t.Fatal(err) + } + defer results.Close() + + // Iterate all pins and check if the corresponding recursive or direct + // index is missing. If the index is missing then create the index. + seen = false + for r := range results.Next() { + if seen { + t.Fatal("has more than one pin") + } + if r.Error != nil { + t.Fatal(fmt.Errorf("cannot read index: %v", r.Error)) + } + ent := r.Entry + pp, err := decodePin(path.Base(ent.Key), ent.Value) + if err != nil { + t.Fatal(err) + } + t.Log("Found pin:", pp.Id) + if pp.Id != pid { + t.Fatal("ID of loaded pin is not the same known to indexer") + } + seen = true + } +} + +func TestRebuild(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dstore, dserv := makeStore() + pinner, err := New(ctx, dstore, dserv) + if err != nil { + t.Fatal(err) + } + nodes := makeNodes(3, dserv) + pinNodes(nodes, pinner, true) + + c1 := nodes[0].Cid() + cid1Key := c1.KeyString() + c2 := nodes[1].Cid() + cid2Key := c2.KeyString() + c3 := nodes[2].Cid() + cid3Key := c3.KeyString() + + // Get pin IDs + values, err := pinner.cidRIndex.Search(ctx, cid1Key) + if err != nil { + t.Fatal(err) + } + pid1 := values[0] + values, err = pinner.cidRIndex.Search(ctx, cid2Key) + if err != nil { + t.Fatal(err) + } + pid2 := values[0] + values, err = pinner.cidRIndex.Search(ctx, cid3Key) + if err != nil { + t.Fatal(err) + } + pid3 := values[0] + + // Corrupt index by deleting cid index 2 to simulate an incomplete add or delete + pinner.cidRIndex.DeleteKey(ctx, cid2Key) + + // Corrupt index by deleting pin to simulate corruption + var pp *pin + pp, err = pinner.loadPin(ctx, pid3) + if err != nil { + t.Fatal(err) + } + err = pinner.dstore.Delete(pp.dsKey()) + if err != nil { + t.Fatal(err) + } + + pinner.setDirty(ctx) + + // Rebuild indexes + pinner, err = New(ctx, dstore, dserv) + if err != nil { + t.Fatal(err) + } + + // Verify that indexes have same values as before + err = verifyIndexValue(ctx, pinner, cid1Key, pid1) + if err != nil { + t.Fatal(err) + } + err = verifyIndexValue(ctx, pinner, cid2Key, pid2) + if err != nil { + t.Fatal(err) + } + err = verifyIndexValue(ctx, pinner, cid3Key, pid3) + if err != nil { + t.Fatal(err) + } +} + +func verifyIndexValue(ctx context.Context, pinner *pinner, cidKey, expectedPid string) error { + values, err := pinner.cidRIndex.Search(ctx, cidKey) + if err != nil { + return err + } + if len(values) != 1 { + return errors.New("expected 1 value") + } + if expectedPid != values[0] { + return errors.New("index has wrong value") + } + return nil +} From 62b7c582080ad08315e8399cea8ef091110ad124 Mon Sep 17 00:00:00 2001 From: gammazero Date: Sun, 18 Jul 2021 17:46:05 -0700 Subject: [PATCH 3/9] remove unneeded vals --- dspinner/pin.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dspinner/pin.go b/dspinner/pin.go index 0c7f488..5a01f8e 100644 --- a/dspinner/pin.go +++ b/dspinner/pin.go @@ -28,9 +28,6 @@ const ( pinKeyPath = "/pins/pin" indexKeyPath = "/pins/index" dirtyKeyPath = "/pins/state/dirty" - - addOp = 1 - delOp = 2 ) var ( @@ -210,6 +207,9 @@ func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error { // If autosyncing, sync dag service before making any change to pins err = p.flushDagService(ctx, false) + if err != nil { + return err + } // Only look again if something has changed. if p.dirty != dirtyBefore { From 54fd055883bdc2a4fc6f3c5675fc1af39b3c67fc Mon Sep 17 00:00:00 2001 From: gammazero Date: Sun, 18 Jul 2021 18:04:17 -0700 Subject: [PATCH 4/9] Additional error checks --- dspinner/pin.go | 70 +++++++++++++++++++++++++++++++++++--------- dspinner/pin_test.go | 60 +++++++++++++++++++++++++++++-------- 2 files changed, 104 insertions(+), 26 deletions(-) diff --git a/dspinner/pin.go b/dspinner/pin.go index 5a01f8e..fc4df84 100644 --- a/dspinner/pin.go +++ b/dspinner/pin.go @@ -228,7 +228,10 @@ func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error { return err } if found { - p.removePinsForCid(ctx, c, ipfspinner.Direct) + _, err = p.removePinsForCid(ctx, c, ipfspinner.Direct) + if err != nil { + return err + } } _, err = p.addPin(ctx, c, ipfspinner.Recursive, "") @@ -269,12 +272,21 @@ func (p *pinner) addPin(ctx context.Context, c cid.Cid, mode ipfspinner.Mode, na err = p.dstore.Put(pp.dsKey(), pinData) if err != nil { if mode == ipfspinner.Recursive { - p.cidRIndex.Delete(ctx, c.KeyString(), pp.Id) + e := p.cidRIndex.Delete(ctx, c.KeyString(), pp.Id) + if e != nil { + log.Errorf("error deleting index: %s", e) + } } else { - p.cidDIndex.Delete(ctx, c.KeyString(), pp.Id) + e := p.cidDIndex.Delete(ctx, c.KeyString(), pp.Id) + if e != nil { + log.Errorf("error deleting index: %s", e) + } } if name != "" { - p.nameIndex.Delete(ctx, name, pp.Id) + e := p.nameIndex.Delete(ctx, name, pp.Id) + if e != nil { + log.Errorf("error deleting index: %s", e) + } } return "", err } @@ -297,9 +309,15 @@ func (p *pinner) addPin(ctx context.Context, c cid.Cid, mode ipfspinner.Mode, na err = p.nameIndex.Add(ctx, name, pp.Id) if err != nil { if mode == ipfspinner.Recursive { - p.cidRIndex.Delete(ctx, c.KeyString(), pp.Id) + e := p.cidRIndex.Delete(ctx, c.KeyString(), pp.Id) + if e != nil { + log.Errorf("error deleting index: %s", e) + } } else { - p.cidDIndex.Delete(ctx, c.KeyString(), pp.Id) + e := p.cidDIndex.Delete(ctx, c.KeyString(), pp.Id) + if e != nil { + log.Errorf("error deleting index: %s", e) + } } return "", fmt.Errorf("could not add pin name index: %v", err) } @@ -626,12 +644,24 @@ func (p *pinner) removePinsForCid(ctx context.Context, c cid.Cid, mode ipfspinne // Fix index; remove index for pin that does not exist switch mode { case ipfspinner.Recursive: - p.cidRIndex.DeleteKey(ctx, cidKey) + _, err = p.cidRIndex.DeleteKey(ctx, cidKey) + if err != nil { + return false, fmt.Errorf("error deleting index: %s", err) + } case ipfspinner.Direct: - p.cidDIndex.DeleteKey(ctx, cidKey) + _, err = p.cidDIndex.DeleteKey(ctx, cidKey) + if err != nil { + return false, fmt.Errorf("error deleting index: %s", err) + } case ipfspinner.Any: - p.cidRIndex.DeleteKey(ctx, cidKey) - p.cidDIndex.DeleteKey(ctx, cidKey) + _, err = p.cidRIndex.DeleteKey(ctx, cidKey) + if err != nil { + return false, fmt.Errorf("error deleting index: %s", err) + } + _, err = p.cidDIndex.DeleteKey(ctx, cidKey) + if err != nil { + return false, fmt.Errorf("error deleting index: %s", err) + } } if err = p.flushPins(ctx, true); err != nil { return false, err @@ -902,8 +932,15 @@ func (p *pinner) setDirty(ctx context.Context) { } data := []byte{1} - p.dstore.Put(dirtyKey, data) - p.dstore.Sync(dirtyKey) + err := p.dstore.Put(dirtyKey, data) + if err != nil { + log.Errorf("failed to set pin dirty flag: %s", err) + return + } + err = p.dstore.Sync(dirtyKey) + if err != nil { + log.Errorf("failed to sync pin dirty flag: %s", err) + } } // setClean saves a clean state value in the datastore if the state was @@ -914,8 +951,13 @@ func (p *pinner) setClean(ctx context.Context) { } data := []byte{0} - p.dstore.Put(dirtyKey, data) - if err := p.dstore.Sync(dirtyKey); err != nil { + err := p.dstore.Put(dirtyKey, data) + if err != nil { + log.Errorf("failed to set clear dirty flag: %s", err) + return + } + if err = p.dstore.Sync(dirtyKey); err != nil { + log.Errorf("failed to sync cleared pin dirty flag: %s", err) return } p.clean = p.dirty // set clean diff --git a/dspinner/pin_test.go b/dspinner/pin_test.go index ebb72ea..cb58dcb 100644 --- a/dspinner/pin_test.go +++ b/dspinner/pin_test.go @@ -164,11 +164,20 @@ func TestPinnerBasic(t *testing.T) { assertPinnedWithType(t, p, bk, ipfspin.Recursive, "Recursively pinned node not found") d, _ := randNode() - d.AddNodeLink("a", a) - d.AddNodeLink("c", c) + err = d.AddNodeLink("a", a) + if err != nil { + panic(err) + } + err = d.AddNodeLink("c", c) + if err != nil { + panic(err) + } e, _ := randNode() - d.AddNodeLink("e", e) + err = d.AddNodeLink("e", e) + if err != nil { + panic(err) + } // Must be in dagserv for unpin to work err = dserv.Add(ctx, e) @@ -338,7 +347,10 @@ func TestAddLoadPin(t *testing.T) { } a, ak := randNode() - dserv.Add(ctx, a) + err = dserv.Add(ctx, a) + if err != nil { + panic(err) + } mode := ipfspin.Recursive name := "my-pin" @@ -379,9 +391,15 @@ func TestRemovePinWithMode(t *testing.T) { } a, ak := randNode() - dserv.Add(ctx, a) + err = dserv.Add(ctx, a) + if err != nil { + panic(err) + } - p.Pin(ctx, a, false) + err = p.Pin(ctx, a, false) + if err != nil { + t.Fatal(err) + } ok, err := p.removePinsForCid(ctx, ak, ipfspin.Recursive) if err != nil { @@ -672,8 +690,14 @@ func TestLoadDirty(t *testing.T) { // Corrupt index cidRIndex := p.cidRIndex - cidRIndex.DeleteKey(ctx, cidAKey) - cidRIndex.Add(ctx, cidBKey, "not-a-pin-id") + _, err = cidRIndex.DeleteKey(ctx, cidAKey) + if err != nil { + t.Fatal(err) + } + err = cidRIndex.Add(ctx, cidBKey, "not-a-pin-id") + if err != nil { + t.Fatal(err) + } // Force dirty, since Pin syncs automatically p.setDirty(ctx) @@ -933,7 +957,10 @@ func BenchmarkLoad(b *testing.B) { b.Run("RebuildTrue", func(b *testing.B) { for i := 0; i < b.N; i++ { - dstore.Put(dirtyKey, []byte{1}) + err = dstore.Put(dirtyKey, []byte{1}) + if err != nil { + panic(err.Error()) + } _, err = New(ctx, dstore, dserv) if err != nil { @@ -944,7 +971,10 @@ func BenchmarkLoad(b *testing.B) { b.Run("RebuildFalse", func(b *testing.B) { for i := 0; i < b.N; i++ { - dstore.Put(dirtyKey, []byte{0}) + err = dstore.Put(dirtyKey, []byte{0}) + if err != nil { + panic(err.Error()) + } _, err = New(ctx, dstore, dserv) if err != nil { @@ -1131,7 +1161,10 @@ func BenchmarkRebuild(b *testing.B) { b.Run(fmt.Sprintf("Rebuild %d", pins), func(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { - dstore.Put(dirtyKey, []byte{1}) + err = dstore.Put(dirtyKey, []byte{1}) + if err != nil { + panic(err.Error()) + } _, err = New(ctx, dstore, dserv) if err != nil { @@ -1285,7 +1318,10 @@ func TestRebuild(t *testing.T) { pid3 := values[0] // Corrupt index by deleting cid index 2 to simulate an incomplete add or delete - pinner.cidRIndex.DeleteKey(ctx, cid2Key) + _, err = pinner.cidRIndex.DeleteKey(ctx, cid2Key) + if err != nil { + t.Fatal(err) + } // Corrupt index by deleting pin to simulate corruption var pp *pin From cb1434ca93785315b2cb629fb19846fd97b9726d Mon Sep 17 00:00:00 2001 From: gammazero Date: Mon, 19 Jul 2021 13:20:21 -0700 Subject: [PATCH 5/9] Review changes --- dspinner/pin.go | 41 +++++++++++++++-------------------------- dspinner/pin_test.go | 13 +++++++++++++ 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/dspinner/pin.go b/dspinner/pin.go index fc4df84..6ff5fa5 100644 --- a/dspinner/pin.go +++ b/dspinner/pin.go @@ -267,31 +267,13 @@ func (p *pinner) addPin(ctx context.Context, c cid.Cid, mode ipfspinner.Mode, na p.setDirty(ctx) - // Store the pin. Pin is stored first so that an incomplete add is - // detected by a pin that does not have an index. + // Store the pin err = p.dstore.Put(pp.dsKey(), pinData) if err != nil { - if mode == ipfspinner.Recursive { - e := p.cidRIndex.Delete(ctx, c.KeyString(), pp.Id) - if e != nil { - log.Errorf("error deleting index: %s", e) - } - } else { - e := p.cidDIndex.Delete(ctx, c.KeyString(), pp.Id) - if e != nil { - log.Errorf("error deleting index: %s", e) - } - } - if name != "" { - e := p.nameIndex.Delete(ctx, name, pp.Id) - if e != nil { - log.Errorf("error deleting index: %s", e) - } - } return "", err } - // Store CID index. + // Store CID index switch mode { case ipfspinner.Recursive: err = p.cidRIndex.Add(ctx, c.KeyString(), pp.Id) @@ -831,11 +813,6 @@ func (p *pinner) flushPins(ctx context.Context, force bool) error { // Flush encodes and writes pinner keysets to the datastore func (p *pinner) Flush(ctx context.Context) error { - // If autosync is enabled, then this is not needed. - if p.autoSync { - return nil - } - p.lock.Lock() defer p.lock.Unlock() @@ -994,18 +971,29 @@ func (p *pinner) rebuildIndexes(ctx context.Context) error { return err } + indexKey := pp.Cid.KeyString() + var indexer dsindex.Indexer if pp.Mode == ipfspinner.Recursive { indexer = p.cidRIndex + // Delete any direct index + err = p.cidDIndex.Delete(ctx, indexKey, pp.Id) + if err != nil { + return err + } } else if pp.Mode == ipfspinner.Direct { indexer = p.cidDIndex + // Delete any recursive index + err = p.cidRIndex.Delete(ctx, indexKey, pp.Id) + if err != nil { + return err + } } else { log.Error("unrecognized pin mode:", pp.Mode) continue } // Check that the indexer indexes this pin - indexKey := pp.Cid.KeyString() ok, err := indexer.HasValue(ctx, indexKey, pp.Id) if err != nil { return err @@ -1036,6 +1024,7 @@ func (p *pinner) rebuildIndexes(ctx context.Context) error { } repaired = true } + if repaired { repairedCount++ } diff --git a/dspinner/pin_test.go b/dspinner/pin_test.go index cb58dcb..ed28286 100644 --- a/dspinner/pin_test.go +++ b/dspinner/pin_test.go @@ -1317,6 +1317,12 @@ func TestRebuild(t *testing.T) { } pid3 := values[0] + // Corrupt by adding direct index when there is already a recursive index + err = pinner.cidDIndex.Add(ctx, cid1Key, pid1) + if err != nil { + t.Fatal(err) + } + // Corrupt index by deleting cid index 2 to simulate an incomplete add or delete _, err = pinner.cidRIndex.DeleteKey(ctx, cid2Key) if err != nil { @@ -1368,5 +1374,12 @@ func verifyIndexValue(ctx context.Context, pinner *pinner, cidKey, expectedPid s if expectedPid != values[0] { return errors.New("index has wrong value") } + ok, err := pinner.cidDIndex.HasAny(ctx, cidKey) + if err != nil { + return err + } + if ok { + return errors.New("should not have a direct index") + } return nil } From 86e8c793157f09f5bf8f3dcbe66b7cd763c68216 Mon Sep 17 00:00:00 2001 From: gammazero Date: Tue, 27 Jul 2021 11:00:33 -0700 Subject: [PATCH 6/9] Do not rebuild indexes with old values --- dspinner/pin.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/dspinner/pin.go b/dspinner/pin.go index 6ff5fa5..d8d7671 100644 --- a/dspinner/pin.go +++ b/dspinner/pin.go @@ -1001,15 +1001,22 @@ func (p *pinner) rebuildIndexes(ctx context.Context) error { var repaired bool if !ok { - log.Errorf("missing index for cid: %s", pp.Cid.String()) - // There was no index found for this pin. This was wither an - // incomplete add or and incomplete delete of a pin. Either way, - // restore the index to complete the add or to undo the incomplete - // delete. - if err = indexer.Add(ctx, indexKey, pp.Id); err != nil { + // Do not rebuild if index has an old value with leading slash + ok, err = indexer.HasValue(ctx, indexKey, "/"+pp.Id) + if err != nil { return err } - repaired = true + if !ok { + log.Errorf("missing index for cid: %s", pp.Cid.String()) + // There was no index found for this pin. This was either an + // incomplete add or and incomplete delete of a pin. Either + // way, restore the index to complete the add or to undo the + // incomplete delete. + if err = indexer.Add(ctx, indexKey, pp.Id); err != nil { + return err + } + repaired = true + } } // Check for missing name index if pp.Name != "" { From bba6accd19cf13e1d84c26d1066f10488fdfb427 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Wed, 28 Jul 2021 10:45:49 -0400 Subject: [PATCH 7/9] log every repaired pin; flush datastore while repairing --- dspinner/pin.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/dspinner/pin.go b/dspinner/pin.go index d8d7671..9208f99 100644 --- a/dspinner/pin.go +++ b/dspinner/pin.go @@ -940,6 +940,9 @@ func (p *pinner) setClean(ctx context.Context) { p.clean = p.dirty // set clean } +// sync datastore after every 50 cid repairs +const syncRepairFrequency = 50 + // rebuildIndexes uses the stored pins to rebuild secondary indexes. This // resolves any discrepancy between secondary indexes and pins that could // result from a program termination between saving the two. @@ -978,6 +981,7 @@ func (p *pinner) rebuildIndexes(ctx context.Context) error { indexer = p.cidRIndex // Delete any direct index err = p.cidDIndex.Delete(ctx, indexKey, pp.Id) + log.Infof("deleting stale pin index for cid %v", pp.Cid.String()) if err != nil { return err } @@ -985,6 +989,7 @@ func (p *pinner) rebuildIndexes(ctx context.Context) error { indexer = p.cidDIndex // Delete any recursive index err = p.cidRIndex.Delete(ctx, indexKey, pp.Id) + log.Infof("deleting stale pin index for cid %v", pp.Cid.String()) if err != nil { return err } @@ -1007,7 +1012,7 @@ func (p *pinner) rebuildIndexes(ctx context.Context) error { return err } if !ok { - log.Errorf("missing index for cid: %s", pp.Cid.String()) + log.Infof("repairing pin index for cid: %s", pp.Cid.String()) // There was no index found for this pin. This was either an // incomplete add or and incomplete delete of a pin. Either // way, restore the index to complete the add or to undo the @@ -1025,6 +1030,7 @@ func (p *pinner) rebuildIndexes(ctx context.Context) error { return err } if !ok { + log.Infof("repairing pin name for cid: %s", pp.Cid.String()) if err = p.nameIndex.Add(ctx, pp.Name, pp.Id); err != nil { return err } @@ -1036,6 +1042,9 @@ func (p *pinner) rebuildIndexes(ctx context.Context) error { repairedCount++ } checkedCount++ + if checkedCount%syncRepairFrequency == 0 { + p.flushPins(ctx, true) + } } log.Infof("checked %d pins for invalid indexes, repaired %d pins", checkedCount, repairedCount) From a6efbad1eb6b68d4701cf5a204ebc2fe91262798 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Wed, 28 Jul 2021 11:05:13 -0400 Subject: [PATCH 8/9] info -> error --- dspinner/pin.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dspinner/pin.go b/dspinner/pin.go index 9208f99..c84472a 100644 --- a/dspinner/pin.go +++ b/dspinner/pin.go @@ -981,7 +981,7 @@ func (p *pinner) rebuildIndexes(ctx context.Context) error { indexer = p.cidRIndex // Delete any direct index err = p.cidDIndex.Delete(ctx, indexKey, pp.Id) - log.Infof("deleting stale pin index for cid %v", pp.Cid.String()) + log.Errorf("deleting stale pin index for cid %v", pp.Cid.String()) if err != nil { return err } @@ -989,7 +989,7 @@ func (p *pinner) rebuildIndexes(ctx context.Context) error { indexer = p.cidDIndex // Delete any recursive index err = p.cidRIndex.Delete(ctx, indexKey, pp.Id) - log.Infof("deleting stale pin index for cid %v", pp.Cid.String()) + log.Errorf("deleting stale pin index for cid %v", pp.Cid.String()) if err != nil { return err } @@ -1012,7 +1012,7 @@ func (p *pinner) rebuildIndexes(ctx context.Context) error { return err } if !ok { - log.Infof("repairing pin index for cid: %s", pp.Cid.String()) + log.Errorf("repairing pin index for cid: %s", pp.Cid.String()) // There was no index found for this pin. This was either an // incomplete add or and incomplete delete of a pin. Either // way, restore the index to complete the add or to undo the @@ -1030,7 +1030,7 @@ func (p *pinner) rebuildIndexes(ctx context.Context) error { return err } if !ok { - log.Infof("repairing pin name for cid: %s", pp.Cid.String()) + log.Errorf("repairing pin name for cid: %s", pp.Cid.String()) if err = p.nameIndex.Add(ctx, pp.Name, pp.Id); err != nil { return err } @@ -1047,6 +1047,6 @@ func (p *pinner) rebuildIndexes(ctx context.Context) error { } } - log.Infof("checked %d pins for invalid indexes, repaired %d pins", checkedCount, repairedCount) + log.Errorf("checked %d pins for invalid indexes, repaired %d pins", checkedCount, repairedCount) return p.flushPins(ctx, true) } From a223f5b32749768dc38d6b654a9662e19dd6fa3f Mon Sep 17 00:00:00 2001 From: gammazero Date: Wed, 28 Jul 2021 09:14:30 -0700 Subject: [PATCH 9/9] Check that stale pin exists before logging and removing --- dspinner/pin.go | 40 ++++++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/dspinner/pin.go b/dspinner/pin.go index c84472a..a02f015 100644 --- a/dspinner/pin.go +++ b/dspinner/pin.go @@ -976,30 +976,38 @@ func (p *pinner) rebuildIndexes(ctx context.Context) error { indexKey := pp.Cid.KeyString() - var indexer dsindex.Indexer + var indexer, staleIndexer dsindex.Indexer + var idxrName, staleIdxrName string if pp.Mode == ipfspinner.Recursive { indexer = p.cidRIndex - // Delete any direct index - err = p.cidDIndex.Delete(ctx, indexKey, pp.Id) - log.Errorf("deleting stale pin index for cid %v", pp.Cid.String()) - if err != nil { - return err - } + staleIndexer = p.cidDIndex + idxrName = linkRecursive + staleIdxrName = linkDirect } else if pp.Mode == ipfspinner.Direct { indexer = p.cidDIndex - // Delete any recursive index - err = p.cidRIndex.Delete(ctx, indexKey, pp.Id) - log.Errorf("deleting stale pin index for cid %v", pp.Cid.String()) - if err != nil { - return err - } + staleIndexer = p.cidRIndex + idxrName = linkDirect + staleIdxrName = linkRecursive } else { log.Error("unrecognized pin mode:", pp.Mode) continue } + // Remove any stale index from unused indexer + ok, err := staleIndexer.HasValue(ctx, indexKey, pp.Id) + if err != nil { + return err + } + if ok { + // Delete any stale index + log.Errorf("deleting stale %s pin index for cid %v", staleIdxrName, pp.Cid.String()) + if err = staleIndexer.Delete(ctx, indexKey, pp.Id); err != nil { + return err + } + } + // Check that the indexer indexes this pin - ok, err := indexer.HasValue(ctx, indexKey, pp.Id) + ok, err = indexer.HasValue(ctx, indexKey, pp.Id) if err != nil { return err } @@ -1012,7 +1020,7 @@ func (p *pinner) rebuildIndexes(ctx context.Context) error { return err } if !ok { - log.Errorf("repairing pin index for cid: %s", pp.Cid.String()) + log.Errorf("repairing %s pin index for cid: %s", idxrName, pp.Cid.String()) // There was no index found for this pin. This was either an // incomplete add or and incomplete delete of a pin. Either // way, restore the index to complete the add or to undo the @@ -1030,7 +1038,7 @@ func (p *pinner) rebuildIndexes(ctx context.Context) error { return err } if !ok { - log.Errorf("repairing pin name for cid: %s", pp.Cid.String()) + log.Errorf("repairing name pin index for cid: %s", pp.Cid.String()) if err = p.nameIndex.Add(ctx, pp.Name, pp.Id); err != nil { return err }