Skip to content

Commit

Permalink
Fix Initial Sync with 128 data columns subnets (#14403)
Browse files Browse the repository at this point in the history
* `pingPeers`: Add log with new ENR when modified.

* `p2p Start`: Use idiomatic go error syntax.

* P2P `start`: Fix error message.

* Use not bootnodes at all if the `--chain-config-file` flag is used and no `--bootstrap-node` flag is used.

Before this commit, if the  `--chain-config-file` flag is used and no `--bootstrap-node` flag is used, then bootnodes are (incorrectly) defaulted on `mainnet` ones.

* `validPeersExist`: Centralize logs.

* `AddConnectionHandler`: Improve logging.

"Peer connected" does not really reflect the fact that a new peer is actually connected. --> "New peer connection" is more clear.

Also, instead of writing `0`, `1`or `2` for direction, now it's writted "Unknown", "Inbound", "Outbound".

* Logging: Add 2 decimals for timestamt in text and JSON logs.

* Improve "no valid peers" logging.

* Improve "Some columns have no peers responsible for custody" logging.

* `pubsubSubscriptionRequestLimit`: Increase to be consistent with data columns.

* `sendPingRequest`: Improve logging.

* `FindPeersWithSubnet`: Regularly recheck in our current set of peers if we have enough peers for this topic.

Before this commit, new peers HAD to be found, even if current peers are eventually acceptable.
For very small network, it used to lead to infinite search.

* `subscribeDynamicWithSyncSubnets`: Use exactly the same subscription function initially and every slot.

* Make deepsource happier.

* Nishant's commend: Change peer disconnected log.

* NIshant's comment: Change `Too many incoming subscription` log from error to debug.

* `FindPeersWithSubnet`: Address Nishant's comment.

* `batchSize`: Address Nishant's comment.

* `pingPeers` ==> `pingPeersAndLogEnr`.

* Update beacon-chain/sync/subscriber.go

Co-authored-by: Nishant Das <nishdas93@gmail.com>

---------

Co-authored-by: Nishant Das <nishdas93@gmail.com>
  • Loading branch information
nalepae and nisdas committed Nov 25, 2024
1 parent ba059d2 commit 82498fa
Show file tree
Hide file tree
Showing 12 changed files with 348 additions and 143 deletions.
6 changes: 3 additions & 3 deletions beacon-chain/p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (s *Service) RefreshPersistentSubnets() {
s.updateSubnetRecordWithMetadata(bitV)

// Ping all peers.
s.pingPeers()
s.pingPeersAndLogEnr()

return
}
Expand Down Expand Up @@ -243,7 +243,7 @@ func (s *Service) RefreshPersistentSubnets() {
s.updateSubnetRecordWithMetadataV2(bitV, bitS)

// Ping all peers to inform them of new metadata
s.pingPeers()
s.pingPeersAndLogEnr()

return
}
Expand Down Expand Up @@ -272,7 +272,7 @@ func (s *Service) RefreshPersistentSubnets() {
s.updateSubnetRecordWithMetadataV3(bitV, bitS, custodySubnetCount)

// Ping all peers.
s.pingPeers()
s.pingPeersAndLogEnr()
}

// listen for new nodes watches for new nodes in the network and adds them to the peerstore.
Expand Down
11 changes: 8 additions & 3 deletions beacon-chain/p2p/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,15 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
disconnectFromPeer()
return
}

validPeerConnection := func() {
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnected)
// Go through the handshake process.
log.WithFields(logrus.Fields{
"direction": conn.Stat().Direction,
"direction": conn.Stat().Direction.String(),
"multiAddr": peerMultiaddrString(conn),
"activePeers": len(s.peers.Active()),
}).Debug("Peer connected")
}).Debug("New peer connection")
}

// Do not perform handshake on inbound dials.
Expand Down Expand Up @@ -173,7 +174,11 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected)
// Only log disconnections if we were fully connected.
if priorState == peers.PeerConnected {
log.WithField("activePeers", len(s.peers.Active())).Debug("Peer disconnected")
log.WithFields(logrus.Fields{
"direction": conn.Stat().Direction.String(),
"multiAddr": peerMultiaddrString(conn),
"activePeers": len(s.peers.Active()),
}).Debug("Peer disconnected")
}
}()
},
Expand Down
28 changes: 23 additions & 5 deletions beacon-chain/p2p/pubsub_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,27 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/network/forks"
"github.com/sirupsen/logrus"
)

var _ pubsub.SubscriptionFilter = (*Service)(nil)

// It is set at this limit to handle the possibility
// of double topic subscriptions at fork boundaries.
// -> 64 Attestation Subnets * 2.
// -> 4 Sync Committee Subnets * 2.
// -> Block,Aggregate,ProposerSlashing,AttesterSlashing,Exits,SyncContribution * 2.
const pubsubSubscriptionRequestLimit = 200
// -> BeaconBlock * 2 = 2
// -> BeaconAggregateAndProof * 2 = 2
// -> VoluntaryExit * 2 = 2
// -> ProposerSlashing * 2 = 2
// -> AttesterSlashing * 2 = 2
// -> 64 Beacon Attestation * 2 = 128
// -> SyncContributionAndProof * 2 = 2
// -> 4 SyncCommitteeSubnets * 2 = 8
// -> BlsToExecutionChange * 2 = 2
// -> 128 DataColumnSidecar * 2 = 256
// -------------------------------------
// TOTAL = 406
// (Note: BlobSidecar is not included in this list since it is superseded by DataColumnSidecar)
const pubsubSubscriptionRequestLimit = 500

// CanSubscribe returns true if the topic is of interest and we could subscribe to it.
func (s *Service) CanSubscribe(topic string) bool {
Expand Down Expand Up @@ -95,8 +106,15 @@ func (s *Service) CanSubscribe(topic string) bool {
// FilterIncomingSubscriptions is invoked for all RPCs containing subscription notifications.
// This method returns only the topics of interest and may return an error if the subscription
// request contains too many topics.
func (s *Service) FilterIncomingSubscriptions(_ peer.ID, subs []*pubsubpb.RPC_SubOpts) ([]*pubsubpb.RPC_SubOpts, error) {
func (s *Service) FilterIncomingSubscriptions(peerID peer.ID, subs []*pubsubpb.RPC_SubOpts) ([]*pubsubpb.RPC_SubOpts, error) {
if len(subs) > pubsubSubscriptionRequestLimit {
subsCount := len(subs)
log.WithFields(logrus.Fields{
"peerID": peerID,
"subscriptionCounts": subsCount,
"subscriptionLimit": pubsubSubscriptionRequestLimit,
}).Debug("Too many incoming subscriptions, filtering them")

return nil, pubsub.ErrTooManySubscriptions
}

Expand Down
14 changes: 10 additions & 4 deletions beacon-chain/p2p/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,13 @@ func (s *Service) Start() {
s.startupErr = err
return
}
err = s.connectToBootnodes()
if err != nil {
log.WithError(err).Error("Could not add bootnode to the exclusion list")

if err := s.connectToBootnodes(); err != nil {
log.WithError(err).Error("Could not connect to boot nodes")
s.startupErr = err
return
}

s.dv5Listener = listener
go s.listenForNewNodes()
}
Expand Down Expand Up @@ -393,12 +394,17 @@ func (s *Service) AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) er
s.pingMethodLock.Unlock()
}

func (s *Service) pingPeers() {
func (s *Service) pingPeersAndLogEnr() {
s.pingMethodLock.RLock()
defer s.pingMethodLock.RUnlock()

localENR := s.dv5Listener.Self()
log.WithField("ENR", localENR).Info("New node record")

if s.pingMethod == nil {
return
}

for _, pid := range s.peers.Connected() {
go func(id peer.ID) {
if err := s.pingMethod(s.ctx, id); err != nil {
Expand Down
185 changes: 146 additions & 39 deletions beacon-chain/p2p/subnets.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package p2p

import (
"context"
"math"
"strings"
"sync"
"time"
Expand All @@ -20,9 +21,9 @@ import (
"github.com/prysmaticlabs/prysm/v5/consensus-types/wrapper"
"github.com/prysmaticlabs/prysm/v5/crypto/hash"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
mathutil "github.com/prysmaticlabs/prysm/v5/math"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
pb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/sirupsen/logrus"
)

var attestationSubnetCount = params.BeaconConfig().AttestationSubnetCount
Expand Down Expand Up @@ -53,6 +54,79 @@ const blobSubnetLockerVal = 110
// chosen more than sync, attestation and blob subnet (6) combined.
const dataColumnSubnetVal = 150

// nodeFilter return a function that filters nodes based on the subnet topic and subnet index.
func (s *Service) nodeFilter(topic string, index uint64) (func(node *enode.Node) bool, error) {
switch {
case strings.Contains(topic, GossipAttestationMessage):
return s.filterPeerForAttSubnet(index), nil
case strings.Contains(topic, GossipSyncCommitteeMessage):
return s.filterPeerForSyncSubnet(index), nil
case strings.Contains(topic, GossipDataColumnSidecarMessage):
return s.filterPeerForDataColumnsSubnet(index), nil
default:
return nil, errors.Errorf("no subnet exists for provided topic: %s", topic)
}
}

// searchForPeers performs a network search for peers subscribed to a particular subnet.
// It exits as soon as one of these conditions is met:
// - It looped through `batchSize` nodes.
// - It found `peersToFindCount“ peers corresponding to the `filter` criteria.
// - Iterator is exhausted.
func searchForPeers(
iterator enode.Iterator,
batchSize int,
peersToFindCount int,
filter func(node *enode.Node) bool,
) []*enode.Node {
nodeFromNodeID := make(map[enode.ID]*enode.Node, batchSize)
for i := 0; i < batchSize && len(nodeFromNodeID) <= peersToFindCount && iterator.Next(); i++ {
node := iterator.Node()

// Filter out nodes that do not meet the criteria.
if !filter(node) {
continue
}

// Remove duplicates, keeping the node with higher seq.
prevNode, ok := nodeFromNodeID[node.ID()]
if ok && prevNode.Seq() > node.Seq() {
continue
}

nodeFromNodeID[node.ID()] = node
}

// Convert the map to a slice.
nodes := make([]*enode.Node, 0, len(nodeFromNodeID))
for _, node := range nodeFromNodeID {
nodes = append(nodes, node)
}

return nodes
}

// dialPeer dials a peer in a separate goroutine.
func (s *Service) dialPeer(ctx context.Context, wg *sync.WaitGroup, node *enode.Node) {
info, _, err := convertToAddrInfo(node)
if err != nil {
return
}

if info == nil {
return
}

wg.Add(1)
go func() {
if err := s.connectWithPeer(ctx, *info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
}

wg.Done()
}()
}

// FindPeersWithSubnet performs a network search for peers
// subscribed to a particular subnet. Then it tries to connect
// with those peers. This method will block until either:
Expand All @@ -61,69 +135,96 @@ const dataColumnSubnetVal = 150
// On some edge cases, this method may hang indefinitely while peers
// are actually found. In such a case, the user should cancel the context
// and re-run the method again.
func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string,
index uint64, threshold int) (bool, error) {
func (s *Service) FindPeersWithSubnet(
ctx context.Context,
topic string,
index uint64,
threshold int,
) (bool, error) {
const batchSize = 2000

ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet")
defer span.End()

span.SetAttributes(trace.Int64Attribute("index", int64(index))) // lint:ignore uintcast -- It's safe to do this for tracing.

if s.dv5Listener == nil {
// return if discovery isn't set
// Return if discovery isn't set
return false, nil
}

topic += s.Encoding().ProtocolSuffix()
iterator := s.dv5Listener.RandomNodes()
defer iterator.Close()
switch {
case strings.Contains(topic, GossipAttestationMessage):
iterator = filterNodes(ctx, iterator, s.filterPeerForAttSubnet(index))
case strings.Contains(topic, GossipSyncCommitteeMessage):
iterator = filterNodes(ctx, iterator, s.filterPeerForSyncSubnet(index))
case strings.Contains(topic, GossipDataColumnSidecarMessage):
iterator = filterNodes(ctx, iterator, s.filterPeerForDataColumnsSubnet(index))
default:
return false, errors.Errorf("no subnet exists for provided topic: %s", topic)

filter, err := s.nodeFilter(topic, index)
if err != nil {
return false, errors.Wrap(err, "node filter")
}

peersSummary := func(topic string, threshold int) (int, int) {
// Retrieve how many peers we have for this topic.
peerCountForTopic := len(s.pubsub.ListPeers(topic))

// Compute how many peers we are missing to reach the threshold.
missingPeerCountForTopic := max(0, threshold-peerCountForTopic)

return peerCountForTopic, missingPeerCountForTopic
}

// Compute how many peers we are missing to reach the threshold.
peerCountForTopic, missingPeerCountForTopic := peersSummary(topic, threshold)

// Exit early if we have enough peers.
if missingPeerCountForTopic == 0 {
return true, nil
}

log.WithFields(logrus.Fields{
"topic": topic,
"currentPeerCount": peerCountForTopic,
"targetPeerCount": threshold,
}).Debug("Searching for new peers in the network - Start")

wg := new(sync.WaitGroup)
for {
currNum := len(s.pubsub.ListPeers(topic))
if currNum >= threshold {
// If we have enough peers, we can exit the loop. This is the happy path.
if missingPeerCountForTopic == 0 {
break
}

// If the context is done, we can exit the loop. This is the unhappy path.
if err := ctx.Err(); err != nil {
return false, errors.Errorf("unable to find requisite number of peers for topic %s - "+
"only %d out of %d peers were able to be found", topic, currNum, threshold)
return false, errors.Errorf(
"unable to find requisite number of peers for topic %s - only %d out of %d peers available after searching",
topic, peerCountForTopic, threshold,
)
}
nodeCount := int(params.BeaconNetworkConfig().MinimumPeersInSubnetSearch)

// Search for new peers in the network.
nodes := searchForPeers(iterator, batchSize, missingPeerCountForTopic, filter)

// Restrict dials if limit is applied.
maxConcurrentDials := math.MaxInt
if flags.MaxDialIsActive() {
nodeCount = min(nodeCount, flags.Get().MaxConcurrentDials)
maxConcurrentDials = flags.Get().MaxConcurrentDials
}
nodes := enode.ReadNodes(iterator, nodeCount)
for _, node := range nodes {
info, _, err := convertToAddrInfo(node)
if err != nil {
continue
}

if info == nil {
continue
// Dial the peers in batches.
for start := 0; start < len(nodes); start += maxConcurrentDials {
stop := min(start+maxConcurrentDials, len(nodes))
for _, node := range nodes[start:stop] {
s.dialPeer(ctx, wg, node)
}

wg.Add(1)
go func() {
if err := s.connectWithPeer(ctx, *info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
}
wg.Done()
}()
// Wait for all dials to be completed.
wg.Wait()
}
// Wait for all dials to be completed.
wg.Wait()

_, missingPeerCountForTopic = peersSummary(topic, threshold)
}

log.WithField("topic", topic).Debug("Searching for new peers in the network - Success")
return true, nil
}

Expand Down Expand Up @@ -183,11 +284,17 @@ func (s *Service) filterPeerForDataColumnsSubnet(index uint64) func(node *enode.
// lower threshold to broadcast object compared to searching
// for a subnet. So that even in the event of poor peer
// connectivity, we can still broadcast an attestation.
func (s *Service) hasPeerWithSubnet(topic string) bool {
func (s *Service) hasPeerWithSubnet(subnetTopic string) bool {
// In the event peer threshold is lower, we will choose the lower
// threshold.
minPeers := mathutil.Min(1, uint64(flags.Get().MinimumPeersPerSubnet))
return len(s.pubsub.ListPeers(topic+s.Encoding().ProtocolSuffix())) >= int(minPeers) // lint:ignore uintcast -- Min peers can be safely cast to int.
minPeers := min(1, flags.Get().MinimumPeersPerSubnet)
topic := subnetTopic + s.Encoding().ProtocolSuffix()
peersWithSubnet := s.pubsub.ListPeers(topic)
peersWithSubnetCount := len(peersWithSubnet)

enoughPeers := peersWithSubnetCount >= minPeers

return enoughPeers
}

// Updates the service's discv5 listener record's attestation subnet
Expand Down
Loading

0 comments on commit 82498fa

Please sign in to comment.