diff --git a/core/builder.go b/core/builder.go index 28a5a283b92..065c9cbd87e 100644 --- a/core/builder.go +++ b/core/builder.go @@ -231,5 +231,11 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { } n.Resolver = path.NewBasicResolver(n.DAG) + if cfg.Online { + if err := n.startLateOnlineServices(ctx); err != nil { + return err + } + } + return n.loadFilesRoot() } diff --git a/core/commands/bitswap.go b/core/commands/bitswap.go index 867cb49c232..a769e6b0a7a 100644 --- a/core/commands/bitswap.go +++ b/core/commands/bitswap.go @@ -21,10 +21,11 @@ var BitswapCmd = &cmds.Command{ ShortDescription: ``, }, Subcommands: map[string]*cmds.Command{ - "wantlist": showWantlistCmd, - "stat": bitswapStatCmd, - "unwant": unwantCmd, - "ledger": ledgerCmd, + "wantlist": showWantlistCmd, + "stat": bitswapStatCmd, + "unwant": unwantCmd, + "ledger": ledgerCmd, + "reprovide": reprovideCmd, }, } @@ -242,3 +243,30 @@ prints the ledger associated with a given peer. }, }, } + +var reprovideCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Trigger reprovider.", + ShortDescription: ` +Trigger reprovider to announce our data to network. +`, + }, + Run: func(req cmds.Request, res cmds.Response) { + nd, err := req.InvocContext().GetNode() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + if !nd.OnlineMode() { + res.SetError(errNotOnline, cmds.ErrClient) + return + } + + err = nd.Reprovider.Trigger(req.Context()) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + }, +} diff --git a/core/core.go b/core/core.go index 8afd615db35..28ec138310a 100644 --- a/core/core.go +++ b/core/core.go @@ -237,22 +237,6 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin return err } - n.Reprovider = rp.NewReprovider(n.Routing, n.Blockstore) - - if cfg.Reprovider.Interval != "0" { - interval := kReprovideFrequency - if cfg.Reprovider.Interval != "" { - dur, err := time.ParseDuration(cfg.Reprovider.Interval) - if err != nil { - return err - } - - interval = dur - } - - go n.Reprovider.ProvideEvery(ctx, interval) - } - if pubsub { n.Floodsub = floodsub.NewFloodSub(ctx, peerhost) } @@ -273,6 +257,45 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin return n.Bootstrap(DefaultBootstrapConfig) } +func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error { + cfg, err := n.Repo.Config() + if err != nil { + return err + } + + var keyProvider rp.KeyChanFunc + + switch cfg.Reprovider.Strategy { + case "all": + fallthrough + case "": + keyProvider = rp.NewBlockstoreProvider(n.Blockstore) + case "roots": + keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, true) + case "pinned": + keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, false) + default: + return fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy) + } + n.Reprovider = rp.NewReprovider(ctx, n.Routing, keyProvider) + + if cfg.Reprovider.Interval != "0" { + interval := kReprovideFrequency + if cfg.Reprovider.Interval != "" { + dur, err := time.ParseDuration(cfg.Reprovider.Interval) + if err != nil { + return err + } + + interval = dur + } + + go n.Reprovider.ProvideEvery(interval) + } + + return nil +} + func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) { var annAddrs []ma.Multiaddr for _, addr := range cfg.Announce { diff --git a/docs/config.md b/docs/config.md index c922fc3983d..815bcf416b3 100644 --- a/docs/config.md +++ b/docs/config.md @@ -15,7 +15,7 @@ a running daemon do not read the config file at runtime. - [`Identity`](#identity) - [`Ipns`](#ipns) - [`Mounts`](#mounts) -- [`ReproviderInterval`](#reproviderinterval) +- [`Reprovider`](#reprovider) - [`SupernodeRouting`](#supernoderouting) - [`Swarm`](#swarm) - [`Tour`](#tour) @@ -193,7 +193,9 @@ Mountpoint for `/ipns/`. - `FuseAllowOther` Sets the FUSE allow other option on the mountpoint. -## `ReproviderInterval` +## `Reprovider` + +- `Interval` Sets the time between rounds of reproviding local content to the routing system. If unset, it defaults to 12 hours. If set to the value `"0"` it will disable content reproviding. @@ -203,6 +205,12 @@ not being able to discover that you have the objects that you have. If you want to have this disabled and keep the network aware of what you have, you must manually announce your content periodically. +- `Strategy` +Tells reprovider what should be announced. Valid strategies are: + - "all" (default) - announce all stored data + - "pinned" - only announce pinned data + - "roots" - only announce directly pinned keys and root keys of recursive pins + ## `SupernodeRouting` Deprecated. diff --git a/exchange/reprovide/providers.go b/exchange/reprovide/providers.go new file mode 100644 index 00000000000..5335e17651c --- /dev/null +++ b/exchange/reprovide/providers.go @@ -0,0 +1,93 @@ +package reprovide + +import ( + "context" + + blocks "github.com/ipfs/go-ipfs/blocks/blockstore" + merkledag "github.com/ipfs/go-ipfs/merkledag" + pin "github.com/ipfs/go-ipfs/pin" + + cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid" +) + +// NewBlockstoreProvider returns key provider using bstore.AllKeysChan +func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { + return func(ctx context.Context) (<-chan *cid.Cid, error) { + return bstore.AllKeysChan(ctx) + } +} + +// NewPinnedProvider returns provider supplying pinned keys +func NewPinnedProvider(pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) KeyChanFunc { + return func(ctx context.Context) (<-chan *cid.Cid, error) { + set, err := pinSet(ctx, pinning, dag, onlyRoots) + if err != nil { + return nil, err + } + + outCh := make(chan *cid.Cid) + go func() { + defer close(outCh) + for c := range set.new { + select { + case <-ctx.Done(): + return + case outCh <- c: + } + } + + }() + + return outCh, nil + } +} + +func pinSet(ctx context.Context, pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) (*streamingSet, error) { + set := newStreamingSet() + + go func() { + defer close(set.new) + + for _, key := range pinning.DirectKeys() { + set.add(key) + } + + for _, key := range pinning.RecursiveKeys() { + set.add(key) + + if !onlyRoots { + err := merkledag.EnumerateChildren(ctx, dag.GetLinks, key, set.add) + if err != nil { + log.Errorf("reprovide indirect pins: %s", err) + return + } + } + } + }() + + return set, nil +} + +type streamingSet struct { + set *cid.Set + new chan *cid.Cid +} + +// NewSet initializes and returns a new Set. +func newStreamingSet() *streamingSet { + return &streamingSet{ + set: cid.NewSet(), + new: make(chan *cid.Cid), + } +} + +// add adds a Cid to the set only if it is +// not in it already. +func (s *streamingSet) add(c *cid.Cid) bool { + if s.set.Visit(c) { + s.new <- c + return true + } + + return false +} diff --git a/exchange/reprovide/reprovide.go b/exchange/reprovide/reprovide.go index afa66016f62..a56220c6e34 100644 --- a/exchange/reprovide/reprovide.go +++ b/exchange/reprovide/reprovide.go @@ -5,56 +5,82 @@ import ( "fmt" "time" - blocks "github.com/ipfs/go-ipfs/blocks/blockstore" backoff "gx/ipfs/QmPJUtEJsm5YLUWhF6imvyCH8KZXRJa9Wup7FDMwTy5Ufz/backoff" routing "gx/ipfs/QmPjTrrSfE6TzLv6ya6VWhGcCgPrUAdcgrDcQyRDX2VyW1/go-libp2p-routing" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" + cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid" ) var log = logging.Logger("reprovider") +//KeyChanFunc is function streaming CIDs to pass to content routing +type KeyChanFunc func(context.Context) (<-chan *cid.Cid, error) +type doneFunc func(error) + type Reprovider struct { + ctx context.Context + trigger chan doneFunc + // The routing system to provide values through rsys routing.ContentRouting - // The backing store for blocks to be provided - bstore blocks.Blockstore + keyProvider KeyChanFunc } -func NewReprovider(rsys routing.ContentRouting, bstore blocks.Blockstore) *Reprovider { +// NewReprovider creates new Reprovider instance. +func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { return &Reprovider{ - rsys: rsys, - bstore: bstore, + ctx: ctx, + trigger: make(chan doneFunc), + + rsys: rsys, + keyProvider: keyProvider, } } -func (rp *Reprovider) ProvideEvery(ctx context.Context, tick time.Duration) { +// ProvideEvery re-provides keys with 'tick' interval +func (rp *Reprovider) ProvideEvery(tick time.Duration) { // dont reprovide immediately. // may have just started the daemon and shutting it down immediately. // probability( up another minute | uptime ) increases with uptime. after := time.After(time.Minute) + var done doneFunc for { select { - case <-ctx.Done(): + case <-rp.ctx.Done(): return + case done = <-rp.trigger: case <-after: - err := rp.Reprovide(ctx) - if err != nil { - log.Debug(err) - } - after = time.After(tick) } + + //'mute' the trigger channel so when `ipfs bitswap reprovide` is called + //a 'reprovider is already running' error is returned + unmute := rp.muteTrigger() + + err := rp.Reprovide() + if err != nil { + log.Debug(err) + } + + if done != nil { + done(err) + } + + unmute() + + after = time.After(tick) } } -func (rp *Reprovider) Reprovide(ctx context.Context) error { - keychan, err := rp.bstore.AllKeysChan(ctx) +// Reprovide registers all keys given by rp.keyProvider to libp2p content routing +func (rp *Reprovider) Reprovide() error { + keychan, err := rp.keyProvider(rp.ctx) if err != nil { - return fmt.Errorf("Failed to get key chan from blockstore: %s", err) + return fmt.Errorf("Failed to get key chan: %s", err) } for c := range keychan { op := func() error { - err := rp.rsys.Provide(ctx, c, true) + err := rp.rsys.Provide(rp.ctx, c, true) if err != nil { log.Debugf("Failed to provide key: %s", err) } @@ -71,3 +97,41 @@ func (rp *Reprovider) Reprovide(ctx context.Context) error { } return nil } + +// Trigger starts reprovision process in rp.ProvideEvery and waits for it +func (rp *Reprovider) Trigger(ctx context.Context) error { + progressCtx, done := context.WithCancel(ctx) + + var err error + df := func(e error) { + err = e + done() + } + + select { + case <-rp.ctx.Done(): + return context.Canceled + case <-ctx.Done(): + return context.Canceled + case rp.trigger <- df: + <-progressCtx.Done() + return err + } +} + +func (rp *Reprovider) muteTrigger() context.CancelFunc { + ctx, cf := context.WithCancel(rp.ctx) + go func() { + defer cf() + for { + select { + case <-ctx.Done(): + return + case done := <-rp.trigger: + done(fmt.Errorf("reprovider is already running")) + } + } + }() + + return cf +} diff --git a/exchange/reprovide/reprovide_test.go b/exchange/reprovide/reprovide_test.go index 2d755e52606..af5e0d880aa 100644 --- a/exchange/reprovide/reprovide_test.go +++ b/exchange/reprovide/reprovide_test.go @@ -32,8 +32,9 @@ func TestReprovide(t *testing.T) { blk := blocks.NewBlock([]byte("this is a test")) bstore.Put(blk) - reprov := NewReprovider(clA, bstore) - err := reprov.Reprovide(ctx) + keyProvider := NewBlockstoreProvider(bstore) + reprov := NewReprovider(ctx, clA, keyProvider) + err := reprov.Reprovide() if err != nil { t.Fatal(err) } diff --git a/repo/config/init.go b/repo/config/init.go index aa129d97e12..f31edd42b33 100644 --- a/repo/config/init.go +++ b/repo/config/init.go @@ -72,6 +72,7 @@ func Init(out io.Writer, nBitsForKeypair int) (*Config, error) { }, Reprovider: Reprovider{ Interval: "12h", + Strategy: "all", }, } diff --git a/repo/config/reprovider.go b/repo/config/reprovider.go index 53cf293ab61..fa029c2fc21 100644 --- a/repo/config/reprovider.go +++ b/repo/config/reprovider.go @@ -2,4 +2,5 @@ package config type Reprovider struct { Interval string // Time period to reprovide locally stored objects to the network + Strategy string // Which keys to announce } diff --git a/test/sharness/t0175-reprovider.sh b/test/sharness/t0175-reprovider.sh new file mode 100755 index 00000000000..986c6202ee2 --- /dev/null +++ b/test/sharness/t0175-reprovider.sh @@ -0,0 +1,127 @@ +#!/bin/sh + +test_description="Test reprovider" + +. lib/test-lib.sh + +init_strategy() { + NUM_NODES=6 + test_expect_success 'init iptb' ' + iptb init -f -n $NUM_NODES --bootstrap=none --port=0 + ' + + test_expect_success 'peer ids' ' + PEERID_0=$(iptb get id 0) && + PEERID_1=$(iptb get id 1) + ' + + test_expect_success 'use pinning startegy for reprovider' ' + ipfsi 0 config Reprovider.Strategy '$1' + ' + + startup_cluster 6 --debug +} + +findprovs_empty() { + test_expect_success 'findprovs '$1' succeeds' ' + ipfsi 1 dht findprovs -n 1 '$1' > findprovsOut + ' + + test_expect_success "findprovs $1 output is empty" ' + test_must_be_empty findprovsOut + ' +} + +findprovs_expect() { + test_expect_success 'findprovs '$1' succeeds' ' + ipfsi 1 dht findprovs -n 1 '$1' > findprovsOut && + echo '$2' > expected + ' + + test_expect_success "findprovs $1 output looks good" ' + test_cmp findprovsOut expected + ' +} + +reprovide() { + test_expect_success 'reprovide' ' + # TODO: this hangs, though only after reprovision was done + ipfsi 0 bitswap reprovide + ' +} + +test_expect_success 'stop peer 1' ' + iptb stop 1 +' + +# Test 'all' strategy +init_strategy 'all' + +test_expect_success 'add test object' ' + HASH_0=$(echo "foo" | ipfsi 0 add -q --local) +' + +findprovs_empty '$HASH_0' +reprovide +findprovs_expect '$HASH_0' '$PEERID_0' + +# Test 'pinned' strategy +init_strategy 'pinned' + +test_expect_success 'prepare test files' ' + echo foo > f1 && + echo bar > f2 +' + +test_expect_success 'add test objects' ' + HASH_FOO=$(ipfsi 0 add -q --local --pin=false f1) && + HASH_BAR=$(ipfsi 0 add -q --local --pin=false f2) && + HASH_BAR_DIR=$(ipfsi 0 add -q --local -w f2) +' + +findprovs_empty '$HASH_FOO' +findprovs_empty '$HASH_BAR' +findprovs_empty '$HASH_BAR_DIR' + +reprovide + +findprovs_empty '$HASH_FOO' +findprovs_expect '$HASH_BAR' '$PEERID_0' +findprovs_expect '$HASH_BAR_DIR' '$PEERID_0' + +test_expect_success 'stop peer 1' ' + iptb stop 1 +' + +# Test 'roots' strategy +init_strategy 'roots' + +test_expect_success 'prepare test files' ' + echo foo > f1 && + echo bar > f2 && + echo baz > f3 +' + +test_expect_success 'add test objects' ' + HASH_FOO=$(ipfsi 0 add -q --local --pin=false f1) && + HASH_BAR=$(ipfsi 0 add -q --local --pin=false f2) && + HASH_BAZ=$(ipfsi 0 add -q --local f3) && + HASH_BAR_DIR=$(ipfsi 0 add -q --local -w f2 | tail -1) +' + +findprovs_empty '$HASH_FOO' +findprovs_empty '$HASH_BAR' +findprovs_empty '$HASH_BAR_DIR' + +reprovide + +findprovs_empty '$HASH_FOO' +findprovs_empty '$HASH_BAR' +findprovs_expect '$HASH_BAZ' '$PEERID_0' +findprovs_expect '$HASH_BAR_DIR' '$PEERID_0' + +test_expect_success 'stop peer 1' ' + iptb stop 1 +' + +test_done