Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor EnumerateChildren to avoid need for bestEffort paramater. #3700

Merged
merged 3 commits into from
Mar 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/commands/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv dag.DAGSer
for _, c := range cids {
kset := cid.NewSet()

err := dag.EnumerateChildrenAsync(ctx, dserv, c, kset.Visit)
err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, kset.Visit)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func pinLsAll(typeStr string, ctx context.Context, n *core.IpfsNode) (map[string
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range n.Pinning.RecursiveKeys() {
err := dag.EnumerateChildren(n.Context(), n.DAG, k, set.Visit, false)
err := dag.EnumerateChildren(n.Context(), n.DAG.GetLinks, k, set.Visit)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion core/coreunix/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestAddGCLive(t *testing.T) {
defer cancel()

set := cid.NewSet()
err = dag.EnumerateChildren(ctx, node.DAG, last, set.Visit, false)
err = dag.EnumerateChildren(ctx, node.DAG.GetLinks, last, set.Visit)
if err != nil {
t.Fatal(err)
}
Expand Down
49 changes: 33 additions & 16 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ type DAGService interface {
}

type LinkService interface {
// Return all links for a node, may be more effect than
// calling Get in DAGService
// GetLinks return all links for a node. The complete node does not
// necessarily have to exist locally, or at all. For example, raw
// leaves cannot possibly have links so there is no need to look
// at the node.
GetLinks(context.Context, *cid.Cid) ([]*node.Link, error)

GetOfflineLinkService() LinkService
Expand Down Expand Up @@ -114,6 +116,8 @@ func decodeBlock(b blocks.Block) (node.Node, error) {
}
}

// GetLinks return the links for the node, the node doesn't necessarily have
// to exist locally.
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
if c.Type() == cid.Raw {
return nil, nil
Expand All @@ -138,11 +142,24 @@ func (n *dagService) Remove(nd node.Node) error {
return n.Blocks.DeleteBlock(nd)
}

// GetLinksDirect creates a function to get the links for a node, from
// the node, bypassing the LinkService. If the node does not exist
// locally (and can not be retrieved) an error will be returned.
func GetLinksDirect(serv DAGService) GetLinks {
return func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
node, err := serv.Get(ctx, c)
if err != nil {
return nil, err
}
return node.Links(), nil
}
}

// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
v, _ := ctx.Value("progress").(*ProgressTracker)
if v == nil {
return EnumerateChildrenAsync(ctx, serv, root, cid.NewSet().Visit)
return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, cid.NewSet().Visit)
}
set := cid.NewSet()
visit := func(c *cid.Cid) bool {
Expand All @@ -153,7 +170,7 @@ func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
return false
}
}
return EnumerateChildrenAsync(ctx, serv, root, visit)
return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, visit)
}

// FindLinks searches this nodes links for the given key,
Expand Down Expand Up @@ -380,20 +397,20 @@ func (t *Batch) Commit() error {
return err
}

type GetLinks func(context.Context, *cid.Cid) ([]*node.Link, error)

// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
func EnumerateChildren(ctx context.Context, ds LinkService, root *cid.Cid, visit func(*cid.Cid) bool, bestEffort bool) error {
links, err := ds.GetLinks(ctx, root)
if bestEffort && err == ErrNotFound {
return nil
} else if err != nil {
func EnumerateChildren(ctx context.Context, getLinks GetLinks, root *cid.Cid, visit func(*cid.Cid) bool) error {
links, err := getLinks(ctx, root)
if err != nil {
return err
}
for _, lnk := range links {
c := lnk.Cid
if visit(c) {
err = EnumerateChildren(ctx, ds, c, visit, bestEffort)
err = EnumerateChildren(ctx, getLinks, c, visit)
if err != nil {
return err
}
Expand Down Expand Up @@ -427,9 +444,9 @@ func (p *ProgressTracker) Value() int {
// 'fetchNodes' will start at a time
var FetchGraphConcurrency = 8

func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visit func(*cid.Cid) bool) error {
func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, visit func(*cid.Cid) bool) error {
feed := make(chan *cid.Cid)
out := make(chan node.Node)
out := make(chan []*node.Link)
done := make(chan struct{})

var setlk sync.Mutex
Expand All @@ -442,7 +459,7 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visi
for i := 0; i < FetchGraphConcurrency; i++ {
go func() {
for ic := range feed {
n, err := ds.Get(ctx, ic)
links, err := getLinks(ctx, ic)
if err != nil {
errChan <- err
return
Expand All @@ -454,7 +471,7 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visi

if unseen {
select {
case out <- n:
case out <- links:
case <-fetchersCtx.Done():
return
}
Expand Down Expand Up @@ -489,8 +506,8 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visi
if inProgress == 0 && next == nil {
return nil
}
case nd := <-out:
for _, lnk := range nd.Links() {
case links := <-out:
for _, lnk := range links {
if next == nil {
next = lnk.Cid
send = feed
Expand Down
6 changes: 3 additions & 3 deletions merkledag/merkledag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func TestFetchGraph(t *testing.T) {

offline_ds := NewDAGService(bs)

err = EnumerateChildren(context.Background(), offline_ds, root.Cid(), func(_ *cid.Cid) bool { return true }, false)
err = EnumerateChildren(context.Background(), offline_ds.GetLinks, root.Cid(), func(_ *cid.Cid) bool { return true })
if err != nil {
t.Fatal(err)
}
Expand All @@ -266,7 +266,7 @@ func TestEnumerateChildren(t *testing.T) {
}

set := cid.NewSet()
err = EnumerateChildren(context.Background(), ds, root.Cid(), set.Visit, false)
err = EnumerateChildren(context.Background(), ds.GetLinks, root.Cid(), set.Visit)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -543,7 +543,7 @@ func TestEnumerateAsyncFailsNotFound(t *testing.T) {
}

cset := cid.NewSet()
err = EnumerateChildrenAsync(context.Background(), ds, pcid, cset.Visit)
err = EnumerateChildrenAsync(context.Background(), GetLinksDirect(ds), pcid, cset.Visit)
if err == nil {
t.Fatal("this should have failed")
}
Expand Down
18 changes: 13 additions & 5 deletions pin/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
node "gx/ipfs/QmYDscK7dmdo2GZ9aumS8s5auUUAH5mR1jvj5pYhWusfK7/go-ipld-node"
)

var log = logging.Logger("gc")
Expand Down Expand Up @@ -68,12 +69,12 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.
return output, nil
}

func Descendants(ctx context.Context, ls dag.LinkService, set *cid.Set, roots []*cid.Cid, bestEffort bool) error {
func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots []*cid.Cid) error {
for _, c := range roots {
set.Add(c)

// EnumerateChildren recursively walks the dag and adds the keys to the given set
err := dag.EnumerateChildren(ctx, ls, c, set.Visit, bestEffort)
err := dag.EnumerateChildren(ctx, getLinks, c, set.Visit)
if err != nil {
return err
}
Expand All @@ -86,12 +87,19 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ls dag.LinkService, bestEffo
// KeySet currently implemented in memory, in the future, may be bloom filter or
// disk backed to conserve memory.
gcs := cid.NewSet()
err := Descendants(ctx, ls, gcs, pn.RecursiveKeys(), false)
err := Descendants(ctx, ls.GetLinks, gcs, pn.RecursiveKeys())
if err != nil {
return nil, err
}

err = Descendants(ctx, ls, gcs, bestEffortRoots, true)
bestEffortGetLinks := func(ctx context.Context, cid *cid.Cid) ([]*node.Link, error) {
links, err := ls.GetLinks(ctx, cid)
if err == dag.ErrNotFound {
err = nil
}
return links, err
}
err = Descendants(ctx, bestEffortGetLinks, gcs, bestEffortRoots)
if err != nil {
return nil, err
}
Expand All @@ -100,7 +108,7 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ls dag.LinkService, bestEffo
gcs.Add(k)
}

err = Descendants(ctx, ls, gcs, pn.InternalPins(), false)
err = Descendants(ctx, ls.GetLinks, gcs, pn.InternalPins())
if err != nil {
return nil, err
}
Expand Down