Skip to content

Commit

Permalink
subscription done
Browse files Browse the repository at this point in the history
  • Loading branch information
domiwei committed Mar 22, 2024
1 parent 684b6b5 commit 91c56aa
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 115 deletions.
231 changes: 123 additions & 108 deletions cl/attestation/attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,29 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/Giulio2002/bls"
"github.com/ledgerwatch/erigon-lib/gointerfaces/sentinel"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/cltypes/solid"
"github.com/ledgerwatch/erigon/cl/gossip"
state_accessors "github.com/ledgerwatch/erigon/cl/persistence/state"
"github.com/ledgerwatch/erigon/cl/utils"
"github.com/ledgerwatch/log/v3"
)

type Attestation struct {
indiciesDB kv.RoDB
beaconConfig *clparams.BeaconChainConfig
netConfig *clparams.NetworkConfig
sentinel sentinel.SentinelClient
indiciesDB kv.RoDB
genesisConfig *clparams.GenesisConfig
beaconConfig *clparams.BeaconChainConfig
netConfig *clparams.NetworkConfig
sentinel sentinel.SentinelClient
// subscriptions
//subnetAttMutex sync.Mutex
//subnets map[uint64]*subnetSubscription // map from subnet id to subscription list
aggregationMutex sync.RWMutex
aggregations map[string]*aggregateData // map from slot:committeeIndex to aggregate data
aggregations map[string]*aggregationData // map from slot:committeeIndex to aggregate data
validatorSubsMutex sync.RWMutex
validatorSubs map[uint64][]*validatorSub // map from validator index to subscription details
}
Expand All @@ -34,38 +36,31 @@ func NewAttestation(
indiciesDB kv.RoDB,
beaconConfig *clparams.BeaconChainConfig,
netConfig *clparams.NetworkConfig,
genesisConfig *clparams.GenesisConfig,
sentinel sentinel.SentinelClient,
) *Attestation {
return &Attestation{
indiciesDB: indiciesDB,
beaconConfig: beaconConfig,
netConfig: netConfig,
sentinel: sentinel,
a := &Attestation{
indiciesDB: indiciesDB,
beaconConfig: beaconConfig,
netConfig: netConfig,
genesisConfig: genesisConfig,
sentinel: sentinel,
//subnets: make(map[uint64]*subnetSubscription),
aggregations: make(map[string]*aggregateData),
aggregations: make(map[string]*aggregationData),
validatorSubs: make(map[uint64][]*validatorSub),
}
a.sweepByStaleSlots(ctx)
return a
}

type aggregateData struct {
type aggregationData struct {
subnetId uint64
slot uint64
committeeIndex uint64
internalLock sync.RWMutex
signature []byte
bits []byte
}

/*
type subnetSubscription struct {
subnetId uint64
slot uint64
subscribers map[uint64]validator // map from validator index to subscription details
needAggregate bool
aggregationSignature []byte
aggregationBits []byte
}
*/
type validatorSub struct {
subnetId uint64
slot uint64
Expand All @@ -76,7 +71,7 @@ func toAggregationId(slot, committeeIndex uint64) string {
return fmt.Sprintf("%d:%d", slot, committeeIndex)
}

func (a *Attestation) AddAttestationSubscription(p *cltypes.BeaconCommitteeSubscription) error {
func (a *Attestation) AddAttestationSubscription(ctx context.Context, p *cltypes.BeaconCommitteeSubscription) error {
subnetId, err := a.computeSubnetId(p.Slot, p.CommitteeIndex)
if err != nil {
return err
Expand All @@ -86,19 +81,20 @@ func (a *Attestation) AddAttestationSubscription(p *cltypes.BeaconCommitteeSubsc
if _, exist := a.validatorSubs[p.ValidatorIndex]; !exist {
a.validatorSubs[p.ValidatorIndex] = make([]*validatorSub, 0)
}
a.validatorSubs[p.ValidatorIndex] = append(a.validatorSubs[p.ValidatorIndex], &validatorSub{
subnetId: subnetId,
slot: p.Slot,
committeeIndex: p.CommitteeIndex,
})
a.validatorSubs[p.ValidatorIndex] = append(a.validatorSubs[p.ValidatorIndex],
&validatorSub{
subnetId: subnetId,
slot: p.Slot,
committeeIndex: p.CommitteeIndex,
})
a.validatorSubsMutex.Unlock()

// 2. if aggregator, add to aggregation collection
if p.IsAggregator {
a.aggregationMutex.Lock()
aggrId := toAggregationId(p.Slot, p.CommitteeIndex)
if _, exist := a.aggregations[aggrId]; !exist {
a.aggregations[aggrId] = &aggregateData{
a.aggregations[aggrId] = &aggregationData{
subnetId: subnetId,
slot: p.Slot,
committeeIndex: p.CommitteeIndex,
Expand All @@ -110,95 +106,114 @@ func (a *Attestation) AddAttestationSubscription(p *cltypes.BeaconCommitteeSubsc
}

// 3. set sentinel gossip expiration by subnet id

/*
a.subnetAttMutex.Lock()
defer a.subnetAttMutex.Unlock()
// add subscription to attestationSubscriptions
curSubnet, exist := a.subnets[subnetId]
if !exist {
// a new subnet
a.subnets[subnetId] = &subnetSubscription{
subnetId: subnetId,
subscribers: make(map[uint64]validator),
needAggregate: false,
aggregationSignature: nil,
aggregationBits: make([]byte, a.beaconConfig.MaxValidatorsPerCommittee/8),
}
}
curSubnet.subscribers[p.ValidatorIndex] = validator{
// todo: might need to consider expiration
expiry: time.Now().Add(7 * 24 * time.Hour),
}
// todo: a.sentinel.SetGossipExpiration()
if p.IsAggregator {
curSubnet.needAggregate = true
}
return nil
*/
}

func (a *Attestation) OnReceiveAttestation(att *solid.Attestation) error {
// compute subnet id
slot := att.AttestantionData().Slot()
committeeIndex := att.AttestantionData().CommitteeIndex()
subnetId, err := a.computeSubnetId(slot, committeeIndex)
if err != nil {
log.Error("computeSubnetId failed", "err", err)
request := sentinel.RequestSubscribeExpiry{
Topic: gossip.TopicNameBeaconAttestation(subnetId),
ExpiryUnixSecs: uint64(time.Now().Add(24 * time.Hour).Unix()), // temporarily set to 24 hours
}
if _, err := a.sentinel.SetSubscribeExpiry(ctx, &request); err != nil {
return err
}
return nil
}

a.subnetAttMutex.Lock()
defer a.subnetAttMutex.Unlock()
curSubnet, exist := a.subnets[subnetId]
func (a *Attestation) OnReceiveAttestation(att *solid.Attestation) error {
var (
slot = att.AttestantionData().Slot()
committeeIndex = att.AttestantionData().CommitteeIndex()
sig = att.Signature()
bits = att.AggregationBits()
)
aggrId := toAggregationId(slot, committeeIndex)
a.aggregationMutex.Lock()
defer a.aggregationMutex.Unlock()
aggrData, exist := a.aggregations[aggrId]
if !exist {
// no one is interested in this subnet
// no one is interested in this aggregation
return nil
}
bitGroupIdx := -1
// check if already have aggregation signature associated with the bit. if not, add it
for i := 0; i < len(bits); i++ {
if bits[i] == 0 {
continue
} else if bits[i]|aggrData.bits[i] == aggrData.bits[i] {
// already have this bit, skip current attestation
return nil
} else {
// get a new bit
bitGroupIdx = i
break
}
}
if bitGroupIdx == -1 {
// weird case. all bits are 0
log.Warn("all bits are 0")
return nil
}
// aggregate
sigBytes := make([]byte, 96)
copy(sigBytes, sig[:])
if aggrData != nil {
aggrSig, err := bls.AggregateSignatures([][]byte{
aggrData.signature,
sigBytes,
})
if err != nil {
log.Error("aggregate signature failed", "err", err)
return err
}
aggrData.signature = aggrSig
} else {
aggrData.signature = sigBytes
}
// update aggregation bits
aggrData.bits[bitGroupIdx] |= bits[bitGroupIdx]
return nil
}

if curSubnet.needAggregate {
sig := att.Signature()
bits := att.AggregationBits()
bitGroupIdx := -1
// check if already have aggregation signature associated with the bit. if not, add it
for i := 0; i < len(bits); i++ {
if bits[i] == 0 {
continue
} else if bits[i]|curSubnet.aggregationBits[i] == curSubnet.aggregationBits[i] {
// already have this bit, skip current attestation
return nil
func (a *Attestation) sweepByStaleSlots(ctx context.Context) {
// sweep subscriptions if slot is older than current slot
sweepValidatorSubscriptions := func(curSlot uint64) {
a.validatorSubsMutex.Lock()
defer a.validatorSubsMutex.Unlock()
for idx, subs := range a.validatorSubs {
liveSubs := make([]*validatorSub, 0)
for i := 0; i < len(subs); i++ {
if curSlot <= subs[i].slot {
// keep this subscription
liveSubs = append(liveSubs, subs[i])
}
}
if len(liveSubs) == 0 {
delete(a.validatorSubs, idx)
} else {
// get a new bit
bitGroupIdx = i
break
a.validatorSubs[idx] = liveSubs
}
}
if bitGroupIdx == -1 {
// weird case. all bits are 0
log.Warn("all bits are 0")
return nil
}

// aggregate
sigBytes := make([]byte, 96)
copy(sigBytes, sig[:])
signatures := [][]byte{sigBytes}
if curSubnet.aggregationSignature != nil {
signatures = append(signatures, curSubnet.aggregationSignature)
aggrSig, err := bls.AggregateSignatures(signatures)
if err != nil {
log.Error("aggregate signature failed", "err", err)
return err
}
// sweep aggregations if slot is older than current slot
sweepAggregations := func(curSlot uint64) {
a.aggregationMutex.Lock()
defer a.aggregationMutex.Unlock()
for id, aggrData := range a.aggregations {
if curSlot > aggrData.slot {
delete(a.aggregations, id)
}
curSubnet.aggregationSignature = aggrSig
} else {
curSubnet.aggregationSignature = sigBytes
}
// update aggregation bits
curSubnet.aggregationBits[bitGroupIdx] |= bits[bitGroupIdx]
}
return nil
// sweep every minute
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
curSlot := utils.GetCurrentSlot(a.genesisConfig.GenesisTime, a.beaconConfig.SecondsPerSlot)
sweepValidatorSubscriptions(curSlot)
sweepAggregations(curSlot)
}
}
}

func (a *Attestation) computeSubnetId(slot uint64, committeeIndex uint64) (uint64, error) {
Expand Down
2 changes: 1 addition & 1 deletion cl/beacon/handler/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func setupTestingHandler(t *testing.T, v clparams.StateVersion, logger log.Logge
Events: true,
Validator: true,
Lighthouse: true,
}, nil, blobStorage, nil, vp, nil)
}, nil, blobStorage, nil, vp, nil, nil)
h.Init()
return
}
8 changes: 2 additions & 6 deletions cl/beacon/handler/validators.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package handler

import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -665,17 +666,12 @@ func (a *ApiHandler) postBeaconCommitteeSubscriptions(w http.ResponseWriter, r *
return
}
for _, sub := range req {
if err := a.attestation.AddAttestationSubscription(sub); err != nil {
if err := a.attestation.AddAttestationSubscription(context.Background(), sub); err != nil {
log.Error("failed to add attestation subscription", "err", err)
// todo: more specific error
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}

/*if err := a.forkchoiceStore.OnValidatorCommitteeSubscriptions(req); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}*/
w.WriteHeader(http.StatusOK)
}
4 changes: 4 additions & 0 deletions cl/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,7 @@ func IsTopicBlobSidecar(d string) bool {
func IsTopicBeaconAttestation(d string) bool {
return strings.HasPrefix(d, "beacon_attestation_")
}

func TopicNameBeaconAttestation(d uint64) string {
return fmt.Sprintf(TopicNamePrefixBeaconAttestation, d)
}

0 comments on commit 91c56aa

Please sign in to comment.