Skip to content

Commit

Permalink
provider: refactor to only maintain one batched implementation and ad…
Browse files Browse the repository at this point in the history
…d throughput callback

This code does a few things:
1. Remove the simple provider to avoid duplicating features.
2. Add the support for single providing on the batched provider.
3. Fix a bugs in the batched provider.
4. Add support for a throughputCallback in the batched provider.
6. Add support for an offline mode of the batched provider (stuff is exclusively pushed onto the queue).
5. Move the batched provider to be the only provider and make the queue implementation private.
  • Loading branch information
Jorropo committed May 31, 2023
1 parent c23df38 commit f03ea05
Show file tree
Hide file tree
Showing 16 changed files with 556 additions and 1,190 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.19
require (
github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a
github.com/benbjohnson/clock v1.3.0
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cespare/xxhash/v2 v2.2.0
github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3
github.com/cskr/pubsub v1.0.2
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+MN3u4=
github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
Expand Down
30 changes: 0 additions & 30 deletions provider/README.md

This file was deleted.

119 changes: 0 additions & 119 deletions provider/batched/system_test.go

This file was deleted.

22 changes: 14 additions & 8 deletions provider/queue/queue.go → provider/internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package queue
import (
"context"
"fmt"

cid "github.com/ipfs/go-cid"
datastore "github.com/ipfs/go-datastore"
namespace "github.com/ipfs/go-datastore/namespace"
Expand All @@ -20,7 +21,6 @@ var log = logging.Logger("provider.queue")
type Queue struct {
// used to differentiate queues in datastore
// e.g. provider vs reprovider
name string
ctx context.Context
ds datastore.Datastore // Must be threadsafe
dequeue chan cid.Cid
Expand All @@ -32,11 +32,10 @@ type Queue struct {
}

// NewQueue creates a queue for cids
func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) {
namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/"))
cancelCtx, cancel := context.WithCancel(ctx)
func NewQueue(ds datastore.Datastore) *Queue {
namespaced := namespace.Wrap(ds, datastore.NewKey("/queue"))
cancelCtx, cancel := context.WithCancel(context.Background())
q := &Queue{
name: name,
ctx: cancelCtx,
ds: namespaced,
dequeue: make(chan cid.Cid),
Expand All @@ -45,13 +44,16 @@ func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue,
closed: make(chan struct{}, 1),
}
q.work()
return q, nil
return q
}

// Close stops the queue
func (q *Queue) Close() error {
q.close()
<-q.closed
// We don't close dequeue because the provider which consume this get caught in
// an infinite loop dequeing cid.Undef if we do that.
// The provider has it's own select on top of dequeue and will handle this by itself.
return nil
}

Expand Down Expand Up @@ -79,8 +81,6 @@ func (q *Queue) work() {
defer func() {
// also cancels any in-progess enqueue tasks.
q.close()
// unblocks anyone waiting
close(q.dequeue)
// unblocks the close call
close(q.closed)
}()
Expand Down Expand Up @@ -121,6 +121,12 @@ func (q *Queue) work() {
q.counter++
nextKey := datastore.NewKey(keyPath)

if c == cid.Undef {
// fast path, skip rereading the datastore if we don't have anything in hand yet
c = toQueue
k = nextKey
}

if err := q.ds.Put(q.ctx, nextKey, toQueue.Bytes()); err != nil {
log.Errorf("Failed to enqueue cid: %s", err)
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ func TestBasicOperation(t *testing.T) {
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue := NewQueue(ds)
defer queue.Close()

cids := makeCids(10)

Expand All @@ -63,10 +61,8 @@ func TestMangledData(t *testing.T) {
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue := NewQueue(ds)
defer queue.Close()

cids := makeCids(10)
for _, c := range cids {
Expand All @@ -75,7 +71,7 @@ func TestMangledData(t *testing.T) {

// put bad data in the queue
queueKey := datastore.NewKey("/test/0")
err = queue.ds.Put(ctx, queueKey, []byte("borked"))
err := queue.ds.Put(ctx, queueKey, []byte("borked"))
if err != nil {
t.Fatal(err)
}
Expand All @@ -91,10 +87,8 @@ func TestInitialization(t *testing.T) {
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue := NewQueue(ds)
defer queue.Close()

cids := makeCids(10)
for _, c := range cids {
Expand All @@ -104,10 +98,8 @@ func TestInitialization(t *testing.T) {
assertOrdered(cids[:5], queue, t)

// make a new queue, same data
queue, err = NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue = NewQueue(ds)
defer queue.Close()

assertOrdered(cids[5:], queue, t)
}
Expand All @@ -118,21 +110,18 @@ func TestInitializationWithManyCids(t *testing.T) {
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue := NewQueue(ds)

cids := makeCids(25)
for _, c := range cids {
queue.Enqueue(c)
}

queue.Close()

// make a new queue, same data
queue, err = NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue = NewQueue(ds)
defer queue.Close()

assertOrdered(cids, queue, t)
}
32 changes: 32 additions & 0 deletions provider/noop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package provider

import (
"context"

"github.com/ipfs/go-cid"
)

type noopProvider struct{}

var _ System = (*noopProvider)(nil)

// NewNoopProvider creates a ProviderSystem that does nothing.
func NewNoopProvider() System {
return &noopProvider{}
}

func (op *noopProvider) Close() error {
return nil
}

func (op *noopProvider) Provide(cid.Cid) error {
return nil
}

func (op *noopProvider) Reprovide(context.Context) error {
return nil
}

func (op *noopProvider) Stat() (ReproviderStats, error) {
return ReproviderStats{}, nil
}
29 changes: 0 additions & 29 deletions provider/offline.go

This file was deleted.

Loading

0 comments on commit f03ea05

Please sign in to comment.