Skip to content

Commit

Permalink
Merge pull request #3700 from ipfs/kevina/enumerate-children-refactor
Browse files Browse the repository at this point in the history
Refactor EnumerateChildren to avoid need for bestEffort paramater.
  • Loading branch information
whyrusleeping committed Mar 2, 2017
2 parents 82b021c + e4bbd24 commit 5ff552c
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 27 deletions.
2 changes: 1 addition & 1 deletion core/commands/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv dag.DAGSer
for _, c := range cids {
kset := cid.NewSet()

err := dag.EnumerateChildrenAsync(ctx, dserv, c, kset.Visit)
err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, kset.Visit)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func pinLsAll(typeStr string, ctx context.Context, n *core.IpfsNode) (map[string
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range n.Pinning.RecursiveKeys() {
err := dag.EnumerateChildren(n.Context(), n.DAG, k, set.Visit, false)
err := dag.EnumerateChildren(n.Context(), n.DAG.GetLinks, k, set.Visit)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion core/coreunix/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestAddGCLive(t *testing.T) {
defer cancel()

set := cid.NewSet()
err = dag.EnumerateChildren(ctx, node.DAG, last, set.Visit, false)
err = dag.EnumerateChildren(ctx, node.DAG.GetLinks, last, set.Visit)
if err != nil {
t.Fatal(err)
}
Expand Down
49 changes: 33 additions & 16 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ type DAGService interface {
}

type LinkService interface {
// Return all links for a node, may be more effect than
// calling Get in DAGService
// GetLinks return all links for a node. The complete node does not
// necessarily have to exist locally, or at all. For example, raw
// leaves cannot possibly have links so there is no need to look
// at the node.
GetLinks(context.Context, *cid.Cid) ([]*node.Link, error)

GetOfflineLinkService() LinkService
Expand Down Expand Up @@ -114,6 +116,8 @@ func decodeBlock(b blocks.Block) (node.Node, error) {
}
}

// GetLinks return the links for the node, the node doesn't necessarily have
// to exist locally.
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
if c.Type() == cid.Raw {
return nil, nil
Expand All @@ -138,11 +142,24 @@ func (n *dagService) Remove(nd node.Node) error {
return n.Blocks.DeleteBlock(nd)
}

// GetLinksDirect creates a function to get the links for a node, from
// the node, bypassing the LinkService. If the node does not exist
// locally (and can not be retrieved) an error will be returned.
func GetLinksDirect(serv DAGService) GetLinks {
return func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
node, err := serv.Get(ctx, c)
if err != nil {
return nil, err
}
return node.Links(), nil
}
}

// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
v, _ := ctx.Value("progress").(*ProgressTracker)
if v == nil {
return EnumerateChildrenAsync(ctx, serv, root, cid.NewSet().Visit)
return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, cid.NewSet().Visit)
}
set := cid.NewSet()
visit := func(c *cid.Cid) bool {
Expand All @@ -153,7 +170,7 @@ func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
return false
}
}
return EnumerateChildrenAsync(ctx, serv, root, visit)
return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, visit)
}

// FindLinks searches this nodes links for the given key,
Expand Down Expand Up @@ -380,20 +397,20 @@ func (t *Batch) Commit() error {
return err
}

type GetLinks func(context.Context, *cid.Cid) ([]*node.Link, error)

// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
func EnumerateChildren(ctx context.Context, ds LinkService, root *cid.Cid, visit func(*cid.Cid) bool, bestEffort bool) error {
links, err := ds.GetLinks(ctx, root)
if bestEffort && err == ErrNotFound {
return nil
} else if err != nil {
func EnumerateChildren(ctx context.Context, getLinks GetLinks, root *cid.Cid, visit func(*cid.Cid) bool) error {
links, err := getLinks(ctx, root)
if err != nil {
return err
}
for _, lnk := range links {
c := lnk.Cid
if visit(c) {
err = EnumerateChildren(ctx, ds, c, visit, bestEffort)
err = EnumerateChildren(ctx, getLinks, c, visit)
if err != nil {
return err
}
Expand Down Expand Up @@ -427,9 +444,9 @@ func (p *ProgressTracker) Value() int {
// 'fetchNodes' will start at a time
var FetchGraphConcurrency = 8

func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visit func(*cid.Cid) bool) error {
func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, visit func(*cid.Cid) bool) error {
feed := make(chan *cid.Cid)
out := make(chan node.Node)
out := make(chan []*node.Link)
done := make(chan struct{})

var setlk sync.Mutex
Expand All @@ -442,7 +459,7 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visi
for i := 0; i < FetchGraphConcurrency; i++ {
go func() {
for ic := range feed {
n, err := ds.Get(ctx, ic)
links, err := getLinks(ctx, ic)
if err != nil {
errChan <- err
return
Expand All @@ -454,7 +471,7 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visi

if unseen {
select {
case out <- n:
case out <- links:
case <-fetchersCtx.Done():
return
}
Expand Down Expand Up @@ -489,8 +506,8 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visi
if inProgress == 0 && next == nil {
return nil
}
case nd := <-out:
for _, lnk := range nd.Links() {
case links := <-out:
for _, lnk := range links {
if next == nil {
next = lnk.Cid
send = feed
Expand Down
6 changes: 3 additions & 3 deletions merkledag/merkledag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func TestFetchGraph(t *testing.T) {

offline_ds := NewDAGService(bs)

err = EnumerateChildren(context.Background(), offline_ds, root.Cid(), func(_ *cid.Cid) bool { return true }, false)
err = EnumerateChildren(context.Background(), offline_ds.GetLinks, root.Cid(), func(_ *cid.Cid) bool { return true })
if err != nil {
t.Fatal(err)
}
Expand All @@ -266,7 +266,7 @@ func TestEnumerateChildren(t *testing.T) {
}

set := cid.NewSet()
err = EnumerateChildren(context.Background(), ds, root.Cid(), set.Visit, false)
err = EnumerateChildren(context.Background(), ds.GetLinks, root.Cid(), set.Visit)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -543,7 +543,7 @@ func TestEnumerateAsyncFailsNotFound(t *testing.T) {
}

cset := cid.NewSet()
err = EnumerateChildrenAsync(context.Background(), ds, pcid, cset.Visit)
err = EnumerateChildrenAsync(context.Background(), GetLinksDirect(ds), pcid, cset.Visit)
if err == nil {
t.Fatal("this should have failed")
}
Expand Down
18 changes: 13 additions & 5 deletions pin/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
node "gx/ipfs/QmYDscK7dmdo2GZ9aumS8s5auUUAH5mR1jvj5pYhWusfK7/go-ipld-node"
)

var log = logging.Logger("gc")
Expand Down Expand Up @@ -68,12 +69,12 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.
return output, nil
}

func Descendants(ctx context.Context, ls dag.LinkService, set *cid.Set, roots []*cid.Cid, bestEffort bool) error {
func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots []*cid.Cid) error {
for _, c := range roots {
set.Add(c)

// EnumerateChildren recursively walks the dag and adds the keys to the given set
err := dag.EnumerateChildren(ctx, ls, c, set.Visit, bestEffort)
err := dag.EnumerateChildren(ctx, getLinks, c, set.Visit)
if err != nil {
return err
}
Expand All @@ -86,12 +87,19 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ls dag.LinkService, bestEffo
// KeySet currently implemented in memory, in the future, may be bloom filter or
// disk backed to conserve memory.
gcs := cid.NewSet()
err := Descendants(ctx, ls, gcs, pn.RecursiveKeys(), false)
err := Descendants(ctx, ls.GetLinks, gcs, pn.RecursiveKeys())
if err != nil {
return nil, err
}

err = Descendants(ctx, ls, gcs, bestEffortRoots, true)
bestEffortGetLinks := func(ctx context.Context, cid *cid.Cid) ([]*node.Link, error) {
links, err := ls.GetLinks(ctx, cid)
if err == dag.ErrNotFound {
err = nil
}
return links, err
}
err = Descendants(ctx, bestEffortGetLinks, gcs, bestEffortRoots)
if err != nil {
return nil, err
}
Expand All @@ -100,7 +108,7 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ls dag.LinkService, bestEffo
gcs.Add(k)
}

err = Descendants(ctx, ls, gcs, pn.InternalPins(), false)
err = Descendants(ctx, ls.GetLinks, gcs, pn.InternalPins())
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 5ff552c

Please sign in to comment.