Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eventsyncing): health calls check last seen block distance #1978

Merged
merged 21 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ ignore:
- "network/p2p/metrics.go"
- "network/peers/metrics.go"
- "network/peers/connections/metrics.go"
- "network/peers/connections/mock/mock_storage.go"
- "network/topics/metrics.go"
- "protocol/v2/ssv/validator/metrics.go"
- "protocol/v2/ssv/runner/metrics/metrics.go"
Expand All @@ -47,6 +48,7 @@ ignore:
- "exporter/api/metrics.go"
- "eth/executionclient/metrics.go"
- "eth/eventsyncer/metrics.go"
- "eth/eventsyncer/event_syncer_mock.go"
- "eth/eventhandler/metrics.go"
- "operator/validator/metrics.go"
- "operator/metrics.go"
Expand Down
26 changes: 26 additions & 0 deletions eth/eventsyncer/event_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@
"context"
"errors"
"fmt"
"math/big"
"time"

"github.com/ethereum/go-ethereum/core/types"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/eth/executionclient"
"github.com/ssvlabs/ssv/logging/fields"
nodestorage "github.com/ssvlabs/ssv/operator/storage"
)

//go:generate mockgen -package=eventsyncer -destination=./event_syncer_mock.go -source=./event_syncer.go

// TODO: check if something from these PRs need to be ported:
// https://github.com/ssvlabs/ssv/pull/1053

Expand All @@ -26,6 +30,7 @@
type ExecutionClient interface {
FetchHistoricalLogs(ctx context.Context, fromBlock uint64) (logs <-chan executionclient.BlockLogs, errors <-chan error, err error)
StreamLogs(ctx context.Context, fromBlock uint64) <-chan executionclient.BlockLogs
HeaderByNumber(ctx context.Context, blockNumber *big.Int) (*types.Header, error)
}

type EventHandler interface {
Expand Down Expand Up @@ -80,6 +85,21 @@
if time.Since(es.lastProcessedBlockChange) > es.stalenessThreshold {
return fmt.Errorf("syncing is stuck at block %d", lastProcessedBlock.Uint64())
}

return es.blockBelowThreshold(ctx, lastProcessedBlock)

Check warning on line 89 in eth/eventsyncer/event_syncer.go

View check run for this annotation

Codecov / codecov/patch

eth/eventsyncer/event_syncer.go#L89

Added line #L89 was not covered by tests
}

func (es *EventSyncer) blockBelowThreshold(ctx context.Context, block *big.Int) error {
header, err := es.executionClient.HeaderByNumber(ctx, block)
if err != nil {
return fmt.Errorf("failed to get header for block %d: %w", block, err)
}

// #nosec G115
if header.Time > 0 && header.Time < uint64(time.Now().Add(-es.stalenessThreshold).Unix()) {
anatolie-ssv marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("block %d is too old", block)
}

return nil
}

Expand Down Expand Up @@ -110,6 +130,12 @@
return 0, fmt.Errorf("event replay: lastProcessedBlock (%d) is lower than fromBlock (%d)", lastProcessedBlock, fromBlock)
}

// Check if the block is too old.
b := new(big.Int).SetUint64(lastProcessedBlock)
if err := es.blockBelowThreshold(ctx, b); err != nil {
return 0, err
}

Check warning on line 137 in eth/eventsyncer/event_syncer.go

View check run for this annotation

Codecov / codecov/patch

eth/eventsyncer/event_syncer.go#L136-L137

Added lines #L136 - L137 were not covered by tests

es.logger.Info("finished syncing historical events",
zap.Uint64("from_block", fromBlock),
zap.Uint64("last_processed_block", lastProcessedBlock))
Expand Down
126 changes: 126 additions & 0 deletions eth/eventsyncer/event_syncer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 31 additions & 2 deletions eth/eventsyncer/event_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package eventsyncer
import (
"context"
"encoding/base64"
"errors"
"math/big"
"net/http/httptest"
"strings"
Expand All @@ -12,7 +13,6 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient/simulated"
Expand Down Expand Up @@ -197,7 +197,7 @@ func setupEventHandler(

func simTestBackend(testAddr ethcommon.Address) *simulator.Backend {
return simulator.NewBackend(
types.GenesisAlloc{
ethtypes.GenesisAlloc{
testAddr: {Balance: big.NewInt(10000000000000000)},
}, simulated.WithBlockGasLimit(10000000),
)
Expand Down Expand Up @@ -240,3 +240,32 @@ func setupOperatorStorage(logger *zap.Logger, db basedb.Database, privKey keys.O

return nodeStorage, operatorData
}

func TestBlockBelowThreshold(t *testing.T) {
ctrl := gomock.NewController(t)
m := NewMockExecutionClient(ctrl)
ctx := context.Background()

s := New(nil, m, nil)

t.Run("fails on EC error", func(t *testing.T) {
err1 := errors.New("ec err")
m.EXPECT().HeaderByNumber(ctx, big.NewInt(1)).Return(nil, err1)
err := s.blockBelowThreshold(ctx, big.NewInt(1))
require.ErrorIs(t, err, err1)
})

t.Run("fails if outside threashold", func(t *testing.T) {
header := &ethtypes.Header{Time: uint64(time.Now().Add(-151 * time.Second).Unix())}
m.EXPECT().HeaderByNumber(ctx, big.NewInt(1)).Return(header, nil)
err := s.blockBelowThreshold(ctx, big.NewInt(1))
require.Error(t, err)
})

t.Run("success", func(t *testing.T) {
header := &ethtypes.Header{Time: uint64(time.Now().Add(-149 * time.Second).Unix())}
m.EXPECT().HeaderByNumber(ctx, big.NewInt(1)).Return(header, nil)
err := s.blockBelowThreshold(ctx, big.NewInt(1))
require.NoError(t, err)
})
}
27 changes: 21 additions & 6 deletions eth/executionclient/execution_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,17 @@
}
validLogs = append(validLogs, log)
}
if len(validLogs) == 0 {
// Emit empty block logs to indicate that we have advanced to this block.
logs <- BlockLogs{BlockNumber: toBlock}
} else {
for _, blockLogs := range PackLogs(validLogs) {
logs <- blockLogs
var highestBlock uint64
for _, blockLogs := range PackLogs(validLogs) {
logs <- blockLogs
if blockLogs.BlockNumber > highestBlock {
highestBlock = blockLogs.BlockNumber
}
}
// Emit empty block logs to indicate that we have advanced to this block.
if highestBlock < toBlock {
logs <- BlockLogs{BlockNumber: toBlock}
}
}
}
}()
Expand Down Expand Up @@ -292,6 +295,18 @@
return b, nil
}

func (ec *ExecutionClient) HeaderByNumber(ctx context.Context, blockNumber *big.Int) (*ethtypes.Header, error) {
h, err := ec.client.HeaderByNumber(ctx, blockNumber)
if err != nil {
ec.logger.Error(elResponseErrMsg,
zap.String("method", "eth_getBlockByNumber"),
zap.Error(err))
return nil, err
}

Check warning on line 305 in eth/executionclient/execution_client.go

View check run for this annotation

Codecov / codecov/patch

eth/executionclient/execution_client.go#L298-L305

Added lines #L298 - L305 were not covered by tests

return h, nil

Check warning on line 307 in eth/executionclient/execution_client.go

View check run for this annotation

Codecov / codecov/patch

eth/executionclient/execution_client.go#L307

Added line #L307 was not covered by tests
}

func (ec *ExecutionClient) isClosed() bool {
select {
case <-ec.closed:
Expand Down
10 changes: 10 additions & 0 deletions network/peers/connections/mock/mock_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ func (m NodeStorage) GetLastProcessedBlock(txn basedb.Reader) (*big.Int, bool, e
panic("implement me")
}

func (m NodeStorage) SaveHighestSeenBlock(txn basedb.ReadWriter, offset *big.Int) error {
//TODO implement me
panic("implement me")
}

func (m NodeStorage) GetHighestSeenBlock(txn basedb.Reader) (*big.Int, bool, error) {
//TODO implement me
panic("implement me")
}

func (m NodeStorage) DropRegistryData() error {
//TODO implement me
panic("implement me")
Expand Down
19 changes: 19 additions & 0 deletions operator/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
var (
storagePrefix = []byte("operator/")
lastProcessedBlockKey = []byte("syncOffset") // TODO: temporarily left as syncOffset for compatibility, consider renaming and adding a migration for that
highestSeenBlockKey = []byte("lastSeen")
configKey = []byte("config")
)

Expand Down Expand Up @@ -176,6 +177,10 @@
return s.db.Using(rw).Set(storagePrefix, lastProcessedBlockKey, offset.Bytes())
}

func (s *storage) SaveHighestSeenBlock(rw basedb.ReadWriter, offset *big.Int) error {
return s.db.Using(rw).Set(storagePrefix, highestSeenBlockKey, offset.Bytes())

Check warning on line 181 in operator/storage/storage.go

View check run for this annotation

Codecov / codecov/patch

operator/storage/storage.go#L180-L181

Added lines #L180 - L181 were not covered by tests
}

func (s *storage) dropLastProcessedBlock() error {
return s.db.DropPrefix(append(storagePrefix, lastProcessedBlockKey...))
}
Expand Down Expand Up @@ -206,6 +211,20 @@
return offset, found, nil
}

// GetHighestSeenBlock returns the highest received block with or without events.
func (s *storage) GetHighestSeenBlock(r basedb.Reader) (*big.Int, bool, error) {
obj, found, err := s.db.UsingReader(r).Get(storagePrefix, highestSeenBlockKey)
if !found {
return nil, found, nil
}
if err != nil {
return nil, found, err
}

Check warning on line 222 in operator/storage/storage.go

View check run for this annotation

Codecov / codecov/patch

operator/storage/storage.go#L215-L222

Added lines #L215 - L222 were not covered by tests

offset := new(big.Int).SetBytes(obj.Value)
return offset, found, nil

Check warning on line 225 in operator/storage/storage.go

View check run for this annotation

Codecov / codecov/patch

operator/storage/storage.go#L224-L225

Added lines #L224 - L225 were not covered by tests
}

// GetPrivateKeyHash return sha256 hashed private key
func (s *storage) GetPrivateKeyHash() (string, bool, error) {
obj, found, err := s.db.Get(storagePrefix, []byte(HashedPrivateKey))
Expand Down
Loading