From dee2191718fc70286103f4167708f7a0f770058c Mon Sep 17 00:00:00 2001 From: Giulio rebuffo Date: Mon, 29 Apr 2024 22:42:33 +0200 Subject: [PATCH] Caplin: process new attesting indicies before block comes in to avoid occasiona Reorg (#10085) --- cl/phase1/forkchoice/interface.go | 18 +++- .../mock_services/forkchoice_mock.go | 82 +++++++++++++---- cl/phase1/forkchoice/on_attestation.go | 53 +++++++++-- .../services/aggregate_and_proof_service.go | 89 +++++++++++++++---- cl/sentinel/service/start.go | 57 +++++++++--- 5 files changed, 242 insertions(+), 57 deletions(-) diff --git a/cl/phase1/forkchoice/interface.go b/cl/phase1/forkchoice/interface.go index 5d4b89e0605..e24af7990ff 100644 --- a/cl/phase1/forkchoice/interface.go +++ b/cl/phase1/forkchoice/interface.go @@ -30,8 +30,13 @@ type ForkChoiceStorageReader interface { JustifiedCheckpoint() solid.Checkpoint JustifiedSlot() uint64 ProposerBoostRoot() common.Hash - GetStateAtBlockRoot(blockRoot libcommon.Hash, alwaysCopy bool) (*state.CachingBeaconState, error) - GetFinalityCheckpoints(blockRoot libcommon.Hash) (bool, solid.Checkpoint, solid.Checkpoint, solid.Checkpoint) + GetStateAtBlockRoot( + blockRoot libcommon.Hash, + alwaysCopy bool, + ) (*state.CachingBeaconState, error) + GetFinalityCheckpoints( + blockRoot libcommon.Hash, + ) (bool, solid.Checkpoint, solid.Checkpoint, solid.Checkpoint) GetSyncCommittees(period uint64) (*solid.SyncCommittee, *solid.SyncCommittee, bool) GetBeaconCommitee(slot, committeeIndex uint64) ([]uint64, error) Slot() uint64 @@ -60,8 +65,15 @@ type ForkChoiceStorageReader interface { type ForkChoiceStorageWriter interface { OnAttestation(attestation *solid.Attestation, fromBlock, insert bool) error OnAttesterSlashing(attesterSlashing *cltypes.AttesterSlashing, test bool) error - OnBlock(ctx context.Context, block *cltypes.SignedBeaconBlock, newPayload bool, fullValidation bool, checkDataAvaibility bool) error + OnBlock( + ctx context.Context, + block *cltypes.SignedBeaconBlock, + newPayload bool, + fullValidation bool, + checkDataAvaibility bool, + ) error AddPreverifiedBlobSidecar(blobSidecar *cltypes.BlobSidecar) error OnTick(time uint64) SetSynced(synced bool) + ProcessAttestingIndicies(attestation *solid.Attestation, attestionIndicies []uint64) } diff --git a/cl/phase1/forkchoice/mock_services/forkchoice_mock.go b/cl/phase1/forkchoice/mock_services/forkchoice_mock.go index ea15759a373..ea37ada97eb 100644 --- a/cl/phase1/forkchoice/mock_services/forkchoice_mock.go +++ b/cl/phase1/forkchoice/mock_services/forkchoice_mock.go @@ -172,17 +172,27 @@ func (f *ForkChoiceStorageMock) ProposerBoostRoot() common.Hash { return f.ProposerBoostRootVal } -func (f *ForkChoiceStorageMock) GetStateAtBlockRoot(blockRoot common.Hash, alwaysCopy bool) (*state.CachingBeaconState, error) { +func (f *ForkChoiceStorageMock) GetStateAtBlockRoot( + blockRoot common.Hash, + alwaysCopy bool, +) (*state.CachingBeaconState, error) { return f.StateAtBlockRootVal[blockRoot], nil } -func (f *ForkChoiceStorageMock) GetFinalityCheckpoints(blockRoot common.Hash) (bool, solid.Checkpoint, solid.Checkpoint, solid.Checkpoint) { - oneNil := f.GetFinalityCheckpointsVal[blockRoot][0] != nil && f.GetFinalityCheckpointsVal[blockRoot][1] != nil && f.GetFinalityCheckpointsVal[blockRoot][2] != nil +func (f *ForkChoiceStorageMock) GetFinalityCheckpoints( + blockRoot common.Hash, +) (bool, solid.Checkpoint, solid.Checkpoint, solid.Checkpoint) { + oneNil := f.GetFinalityCheckpointsVal[blockRoot][0] != nil && + f.GetFinalityCheckpointsVal[blockRoot][1] != nil && + f.GetFinalityCheckpointsVal[blockRoot][2] != nil return oneNil, f.GetFinalityCheckpointsVal[blockRoot][0], f.GetFinalityCheckpointsVal[blockRoot][1], f.GetFinalityCheckpointsVal[blockRoot][2] } -func (f *ForkChoiceStorageMock) GetSyncCommittees(period uint64) (*solid.SyncCommittee, *solid.SyncCommittee, bool) { - return f.GetSyncCommitteesVal[period][0], f.GetSyncCommitteesVal[period][1], f.GetSyncCommitteesVal[period][0] != nil && f.GetSyncCommitteesVal[period][1] != nil +func (f *ForkChoiceStorageMock) GetSyncCommittees( + period uint64, +) (*solid.SyncCommittee, *solid.SyncCommittee, bool) { + return f.GetSyncCommitteesVal[period][0], f.GetSyncCommitteesVal[period][1], f.GetSyncCommitteesVal[period][0] != nil && + f.GetSyncCommitteesVal[period][1] != nil } func (f *ForkChoiceStorageMock) GetBeaconCommitee(slot, committeeIndex uint64) ([]uint64, error) { @@ -200,17 +210,32 @@ func (f *ForkChoiceStorageMock) Time() uint64 { return f.TimeVal } -func (f *ForkChoiceStorageMock) OnAttestation(attestation *solid.Attestation, fromBlock, insert bool) error { +func (f *ForkChoiceStorageMock) OnAttestation( + attestation *solid.Attestation, + fromBlock, insert bool, +) error { f.Pool.AttestationsPool.Insert(attestation.Signature(), attestation) return nil } -func (f *ForkChoiceStorageMock) OnAttesterSlashing(attesterSlashing *cltypes.AttesterSlashing, test bool) error { - f.Pool.AttesterSlashingsPool.Insert(pool.ComputeKeyForAttesterSlashing(attesterSlashing), attesterSlashing) +func (f *ForkChoiceStorageMock) OnAttesterSlashing( + attesterSlashing *cltypes.AttesterSlashing, + test bool, +) error { + f.Pool.AttesterSlashingsPool.Insert( + pool.ComputeKeyForAttesterSlashing(attesterSlashing), + attesterSlashing, + ) return nil } -func (f *ForkChoiceStorageMock) OnBlock(ctx context.Context, block *cltypes.SignedBeaconBlock, newPayload bool, fullValidation bool, checkDataAvaiability bool) error { +func (f *ForkChoiceStorageMock) OnBlock( + ctx context.Context, + block *cltypes.SignedBeaconBlock, + newPayload bool, + fullValidation bool, + checkDataAvaiability bool, +) error { return nil } @@ -250,7 +275,9 @@ func (f *ForkChoiceStorageMock) SetSynced(synced bool) { panic("implement me") } -func (f *ForkChoiceStorageMock) GetLightClientBootstrap(blockRoot common.Hash) (*cltypes.LightClientBootstrap, bool) { +func (f *ForkChoiceStorageMock) GetLightClientBootstrap( + blockRoot common.Hash, +) (*cltypes.LightClientBootstrap, bool) { return f.LightClientBootstraps[blockRoot], f.LightClientBootstraps[blockRoot] != nil } @@ -258,11 +285,15 @@ func (f *ForkChoiceStorageMock) NewestLightClientUpdate() *cltypes.LightClientUp return f.NewestLCUpdate } -func (f *ForkChoiceStorageMock) GetLightClientUpdate(period uint64) (*cltypes.LightClientUpdate, bool) { +func (f *ForkChoiceStorageMock) GetLightClientUpdate( + period uint64, +) (*cltypes.LightClientUpdate, bool) { return f.LCUpdates[period], f.LCUpdates[period] != nil } -func (f *ForkChoiceStorageMock) GetHeader(blockRoot libcommon.Hash) (*cltypes.BeaconBlockHeader, bool) { +func (f *ForkChoiceStorageMock) GetHeader( + blockRoot libcommon.Hash, +) (*cltypes.BeaconBlockHeader, bool) { return f.Headers[blockRoot], f.Headers[blockRoot] != nil } @@ -270,23 +301,34 @@ func (f *ForkChoiceStorageMock) GetBalances(blockRoot libcommon.Hash) (solid.Uin panic("implement me") } -func (f *ForkChoiceStorageMock) GetInactivitiesScores(blockRoot libcommon.Hash) (solid.Uint64ListSSZ, error) { +func (f *ForkChoiceStorageMock) GetInactivitiesScores( + blockRoot libcommon.Hash, +) (solid.Uint64ListSSZ, error) { panic("implement me") } -func (f *ForkChoiceStorageMock) GetPreviousPartecipationIndicies(blockRoot libcommon.Hash) (*solid.BitList, error) { +func (f *ForkChoiceStorageMock) GetPreviousPartecipationIndicies( + blockRoot libcommon.Hash, +) (*solid.BitList, error) { panic("implement me") } -func (f *ForkChoiceStorageMock) GetValidatorSet(blockRoot libcommon.Hash) (*solid.ValidatorSet, error) { +func (f *ForkChoiceStorageMock) GetValidatorSet( + blockRoot libcommon.Hash, +) (*solid.ValidatorSet, error) { panic("implement me") } -func (f *ForkChoiceStorageMock) GetCurrentPartecipationIndicies(blockRoot libcommon.Hash) (*solid.BitList, error) { +func (f *ForkChoiceStorageMock) GetCurrentPartecipationIndicies( + blockRoot libcommon.Hash, +) (*solid.BitList, error) { panic("implement me") } -func (f *ForkChoiceStorageMock) GetPublicKeyForValidator(blockRoot libcommon.Hash, idx uint64) (libcommon.Bytes48, error) { +func (f *ForkChoiceStorageMock) GetPublicKeyForValidator( + blockRoot libcommon.Hash, + idx uint64, +) (libcommon.Bytes48, error) { panic("implement me") } @@ -301,3 +343,9 @@ func (f *ForkChoiceStorageMock) AddPreverifiedBlobSidecar(msg *cltypes.BlobSidec func (f *ForkChoiceStorageMock) ValidateOnAttestation(attestation *solid.Attestation) error { panic("implement me") } + +func (f *ForkChoiceStorageMock) ProcessAttestingIndicies( + attestation *solid.Attestation, + attestionIndicies []uint64, +) { +} diff --git a/cl/phase1/forkchoice/on_attestation.go b/cl/phase1/forkchoice/on_attestation.go index 55ecfe5381f..4056e8c11cb 100644 --- a/cl/phase1/forkchoice/on_attestation.go +++ b/cl/phase1/forkchoice/on_attestation.go @@ -14,7 +14,11 @@ var ( ) // OnAttestation processes incoming attestations. -func (f *ForkChoiceStore) OnAttestation(attestation *solid.Attestation, fromBlock bool, insert bool) error { +func (f *ForkChoiceStore) OnAttestation( + attestation *solid.Attestation, + fromBlock bool, + insert bool, +) error { if !f.synced.Load() { return nil } @@ -42,7 +46,11 @@ func (f *ForkChoiceStore) OnAttestation(attestation *solid.Attestation, fromBloc target := data.Target() if headState == nil { - attestationIndicies, err = f.verifyAttestationWithCheckpointState(target, attestation, fromBlock) + attestationIndicies, err = f.verifyAttestationWithCheckpointState( + target, + attestation, + fromBlock, + ) } else { attestationIndicies, err = f.verifyAttestationWithState(headState, attestation, fromBlock) } @@ -56,7 +64,20 @@ func (f *ForkChoiceStore) OnAttestation(attestation *solid.Attestation, fromBloc return nil } -func (f *ForkChoiceStore) verifyAttestationWithCheckpointState(target solid.Checkpoint, attestation *solid.Attestation, fromBlock bool) (attestationIndicies []uint64, err error) { +func (f *ForkChoiceStore) ProcessAttestingIndicies( + attestation *solid.Attestation, + attestionIndicies []uint64, +) { + f.mu.Lock() + defer f.mu.Unlock() + f.processAttestingIndicies(attestation, attestionIndicies) +} + +func (f *ForkChoiceStore) verifyAttestationWithCheckpointState( + target solid.Checkpoint, + attestation *solid.Attestation, + fromBlock bool, +) (attestationIndicies []uint64, err error) { data := attestation.AttestantionData() targetState, err := f.getCheckpointState(target) if err != nil { @@ -67,7 +88,10 @@ func (f *ForkChoiceStore) verifyAttestationWithCheckpointState(target solid.Chec return nil, fmt.Errorf("target state does not exist") } // Now we need to find the attesting indicies. - attestationIndicies, err = targetState.getAttestingIndicies(&data, attestation.AggregationBits()) + attestationIndicies, err = targetState.getAttestingIndicies( + &data, + attestation.AggregationBits(), + ) if err != nil { return nil, err } @@ -88,7 +112,11 @@ func (f *ForkChoiceStore) verifyAttestationWithCheckpointState(target solid.Chec return attestationIndicies, nil } -func (f *ForkChoiceStore) verifyAttestationWithState(s *state.CachingBeaconState, attestation *solid.Attestation, fromBlock bool) (attestationIndicies []uint64, err error) { +func (f *ForkChoiceStore) verifyAttestationWithState( + s *state.CachingBeaconState, + attestation *solid.Attestation, + fromBlock bool, +) (attestationIndicies []uint64, err error) { data := attestation.AttestantionData() if err != nil { return nil, err @@ -127,7 +155,8 @@ func (f *ForkChoiceStore) setLatestMessage(index uint64, message LatestMessage) } func (f *ForkChoiceStore) getLatestMessage(validatorIndex uint64) (LatestMessage, bool) { - if validatorIndex >= uint64(len(f.latestMessages)) || f.latestMessages[validatorIndex] == (LatestMessage{}) { + if validatorIndex >= uint64(len(f.latestMessages)) || + f.latestMessages[validatorIndex] == (LatestMessage{}) { return LatestMessage{}, false } return f.latestMessages[validatorIndex], true @@ -157,7 +186,10 @@ func (f *ForkChoiceStore) setUnequivocating(validatorIndex uint64) { f.equivocatingIndicies[index] |= 1 << uint(subIndex) } -func (f *ForkChoiceStore) processAttestingIndicies(attestation *solid.Attestation, indicies []uint64) { +func (f *ForkChoiceStore) processAttestingIndicies( + attestation *solid.Attestation, + indicies []uint64, +) { beaconBlockRoot := attestation.AttestantionData().BeaconBlockRoot() target := attestation.AttestantionData().Target() @@ -184,7 +216,8 @@ func (f *ForkChoiceStore) ValidateOnAttestation(attestation *solid.Attestation) if _, has := f.forkGraph.GetHeader(target.BlockRoot()); !has { return fmt.Errorf("target root is missing") } - if blockHeader, has := f.forkGraph.GetHeader(attestation.AttestantionData().BeaconBlockRoot()); !has || blockHeader.Slot > attestation.AttestantionData().Slot() { + if blockHeader, has := f.forkGraph.GetHeader(attestation.AttestantionData().BeaconBlockRoot()); !has || + blockHeader.Slot > attestation.AttestantionData().Slot() { return fmt.Errorf("bad attestation data") } // LMD vote must be consistent with FFG vote target @@ -200,7 +233,9 @@ func (f *ForkChoiceStore) ValidateOnAttestation(attestation *solid.Attestation) return nil } -func (f *ForkChoiceStore) validateTargetEpochAgainstCurrentTime(attestation *solid.Attestation) error { +func (f *ForkChoiceStore) validateTargetEpochAgainstCurrentTime( + attestation *solid.Attestation, +) error { target := attestation.AttestantionData().Target() // Attestations must be from the current or previous epoch currentEpoch := f.computeEpochAtSlot(f.Slot()) diff --git a/cl/phase1/network/services/aggregate_and_proof_service.go b/cl/phase1/network/services/aggregate_and_proof_service.go index 83e518d9e5c..b2fdb54c2db 100644 --- a/cl/phase1/network/services/aggregate_and_proof_service.go +++ b/cl/phase1/network/services/aggregate_and_proof_service.go @@ -37,7 +37,14 @@ type aggregateAndProofServiceImpl struct { aggregatesScheduledForLaterExecution sync.Map } -func NewAggregateAndProofService(ctx context.Context, syncedDataManager *synced_data.SyncedDataManager, forkchoiceStore forkchoice.ForkChoiceStorage, beaconCfg *clparams.BeaconChainConfig, opPool pool.OperationsPool, test bool) AggregateAndProofService { +func NewAggregateAndProofService( + ctx context.Context, + syncedDataManager *synced_data.SyncedDataManager, + forkchoiceStore forkchoice.ForkChoiceStorage, + beaconCfg *clparams.BeaconChainConfig, + opPool pool.OperationsPool, + test bool, +) AggregateAndProofService { a := &aggregateAndProofServiceImpl{ syncedDataManager: syncedDataManager, forkchoiceStore: forkchoiceStore, @@ -49,7 +56,11 @@ func NewAggregateAndProofService(ctx context.Context, syncedDataManager *synced_ return a } -func (a *aggregateAndProofServiceImpl) ProcessMessage(ctx context.Context, subnet *uint64, aggregateAndProof *cltypes.SignedAggregateAndProof) error { +func (a *aggregateAndProofServiceImpl) ProcessMessage( + ctx context.Context, + subnet *uint64, + aggregateAndProof *cltypes.SignedAggregateAndProof, +) error { headState := a.syncedDataManager.HeadState() if headState == nil { return ErrIgnore @@ -72,7 +83,10 @@ func (a *aggregateAndProofServiceImpl) ProcessMessage(ctx context.Context, subne finalizedCheckpoint := a.forkchoiceStore.FinalizedCheckpoint() finalizedSlot := finalizedCheckpoint.Epoch() * a.beaconCfg.SlotsPerEpoch // [IGNORE] The current finalized_checkpoint is an ancestor of the block defined by aggregate.data.beacon_block_root -- i.e. get_checkpoint_block(store, aggregate.data.beacon_block_root, finalized_checkpoint.epoch) == store.finalized_checkpoint.root - if a.forkchoiceStore.Ancestor(aggregateData.BeaconBlockRoot(), finalizedSlot) != finalizedCheckpoint.BlockRoot() { + if a.forkchoiceStore.Ancestor( + aggregateData.BeaconBlockRoot(), + finalizedSlot, + ) != finalizedCheckpoint.BlockRoot() { return ErrIgnore } @@ -100,7 +114,10 @@ func (a *aggregateAndProofServiceImpl) ProcessMessage(ctx context.Context, subne return fmt.Errorf("committee index not in committee") } // [REJECT] The aggregate attestation's target block is an ancestor of the block named in the LMD vote -- i.e. get_checkpoint_block(store, aggregate.data.beacon_block_root, aggregate.data.target.epoch) == aggregate.data.target.root - if a.forkchoiceStore.Ancestor(aggregateData.BeaconBlockRoot(), epoch*a.beaconCfg.SlotsPerEpoch) != target.BlockRoot() { + if a.forkchoiceStore.Ancestor( + aggregateData.BeaconBlockRoot(), + epoch*a.beaconCfg.SlotsPerEpoch, + ) != target.BlockRoot() { return fmt.Errorf("invalid target block") } if a.test { @@ -112,21 +129,39 @@ func (a *aggregateAndProofServiceImpl) ProcessMessage(ctx context.Context, subne log.Warn("receveived aggregate and proof from invalid aggregator") return fmt.Errorf("invalid aggregate and proof") } - - if err := verifySignaturesOnAggregate(headState, aggregateAndProof); err != nil { + attestingIndicies, err := headState.GetAttestingIndicies( + aggregateAndProof.Message.Aggregate.AttestantionData(), + aggregateAndProof.Message.Aggregate.AggregationBits(), + true, + ) + if err != nil { return err } - - // Add to aggregation pool - a.opPool.AttestationsPool.Insert(aggregateAndProof.Message.Aggregate.Signature(), aggregateAndProof.Message.Aggregate) - + if err := verifySignaturesOnAggregate(headState, aggregateAndProof); err != nil { + return err + } // Add to aggregation pool + a.opPool.AttestationsPool.Insert( + aggregateAndProof.Message.Aggregate.Signature(), + aggregateAndProof.Message.Aggregate, + ) + a.forkchoiceStore.ProcessAttestingIndicies( + aggregateAndProof.Message.Aggregate, + attestingIndicies, + ) return nil } -func verifySignaturesOnAggregate(s *state.CachingBeaconState, aggregateAndProof *cltypes.SignedAggregateAndProof) error { +func verifySignaturesOnAggregate( + s *state.CachingBeaconState, + aggregateAndProof *cltypes.SignedAggregateAndProof, +) error { aggregationBits := aggregateAndProof.Message.Aggregate.AggregationBits() // [REJECT] The aggregate attestation has participants -- that is, len(get_attesting_indices(state, aggregate)) >= 1. - attestingIndicies, err := s.GetAttestingIndicies(aggregateAndProof.Message.Aggregate.AttestantionData(), aggregationBits, true) + attestingIndicies, err := s.GetAttestingIndicies( + aggregateAndProof.Message.Aggregate.AttestantionData(), + aggregationBits, + true, + ) if err != nil { return err } @@ -146,13 +181,19 @@ func verifySignaturesOnAggregate(s *state.CachingBeaconState, aggregateAndProof return verifyAggregateMessageSignature(s, aggregateAndProof, attestingIndicies) } -func verifyAggregateAndProofSignature(state *state.CachingBeaconState, aggregate *cltypes.AggregateAndProof) error { +func verifyAggregateAndProofSignature( + state *state.CachingBeaconState, + aggregate *cltypes.AggregateAndProof, +) error { slot := aggregate.Aggregate.AttestantionData().Slot() publicKey, err := state.ValidatorPublicKey(int(aggregate.AggregatorIndex)) if err != nil { return err } - domain, err := state.GetDomain(state.BeaconConfig().DomainSelectionProof, slot*state.BeaconConfig().SlotsPerEpoch) + domain, err := state.GetDomain( + state.BeaconConfig().DomainSelectionProof, + slot*state.BeaconConfig().SlotsPerEpoch, + ) if err != nil { return err } @@ -167,7 +208,10 @@ func verifyAggregateAndProofSignature(state *state.CachingBeaconState, aggregate return nil } -func verifyAggregatorSignature(state *state.CachingBeaconState, aggregate *cltypes.SignedAggregateAndProof) error { +func verifyAggregatorSignature( + state *state.CachingBeaconState, + aggregate *cltypes.SignedAggregateAndProof, +) error { publicKey, err := state.ValidatorPublicKey(int(aggregate.Message.AggregatorIndex)) if err != nil { return err @@ -190,8 +234,15 @@ func verifyAggregatorSignature(state *state.CachingBeaconState, aggregate *cltyp return nil } -func verifyAggregateMessageSignature(s *state.CachingBeaconState, aggregateAndProof *cltypes.SignedAggregateAndProof, attestingIndicies []uint64) error { - indexedAttestation := state.GetIndexedAttestation(aggregateAndProof.Message.Aggregate, attestingIndicies) +func verifyAggregateMessageSignature( + s *state.CachingBeaconState, + aggregateAndProof *cltypes.SignedAggregateAndProof, + attestingIndicies []uint64, +) error { + indexedAttestation := state.GetIndexedAttestation( + aggregateAndProof.Message.Aggregate, + attestingIndicies, + ) valid, err := state.IsValidIndexedAttestation(s, indexedAttestation) if err != nil { @@ -203,7 +254,9 @@ func verifyAggregateMessageSignature(s *state.CachingBeaconState, aggregateAndPr return nil } -func (a *aggregateAndProofServiceImpl) scheduleAggregateForLaterProcessing(aggregateAndProof *cltypes.SignedAggregateAndProof) { +func (a *aggregateAndProofServiceImpl) scheduleAggregateForLaterProcessing( + aggregateAndProof *cltypes.SignedAggregateAndProof, +) { key, err := aggregateAndProof.HashSSZ() if err != nil { panic(err) diff --git a/cl/sentinel/service/start.go b/cl/sentinel/service/start.go index 58f96e9f02a..dc4dfcf53a7 100644 --- a/cl/sentinel/service/start.go +++ b/cl/sentinel/service/start.go @@ -44,7 +44,8 @@ func generateSubnetsTopics(template string, maxIds int) []sentinel.GossipTopic { } func getExpirationForTopic(topic string) time.Time { - if strings.Contains(topic, "beacon_attestation") || (strings.Contains(topic, "sync_committee_") && !strings.Contains(topic, gossip.TopicNameSyncCommitteeContributionAndProof)) { + if strings.Contains(topic, "beacon_attestation") || + (strings.Contains(topic, "sync_committee_") && !strings.Contains(topic, gossip.TopicNameSyncCommitteeContributionAndProof)) { return time.Unix(0, 0) } @@ -60,7 +61,16 @@ func createSentinel( ethClock eth_clock.EthereumClock, validatorTopics bool, logger log.Logger) (*sentinel.Sentinel, error) { - sent, err := sentinel.New(context.Background(), cfg, ethClock, blockReader, blobStorage, indiciesDB, logger, forkChoiceReader) + sent, err := sentinel.New( + context.Background(), + cfg, + ethClock, + blockReader, + blobStorage, + indiciesDB, + logger, + forkChoiceReader, + ) if err != nil { return nil, err } @@ -75,13 +85,27 @@ func createSentinel( sentinel.BlsToExecutionChangeSsz, ////sentinel.LightClientFinalityUpdateSsz, ////sentinel.LightClientOptimisticUpdateSsz, + sentinel.SyncCommitteeContributionAndProofSsz, + sentinel.BeaconAggregateAndProofSsz, } - if validatorTopics { - gossipTopics = append(gossipTopics, sentinel.SyncCommitteeContributionAndProofSsz, sentinel.BeaconAggregateAndProofSsz) - } - gossipTopics = append(gossipTopics, generateSubnetsTopics(gossip.TopicNamePrefixBlobSidecar, int(cfg.BeaconConfig.MaxBlobsPerBlock))...) - gossipTopics = append(gossipTopics, generateSubnetsTopics(gossip.TopicNamePrefixBeaconAttestation, int(cfg.NetworkConfig.AttestationSubnetCount))...) - gossipTopics = append(gossipTopics, generateSubnetsTopics(gossip.TopicNamePrefixSyncCommittee, int(cfg.BeaconConfig.SyncCommitteeSubnetCount))...) + gossipTopics = append( + gossipTopics, + generateSubnetsTopics( + gossip.TopicNamePrefixBlobSidecar, + int(cfg.BeaconConfig.MaxBlobsPerBlock), + )...) + gossipTopics = append( + gossipTopics, + generateSubnetsTopics( + gossip.TopicNamePrefixBeaconAttestation, + int(cfg.NetworkConfig.AttestationSubnetCount), + )...) + gossipTopics = append( + gossipTopics, + generateSubnetsTopics( + gossip.TopicNamePrefixSyncCommittee, + int(cfg.BeaconConfig.SyncCommitteeSubnetCount), + )...) for _, v := range gossipTopics { if err := sent.Unsubscribe(v); err != nil { @@ -110,7 +134,16 @@ func StartSentinelService( forkChoiceReader forkchoice.ForkChoiceStorageReader, logger log.Logger) (sentinelrpc.SentinelClient, error) { ctx := context.Background() - sent, err := createSentinel(cfg, blockReader, blobStorage, indiciesDB, forkChoiceReader, ethClock, srvCfg.Validator, logger) + sent, err := createSentinel( + cfg, + blockReader, + blobStorage, + indiciesDB, + forkChoiceReader, + ethClock, + srvCfg.Validator, + logger, + ) if err != nil { return nil, err } @@ -125,7 +158,11 @@ func StartSentinelService( return direct.NewSentinelClientDirect(server), nil } -func StartServe(server *SentinelServer, srvCfg *ServerConfig, creds credentials.TransportCredentials) { +func StartServe( + server *SentinelServer, + srvCfg *ServerConfig, + creds credentials.TransportCredentials, +) { lis, err := net.Listen(srvCfg.Network, srvCfg.Addr) if err != nil { log.Warn("[Sentinel] could not serve service", "reason", err)