Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eventsyncing): health calls check last seen block distance #1978

Merged
merged 21 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ ignore:
- "network/p2p/metrics.go"
- "network/peers/metrics.go"
- "network/peers/connections/metrics.go"
- "network/peers/connections/mock/mock_storage.go"
- "network/topics/metrics.go"
- "protocol/v2/ssv/validator/metrics.go"
- "protocol/v2/ssv/runner/metrics/metrics.go"
Expand All @@ -47,6 +48,7 @@ ignore:
- "exporter/api/metrics.go"
- "eth/executionclient/metrics.go"
- "eth/eventsyncer/metrics.go"
- "eth/eventsyncer/event_syncer_mock.go"
- "eth/eventhandler/metrics.go"
- "operator/validator/metrics.go"
- "operator/metrics.go"
Expand Down
10 changes: 10 additions & 0 deletions eth/eventhandler/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,16 @@
txn := eh.nodeStorage.Begin()
defer txn.Discard()

if len(block.Logs) == 0 {
if err := eh.nodeStorage.SaveHighestSeenBlock(txn, new(big.Int).SetUint64(block.LastSeen)); err != nil {
return nil, fmt.Errorf("set last processed block %d: %w", block.LastSeen, err)
}
if err := txn.Commit(); err != nil {
return nil, fmt.Errorf("commit transaction for block %d: %w", block.LastSeen, err)
}
return nil, nil

Check warning on line 156 in eth/eventhandler/event_handler.go

View check run for this annotation

Codecov / codecov/patch

eth/eventhandler/event_handler.go#L150-L156

Added lines #L150 - L156 were not covered by tests
}

lastProcessedBlock, found, err := eh.nodeStorage.GetLastProcessedBlock(txn)
if err != nil {
return nil, fmt.Errorf("get last processed block: %w", err)
Expand Down
35 changes: 35 additions & 0 deletions eth/eventsyncer/event_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@
"context"
"errors"
"fmt"
"math/big"
"time"

"github.com/ethereum/go-ethereum/core/types"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/eth/executionclient"
"github.com/ssvlabs/ssv/logging/fields"
nodestorage "github.com/ssvlabs/ssv/operator/storage"
)

//go:generate mockgen -package=eventsyncer -destination=./event_syncer_mock.go -source=./event_syncer.go

// TODO: check if something from these PRs need to be ported:
// https://github.com/ssvlabs/ssv/pull/1053

Expand All @@ -26,6 +30,7 @@
type ExecutionClient interface {
FetchHistoricalLogs(ctx context.Context, fromBlock uint64) (logs <-chan executionclient.BlockLogs, errors <-chan error, err error)
StreamLogs(ctx context.Context, fromBlock uint64) <-chan executionclient.BlockLogs
HeaderByNumber(ctx context.Context, blockNumber *big.Int) (*types.Header, error)
}

type EventHandler interface {
Expand Down Expand Up @@ -80,6 +85,29 @@
if time.Since(es.lastProcessedBlockChange) > es.stalenessThreshold {
return fmt.Errorf("syncing is stuck at block %d", lastProcessedBlock.Uint64())
}

highestSeenBlock, found, err := es.nodeStorage.GetHighestSeenBlock(nil)
if err != nil {
return fmt.Errorf("failed to read last processed block: %w", err)
}
if !found || highestSeenBlock == nil || highestSeenBlock.Uint64() == 0 {
return fmt.Errorf("last seen block is not set")
}

Check warning on line 95 in eth/eventsyncer/event_syncer.go

View check run for this annotation

Codecov / codecov/patch

eth/eventsyncer/event_syncer.go#L89-L95

Added lines #L89 - L95 were not covered by tests

return es.blockBelowThreashold(ctx, highestSeenBlock)

Check warning on line 97 in eth/eventsyncer/event_syncer.go

View check run for this annotation

Codecov / codecov/patch

eth/eventsyncer/event_syncer.go#L97

Added line #L97 was not covered by tests
}

func (es *EventSyncer) blockBelowThreashold(ctx context.Context, block *big.Int) error {
anatolie-ssv marked this conversation as resolved.
Show resolved Hide resolved
header, err := es.executionClient.HeaderByNumber(ctx, block)
if err != nil {
return fmt.Errorf("failed to get header for block %d: %w", block, err)
}

// #nosec G115
if header.Time != 0 && header.Time < uint64(time.Now().Add(-es.stalenessThreshold).Unix()) {
anatolie-ssv marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("block %d is too old", block)
}

return nil
}

Expand Down Expand Up @@ -110,6 +138,13 @@
return 0, fmt.Errorf("event replay: lastProcessedBlock (%d) is lower than fromBlock (%d)", lastProcessedBlock, fromBlock)
}

// Check if the block is too old.
// #nosec G115
b := big.NewInt(int64(es.lastProcessedBlock))
anatolie-ssv marked this conversation as resolved.
Show resolved Hide resolved
if err := es.blockBelowThreashold(ctx, b); err != nil {
return 0, err
}

Check warning on line 146 in eth/eventsyncer/event_syncer.go

View check run for this annotation

Codecov / codecov/patch

eth/eventsyncer/event_syncer.go#L145-L146

Added lines #L145 - L146 were not covered by tests

es.logger.Info("finished syncing historical events",
zap.Uint64("from_block", fromBlock),
zap.Uint64("last_processed_block", lastProcessedBlock))
Expand Down
126 changes: 126 additions & 0 deletions eth/eventsyncer/event_syncer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 31 additions & 2 deletions eth/eventsyncer/event_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package eventsyncer
import (
"context"
"encoding/base64"
"errors"
"math/big"
"net/http/httptest"
"strings"
Expand All @@ -12,7 +13,6 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient/simulated"
Expand Down Expand Up @@ -197,7 +197,7 @@ func setupEventHandler(

func simTestBackend(testAddr ethcommon.Address) *simulator.Backend {
return simulator.NewBackend(
types.GenesisAlloc{
ethtypes.GenesisAlloc{
testAddr: {Balance: big.NewInt(10000000000000000)},
}, simulated.WithBlockGasLimit(10000000),
)
Expand Down Expand Up @@ -240,3 +240,32 @@ func setupOperatorStorage(logger *zap.Logger, db basedb.Database, privKey keys.O

return nodeStorage, operatorData
}

func TestBlockBelowThreashold(t *testing.T) {
ctrl := gomock.NewController(t)
m := NewMockExecutionClient(ctrl)
ctx := context.Background()

s := New(nil, m, nil)

t.Run("fails on EC error", func(t *testing.T) {
err1 := errors.New("ec err")
m.EXPECT().HeaderByNumber(ctx, big.NewInt(1)).Return(nil, err1)
err := s.blockBelowThreashold(ctx, big.NewInt(1))
require.ErrorIs(t, err, err1)
})

t.Run("fails if outside threashold", func(t *testing.T) {
header := &ethtypes.Header{Time: uint64(time.Now().Add(-151 * time.Second).Unix())}
m.EXPECT().HeaderByNumber(ctx, big.NewInt(1)).Return(header, nil)
err := s.blockBelowThreashold(ctx, big.NewInt(1))
require.Error(t, err)
})

t.Run("success", func(t *testing.T) {
header := &ethtypes.Header{Time: uint64(time.Now().Add(-149 * time.Second).Unix())}
m.EXPECT().HeaderByNumber(ctx, big.NewInt(1)).Return(header, nil)
err := s.blockBelowThreashold(ctx, big.NewInt(1))
require.NoError(t, err)
})
}
25 changes: 19 additions & 6 deletions eth/executionclient/execution_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,14 @@
}
validLogs = append(validLogs, log)
}

if len(validLogs) == 0 {
// Emit empty block logs to indicate that we have advanced to this block.
logs <- BlockLogs{BlockNumber: toBlock}
} else {
for _, blockLogs := range PackLogs(validLogs) {
logs <- blockLogs
}
logs <- BlockLogs{LastSeen: toBlock}
continue
}

for _, blockLogs := range PackLogs(validLogs) {
logs <- blockLogs
}
}
}
Expand Down Expand Up @@ -292,6 +293,18 @@
return b, nil
}

func (ec *ExecutionClient) HeaderByNumber(ctx context.Context, blockNumber *big.Int) (*ethtypes.Header, error) {
h, err := ec.client.HeaderByNumber(ctx, blockNumber)
if err != nil {
ec.logger.Error(elResponseErrMsg,
zap.String("method", "eth_getBlockByNumber"),
zap.Error(err))
return nil, err
}

Check warning on line 303 in eth/executionclient/execution_client.go

View check run for this annotation

Codecov / codecov/patch

eth/executionclient/execution_client.go#L296-L303

Added lines #L296 - L303 were not covered by tests

return h, nil

Check warning on line 305 in eth/executionclient/execution_client.go

View check run for this annotation

Codecov / codecov/patch

eth/executionclient/execution_client.go#L305

Added line #L305 was not covered by tests
}

func (ec *ExecutionClient) isClosed() bool {
select {
case <-ec.closed:
Expand Down
1 change: 1 addition & 0 deletions eth/executionclient/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
// BlockLogs holds a block's number and it's logs.
type BlockLogs struct {
BlockNumber uint64
LastSeen uint64
anatolie-ssv marked this conversation as resolved.
Show resolved Hide resolved
Logs []ethtypes.Log
}

Expand Down
10 changes: 10 additions & 0 deletions network/peers/connections/mock/mock_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ func (m NodeStorage) GetLastProcessedBlock(txn basedb.Reader) (*big.Int, bool, e
panic("implement me")
}

func (m NodeStorage) SaveHighestSeenBlock(txn basedb.ReadWriter, offset *big.Int) error {
//TODO implement me
panic("implement me")
}

func (m NodeStorage) GetHighestSeenBlock(txn basedb.Reader) (*big.Int, bool, error) {
//TODO implement me
panic("implement me")
}

func (m NodeStorage) DropRegistryData() error {
//TODO implement me
panic("implement me")
Expand Down
Loading
Loading