diff --git a/codecov.yml b/codecov.yml index e3b315062b..9f64abe643 100644 --- a/codecov.yml +++ b/codecov.yml @@ -39,6 +39,7 @@ ignore: - "network/p2p/metrics.go" - "network/peers/metrics.go" - "network/peers/connections/metrics.go" + - "network/peers/connections/mock/mock_storage.go" - "network/topics/metrics.go" - "protocol/v2/ssv/validator/metrics.go" - "protocol/v2/ssv/runner/metrics/metrics.go" @@ -47,6 +48,7 @@ ignore: - "exporter/api/metrics.go" - "eth/executionclient/metrics.go" - "eth/eventsyncer/metrics.go" + - "eth/eventsyncer/event_syncer_mock.go" - "eth/eventhandler/metrics.go" - "operator/validator/metrics.go" - "operator/metrics.go" diff --git a/eth/eventsyncer/event_syncer.go b/eth/eventsyncer/event_syncer.go index c6c55829af..519a4cf893 100644 --- a/eth/eventsyncer/event_syncer.go +++ b/eth/eventsyncer/event_syncer.go @@ -6,8 +6,10 @@ import ( "context" "errors" "fmt" + "math/big" "time" + "github.com/ethereum/go-ethereum/core/types" "go.uber.org/zap" "github.com/ssvlabs/ssv/eth/executionclient" @@ -15,6 +17,8 @@ import ( nodestorage "github.com/ssvlabs/ssv/operator/storage" ) +//go:generate mockgen -package=eventsyncer -destination=./event_syncer_mock.go -source=./event_syncer.go + // TODO: check if something from these PRs need to be ported: // https://github.com/ssvlabs/ssv/pull/1053 @@ -26,6 +30,7 @@ var ( type ExecutionClient interface { FetchHistoricalLogs(ctx context.Context, fromBlock uint64) (logs <-chan executionclient.BlockLogs, errors <-chan error, err error) StreamLogs(ctx context.Context, fromBlock uint64) <-chan executionclient.BlockLogs + HeaderByNumber(ctx context.Context, blockNumber *big.Int) (*types.Header, error) } type EventHandler interface { @@ -80,6 +85,21 @@ func (es *EventSyncer) Healthy(ctx context.Context) error { if time.Since(es.lastProcessedBlockChange) > es.stalenessThreshold { return fmt.Errorf("syncing is stuck at block %d", lastProcessedBlock.Uint64()) } + + return es.blockBelowThreshold(ctx, lastProcessedBlock) +} + +func (es *EventSyncer) blockBelowThreshold(ctx context.Context, block *big.Int) error { + header, err := es.executionClient.HeaderByNumber(ctx, block) + if err != nil { + return fmt.Errorf("failed to get header for block %d: %w", block, err) + } + + // #nosec G115 + if header.Time < uint64(time.Now().Add(-es.stalenessThreshold).Unix()) { + return fmt.Errorf("block %d is too old", block) + } + return nil } @@ -110,6 +130,12 @@ func (es *EventSyncer) SyncHistory(ctx context.Context, fromBlock uint64) (lastP return 0, fmt.Errorf("event replay: lastProcessedBlock (%d) is lower than fromBlock (%d)", lastProcessedBlock, fromBlock) } + // Check if the block is too old. + b := new(big.Int).SetUint64(lastProcessedBlock) + if err := es.blockBelowThreshold(ctx, b); err != nil { + return 0, err + } + es.logger.Info("finished syncing historical events", zap.Uint64("from_block", fromBlock), zap.Uint64("last_processed_block", lastProcessedBlock)) diff --git a/eth/eventsyncer/event_syncer_mock.go b/eth/eventsyncer/event_syncer_mock.go new file mode 100644 index 0000000000..ba992727d3 --- /dev/null +++ b/eth/eventsyncer/event_syncer_mock.go @@ -0,0 +1,126 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./event_syncer.go +// +// Generated by this command: +// +// mockgen -package=eventsyncer -destination=./event_syncer_mock.go -source=./event_syncer.go +// + +// Package eventsyncer is a generated GoMock package. +package eventsyncer + +import ( + context "context" + big "math/big" + reflect "reflect" + + types "github.com/ethereum/go-ethereum/core/types" + executionclient "github.com/ssvlabs/ssv/eth/executionclient" + gomock "go.uber.org/mock/gomock" +) + +// MockExecutionClient is a mock of ExecutionClient interface. +type MockExecutionClient struct { + ctrl *gomock.Controller + recorder *MockExecutionClientMockRecorder +} + +// MockExecutionClientMockRecorder is the mock recorder for MockExecutionClient. +type MockExecutionClientMockRecorder struct { + mock *MockExecutionClient +} + +// NewMockExecutionClient creates a new mock instance. +func NewMockExecutionClient(ctrl *gomock.Controller) *MockExecutionClient { + mock := &MockExecutionClient{ctrl: ctrl} + mock.recorder = &MockExecutionClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockExecutionClient) EXPECT() *MockExecutionClientMockRecorder { + return m.recorder +} + +// FetchHistoricalLogs mocks base method. +func (m *MockExecutionClient) FetchHistoricalLogs(ctx context.Context, fromBlock uint64) (<-chan executionclient.BlockLogs, <-chan error, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchHistoricalLogs", ctx, fromBlock) + ret0, _ := ret[0].(<-chan executionclient.BlockLogs) + ret1, _ := ret[1].(<-chan error) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// FetchHistoricalLogs indicates an expected call of FetchHistoricalLogs. +func (mr *MockExecutionClientMockRecorder) FetchHistoricalLogs(ctx, fromBlock any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchHistoricalLogs", reflect.TypeOf((*MockExecutionClient)(nil).FetchHistoricalLogs), ctx, fromBlock) +} + +// HeaderByNumber mocks base method. +func (m *MockExecutionClient) HeaderByNumber(ctx context.Context, blockNumber *big.Int) (*types.Header, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HeaderByNumber", ctx, blockNumber) + ret0, _ := ret[0].(*types.Header) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// HeaderByNumber indicates an expected call of HeaderByNumber. +func (mr *MockExecutionClientMockRecorder) HeaderByNumber(ctx, blockNumber any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HeaderByNumber", reflect.TypeOf((*MockExecutionClient)(nil).HeaderByNumber), ctx, blockNumber) +} + +// StreamLogs mocks base method. +func (m *MockExecutionClient) StreamLogs(ctx context.Context, fromBlock uint64) <-chan executionclient.BlockLogs { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StreamLogs", ctx, fromBlock) + ret0, _ := ret[0].(<-chan executionclient.BlockLogs) + return ret0 +} + +// StreamLogs indicates an expected call of StreamLogs. +func (mr *MockExecutionClientMockRecorder) StreamLogs(ctx, fromBlock any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StreamLogs", reflect.TypeOf((*MockExecutionClient)(nil).StreamLogs), ctx, fromBlock) +} + +// MockEventHandler is a mock of EventHandler interface. +type MockEventHandler struct { + ctrl *gomock.Controller + recorder *MockEventHandlerMockRecorder +} + +// MockEventHandlerMockRecorder is the mock recorder for MockEventHandler. +type MockEventHandlerMockRecorder struct { + mock *MockEventHandler +} + +// NewMockEventHandler creates a new mock instance. +func NewMockEventHandler(ctrl *gomock.Controller) *MockEventHandler { + mock := &MockEventHandler{ctrl: ctrl} + mock.recorder = &MockEventHandlerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockEventHandler) EXPECT() *MockEventHandlerMockRecorder { + return m.recorder +} + +// HandleBlockEventsStream mocks base method. +func (m *MockEventHandler) HandleBlockEventsStream(ctx context.Context, logs <-chan executionclient.BlockLogs, executeTasks bool) (uint64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HandleBlockEventsStream", ctx, logs, executeTasks) + ret0, _ := ret[0].(uint64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// HandleBlockEventsStream indicates an expected call of HandleBlockEventsStream. +func (mr *MockEventHandlerMockRecorder) HandleBlockEventsStream(ctx, logs, executeTasks any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleBlockEventsStream", reflect.TypeOf((*MockEventHandler)(nil).HandleBlockEventsStream), ctx, logs, executeTasks) +} diff --git a/eth/eventsyncer/event_syncer_test.go b/eth/eventsyncer/event_syncer_test.go index 0f7dc91863..1c07cb774a 100644 --- a/eth/eventsyncer/event_syncer_test.go +++ b/eth/eventsyncer/event_syncer_test.go @@ -3,6 +3,7 @@ package eventsyncer import ( "context" "encoding/base64" + "errors" "math/big" "net/http/httptest" "strings" @@ -12,7 +13,6 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/accounts/abi/bind" ethcommon "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient/simulated" @@ -197,7 +197,7 @@ func setupEventHandler( func simTestBackend(testAddr ethcommon.Address) *simulator.Backend { return simulator.NewBackend( - types.GenesisAlloc{ + ethtypes.GenesisAlloc{ testAddr: {Balance: big.NewInt(10000000000000000)}, }, simulated.WithBlockGasLimit(10000000), ) @@ -240,3 +240,32 @@ func setupOperatorStorage(logger *zap.Logger, db basedb.Database, privKey keys.O return nodeStorage, operatorData } + +func TestBlockBelowThreshold(t *testing.T) { + ctrl := gomock.NewController(t) + m := NewMockExecutionClient(ctrl) + ctx := context.Background() + + s := New(nil, m, nil) + + t.Run("fails on EC error", func(t *testing.T) { + err1 := errors.New("ec err") + m.EXPECT().HeaderByNumber(ctx, big.NewInt(1)).Return(nil, err1) + err := s.blockBelowThreshold(ctx, big.NewInt(1)) + require.ErrorIs(t, err, err1) + }) + + t.Run("fails if outside threashold", func(t *testing.T) { + header := ðtypes.Header{Time: uint64(time.Now().Add(-151 * time.Second).Unix())} + m.EXPECT().HeaderByNumber(ctx, big.NewInt(1)).Return(header, nil) + err := s.blockBelowThreshold(ctx, big.NewInt(1)) + require.Error(t, err) + }) + + t.Run("success", func(t *testing.T) { + header := ðtypes.Header{Time: uint64(time.Now().Add(-149 * time.Second).Unix())} + m.EXPECT().HeaderByNumber(ctx, big.NewInt(1)).Return(header, nil) + err := s.blockBelowThreshold(ctx, big.NewInt(1)) + require.NoError(t, err) + }) +} diff --git a/eth/executionclient/execution_client.go b/eth/executionclient/execution_client.go index 804d577599..805cb5efcf 100644 --- a/eth/executionclient/execution_client.go +++ b/eth/executionclient/execution_client.go @@ -178,14 +178,17 @@ func (ec *ExecutionClient) fetchLogsInBatches(ctx context.Context, startBlock, e } validLogs = append(validLogs, log) } - if len(validLogs) == 0 { - // Emit empty block logs to indicate that we have advanced to this block. - logs <- BlockLogs{BlockNumber: toBlock} - } else { - for _, blockLogs := range PackLogs(validLogs) { - logs <- blockLogs + var highestBlock uint64 + for _, blockLogs := range PackLogs(validLogs) { + logs <- blockLogs + if blockLogs.BlockNumber > highestBlock { + highestBlock = blockLogs.BlockNumber } } + // Emit empty block logs to indicate that we have advanced to this block. + if highestBlock < toBlock { + logs <- BlockLogs{BlockNumber: toBlock} + } } } }() @@ -292,6 +295,18 @@ func (ec *ExecutionClient) BlockByNumber(ctx context.Context, blockNumber *big.I return b, nil } +func (ec *ExecutionClient) HeaderByNumber(ctx context.Context, blockNumber *big.Int) (*ethtypes.Header, error) { + h, err := ec.client.HeaderByNumber(ctx, blockNumber) + if err != nil { + ec.logger.Error(elResponseErrMsg, + zap.String("method", "eth_getBlockByNumber"), + zap.Error(err)) + return nil, err + } + + return h, nil +} + func (ec *ExecutionClient) isClosed() bool { select { case <-ec.closed: