Skip to content

Commit

Permalink
First round of fixes in making gossip publishing good for the validat…
Browse files Browse the repository at this point in the history
…or: See comment (#9972)

* Fixed and simplified unaggregated bits check.
* There are 2 bits on, one for the attester and one for the
End-of-bitlist, needed to account for end of bitlist bit
 * Wrong publishing topic for sync_committee_ messages
* Added more Ignore by receiving specific errors to avoid forwarding
useless data.
 * Replaced `validateAttestation` with full message processing
 * Fixed forwarding of sync committee aggregates
 * Fixed subnet announcements

---------

Co-authored-by: kewei <kewei.train@gmail.com>
  • Loading branch information
Giulio2002 and domiwei authored Apr 21, 2024
1 parent 54a1609 commit 596d54d
Show file tree
Hide file tree
Showing 31 changed files with 446 additions and 219 deletions.
11 changes: 6 additions & 5 deletions cl/aggregation/pool_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/ledgerwatch/erigon/cl/utils/eth_clock"
)

var ErrIsSuperset = fmt.Errorf("attestation is superset of existing attestation")

var (
blsAggregate = bls.AggregateSignatures
)
Expand Down Expand Up @@ -55,13 +57,12 @@ func (p *aggregationPoolImpl) AddAttestation(inAtt *solid.Attestation) error {
defer p.aggregatesLock.Unlock()
att, ok := p.aggregates[hashRoot]
if !ok {
p.aggregates[hashRoot] = inAtt.Clone().(*solid.Attestation)
p.aggregates[hashRoot] = inAtt.Copy()
return nil
}

if utils.IsSupersetBitlist(att.AggregationBits(), inAtt.AggregationBits()) {
// no need to merge existing signatures
return nil
return ErrIsSuperset
}

// merge signature
Expand All @@ -71,7 +72,7 @@ func (p *aggregationPoolImpl) AddAttestation(inAtt *solid.Attestation) error {
if err != nil {
return err
}
if len(merged) > 96 {
if len(merged) != 96 {
return fmt.Errorf("merged signature is too long")
}
var mergedSig [96]byte
Expand Down Expand Up @@ -99,7 +100,7 @@ func (p *aggregationPoolImpl) GetAggregatationByRoot(root common.Hash) *solid.At
if att == nil {
return nil
}
return att.Clone().(*solid.Attestation)
return att.Copy()
}

func (p *aggregationPoolImpl) sweepStaleAtt(ctx context.Context) {
Expand Down
3 changes: 3 additions & 0 deletions cl/beacon/handler/duties_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ func (a *ApiHandler) getSyncDuties(w http.ResponseWriter, r *http.Request) (*bea

// Now try reading the sync committee
syncCommittee, _, ok := a.forkchoiceStore.GetSyncCommittees(period)
if !ok {
_, syncCommittee, ok = a.forkchoiceStore.GetSyncCommittees(period - 1)
}
// Read them from the archive node if we do not have them in the fast-access storage
if !ok {
syncCommittee, err = state_accessors.ReadCurrentSyncCommittee(tx, a.beaconChainCfg.RoundSlotToSyncCommitteePeriod(startSlotAtEpoch))
Expand Down
3 changes: 3 additions & 0 deletions cl/beacon/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type ApiHandler struct {
syncCommitteeMessagesService services.SyncCommitteeMessagesService
syncContributionAndProofsService services.SyncContributionService
aggregateAndProofsService services.AggregateAndProofService
attestationService services.AttestationService
}

func NewApiHandler(
Expand Down Expand Up @@ -110,6 +111,7 @@ func NewApiHandler(
syncCommitteeMessagesService services.SyncCommitteeMessagesService,
syncContributionAndProofs services.SyncContributionService,
aggregateAndProofs services.AggregateAndProofService,
attestationService services.AttestationService,
) *ApiHandler {
blobBundles, err := lru.New[common.Bytes48, BlobBundle]("blobs", maxBlobBundleCacheSize)
if err != nil {
Expand Down Expand Up @@ -146,6 +148,7 @@ func NewApiHandler(
syncCommitteeMessagesService: syncCommitteeMessagesService,
syncContributionAndProofsService: syncContributionAndProofs,
aggregateAndProofsService: aggregateAndProofs,
attestationService: attestationService,
}
}

Expand Down
7 changes: 2 additions & 5 deletions cl/beacon/handler/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,11 @@ func (a *ApiHandler) GetEthV1NodeIdentity(w http.ResponseWriter, r *http.Request

func (a *ApiHandler) GetEthV1NodeSyncing(w http.ResponseWriter, r *http.Request) {
currentSlot := a.ethClock.GetCurrentSlot()
var syncDistance uint64
if a.syncedData.Syncing() {
syncDistance = currentSlot - a.syncedData.HeadSlot()
}

if err := json.NewEncoder(w).Encode(map[string]interface{}{
"data": map[string]interface{}{
"head_slot": strconv.FormatUint(a.syncedData.HeadSlot(), 10),
"sync_distance": strconv.FormatUint(syncDistance, 10),
"sync_distance": strconv.FormatUint(currentSlot-a.syncedData.HeadSlot(), 10),
"is_syncing": a.syncedData.Syncing(),
"is_optimistic": false, // needs to change
"el_offline": false,
Expand Down
76 changes: 44 additions & 32 deletions cl/beacon/handler/pool.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package handler

import (
"bytes"
"encoding/json"
"errors"
"net/http"
Expand Down Expand Up @@ -64,44 +65,44 @@ func (a *ApiHandler) PostEthV1BeaconPoolAttestations(w http.ResponseWriter, r *h
beaconhttp.NewEndpointError(http.StatusBadRequest, err).WriteTo(w)
return
}

headState := a.syncedData.HeadState()
if headState == nil {
beaconhttp.NewEndpointError(http.StatusServiceUnavailable, errors.New("head state not available")).WriteTo(w)
return
}
failures := []poolingFailure{}
for i, attestation := range req {
if err := a.forkchoiceStore.ValidateOnAttestation(attestation); err != nil {
failures = append(failures, poolingFailure{
Index: i,
Message: err.Error(),
})
continue
}
if err := a.committeeSub.CheckAggregateAttestation(attestation); err != nil {
var (
slot = attestation.AttestantionData().Slot()
cIndex = attestation.AttestantionData().CommitteeIndex()
committeeCountPerSlot = headState.CommitteeCount(slot / a.beaconChainCfg.SlotsPerEpoch)
subnet = subnets.ComputeSubnetForAttestation(committeeCountPerSlot, slot, cIndex, a.beaconChainCfg.SlotsPerEpoch, a.netConfig.AttestationSubnetCount)
)
_ = i
if err := a.attestationService.ProcessMessage(r.Context(), &subnet, attestation); err != nil {
log.Warn("[Beacon REST] failed to process attestation", "err", err)
failures = append(failures, poolingFailure{
Index: i,
Message: err.Error(),
})
continue
}
if a.sentinel != nil {
// broadcast
var (
slot = attestation.AttestantionData().Slot()
cIndex = attestation.AttestantionData().CommitteeIndex()
committeeCountPerSlot = subnets.ComputeCommitteeCountPerSlot(a.syncedData.HeadState(), slot, a.beaconChainCfg.SlotsPerEpoch)
subnet = subnets.ComputeSubnetForAttestation(committeeCountPerSlot, slot, cIndex, a.beaconChainCfg.SlotsPerEpoch, a.netConfig.AttestationSubnetCount)
)
encodedSSZ, err := attestation.EncodeSSZ(nil)
if err != nil {
beaconhttp.NewEndpointError(http.StatusInternalServerError, err).WriteTo(w)
return
}
if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNameBeaconAttestation(subnet),
SubnetId: &subnet,
}); err != nil {
beaconhttp.NewEndpointError(http.StatusInternalServerError, err).WriteTo(w)
return
}
}
// if a.sentinel != nil {
// encodedSSZ, err := attestation.EncodeSSZ(nil)
// if err != nil {
// beaconhttp.NewEndpointError(http.StatusInternalServerError, err).WriteTo(w)
// return
// }
// if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{
// Data: encodedSSZ,
// Name: gossip.TopicNamePrefixBeaconAttestation,
// SubnetId: &subnet,
// }); err != nil {
// beaconhttp.NewEndpointError(http.StatusInternalServerError, err).WriteTo(w)
// return
// }
// }
}
if len(failures) > 0 {
errResp := poolingError{
Expand Down Expand Up @@ -266,6 +267,7 @@ func (a *ApiHandler) PostEthV1ValidatorAggregatesAndProof(w http.ResponseWriter,
failures := []poolingFailure{}
for _, v := range req {
if err := a.aggregateAndProofsService.ProcessMessage(r.Context(), nil, v); err != nil && !errors.Is(err, services.ErrIgnore) {
log.Warn("[Beacon REST] failed to process bls-change", "err", err)
failures = append(failures, poolingFailure{Index: len(failures), Message: err.Error()})
continue
}
Expand All @@ -274,12 +276,14 @@ func (a *ApiHandler) PostEthV1ValidatorAggregatesAndProof(w http.ResponseWriter,
encodedSSZ, err := v.EncodeSSZ(nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
log.Warn("[Beacon REST] failed to encode aggregate and proof", "err", err)
return
}
if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNameBeaconAggregateAndProof,
}); err != nil {
log.Warn("[Beacon REST] failed to publish gossip", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand Down Expand Up @@ -308,7 +312,8 @@ func (a *ApiHandler) PostEthV1BeaconPoolSyncCommittees(w http.ResponseWriter, r
continue
}
for _, subnet := range publishingSubnets {
if err := a.syncCommitteeMessagesService.ProcessMessage(r.Context(), &subnet, v); err != nil && !errors.Is(err, services.ErrIgnore) {
if err = a.syncCommitteeMessagesService.ProcessMessage(r.Context(), &subnet, v); err != nil && !errors.Is(err, services.ErrIgnore) {
log.Warn("[Beacon REST] failed to process attestation", "err", err)
failures = append(failures, poolingFailure{Index: idx, Message: err.Error()})
break
}
Expand Down Expand Up @@ -354,8 +359,13 @@ func (a *ApiHandler) PostEthV1ValidatorContributionsAndProofs(w http.ResponseWri
return
}
failures := []poolingFailure{}
var err error
for idx, v := range msgs {
if err := a.syncContributionAndProofsService.ProcessMessage(r.Context(), nil, v); err != nil && !errors.Is(err, services.ErrIgnore) {
if bytes.Equal(v.Message.Contribution.AggregationBits, make([]byte, len(v.Message.Contribution.AggregationBits))) {
continue // skip empty contributions
}
if err = a.syncContributionAndProofsService.ProcessMessage(r.Context(), nil, v); err != nil && !errors.Is(err, services.ErrIgnore) {
log.Warn("[Beacon REST] failed to process sync contribution", "err", err)
failures = append(failures, poolingFailure{Index: idx, Message: err.Error()})
continue
}
Expand All @@ -364,12 +374,14 @@ func (a *ApiHandler) PostEthV1ValidatorContributionsAndProofs(w http.ResponseWri
encodedSSZ, err := v.EncodeSSZ(nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
log.Warn("[Beacon REST] failed to encode sync contribution", "err", err)
return
}
if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNameSyncCommitteeContributionAndProof,
}); err != nil {
log.Warn("[Beacon REST] failed to publish gossip", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand Down
6 changes: 4 additions & 2 deletions cl/beacon/handler/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,13 +290,15 @@ func TestPoolSyncCommittees(t *testing.T) {
}

func TestPoolSyncContributionAndProofs(t *testing.T) {
aggrBits := make([]byte, cltypes.SyncCommitteeAggregationBitsSize)
aggrBits[0] = 1
msgs := []*cltypes.SignedContributionAndProof{
{
Message: &cltypes.ContributionAndProof{
Contribution: &cltypes.Contribution{
Slot: 1,
BeaconBlockRoot: libcommon.Hash{1, 2, 3, 4, 5, 6, 7, 8},
AggregationBits: make([]byte, cltypes.SyncCommitteeAggregationBitsSize),
AggregationBits: aggrBits,
},
},
},
Expand Down Expand Up @@ -332,6 +334,6 @@ func TestPoolSyncContributionAndProofs(t *testing.T) {
Slot: 1,
BeaconBlockRoot: libcommon.Hash{1, 2, 3, 4, 5, 6, 7, 8},
SubcommitteeIndex: 0,
AggregationBits: make([]byte, cltypes.SyncCommitteeAggregationBitsSize),
AggregationBits: aggrBits,
})
}
2 changes: 1 addition & 1 deletion cl/beacon/handler/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (a *ApiHandler) PostEthV1ValidatorSyncCommitteeSubscriptions(w http.Respons
// subscribe to subnets
for _, subnet := range syncnets {
if _, err := a.sentinel.SetSubscribeExpiry(r.Context(), &sentinel.RequestSubscribeExpiry{
Topic: fmt.Sprintf(gossip.TopicNamePrefixSyncCommittee, subnet),
Topic: gossip.TopicNameSyncCommittee(int(subnet)),
ExpiryUnixSecs: uint64(expiry.Unix()),
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down
2 changes: 1 addition & 1 deletion cl/beacon/handler/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func setupTestingHandler(t *testing.T, v clparams.StateVersion, logger log.Logge
Events: true,
Validator: true,
Lighthouse: true,
}, nil, blobStorage, nil, vp, nil, nil, fcu.SyncContributionPool, nil, nil, syncCommitteeMessagesService, syncContributionService, aggregateAndProofsService) // TODO: add tests
}, nil, blobStorage, nil, vp, nil, nil, fcu.SyncContributionPool, nil, nil, syncCommitteeMessagesService, syncContributionService, aggregateAndProofsService, nil) // TODO: add tests
h.Init()
return
}
1 change: 1 addition & 0 deletions cl/beacon/handler/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (t *validatorTestSuite) SetupTest() {
nil,
nil,
nil,
nil,
)
t.gomockCtrl = gomockCtrl
}
Expand Down
1 change: 0 additions & 1 deletion cl/beacon/synced_data/synced_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ func (s *SyncedDataManager) OnHeadState(newState *state.CachingBeaconState) (err
return err
}
s.headState.Store(st)
//fmt.Println(newState.CurrentSyncCommittee().GetCommittee())

return
}
Expand Down
2 changes: 1 addition & 1 deletion cl/clparams/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ var CheckpointSyncEndpoints = map[NetworkType][]string{
},
SepoliaNetwork: {
//"https://beaconstate-sepolia.chainsafe.io/eth/v2/debug/beacon/states/finalized",
"https://sepolia.beaconstate.info/eth/v2/debug/beacon/states/finalized",
//"https://sepolia.beaconstate.info/eth/v2/debug/beacon/states/finalized",
"https://checkpoint-sync.sepolia.ethpandaops.io/eth/v2/debug/beacon/states/finalized",
},
GnosisNetwork: {
Expand Down
8 changes: 8 additions & 0 deletions cl/cltypes/solid/attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ func (*Attestation) Static() bool {
return false
}

func (a *Attestation) Copy() *Attestation {
new := &Attestation{}
copy(new.staticBuffer[:], a.staticBuffer[:])
new.aggregationBitsBuffer = make([]byte, len(a.aggregationBitsBuffer))
copy(new.aggregationBitsBuffer, a.aggregationBitsBuffer)
return new
}

// NewAttestionFromParameters creates a new Attestation instance using provided parameters
func NewAttestionFromParameters(
aggregationBits []byte,
Expand Down
1 change: 0 additions & 1 deletion cl/phase1/forkchoice/fork_graph/fork_graph_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,6 @@ func (f *forkGraphDisk) Prune(pruneSlot uint64) (err error) {
f.currentJustifiedCheckpoints.Delete(root)
f.finalizedCheckpoints.Delete(root)
f.headers.Delete(root)
f.syncCommittees.Delete(root)
f.blockRewards.Delete(root)
f.fs.Remove(getBeaconStateFilename(root))
f.fs.Remove(getBeaconStateCacheFilename(root))
Expand Down
5 changes: 5 additions & 0 deletions cl/phase1/forkchoice/forkchoice_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type ForkChoiceStorageMock struct {
LCUpdates map[uint64]*cltypes.LightClientUpdate
SyncContributionPool sync_contribution_pool.SyncContributionPool
Headers map[common.Hash]*cltypes.BeaconBlockHeader
GetBeaconCommitteeMock func(slot, committeeIndex uint64) ([]uint64, error)

Pool pool.OperationsPool
}
Expand All @@ -67,6 +68,7 @@ func NewForkChoiceStorageMock() *ForkChoiceStorageMock {
LCUpdates: make(map[uint64]*cltypes.LightClientUpdate),
SyncContributionPool: sync_contribution_pool.NewSyncContributionPoolMock(),
Headers: make(map[common.Hash]*cltypes.BeaconBlockHeader),
GetBeaconCommitteeMock: nil,
}
}

Expand Down Expand Up @@ -128,6 +130,9 @@ func (f *ForkChoiceStorageMock) GetSyncCommittees(period uint64) (*solid.SyncCom
}

func (f *ForkChoiceStorageMock) GetBeaconCommitee(slot, committeeIndex uint64) ([]uint64, error) {
if f.GetBeaconCommitteeMock != nil {
return f.GetBeaconCommitteeMock(slot, committeeIndex)
}
return []uint64{1, 2, 3, 4, 5, 6, 7, 8}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions cl/phase1/network/gossip_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (g *GossipManager) onRecv(ctx context.Context, data *sentinel.GossipData, l
return err
}
if _, err := g.sentinel.PublishGossip(ctx, data); err != nil {
log.Debug("failed publish gossip", "err", err)
log.Warn("failed publish gossip", "err", err)
}
return nil
}
Expand Down Expand Up @@ -179,7 +179,7 @@ func (g *GossipManager) routeAndProcess(ctx context.Context, data *sentinel.Goss
return g.syncCommitteeMessagesService.ProcessMessage(ctx, data.SubnetId, msg)
case gossip.IsTopicBeaconAttestation(data.Name):
att := &solid.Attestation{}
if err := att.DecodeSSZ(common.CopyBytes(data.Data), int(version)); err != nil {
if err := att.DecodeSSZ(data.Data, int(version)); err != nil {
return err
}
return g.attestationService.ProcessMessage(ctx, data.SubnetId, att)
Expand Down
3 changes: 3 additions & 0 deletions cl/phase1/network/services/aggregate_and_proof_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ func (a *aggregateAndProofServiceImpl) ProcessMessage(ctx context.Context, subne

// Add to aggregation pool
if err := a.aggregationPool.AddAttestation(aggregateAndProof.Message.Aggregate); err != nil {
if errors.Is(err, aggregation.ErrIsSuperset) {
return ErrIgnore
}
return errors.WithMessagef(err, "failed to add attestation to pool")
}

Expand Down
Loading

0 comments on commit 596d54d

Please sign in to comment.