-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix Initial Sync with 128 data columns subnets #14403
Changes from 13 commits
50b4f4b
7514b7b
e2220b5
d95407f
2857344
be836a9
0abfab4
959ff24
2d446a2
edb0fc1
c12b239
7d02a3c
eee87bb
0af4c2e
bd34800
ab0b0e8
5e2af9f
0b39d26
bd82d71
5c0cc4c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,16 +10,27 @@ import ( | |
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder" | ||
"github.com/prysmaticlabs/prysm/v5/config/params" | ||
"github.com/prysmaticlabs/prysm/v5/network/forks" | ||
"github.com/sirupsen/logrus" | ||
) | ||
|
||
var _ pubsub.SubscriptionFilter = (*Service)(nil) | ||
|
||
// It is set at this limit to handle the possibility | ||
// of double topic subscriptions at fork boundaries. | ||
// -> 64 Attestation Subnets * 2. | ||
// -> 4 Sync Committee Subnets * 2. | ||
// -> Block,Aggregate,ProposerSlashing,AttesterSlashing,Exits,SyncContribution * 2. | ||
const pubsubSubscriptionRequestLimit = 200 | ||
// -> BeaconBlock * 2 = 2 | ||
// -> BeaconAggregateAndProof * 2 = 2 | ||
// -> VoluntaryExit * 2 = 2 | ||
// -> ProposerSlashing * 2 = 2 | ||
// -> AttesterSlashing * 2 = 2 | ||
// -> 64 Beacon Attestation * 2 = 128 | ||
// -> SyncContributionAndProof * 2 = 2 | ||
// -> 4 SyncCommitteeSubnets * 2 = 8 | ||
// -> BlsToExecutionChange * 2 = 2 | ||
// -> 128 DataColumnSidecar * 2 = 256 | ||
// ------------------------------------- | ||
// TOTAL = 406 | ||
// (Note: BlobSidecar is not included in this list since it is superseded by DataColumnSidecar) | ||
const pubsubSubscriptionRequestLimit = 500 | ||
|
||
// CanSubscribe returns true if the topic is of interest and we could subscribe to it. | ||
func (s *Service) CanSubscribe(topic string) bool { | ||
|
@@ -95,8 +106,15 @@ func (s *Service) CanSubscribe(topic string) bool { | |
// FilterIncomingSubscriptions is invoked for all RPCs containing subscription notifications. | ||
// This method returns only the topics of interest and may return an error if the subscription | ||
// request contains too many topics. | ||
func (s *Service) FilterIncomingSubscriptions(_ peer.ID, subs []*pubsubpb.RPC_SubOpts) ([]*pubsubpb.RPC_SubOpts, error) { | ||
func (s *Service) FilterIncomingSubscriptions(peerID peer.ID, subs []*pubsubpb.RPC_SubOpts) ([]*pubsubpb.RPC_SubOpts, error) { | ||
if len(subs) > pubsubSubscriptionRequestLimit { | ||
subsCount := len(subs) | ||
log.WithFields(logrus.Fields{ | ||
"peerID": peerID, | ||
"subscriptionCounts": subsCount, | ||
"subscriptionLimit": pubsubSubscriptionRequestLimit, | ||
}).Error("Too many incoming subscriptions, filtering them") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we make this a debug ? This can be triggered by any peer, so they could simply causes your logs to be spammed/blown up by sending you rpc subscriptions There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in ab0b0e8. |
||
|
||
return nil, pubsub.ErrTooManySubscriptions | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -202,12 +202,13 @@ func (s *Service) Start() { | |
s.startupErr = err | ||
return | ||
} | ||
err = s.connectToBootnodes() | ||
if err != nil { | ||
log.WithError(err).Error("Could not add bootnode to the exclusion list") | ||
|
||
if err := s.connectToBootnodes(); err != nil { | ||
log.WithError(err).Error("Could not connect to boot nodes") | ||
s.startupErr = err | ||
return | ||
} | ||
|
||
s.dv5Listener = listener | ||
go s.listenForNewNodes() | ||
} | ||
|
@@ -396,9 +397,14 @@ func (s *Service) AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) er | |
func (s *Service) pingPeers() { | ||
s.pingMethodLock.RLock() | ||
defer s.pingMethodLock.RUnlock() | ||
|
||
localENR := s.dv5Listener.Self() | ||
log.WithField("ENR", localENR).Info("New node record") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why are we logging this ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not? We log the very first ENR here:
But it turns out this ENR is the real one for only a very small amount of time. Logging the initial ENR without logging modified ones is quite misleading, since the user will think that it is its actual ENR, while that is no longer the case. My opinion is:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you do want to log enr changes, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree. I can of course do it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can also rename There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. renaming would be better then There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in bd82d71. |
||
|
||
if s.pingMethod == nil { | ||
return | ||
} | ||
|
||
for _, pid := range s.peers.Connected() { | ||
go func(id peer.ID) { | ||
if err := s.pingMethod(s.ctx, id); err != nil { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ package p2p | |
|
||
import ( | ||
"context" | ||
"math" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
@@ -20,9 +21,9 @@ import ( | |
"github.com/prysmaticlabs/prysm/v5/consensus-types/wrapper" | ||
"github.com/prysmaticlabs/prysm/v5/crypto/hash" | ||
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" | ||
mathutil "github.com/prysmaticlabs/prysm/v5/math" | ||
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace" | ||
pb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" | ||
"github.com/sirupsen/logrus" | ||
) | ||
|
||
var attestationSubnetCount = params.BeaconConfig().AttestationSubnetCount | ||
|
@@ -53,6 +54,79 @@ const blobSubnetLockerVal = 110 | |
// chosen more than sync, attestation and blob subnet (6) combined. | ||
const dataColumnSubnetVal = 150 | ||
|
||
// nodeFilter return a function that filters nodes based on the subnet topic and subnet index. | ||
func (s *Service) nodeFilter(topic string, index uint64) (func(node *enode.Node) bool, error) { | ||
switch { | ||
case strings.Contains(topic, GossipAttestationMessage): | ||
return s.filterPeerForAttSubnet(index), nil | ||
case strings.Contains(topic, GossipSyncCommitteeMessage): | ||
return s.filterPeerForSyncSubnet(index), nil | ||
case strings.Contains(topic, GossipDataColumnSidecarMessage): | ||
return s.filterPeerForDataColumnsSubnet(index), nil | ||
default: | ||
return nil, errors.Errorf("no subnet exists for provided topic: %s", topic) | ||
} | ||
} | ||
|
||
// searchForPeers performs a network search for peers subscribed to a particular subnet. | ||
// It exits as soon as one of these conditions is met: | ||
// - It looped through `batchSize` nodes. | ||
// - It found `peersToFindCount` peers corresponding to the `filter` criteria. | ||
// - Iterator is exhausted. | ||
func searchForPeers( | ||
iterator enode.Iterator, | ||
batchSize int, | ||
peersToFindCount int, | ||
filter func(node *enode.Node) bool, | ||
) []*enode.Node { | ||
nodeFromNodeID := make(map[enode.ID]*enode.Node, batchSize) | ||
for i := 0; i < batchSize && len(nodeFromNodeID) <= peersToFindCount && iterator.Next(); i++ { | ||
node := iterator.Node() | ||
|
||
// Filter out nodes that do not meet the criteria. | ||
if !filter(node) { | ||
continue | ||
} | ||
|
||
// Remove duplicates, keeping the node with higher seq. | ||
prevNode, ok := nodeFromNodeID[node.ID()] | ||
if ok && prevNode.Seq() > node.Seq() { | ||
continue | ||
} | ||
|
||
nodeFromNodeID[node.ID()] = node | ||
} | ||
|
||
// Convert the map to a slice. | ||
nodes := make([]*enode.Node, 0, len(nodeFromNodeID)) | ||
for _, node := range nodeFromNodeID { | ||
nodes = append(nodes, node) | ||
} | ||
|
||
return nodes | ||
} | ||
|
||
// dialPeer dials a peer in a separate goroutine. | ||
func (s *Service) dialPeer(ctx context.Context, wg *sync.WaitGroup, node *enode.Node) { | ||
info, _, err := convertToAddrInfo(node) | ||
if err != nil { | ||
return | ||
} | ||
|
||
if info == nil { | ||
return | ||
} | ||
|
||
wg.Add(1) | ||
go func() { | ||
if err := s.connectWithPeer(ctx, *info); err != nil { | ||
log.WithError(err).Tracef("Could not connect with peer %s", info.String()) | ||
} | ||
|
||
wg.Done() | ||
}() | ||
} | ||
|
||
// FindPeersWithSubnet performs a network search for peers | ||
// subscribed to a particular subnet. Then it tries to connect | ||
// with those peers. This method will block until either: | ||
|
@@ -61,69 +135,88 @@ const dataColumnSubnetVal = 150 | |
// On some edge cases, this method may hang indefinitely while peers | ||
// are actually found. In such a case, the user should cancel the context | ||
// and re-run the method again. | ||
func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string, | ||
index uint64, threshold int) (bool, error) { | ||
func (s *Service) FindPeersWithSubnet( | ||
ctx context.Context, | ||
topic string, | ||
index uint64, | ||
threshold int, | ||
) (bool, error) { | ||
const batchSize = 200 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could you make the batch bigger ? Ex: 1000 or 2000 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in 0b39d26. |
||
|
||
ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet") | ||
defer span.End() | ||
|
||
span.AddAttributes(trace.Int64Attribute("index", int64(index))) // lint:ignore uintcast -- It's safe to do this for tracing. | ||
|
||
if s.dv5Listener == nil { | ||
// return if discovery isn't set | ||
// Return if discovery isn't set | ||
return false, nil | ||
} | ||
|
||
topic += s.Encoding().ProtocolSuffix() | ||
iterator := s.dv5Listener.RandomNodes() | ||
defer iterator.Close() | ||
switch { | ||
case strings.Contains(topic, GossipAttestationMessage): | ||
iterator = filterNodes(ctx, iterator, s.filterPeerForAttSubnet(index)) | ||
case strings.Contains(topic, GossipSyncCommitteeMessage): | ||
iterator = filterNodes(ctx, iterator, s.filterPeerForSyncSubnet(index)) | ||
case strings.Contains(topic, GossipDataColumnSidecarMessage): | ||
iterator = filterNodes(ctx, iterator, s.filterPeerForDataColumnsSubnet(index)) | ||
default: | ||
return false, errors.Errorf("no subnet exists for provided topic: %s", topic) | ||
|
||
filter, err := s.nodeFilter(topic, index) | ||
if err != nil { | ||
return false, errors.Wrap(err, "node filter") | ||
} | ||
|
||
firstLoop := true | ||
|
||
wg := new(sync.WaitGroup) | ||
for { | ||
currNum := len(s.pubsub.ListPeers(topic)) | ||
if currNum >= threshold { | ||
// Retrieve how many peers we have for this topic. | ||
peerCountForTopic := len(s.pubsub.ListPeers(topic)) | ||
|
||
// Compute how many peers we are missing to reach the threshold. | ||
missingPeersCount := max(0, threshold-peerCountForTopic) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Overflow if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Both ==> It will work as intended, since |
||
|
||
// If we have enough peers, we can exit the loop. This is the happy path. | ||
if missingPeersCount == 0 { | ||
break | ||
} | ||
|
||
if firstLoop { | ||
firstLoop = false | ||
|
||
log.WithFields(logrus.Fields{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not just log this out of the loop without having to use a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because the Let me propose a maybe better solution. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in 5e2af9f. |
||
"topic": topic, | ||
"currentPeerCount": peerCountForTopic, | ||
"targetPeerCount": threshold, | ||
}).Debug("Searching for new peers in the network - Start") | ||
} | ||
|
||
// If the context is done, we can exit the loop. This is the unhappy path. | ||
if err := ctx.Err(); err != nil { | ||
return false, errors.Errorf("unable to find requisite number of peers for topic %s - "+ | ||
"only %d out of %d peers were able to be found", topic, currNum, threshold) | ||
return false, errors.Errorf( | ||
"unable to find requisite number of peers for topic %s - only %d out of %d peers available after searching", | ||
topic, peerCountForTopic, threshold, | ||
) | ||
} | ||
nodeCount := int(params.BeaconNetworkConfig().MinimumPeersInSubnetSearch) | ||
|
||
// Search for new peers in the network. | ||
nodes := searchForPeers(iterator, batchSize, missingPeersCount, filter) | ||
|
||
// Restrict dials if limit is applied. | ||
maxConcurrentDials := math.MaxInt | ||
if flags.MaxDialIsActive() { | ||
nodeCount = min(nodeCount, flags.Get().MaxConcurrentDials) | ||
maxConcurrentDials = flags.Get().MaxConcurrentDials | ||
} | ||
nodes := enode.ReadNodes(iterator, nodeCount) | ||
for _, node := range nodes { | ||
info, _, err := convertToAddrInfo(node) | ||
if err != nil { | ||
continue | ||
} | ||
|
||
if info == nil { | ||
continue | ||
// Dial the peers in batches. | ||
for start := 0; start < len(nodes); start += maxConcurrentDials { | ||
stop := min(start+maxConcurrentDials, len(nodes)) | ||
for _, node := range nodes[start:stop] { | ||
s.dialPeer(ctx, wg, node) | ||
} | ||
|
||
wg.Add(1) | ||
go func() { | ||
if err := s.connectWithPeer(ctx, *info); err != nil { | ||
log.WithError(err).Tracef("Could not connect with peer %s", info.String()) | ||
} | ||
wg.Done() | ||
}() | ||
// Wait for all dials to be completed. | ||
wg.Wait() | ||
} | ||
// Wait for all dials to be completed. | ||
wg.Wait() | ||
} | ||
|
||
log.WithField("topic", topic).Debug("Searching for new peers in the network - Success") | ||
return true, nil | ||
} | ||
|
||
|
@@ -183,11 +276,17 @@ func (s *Service) filterPeerForDataColumnsSubnet(index uint64) func(node *enode. | |
// lower threshold to broadcast object compared to searching | ||
// for a subnet. So that even in the event of poor peer | ||
// connectivity, we can still broadcast an attestation. | ||
func (s *Service) hasPeerWithSubnet(topic string) bool { | ||
func (s *Service) hasPeerWithSubnet(subnetTopic string) bool { | ||
// In the event peer threshold is lower, we will choose the lower | ||
// threshold. | ||
minPeers := mathutil.Min(1, uint64(flags.Get().MinimumPeersPerSubnet)) | ||
return len(s.pubsub.ListPeers(topic+s.Encoding().ProtocolSuffix())) >= int(minPeers) // lint:ignore uintcast -- Min peers can be safely cast to int. | ||
minPeers := min(1, flags.Get().MinimumPeersPerSubnet) | ||
topic := subnetTopic + s.Encoding().ProtocolSuffix() | ||
peersWithSubnet := s.pubsub.ListPeers(topic) | ||
peersWithSubnetCount := len(peersWithSubnet) | ||
|
||
enoughPeers := peersWithSubnetCount >= minPeers | ||
|
||
return enoughPeers | ||
} | ||
|
||
// Updates the service's discv5 listener record's attestation subnet | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think
Peer disconnected
works better here. This disconnection handler applies to all peers, not just newly joined peers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in bd34800.