Skip to content

Commit

Permalink
reprovider: apply review suggestions
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
  • Loading branch information
magik6k committed Aug 15, 2017
1 parent 048debe commit 4a5b93a
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 21 deletions.
4 changes: 2 additions & 2 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error {
return err
}

var keyProvider func(context.Context) (<-chan *cid.Cid, error)
var keyProvider rp.KeyChanFunc

switch cfg.Reprovider.Strategy {
case "all":
Expand All @@ -275,7 +275,7 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error {
case "pinned":
keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, false)
default:
return fmt.Errorf("unknown reprovider strtaegy '%s'", cfg.Reprovider.Strategy)
return fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy)
}
n.Reprovider = rp.NewReprovider(ctx, n.Routing, keyProvider)

Expand Down
2 changes: 1 addition & 1 deletion docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ a running daemon do not read the config file at runtime.
- [`Identity`](#identity)
- [`Ipns`](#ipns)
- [`Mounts`](#mounts)
- [`Reproviderl`](#reprovider)
- [`Reprovider`](#reprovider)
- [`SupernodeRouting`](#supernoderouting)
- [`Swarm`](#swarm)
- [`Tour`](#tour)
Expand Down
24 changes: 9 additions & 15 deletions exchange/reprovide/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import (
)

// NewBlockstoreProvider returns key provider using bstore.AllKeysChan
func NewBlockstoreProvider(bstore blocks.Blockstore) keyChanFunc {
func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc {
return func(ctx context.Context) (<-chan *cid.Cid, error) {
return bstore.AllKeysChan(ctx)
}
}

// NewPinnedProvider returns provider supplying pinned keys
func NewPinnedProvider(pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) keyChanFunc {
func NewPinnedProvider(pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) KeyChanFunc {
return func(ctx context.Context) (<-chan *cid.Cid, error) {
set, err := pinSet(ctx, pinning, dag, onlyRoots)
if err != nil {
Expand Down Expand Up @@ -46,6 +46,8 @@ func pinSet(ctx context.Context, pinning pin.Pinner, dag merkledag.DAGService, o
set := newStreamingSet()

go func() {
defer close(set.new)

for _, key := range pinning.DirectKeys() {
set.add(key)
}
Expand All @@ -56,41 +58,33 @@ func pinSet(ctx context.Context, pinning pin.Pinner, dag merkledag.DAGService, o
if !onlyRoots {
err := merkledag.EnumerateChildren(ctx, dag.GetLinks, key, set.add)
if err != nil {
return //TODO: propagate to chan / log?
log.Errorf("reprovide indirect pins: %s", err)
return
}
}
}

close(set.new)
}()

return set, nil
}

type streamingSet struct {
set map[string]struct{}
set *cid.Set
new chan *cid.Cid
}

// NewSet initializes and returns a new Set.
func newStreamingSet() *streamingSet {
return &streamingSet{
set: make(map[string]struct{}),
set: cid.NewSet(),
new: make(chan *cid.Cid),
}
}

// has returns if the Set contains a given Cid.
func (s *streamingSet) has(c *cid.Cid) bool {
_, ok := s.set[string(c.Bytes())]
return ok
}

// add adds a Cid to the set only if it is
// not in it already.
func (s *streamingSet) add(c *cid.Cid) bool {
if !s.has(c) {
s.set[string(c.Bytes())] = struct{}{}
if s.set.Visit(c) {
s.new <- c
return true
}
Expand Down
9 changes: 6 additions & 3 deletions exchange/reprovide/reprovide.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (

var log = logging.Logger("reprovider")

type keyChanFunc func(context.Context) (<-chan *cid.Cid, error)
//KeyChanFunc is function streaming CIDs to pass to content routing
type KeyChanFunc func(context.Context) (<-chan *cid.Cid, error)
type doneFunc func(error)

type Reprovider struct {
Expand All @@ -23,11 +24,11 @@ type Reprovider struct {
// The routing system to provide values through
rsys routing.ContentRouting

keyProvider keyChanFunc
keyProvider KeyChanFunc
}

// NewReprovider creates new Reprovider instance.
func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider keyChanFunc) *Reprovider {
func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider {
return &Reprovider{
ctx: ctx,
trigger: make(chan doneFunc),
Expand All @@ -52,6 +53,8 @@ func (rp *Reprovider) ProvideEvery(tick time.Duration) {
case <-after:
}

//'mute' the trigger channel so when `ipfs bitswap reprovide` is called
//a 'reprovider is already running' error is returned
unmute := rp.muteTrigger()

err := rp.Reprovide()
Expand Down

0 comments on commit 4a5b93a

Please sign in to comment.