Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Data Column Gossip Handlers #13894

Merged
merged 7 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions beacon-chain/blockchain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_library(
"receive_attestation.go",
"receive_blob.go",
"receive_block.go",
"receive_sidecar.go",
"service.go",
"tracked_proposer.go",
"weak_subjectivity_checks.go",
Expand Down
6 changes: 6 additions & 0 deletions beacon-chain/blockchain/receive_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ type BlobReceiver interface {
ReceiveBlob(context.Context, blocks.VerifiedROBlob) error
}

// DataColumnReceiver interface defines the methods of chain service for receiving new
// data columns
type DataColumnReceiver interface {
ReceiveDataColumn(context.Context, *ethpb.DataColumnSidecar) error
}

// SlashingReceiver interface defines the methods of chain service for receiving validated slashing over the wire.
type SlashingReceiver interface {
ReceiveAttesterSlashing(ctx context.Context, slashings *ethpb.AttesterSlashing)
Expand Down
12 changes: 12 additions & 0 deletions beacon-chain/blockchain/receive_sidecar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package blockchain

import (
"context"

ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
)

func (s *Service) ReceiveDataColumn(ctx context.Context, ds *ethpb.DataColumnSidecar) error {
// TODO
return nil
}
5 changes: 5 additions & 0 deletions beacon-chain/blockchain/testing/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,11 @@ func (c *ChainService) ReceiveBlob(_ context.Context, b blocks.VerifiedROBlob) e
return nil
}

// ReceiveDataColumn implements the same method in chain service
func (c *ChainService) ReceiveDataColumn(_ context.Context, _ *ethpb.DataColumnSidecar) error {
return nil
}

// TargetRootForEpoch mocks the same method in the chain service
func (c *ChainService) TargetRootForEpoch(_ [32]byte, _ primitives.Epoch) ([32]byte, error) {
return c.TargetRoot, nil
Expand Down
18 changes: 18 additions & 0 deletions beacon-chain/core/blocks/signature.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,24 @@ func VerifyBlockHeaderSignature(beaconState state.BeaconState, header *ethpb.Sig
return signing.VerifyBlockHeaderSigningRoot(header.Header, proposerPubKey, header.Signature, domain)
}

func VerifyBlockHeaderSignatureUsingCurrentFork(beaconState state.BeaconState, header *ethpb.SignedBeaconBlockHeader) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documentation is missing.

currentEpoch := slots.ToEpoch(header.Header.Slot)
fork, err := forks.Fork(currentEpoch)
if err != nil {
return err
}
domain, err := signing.Domain(fork, currentEpoch, params.BeaconConfig().DomainBeaconProposer, beaconState.GenesisValidatorsRoot())
if err != nil {
return err
}
proposer, err := beaconState.ValidatorAtIndex(header.Header.ProposerIndex)
if err != nil {
return err
}
proposerPubKey := proposer.PublicKey
return signing.VerifyBlockHeaderSigningRoot(header.Header, proposerPubKey, header.Signature, domain)
}

// VerifyBlockSignatureUsingCurrentFork verifies the proposer signature of a beacon block. This differs
// from the above method by not using fork data from the state and instead retrieving it
// via the respective epoch.
Expand Down
7 changes: 7 additions & 0 deletions beacon-chain/core/feed/operation/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ const (

// AttesterSlashingReceived is sent after an attester slashing is received from gossip or rpc
AttesterSlashingReceived = 8

// DataColumnSidecarReceived is sent after a data column sidecar is received from gossip or rpc.
DataColumnSidecarReceived = 9
)

// UnAggregatedAttReceivedData is the data sent with UnaggregatedAttReceived events.
Expand Down Expand Up @@ -77,3 +80,7 @@ type ProposerSlashingReceivedData struct {
type AttesterSlashingReceivedData struct {
AttesterSlashing *ethpb.AttesterSlashing
}

type DataColumnSidecarReceivedData struct {
DataColumn *ethpb.DataColumnSidecar
}
2 changes: 1 addition & 1 deletion beacon-chain/p2p/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func TestHostIsResolved(t *testing.T) {
// As defined in RFC 2606 , example.org is a
// reserved example domain name.
exampleHost := "example.org"
exampleIP := "93.184.216.34"
exampleIP := "93.184.215.14"

s := &Service{
cfg: &Config{
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/p2p/gossip_topic_mappings.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var gossipTopicMappings = map[string]proto.Message{
SyncCommitteeSubnetTopicFormat: &ethpb.SyncCommitteeMessage{},
BlsToExecutionChangeSubnetTopicFormat: &ethpb.SignedBLSToExecutionChange{},
BlobSubnetTopicFormat: &ethpb.BlobSidecar{},
DataColumnSubnetTopicFormat: &ethpb.DataColumnSidecar{},
}

// GossipTopicMappings is a function to return the assigned data type
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/pubsub_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestService_CanSubscribe(t *testing.T) {
formatting := []interface{}{digest}

// Special case for attestation subnets which have a second formatting placeholder.
if topic == AttestationSubnetTopicFormat || topic == SyncCommitteeSubnetTopicFormat || topic == BlobSubnetTopicFormat {
if topic == AttestationSubnetTopicFormat || topic == SyncCommitteeSubnetTopicFormat || topic == BlobSubnetTopicFormat || topic == DataColumnSubnetTopicFormat {
formatting = append(formatting, 0 /* some subnet ID */)
}

Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/p2p/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ const (
GossipBlsToExecutionChangeMessage = "bls_to_execution_change"
// GossipBlobSidecarMessage is the name for the blob sidecar message type.
GossipBlobSidecarMessage = "blob_sidecar"
// GossipDataColumnSidecarMessage is the name for the data column sidecar message type.
GossipDataColumnSidecarMessage = "data_column_sidecar"

// Topic Formats
//
// AttestationSubnetTopicFormat is the topic format for the attestation subnet.
Expand All @@ -52,4 +55,6 @@ const (
BlsToExecutionChangeSubnetTopicFormat = GossipProtocolAndDigest + GossipBlsToExecutionChangeMessage
// BlobSubnetTopicFormat is the topic format for the blob subnet.
BlobSubnetTopicFormat = GossipProtocolAndDigest + GossipBlobSidecarMessage + "_%d"
// DataColumnSubnetTopicFormat is the topic format for the data column subnet.
DataColumnSubnetTopicFormat = GossipProtocolAndDigest + GossipDataColumnSidecarMessage + "_%d"
)
2 changes: 2 additions & 0 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ go_library(
"subscriber_beacon_blocks.go",
"subscriber_blob_sidecar.go",
"subscriber_bls_to_execution_change.go",
"subscriber_data_column_sidecar.go",
"subscriber_handlers.go",
"subscriber_sync_committee_message.go",
"subscriber_sync_contribution_proof.go",
Expand All @@ -47,6 +48,7 @@ go_library(
"validate_beacon_blocks.go",
"validate_blob.go",
"validate_bls_to_execution_change.go",
"validate_data_column_sidecar.go",
"validate_proposer_slashing.go",
"validate_sync_committee_message.go",
"validate_sync_contribution_proof.go",
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/sync/decode_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ func (s *Service) decodePubsubMessage(msg *pubsub.Message) (ssz.Unmarshaler, err
topic = p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.SyncCommitteeMessage{})]
case strings.Contains(topic, p2p.GossipBlobSidecarMessage):
topic = p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.BlobSidecar{})]
case strings.Contains(topic, p2p.GossipDataColumnSidecarMessage):
topic = p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.DataColumnSidecar{})]
}

base := p2p.GossipTopicMappings(topic, 0)
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type config struct {
type blockchainService interface {
blockchain.BlockReceiver
blockchain.BlobReceiver
blockchain.DataColumnReceiver
blockchain.HeadFetcher
blockchain.FinalizationFetcher
blockchain.ForkFetcher
Expand Down
125 changes: 115 additions & 10 deletions beacon-chain/sync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,32 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {

// New Gossip Topic in Deneb
if epoch >= params.BeaconConfig().DenebForkEpoch {
s.subscribeStaticWithSubnets(
p2p.BlobSubnetTopicFormat,
s.validateBlob, /* validator */
s.blobSubscriber, /* message handler */
digest,
params.BeaconConfig().BlobsidecarSubnetCount,
)
}
if features.Get().EnablePeerDAS {
// TODO: Subscribe to persistent column subnets here
if features.Get().EnablePeerDAS {
if flags.Get().SubscribeToAllSubnets {
s.subscribeStaticWithSubnets(
p2p.DataColumnSubnetTopicFormat,
s.validateDataColumn, /* validator */
s.dataColumnSubscriber, /* message handler */
digest,
params.BeaconConfig().DataColumnSidecarSubnetCount,
)
} else {
s.subscribeDynamicWithColumnSubnets(
p2p.DataColumnSubnetTopicFormat,
s.validateDataColumn, /* validator */
s.dataColumnSubscriber, /* message handler */
digest,
)
}
} else {
s.subscribeStaticWithSubnets(
p2p.BlobSubnetTopicFormat,
s.validateBlob, /* validator */
s.blobSubscriber, /* message handler */
digest,
params.BeaconConfig().BlobsidecarSubnetCount,
)
}
}
}

Expand Down Expand Up @@ -649,6 +665,87 @@ func (s *Service) subscribeDynamicWithSyncSubnets(
}()
}

// subscribe missing subnets for our persistent columns.
func (s *Service) subscribeColumnSubnet(
subscriptions map[uint64]*pubsub.Subscription,
idx uint64,
digest [4]byte,
validate wrappedVal,
handle subHandler,
) {
// do not subscribe if we have no peers in the same
// subnet
topic := p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.DataColumnSidecar{})]
subnetTopic := fmt.Sprintf(topic, digest, idx)
// check if subscription exists and if not subscribe the relevant subnet.
if _, exists := subscriptions[idx]; !exists {
subscriptions[idx] = s.subscribeWithBase(subnetTopic, validate, handle)
}
if !s.validPeersExist(subnetTopic) {
log.Debugf("No peers found subscribed to column gossip subnet with "+
"column index %d. Searching network for peers subscribed to the subnet.", idx)
_, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
}
}
}

func (s *Service) subscribeDynamicWithColumnSubnets(
topicFormat string,
validate wrappedVal,
handle subHandler,
digest [4]byte,
) {
genRoot := s.cfg.clock.GenesisValidatorsRoot()
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:])
if err != nil {
panic(err)
}
base := p2p.GossipTopicMappings(topicFormat, e)
if base == nil {
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat))
}
subscriptions := make(map[uint64]*pubsub.Subscription, params.BeaconConfig().DataColumnSidecarSubnetCount)
genesis := s.cfg.clock.GenesisTime()
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)

go func() {
for {
select {
case <-s.ctx.Done():
ticker.Done()
return
case <-ticker.C():
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
continue
}
valid, err := isDigestValid(digest, genesis, genRoot)
if err != nil {
log.Error(err)
continue
}
if !valid {
log.Warnf("Column subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest)
// Unsubscribes from all our current subnets.
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
ticker.Done()
return
}

wantedSubs := s.retrieveActiveColumnSubnets()
// Resize as appropriate.
s.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest)

// subscribe desired column subnets.
for _, idx := range wantedSubs {
s.subscribeColumnSubnet(subscriptions, idx, digest, validate, handle)
}
}
}
}()
}

// lookup peers for attester specific subnets.
func (s *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.Attestation{})]
Expand Down Expand Up @@ -700,6 +797,14 @@ func (*Service) retrieveActiveSyncSubnets(currEpoch primitives.Epoch) []uint64 {
return slice.SetUint64(subs)
}

func (*Service) retrieveActiveColumnSubnets() []uint64 {
subs, ok, _ := cache.ColumnSubnetIDs.GetColumnSubnets()
if !ok {
return nil
}
return subs
}

// filters out required peers for the node to function, not
// pruning peers who are in our attestation subnets.
func (s *Service) filterNeededPeers(pids []peer.ID) []peer.ID {
Expand Down
34 changes: 34 additions & 0 deletions beacon-chain/sync/subscriber_data_column_sidecar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package sync

import (
"context"
"fmt"

"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed"
opfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"google.golang.org/protobuf/proto"
)

func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) error {
b, ok := msg.(*ethpb.DataColumnSidecar)
if !ok {
return fmt.Errorf("message was not type DataColumnSidecar, type=%T", msg)
}

// TODO:Change to new one for data columns
s.setSeenBlobIndex(b.SignedBlockHeader.Header.Slot, b.SignedBlockHeader.Header.ProposerIndex, b.ColumnIndex)

if err := s.cfg.chain.ReceiveDataColumn(ctx, b); err != nil {
return err
}

s.cfg.operationNotifier.OperationFeed().Send(&feed.Event{
Type: opfeed.DataColumnSidecarReceived,
Data: &opfeed.DataColumnSidecarReceivedData{
DataColumn: b,
},
})

return nil
}
Loading
Loading