diff --git a/bitswap/client/client.go b/bitswap/client/client.go index 4e5c57b82..d29eb6faf 100644 --- a/bitswap/client/client.go +++ b/bitswap/client/client.go @@ -5,14 +5,9 @@ package client import ( "context" "errors" - "sync" "time" - delay "github.com/ipfs/go-ipfs-delay" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" - bsbpm "github.com/ipfs/boxo/bitswap/client/internal/blockpresencemanager" bsgetter "github.com/ipfs/boxo/bitswap/client/internal/getter" bsmq "github.com/ipfs/boxo/bitswap/client/internal/messagequeue" @@ -33,11 +28,14 @@ import ( exchange "github.com/ipfs/boxo/exchange" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" + delay "github.com/ipfs/go-ipfs-delay" logging "github.com/ipfs/go-log/v2" "github.com/ipfs/go-metrics-interface" process "github.com/jbenet/goprocess" procctx "github.com/jbenet/goprocess/context" "github.com/libp2p/go-libp2p/core/peer" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) var log = logging.Logger("bitswap-client") @@ -239,6 +237,7 @@ type counters struct { // GetBlock attempts to retrieve a particular block from peers within the // deadline enforced by the context. +// It returns a [github.com/ipfs/boxo/bitswap/client/traceability.Block] assertable [blocks.Block]. func (bs *Client) GetBlock(ctx context.Context, k cid.Cid) (blocks.Block, error) { ctx, span := internal.StartSpan(ctx, "GetBlock", trace.WithAttributes(attribute.String("Key", k.String()))) defer span.End() @@ -248,6 +247,7 @@ func (bs *Client) GetBlock(ctx context.Context, k cid.Cid) (blocks.Block, error) // GetBlocks returns a channel where the caller may receive blocks that // correspond to the provided |keys|. Returns an error if BitSwap is unable to // begin this request within the deadline enforced by the context. +// It returns a [github.com/ipfs/boxo/bitswap/client/traceability.Block] assertable [blocks.Block]. // // NB: Your request remains open until the context expires. To conserve // resources, provide a context with a reasonably short deadline (ie. not one @@ -284,7 +284,8 @@ func (bs *Client) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err // Publish the block to any Bitswap clients that had requested blocks. // (the sessions use this pubsub mechanism to inform clients of incoming // blocks) - bs.notif.Publish(blks...) + var zero peer.ID + bs.notif.Publish(zero, blks...) return nil } @@ -325,7 +326,7 @@ func (bs *Client) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []bl // (the sessions use this pubsub mechanism to inform clients of incoming // blocks) for _, b := range wanted { - bs.notif.Publish(b) + bs.notif.Publish(from, b) } for _, b := range wanted { diff --git a/bitswap/client/internal/notifications/notifications.go b/bitswap/client/internal/notifications/notifications.go index ed4b79f57..dc6dda899 100644 --- a/bitswap/client/internal/notifications/notifications.go +++ b/bitswap/client/internal/notifications/notifications.go @@ -3,10 +3,13 @@ package notifications import ( "context" "sync" + "time" pubsub "github.com/cskr/pubsub" + "github.com/ipfs/boxo/bitswap/client/traceability" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p/core/peer" ) const bufferSize = 16 @@ -15,7 +18,7 @@ const bufferSize = 16 // for cids. It's used internally by bitswap to decouple receiving blocks // and actually providing them back to the GetBlocks caller. type PubSub interface { - Publish(blocks ...blocks.Block) + Publish(from peer.ID, blocks ...blocks.Block) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Block Shutdown() } @@ -35,7 +38,7 @@ type impl struct { closed chan struct{} } -func (ps *impl) Publish(blocks ...blocks.Block) { +func (ps *impl) Publish(from peer.ID, blocks ...blocks.Block) { ps.lk.RLock() defer ps.lk.RUnlock() select { @@ -45,7 +48,7 @@ func (ps *impl) Publish(blocks ...blocks.Block) { } for _, block := range blocks { - ps.wrapped.Pub(block, block.Cid().KeyString()) + ps.wrapped.Pub(traceability.Block{Block: block, From: from}, block.Cid().KeyString()) } } @@ -84,6 +87,8 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Bl default: } + subscribe := time.Now() + // AddSubOnceEach listens for each key in the list, and closes the channel // once all keys have been received ps.wrapped.AddSubOnceEach(valuesCh, toStrings(keys)...) @@ -113,10 +118,13 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Bl if !ok { return } - block, ok := val.(blocks.Block) + block, ok := val.(traceability.Block) if !ok { + // FIXME: silently dropping errors wtf ? return } + block.Delay = time.Since(subscribe) + select { case <-ctx.Done(): return diff --git a/bitswap/client/internal/notifications/notifications_test.go b/bitswap/client/internal/notifications/notifications_test.go index 09c8eb806..25b580f6a 100644 --- a/bitswap/client/internal/notifications/notifications_test.go +++ b/bitswap/client/internal/notifications/notifications_test.go @@ -10,10 +10,12 @@ import ( blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" blocksutil "github.com/ipfs/go-ipfs-blocksutil" + "github.com/libp2p/go-libp2p/core/peer" ) func TestDuplicates(t *testing.T) { test.Flaky(t) + var zero peer.ID // this test doesn't check the peer id b1 := blocks.NewBlock([]byte("1")) b2 := blocks.NewBlock([]byte("2")) @@ -22,16 +24,16 @@ func TestDuplicates(t *testing.T) { defer n.Shutdown() ch := n.Subscribe(context.Background(), b1.Cid(), b2.Cid()) - n.Publish(b1) + n.Publish(zero, b1) blockRecvd, ok := <-ch if !ok { t.Fail() } assertBlocksEqual(t, b1, blockRecvd) - n.Publish(b1) // ignored duplicate + n.Publish(zero, b1) // ignored duplicate - n.Publish(b2) + n.Publish(zero, b2) blockRecvd, ok = <-ch if !ok { t.Fail() @@ -41,6 +43,7 @@ func TestDuplicates(t *testing.T) { func TestPublishSubscribe(t *testing.T) { test.Flaky(t) + var zero peer.ID // this test doesn't check the peer id blockSent := blocks.NewBlock([]byte("Greetings from The Interval")) @@ -48,7 +51,7 @@ func TestPublishSubscribe(t *testing.T) { defer n.Shutdown() ch := n.Subscribe(context.Background(), blockSent.Cid()) - n.Publish(blockSent) + n.Publish(zero, blockSent) blockRecvd, ok := <-ch if !ok { t.Fail() @@ -60,6 +63,7 @@ func TestPublishSubscribe(t *testing.T) { func TestSubscribeMany(t *testing.T) { test.Flaky(t) + var zero peer.ID // this test doesn't check the peer id e1 := blocks.NewBlock([]byte("1")) e2 := blocks.NewBlock([]byte("2")) @@ -68,14 +72,14 @@ func TestSubscribeMany(t *testing.T) { defer n.Shutdown() ch := n.Subscribe(context.Background(), e1.Cid(), e2.Cid()) - n.Publish(e1) + n.Publish(zero, e1) r1, ok := <-ch if !ok { t.Fatal("didn't receive first expected block") } assertBlocksEqual(t, e1, r1) - n.Publish(e2) + n.Publish(zero, e2) r2, ok := <-ch if !ok { t.Fatal("didn't receive second expected block") @@ -87,6 +91,7 @@ func TestSubscribeMany(t *testing.T) { // would be requested twice at the same time. func TestDuplicateSubscribe(t *testing.T) { test.Flaky(t) + var zero peer.ID // this test doesn't check the peer id e1 := blocks.NewBlock([]byte("1")) @@ -95,7 +100,7 @@ func TestDuplicateSubscribe(t *testing.T) { ch1 := n.Subscribe(context.Background(), e1.Cid()) ch2 := n.Subscribe(context.Background(), e1.Cid()) - n.Publish(e1) + n.Publish(zero, e1) r1, ok := <-ch1 if !ok { t.Fatal("didn't receive first expected block") @@ -158,6 +163,7 @@ func TestCarryOnWhenDeadlineExpires(t *testing.T) { func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) { test.Flaky(t) + var zero peer.ID // this test doesn't check the peer id g := blocksutil.NewBlockGenerator() ctx, cancel := context.WithCancel(context.Background()) @@ -179,7 +185,7 @@ func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) { t.Log("cancel context before any blocks published") cancel() for _, b := range bs { - n.Publish(b) + n.Publish(zero, b) } t.Log("publishing the large number of blocks to the ignored channel must not deadlock") diff --git a/bitswap/client/traceability/block.go b/bitswap/client/traceability/block.go new file mode 100644 index 000000000..d3370a16c --- /dev/null +++ b/bitswap/client/traceability/block.go @@ -0,0 +1,21 @@ +package traceability + +import ( + "time" + + blocks "github.com/ipfs/go-block-format" + "github.com/libp2p/go-libp2p/core/peer" +) + +// Block is a block whos provenance has been tracked. +type Block struct { + blocks.Block + + // From contains the peer id of the node who sent us the block. + // It will be the zero value if we did not downloaded this block from the + // network. (such as by getting the block from NotifyNewBlocks). + From peer.ID + // Delay contains how long did we had to wait between when we started being + // intrested and when we actually got the block. + Delay time.Duration +}