diff --git a/cl/aggregation/pool.go b/cl/aggregation/pool.go index bb73a810423..562bd8643c9 100644 --- a/cl/aggregation/pool.go +++ b/cl/aggregation/pool.go @@ -7,7 +7,7 @@ import ( //go:generate mockgen -destination=./mock_services/aggregation_pool_mock.go -package=mock_services . AggregationPool type AggregationPool interface { + // AddAttestation adds a single attestation to the pool. AddAttestation(att *solid.Attestation) error - //GetAggregatations(slot uint64, committeeIndex uint64) ([]*solid.Attestation, error) GetAggregatationByRoot(root common.Hash) *solid.Attestation } diff --git a/cl/aggregation/pool_impl.go b/cl/aggregation/pool_impl.go index aa3c97fecb7..2eff66ffa5c 100644 --- a/cl/aggregation/pool_impl.go +++ b/cl/aggregation/pool_impl.go @@ -47,6 +47,11 @@ func NewAggregationPool( } func (p *aggregationPoolImpl) AddAttestation(inAtt *solid.Attestation) error { + // check if it's single attestation + if utils.BitsOnCount(inAtt.AggregationBits()) != 1 { + return fmt.Errorf("exactly one aggregation bit should be set") + } + // use hash of attestation data as key hashRoot, err := inAtt.AttestantionData().HashSSZ() if err != nil { @@ -62,6 +67,7 @@ func (p *aggregationPoolImpl) AddAttestation(inAtt *solid.Attestation) error { } if utils.IsNonStrictSupersetBitlist(att.AggregationBits(), inAtt.AggregationBits()) { + // the on bit is already set, so ignore return ErrIsSuperset } diff --git a/cl/aggregation/pool_test.go b/cl/aggregation/pool_test.go index 82b0b42f3b5..79ae9cf410f 100644 --- a/cl/aggregation/pool_test.go +++ b/cl/aggregation/pool_test.go @@ -22,17 +22,17 @@ var ( [96]byte{'a', 'b', 'c', 'd', 'e', 'f'}, ) att1_2 = solid.NewAttestionFromParameters( - []byte{0b00001011, 0, 0, 0}, + []byte{0b00000001, 0, 0, 0}, attData1, [96]byte{'d', 'e', 'f', 'g', 'h', 'i'}, ) att1_3 = solid.NewAttestionFromParameters( - []byte{0b00000100, 0b00000011, 0, 0}, + []byte{0b00000100, 0, 0, 0}, attData1, [96]byte{'g', 'h', 'i', 'j', 'k', 'l'}, ) att1_4 = solid.NewAttestionFromParameters( - []byte{0b00111010, 0, 0, 0}, + []byte{0b00100000, 0, 0, 0}, attData1, [96]byte{'m', 'n', 'o', 'p', 'q', 'r'}, ) @@ -99,7 +99,7 @@ func (t *PoolTestSuite) TestAddAttestation() { }, hashRoot: attData1Root, expect: solid.NewAttestionFromParameters( - []byte{0b00111111, 0b00000011, 0, 0}, // merge of att1_2, att1_3 and att1_4 + []byte{0b00100101, 0, 0, 0}, // merge of att1_2, att1_3 and att1_4 attData1, mockAggrResult), }, diff --git a/cl/phase1/network/services/aggregate_and_proof_service.go b/cl/phase1/network/services/aggregate_and_proof_service.go index 7ab8f3981cc..83e518d9e5c 100644 --- a/cl/phase1/network/services/aggregate_and_proof_service.go +++ b/cl/phase1/network/services/aggregate_and_proof_service.go @@ -9,9 +9,7 @@ import ( "github.com/Giulio2002/bls" "github.com/ledgerwatch/log/v3" - "github.com/pkg/errors" - "github.com/ledgerwatch/erigon/cl/aggregation" "github.com/ledgerwatch/erigon/cl/beacon/synced_data" "github.com/ledgerwatch/erigon/cl/clparams" "github.com/ledgerwatch/erigon/cl/cltypes" @@ -19,6 +17,7 @@ import ( "github.com/ledgerwatch/erigon/cl/merkle_tree" "github.com/ledgerwatch/erigon/cl/phase1/core/state" "github.com/ledgerwatch/erigon/cl/phase1/forkchoice" + "github.com/ledgerwatch/erigon/cl/pool" "github.com/ledgerwatch/erigon/cl/utils" ) @@ -31,19 +30,19 @@ type aggregateAndProofServiceImpl struct { syncedDataManager *synced_data.SyncedDataManager forkchoiceStore forkchoice.ForkChoiceStorage beaconCfg *clparams.BeaconChainConfig - aggregationPool aggregation.AggregationPool + opPool pool.OperationsPool test bool // set of aggregates that are scheduled for later processing aggregatesScheduledForLaterExecution sync.Map } -func NewAggregateAndProofService(ctx context.Context, syncedDataManager *synced_data.SyncedDataManager, forkchoiceStore forkchoice.ForkChoiceStorage, beaconCfg *clparams.BeaconChainConfig, aggregationPool aggregation.AggregationPool, test bool) AggregateAndProofService { +func NewAggregateAndProofService(ctx context.Context, syncedDataManager *synced_data.SyncedDataManager, forkchoiceStore forkchoice.ForkChoiceStorage, beaconCfg *clparams.BeaconChainConfig, opPool pool.OperationsPool, test bool) AggregateAndProofService { a := &aggregateAndProofServiceImpl{ syncedDataManager: syncedDataManager, forkchoiceStore: forkchoiceStore, beaconCfg: beaconCfg, - aggregationPool: aggregationPool, + opPool: opPool, test: test, } go a.loop(ctx) @@ -119,12 +118,7 @@ func (a *aggregateAndProofServiceImpl) ProcessMessage(ctx context.Context, subne } // Add to aggregation pool - if err := a.aggregationPool.AddAttestation(aggregateAndProof.Message.Aggregate); err != nil { - if errors.Is(err, aggregation.ErrIsSuperset) { - return ErrIgnore - } - return errors.WithMessagef(err, "failed to add attestation to pool") - } + a.opPool.AttestationsPool.Insert(aggregateAndProof.Message.Aggregate.Signature(), aggregateAndProof.Message.Aggregate) return nil } diff --git a/cl/phase1/network/services/aggregate_and_proof_service_test.go b/cl/phase1/network/services/aggregate_and_proof_service_test.go index 1b0648f1237..e6ea4772ba1 100644 --- a/cl/phase1/network/services/aggregate_and_proof_service_test.go +++ b/cl/phase1/network/services/aggregate_and_proof_service_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/ledgerwatch/erigon-lib/common" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon/cl/antiquary/tests" "github.com/ledgerwatch/erigon/cl/beacon/synced_data" "github.com/ledgerwatch/erigon/cl/clparams" @@ -12,6 +13,7 @@ import ( "github.com/ledgerwatch/erigon/cl/cltypes/solid" "github.com/ledgerwatch/erigon/cl/phase1/core/state" "github.com/ledgerwatch/erigon/cl/phase1/forkchoice" + "github.com/ledgerwatch/erigon/cl/pool" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" ) @@ -44,7 +46,9 @@ func setupAggregateAndProofTest(t *testing.T) (AggregateAndProofService, *synced cfg := &clparams.MainnetBeaconConfig syncedDataManager := synced_data.NewSyncedDataManager(true, cfg) forkchoiceMock := forkchoice.NewForkChoiceStorageMock(t) - blockService := NewAggregateAndProofService(ctx, syncedDataManager, forkchoiceMock, cfg, nil, true) + p := pool.OperationsPool{} + p.AttestationsPool = pool.NewOperationPool[libcommon.Bytes96, *solid.Attestation](100, "test") + blockService := NewAggregateAndProofService(ctx, syncedDataManager, forkchoiceMock, cfg, p, true) return blockService, syncedDataManager, forkchoiceMock } diff --git a/cmd/caplin/caplin1/run.go b/cmd/caplin/caplin1/run.go index aeb3e203b79..d77dd7fc8f5 100644 --- a/cmd/caplin/caplin1/run.go +++ b/cmd/caplin/caplin1/run.go @@ -183,7 +183,7 @@ func RunCaplinPhase1(ctx context.Context, engine execution_client.ExecutionEngin syncCommitteeMessagesService := services.NewSyncCommitteeMessagesService(beaconConfig, ethClock, syncedDataManager, syncContributionPool, false) attestationService := services.NewAttestationService(forkChoice, committeeSub, ethClock, syncedDataManager, beaconConfig, networkConfig) syncContributionService := services.NewSyncContributionService(syncedDataManager, beaconConfig, syncContributionPool, ethClock, emitters, false) - aggregateAndProofService := services.NewAggregateAndProofService(ctx, syncedDataManager, forkChoice, beaconConfig, aggregationPool, false) + aggregateAndProofService := services.NewAggregateAndProofService(ctx, syncedDataManager, forkChoice, beaconConfig, pool, false) voluntaryExitService := services.NewVoluntaryExitService(pool, emitters, syncedDataManager, beaconConfig, ethClock) blsToExecutionChangeService := services.NewBLSToExecutionChangeService(pool, emitters, syncedDataManager, beaconConfig) proposerSlashingService := services.NewProposerSlashingService(pool, syncedDataManager, beaconConfig, ethClock)