Skip to content

Commit

Permalink
Merge branch 'stage' into fix/multiclient-oneclient
Browse files Browse the repository at this point in the history
  • Loading branch information
nkryuchkov committed Jan 30, 2025
2 parents 23d393d + 860597a commit 85779ec
Show file tree
Hide file tree
Showing 16 changed files with 389 additions and 618 deletions.
15 changes: 15 additions & 0 deletions eth/executionclient/execution_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,21 @@ func (ec *ExecutionClient) isClosed() bool {
func (ec *ExecutionClient) streamLogsToChan(ctx context.Context, logs chan<- BlockLogs, fromBlock uint64) (lastBlock uint64, err error) {
heads := make(chan *ethtypes.Header)

// Generally, execution client can stream logs using SubscribeFilterLogs, but we chose to use SubscribeNewHead + FilterLogs.
//
// We must receive all events as they determine the state of the ssv network, so a discrepancy can result in slashing.
// Therefore, we must be sure that we don't miss any log while streaming.
//
// With SubscribeFilterLogs we cannot specify the block we subscribe from, it always starts at the highest.
// So with streaming we had some bugs because of missing blocks:
// - first sync history from genesis to block 100, but then stream sometimes starts late at 102 (missed 101)
// - inevitably miss blocks during any stream connection interruptions (such as EL restarts)
//
// Thus, we decided not to rely on log streaming and use SubscribeNewHead + FilterLogs.
//
// It also allowed us to implement more 'atomic' behaviour easier:
// We can revert the tx if there was an error in processing all the events of a block.
// So we can restart from this block once everything is good.
sub, err := ec.client.SubscribeNewHead(ctx, heads)
if err != nil {
ec.logger.Error(elResponseErrMsg,
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ require (
github.com/prysmaticlabs/go-bitfield v0.0.0-20240328144219-a1caa50c3a1e
github.com/prysmaticlabs/prysm/v4 v4.0.8
github.com/rs/zerolog v1.32.0
github.com/sanity-io/litter v1.5.6
github.com/sourcegraph/conc v0.3.0
github.com/spf13/cobra v1.8.1
github.com/ssvlabs/eth2-key-manager v1.4.2
Expand All @@ -56,8 +57,6 @@ require (
tailscale.com v1.72.0
)

require github.com/felixge/httpsnoop v1.0.4 // indirect

require (
github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c // indirect
github.com/DataDog/zstd v1.5.2 // indirect
Expand Down Expand Up @@ -91,6 +90,7 @@ require (
github.com/ethereum/c-kzg-4844 v1.0.0 // indirect
github.com/ethereum/go-verkle v0.1.1-0.20240306133620-7d920df305f0 // indirect
github.com/fatih/color v1.17.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/flynn/noise v1.1.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ github.com/crate-crypto/go-kzg-4844 v1.0.0/go.mod h1:1kMhvPgI0Ky3yIa+9lFySEBUBXk
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/d4l3k/messagediff v1.2.1 h1:ZcAIMYsUg0EAp9X+tt8/enBE/Q8Yd5kzPynLyKptt9U=
github.com/d4l3k/messagediff v1.2.1/go.mod h1:Oozbb1TVXFac9FtSIxHBMnBCq2qeH/2KkEQxENCrlLo=
github.com/davecgh/go-spew v0.0.0-20161028175848-04cdfd42973b/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
Expand Down Expand Up @@ -619,6 +620,7 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down Expand Up @@ -683,6 +685,8 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sanity-io/litter v1.5.6 h1:hCFycYzhRnW4niFbbmR7QKdmds69PbVa/sNmEN5euSU=
github.com/sanity-io/litter v1.5.6/go.mod h1:9gzJgR2i4ZpjZHsKvUXIRQVk7P+yM3e+jAF7bU2UI5U=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
Expand Down Expand Up @@ -745,6 +749,7 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v0.0.0-20161117074351-18a02ba4a312/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
Expand Down
16 changes: 0 additions & 16 deletions ibft/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (

"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/zap"

spectypes "github.com/ssvlabs/ssv-spec/types"
Expand All @@ -27,20 +25,6 @@ const (
participantsKey = "pt"
)

var (
metricsHighestDecided = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "ssv:validator:ibft_highest_decided",
Help: "The highest decided sequence number",
}, []string{"identifier", "pubKey"})
)

func init() {
logger := zap.L()
if err := prometheus.Register(metricsHighestDecided); err != nil {
logger.Debug("could not register prometheus collector")
}
}

// participantStorage struct
// instanceType is what separates different iBFT eth2 duty types (attestation, proposal and aggregation)
type participantStorage struct {
Expand Down
12 changes: 6 additions & 6 deletions migrations/migration_5_gob.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type Metadata struct {
Liquidated bool
}

func storageShareGOBToSpecShare(share *storageShareGOB) (*types.SSVShare, error) {
func storageShareGOBToDomainShare(share *storageShareGOB) (*types.SSVShare, error) {
committee := make([]*spectypes.ShareMember, len(share.Committee))
for i, c := range share.Committee {
committee[i] = &spectypes.ShareMember{
Expand All @@ -75,7 +75,7 @@ func storageShareGOBToSpecShare(share *storageShareGOB) (*types.SSVShare, error)
var validatorPubKey spectypes.ValidatorPK
copy(validatorPubKey[:], share.ValidatorPubKey)

specShare := &types.SSVShare{
domainShare := &types.SSVShare{
Share: spectypes.Share{
ValidatorPubKey: validatorPubKey,
SharePubKey: share.SharePubKey,
Expand All @@ -89,10 +89,10 @@ func storageShareGOBToSpecShare(share *storageShareGOB) (*types.SSVShare, error)
}

if share.BeaconMetadata != nil {
specShare.ValidatorIndex = share.BeaconMetadata.Index
specShare.Status = share.BeaconMetadata.Status
specShare.ActivationEpoch = share.BeaconMetadata.ActivationEpoch
domainShare.ValidatorIndex = share.BeaconMetadata.Index
domainShare.Status = share.BeaconMetadata.Status
domainShare.ActivationEpoch = share.BeaconMetadata.ActivationEpoch
}

return specShare, nil
return domainShare, nil
}
183 changes: 157 additions & 26 deletions migrations/migration_5_share_gob_to_ssz.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package migrations

import (
"bytes"
"context"
"fmt"

"github.com/sanity-io/litter"
opstorage "github.com/ssvlabs/ssv/operator/storage"
"github.com/ssvlabs/ssv/registry/storage"
"github.com/ssvlabs/ssv/storage/basedb"
"go.uber.org/zap"
)
Expand All @@ -12,32 +17,51 @@ import (
// data to be targeted - so that SSV node with "fresh" DB can operate just fine.
var migration_5_change_share_format_from_gob_to_ssz = Migration{
Name: "migration_5_change_share_format_from_gob_to_ssz",
Run: func(ctx context.Context, logger *zap.Logger, opt Options, key []byte, completed CompletedFunc) error {
// storagePrefix is a base prefix we use when storing shares
var storagePrefix = []byte("operator/")
Run: func(ctx context.Context, logger *zap.Logger, opt Options, key []byte, completed CompletedFunc) (err error) {
var sharesGOBTotal int

defer func() {
if err != nil {
return // cannot complete migration successfully
}
// complete migration, this makes sure migration applies only once
if err = completed(opt.Db); err != nil {
err = fmt.Errorf("complete transaction: %w", err)
return
}
logger.Info("migration completed", zap.Int("gob_shares_total", sharesGOBTotal))
}()

// sets is a bunch of updates this migration will need to perform, we cannot do them all in a
// single transaction (because there is a limit on how large a single transaction can be) so
// we'll use SetMany func that will split up the data we want to update into batches committing
// each batch in a separate transaction. I guess that makes this migration non-atomic.
sets := make([]basedb.Obj, 0)
// sharesSSZEncoded is a bunch of updates this migration will need to perform, we cannot do them
// all in a single transaction (because there is a limit on how large a single transaction can be)
// so we'll use SetMany func that will split up the data we want to update into batches committing
// each batch in a separate transaction. I guess that makes this migration non-atomic, but since
// this migration is idempotent atomicity isn't required (we can re-apply it however many times
// we like without "breaking" anything)
sharesSSZEncoded := make([]basedb.Obj, 0)

err := opt.Db.GetAll(append(storagePrefix, sharesPrefixGOB...), func(i int, obj basedb.Obj) error {
sharesGOB := make(map[string]*storageShareGOB)
err = opt.Db.GetAll(append(opstorage.OperatorStoragePrefix, sharesPrefixGOB...), func(i int, obj basedb.Obj) error {
shareGOB := &storageShareGOB{}
if err := shareGOB.Decode(obj.Value); err != nil {
return fmt.Errorf("decode gob share: %w", err)
}
share, err := storageShareGOBToSpecShare(shareGOB)
sID := shareID(shareGOB.ValidatorPubKey)
if _, ok := sharesGOB[sID]; ok {
return fmt.Errorf("have already seen GOB share with the same share ID: %s", sID)
}
sharesGOB[sID] = shareGOB
share, err := storageShareGOBToDomainShare(shareGOB)
if err != nil {
return fmt.Errorf("convert storage share to spec share: %w", err)
return fmt.Errorf("convert gob storage share to domain share: %w", err)
}
shareSSZ := specShareToStorageShareSSZ(share)
key := storageKeySSZ(share.ValidatorPubKey[:])
shareSSZ := storage.FromSSVShare(share)
key := storage.SharesDBKey(shareSSZ.ValidatorPubKey[:])
value, err := shareSSZ.Encode()
if err != nil {
return fmt.Errorf("encode ssz share: %w", err)
}
sets = append(sets, basedb.Obj{
sharesSSZEncoded = append(sharesSSZEncoded, basedb.Obj{
Key: key,
Value: value,
})
Expand All @@ -47,23 +71,130 @@ var migration_5_change_share_format_from_gob_to_ssz = Migration{
return fmt.Errorf("GetAll: %w", err)
}

if err := opt.Db.SetMany(storagePrefix, len(sets), func(i int) (basedb.Obj, error) {
return sets[i], nil
sharesGOBTotal = len(sharesGOB)
if sharesGOBTotal == 0 {
return nil // we won't be creating any SSZ shares
}

if err := opt.Db.SetMany(opstorage.OperatorStoragePrefix, len(sharesSSZEncoded), func(i int) (basedb.Obj, error) {
return sharesSSZEncoded[i], nil
}); err != nil {
return fmt.Errorf("SetMany: %w", err)
}

// TODO - do not complete this migration for now, we'll complete it once
// additional sanity-checks are added here
//if err := opt.Db.DropPrefix(append(storagePrefix, sharesPrefixGOB...)); err != nil {
// return fmt.Errorf("DropPrefix: %w", err)
//}
//
//// This makes sure migration applies only once.
//if err := completed(opt.Db); err != nil {
// return fmt.Errorf("complete transaction: %w", err)
//}
sharesSSZTotal := 0
if err := opt.Db.GetAll(storage.SharesDBPrefix(opstorage.OperatorStoragePrefix), func(i int, obj basedb.Obj) error {
shareSSZ := &storage.Share{}
err := shareSSZ.Decode(obj.Value)
if err != nil {
return fmt.Errorf("decode ssz share: %w", err)
}
sID := shareID(shareSSZ.ValidatorPubKey)
shareGOB, ok := sharesGOB[sID]
if !ok {
// this shouldn't really happen & we should probably return error if it does, but
// on stage since we already have some SSV nodes that migrated to SSZ format and
// potentially added new validators (new SSZ shares) erroring would prevent migration
// from completing, so we don't return error here
return nil
}
if !matchGOBvsSSZ(shareGOB, shareSSZ) {
return fmt.Errorf(
"GOB share doesn't match corresponding SSZ share, GOB: %s, SSZ: %s",
litter.Sdump(shareGOB),
litter.Sdump(shareSSZ),
)
}
sharesSSZTotal++
return nil
}); err != nil {
return fmt.Errorf("GetMany: %w", err)
}

if sharesSSZTotal != sharesGOBTotal {
return fmt.Errorf("total SSZ shares count %d doesn't match GOB shares count %d", sharesSSZTotal, sharesGOBTotal)
}

if err = opt.Db.DropPrefix(append(opstorage.OperatorStoragePrefix, sharesPrefixGOB...)); err != nil {
err = fmt.Errorf("DropPrefix (GOB shares): %w", err)
return
}

return nil
},
}

func shareID(validatorPubkey []byte) string {
return string(validatorPubkey)
}

func matchGOBvsSSZ(shareGOB *storageShareGOB, shareSSZ *storage.Share) bool {
// note, ssz share no longer has OperatorID field
if !bytes.Equal(shareGOB.ValidatorPubKey, shareSSZ.ValidatorPubKey) {
return false
}
if !bytes.Equal(shareGOB.SharePubKey, shareSSZ.SharePubKey) {
return false
}
if len(shareGOB.Committee) != len(shareSSZ.Committee) {
return false
}
for i, committeeGOB := range shareGOB.Committee {
committeeSSZ := shareSSZ.Committee[i]
if committeeGOB.OperatorID != committeeSSZ.OperatorID {
return false
}
if !bytes.Equal(committeeGOB.PubKey, committeeSSZ.PubKey) {
return false
}
}
if shareGOB.Quorum != shareSSZ.Quorum {
return false
}
if shareGOB.PartialQuorum != shareSSZ.PartialQuorum {
return false
}
if shareGOB.DomainType != shareSSZ.DomainType {
return false
}
if shareGOB.FeeRecipientAddress != shareSSZ.FeeRecipientAddress {
return false
}
if !bytes.Equal(shareGOB.Graffiti, shareSSZ.Graffiti) {
return false
}

if shareGOB.OwnerAddress != shareSSZ.OwnerAddress {
return false
}
if shareGOB.Liquidated != shareSSZ.Liquidated {
return false
}

// finally, check Beacon metadata matches
if shareGOB.BeaconMetadata == nil {
if shareSSZ.ValidatorIndex != 0 {
return false
}
if shareSSZ.Status != 0 {
return false
}
if shareSSZ.ActivationEpoch != 0 {
return false
}
// note, ssz share no longer has Balance field
return true
}
if uint64(shareGOB.BeaconMetadata.Index) != shareSSZ.ValidatorIndex {
return false
}
// #nosec G115 - never above max int
if int(shareGOB.BeaconMetadata.Status) != int(shareSSZ.Status) {
return false
}
if uint64(shareGOB.BeaconMetadata.ActivationEpoch) != shareSSZ.ActivationEpoch {
return false
}

return true
}
Loading

0 comments on commit 85779ec

Please sign in to comment.