diff --git a/polygon/p2p/fetcher_base.go b/polygon/p2p/fetcher_base.go index 629b2591bae..a313e0cfa4e 100644 --- a/polygon/p2p/fetcher_base.go +++ b/polygon/p2p/fetcher_base.go @@ -25,12 +25,12 @@ type FetcherConfig struct { type Fetcher interface { // FetchHeaders fetches [start,end) headers from a peer. Blocks until data is received. - FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Header, error) + FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) // FetchBodies fetches block bodies for the given headers from a peer. Blocks until data is received. - FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) ([]*types.Body, error) + FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (FetcherResponse[[]*types.Body], error) // FetchBlocks fetches headers and bodies for a given [start, end) range from a peer and // assembles them into blocks. Blocks until data is received. 
- FetchBlocks(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Block, error) + FetchBlocks(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Block], error) } func NewFetcher( @@ -63,9 +63,14 @@ type fetcher struct { requestIdGenerator RequestIdGenerator } -func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Header, error) { +type FetcherResponse[T any] struct { + Data T + TotalSize int +} + +func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) { if start >= end { - return nil, &ErrInvalidFetchHeadersRange{ + return FetcherResponse[[]*types.Header]{}, &ErrInvalidFetchHeadersRange{ start: start, end: end, } @@ -82,6 +87,7 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe if amount%eth.MaxHeadersServe > 0 { numChunks++ } + totalHeadersSize := 0 headers := make([]*types.Header, 0, amount) for chunkNum := uint64(0); chunkNum < numChunks; chunkNum++ { @@ -91,30 +97,35 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe // a node may not respond with all MaxHeadersServe in 1 response, // so we keep on consuming from last received number (akin to consuming a paging api) // until we have all headers of the chunk or the peer stopped returning headers - headersChunk, err := fetchWithRetry(f.config, func() ([]*types.Header, error) { + headersChunk, err := fetchWithRetry(f.config, func() (FetcherResponse[[]*types.Header], error) { return f.fetchHeaders(ctx, chunkStart, chunkEnd, peerId) }) if err != nil { - return nil, err + return FetcherResponse[[]*types.Header]{}, err } - if len(headersChunk) == 0 { + if len(headersChunk.Data) == 0 { break } - headers = append(headers, headersChunk...) - chunkStart += uint64(len(headersChunk)) + headers = append(headers, headersChunk.Data...) 
+ chunkStart += uint64(len(headersChunk.Data)) + totalHeadersSize += headersChunk.TotalSize } } if err := f.validateHeadersResponse(headers, start, amount); err != nil { - return nil, err + return FetcherResponse[[]*types.Header]{}, err } - return headers, nil + return FetcherResponse[[]*types.Header]{ + Data: headers, + TotalSize: totalHeadersSize, + }, nil } -func (f *fetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) ([]*types.Body, error) { +func (f *fetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (FetcherResponse[[]*types.Body], error) { var bodies []*types.Body + totalBodiesSize := 0 for len(headers) > 0 { // Note: we always request MaxBodiesServe for optimal response sizes (fully utilising the 2 MB soft limit). @@ -128,43 +139,50 @@ func (f *fetcher) FetchBodies(ctx context.Context, headers []*types.Header, peer headersChunk = headers } - bodiesChunk, err := fetchWithRetry(f.config, func() ([]*types.Body, error) { + bodiesChunk, err := fetchWithRetry(f.config, func() (*FetcherResponse[[]*types.Body], error) { return f.fetchBodies(ctx, headersChunk, peerId) }) if err != nil { - return nil, err + return FetcherResponse[[]*types.Body]{}, err } - if len(bodiesChunk) == 0 { - return nil, NewErrMissingBodies(headers) + if len(bodiesChunk.Data) == 0 { + return FetcherResponse[[]*types.Body]{}, NewErrMissingBodies(headers) } - bodies = append(bodies, bodiesChunk...) - headers = headers[len(bodiesChunk):] + bodies = append(bodies, bodiesChunk.Data...) 
+ headers = headers[len(bodiesChunk.Data):] + totalBodiesSize += bodiesChunk.TotalSize } - return bodies, nil + return FetcherResponse[[]*types.Body]{ + Data: bodies, + TotalSize: totalBodiesSize, + }, nil } -func (f *fetcher) FetchBlocks(ctx context.Context, start, end uint64, peerId *PeerId) ([]*types.Block, error) { +func (f *fetcher) FetchBlocks(ctx context.Context, start, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Block], error) { headers, err := f.FetchHeaders(ctx, start, end, peerId) if err != nil { - return nil, err + return FetcherResponse[[]*types.Block]{}, err } - bodies, err := f.FetchBodies(ctx, headers, peerId) + bodies, err := f.FetchBodies(ctx, headers.Data, peerId) if err != nil { - return nil, err + return FetcherResponse[[]*types.Block]{}, err } - blocks := make([]*types.Block, len(headers)) - for i, header := range headers { - blocks[i] = types.NewBlockFromNetwork(header, bodies[i]) + blocks := make([]*types.Block, len(headers.Data)) + for i, header := range headers.Data { + blocks[i] = types.NewBlockFromNetwork(header, bodies.Data[i]) } - return blocks, nil + return FetcherResponse[[]*types.Block]{ + Data: blocks, + TotalSize: headers.TotalSize + bodies.TotalSize, + }, nil } -func (f *fetcher) fetchHeaders(ctx context.Context, start, end uint64, peerId *PeerId) ([]*types.Header, error) { +func (f *fetcher) fetchHeaders(ctx context.Context, start, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -192,15 +210,18 @@ func (f *fetcher) fetchHeaders(ctx context.Context, start, end uint64, peerId *P }, }) if err != nil { - return nil, err + return FetcherResponse[[]*types.Header]{}, err } - message, err := awaitResponse(ctx, f.config.responseTimeout, messages, filterBlockHeaders(peerId, requestId)) + message, messageSize, err := awaitResponse(ctx, f.config.responseTimeout, messages, filterBlockHeaders(peerId, requestId)) if err != nil { - return nil, err + 
return FetcherResponse[[]*types.Header]{}, err } - return message.BlockHeadersPacket, nil + return FetcherResponse[[]*types.Header]{ + Data: message.BlockHeadersPacket, + TotalSize: messageSize, + }, nil } func (f *fetcher) validateHeadersResponse(headers []*types.Header, start, amount uint64) error { @@ -234,7 +255,7 @@ func (f *fetcher) validateHeadersResponse(headers []*types.Header, start, amount return nil } -func (f *fetcher) fetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) ([]*types.Body, error) { +func (f *fetcher) fetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (*FetcherResponse[[]*types.Body], error) { // cleanup for the chan message observer ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -266,7 +287,7 @@ func (f *fetcher) fetchBodies(ctx context.Context, headers []*types.Header, peer return nil, err } - message, err := awaitResponse(ctx, f.config.responseTimeout, messages, filterBlockBodies(peerId, requestId)) + message, messageSize, err := awaitResponse(ctx, f.config.responseTimeout, messages, filterBlockBodies(peerId, requestId)) if err != nil { return nil, err } @@ -275,7 +296,10 @@ func (f *fetcher) fetchBodies(ctx context.Context, headers []*types.Header, peer return nil, err } - return message.BlockBodiesPacket, nil + return &FetcherResponse[[]*types.Body]{ + Data: message.BlockBodiesPacket, + TotalSize: messageSize, + }, nil } func (f *fetcher) validateBodies(bodies []*types.Body, headers []*types.Header) error { @@ -318,7 +342,7 @@ func awaitResponse[TPacket any]( timeout time.Duration, messages chan *DecodedInboundMessage[TPacket], filter func(*DecodedInboundMessage[TPacket]) bool, -) (TPacket, error) { +) (TPacket, int, error) { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() @@ -326,13 +350,13 @@ func awaitResponse[TPacket any]( select { case <-ctx.Done(): var nilPacket TPacket - return nilPacket, fmt.Errorf("await %v response interrupted: %w", 
reflect.TypeOf(nilPacket), ctx.Err()) + return nilPacket, 0, fmt.Errorf("await %v response interrupted: %w", reflect.TypeOf(nilPacket), ctx.Err()) case message := <-messages: if filter(message) { continue } - return message.Decoded, nil + return message.Decoded, len(message.Data), nil } } } diff --git a/polygon/p2p/fetcher_base_test.go b/polygon/p2p/fetcher_base_test.go index 5a0469c289a..8b6f69dc5a0 100644 --- a/polygon/p2p/fetcher_base_test.go +++ b/polygon/p2p/fetcher_base_test.go @@ -54,10 +54,11 @@ func TestFetcherFetchHeaders(t *testing.T) { test.mockSentryStreams(mockRequestResponse) test.run(func(ctx context.Context, t *testing.T) { headers, err := test.fetcher.FetchHeaders(ctx, 1, 3, peerId) + headersData := headers.Data require.NoError(t, err) - require.Len(t, headers, 2) - require.Equal(t, uint64(1), headers[0].Number.Uint64()) - require.Equal(t, uint64(2), headers[1].Number.Uint64()) + require.Len(t, headersData, 2) + require.Equal(t, uint64(1), headersData[0].Number.Uint64()) + require.Equal(t, uint64(2), headersData[1].Number.Uint64()) }) } @@ -103,10 +104,11 @@ func TestFetcherFetchHeadersWithChunking(t *testing.T) { test.mockSentryStreams(mockRequestResponse1, mockRequestResponse2) test.run(func(ctx context.Context, t *testing.T) { headers, err := test.fetcher.FetchHeaders(ctx, 1, 2000, peerId) + headersData := headers.Data require.NoError(t, err) - require.Len(t, headers, 1999) - require.Equal(t, uint64(1), headers[0].Number.Uint64()) - require.Equal(t, uint64(1999), headers[len(headers)-1].Number.Uint64()) + require.Len(t, headersData, 1999) + require.Equal(t, uint64(1), headersData[0].Number.Uint64()) + require.Equal(t, uint64(1999), headersData[len(headersData)-1].Number.Uint64()) }) } @@ -156,7 +158,7 @@ func TestFetcherFetchHeadersResponseTimeout(t *testing.T) { test.run(func(ctx context.Context, t *testing.T) { headers, err := test.fetcher.FetchHeaders(ctx, 1, 11, peerId) require.ErrorIs(t, err, context.DeadlineExceeded) - require.Nil(t, 
headers) + require.Nil(t, headers.Data) }) } @@ -220,10 +222,11 @@ func TestFetcherFetchHeadersResponseTimeoutRetrySuccess(t *testing.T) { test.mockSentryStreams(mockRequestResponse1, mockRequestResponse2, mockRequestResponse3) test.run(func(ctx context.Context, t *testing.T) { headers, err := test.fetcher.FetchHeaders(ctx, 1, 2000, peerId) + headersData := headers.Data require.NoError(t, err) - require.Len(t, headers, 1999) - require.Equal(t, uint64(1), headers[0].Number.Uint64()) - require.Equal(t, uint64(1999), headers[len(headers)-1].Number.Uint64()) + require.Len(t, headersData, 1999) + require.Equal(t, uint64(1), headersData[0].Number.Uint64()) + require.Equal(t, uint64(1999), headersData[len(headersData)-1].Number.Uint64()) }) } @@ -238,7 +241,7 @@ func TestFetcherErrInvalidFetchHeadersRange(t *testing.T) { require.ErrorAs(t, err, &errInvalidFetchHeadersRange) require.Equal(t, uint64(3), errInvalidFetchHeadersRange.start) require.Equal(t, uint64(1), errInvalidFetchHeadersRange.end) - require.Nil(t, headers) + require.Nil(t, headers.Data) }) } @@ -284,7 +287,7 @@ func TestFetcherFetchHeadersErrIncompleteResponse(t *testing.T) { headers, err := test.fetcher.FetchHeaders(ctx, 1, 4, peerId) require.ErrorAs(t, err, &errIncompleteHeaders) require.Equal(t, uint64(3), errIncompleteHeaders.LowestMissingBlockNum()) - require.Nil(t, headers) + require.Nil(t, headers.Data) }) } @@ -365,7 +368,7 @@ func TestFetcherFetchBodies(t *testing.T) { test.run(func(ctx context.Context, t *testing.T) { bodies, err := test.fetcher.FetchBodies(ctx, mockHeaders, peerId) require.NoError(t, err) - require.Len(t, bodies, 2) + require.Len(t, bodies.Data, 2) }) } @@ -404,7 +407,7 @@ func TestFetcherFetchBodiesResponseTimeout(t *testing.T) { test.run(func(ctx context.Context, t *testing.T) { bodies, err := test.fetcher.FetchBodies(ctx, mockHeaders, peerId) require.ErrorIs(t, err, context.DeadlineExceeded) - require.Nil(t, bodies) + require.Nil(t, bodies.Data) }) } @@ -463,7 +466,7 @@ func 
TestFetcherFetchBodiesResponseTimeoutRetrySuccess(t *testing.T) { test.run(func(ctx context.Context, t *testing.T) { bodies, err := test.fetcher.FetchBodies(ctx, mockHeaders, peerId) require.NoError(t, err) - require.Len(t, bodies, 1) + require.Len(t, bodies.Data, 1) }) } @@ -497,7 +500,7 @@ func TestFetcherFetchBodiesErrMissingBodies(t *testing.T) { lowest, exists := errMissingBlocks.LowestMissingBlockNum() require.Equal(t, uint64(1), lowest) require.True(t, exists) - require.Nil(t, bodies) + require.Nil(t, bodies.Data) }) } diff --git a/polygon/p2p/fetcher_penalizing.go b/polygon/p2p/fetcher_penalizing.go index 32d3ce62fc8..75761f57e5f 100644 --- a/polygon/p2p/fetcher_penalizing.go +++ b/polygon/p2p/fetcher_penalizing.go @@ -28,19 +28,19 @@ type penalizingFetcher struct { peerPenalizer PeerPenalizer } -func (pf *penalizingFetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Header, error) { +func (pf *penalizingFetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) { headers, err := pf.Fetcher.FetchHeaders(ctx, start, end, peerId) if err != nil { - return nil, pf.maybePenalize(ctx, peerId, err, &ErrTooManyHeaders{}, &ErrNonSequentialHeaderNumbers{}) + return FetcherResponse[[]*types.Header]{}, pf.maybePenalize(ctx, peerId, err, &ErrTooManyHeaders{}, &ErrNonSequentialHeaderNumbers{}) } return headers, nil } -func (pf *penalizingFetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) ([]*types.Body, error) { +func (pf *penalizingFetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (FetcherResponse[[]*types.Body], error) { bodies, err := pf.Fetcher.FetchBodies(ctx, headers, peerId) if err != nil { - return nil, pf.maybePenalize(ctx, peerId, err, &ErrTooManyBodies{}) + return FetcherResponse[[]*types.Body]{}, pf.maybePenalize(ctx, peerId, err, &ErrTooManyBodies{}) } return bodies, nil diff --git 
a/polygon/p2p/fetcher_penalizing_test.go b/polygon/p2p/fetcher_penalizing_test.go index 72b7ae156c2..0c96e26a898 100644 --- a/polygon/p2p/fetcher_penalizing_test.go +++ b/polygon/p2p/fetcher_penalizing_test.go @@ -43,7 +43,7 @@ func TestPenalizingFetcherFetchHeadersShouldPenalizePeerWhenErrTooManyHeaders(t require.ErrorAs(t, err, &errTooManyHeaders) require.Equal(t, 2, errTooManyHeaders.requested) require.Equal(t, 5, errTooManyHeaders.received) - require.Nil(t, headers) + require.Nil(t, headers.Data) }) } @@ -82,7 +82,7 @@ func TestPenalizingFetcherFetchHeadersShouldPenalizePeerWhenErrNonSequentialHead require.ErrorAs(t, err, &errNonSequentialHeaderNumbers) require.Equal(t, uint64(3), errNonSequentialHeaderNumbers.current) require.Equal(t, uint64(2), errNonSequentialHeaderNumbers.expected) - require.Nil(t, headers) + require.Nil(t, headers.Data) }) } @@ -119,7 +119,7 @@ func TestPenalizingFetcherFetchHeadersShouldPenalizePeerWhenIncorrectOrigin(t *t require.ErrorAs(t, err, &errNonSequentialHeaderNumbers) require.Equal(t, uint64(2), errNonSequentialHeaderNumbers.current) require.Equal(t, uint64(1), errNonSequentialHeaderNumbers.expected) - require.Nil(t, headers) + require.Nil(t, headers.Data) }) } @@ -154,7 +154,7 @@ func TestPenalizingFetcherFetchBodiesShouldPenalizePeerWhenErrTooManyBodies(t *t require.ErrorAs(t, err, &errTooManyBodies) require.Equal(t, 1, errTooManyBodies.requested) require.Equal(t, 2, errTooManyBodies.received) - require.Nil(t, bodies) + require.Nil(t, bodies.Data) }) } diff --git a/polygon/p2p/fetcher_tracking.go b/polygon/p2p/fetcher_tracking.go index 6eb4ad8160b..8f510ade3f2 100644 --- a/polygon/p2p/fetcher_tracking.go +++ b/polygon/p2p/fetcher_tracking.go @@ -23,7 +23,7 @@ type trackingFetcher struct { peerTracker PeerTracker } -func (tf *trackingFetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Header, error) { +func (tf *trackingFetcher) FetchHeaders(ctx context.Context, start uint64, end 
uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) { res, err := tf.Fetcher.FetchHeaders(ctx, start, end, peerId) if err != nil { var errIncompleteHeaders *ErrIncompleteHeaders @@ -33,14 +33,14 @@ func (tf *trackingFetcher) FetchHeaders(ctx context.Context, start uint64, end u tf.peerTracker.BlockNumMissing(peerId, start) } - return nil, err + return FetcherResponse[[]*types.Header]{}, err } - tf.peerTracker.BlockNumPresent(peerId, res[len(res)-1].Number.Uint64()) + tf.peerTracker.BlockNumPresent(peerId, res.Data[len(res.Data)-1].Number.Uint64()) return res, nil } -func (tf *trackingFetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) ([]*types.Body, error) { +func (tf *trackingFetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (FetcherResponse[[]*types.Body], error) { bodies, err := tf.Fetcher.FetchBodies(ctx, headers, peerId) if err != nil { var errMissingBodies *ErrMissingBodies @@ -56,7 +56,7 @@ func (tf *trackingFetcher) FetchBodies(ctx context.Context, headers []*types.Hea } } - return nil, err + return FetcherResponse[[]*types.Body]{}, err } return bodies, nil diff --git a/polygon/p2p/fetcher_tracking_test.go b/polygon/p2p/fetcher_tracking_test.go index 89d9a340951..f7576865ca3 100644 --- a/polygon/p2p/fetcher_tracking_test.go +++ b/polygon/p2p/fetcher_tracking_test.go @@ -59,10 +59,11 @@ func TestTrackingFetcherFetchHeadersUpdatesPeerTracker(t *testing.T) { }, time.Second, 100*time.Millisecond, "expected number of initial peers never satisfied: want=2, have=%d", len(peerIds)) headers, err := test.trackingFetcher.FetchHeaders(ctx, 1, 3, peerId1) // fetch headers 1 and 2 + headersData := headers.Data require.NoError(t, err) - require.Len(t, headers, 2) - require.Equal(t, uint64(1), headers[0].Number.Uint64()) - require.Equal(t, uint64(2), headers[1].Number.Uint64()) + require.Len(t, headersData, 2) + require.Equal(t, uint64(1), headersData[0].Number.Uint64()) + require.Equal(t, 
uint64(2), headersData[1].Number.Uint64()) peerIds = test.peerTracker.ListPeersMayHaveBlockNum(4) // peers which may have blocks 1,2,3,4 require.Len(t, peerIds, 2) @@ -74,7 +75,7 @@ func TestTrackingFetcherFetchHeadersUpdatesPeerTracker(t *testing.T) { require.Equal(t, uint64(2), errIncompleteHeaders.requested) require.Equal(t, uint64(0), errIncompleteHeaders.received) require.Equal(t, uint64(3), errIncompleteHeaders.LowestMissingBlockNum()) - require.Nil(t, headers) + require.Nil(t, headers.Data) // should be one peer less now given that we know that peer 1 does not have block num 4 peerIds = test.peerTracker.ListPeersMayHaveBlockNum(4) @@ -145,14 +146,14 @@ func TestTrackingFetcherFetchBodiesUpdatesPeerTracker(t *testing.T) { bodies, err := test.trackingFetcher.FetchBodies(ctx, mockHeaders, peerId1) require.ErrorIs(t, err, &ErrMissingBodies{}) - require.Nil(t, bodies) + require.Nil(t, bodies.Data) peerIds = test.peerTracker.ListPeersMayHaveBlockNum(1) // only peerId2 may have block 1, peerId does not require.Len(t, peerIds, 1) bodies, err = test.trackingFetcher.FetchBodies(ctx, mockHeaders, peerId2) require.ErrorIs(t, err, context.DeadlineExceeded) - require.Nil(t, bodies) + require.Nil(t, bodies.Data) peerIds = test.peerTracker.ListPeersMayHaveBlockNum(1) // neither peerId1 nor peerId2 have block num 1 require.Len(t, peerIds, 0) diff --git a/polygon/p2p/service_mock.go b/polygon/p2p/service_mock.go index 3350c65a980..ab82cc415bc 100644 --- a/polygon/p2p/service_mock.go +++ b/polygon/p2p/service_mock.go @@ -67,10 +67,10 @@ func (mr *MockServiceMockRecorder) BlockNumPresent(peerId, blockNum any) *gomock } // FetchBlocks mocks base method. 
-func (m *MockService) FetchBlocks(ctx context.Context, start, end uint64, peerId *PeerId) ([]*types.Block, error) { +func (m *MockService) FetchBlocks(ctx context.Context, start, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Block], error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FetchBlocks", ctx, start, end, peerId) - ret0, _ := ret[0].([]*types.Block) + ret0, _ := ret[0].(FetcherResponse[[]*types.Block]) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -82,10 +82,10 @@ func (mr *MockServiceMockRecorder) FetchBlocks(ctx, start, end, peerId any) *gom } // FetchBodies mocks base method. -func (m *MockService) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) ([]*types.Body, error) { +func (m *MockService) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (FetcherResponse[[]*types.Body], error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FetchBodies", ctx, headers, peerId) - ret0, _ := ret[0].([]*types.Body) + ret0, _ := ret[0].(FetcherResponse[[]*types.Body]) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -97,10 +97,10 @@ func (mr *MockServiceMockRecorder) FetchBodies(ctx, headers, peerId any) *gomock } // FetchHeaders mocks base method. 
-func (m *MockService) FetchHeaders(ctx context.Context, start, end uint64, peerId *PeerId) ([]*types.Header, error) { +func (m *MockService) FetchHeaders(ctx context.Context, start, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FetchHeaders", ctx, start, end, peerId) - ret0, _ := ret[0].([]*types.Header) + ret0, _ := ret[0].(FetcherResponse[[]*types.Header]) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/polygon/sync/block_downloader.go b/polygon/sync/block_downloader.go index cfb7cc9a728..613fa8710af 100644 --- a/polygon/sync/block_downloader.go +++ b/polygon/sync/block_downloader.go @@ -6,6 +6,7 @@ import ( "fmt" "reflect" "sync" + "sync/atomic" "time" "github.com/c2h5oh/datasize" @@ -127,6 +128,9 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(ctx context.Context, wayp var lastBlock *types.Block batchFetchStartTime := time.Now() + fetchStartTime := time.Now() + var blockCount, blocksTotalSize atomic.Uint64 + for len(waypoints) > 0 { select { case <-ctx.Done(): @@ -162,7 +166,14 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(ctx context.Context, wayp "kind", reflect.TypeOf(waypointsBatch[0]), "peerCount", len(peers), "maxWorkers", d.maxWorkers, + "blk/s", fmt.Sprintf("%.2f", float64(blockCount.Load())/time.Since(fetchStartTime).Seconds()), + "bytes/s", common.ByteCount(uint64(float64(blocksTotalSize.Load())/time.Since(fetchStartTime).Seconds())), ) + + blockCount.Store(0) + blocksTotalSize.Store(0) + fetchStartTime = time.Now() + default: // carry on } @@ -181,7 +192,7 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(ctx context.Context, wayp return } - blocks, err := d.fetchVerifiedBlocks(ctx, waypoint, peerId) + blocks, totalSize, err := d.fetchVerifiedBlocks(ctx, waypoint, peerId) if err != nil { d.logger.Debug( syncLogPrefix("issue downloading waypoint blocks - will try again"), @@ -196,6 +207,9 @@ func (d *blockDownloader) 
downloadBlocksUsingWaypoints(ctx context.Context, wayp return } + blocksTotalSize.Add(uint64(totalSize)) + blockCount.Add(uint64(len(blocks))) + waypointBlocksMemo.Add(waypoint.RootHash(), blocks) blockBatches[i] = blocks }(i, waypoint, peers[i]) @@ -237,6 +251,7 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(ctx context.Context, wayp } d.logger.Debug(syncLogPrefix("fetched blocks"), "len", len(blocks), "duration", time.Since(batchFetchStartTime)) + batchFetchStartTime = time.Now() // reset for next time if err := d.storage.InsertBlocks(ctx, blocks); err != nil { @@ -255,28 +270,28 @@ func (d *blockDownloader) fetchVerifiedBlocks( ctx context.Context, waypoint heimdall.Waypoint, peerId *p2p.PeerId, -) ([]*types.Block, error) { +) ([]*types.Block, int, error) { // 1. Fetch headers in waypoint from a peer start := waypoint.StartBlock().Uint64() end := waypoint.EndBlock().Uint64() + 1 // waypoint end is inclusive, fetch headers is [start, end) headers, err := d.p2pService.FetchHeaders(ctx, start, end, peerId) if err != nil { - return nil, err + return nil, 0, err } // 2. Verify headers match waypoint root hash - if err = d.headersVerifier(waypoint, headers); err != nil { + if err = d.headersVerifier(waypoint, headers.Data); err != nil { d.logger.Debug(syncLogPrefix("penalizing peer - invalid headers"), "peerId", peerId, "err", err) if penalizeErr := d.p2pService.Penalize(ctx, peerId); penalizeErr != nil { err = fmt.Errorf("%w: %w", penalizeErr, err) } - return nil, err + return nil, 0, err } // 3. Fetch bodies for the verified waypoint headers - bodies, err := d.p2pService.FetchBodies(ctx, headers, peerId) + bodies, err := d.p2pService.FetchBodies(ctx, headers.Data, peerId) if err != nil { if errors.Is(err, &p2p.ErrMissingBodies{}) { d.logger.Debug(syncLogPrefix("penalizing peer - missing bodies"), "peerId", peerId, "err", err) @@ -286,13 +301,13 @@ func (d *blockDownloader) fetchVerifiedBlocks( } } - return nil, err + return nil, 0, err } // 4. 
Assemble blocks - blocks := make([]*types.Block, len(headers)) - for i, header := range headers { - blocks[i] = types.NewBlockFromNetwork(header, bodies[i]) + blocks := make([]*types.Block, len(headers.Data)) + for i, header := range headers.Data { + blocks[i] = types.NewBlockFromNetwork(header, bodies.Data[i]) } // 5. Verify blocks @@ -303,8 +318,8 @@ func (d *blockDownloader) fetchVerifiedBlocks( err = fmt.Errorf("%w: %w", penalizeErr, err) } - return nil, err + return nil, 0, err } - return blocks, nil + return blocks, headers.TotalSize + bodies.TotalSize, nil } diff --git a/polygon/sync/block_downloader_test.go b/polygon/sync/block_downloader_test.go index e8bfcf7c1c8..20a2abef2b0 100644 --- a/polygon/sync/block_downloader_test.go +++ b/polygon/sync/block_downloader_test.go @@ -134,33 +134,38 @@ func (hdt blockDownloaderTest) fakeMilestones(count int) heimdall.Waypoints { return milestones } -type fetchHeadersMock func(ctx context.Context, start uint64, end uint64, peerId *p2p.PeerId) ([]*types.Header, error) +type fetchHeadersMock func(ctx context.Context, start uint64, end uint64, peerId *p2p.PeerId) (p2p.FetcherResponse[[]*types.Header], error) func (hdt blockDownloaderTest) defaultFetchHeadersMock() fetchHeadersMock { // p2p.Service.FetchHeaders interface is using [start, end) so we stick to that - return func(ctx context.Context, start uint64, end uint64, _ *p2p.PeerId) ([]*types.Header, error) { + return func(ctx context.Context, start uint64, end uint64, _ *p2p.PeerId) (p2p.FetcherResponse[[]*types.Header], error) { if start >= end { - return nil, fmt.Errorf("unexpected start >= end in test: start=%d, end=%d", start, end) + return p2p.FetcherResponse[[]*types.Header]{Data: nil, TotalSize: 0}, fmt.Errorf("unexpected start >= end in test: start=%d, end=%d", start, end) } res := make([]*types.Header, end-start) + size := 0 for num := start; num < end; num++ { - res[num-start] = &types.Header{ + header := &types.Header{ Number: new(big.Int).SetUint64(num), } 
+ res[num-start] = header + size += header.EncodingSize() } - return res, nil + return p2p.FetcherResponse[[]*types.Header]{Data: res, TotalSize: size}, nil } } -type fetchBodiesMock func(context.Context, []*types.Header, *p2p.PeerId) ([]*types.Body, error) +type fetchBodiesMock func(context.Context, []*types.Header, *p2p.PeerId) (p2p.FetcherResponse[[]*types.Body], error) func (hdt blockDownloaderTest) defaultFetchBodiesMock() fetchBodiesMock { - return func(ctx context.Context, headers []*types.Header, _ *p2p.PeerId) ([]*types.Body, error) { + return func(ctx context.Context, headers []*types.Header, _ *p2p.PeerId) (p2p.FetcherResponse[[]*types.Body], error) { bodies := make([]*types.Body, len(headers)) + size := 0 + for i := range headers { - bodies[i] = &types.Body{ + body := &types.Body{ Transactions: []types.Transaction{ types.NewEIP1559Transaction( *uint256.NewInt(1), @@ -175,9 +180,11 @@ func (hdt blockDownloaderTest) defaultFetchBodiesMock() fetchBodiesMock { ), }, } + bodies[i] = body + size += body.EncodingSize() } - return bodies, nil + return p2p.FetcherResponse[[]*types.Body]{Data: bodies, TotalSize: size}, nil } } @@ -438,9 +445,9 @@ func TestBlockDownloaderDownloadBlocksWhenMissingBodiesThenPenalizePeerAndReDown Times(1) test.p2pService.EXPECT(). FetchBodies(gomock.Any(), gomock.Any(), gomock.Any()). 
- DoAndReturn(func(ctx context.Context, headers []*types.Header, peerId *p2p.PeerId) ([]*types.Body, error) { + DoAndReturn(func(ctx context.Context, headers []*types.Header, peerId *p2p.PeerId) (p2p.FetcherResponse[[]*types.Body], error) { if peerId.Equal(p2p.PeerIdFromUint64(2)) { - return nil, p2p.NewErrMissingBodies(headers) + return p2p.FetcherResponse[[]*types.Body]{Data: nil, TotalSize: 0}, p2p.NewErrMissingBodies(headers) } return test.defaultFetchBodiesMock()(ctx, headers, peerId) diff --git a/polygon/sync/sync.go b/polygon/sync/sync.go index f1de4c0b106..fba32762e13 100644 --- a/polygon/sync/sync.go +++ b/polygon/sync/sync.go @@ -125,7 +125,7 @@ func (s *Sync) onNewBlockEvent( if ccBuilder.ContainsHash(newBlockHeader.ParentHash) { newBlocks = []*types.Block{event.NewBlock} } else { - newBlocks, err = s.p2pService.FetchBlocks(ctx, rootNum, newBlockHeaderNum+1, event.PeerId) + blocks, err := s.p2pService.FetchBlocks(ctx, rootNum, newBlockHeaderNum+1, event.PeerId) if err != nil { if (p2p.ErrIncompleteHeaders{}).Is(err) || (p2p.ErrMissingBodies{}).Is(err) { s.logger.Debug( @@ -140,6 +140,8 @@ func (s *Sync) onNewBlockEvent( return err } + + newBlocks = blocks.Data } if err := s.blocksVerifier(newBlocks); err != nil { @@ -204,7 +206,7 @@ func (s *Sync) onNewBlockHashesEvent( } newBlockEvent := EventNewBlock{ - NewBlock: newBlocks[0], + NewBlock: newBlocks.Data[0], PeerId: event.PeerId, }