diff --git a/ipldpinner/pin.go b/ipldpinner/pin.go index d0824b3..dc90dd4 100644 --- a/ipldpinner/pin.go +++ b/ipldpinner/pin.go @@ -141,6 +141,41 @@ func New(dstore ds.Datastore, dserv, internal ipld.DAGService) (*pinner, error) }, nil } +// LoadKeys reads the pinned CIDs and sends them on the given channel. This is +// used to read pins without loading them all into memory. +func LoadKeys(ctx context.Context, dstore ds.Datastore, dserv, internal ipld.DAGService, recursive bool, keyChan chan<- cid.Cid) error { + rootKey, err := dstore.Get(pinDatastoreKey) + if err != nil { + if err == ds.ErrNotFound { + return nil + } + return err + } + rootCid, err := cid.Cast(rootKey) + if err != nil { + return err + } + + root, err := internal.Get(ctx, rootCid) + if err != nil { + return fmt.Errorf("cannot find pinning root object: %v", err) + } + + rootpb, ok := root.(*mdag.ProtoNode) + if !ok { + return mdag.ErrNotProtobuf + } + + var linkName string + if recursive { + linkName = linkRecursive + } else { + linkName = linkDirect + } + + return loadSetChan(ctx, internal, rootpb, linkName, keyChan) +} + // Pin the given node, optionally recursive func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error { err := p.dserv.Add(ctx, node) diff --git a/ipldpinner/pin_test.go b/ipldpinner/pin_test.go index e193aa9..3c61d41 100644 --- a/ipldpinner/pin_test.go +++ b/ipldpinner/pin_test.go @@ -54,7 +54,8 @@ func assertUnpinned(t *testing.T, p pin.Pinner, c cid.Cid, failmsg string) { } func TestPinnerBasic(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() dstore := dssync.MutexWrap(ds.NewMapDatastore()) bstore := blockstore.NewBlockstore(dstore) @@ -62,7 +63,6 @@ func TestPinnerBasic(t *testing.T) { dserv := mdag.NewDAGService(bserv) - // TODO does pinner need to share datastore with blockservice? p, err := New(dstore, dserv, dserv) if err != nil { t.Fatal(err) @@ -165,6 +165,98 @@ func TestPinnerBasic(t *testing.T) { // Test recursively pinned assertPinned(t, np, bk, "could not find recursively pinned node") + + // Test that LoadKeys returns the expected CIDs. + keyChan := make(chan cid.Cid) + go func() { + err = LoadKeys(ctx, dstore, dserv, dserv, true, keyChan) + close(keyChan) + }() + keys := map[cid.Cid]struct{}{} + for c := range keyChan { + keys[c] = struct{}{} + } + if err != nil { + t.Fatal(err) + } + recKeys, _ := np.RecursiveKeys(ctx) + if len(keys) != len(recKeys) { + t.Fatal("wrong number of recursive keys from LoadKeys") + } + for _, k := range recKeys { + if _, ok := keys[k]; !ok { + t.Fatal("LoadKeys did not return correct recursive keys") + } + } + + keyChan = make(chan cid.Cid) + go func() { + err = LoadKeys(ctx, dstore, dserv, dserv, false, keyChan) + close(keyChan) + }() + keys = map[cid.Cid]struct{}{} + for c := range keyChan { + keys[c] = struct{}{} + } + if err != nil { + t.Fatal(err) + } + dirKeys, _ := np.DirectKeys(ctx) + if len(keys) != len(dirKeys) { + t.Fatal("wrong number of direct keys from LoadKeys") + } + for _, k := range dirKeys { + if _, ok := keys[k]; !ok { + t.Fatal("LoadKeys did not return correct direct keys") + } + } + + cancel() + emptyDS := dssync.MutexWrap(ds.NewMapDatastore()) + + // Check key not in datastore + err = LoadKeys(ctx, emptyDS, dserv, dserv, true, nil) + if err != nil { + t.Fatal(err) + } + + // Check error on bad key + if err = emptyDS.Put(pinDatastoreKey, []byte("bad-cid")); err != nil { + panic(err) + } + if err = emptyDS.Sync(pinDatastoreKey); err != nil { + panic(err) + } + if err = LoadKeys(ctx, emptyDS, dserv, dserv, true, nil); err == nil { + t.Fatal("expected error") + } + + // Lookup dag that does not exist + noKey, err := cid.Decode("QmYff9iHR1Hz6wufVeJodzXqQm4pkK4QNS9ms8tyPKVWm1") + if err != nil { + panic(err) + } + if err = emptyDS.Put(pinDatastoreKey, noKey.Bytes()); err != nil { + panic(err) + } + if err = emptyDS.Sync(pinDatastoreKey); err != nil { + panic(err) + } + err = LoadKeys(ctx, emptyDS, dserv, dserv, true, nil) + if err == nil || err.Error() != "cannot find pinning root object: merkledag: not found" { + t.Fatal("did not get expected error") + } + + // Check error when node has no links + if err = emptyDS.Put(pinDatastoreKey, emptyKey.Bytes()); err != nil { + panic(err) + } + if err = emptyDS.Sync(pinDatastoreKey); err != nil { + panic(err) + } + if err = LoadKeys(ctx, emptyDS, dserv, dserv, true, nil); err == nil { + t.Fatal("expected error") + } } func TestIsPinnedLookup(t *testing.T) { diff --git a/ipldpinner/set.go b/ipldpinner/set.go index 2fb931f..51951a2 100644 --- a/ipldpinner/set.go +++ b/ipldpinner/set.go @@ -219,13 +219,15 @@ func walkItems(ctx context.Context, dag ipld.DAGService, n *merkledag.ProtoNode, // readHdr guarantees fanout is a safe value fanout := hdr.GetFanout() for i, l := range n.Links()[fanout:] { - if err := fn(i, l); err != nil { + if err = fn(i, l); err != nil { return err } } for _, l := range n.Links()[:fanout] { c := l.Cid - children(c) + if children != nil { + children(c) + } if c.Equals(emptyKey) { continue } @@ -239,7 +241,7 @@ func walkItems(ctx context.Context, dag ipld.DAGService, n *merkledag.ProtoNode, return merkledag.ErrNotProtobuf } - if err := walkItems(ctx, dag, stpb, fn, children); err != nil { + if err = walkItems(ctx, dag, stpb, fn, children); err != nil { return err } } @@ -277,6 +279,33 @@ func loadSet(ctx context.Context, dag ipld.DAGService, root *merkledag.ProtoNode return res, nil } +func loadSetChan(ctx context.Context, dag ipld.DAGService, root *merkledag.ProtoNode, name string, keyChan chan<- cid.Cid) error { + l, err := root.GetNodeLink(name) + if err != nil { + return err + } + + n, err := l.GetNode(ctx, dag) + if err != nil { + return err + } + + pbn, ok := n.(*merkledag.ProtoNode) + if !ok { + return merkledag.ErrNotProtobuf + } + + walk := func(idx int, link *ipld.Link) error { + keyChan <- link.Cid + return nil + } + + if err = walkItems(ctx, dag, pbn, walk, nil); err != nil { + return err + } + return nil +} + func getCidListIterator(cids []cid.Cid) itemIterator { return func() (c cid.Cid, ok bool) { if len(cids) == 0 { diff --git a/pinconv/pinconv.go b/pinconv/pinconv.go index 9aee703..df21f85 100644 --- a/pinconv/pinconv.go +++ b/pinconv/pinconv.go @@ -7,7 +7,7 @@ import ( "context" "fmt" - "github.com/ipfs/go-cid" + cid "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" ipfspinner "github.com/ipfs/go-ipfs-pinner" "github.com/ipfs/go-ipfs-pinner/dspinner" @@ -24,39 +24,38 @@ import ( func ConvertPinsFromIPLDToDS(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, internal ipld.DAGService) (ipfspinner.Pinner, int, error) { const ipldPinPath = "/local/pins" - ipldPinner, err := ipldpinner.New(dstore, dserv, internal) - if err != nil { - return nil, 0, err - } - dsPinner, err := dspinner.New(ctx, dstore, dserv) if err != nil { return nil, 0, err } - seen := cid.NewSet() - cids, err := ipldPinner.RecursiveKeys(ctx) - if err != nil { - return nil, 0, err + var convCount int + keyChan := make(chan cid.Cid) + + go func() { + err = ipldpinner.LoadKeys(ctx, dstore, dserv, internal, true, keyChan) + close(keyChan) + }() + for key := range keyChan { + dsPinner.PinWithMode(key, ipfspinner.Recursive) + convCount++ } - for i := range cids { - seen.Add(cids[i]) - dsPinner.PinWithMode(cids[i], ipfspinner.Recursive) + if err != nil { + return nil, 0, fmt.Errorf("cannot load recursive keys: %s", err) } - convCount := len(cids) - cids, err = ipldPinner.DirectKeys(ctx) - if err != nil { - return nil, 0, err + keyChan = make(chan cid.Cid) + go func() { + err = ipldpinner.LoadKeys(ctx, dstore, dserv, internal, false, keyChan) + close(keyChan) + }() + for key := range keyChan { + dsPinner.PinWithMode(key, ipfspinner.Direct) + convCount++ } - for i := range cids { - if seen.Has(cids[i]) { - // Pin was already pinned recursively - continue - } - dsPinner.PinWithMode(cids[i], ipfspinner.Direct) + if err != nil { + return nil, 0, fmt.Errorf("cannot load direct keys: %s", err) } - convCount += len(cids) err = dsPinner.Flush(ctx) if err != nil { diff --git a/pinconv/pinconv_test.go b/pinconv/pinconv_test.go index ac7f8ff..02abca6 100644 --- a/pinconv/pinconv_test.go +++ b/pinconv/pinconv_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "io" + "strings" "testing" bs "github.com/ipfs/go-blockservice" @@ -55,7 +56,8 @@ func makeStore() (ds.Datastore, ipld.DAGService) { } func TestConversions(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() dstore, dserv := makeStore() dsPinner, err := dspinner.New(ctx, dstore, dserv) @@ -151,3 +153,27 @@ func TestConversions(t *testing.T) { t.Fatal(err) } } + +func TestConvertLoadError(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dstore, dserv := makeStore() + // Point /local/pins to empty node to cause failure loading pins. + pinDatastoreKey := ds.NewKey("/local/pins") + emptyKey, err := cid.Decode("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n") + if err != nil { + panic(err) + } + if err = dstore.Put(pinDatastoreKey, emptyKey.Bytes()); err != nil { + panic(err) + } + if err = dstore.Sync(pinDatastoreKey); err != nil { + panic(err) + } + + _, _, err = ConvertPinsFromIPLDToDS(ctx, dstore, dserv, dserv) + if err == nil || !strings.HasPrefix(err.Error(), "cannot load recursive keys") { + t.Fatal("did not get expected error") + } +}