From 9af129185ea17c4882bbf2f7c041f5f688f831ec Mon Sep 17 00:00:00 2001
From: Jorropo
Date: Wed, 5 Apr 2023 15:18:33 +0200
Subject: [PATCH] provider: refactor to only maintain one batched implementation and add throughput callback

This code does a few things:
1. Remove the simple provider to avoid duplicating features.
2. Add support for single providing on the batched provider.
3. Fix a bug in the batched provider.
4. Add support for a throughputCallback in the batched provider.
5. Add support for an offline mode of the batched provider (CIDs are only pushed onto the queue).
6. Move the batched provider to be the only provider and make the queue implementation private.
---
 go.mod                                        |   1 -
 go.sum                                        |   2 -
 provider/README.md                            |  30 --
 provider/batched/system_test.go               | 119 --------
 provider/{ => internal}/queue/queue.go        |  22 +-
 provider/{ => internal}/queue/queue_test.go   |  39 +--
 provider/noop.go                              |  32 ++
 provider/offline.go                           |  29 --
 provider/provider.go                          | 108 ++++++-
 provider/{batched/system.go => reprovider.go} | 259 +++++++++++-----
 provider/reprovider_test.go                   | 219 +++++++++++++
 provider/simple/provider.go                   | 116 -------
 provider/simple/provider_test.go              | 166 ----------
 provider/simple/reprovide.go                  | 255 ----------------
 provider/simple/reprovide_test.go             | 289 ------------------
 provider/system.go                            |  60 ----
 16 files changed, 556 insertions(+), 1190 deletions(-)
 delete mode 100644 provider/README.md
 delete mode 100644 provider/batched/system_test.go
 rename provider/{ => internal}/queue/queue.go (86%)
 rename provider/{ => internal}/queue/queue_test.go (80%)
 create mode 100644 provider/noop.go
 delete mode 100644 provider/offline.go
 rename provider/{batched/system.go => reprovider.go} (55%)
 create mode 100644 provider/reprovider_test.go
 delete mode 100644 provider/simple/provider.go
 delete mode 100644 provider/simple/provider_test.go
 delete mode 100644 provider/simple/reprovide.go
 delete mode 100644 provider/simple/reprovide_test.go
 delete mode 100644 provider/system.go

diff --git a/go.mod b/go.mod
index 5be828081..2d08290fc 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,6 @@ go 1.19
 require (
 	github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a
 	github.com/benbjohnson/clock v1.3.0
-	github.com/cenkalti/backoff v2.2.1+incompatible
 	github.com/cespare/xxhash/v2 v2.2.0
 	github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3
 	github.com/cskr/pubsub v1.0.2
diff --git a/go.sum b/go.sum
index 86980317b..7b4a7f276 100644
--- a/go.sum
+++ b/go.sum
@@ -59,8 +59,6 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
 github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
-github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
-github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
 github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+MN3u4=
 github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
diff --git a/provider/README.md b/provider/README.md
deleted file mode 100644
index 0e4f4650d..000000000
--- a/provider/README.md
+++
/dev/null @@ -1,30 +0,0 @@ -## Usage - -Here's how you create, start, interact with, and stop the provider system: - -```golang -import ( - "context" - "time" - - "github.com/ipfs/boxo/provider" - "github.com/ipfs/boxo/provider/queue" - "github.com/ipfs/boxo/provider/simple" -) - -rsys := (your routing system here) -dstore := (your datastore here) -cid := (your cid to provide here) - -q := queue.NewQueue(context.Background(), "example", dstore) - -reprov := simple.NewReprovider(context.Background(), time.Hour * 12, rsys, simple.NewBlockstoreProvider(dstore)) -prov := simple.NewProvider(context.Background(), q, rsys) -sys := provider.NewSystem(prov, reprov) - -sys.Run() - -sys.Provide(cid) - -sys.Close() -``` diff --git a/provider/batched/system_test.go b/provider/batched/system_test.go deleted file mode 100644 index c8a7d7b84..000000000 --- a/provider/batched/system_test.go +++ /dev/null @@ -1,119 +0,0 @@ -package batched - -import ( - "context" - "strconv" - "sync" - "testing" - "time" - - "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" - mh "github.com/multiformats/go-multihash" - - "github.com/ipfs/boxo/internal/test" - q "github.com/ipfs/boxo/provider/queue" -) - -type mockProvideMany struct { - lk sync.Mutex - keys []mh.Multihash -} - -func (m *mockProvideMany) ProvideMany(ctx context.Context, keys []mh.Multihash) error { - m.lk.Lock() - defer m.lk.Unlock() - m.keys = keys - return nil -} - -func (m *mockProvideMany) Ready() bool { - return true -} - -func (m *mockProvideMany) GetKeys() []mh.Multihash { - m.lk.Lock() - defer m.lk.Unlock() - return m.keys[:] -} - -var _ provideMany = (*mockProvideMany)(nil) - -func TestBatched(t *testing.T) { - test.Flaky(t) - ctx := context.Background() - defer ctx.Done() - - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - queue, err := q.NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } - - provider := &mockProvideMany{} - - ctx, cancel := context.WithTimeout(ctx, time.Second*10) - defer cancel() - - const numProvides = 100 - keysToProvide := make(map[cid.Cid]int) - for i := 0; i < numProvides; i++ { - h, err := mh.Sum([]byte(strconv.Itoa(i)), mh.SHA2_256, -1) - if err != nil { - panic(err) - } - c := cid.NewCidV1(cid.Raw, h) - keysToProvide[c] = i - } - - batchSystem, err := New(provider, queue, KeyProvider(func(ctx context.Context) (<-chan cid.Cid, error) { - ch := make(chan cid.Cid) - go func() { - for k := range keysToProvide { - select { - case ch <- k: - case <-ctx.Done(): - return - } - } - }() - return ch, nil - }), initialReprovideDelay(0)) - if err != nil { - t.Fatal(err) - } - - batchSystem.Run() - - var keys []mh.Multihash - for { - if ctx.Err() != nil { - t.Fatal("test hung") - } - keys = provider.GetKeys() - if len(keys) != 0 { - break - } - time.Sleep(time.Millisecond * 100) - } - - if len(keys) != numProvides { - t.Fatalf("expected %d provider keys, got %d", numProvides, len(keys)) - } - - provMap := make(map[string]struct{}) - for _, k := range keys { - provMap[string(k)] = struct{}{} - } - - for i := 0; i < numProvides; i++ { - h, err := mh.Sum([]byte(strconv.Itoa(i)), mh.SHA2_256, -1) - if err != nil { - panic(err) - } - if _, found := provMap[string(h)]; !found { - t.Fatalf("could not find provider with value %d", i) - } - } -} diff --git a/provider/queue/queue.go b/provider/internal/queue/queue.go similarity index 86% rename from provider/queue/queue.go rename to provider/internal/queue/queue.go index 618256bbe..dada92788 100644 --- 
a/provider/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -3,6 +3,7 @@ package queue import ( "context" "fmt" + cid "github.com/ipfs/go-cid" datastore "github.com/ipfs/go-datastore" namespace "github.com/ipfs/go-datastore/namespace" @@ -20,7 +21,6 @@ var log = logging.Logger("provider.queue") type Queue struct { // used to differentiate queues in datastore // e.g. provider vs reprovider - name string ctx context.Context ds datastore.Datastore // Must be threadsafe dequeue chan cid.Cid @@ -32,11 +32,10 @@ type Queue struct { } // NewQueue creates a queue for cids -func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) { - namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/")) - cancelCtx, cancel := context.WithCancel(ctx) +func NewQueue(ds datastore.Datastore) *Queue { + namespaced := namespace.Wrap(ds, datastore.NewKey("/queue")) + cancelCtx, cancel := context.WithCancel(context.Background()) q := &Queue{ - name: name, ctx: cancelCtx, ds: namespaced, dequeue: make(chan cid.Cid), @@ -45,13 +44,16 @@ func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, closed: make(chan struct{}, 1), } q.work() - return q, nil + return q } // Close stops the queue func (q *Queue) Close() error { q.close() <-q.closed + // We don't close dequeue because the provider which consume this get caught in + // an infinite loop dequeing cid.Undef if we do that. + // The provider has it's own select on top of dequeue and will handle this by itself. return nil } @@ -79,8 +81,6 @@ func (q *Queue) work() { defer func() { // also cancels any in-progess enqueue tasks. q.close() - // unblocks anyone waiting - close(q.dequeue) // unblocks the close call close(q.closed) }() @@ -121,6 +121,12 @@ func (q *Queue) work() { q.counter++ nextKey := datastore.NewKey(keyPath) + if c == cid.Undef { + // fast path, skip rereading the datastore if we don't have anything in hand yet + c = toQueue + k = nextKey + } + if err := q.ds.Put(q.ctx, nextKey, toQueue.Bytes()); err != nil { log.Errorf("Failed to enqueue cid: %s", err) continue diff --git a/provider/queue/queue_test.go b/provider/internal/queue/queue_test.go similarity index 80% rename from provider/queue/queue_test.go rename to provider/internal/queue/queue_test.go index 9eacf4349..a2d9f0be4 100644 --- a/provider/queue/queue_test.go +++ b/provider/internal/queue/queue_test.go @@ -43,10 +43,8 @@ func TestBasicOperation(t *testing.T) { defer ctx.Done() ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } + queue := NewQueue(ds) + defer queue.Close() cids := makeCids(10) @@ -63,10 +61,8 @@ func TestMangledData(t *testing.T) { defer ctx.Done() ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } + queue := NewQueue(ds) + defer queue.Close() cids := makeCids(10) for _, c := range cids { @@ -75,7 +71,7 @@ func TestMangledData(t *testing.T) { // put bad data in the queue queueKey := datastore.NewKey("/test/0") - err = queue.ds.Put(ctx, queueKey, []byte("borked")) + err := queue.ds.Put(ctx, queueKey, []byte("borked")) if err != nil { t.Fatal(err) } @@ -91,10 +87,8 @@ func TestInitialization(t *testing.T) { defer ctx.Done() ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } + queue := NewQueue(ds) + defer queue.Close() cids := makeCids(10) for _, c := range cids { @@ -104,10 +98,8 
@@ func TestInitialization(t *testing.T) { assertOrdered(cids[:5], queue, t) // make a new queue, same data - queue, err = NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } + queue = NewQueue(ds) + defer queue.Close() assertOrdered(cids[5:], queue, t) } @@ -118,21 +110,18 @@ func TestInitializationWithManyCids(t *testing.T) { defer ctx.Done() ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } + queue := NewQueue(ds) cids := makeCids(25) for _, c := range cids { queue.Enqueue(c) } + queue.Close() + // make a new queue, same data - queue, err = NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } + queue = NewQueue(ds) + defer queue.Close() assertOrdered(cids, queue, t) } diff --git a/provider/noop.go b/provider/noop.go new file mode 100644 index 000000000..5367ccb30 --- /dev/null +++ b/provider/noop.go @@ -0,0 +1,32 @@ +package provider + +import ( + "context" + + "github.com/ipfs/go-cid" +) + +type noopProvider struct{} + +var _ System = (*noopProvider)(nil) + +// NewNoopProvider creates a ProviderSystem that does nothing. +func NewNoopProvider() System { + return &noopProvider{} +} + +func (op *noopProvider) Close() error { + return nil +} + +func (op *noopProvider) Provide(cid.Cid) error { + return nil +} + +func (op *noopProvider) Reprovide(context.Context) error { + return nil +} + +func (op *noopProvider) Stat() (ReproviderStats, error) { + return ReproviderStats{}, nil +} diff --git a/provider/offline.go b/provider/offline.go deleted file mode 100644 index 030a70ab1..000000000 --- a/provider/offline.go +++ /dev/null @@ -1,29 +0,0 @@ -package provider - -import ( - "context" - - "github.com/ipfs/go-cid" -) - -type offlineProvider struct{} - -// NewOfflineProvider creates a ProviderSystem that does nothing -func NewOfflineProvider() System { - return &offlineProvider{} -} - -func (op *offlineProvider) Run() { -} - -func (op *offlineProvider) Close() error { - return nil -} - -func (op *offlineProvider) Provide(cid.Cid) error { - return nil -} - -func (op *offlineProvider) Reprovide(context.Context) error { - return nil -} diff --git a/provider/provider.go b/provider/provider.go index 3b9c6ba3e..8b85fe62f 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -3,25 +3,115 @@ package provider import ( "context" + blocks "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/fetcher" + fetcherhelpers "github.com/ipfs/boxo/fetcher/helpers" + "github.com/ipfs/boxo/pinning/pinner" "github.com/ipfs/go-cid" + "github.com/ipfs/go-cidutil" + logging "github.com/ipfs/go-log" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" ) +var logR = logging.Logger("reprovider.simple") + // Provider announces blocks to the network type Provider interface { - // Run is used to begin processing the provider work - Run() // Provide takes a cid and makes an attempt to announce it to the network Provide(cid.Cid) error - // Close stops the provider - Close() error } // Reprovider reannounces blocks to the network type Reprovider interface { - // Run is used to begin processing the reprovider work and waiting for reprovide triggers - Run() - // Trigger a reprovide - Trigger(context.Context) error - // Close stops the reprovider + // Reprovide starts a new reprovide if one isn't running already. 
+ Reprovide(context.Context) error +} + +// System defines the interface for interacting with the value +// provider system +type System interface { Close() error + Stat() (ReproviderStats, error) + Provider + Reprovider +} + +// KeyChanFunc is function streaming CIDs to pass to content routing +type KeyChanFunc func(context.Context) (<-chan cid.Cid, error) + +// NewBlockstoreProvider returns key provider using bstore.AllKeysChan +func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { + return func(ctx context.Context) (<-chan cid.Cid, error) { + return bstore.AllKeysChan(ctx) + } +} + +// NewPinnedProvider returns provider supplying pinned keys +func NewPinnedProvider(onlyRoots bool, pinning pin.Pinner, fetchConfig fetcher.Factory) KeyChanFunc { + return func(ctx context.Context) (<-chan cid.Cid, error) { + set, err := pinSet(ctx, pinning, fetchConfig, onlyRoots) + if err != nil { + return nil, err + } + + outCh := make(chan cid.Cid) + go func() { + defer close(outCh) + for c := range set.New { + select { + case <-ctx.Done(): + return + case outCh <- c: + } + } + + }() + + return outCh, nil + } +} + +func pinSet(ctx context.Context, pinning pin.Pinner, fetchConfig fetcher.Factory, onlyRoots bool) (*cidutil.StreamingSet, error) { + set := cidutil.NewStreamingSet() + + go func() { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + defer close(set.New) + + dkeys, err := pinning.DirectKeys(ctx) + if err != nil { + logR.Errorf("reprovide direct pins: %s", err) + return + } + for _, key := range dkeys { + set.Visitor(ctx)(key) + } + + rkeys, err := pinning.RecursiveKeys(ctx) + if err != nil { + logR.Errorf("reprovide indirect pins: %s", err) + return + } + + session := fetchConfig.NewSession(ctx) + for _, key := range rkeys { + set.Visitor(ctx)(key) + if !onlyRoots { + err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: key}, func(res fetcher.FetchResult) error { + clink, ok := res.LastBlockLink.(cidlink.Link) + if ok { + set.Visitor(ctx)(clink.Cid) + } + return nil + }) + if err != nil { + logR.Errorf("reprovide indirect pins: %s", err) + return + } + } + } + }() + + return set, nil } diff --git a/provider/batched/system.go b/provider/reprovider.go similarity index 55% rename from provider/batched/system.go rename to provider/reprovider.go index e3cb0325a..055cfe5c9 100644 --- a/provider/batched/system.go +++ b/provider/reprovider.go @@ -1,26 +1,26 @@ -package batched +package provider import ( "context" "errors" "fmt" + "math" "strconv" "sync" "time" - provider "github.com/ipfs/boxo/provider" - "github.com/ipfs/boxo/provider/queue" - "github.com/ipfs/boxo/provider/simple" + "github.com/ipfs/boxo/provider/internal/queue" "github.com/ipfs/boxo/verifcid" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" + namespace "github.com/ipfs/go-datastore/namespace" logging "github.com/ipfs/go-log" "github.com/multiformats/go-multihash" ) var log = logging.Logger("provider.batched") -type BatchProvidingSystem struct { +type reprovider struct { ctx context.Context close context.CancelFunc closewg sync.WaitGroup @@ -29,39 +29,64 @@ type BatchProvidingSystem struct { initalReprovideDelay time.Duration initialReprovideDelaySet bool - rsys provideMany - keyProvider simple.KeyChanFunc + rsys Provide + keyProvider KeyChanFunc q *queue.Queue ds datastore.Batching reprovideCh chan cid.Cid - totalProvides, lastReprovideBatchSize int + maxReprovideBatchSize uint + + statLk sync.Mutex + totalProvides, lastReprovideBatchSize uint64 avgProvideDuration, lastReprovideDuration 
time.Duration + + throughputCallback ThroughputCallback + // throughputProvideCurrentCount counts how many provides has been done since the last call to throughputCallback + throughputProvideCurrentCount uint + // throughputDurationSum sums up durations between two calls to the throughputCallback + throughputDurationSum time.Duration + throughputMinimumProvides uint + + keyPrefix datastore.Key } -var _ provider.System = (*BatchProvidingSystem)(nil) +var _ System = (*reprovider)(nil) + +type Provide interface { + Provide(context.Context, cid.Cid, bool) error +} -type provideMany interface { +type ProvideMany interface { ProvideMany(ctx context.Context, keys []multihash.Multihash) error +} + +type Ready interface { Ready() bool } // Option defines the functional option type that can be used to configure // BatchProvidingSystem instances -type Option func(system *BatchProvidingSystem) error - -var lastReprovideKey = datastore.NewKey("/provider/reprovide/lastreprovide") - -func New(provider provideMany, q *queue.Queue, opts ...Option) (*BatchProvidingSystem, error) { - s := &BatchProvidingSystem{ - reprovideInterval: time.Hour * 24, - rsys: provider, - keyProvider: nil, - q: q, - ds: datastore.NewMapDatastore(), - reprovideCh: make(chan cid.Cid), +type Option func(system *reprovider) error + +var lastReprovideKey = datastore.NewKey("/reprovide/lastreprovide") +var DefaultKeyPrefix = datastore.NewKey("/provider") + +// New creates a new [System]. By default it is offline, that means it will +// enqueue tasks in ds. +// To have it publish records in the network use the [Online] option. +// If provider casts to [ProvideMany] the [ProvideMany.ProvideMany] method will +// be called instead. +// +// If provider casts to [Ready], it will wait until [Ready.Ready] is true. +func New(ds datastore.Batching, opts ...Option) (System, error) { + s := &reprovider{ + reprovideInterval: time.Hour * 24, + maxReprovideBatchSize: math.MaxUint, + keyPrefix: DefaultKeyPrefix, + reprovideCh: make(chan cid.Cid), } for _, o := range opts { @@ -89,50 +114,93 @@ func New(provider provideMany, q *queue.Queue, opts ...Option) (*BatchProvidingS } } + s.ds = namespace.Wrap(ds, s.keyPrefix) + s.q = queue.NewQueue(s.ds) + // This is after the options processing so we do not have to worry about leaking a context if there is an // initialization error processing the options ctx, cancel := context.WithCancel(context.Background()) s.ctx = ctx s.close = cancel - return s, nil -} + if s.rsys != nil { + if _, ok := s.rsys.(ProvideMany); !ok { + s.maxReprovideBatchSize = 1 + } -func Datastore(batching datastore.Batching) Option { - return func(system *BatchProvidingSystem) error { - system.ds = batching - return nil + s.run() } + + return s, nil } func ReproviderInterval(duration time.Duration) Option { - return func(system *BatchProvidingSystem) error { + return func(system *reprovider) error { system.reprovideInterval = duration return nil } } -func KeyProvider(fn simple.KeyChanFunc) Option { - return func(system *BatchProvidingSystem) error { +func KeyProvider(fn KeyChanFunc) Option { + return func(system *reprovider) error { system.keyProvider = fn return nil } } +// DatastorePrefix sets a prefix for internal state stored in the Datastore. +// Defaults to [DefaultKeyPrefix]. 
+func DatastorePrefix(k datastore.Key) Option { + return func(system *reprovider) error { + system.keyPrefix = k + return nil + } +} + +// ThroughputReport will fire the callback synchronously once at least limit +// multihashes have been advertised, it will then wait until a new set of at least +// limit multihashes has been advertised. +// While ThroughputReport is set batches will be at most minimumProvides big. +// If it returns false it wont ever be called again. +func ThroughputReport(f ThroughputCallback, minimumProvides uint) Option { + return func(system *reprovider) error { + system.throughputCallback = f + system.throughputMinimumProvides = minimumProvides + return nil + } +} + +type ThroughputCallback = func(reprovide bool, complete bool, totalKeysProvided uint, totalDuration time.Duration) (continueWatching bool) + +// Online will enable the router and make it send publishes online. +// nil can be used to turn the router offline. +// You can't register multiple providers, if this option is passed multiple times +// it will error. +func Online(rsys Provide) Option { + return func(system *reprovider) error { + if system.rsys != nil { + return fmt.Errorf("trying to register two provider on the same reprovider") + } + system.rsys = rsys + return nil + } +} + func initialReprovideDelay(duration time.Duration) Option { - return func(system *BatchProvidingSystem) error { + return func(system *reprovider) error { system.initialReprovideDelaySet = true system.initalReprovideDelay = duration return nil } } -func (s *BatchProvidingSystem) Run() { - // how long we wait between the first provider we hear about and batching up the provides to send out - const pauseDetectionThreshold = time.Millisecond * 500 - // how long we are willing to collect providers for the batch after we receive the first one - const maxCollectionDuration = time.Minute * 10 +// how long we wait between the first provider we hear about and batching up the provides to send out +const pauseDetectionThreshold = time.Millisecond * 500 +// how long we are willing to collect providers for the batch after we receive the first one +const maxCollectionDuration = time.Minute * 10 + +func (s *reprovider) run() { provCh := s.q.Dequeue() s.closewg.Add(1) @@ -166,27 +234,16 @@ func (s *BatchProvidingSystem) Run() { for { performedReprovide := false + complete := false + + batchSize := s.maxReprovideBatchSize + if s.throughputCallback != nil && s.throughputMinimumProvides < batchSize { + batchSize = s.throughputMinimumProvides + } // at the start of every loop the maxCollectionDurationTimer and pauseDetectTimer should be already be // stopped and have empty channels - loop: - for { - select { - case <-maxCollectionDurationTimer.C: - // if this timer has fired then the pause timer has started so let's stop and empty it - stopAndEmptyTimer(pauseDetectTimer) - break loop - default: - } - - select { - case c := <-provCh: - resetTimersAfterReceivingProvide() - m[c] = struct{}{} - continue - default: - } - + for uint(len(m)) < batchSize { select { case c := <-provCh: resetTimersAfterReceivingProvide() @@ -198,15 +255,19 @@ func (s *BatchProvidingSystem) Run() { case <-pauseDetectTimer.C: // if this timer has fired then the max collection timer has started so let's stop and empty it stopAndEmptyTimer(maxCollectionDurationTimer) - break loop + complete = true + goto AfterLoop case <-maxCollectionDurationTimer.C: // if this timer has fired then the pause timer has started so let's stop and empty it stopAndEmptyTimer(pauseDetectTimer) - 
break loop + goto AfterLoop case <-s.ctx.Done(): return } } + stopAndEmptyTimer(pauseDetectTimer) + stopAndEmptyTimer(maxCollectionDurationTimer) + AfterLoop: if len(m) == 0 { continue @@ -230,41 +291,63 @@ func (s *BatchProvidingSystem) Run() { continue } - for !s.rsys.Ready() { - log.Debugf("reprovider system not ready") - select { - case <-time.After(time.Minute): - case <-s.ctx.Done(): - return + if r, ok := s.rsys.(Ready); ok { + ticker := time.NewTicker(time.Minute) + for !r.Ready() { + log.Debugf("reprovider system not ready") + select { + case <-ticker.C: + case <-s.ctx.Done(): + return + } } + ticker.Stop() } log.Debugf("starting provide of %d keys", len(keys)) start := time.Now() - err := s.rsys.ProvideMany(s.ctx, keys) + err := doProvideMany(s.ctx, s.rsys, keys) if err != nil { log.Debugf("providing failed %v", err) continue } dur := time.Since(start) - totalProvideTime := int64(s.totalProvides) * int64(s.avgProvideDuration) - recentAvgProvideDuration := time.Duration(int64(dur) / int64(len(keys))) - s.avgProvideDuration = time.Duration((totalProvideTime + int64(dur)) / int64(s.totalProvides+len(keys))) - s.totalProvides += len(keys) + totalProvideTime := time.Duration(s.totalProvides) * s.avgProvideDuration + recentAvgProvideDuration := dur / time.Duration(len(keys)) + + s.statLk.Lock() + s.avgProvideDuration = time.Duration((totalProvideTime + dur) / (time.Duration(s.totalProvides) + time.Duration(len(keys)))) + s.totalProvides += uint64(len(keys)) log.Debugf("finished providing of %d keys. It took %v with an average of %v per provide", len(keys), dur, recentAvgProvideDuration) if performedReprovide { - s.lastReprovideBatchSize = len(keys) + s.lastReprovideBatchSize = uint64(len(keys)) s.lastReprovideDuration = dur + s.statLk.Unlock() + + // Don't hold the lock while writing to disk, consumers don't need to wait on IO to read thoses fields. 
+ if err := s.ds.Put(s.ctx, lastReprovideKey, storeTime(time.Now())); err != nil { log.Errorf("could not store last reprovide time: %v", err) } if err := s.ds.Sync(s.ctx, lastReprovideKey); err != nil { log.Errorf("could not perform sync of last reprovide time: %v", err) } + } else { + s.statLk.Unlock() + } + + s.throughputDurationSum += dur + s.throughputProvideCurrentCount += uint(len(keys)) + if s.throughputCallback != nil && s.throughputProvideCurrentCount >= s.throughputMinimumProvides { + if more := s.throughputCallback(performedReprovide, complete, s.throughputProvideCurrentCount, s.throughputDurationSum); !more { + s.throughputCallback = nil + } + s.throughputProvideCurrentCount = 0 + s.throughputDurationSum = 0 } } }() @@ -327,22 +410,22 @@ func parseTime(b []byte) (time.Time, error) { return time.Unix(0, tns), nil } -func (s *BatchProvidingSystem) Close() error { +func (s *reprovider) Close() error { s.close() err := s.q.Close() s.closewg.Wait() return err } -func (s *BatchProvidingSystem) Provide(cid cid.Cid) error { +func (s *reprovider) Provide(cid cid.Cid) error { return s.q.Enqueue(cid) } -func (s *BatchProvidingSystem) Reprovide(ctx context.Context) error { +func (s *reprovider) Reprovide(ctx context.Context) error { return s.reprovide(ctx, true) } -func (s *BatchProvidingSystem) reprovide(ctx context.Context, force bool) error { +func (s *reprovider) reprovide(ctx context.Context, force bool) error { if !s.shouldReprovide() && !force { return nil } @@ -373,7 +456,7 @@ reprovideCidLoop: return nil } -func (s *BatchProvidingSystem) getLastReprovideTime() (time.Time, error) { +func (s *reprovider) getLastReprovideTime() (time.Time, error) { val, err := s.ds.Get(s.ctx, lastReprovideKey) if errors.Is(err, datastore.ErrNotFound) { return time.Time{}, nil @@ -390,31 +473,45 @@ func (s *BatchProvidingSystem) getLastReprovideTime() (time.Time, error) { return t, nil } -func (s *BatchProvidingSystem) shouldReprovide() bool { +func (s *reprovider) shouldReprovide() bool { t, err := s.getLastReprovideTime() if err != nil { log.Debugf("getting last reprovide time failed: %s", err) return false } - if time.Since(t) < time.Duration(float64(s.reprovideInterval)*0.5) { + if time.Since(t) < s.reprovideInterval { return false } return true } -type BatchedProviderStats struct { - TotalProvides, LastReprovideBatchSize int +type ReproviderStats struct { + TotalProvides, LastReprovideBatchSize uint64 AvgProvideDuration, LastReprovideDuration time.Duration } // Stat returns various stats about this provider system -func (s *BatchProvidingSystem) Stat(ctx context.Context) (BatchedProviderStats, error) { - // TODO: Does it matter that there is no locking around the total+average values? 
- return BatchedProviderStats{ +func (s *reprovider) Stat() (ReproviderStats, error) { + s.statLk.Lock() + defer s.statLk.Unlock() + return ReproviderStats{ TotalProvides: s.totalProvides, LastReprovideBatchSize: s.lastReprovideBatchSize, AvgProvideDuration: s.avgProvideDuration, LastReprovideDuration: s.lastReprovideDuration, }, nil } + +func doProvideMany(ctx context.Context, r Provide, keys []multihash.Multihash) error { + if many, ok := r.(ProvideMany); ok { + return many.ProvideMany(ctx, keys) + } + + for _, k := range keys { + if err := r.Provide(ctx, cid.NewCidV1(cid.Raw, k), true); err != nil { + return err + } + } + return nil +} diff --git a/provider/reprovider_test.go b/provider/reprovider_test.go new file mode 100644 index 000000000..bfb8fc187 --- /dev/null +++ b/provider/reprovider_test.go @@ -0,0 +1,219 @@ +package provider + +import ( + "bytes" + "context" + "runtime" + "strconv" + "sync" + "testing" + "time" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + mh "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/assert" +) + +type allFeatures interface { + Provide + ProvideMany + Ready +} + +type mockProvideMany struct { + delay time.Duration + lk sync.Mutex + keys []mh.Multihash + calls uint +} + +func (m *mockProvideMany) ProvideMany(ctx context.Context, keys []mh.Multihash) error { + m.lk.Lock() + defer m.lk.Unlock() + m.keys = append(m.keys, keys...) + m.calls++ + time.Sleep(time.Duration(len(keys)) * m.delay) + return nil +} + +func (m *mockProvideMany) Provide(ctx context.Context, key cid.Cid, _ bool) error { + m.lk.Lock() + defer m.lk.Unlock() + m.keys = append(m.keys, key.Hash()) + m.calls++ + time.Sleep(m.delay) + return nil +} + +func (m *mockProvideMany) Ready() bool { + return true +} + +func (m *mockProvideMany) GetKeys() (keys []mh.Multihash, calls uint) { + m.lk.Lock() + defer m.lk.Unlock() + return append([]mh.Multihash(nil), m.keys...), m.calls +} + +var _ allFeatures = (*mockProvideMany)(nil) + +type allButMany interface { + Provide + Ready +} + +type singleMockWrapper struct { + allButMany +} + +func TestReprovider(t *testing.T) { + t.Parallel() + t.Run("many", func(t *testing.T) { + t.Parallel() + testProvider(t, false) + }) + t.Run("single", func(t *testing.T) { + t.Parallel() + testProvider(t, true) + }) +} + +func testProvider(t *testing.T, singleProvide bool) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + + // It has to be so big because the combo of noisy CI runners + OSes that don't + // have scheduler as good as linux's one add a whole lot of jitter. 
+ const provideDelay = time.Millisecond * 5 + orig := &mockProvideMany{ + delay: provideDelay, + } + var provider Provide = orig + if singleProvide { + provider = singleMockWrapper{orig} + } + + const numProvides = 100 + keysToProvide := make([]cid.Cid, numProvides) + for i := range keysToProvide { + h, err := mh.Sum([]byte(strconv.Itoa(i)), mh.SHA2_256, -1) + if err != nil { + panic(err) + } + c := cid.NewCidV1(cid.Raw, h) + keysToProvide[i] = c + } + + var keyWait sync.Mutex + keyWait.Lock() + batchSystem, err := New(ds, Online(provider), KeyProvider(func(ctx context.Context) (<-chan cid.Cid, error) { + ch := make(chan cid.Cid) + go func() { + defer keyWait.Unlock() + for _, k := range keysToProvide { + select { + case ch <- k: + case <-ctx.Done(): + return + } + } + }() + return ch, nil + }), + initialReprovideDelay(0), + ThroughputReport(func(_, complete bool, n uint, d time.Duration) bool { + if !singleProvide && complete { + t.Errorf("expected an incomplete report but got a complete one") + } + + const twentyFivePercent = provideDelay / 4 + const seventyFivePercent = provideDelay - twentyFivePercent + const hundredTwentyFivePercent = provideDelay + twentyFivePercent + + avg := d / time.Duration(n) + + // windows's and darwin's schedulers and timers are too unreliable for this check + if runtime.GOOS != "windows" && runtime.GOOS != "darwin" && !(seventyFivePercent <= avg && avg <= hundredTwentyFivePercent) { + t.Errorf("average computed duration is not within bounds, expected between %v and %v but got %v.", seventyFivePercent, hundredTwentyFivePercent, avg) + } + return false + }, numProvides/2), + ) + if err != nil { + t.Fatal(err) + } + defer batchSystem.Close() + + keyWait.Lock() + time.Sleep(pauseDetectionThreshold + time.Millisecond*50) // give it time to call provider after that + + keys, calls := orig.GetKeys() + if len(keys) != numProvides { + t.Fatalf("expected %d provider keys, got %d", numProvides, len(keys)) + } + if singleProvide { + if calls != 100 { + t.Fatalf("expected 100 call single provide call, got %d", calls) + } + } else { + // Two because of ThroughputReport's limit being half. + if calls != 2 { + t.Fatalf("expected 2 call batched provide call, got %d", calls) + } + } + + provMap := make(map[string]struct{}) + for _, k := range keys { + provMap[string(k)] = struct{}{} + } + + for i := 0; i < numProvides; i++ { + h, err := mh.Sum([]byte(strconv.Itoa(i)), mh.SHA2_256, -1) + if err != nil { + panic(err) + } + if _, found := provMap[string(h)]; !found { + t.Fatalf("could not find provider with value %d", i) + } + } +} + +func TestOfflineRecordsThenOnlineRepublish(t *testing.T) { + // Don't run in Parallel as this test is time sensitive. + + someHash, err := mh.Sum([]byte("Vires in Numeris!"), mh.BLAKE3, -1) + assert.NoError(t, err) + c := cid.NewCidV1(cid.Raw, someHash) + + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + + // First public using an offline system to enqueue in the datastore. + sys, err := New(ds) + assert.NoError(t, err) + + err = sys.Provide(c) + assert.NoError(t, err) + + err = sys.Close() + assert.NoError(t, err) + + // Secondly restart an online datastore and we want to see this previously provided cid published. 
+ prov := &mockProvideMany{} + sys, err = New(ds, Online(prov), initialReprovideDelay(0)) + assert.NoError(t, err) + + time.Sleep(pauseDetectionThreshold + time.Millisecond*10) // give it time to call provider after that + + err = sys.Close() + assert.NoError(t, err) + + prov.lk.Lock() + defer prov.lk.Unlock() + if len(prov.keys) != 1 { + t.Fatalf("expected to see 1 provide; got %d", len(prov.keys)) + } + if !bytes.Equal(prov.keys[0], someHash) { + t.Fatalf("keys are not equal expected %v, got %v", someHash, prov.keys[0]) + } +} diff --git a/provider/simple/provider.go b/provider/simple/provider.go deleted file mode 100644 index 63de031ad..000000000 --- a/provider/simple/provider.go +++ /dev/null @@ -1,116 +0,0 @@ -// Package simple implements structures and methods to provide blocks, -// keep track of which blocks are provided, and to allow those blocks to -// be reprovided. -package simple - -import ( - "context" - "time" - - q "github.com/ipfs/boxo/provider/queue" - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log" - "github.com/libp2p/go-libp2p/core/routing" -) - -var logP = logging.Logger("provider.simple") - -// Provider announces blocks to the network -type Provider struct { - ctx context.Context - // the CIDs for which provide announcements should be made - queue *q.Queue - // used to announce providing to the network - contentRouting routing.ContentRouting - // how long to wait for announce to complete before giving up - timeout time.Duration - // how many workers concurrently work through thhe queue - workerLimit int -} - -// Option defines the functional option type that can be used to configure -// provider instances -type Option func(*Provider) - -// WithTimeout is an option to set a timeout on a provider -func WithTimeout(timeout time.Duration) Option { - return func(p *Provider) { - p.timeout = timeout - } -} - -// MaxWorkers is an option to set the max workers on a provider -func MaxWorkers(count int) Option { - return func(p *Provider) { - p.workerLimit = count - } -} - -// NewProvider creates a provider that announces blocks to the network using a content router -func NewProvider(ctx context.Context, queue *q.Queue, contentRouting routing.ContentRouting, options ...Option) *Provider { - p := &Provider{ - ctx: ctx, - queue: queue, - contentRouting: contentRouting, - workerLimit: 8, - } - - for _, option := range options { - option(p) - } - - return p -} - -// Close stops the provider -func (p *Provider) Close() error { - return p.queue.Close() -} - -// Run workers to handle provide requests. -func (p *Provider) Run() { - p.handleAnnouncements() -} - -// Provide the given cid using specified strategy. -func (p *Provider) Provide(root cid.Cid) error { - return p.queue.Enqueue(root) -} - -// Handle all outgoing cids by providing (announcing) them -func (p *Provider) handleAnnouncements() { - for workers := 0; workers < p.workerLimit; workers++ { - go func() { - for p.ctx.Err() == nil { - select { - case <-p.ctx.Done(): - return - case c, ok := <-p.queue.Dequeue(): - if !ok { - // queue closed. 
- return - } - - p.doProvide(c) - } - } - }() - } -} - -func (p *Provider) doProvide(c cid.Cid) { - ctx := p.ctx - if p.timeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, p.timeout) - defer cancel() - } else { - ctx = p.ctx - } - - logP.Info("announce - start - ", c) - if err := p.contentRouting.Provide(ctx, c, true); err != nil { - logP.Warnf("Unable to provide entry: %s, %s", c, err) - } - logP.Info("announce - end - ", c) -} diff --git a/provider/simple/provider_test.go b/provider/simple/provider_test.go deleted file mode 100644 index 8734c8ff6..000000000 --- a/provider/simple/provider_test.go +++ /dev/null @@ -1,166 +0,0 @@ -package simple_test - -import ( - "context" - "math/rand" - "testing" - "time" - - "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/sync" - blocksutil "github.com/ipfs/go-ipfs-blocksutil" - "github.com/libp2p/go-libp2p/core/peer" - - "github.com/ipfs/boxo/internal/test" - q "github.com/ipfs/boxo/provider/queue" - - . "github.com/ipfs/boxo/provider/simple" -) - -var blockGenerator = blocksutil.NewBlockGenerator() - -type mockRouting struct { - provided chan cid.Cid -} - -func (r *mockRouting) Provide(ctx context.Context, cid cid.Cid, recursive bool) error { - select { - case r.provided <- cid: - case <-ctx.Done(): - panic("context cancelled, but shouldn't have") - } - return nil -} - -func (r *mockRouting) FindProvidersAsync(ctx context.Context, cid cid.Cid, timeout int) <-chan peer.AddrInfo { - return nil -} - -func mockContentRouting() *mockRouting { - r := mockRouting{} - r.provided = make(chan cid.Cid) - return &r -} - -func TestAnnouncement(t *testing.T) { - test.Flaky(t) - ctx := context.Background() - defer ctx.Done() - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := q.NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } - - r := mockContentRouting() - - prov := NewProvider(ctx, queue, r) - prov.Run() - - cids := cid.NewSet() - - for i := 0; i < 100; i++ { - c := blockGenerator.Next().Cid() - cids.Add(c) - } - - go func() { - for _, c := range cids.Keys() { - err = prov.Provide(c) - // A little goroutine stirring to exercise some different states - r := rand.Intn(10) - time.Sleep(time.Microsecond * time.Duration(r)) - } - }() - - for cids.Len() > 0 { - select { - case cp := <-r.provided: - if !cids.Has(cp) { - t.Fatal("Wrong CID provided") - } - cids.Remove(cp) - case <-time.After(time.Second * 5): - t.Fatal("Timeout waiting for cids to be provided.") - } - } - prov.Close() - - select { - case cp := <-r.provided: - t.Fatal("did not expect to provide CID: ", cp) - case <-time.After(time.Second * 1): - } -} - -func TestClose(t *testing.T) { - test.Flaky(t) - ctx := context.Background() - defer ctx.Done() - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := q.NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } - - r := mockContentRouting() - - prov := NewProvider(ctx, queue, r) - prov.Run() - - prov.Close() - - select { - case cp := <-r.provided: - t.Fatal("did not expect to provide anything, provided: ", cp) - case <-time.After(time.Second * 1): - } -} - -func TestAnnouncementTimeout(t *testing.T) { - test.Flaky(t) - ctx := context.Background() - defer ctx.Done() - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := q.NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } - - r := mockContentRouting() - - prov := NewProvider(ctx, queue, r, WithTimeout(1*time.Second)) - prov.Run() - - cids := cid.NewSet() 
- - for i := 0; i < 100; i++ { - c := blockGenerator.Next().Cid() - cids.Add(c) - } - - go func() { - for _, c := range cids.Keys() { - err = prov.Provide(c) - // A little goroutine stirring to exercise some different states - r := rand.Intn(10) - time.Sleep(time.Microsecond * time.Duration(r)) - } - }() - - for cids.Len() > 0 { - select { - case cp := <-r.provided: - if !cids.Has(cp) { - t.Fatal("Wrong CID provided") - } - cids.Remove(cp) - case <-time.After(time.Second * 5): - t.Fatal("Timeout waiting for cids to be provided.") - } - } -} diff --git a/provider/simple/reprovide.go b/provider/simple/reprovide.go deleted file mode 100644 index a29b484fc..000000000 --- a/provider/simple/reprovide.go +++ /dev/null @@ -1,255 +0,0 @@ -package simple - -import ( - "context" - "errors" - "fmt" - "time" - - "github.com/cenkalti/backoff" - blocks "github.com/ipfs/boxo/blockstore" - "github.com/ipfs/boxo/fetcher" - fetcherhelpers "github.com/ipfs/boxo/fetcher/helpers" - "github.com/ipfs/boxo/verifcid" - "github.com/ipfs/go-cid" - "github.com/ipfs/go-cidutil" - logging "github.com/ipfs/go-log" - cidlink "github.com/ipld/go-ipld-prime/linking/cid" - "github.com/libp2p/go-libp2p/core/routing" -) - -var logR = logging.Logger("reprovider.simple") - -// ErrClosed is returned by Trigger when operating on a closed reprovider. -var ErrClosed = errors.New("reprovider service stopped") - -// KeyChanFunc is function streaming CIDs to pass to content routing -type KeyChanFunc func(context.Context) (<-chan cid.Cid, error) - -// Reprovider reannounces blocks to the network -type Reprovider struct { - // Reprovider context. Cancel to stop, then wait on closedCh. - ctx context.Context - cancel context.CancelFunc - closedCh chan struct{} - - // Trigger triggers a reprovide. - trigger chan chan<- error - - // The routing system to provide values through - rsys routing.ContentRouting - - keyProvider KeyChanFunc - - tick time.Duration -} - -// NewReprovider creates new Reprovider instance. -func NewReprovider(ctx context.Context, reprovideInterval time.Duration, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { - ctx, cancel := context.WithCancel(ctx) - return &Reprovider{ - ctx: ctx, - cancel: cancel, - closedCh: make(chan struct{}), - trigger: make(chan chan<- error), - - rsys: rsys, - keyProvider: keyProvider, - tick: reprovideInterval, - } -} - -// Close the reprovider -func (rp *Reprovider) Close() error { - rp.cancel() - <-rp.closedCh - return nil -} - -// Run re-provides keys with 'tick' interval or when triggered -func (rp *Reprovider) Run() { - defer close(rp.closedCh) - - var initialReprovideCh, reprovideCh <-chan time.Time - - // If reproviding is enabled (non-zero) - if rp.tick > 0 { - reprovideTicker := time.NewTicker(rp.tick) - defer reprovideTicker.Stop() - reprovideCh = reprovideTicker.C - - // If the reprovide ticker is larger than a minute (likely), - // provide once after we've been up a minute. - // - // Don't provide _immediately_ as we might be just about to stop. 
- if rp.tick > time.Minute { - initialReprovideTimer := time.NewTimer(time.Minute) - defer initialReprovideTimer.Stop() - - initialReprovideCh = initialReprovideTimer.C - } - } - - var done chan<- error - for rp.ctx.Err() == nil { - select { - case <-initialReprovideCh: - case <-reprovideCh: - case done = <-rp.trigger: - case <-rp.ctx.Done(): - return - } - - err := rp.Reprovide() - - // only log if we've hit an actual error, otherwise just tell the client we're shutting down - if rp.ctx.Err() != nil { - err = ErrClosed - } else if err != nil { - logR.Errorf("failed to reprovide: %s", err) - } - - if done != nil { - if err != nil { - done <- err - } - close(done) - } - } -} - -// Reprovide registers all keys given by rp.keyProvider to libp2p content routing -func (rp *Reprovider) Reprovide() error { - keychan, err := rp.keyProvider(rp.ctx) - if err != nil { - return fmt.Errorf("failed to get key chan: %s", err) - } - for c := range keychan { - // hash security - if err := verifcid.ValidateCid(c); err != nil { - logR.Errorf("insecure hash in reprovider, %s (%s)", c, err) - continue - } - op := func() error { - err := rp.rsys.Provide(rp.ctx, c, true) - if err != nil { - logR.Debugf("Failed to provide key: %s", err) - } - return err - } - - err := backoff.Retry(op, backoff.WithContext(backoff.NewExponentialBackOff(), rp.ctx)) - if err != nil { - logR.Debugf("Providing failed after number of retries: %s", err) - return err - } - } - return nil -} - -// Trigger starts the reprovision process in rp.Run and waits for it to finish. -// -// Returns an error if a reprovide is already in progress. -func (rp *Reprovider) Trigger(ctx context.Context) error { - resultCh := make(chan error, 1) - select { - case rp.trigger <- resultCh: - default: - return fmt.Errorf("reprovider is already running") - } - - select { - case err := <-resultCh: - return err - case <-rp.ctx.Done(): - return ErrClosed - case <-ctx.Done(): - return ctx.Err() - } -} - -// Strategies - -// NewBlockstoreProvider returns key provider using bstore.AllKeysChan -func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { - return func(ctx context.Context) (<-chan cid.Cid, error) { - return bstore.AllKeysChan(ctx) - } -} - -// Pinner interface defines how the simple.Reprovider wants to interact -// with a Pinning service -type Pinner interface { - DirectKeys(ctx context.Context) ([]cid.Cid, error) - RecursiveKeys(ctx context.Context) ([]cid.Cid, error) -} - -// NewPinnedProvider returns provider supplying pinned keys -func NewPinnedProvider(onlyRoots bool, pinning Pinner, fetchConfig fetcher.Factory) KeyChanFunc { - return func(ctx context.Context) (<-chan cid.Cid, error) { - set, err := pinSet(ctx, pinning, fetchConfig, onlyRoots) - if err != nil { - return nil, err - } - - outCh := make(chan cid.Cid) - go func() { - defer close(outCh) - for c := range set.New { - select { - case <-ctx.Done(): - return - case outCh <- c: - } - } - - }() - - return outCh, nil - } -} - -func pinSet(ctx context.Context, pinning Pinner, fetchConfig fetcher.Factory, onlyRoots bool) (*cidutil.StreamingSet, error) { - set := cidutil.NewStreamingSet() - - go func() { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - defer close(set.New) - - dkeys, err := pinning.DirectKeys(ctx) - if err != nil { - logR.Errorf("reprovide direct pins: %s", err) - return - } - for _, key := range dkeys { - set.Visitor(ctx)(key) - } - - rkeys, err := pinning.RecursiveKeys(ctx) - if err != nil { - logR.Errorf("reprovide indirect pins: %s", err) - return - } - - 
session := fetchConfig.NewSession(ctx) - for _, key := range rkeys { - set.Visitor(ctx)(key) - if !onlyRoots { - err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: key}, func(res fetcher.FetchResult) error { - clink, ok := res.LastBlockLink.(cidlink.Link) - if ok { - set.Visitor(ctx)(clink.Cid) - } - return nil - }) - if err != nil { - logR.Errorf("reprovide indirect pins: %s", err) - return - } - } - } - }() - - return set, nil -} diff --git a/provider/simple/reprovide_test.go b/provider/simple/reprovide_test.go deleted file mode 100644 index 8b521ae56..000000000 --- a/provider/simple/reprovide_test.go +++ /dev/null @@ -1,289 +0,0 @@ -package simple_test - -import ( - "bytes" - "context" - "testing" - "time" - - bsrv "github.com/ipfs/boxo/blockservice" - blockstore "github.com/ipfs/boxo/blockstore" - offline "github.com/ipfs/boxo/exchange/offline" - bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice" - "github.com/ipfs/boxo/internal/test" - mock "github.com/ipfs/boxo/routing/mock" - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - ds "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" - "github.com/ipld/go-ipld-prime" - "github.com/ipld/go-ipld-prime/codec/dagcbor" - "github.com/ipld/go-ipld-prime/fluent/qp" - cidlink "github.com/ipld/go-ipld-prime/linking/cid" - basicnode "github.com/ipld/go-ipld-prime/node/basic" - testutil "github.com/libp2p/go-libp2p-testing/net" - "github.com/libp2p/go-libp2p/core/peer" - mh "github.com/multiformats/go-multihash" - - . "github.com/ipfs/boxo/provider/simple" -) - -func setupRouting(t *testing.T) (clA, clB mock.Client, idA, idB peer.ID) { - mrserv := mock.NewServer() - - iidA := testutil.RandIdentityOrFatal(t) - iidB := testutil.RandIdentityOrFatal(t) - - clA = mrserv.Client(iidA) - clB = mrserv.Client(iidB) - - return clA, clB, iidA.ID(), iidB.ID() -} - -func setupDag(t *testing.T) (nodes []cid.Cid, bstore blockstore.Blockstore) { - bstore = blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) - for _, data := range []string{"foo", "bar"} { - nb := basicnode.Prototype.Any.NewBuilder() - err := nb.AssignString(data) - if err != nil { - t.Fatal(err) - } - blk := toBlock(t, nb.Build()) - err = bstore.Put(context.Background(), blk) - if err != nil { - t.Fatal(err) - } - nodes = append(nodes, blk.Cid()) - nd, err := qp.BuildMap(basicnode.Prototype.Map, 1, func(ma ipld.MapAssembler) { - qp.MapEntry(ma, "child", qp.Link(cidlink.Link{Cid: blk.Cid()})) - }) - if err != nil { - t.Fatal(err) - } - blk = toBlock(t, nd) - err = bstore.Put(context.Background(), blk) - if err != nil { - t.Fatal(err) - } - nodes = append(nodes, blk.Cid()) - } - - return nodes, bstore -} - -func toBlock(t *testing.T, nd ipld.Node) blocks.Block { - buf := new(bytes.Buffer) - err := dagcbor.Encode(nd, buf) - if err != nil { - t.Fatal(err) - } - c, err := cid.Prefix{ - Version: 1, - Codec: cid.DagCBOR, - MhType: mh.SHA2_256, - MhLength: -1, - }.Sum(buf.Bytes()) - if err != nil { - t.Fatal(err) - } - blk, err := blocks.NewBlockWithCid(buf.Bytes(), c) - if err != nil { - t.Fatal(err) - } - return blk -} - -func TestReprovide(t *testing.T) { - test.Flaky(t) - testReprovide(t, func(r *Reprovider, ctx context.Context) error { - return r.Reprovide() - }) -} - -func TestTrigger(t *testing.T) { - test.Flaky(t) - testReprovide(t, func(r *Reprovider, ctx context.Context) error { - go r.Run() - time.Sleep(1 * time.Second) - defer r.Close() - err := r.Trigger(ctx) - return err - }) -} - -func testReprovide(t *testing.T, 
trigger func(r *Reprovider, ctx context.Context) error) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - clA, clB, idA, _ := setupRouting(t) - nodes, bstore := setupDag(t) - - keyProvider := NewBlockstoreProvider(bstore) - reprov := NewReprovider(ctx, time.Hour, clA, keyProvider) - reprov.Trigger(context.Background()) - err := trigger(reprov, ctx) - if err != nil { - t.Fatal(err) - } - - var providers []peer.AddrInfo - maxProvs := 100 - - for _, c := range nodes { - // We provide raw cids because of the multihash keying - // FIXME(@Jorropo): I think this change should be done in the DHT layer, probably an issue with our routing mock. - b := c.Bytes() - b[1] = 0x55 // rewrite the cid to raw - _, c, err := cid.CidFromBytes(b) - if err != nil { - t.Fatal(err) - } - provChan := clB.FindProvidersAsync(ctx, c, maxProvs) - for p := range provChan { - providers = append(providers, p) - } - - if len(providers) == 0 { - t.Fatal("Should have gotten a provider") - } - - if providers[0].ID != idA { - t.Fatal("Somehow got the wrong peer back as a provider.") - } - } -} - -func TestTriggerTwice(t *testing.T) { - test.Flaky(t) - // Ensure we can only trigger once at a time. - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - clA, _, _, _ := setupRouting(t) - - keyCh := make(chan cid.Cid) - startCh := make(chan struct{}) - keyFunc := func(ctx context.Context) (<-chan cid.Cid, error) { - <-startCh - return keyCh, nil - } - - reprov := NewReprovider(ctx, time.Hour, clA, keyFunc) - go reprov.Run() - defer reprov.Close() - - // Wait for the reprovider to start, otherwise, the reprovider will - // think a concurrent reprovide is running. - // - // We _could_ fix this race... but that would be complexity for nothing. - // 1. We start a reprovide 1 minute after startup anyways. - // 2. The window is really narrow. - time.Sleep(1 * time.Second) - - errCh := make(chan error, 2) - - // Trigger in the background - go func() { - errCh <- reprov.Trigger(ctx) - }() - - // Wait for the trigger to really start. - startCh <- struct{}{} - - start := time.Now() - // Try to trigger again, this should fail immediately. - if err := reprov.Trigger(ctx); err == nil { - t.Fatal("expected an error") - } - if time.Since(start) > 10*time.Millisecond { - t.Fatal("expected reprovide to fail instantly") - } - - // Let the trigger progress. - close(keyCh) - - // Check the result. - err := <-errCh - if err != nil { - t.Fatal(err) - } - - // Try to trigger again, this should work. 
- go func() { - errCh <- reprov.Trigger(ctx) - }() - startCh <- struct{}{} - err = <-errCh - if err != nil { - t.Fatal(err) - } -} - -type mockPinner struct { - recursive []cid.Cid - direct []cid.Cid -} - -func (mp *mockPinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) { - return mp.direct, nil -} - -func (mp *mockPinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) { - return mp.recursive, nil -} - -func TestReprovidePinned(t *testing.T) { - test.Flaky(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - nodes, bstore := setupDag(t) - - fetchConfig := bsfetcher.NewFetcherConfig(bsrv.New(bstore, offline.Exchange(bstore))) - - for i := 0; i < 2; i++ { - clA, clB, idA, _ := setupRouting(t) - - onlyRoots := i == 0 - t.Logf("only roots: %v", onlyRoots) - - var provide, dont []cid.Cid - if onlyRoots { - provide = []cid.Cid{nodes[1], nodes[3]} - dont = []cid.Cid{nodes[0], nodes[2]} - } else { - provide = []cid.Cid{nodes[0], nodes[1], nodes[3]} - dont = []cid.Cid{nodes[2]} - } - - keyProvider := NewPinnedProvider(onlyRoots, &mockPinner{ - recursive: []cid.Cid{nodes[1]}, - direct: []cid.Cid{nodes[3]}, - }, fetchConfig) - - reprov := NewReprovider(ctx, time.Hour, clA, keyProvider) - err := reprov.Reprovide() - if err != nil { - t.Fatal(err) - } - - for i, c := range provide { - prov, ok := <-clB.FindProvidersAsync(ctx, c, 1) - if !ok { - t.Errorf("Should have gotten a provider for %d", i) - continue - } - - if prov.ID != idA { - t.Errorf("Somehow got the wrong peer back as a provider.") - continue - } - } - for i, c := range dont { - prov, ok := <-clB.FindProvidersAsync(ctx, c, 1) - if ok { - t.Fatalf("found provider %s for %d, expected none", prov.ID, i) - } - } - } -} diff --git a/provider/system.go b/provider/system.go deleted file mode 100644 index 9fc3e8879..000000000 --- a/provider/system.go +++ /dev/null @@ -1,60 +0,0 @@ -package provider - -import ( - "context" - - "github.com/ipfs/go-cid" -) - -// System defines the interface for interacting with the value -// provider system -type System interface { - Run() - Close() error - Provide(cid.Cid) error - Reprovide(context.Context) error -} - -type system struct { - provider Provider - reprovider Reprovider -} - -// NewSystem constructs a new provider system from a provider and reprovider -func NewSystem(provider Provider, reprovider Reprovider) System { - return &system{provider, reprovider} -} - -// Run the provider system by running the provider and reprovider -func (s *system) Run() { - go s.provider.Run() - go s.reprovider.Run() -} - -// Close the provider and reprovider -func (s *system) Close() error { - var errs []error - - if err := s.provider.Close(); err != nil { - errs = append(errs, err) - } - - if err := s.reprovider.Close(); err != nil { - errs = append(errs, err) - } - - if len(errs) > 0 { - return errs[0] - } - return nil -} - -// Provide a value -func (s *system) Provide(cid cid.Cid) error { - return s.provider.Provide(cid) -} - -// Reprovide all the previously provided values -func (s *system) Reprovide(ctx context.Context) error { - return s.reprovider.Trigger(ctx) -}
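The patch removes provider/README.md along with its usage example for the old simple provider. For orientation, the same flow against the new single constructor looks roughly like the sketch below. This is not part of the patch: `example` is a placeholder name and `rsys` stands for any content router satisfying the new `Provide` interface.

```golang
package main

import (
	"context"
	"time"

	"github.com/ipfs/boxo/blockstore"
	"github.com/ipfs/boxo/provider"
	"github.com/ipfs/go-cid"
	"github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
)

func example(rsys provider.Provide, c cid.Cid) error {
	ds := dssync.MutexWrap(datastore.NewMapDatastore())
	bstore := blockstore.NewBlockstore(ds)

	// New is offline by default; Online wires in the content router and starts
	// the background worker, while KeyProvider feeds the periodic reprovide sweep.
	sys, err := provider.New(ds,
		provider.Online(rsys),
		provider.KeyProvider(provider.NewBlockstoreProvider(bstore)),
		provider.ReproviderInterval(12*time.Hour),
	)
	if err != nil {
		return err
	}
	defer sys.Close()

	// Enqueue a single CID; the worker batches and announces it.
	if err := sys.Provide(c); err != nil {
		return err
	}

	// Force a reprovide of everything the KeyProvider yields.
	return sys.Reprovide(context.Background())
}
```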
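The offline mode described in the commit message works the way the new TestOfflineRecordsThenOnlineRepublish exercises it: provide into a system built without `Online`, then reopen the same datastore with a router so the queued records get announced. A hedged sketch, with `offlineThenOnline` as a placeholder name:

```golang
package main

import (
	"github.com/ipfs/boxo/provider"
	"github.com/ipfs/go-cid"
	"github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	mh "github.com/multiformats/go-multihash"
)

func offlineThenOnline(rsys provider.Provide) error {
	ds := dssync.MutexWrap(datastore.NewMapDatastore())

	h, err := mh.Sum([]byte("example"), mh.SHA2_256, -1)
	if err != nil {
		return err
	}
	c := cid.NewCidV1(cid.Raw, h)

	// Offline system: Provide only persists the CID into the queue under the
	// /provider prefix of ds, nothing is announced.
	offline, err := provider.New(ds)
	if err != nil {
		return err
	}
	if err := offline.Provide(c); err != nil {
		return err
	}
	if err := offline.Close(); err != nil {
		return err
	}

	// Reopening the same datastore with a router lets the background worker pick
	// the queued CID up and announce it. This happens asynchronously, so real
	// code keeps the system alive rather than closing it right away.
	online, err := provider.New(ds, provider.Online(rsys))
	if err != nil {
		return err
	}
	defer online.Close()
	return nil
}
```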
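`ThroughputReport` and the new `Stat` accessor can be combined to watch publish throughput. The sketch below is illustrative only; the 1024-key threshold and the wrapper name are arbitrary choices, not values taken from the patch.

```golang
package main

import (
	"fmt"
	"time"

	"github.com/ipfs/boxo/provider"
	"github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
)

func withThroughput(rsys provider.Provide) (provider.System, error) {
	ds := dssync.MutexWrap(datastore.NewMapDatastore())

	sys, err := provider.New(ds,
		provider.Online(rsys),
		// Fire after every batch of at least 1024 provides; returning true keeps
		// the callback registered for the next batch.
		provider.ThroughputReport(func(reprovide, complete bool, total uint, d time.Duration) bool {
			fmt.Printf("reprovide=%t complete=%t keys=%d avg=%v\n",
				reprovide, complete, total, d/time.Duration(total))
			return true
		}, 1024),
	)
	if err != nil {
		return nil, err
	}

	stats, err := sys.Stat()
	if err != nil {
		return nil, err
	}
	fmt.Printf("provides so far: %d, average %v per key\n",
		stats.TotalProvides, stats.AvgProvideDuration)
	return sys, nil
}
```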
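`Online` accepts anything with the single-key `Provide` method; the reprovider only upgrades to `ProvideMany` and waits on `Ready` when the router happens to implement them, otherwise it falls back to per-CID calls in `doProvideMany`. A minimal stub showing the optional interfaces, with `noopRouter` as a hypothetical router:

```golang
package main

import (
	"context"

	"github.com/ipfs/boxo/provider"
	"github.com/ipfs/go-cid"
	"github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	"github.com/multiformats/go-multihash"
)

// noopRouter satisfies provider.Provide; because it also implements
// provider.ProvideMany and provider.Ready, the reprovider hands whole batches
// over in one call and waits until Ready reports true before publishing.
type noopRouter struct{}

func (noopRouter) Provide(ctx context.Context, c cid.Cid, announce bool) error {
	return nil // single-key path, used when ProvideMany is absent
}

func (noopRouter) ProvideMany(ctx context.Context, keys []multihash.Multihash) error {
	return nil // batched path preferred by the reprovider
}

func (noopRouter) Ready() bool { return true }

func newSystem() (provider.System, error) {
	ds := dssync.MutexWrap(datastore.NewMapDatastore())
	return provider.New(ds, provider.Online(noopRouter{}))
}
```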
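Besides `NewBlockstoreProvider` and `NewPinnedProvider`, any `KeyChanFunc` can feed the reprovide stream. The variant below mirrors the inline key provider used in reprovider_test.go; `staticKeyProvider` is a placeholder name.

```golang
package main

import (
	"context"

	"github.com/ipfs/boxo/provider"
	"github.com/ipfs/go-cid"
)

// staticKeyProvider streams a fixed set of CIDs to the reprovider, in the same
// way NewBlockstoreProvider streams AllKeysChan and NewPinnedProvider streams
// pinned keys.
func staticKeyProvider(cids []cid.Cid) provider.KeyChanFunc {
	return func(ctx context.Context) (<-chan cid.Cid, error) {
		ch := make(chan cid.Cid)
		go func() {
			defer close(ch)
			for _, c := range cids {
				select {
				case ch <- c:
				case <-ctx.Done():
					return
				}
			}
		}()
		return ch, nil
	}
}
```

It plugs in through the existing option, for example `provider.New(ds, provider.KeyProvider(staticKeyProvider(cids)))`.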