Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Avoid loading all pins into memory during migration #5

Merged
merged 3 commits into from
Jan 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions ipldpinner/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,41 @@ func New(dstore ds.Datastore, dserv, internal ipld.DAGService) (*pinner, error)
}, nil
}

// LoadKeys reads the pinned CIDs and sends them on the given channel. This is
// used to read pins without loading them all into memory.
func LoadKeys(ctx context.Context, dstore ds.Datastore, dserv, internal ipld.DAGService, recursive bool, keyChan chan<- cid.Cid) error {
rootKey, err := dstore.Get(pinDatastoreKey)
if err != nil {
if err == ds.ErrNotFound {
return nil
}
return err
}
rootCid, err := cid.Cast(rootKey)
if err != nil {
return err
}

root, err := internal.Get(ctx, rootCid)
if err != nil {
return fmt.Errorf("cannot find pinning root object: %v", err)
}

rootpb, ok := root.(*mdag.ProtoNode)
if !ok {
return mdag.ErrNotProtobuf
}

var linkName string
if recursive {
linkName = linkRecursive
} else {
linkName = linkDirect
}

return loadSetChan(ctx, internal, rootpb, linkName, keyChan)
}

// Pin the given node, optionally recursive
func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error {
err := p.dserv.Add(ctx, node)
Expand Down
96 changes: 94 additions & 2 deletions ipldpinner/pin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ func assertUnpinned(t *testing.T, p pin.Pinner, c cid.Cid, failmsg string) {
}

func TestPinnerBasic(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dstore := dssync.MutexWrap(ds.NewMapDatastore())
bstore := blockstore.NewBlockstore(dstore)
bserv := bs.New(bstore, offline.Exchange(bstore))

dserv := mdag.NewDAGService(bserv)

// TODO does pinner need to share datastore with blockservice?
p, err := New(dstore, dserv, dserv)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -165,6 +165,98 @@ func TestPinnerBasic(t *testing.T) {

// Test recursively pinned
assertPinned(t, np, bk, "could not find recursively pinned node")

// Test that LoadKeys returns the expected CIDs.
keyChan := make(chan cid.Cid)
go func() {
err = LoadKeys(ctx, dstore, dserv, dserv, true, keyChan)
close(keyChan)
}()
keys := map[cid.Cid]struct{}{}
for c := range keyChan {
keys[c] = struct{}{}
}
if err != nil {
t.Fatal(err)
}
recKeys, _ := np.RecursiveKeys(ctx)
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
if len(keys) != len(recKeys) {
t.Fatal("wrong number of recursive keys from LoadKeys")
}
for _, k := range recKeys {
if _, ok := keys[k]; !ok {
t.Fatal("LoadKeys did not return correct recursive keys")
}
}

keyChan = make(chan cid.Cid)
go func() {
err = LoadKeys(ctx, dstore, dserv, dserv, false, keyChan)
close(keyChan)
}()
keys = map[cid.Cid]struct{}{}
for c := range keyChan {
keys[c] = struct{}{}
}
if err != nil {
t.Fatal(err)
}
dirKeys, _ := np.DirectKeys(ctx)
if len(keys) != len(dirKeys) {
t.Fatal("wrong number of direct keys from LoadKeys")
}
for _, k := range dirKeys {
if _, ok := keys[k]; !ok {
t.Fatal("LoadKeys did not return correct direct keys")
}
}

cancel()
emptyDS := dssync.MutexWrap(ds.NewMapDatastore())

// Check key not in datastore
err = LoadKeys(ctx, emptyDS, dserv, dserv, true, nil)
if err != nil {
t.Fatal(err)
}

// Check error on bad key
if err = emptyDS.Put(pinDatastoreKey, []byte("bad-cid")); err != nil {
panic(err)
}
if err = emptyDS.Sync(pinDatastoreKey); err != nil {
panic(err)
}
if err = LoadKeys(ctx, emptyDS, dserv, dserv, true, nil); err == nil {
t.Fatal("expected error")
}

// Lookup dag that does not exist
noKey, err := cid.Decode("QmYff9iHR1Hz6wufVeJodzXqQm4pkK4QNS9ms8tyPKVWm1")
if err != nil {
panic(err)
}
if err = emptyDS.Put(pinDatastoreKey, noKey.Bytes()); err != nil {
panic(err)
}
if err = emptyDS.Sync(pinDatastoreKey); err != nil {
panic(err)
}
err = LoadKeys(ctx, emptyDS, dserv, dserv, true, nil)
if err == nil || err.Error() != "cannot find pinning root object: merkledag: not found" {
t.Fatal("did not get expected error")
}

// Check error when node has no links
if err = emptyDS.Put(pinDatastoreKey, emptyKey.Bytes()); err != nil {
panic(err)
}
if err = emptyDS.Sync(pinDatastoreKey); err != nil {
panic(err)
}
if err = LoadKeys(ctx, emptyDS, dserv, dserv, true, nil); err == nil {
t.Fatal("expected error")
}
}

func TestIsPinnedLookup(t *testing.T) {
Expand Down
35 changes: 32 additions & 3 deletions ipldpinner/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,15 @@ func walkItems(ctx context.Context, dag ipld.DAGService, n *merkledag.ProtoNode,
// readHdr guarantees fanout is a safe value
fanout := hdr.GetFanout()
for i, l := range n.Links()[fanout:] {
if err := fn(i, l); err != nil {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove the shadow here (and below)? it's fine since we never return err anywhere else, but one of the nice things with shadowing err here is that it's obvious that there's not some higher scope err we're supposed to be keeping track of.

Copy link
Author

@gammazero gammazero Jan 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought shadowing err was generally something to avoid. The primary reason being that someone may write a deferred function that closes over err and checks its value, and the author may not notice the shadowing.

However, I can also see the other side of that, that shadowing guarantees that the variable in the outer scope is not overwritten. I generally tend to avoid shadowing as a general pattern since I think more bugs result from shadowing as opposed to not shadowing.

if err = fn(i, l); err != nil {
return err
}
}
for _, l := range n.Links()[:fanout] {
c := l.Cid
children(c)
if children != nil {
children(c)
}
if c.Equals(emptyKey) {
continue
}
Expand All @@ -239,7 +241,7 @@ func walkItems(ctx context.Context, dag ipld.DAGService, n *merkledag.ProtoNode,
return merkledag.ErrNotProtobuf
}

if err := walkItems(ctx, dag, stpb, fn, children); err != nil {
if err = walkItems(ctx, dag, stpb, fn, children); err != nil {
return err
}
}
Expand Down Expand Up @@ -277,6 +279,33 @@ func loadSet(ctx context.Context, dag ipld.DAGService, root *merkledag.ProtoNode
return res, nil
}

func loadSetChan(ctx context.Context, dag ipld.DAGService, root *merkledag.ProtoNode, name string, keyChan chan<- cid.Cid) error {
l, err := root.GetNodeLink(name)
if err != nil {
return err
}

n, err := l.GetNode(ctx, dag)
if err != nil {
return err
}

pbn, ok := n.(*merkledag.ProtoNode)
if !ok {
return merkledag.ErrNotProtobuf
}

walk := func(idx int, link *ipld.Link) error {
keyChan <- link.Cid
return nil
}

if err = walkItems(ctx, dag, pbn, walk, nil); err != nil {
return err
}
return nil
}

func getCidListIterator(cids []cid.Cid) itemIterator {
return func() (c cid.Cid, ok bool) {
if len(cids) == 0 {
Expand Down
47 changes: 23 additions & 24 deletions pinconv/pinconv.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"context"
"fmt"

"github.com/ipfs/go-cid"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
ipfspinner "github.com/ipfs/go-ipfs-pinner"
"github.com/ipfs/go-ipfs-pinner/dspinner"
Expand All @@ -24,39 +24,38 @@ import (
func ConvertPinsFromIPLDToDS(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, internal ipld.DAGService) (ipfspinner.Pinner, int, error) {
const ipldPinPath = "/local/pins"

ipldPinner, err := ipldpinner.New(dstore, dserv, internal)
if err != nil {
return nil, 0, err
}

dsPinner, err := dspinner.New(ctx, dstore, dserv)
if err != nil {
return nil, 0, err
}

seen := cid.NewSet()
cids, err := ipldPinner.RecursiveKeys(ctx)
if err != nil {
return nil, 0, err
var convCount int
keyChan := make(chan cid.Cid)

go func() {
err = ipldpinner.LoadKeys(ctx, dstore, dserv, internal, true, keyChan)
close(keyChan)
}()
for key := range keyChan {
dsPinner.PinWithMode(key, ipfspinner.Recursive)
convCount++
}
for i := range cids {
seen.Add(cids[i])
dsPinner.PinWithMode(cids[i], ipfspinner.Recursive)
if err != nil {
return nil, 0, fmt.Errorf("cannot load recursive keys: %s", err)
}
convCount := len(cids)

cids, err = ipldPinner.DirectKeys(ctx)
if err != nil {
return nil, 0, err
keyChan = make(chan cid.Cid)
go func() {
err = ipldpinner.LoadKeys(ctx, dstore, dserv, internal, false, keyChan)
close(keyChan)
}()
for key := range keyChan {
dsPinner.PinWithMode(key, ipfspinner.Direct)
convCount++
}
for i := range cids {
if seen.Has(cids[i]) {
// Pin was already pinned recursively
continue
}
dsPinner.PinWithMode(cids[i], ipfspinner.Direct)
if err != nil {
return nil, 0, fmt.Errorf("cannot load direct keys: %s", err)
}
convCount += len(cids)

err = dsPinner.Flush(ctx)
if err != nil {
Expand Down
28 changes: 27 additions & 1 deletion pinconv/pinconv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"io"
"strings"
"testing"

bs "github.com/ipfs/go-blockservice"
Expand Down Expand Up @@ -55,7 +56,8 @@ func makeStore() (ds.Datastore, ipld.DAGService) {
}

func TestConversions(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dstore, dserv := makeStore()

dsPinner, err := dspinner.New(ctx, dstore, dserv)
Expand Down Expand Up @@ -151,3 +153,27 @@ func TestConversions(t *testing.T) {
t.Fatal(err)
}
}

func TestConvertLoadError(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dstore, dserv := makeStore()
// Point /local/pins to empty node to cause failure loading pins.
pinDatastoreKey := ds.NewKey("/local/pins")
emptyKey, err := cid.Decode("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n")
if err != nil {
panic(err)
}
if err = dstore.Put(pinDatastoreKey, emptyKey.Bytes()); err != nil {
panic(err)
}
if err = dstore.Sync(pinDatastoreKey); err != nil {
panic(err)
}

_, _, err = ConvertPinsFromIPLDToDS(ctx, dstore, dserv, dserv)
if err == nil || !strings.HasPrefix(err.Error(), "cannot load recursive keys") {
t.Fatal("did not get expected error")
}
}