polygon/p2p: Add blk/s and bytes/s to periodic log #9976

Merged 15 commits on Apr 27, 2024
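The fetchers now report how many bytes each response carried (the TotalSize field below), which is what lets the periodic sync log print blk/s and bytes/s. The logger itself is not part of this diff; the following is a minimal stand-alone sketch of how such a meter could consume those values (all names are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

// progressMeter accumulates block and byte counts between periodic flushes.
// Illustrative only: this is not the logger added by the PR.
type progressMeter struct {
	since      time.Time
	blocks     int
	totalBytes int
}

// observe records one fetch: the number of blocks received and the
// response size, e.g. a FetcherResponse's len(Data) and TotalSize.
func (p *progressMeter) observe(blocks, responseBytes int) {
	p.blocks += blocks
	p.totalBytes += responseBytes
}

// flush emits the rates accumulated since the last flush, then resets.
func (p *progressMeter) flush(now time.Time) {
	elapsed := now.Sub(p.since).Seconds()
	if elapsed <= 0 {
		return
	}
	fmt.Printf("syncing: blk/s=%.1f bytes/s=%.0f\n",
		float64(p.blocks)/elapsed, float64(p.totalBytes)/elapsed)
	p.blocks, p.totalBytes, p.since = 0, 0, now
}

func main() {
	m := &progressMeter{since: time.Now()}
	m.observe(128, 2*1024*1024) // one hypothetical FetchBlocks response
	m.flush(time.Now().Add(time.Second))
}
```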
polygon/p2p/fetcher_base.go (102 changes: 63 additions & 39 deletions)

@@ -25,12 +25,12 @@ type FetcherConfig struct {

type Fetcher interface {
// FetchHeaders fetches [start,end) headers from a peer. Blocks until data is received.
FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Header, error)
FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error)
// FetchBodies fetches block bodies for the given headers from a peer. Blocks until data is received.
FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) ([]*types.Body, error)
FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (FetcherResponse[[]*types.Body], error)
// FetchBlocks fetches headers and bodies for a given [start, end) range from a peer and
// assembles them into blocks. Blocks until data is received.
FetchBlocks(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Block, error)
FetchBlocks(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Block], error)
}

func NewFetcher(
@@ -63,9 +63,14 @@ type fetcher struct {
requestIdGenerator RequestIdGenerator
}

func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Header, error) {
type FetcherResponse[T any] struct {
Data T
TotalSize int
}
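FetcherResponse is the new generic wrapper returned by all three Fetch* methods: Data holds what used to be the bare return value, and TotalSize the byte size of the raw response messages it was assembled from. A caller-side sketch of the resulting change (the helper and the import path are assumptions, not part of this diff):

```go
package p2p

import (
	"context"

	// Import path assumed for illustration.
	"github.com/ledgerwatch/erigon/core/types"
)

// downloadBlocks is a hypothetical helper showing the caller-side change:
// unwrap resp.Data where a bare slice used to be returned, and surface
// resp.TotalSize for throughput accounting.
func downloadBlocks(ctx context.Context, f Fetcher, start, end uint64, peerId *PeerId) ([]*types.Block, int, error) {
	resp, err := f.FetchBlocks(ctx, start, end, peerId)
	if err != nil {
		return nil, 0, err
	}
	return resp.Data, resp.TotalSize, nil // TotalSize = header + body response bytes
}
```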

func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) {
if start >= end {
return nil, &ErrInvalidFetchHeadersRange{
return FetcherResponse[[]*types.Header]{}, &ErrInvalidFetchHeadersRange{
start: start,
end: end,
}
@@ -82,6 +87,7 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe
if amount%eth.MaxHeadersServe > 0 {
numChunks++
}
totalHeadersSize := 0

headers := make([]*types.Header, 0, amount)
for chunkNum := uint64(0); chunkNum < numChunks; chunkNum++ {
@@ -91,30 +97,35 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe
// a node may not respond with all MaxHeadersServe in 1 response,
// so we keep on consuming from last received number (akin to consuming a paging api)
// until we have all headers of the chunk or the peer stopped returning headers
headersChunk, err := fetchWithRetry(f.config, func() ([]*types.Header, error) {
headersChunk, err := fetchWithRetry(f.config, func() (FetcherResponse[[]*types.Header], error) {
return f.fetchHeaders(ctx, chunkStart, chunkEnd, peerId)
})
if err != nil {
return nil, err
return FetcherResponse[[]*types.Header]{}, err
}
if len(headersChunk) == 0 {
if len(headersChunk.Data) == 0 {
break
}

headers = append(headers, headersChunk...)
chunkStart += uint64(len(headersChunk))
headers = append(headers, headersChunk.Data...)
chunkStart += uint64(len(headersChunk.Data))
totalHeadersSize += headersChunk.TotalSize
}
}

if err := f.validateHeadersResponse(headers, start, amount); err != nil {
return nil, err
return FetcherResponse[[]*types.Header]{}, err
}

return headers, nil
return FetcherResponse[[]*types.Header]{
Data: headers,
TotalSize: totalHeadersSize,
}, nil
}
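Stripped of the p2p details, the chunk loop above is cursor-based paging, as the inline comment notes: advance by however many items the peer actually returned, and stop early on an empty page. A self-contained sketch of that pattern (illustrative, not from the diff):

```go
// fetchAll drains a paging-style source: keep requesting from the cursor
// until the range is covered or the source returns an empty page.
// Purely illustrative; fetchPage stands in for f.fetchHeaders.
func fetchAll(start, end uint64, fetchPage func(from, to uint64) []uint64) []uint64 {
	var out []uint64
	for cursor := start; cursor < end; {
		page := fetchPage(cursor, end)
		if len(page) == 0 {
			break // the peer stopped returning data
		}
		out = append(out, page...)
		cursor += uint64(len(page))
	}
	return out
}
```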

func (f *fetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) ([]*types.Body, error) {
func (f *fetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (FetcherResponse[[]*types.Body], error) {
var bodies []*types.Body
totalBodiesSize := 0

for len(headers) > 0 {
// Note: we always request MaxBodiesServe for optimal response sizes (fully utilising the 2 MB soft limit).
@@ -128,43 +139,50 @@ func (f *fetcher) FetchBodies(ctx context.Context, headers []*types.Header, peer
headersChunk = headers
}

bodiesChunk, err := fetchWithRetry(f.config, func() ([]*types.Body, error) {
bodiesChunk, err := fetchWithRetry(f.config, func() (*FetcherResponse[[]*types.Body], error) {
return f.fetchBodies(ctx, headersChunk, peerId)
})
if err != nil {
return nil, err
return FetcherResponse[[]*types.Body]{}, err
}
if len(bodiesChunk) == 0 {
return nil, NewErrMissingBodies(headers)
if len(bodiesChunk.Data) == 0 {
return FetcherResponse[[]*types.Body]{}, NewErrMissingBodies(headers)
}

bodies = append(bodies, bodiesChunk...)
headers = headers[len(bodiesChunk):]
bodies = append(bodies, bodiesChunk.Data...)
headers = headers[len(bodiesChunk.Data):]
totalBodiesSize += bodiesChunk.TotalSize
}

return bodies, nil
return FetcherResponse[[]*types.Body]{
Data: bodies,
TotalSize: totalBodiesSize,
}, nil
}

func (f *fetcher) FetchBlocks(ctx context.Context, start, end uint64, peerId *PeerId) ([]*types.Block, error) {
func (f *fetcher) FetchBlocks(ctx context.Context, start, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Block], error) {
headers, err := f.FetchHeaders(ctx, start, end, peerId)
if err != nil {
return nil, err
return FetcherResponse[[]*types.Block]{}, err
}

bodies, err := f.FetchBodies(ctx, headers, peerId)
bodies, err := f.FetchBodies(ctx, headers.Data, peerId)
if err != nil {
return nil, err
return FetcherResponse[[]*types.Block]{}, err
}

blocks := make([]*types.Block, len(headers))
for i, header := range headers {
blocks[i] = types.NewBlockFromNetwork(header, bodies[i])
blocks := make([]*types.Block, len(headers.Data))
for i, header := range headers.Data {
blocks[i] = types.NewBlockFromNetwork(header, bodies.Data[i])
}

return blocks, nil
return FetcherResponse[[]*types.Block]{
Data: blocks,
TotalSize: headers.TotalSize + bodies.TotalSize,
}, nil
}

func (f *fetcher) fetchHeaders(ctx context.Context, start, end uint64, peerId *PeerId) ([]*types.Header, error) {
func (f *fetcher) fetchHeaders(ctx context.Context, start, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

@@ -192,15 +210,18 @@ func (f *fetcher) fetchHeaders(ctx context.Context, start, end uint64, peerId *P
},
})
if err != nil {
return nil, err
return FetcherResponse[[]*types.Header]{}, err
}

message, err := awaitResponse(ctx, f.config.responseTimeout, messages, filterBlockHeaders(peerId, requestId))
message, messageSize, err := awaitResponse(ctx, f.config.responseTimeout, messages, filterBlockHeaders(peerId, requestId))
if err != nil {
return nil, err
return FetcherResponse[[]*types.Header]{}, err
}

return message.BlockHeadersPacket, nil
return FetcherResponse[[]*types.Header]{
Data: message.BlockHeadersPacket,
TotalSize: messageSize,
}, nil
}

func (f *fetcher) validateHeadersResponse(headers []*types.Header, start, amount uint64) error {
@@ -234,7 +255,7 @@ func (f *fetcher) validateHeadersResponse(headers []*types.Header, start, amount
return nil
}

func (f *fetcher) fetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) ([]*types.Body, error) {
func (f *fetcher) fetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (*FetcherResponse[[]*types.Body], error) {
// cleanup for the chan message observer
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -266,7 +287,7 @@ func (f *fetcher) fetchBodies(ctx context.Context, headers []*types.Header, peer
return nil, err
}

message, err := awaitResponse(ctx, f.config.responseTimeout, messages, filterBlockBodies(peerId, requestId))
message, messageSize, err := awaitResponse(ctx, f.config.responseTimeout, messages, filterBlockBodies(peerId, requestId))
if err != nil {
return nil, err
}
@@ -275,7 +296,10 @@ func (f *fetcher) fetchBodies(ctx context.Context, headers []*types.Header, peer
return nil, err
}

return message.BlockBodiesPacket, nil
return &FetcherResponse[[]*types.Body]{
Data: message.BlockBodiesPacket,
TotalSize: messageSize,
}, nil
}

func (f *fetcher) validateBodies(bodies []*types.Body, headers []*types.Header) error {
@@ -318,21 +342,21 @@ func awaitResponse[TPacket any](
timeout time.Duration,
messages chan *DecodedInboundMessage[TPacket],
filter func(*DecodedInboundMessage[TPacket]) bool,
) (TPacket, error) {
) (TPacket, int, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

for {
select {
case <-ctx.Done():
var nilPacket TPacket
return nilPacket, fmt.Errorf("await %v response interrupted: %w", reflect.TypeOf(nilPacket), ctx.Err())
return nilPacket, 0, fmt.Errorf("await %v response interrupted: %w", reflect.TypeOf(nilPacket), ctx.Err())
case message := <-messages:
if filter(message) {
continue
}

return message.Decoded, nil
return message.Decoded, len(message.Data), nil
}
}
}
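awaitResponse now returns the matched message's size alongside the decoded packet, measured as len(message.Data), i.e. the raw payload length before decoding, so TotalSize tracks bytes received off the wire rather than decoded in-memory size. A condensed sketch of that accounting (the Data field's type is an assumption inferred from the len(message.Data) call):

```go
// Illustrative mirror of the size accounting in awaitResponse; not the
// package's actual DecodedInboundMessage definition.
type inboundMessage[T any] struct {
	Data    []byte // raw, undecoded payload as received from the peer (assumed)
	Decoded T      // the parsed packet
}

// payloadSize captures what the diff adds conceptually: the reported
// response size is simply the raw payload length.
func payloadSize[T any](m *inboundMessage[T]) int {
	return len(m.Data)
}
```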
polygon/p2p/fetcher_base_test.go (35 changes: 19 additions & 16 deletions)

@@ -54,10 +54,11 @@ func TestFetcherFetchHeaders(t *testing.T) {
test.mockSentryStreams(mockRequestResponse)
test.run(func(ctx context.Context, t *testing.T) {
headers, err := test.fetcher.FetchHeaders(ctx, 1, 3, peerId)
headersData := headers.Data
require.NoError(t, err)
require.Len(t, headers, 2)
require.Equal(t, uint64(1), headers[0].Number.Uint64())
require.Equal(t, uint64(2), headers[1].Number.Uint64())
require.Len(t, headersData, 2)
require.Equal(t, uint64(1), headersData[0].Number.Uint64())
require.Equal(t, uint64(2), headersData[1].Number.Uint64())
})
}

@@ -103,10 +104,11 @@ func TestFetcherFetchHeadersWithChunking(t *testing.T) {
test.mockSentryStreams(mockRequestResponse1, mockRequestResponse2)
test.run(func(ctx context.Context, t *testing.T) {
headers, err := test.fetcher.FetchHeaders(ctx, 1, 2000, peerId)
headersData := headers.Data
require.NoError(t, err)
require.Len(t, headers, 1999)
require.Equal(t, uint64(1), headers[0].Number.Uint64())
require.Equal(t, uint64(1999), headers[len(headers)-1].Number.Uint64())
require.Len(t, headersData, 1999)
require.Equal(t, uint64(1), headersData[0].Number.Uint64())
require.Equal(t, uint64(1999), headersData[len(headersData)-1].Number.Uint64())
})
}

@@ -156,7 +158,7 @@ func TestFetcherFetchHeadersResponseTimeout(t *testing.T) {
test.run(func(ctx context.Context, t *testing.T) {
headers, err := test.fetcher.FetchHeaders(ctx, 1, 11, peerId)
require.ErrorIs(t, err, context.DeadlineExceeded)
require.Nil(t, headers)
require.Nil(t, headers.Data)
})
}

@@ -220,10 +222,11 @@ func TestFetcherFetchHeadersResponseTimeoutRetrySuccess(t *testing.T) {
test.mockSentryStreams(mockRequestResponse1, mockRequestResponse2, mockRequestResponse3)
test.run(func(ctx context.Context, t *testing.T) {
headers, err := test.fetcher.FetchHeaders(ctx, 1, 2000, peerId)
headersData := headers.Data
require.NoError(t, err)
require.Len(t, headers, 1999)
require.Equal(t, uint64(1), headers[0].Number.Uint64())
require.Equal(t, uint64(1999), headers[len(headers)-1].Number.Uint64())
require.Len(t, headersData, 1999)
require.Equal(t, uint64(1), headersData[0].Number.Uint64())
require.Equal(t, uint64(1999), headersData[len(headersData)-1].Number.Uint64())
})
}

@@ -238,7 +241,7 @@ func TestFetcherErrInvalidFetchHeadersRange(t *testing.T) {
require.ErrorAs(t, err, &errInvalidFetchHeadersRange)
require.Equal(t, uint64(3), errInvalidFetchHeadersRange.start)
require.Equal(t, uint64(1), errInvalidFetchHeadersRange.end)
require.Nil(t, headers)
require.Nil(t, headers.Data)
})
}

@@ -284,7 +287,7 @@ func TestFetcherFetchHeadersErrIncompleteResponse(t *testing.T) {
headers, err := test.fetcher.FetchHeaders(ctx, 1, 4, peerId)
require.ErrorAs(t, err, &errIncompleteHeaders)
require.Equal(t, uint64(3), errIncompleteHeaders.LowestMissingBlockNum())
require.Nil(t, headers)
require.Nil(t, headers.Data)
})
}

@@ -365,7 +368,7 @@ func TestFetcherFetchBodies(t *testing.T) {
test.run(func(ctx context.Context, t *testing.T) {
bodies, err := test.fetcher.FetchBodies(ctx, mockHeaders, peerId)
require.NoError(t, err)
require.Len(t, bodies, 2)
require.Len(t, bodies.Data, 2)
})
}

@@ -404,7 +407,7 @@ func TestFetcherFetchBodiesResponseTimeout(t *testing.T) {
test.run(func(ctx context.Context, t *testing.T) {
bodies, err := test.fetcher.FetchBodies(ctx, mockHeaders, peerId)
require.ErrorIs(t, err, context.DeadlineExceeded)
require.Nil(t, bodies)
require.Nil(t, bodies.Data)
})
}

@@ -463,7 +466,7 @@ func TestFetcherFetchBodiesResponseTimeoutRetrySuccess(t *testing.T) {
test.run(func(ctx context.Context, t *testing.T) {
bodies, err := test.fetcher.FetchBodies(ctx, mockHeaders, peerId)
require.NoError(t, err)
require.Len(t, bodies, 1)
require.Len(t, bodies.Data, 1)
})
}

@@ -497,7 +500,7 @@ func TestFetcherFetchBodiesErrMissingBodies(t *testing.T) {
lowest, exists := errMissingBlocks.LowestMissingBlockNum()
require.Equal(t, uint64(1), lowest)
require.True(t, exists)
require.Nil(t, bodies)
require.Nil(t, bodies.Data)
})
}

polygon/p2p/fetcher_penalizing.go (8 changes: 4 additions & 4 deletions)

@@ -28,19 +28,19 @@ type penalizingFetcher struct {
peerPenalizer PeerPenalizer
}

func (pf *penalizingFetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Header, error) {
func (pf *penalizingFetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) {
headers, err := pf.Fetcher.FetchHeaders(ctx, start, end, peerId)
if err != nil {
return nil, pf.maybePenalize(ctx, peerId, err, &ErrTooManyHeaders{}, &ErrNonSequentialHeaderNumbers{})
return FetcherResponse[[]*types.Header]{}, pf.maybePenalize(ctx, peerId, err, &ErrTooManyHeaders{}, &ErrNonSequentialHeaderNumbers{})
}

return headers, nil
}

func (pf *penalizingFetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) ([]*types.Body, error) {
func (pf *penalizingFetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (FetcherResponse[[]*types.Body], error) {
bodies, err := pf.Fetcher.FetchBodies(ctx, headers, peerId)
if err != nil {
return nil, pf.maybePenalize(ctx, peerId, err, &ErrTooManyBodies{})
return FetcherResponse[[]*types.Body]{}, pf.maybePenalize(ctx, peerId, err, &ErrTooManyBodies{})
}

return bodies, nil
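Note that penalizingFetcher only needed its two overridden signatures updated; every other method still passes through the embedded Fetcher untouched. A stand-alone sketch of that embed-and-override pattern (all names illustrative):

```go
package main

import "fmt"

// Fetcher is a stand-in for the real p2p.Fetcher interface.
type Fetcher interface {
	Fetch(id int) (string, error)
}

type baseFetcher struct{}

func (baseFetcher) Fetch(id int) (string, error) {
	return fmt.Sprintf("data-%d", id), nil
}

// penalizing embeds Fetcher, so any method it does not override is
// forwarded to the wrapped implementation unchanged.
type penalizing struct {
	Fetcher
}

// Fetch overrides the embedded method to post-process errors, the way
// penalizingFetcher translates protocol violations into peer penalties.
func (p penalizing) Fetch(id int) (string, error) {
	data, err := p.Fetcher.Fetch(id)
	if err != nil {
		return "", fmt.Errorf("penalized: %w", err)
	}
	return data, nil
}

func main() {
	var f Fetcher = penalizing{Fetcher: baseFetcher{}}
	fmt.Println(f.Fetch(7))
}
```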
polygon/p2p/fetcher_penalizing_test.go (8 changes: 4 additions & 4 deletions)

@@ -43,7 +43,7 @@ func TestPenalizingFetcherFetchHeadersShouldPenalizePeerWhenErrTooManyHeaders(t
require.ErrorAs(t, err, &errTooManyHeaders)
require.Equal(t, 2, errTooManyHeaders.requested)
require.Equal(t, 5, errTooManyHeaders.received)
require.Nil(t, headers)
require.Nil(t, headers.Data)
})
}

@@ -82,7 +82,7 @@ func TestPenalizingFetcherFetchHeadersShouldPenalizePeerWhenErrNonSequentialHead
require.ErrorAs(t, err, &errNonSequentialHeaderNumbers)
require.Equal(t, uint64(3), errNonSequentialHeaderNumbers.current)
require.Equal(t, uint64(2), errNonSequentialHeaderNumbers.expected)
require.Nil(t, headers)
require.Nil(t, headers.Data)
})
}

@@ -119,7 +119,7 @@ func TestPenalizingFetcherFetchHeadersShouldPenalizePeerWhenIncorrectOrigin(t *t
require.ErrorAs(t, err, &errNonSequentialHeaderNumbers)
require.Equal(t, uint64(2), errNonSequentialHeaderNumbers.current)
require.Equal(t, uint64(1), errNonSequentialHeaderNumbers.expected)
require.Nil(t, headers)
require.Nil(t, headers.Data)
})
}

@@ -154,7 +154,7 @@ func TestPenalizingFetcherFetchBodiesShouldPenalizePeerWhenErrTooManyBodies(t *t
require.ErrorAs(t, err, &errTooManyBodies)
require.Equal(t, 1, errTooManyBodies.requested)
require.Equal(t, 2, errTooManyBodies.received)
require.Nil(t, bodies)
require.Nil(t, bodies.Data)
})
}
