Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Change bitswap provide toggle to not be static #124

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 35 additions & 19 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ const (
)

var (
// ProvideEnabled is a variable that tells Bitswap whether or not
// to handle providing blocks (see experimental provider system)
ProvideEnabled = true

// HasBlockBufferSize is the buffer size of the channel for new blocks
// that need to be provided. They should get pulled over by the
// provideCollector even before they are actually provided.
Expand All @@ -58,11 +54,22 @@ var (
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)

// Option defines the functional option type that can be used to configure
// bitswap instances
type Option func(*Bitswap)

// ProvideEnabled is an option for enabling/disabling provide announcements
func ProvideEnabled(enabled bool) Option {
return func(bs *Bitswap) {
bs.provideEnabled = enabled
}
}

// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate. Runs until context is cancelled or bitswap.Close is called.
func New(parent context.Context, network bsnet.BitSwapNetwork,
bstore blockstore.Blockstore) exchange.Interface {
bstore blockstore.Blockstore, options ...Option) exchange.Interface {

// important to use provided parent context (since it may include important
// loggable data). It's probably not a good idea to allow bitswap to be
Expand Down Expand Up @@ -103,19 +110,25 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
}

bs := &Bitswap{
blockstore: bstore,
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
network: network,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
pqm: pqm,
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory),
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
sentHistogram: sentHistogram,
blockstore: bstore,
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
network: network,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
pqm: pqm,
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory),
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
sentHistogram: sentHistogram,
provideEnabled: true,
}

// apply functional options before starting and running bitswap
for _, option := range options {
option(bs)
}

bs.wm.Startup()
Expand Down Expand Up @@ -174,6 +187,9 @@ type Bitswap struct {

// the sessionmanager manages tracking sessions
sm *bssm.SessionManager

// whether or not to make provide announcements
provideEnabled bool
}

type counters struct {
Expand Down Expand Up @@ -253,7 +269,7 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {

bs.engine.AddBlock(blk)

if ProvideEnabled {
if bs.provideEnabled {
select {
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
Expand Down
17 changes: 7 additions & 10 deletions bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,30 +102,27 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
}

func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
bitswap.ProvideEnabled = false
defer func() { bitswap.ProvideEnabled = true }()

bssession.SetProviderSearchDelay(50 * time.Millisecond)
michaelavila marked this conversation as resolved.
Show resolved Hide resolved
defer bssession.SetProviderSearchDelay(time.Second)
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
ig := testinstance.NewTestInstanceGenerator(net)
ig := testinstance.NewTestInstanceGenerator(net, bitswap.ProvideEnabled(false))
defer ig.Close()

hasBlock := ig.Next()
defer hasBlock.Exchange.Close()

wantsBlock := ig.Next()
defer wantsBlock.Exchange.Close()

if err := hasBlock.Exchange.HasBlock(block); err != nil {
t.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Millisecond)
defer cancel()

wantsBlock := ig.Next()
defer wantsBlock.Exchange.Close()

ns := wantsBlock.Exchange.NewSession(ctx).(*bssession.Session)
// set find providers delay to less than timeout context of this test
ns.SetBaseTickDelay(10 * time.Millisecond)

received, err := ns.GetBlock(ctx, block.Cid())
if received != nil {
Expand Down
27 changes: 14 additions & 13 deletions testinstance/testinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,24 @@ import (

// NewTestInstanceGenerator generates a new InstanceGenerator for the given
// testnet
func NewTestInstanceGenerator(
net tn.Network) InstanceGenerator {
func NewTestInstanceGenerator(net tn.Network, bsOptions ...bitswap.Option) InstanceGenerator {
ctx, cancel := context.WithCancel(context.Background())
return InstanceGenerator{
net: net,
seq: 0,
ctx: ctx, // TODO take ctx as param to Next, Instances
cancel: cancel,
net: net,
seq: 0,
ctx: ctx, // TODO take ctx as param to Next, Instances
cancel: cancel,
bsOptions: bsOptions,
}
}

// InstanceGenerator generates new test instances of bitswap+dependencies
type InstanceGenerator struct {
seq int
net tn.Network
ctx context.Context
cancel context.CancelFunc
seq int
net tn.Network
ctx context.Context
cancel context.CancelFunc
bsOptions []bitswap.Option
}

// Close closes the clobal context, shutting down all test instances
Expand All @@ -51,7 +52,7 @@ func (g *InstanceGenerator) Next() Instance {
if err != nil {
panic("FIXME") // TODO change signature
}
return NewInstance(g.ctx, g.net, p)
return NewInstance(g.ctx, g.net, p, g.bsOptions...)
}

// Instances creates N test instances of bitswap + dependencies
Expand Down Expand Up @@ -95,7 +96,7 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
// NB: It's easy make mistakes by providing the same peer ID to two different
// instances. To safeguard, use the InstanceGenerator to generate instances. It's
// just a much better idea.
func NewInstance(ctx context.Context, net tn.Network, p testutil.Identity) Instance {
func NewInstance(ctx context.Context, net tn.Network, p testutil.Identity, options ...bitswap.Option) Instance {
bsdelay := delay.Fixed(0)

adapter := net.Adapter(p)
Expand All @@ -108,7 +109,7 @@ func NewInstance(ctx context.Context, net tn.Network, p testutil.Identity) Insta
panic(err.Error()) // FIXME perhaps change signature and return error.
}

bs := bitswap.New(ctx, adapter, bstore).(*bitswap.Bitswap)
bs := bitswap.New(ctx, adapter, bstore, options...).(*bitswap.Bitswap)

return Instance{
Adapter: adapter,
Expand Down
2 changes: 1 addition & 1 deletion workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (bs *Bitswap) startWorkers(ctx context.Context, px process.Process) {
})
}

if ProvideEnabled {
if bs.provideEnabled {
// Start up a worker to manage sending out provides messages
px.Go(func(px process.Process) {
bs.provideCollector(ctx)
Expand Down