From 1ee27334b5c01645e7a3d492bff4be9cffb0e991 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Mur=C3=A9?= Date: Thu, 18 Jul 2019 15:12:45 +0200 Subject: [PATCH 1/3] rework the graph walking functions with functional options This commit: - reduce the API to 2 simpler functions - add consistent and clear control over if the root should be visited - add control over the concurrency factor --- merkledag.go | 130 +++++++++++++++++++++++++++++++--------------- merkledag_test.go | 11 ++-- 2 files changed, 94 insertions(+), 47 deletions(-) diff --git a/merkledag.go b/merkledag.go index c035dd4..253ee35 100644 --- a/merkledag.go +++ b/merkledag.go @@ -162,7 +162,7 @@ func FetchGraph(ctx context.Context, root cid.Cid, serv ipld.DAGService) error { } // FetchGraphWithDepthLimit fetches all nodes that are children to the given -// node down to the given depth. maxDetph=0 means "only fetch root", +// node down to the given depth. maxDepth=0 means "only fetch root", // maxDepth=1 means "fetch root and its direct children" and so on... // maxDepth=-1 means unlimited. func FetchGraphWithDepthLimit(ctx context.Context, root cid.Cid, depthLim int, serv ipld.DAGService) error { @@ -195,9 +195,10 @@ func FetchGraphWithDepthLimit(ctx context.Context, root cid.Cid, depthLim int, s return false } + // If we have a ProgressTracker, we wrap the visit function to handle it v, _ := ctx.Value(progressContextKey).(*ProgressTracker) if v == nil { - return WalkParallelDepth(ctx, GetLinksDirect(ng), root, 0, visit) + return WalkDepth(ctx, GetLinksDirect(ng), root, visit, Concurrent(), WithRoot()) } visitProgress := func(c cid.Cid, depth int) bool { @@ -207,7 +208,7 @@ func FetchGraphWithDepthLimit(ctx context.Context, root cid.Cid, depthLim int, s } return false } - return WalkParallelDepth(ctx, GetLinksDirect(ng), root, 0, visitProgress) + return WalkDepth(ctx, GetLinksDirect(ng), root, visitProgress, Concurrent(), WithRoot()) } // GetMany gets many nodes from the DAG at once. @@ -281,21 +282,77 @@ func GetLinksWithDAG(ng ipld.NodeGetter) GetLinks { } } +// defaultConcurrentFetch is the default maximum number of concurrent fetches +// that 'fetchNodes' will start at a time +const defaultConcurrentFetch = 32 + +// WalkOptions represent the parameters of a graph walking algorithm +type WalkOptions struct { + WithRoot bool + IgnoreBadBlock bool + Concurrency int +} + +// WalkOption is a setter for WalkOptions +type WalkOption func(*WalkOptions) + +// WithRoot is a WalkOption indicating that the root node should be visited +func WithRoot() WalkOption { + return func(walkOptions *WalkOptions) { + walkOptions.WithRoot = true + } +} + +// Concurrent is a WalkOption indicating that node fetching should be done in +// parallel, with the default concurrency factor. +// NOTE: When using that option, the walk order is *not* guarantee. +// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function. +func Concurrent() WalkOption { + return func(walkOptions *WalkOptions) { + walkOptions.Concurrency = defaultConcurrentFetch + } +} + +// Concurrency is a WalkOption indicating that node fetching should be done in +// parallel, with a specific concurrency factor. +// NOTE: When using that option, the walk order is *not* guarantee. +// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function. +func Concurrency(worker int) WalkOption { + return func(walkOptions *WalkOptions) { + walkOptions.Concurrency = worker + } +} + // WalkGraph will walk the dag in order (depth first) starting at the given root. -func Walk(ctx context.Context, getLinks GetLinks, root cid.Cid, visit func(cid.Cid) bool) error { +func Walk(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid) bool, options ...WalkOption) error { visitDepth := func(c cid.Cid, depth int) bool { return visit(c) } - return WalkDepth(ctx, getLinks, root, 0, visitDepth) + return WalkDepth(ctx, getLinks, c, visitDepth, options...) } // WalkDepth walks the dag starting at the given root and passes the current // depth to a given visit function. The visit function can be used to limit DAG // exploration. -func WalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, depth int, visit func(cid.Cid, int) bool) error { - if !visit(root, depth) { - return nil +func WalkDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid, int) bool, options ...WalkOption) error { + opts := &WalkOptions{} + for _, opt := range options { + opt(opts) + } + + if opts.Concurrency > 1 { + return parallelWalkDepth(ctx, getLinks, c, visit, opts) + } else { + return sequentialWalkDepth(ctx, getLinks, c, 0, visit, opts) + } +} + +func sequentialWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, depth int, visit func(cid.Cid, int) bool, options *WalkOptions) error { + if depth != 0 || options.WithRoot { + if !visit(root, depth) { + return nil + } } links, err := getLinks(ctx, root) @@ -304,7 +361,7 @@ func WalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, depth int, } for _, lnk := range links { - if err := WalkDepth(ctx, getLinks, lnk.Cid, depth+1, visit); err != nil { + if err := sequentialWalkDepth(ctx, getLinks, lnk.Cid, depth+1, visit, options); err != nil { return err } } @@ -337,27 +394,7 @@ func (p *ProgressTracker) Value() int { return p.Total } -// FetchGraphConcurrency is total number of concurrent fetches that -// 'fetchNodes' will start at a time -var FetchGraphConcurrency = 32 - -// WalkParallel is equivalent to Walk *except* that it explores multiple paths -// in parallel. -// -// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function. -func WalkParallel(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid) bool) error { - visitDepth := func(c cid.Cid, depth int) bool { - return visit(c) - } - - return WalkParallelDepth(ctx, getLinks, c, 0, visitDepth) -} - -// WalkParallelDepth is equivalent to WalkDepth *except* that it fetches -// children in parallel. -// -// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function. -func WalkParallelDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, startDepth int, visit func(cid.Cid, int) bool) error { +func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, visit func(cid.Cid, int) bool, options *WalkOptions) error { type cidDepth struct { cid cid.Cid depth int @@ -372,14 +409,14 @@ func WalkParallelDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, startD out := make(chan *linksDepth) done := make(chan struct{}) - var setlk sync.Mutex + var visitlk sync.Mutex var wg sync.WaitGroup errChan := make(chan error) fetchersCtx, cancel := context.WithCancel(ctx) defer wg.Wait() defer cancel() - for i := 0; i < FetchGraphConcurrency; i++ { + for i := 0; i < options.Concurrency; i++ { wg.Add(1) go func() { defer wg.Done() @@ -387,9 +424,16 @@ func WalkParallelDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, startD ci := cdepth.cid depth := cdepth.depth - setlk.Lock() - shouldVisit := visit(ci, depth) - setlk.Unlock() + var shouldVisit bool + + // bypass the root if needed + if depth != 0 || options.WithRoot { + visitlk.Lock() + shouldVisit = visit(ci, depth) + visitlk.Unlock() + } else { + shouldVisit = true + } if shouldVisit { links, err := getLinks(ctx, ci) @@ -422,20 +466,21 @@ func WalkParallelDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, startD defer close(feed) send := feed - var todobuffer []*cidDepth + var todoQueue []*cidDepth var inProgress int next := &cidDepth{ - cid: c, - depth: startDepth, + cid: root, + depth: 0, } + for { select { case send <- next: inProgress++ - if len(todobuffer) > 0 { - next = todobuffer[0] - todobuffer = todobuffer[1:] + if len(todoQueue) > 0 { + next = todoQueue[0] + todoQueue = todoQueue[1:] } else { next = nil send = nil @@ -456,7 +501,7 @@ func WalkParallelDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, startD next = cd send = feed } else { - todobuffer = append(todobuffer, cd) + todoQueue = append(todoQueue, cd) } } case err := <-errChan: @@ -466,7 +511,6 @@ func WalkParallelDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, startD return ctx.Err() } } - } var _ ipld.LinkGetter = &dagService{} diff --git a/merkledag_test.go b/merkledag_test.go index e56bb52..045a880 100644 --- a/merkledag_test.go +++ b/merkledag_test.go @@ -203,8 +203,11 @@ func makeTestDAG(t *testing.T, read io.Reader, ds ipld.DAGService) ipld.Node { // Add a root referencing all created nodes root := NodeWithData(nil) for _, n := range nodes { - root.AddNodeLink(n.Cid().String(), n) - err := ds.Add(ctx, n) + err := root.AddNodeLink(n.Cid().String(), n) + if err != nil { + t.Fatal(err) + } + err = ds.Add(ctx, n) if err != nil { t.Fatal(err) } @@ -383,7 +386,7 @@ func TestFetchGraphWithDepthLimit(t *testing.T) { } - err = WalkDepth(context.Background(), offlineDS.GetLinks, root.Cid(), 0, visitF) + err = WalkDepth(context.Background(), offlineDS.GetLinks, root.Cid(), visitF, WithRoot()) if err != nil { t.Fatal(err) } @@ -736,7 +739,7 @@ func TestEnumerateAsyncFailsNotFound(t *testing.T) { } cset := cid.NewSet() - err = WalkParallel(ctx, GetLinksDirect(ds), parent.Cid(), cset.Visit) + err = Walk(ctx, GetLinksDirect(ds), parent.Cid(), cset.Visit) if err == nil { t.Fatal("this should have failed") } From 86e5652465a6e36e6f92ea01d73bebf7eb050aaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Mur=C3=A9?= Date: Thu, 18 Jul 2019 16:40:02 +0200 Subject: [PATCH 2/3] add a IgnoreErrors() walking option --- merkledag.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/merkledag.go b/merkledag.go index 253ee35..6420e69 100644 --- a/merkledag.go +++ b/merkledag.go @@ -288,9 +288,9 @@ const defaultConcurrentFetch = 32 // WalkOptions represent the parameters of a graph walking algorithm type WalkOptions struct { - WithRoot bool - IgnoreBadBlock bool - Concurrency int + WithRoot bool + IgnoreErrors bool + Concurrency int } // WalkOption is a setter for WalkOptions @@ -323,6 +323,14 @@ func Concurrency(worker int) WalkOption { } } +// IgnoreErrors is a WalkOption indicating that the walk should attempt to +// continue even when an error occur. +func IgnoreErrors() WalkOption { + return func(walkOptions *WalkOptions) { + walkOptions.IgnoreErrors = true + } +} + // WalkGraph will walk the dag in order (depth first) starting at the given root. func Walk(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid) bool, options ...WalkOption) error { visitDepth := func(c cid.Cid, depth int) bool { @@ -356,7 +364,7 @@ func sequentialWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, d } links, err := getLinks(ctx, root) - if err != nil { + if err != nil && !options.IgnoreErrors { return err } @@ -437,7 +445,7 @@ func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, vis if shouldVisit { links, err := getLinks(ctx, ci) - if err != nil { + if err != nil && !options.IgnoreErrors { select { case errChan <- err: case <-fetchersCtx.Done(): From 27ea6f85c717d67b53caaef3c3e30b67c31b8ebe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Mur=C3=A9?= Date: Fri, 19 Jul 2019 12:49:23 +0200 Subject: [PATCH 3/3] more generalized/powerful error handling in the walk functions --- merkledag.go | 82 ++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 67 insertions(+), 15 deletions(-) diff --git a/merkledag.go b/merkledag.go index 6420e69..568d445 100644 --- a/merkledag.go +++ b/merkledag.go @@ -286,19 +286,29 @@ func GetLinksWithDAG(ng ipld.NodeGetter) GetLinks { // that 'fetchNodes' will start at a time const defaultConcurrentFetch = 32 -// WalkOptions represent the parameters of a graph walking algorithm -type WalkOptions struct { +// walkOptions represent the parameters of a graph walking algorithm +type walkOptions struct { WithRoot bool - IgnoreErrors bool Concurrency int + ErrorHandler func(c cid.Cid, err error) error } -// WalkOption is a setter for WalkOptions -type WalkOption func(*WalkOptions) +// WalkOption is a setter for walkOptions +type WalkOption func(*walkOptions) + +func (wo *walkOptions) addHandler(handler func(c cid.Cid, err error) error) { + if wo.ErrorHandler != nil { + wo.ErrorHandler = func(c cid.Cid, err error) error { + return handler(c, wo.ErrorHandler(c, err)) + } + } else { + wo.ErrorHandler = handler + } +} // WithRoot is a WalkOption indicating that the root node should be visited func WithRoot() WalkOption { - return func(walkOptions *WalkOptions) { + return func(walkOptions *walkOptions) { walkOptions.WithRoot = true } } @@ -308,7 +318,7 @@ func WithRoot() WalkOption { // NOTE: When using that option, the walk order is *not* guarantee. // NOTE: It *does not* make multiple concurrent calls to the passed `visit` function. func Concurrent() WalkOption { - return func(walkOptions *WalkOptions) { + return func(walkOptions *walkOptions) { walkOptions.Concurrency = defaultConcurrentFetch } } @@ -318,7 +328,7 @@ func Concurrent() WalkOption { // NOTE: When using that option, the walk order is *not* guarantee. // NOTE: It *does not* make multiple concurrent calls to the passed `visit` function. func Concurrency(worker int) WalkOption { - return func(walkOptions *WalkOptions) { + return func(walkOptions *walkOptions) { walkOptions.Concurrency = worker } } @@ -326,8 +336,44 @@ func Concurrency(worker int) WalkOption { // IgnoreErrors is a WalkOption indicating that the walk should attempt to // continue even when an error occur. func IgnoreErrors() WalkOption { - return func(walkOptions *WalkOptions) { - walkOptions.IgnoreErrors = true + return func(walkOptions *walkOptions) { + walkOptions.addHandler(func(c cid.Cid, err error) error { + return nil + }) + } +} + +// IgnoreMissing is a WalkOption indicating that the walk should continue when +// a node is missing. +func IgnoreMissing() WalkOption { + return func(walkOptions *walkOptions) { + walkOptions.addHandler(func(c cid.Cid, err error) error { + if err == ipld.ErrNotFound { + return nil + } + return err + }) + } +} + +// OnMissing is a WalkOption adding a callback that will be triggered on a missing +// node. +func OnMissing(callback func(c cid.Cid)) WalkOption { + return func(walkOptions *walkOptions) { + walkOptions.addHandler(func(c cid.Cid, err error) error { + if err == ipld.ErrNotFound { + callback(c) + } + return err + }) + } +} + +// OnError is a WalkOption adding a custom error handler. +// If this handler return a nil error, the walk will continue. +func OnError(handler func(c cid.Cid, err error) error) WalkOption { + return func(walkOptions *walkOptions) { + walkOptions.addHandler(handler) } } @@ -344,7 +390,7 @@ func Walk(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid) // depth to a given visit function. The visit function can be used to limit DAG // exploration. func WalkDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid, int) bool, options ...WalkOption) error { - opts := &WalkOptions{} + opts := &walkOptions{} for _, opt := range options { opt(opts) } @@ -356,7 +402,7 @@ func WalkDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid } } -func sequentialWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, depth int, visit func(cid.Cid, int) bool, options *WalkOptions) error { +func sequentialWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, depth int, visit func(cid.Cid, int) bool, options *walkOptions) error { if depth != 0 || options.WithRoot { if !visit(root, depth) { return nil @@ -364,7 +410,10 @@ func sequentialWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, d } links, err := getLinks(ctx, root) - if err != nil && !options.IgnoreErrors { + if err != nil && options.ErrorHandler != nil { + err = options.ErrorHandler(root, err) + } + if err != nil { return err } @@ -402,7 +451,7 @@ func (p *ProgressTracker) Value() int { return p.Total } -func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, visit func(cid.Cid, int) bool, options *WalkOptions) error { +func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, visit func(cid.Cid, int) bool, options *walkOptions) error { type cidDepth struct { cid cid.Cid depth int @@ -445,7 +494,10 @@ func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, vis if shouldVisit { links, err := getLinks(ctx, ci) - if err != nil && !options.IgnoreErrors { + if err != nil && options.ErrorHandler != nil { + err = options.ErrorHandler(root, err) + } + if err != nil { select { case errChan <- err: case <-fetchersCtx.Done():