Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eventsyncing): health calls check last seen block distance #1978

Merged
merged 21 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ ignore:
- "network/p2p/metrics.go"
- "network/peers/metrics.go"
- "network/peers/connections/metrics.go"
- "network/peers/connections/mock/mock_storage.go"
- "network/topics/metrics.go"
- "protocol/v2/ssv/validator/metrics.go"
- "protocol/v2/ssv/runner/metrics/metrics.go"
Expand All @@ -47,6 +48,7 @@ ignore:
- "exporter/api/metrics.go"
- "eth/executionclient/metrics.go"
- "eth/eventsyncer/metrics.go"
- "eth/eventsyncer/event_syncer_mock.go"
- "eth/eventhandler/metrics.go"
- "operator/validator/metrics.go"
- "operator/metrics.go"
Expand Down
27 changes: 27 additions & 0 deletions eth/eventsyncer/event_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@ import (
"context"
"errors"
"fmt"
"math/big"
"time"

"github.com/ethereum/go-ethereum/core/types"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/eth/executionclient"
"github.com/ssvlabs/ssv/logging/fields"
nodestorage "github.com/ssvlabs/ssv/operator/storage"
)

//go:generate mockgen -package=eventsyncer -destination=./event_syncer_mock.go -source=./event_syncer.go

// TODO: check if something from these PRs need to be ported:
// https://github.com/ssvlabs/ssv/pull/1053

Expand All @@ -26,6 +30,7 @@ var (
type ExecutionClient interface {
FetchHistoricalLogs(ctx context.Context, fromBlock uint64) (logs <-chan executionclient.BlockLogs, errors <-chan error, err error)
StreamLogs(ctx context.Context, fromBlock uint64) <-chan executionclient.BlockLogs
HeaderByNumber(ctx context.Context, blockNumber *big.Int) (*types.Header, error)
}

type EventHandler interface {
Expand Down Expand Up @@ -80,6 +85,21 @@ func (es *EventSyncer) Healthy(ctx context.Context) error {
if time.Since(es.lastProcessedBlockChange) > es.stalenessThreshold {
return fmt.Errorf("syncing is stuck at block %d", lastProcessedBlock.Uint64())
}

return es.blockBelowThreshold(ctx, lastProcessedBlock)
}

func (es *EventSyncer) blockBelowThreshold(ctx context.Context, block *big.Int) error {
header, err := es.executionClient.HeaderByNumber(ctx, block)
if err != nil {
return fmt.Errorf("failed to get header for block %d: %w", block, err)
}

// #nosec G115
if header.Time < uint64(time.Now().Add(-es.stalenessThreshold).Unix()) {
return fmt.Errorf("block %d is too old", block)
}

return nil
}

Expand Down Expand Up @@ -110,6 +130,13 @@ func (es *EventSyncer) SyncHistory(ctx context.Context, fromBlock uint64) (lastP
return 0, fmt.Errorf("event replay: lastProcessedBlock (%d) is lower than fromBlock (%d)", lastProcessedBlock, fromBlock)
}

// Check if the block is too old.
// #nosec G115
b := big.NewInt(int64(es.lastProcessedBlock))
anatolie-ssv marked this conversation as resolved.
Show resolved Hide resolved
if err := es.blockBelowThreshold(ctx, b); err != nil {
return 0, err
}

es.logger.Info("finished syncing historical events",
zap.Uint64("from_block", fromBlock),
zap.Uint64("last_processed_block", lastProcessedBlock))
Expand Down
126 changes: 126 additions & 0 deletions eth/eventsyncer/event_syncer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 31 additions & 2 deletions eth/eventsyncer/event_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package eventsyncer
import (
"context"
"encoding/base64"
"errors"
"math/big"
"net/http/httptest"
"strings"
Expand All @@ -12,7 +13,6 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient/simulated"
Expand Down Expand Up @@ -197,7 +197,7 @@ func setupEventHandler(

func simTestBackend(testAddr ethcommon.Address) *simulator.Backend {
return simulator.NewBackend(
types.GenesisAlloc{
ethtypes.GenesisAlloc{
testAddr: {Balance: big.NewInt(10000000000000000)},
}, simulated.WithBlockGasLimit(10000000),
)
Expand Down Expand Up @@ -240,3 +240,32 @@ func setupOperatorStorage(logger *zap.Logger, db basedb.Database, privKey keys.O

return nodeStorage, operatorData
}

func TestBlockBelowThreshold(t *testing.T) {
ctrl := gomock.NewController(t)
m := NewMockExecutionClient(ctrl)
ctx := context.Background()

s := New(nil, m, nil)

t.Run("fails on EC error", func(t *testing.T) {
err1 := errors.New("ec err")
m.EXPECT().HeaderByNumber(ctx, big.NewInt(1)).Return(nil, err1)
err := s.blockBelowThreshold(ctx, big.NewInt(1))
require.ErrorIs(t, err, err1)
})

t.Run("fails if outside threashold", func(t *testing.T) {
header := &ethtypes.Header{Time: uint64(time.Now().Add(-151 * time.Second).Unix())}
m.EXPECT().HeaderByNumber(ctx, big.NewInt(1)).Return(header, nil)
err := s.blockBelowThreshold(ctx, big.NewInt(1))
require.Error(t, err)
})

t.Run("success", func(t *testing.T) {
header := &ethtypes.Header{Time: uint64(time.Now().Add(-149 * time.Second).Unix())}
m.EXPECT().HeaderByNumber(ctx, big.NewInt(1)).Return(header, nil)
err := s.blockBelowThreshold(ctx, big.NewInt(1))
require.NoError(t, err)
})
}
27 changes: 21 additions & 6 deletions eth/executionclient/execution_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,17 @@ func (ec *ExecutionClient) fetchLogsInBatches(ctx context.Context, startBlock, e
}
validLogs = append(validLogs, log)
}
if len(validLogs) == 0 {
// Emit empty block logs to indicate that we have advanced to this block.
logs <- BlockLogs{BlockNumber: toBlock}
} else {
for _, blockLogs := range PackLogs(validLogs) {
logs <- blockLogs
var highestBlock uint64
for _, blockLogs := range PackLogs(validLogs) {
logs <- blockLogs
if blockLogs.BlockNumber > highestBlock {
highestBlock = blockLogs.BlockNumber
}
}
// Emit empty block logs to indicate that we have advanced to this block.
if highestBlock < toBlock {
logs <- BlockLogs{BlockNumber: toBlock}
}
}
}
}()
Expand Down Expand Up @@ -292,6 +295,18 @@ func (ec *ExecutionClient) BlockByNumber(ctx context.Context, blockNumber *big.I
return b, nil
}

func (ec *ExecutionClient) HeaderByNumber(ctx context.Context, blockNumber *big.Int) (*ethtypes.Header, error) {
h, err := ec.client.HeaderByNumber(ctx, blockNumber)
if err != nil {
ec.logger.Error(elResponseErrMsg,
zap.String("method", "eth_getBlockByNumber"),
zap.Error(err))
return nil, err
}

return h, nil
}

func (ec *ExecutionClient) isClosed() bool {
select {
case <-ec.closed:
Expand Down
1 change: 1 addition & 0 deletions eth/executionclient/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
// BlockLogs holds a block's number and it's logs.
type BlockLogs struct {
BlockNumber uint64
LastSeen uint64
anatolie-ssv marked this conversation as resolved.
Show resolved Hide resolved
Logs []ethtypes.Log
}

Expand Down
10 changes: 10 additions & 0 deletions network/peers/connections/mock/mock_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ func (m NodeStorage) GetLastProcessedBlock(txn basedb.Reader) (*big.Int, bool, e
panic("implement me")
}

func (m NodeStorage) SaveHighestSeenBlock(txn basedb.ReadWriter, offset *big.Int) error {
//TODO implement me
panic("implement me")
}

func (m NodeStorage) GetHighestSeenBlock(txn basedb.Reader) (*big.Int, bool, error) {
//TODO implement me
panic("implement me")
}

func (m NodeStorage) DropRegistryData() error {
//TODO implement me
panic("implement me")
Expand Down
19 changes: 19 additions & 0 deletions operator/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var HashedPrivateKey = "hashed-private-key"
var (
storagePrefix = []byte("operator/")
lastProcessedBlockKey = []byte("syncOffset") // TODO: temporarily left as syncOffset for compatibility, consider renaming and adding a migration for that
highestSeenBlockKey = []byte("lastSeen")
configKey = []byte("config")
)

Expand Down Expand Up @@ -176,6 +177,10 @@ func (s *storage) SaveLastProcessedBlock(rw basedb.ReadWriter, offset *big.Int)
return s.db.Using(rw).Set(storagePrefix, lastProcessedBlockKey, offset.Bytes())
}

func (s *storage) SaveHighestSeenBlock(rw basedb.ReadWriter, offset *big.Int) error {
return s.db.Using(rw).Set(storagePrefix, highestSeenBlockKey, offset.Bytes())
}

func (s *storage) dropLastProcessedBlock() error {
return s.db.DropPrefix(append(storagePrefix, lastProcessedBlockKey...))
}
Expand Down Expand Up @@ -206,6 +211,20 @@ func (s *storage) GetLastProcessedBlock(r basedb.Reader) (*big.Int, bool, error)
return offset, found, nil
}

// GetHighestSeenBlock returns the highest received block with or without events.
func (s *storage) GetHighestSeenBlock(r basedb.Reader) (*big.Int, bool, error) {
obj, found, err := s.db.UsingReader(r).Get(storagePrefix, highestSeenBlockKey)
if !found {
return nil, found, nil
}
if err != nil {
return nil, found, err
}

offset := new(big.Int).SetBytes(obj.Value)
return offset, found, nil
}

// GetPrivateKeyHash return sha256 hashed private key
func (s *storage) GetPrivateKeyHash() (string, bool, error) {
obj, found, err := s.db.Get(storagePrefix, []byte(HashedPrivateKey))
Expand Down
Loading