Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Initial Sync with 128 data columns subnets #14403

Merged
merged 20 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
50b4f4b
`pingPeers`: Add log with new ENR when modified.
nalepae Aug 30, 2024
7514b7b
`p2p Start`: Use idiomatic go error syntax.
nalepae Aug 30, 2024
e2220b5
P2P `start`: Fix error message.
nalepae Aug 30, 2024
d95407f
Use not bootnodes at all if the `--chain-config-file` flag is used an…
nalepae Aug 30, 2024
2857344
`validPeersExist`: Centralize logs.
nalepae Aug 30, 2024
be836a9
`AddConnectionHandler`: Improve logging.
nalepae Sep 1, 2024
0abfab4
Logging: Add 2 decimals for timestamt in text and JSON logs.
nalepae Sep 1, 2024
959ff24
Improve "no valid peers" logging.
nalepae Sep 2, 2024
2d446a2
Improve "Some columns have no peers responsible for custody" logging.
nalepae Sep 2, 2024
edb0fc1
`pubsubSubscriptionRequestLimit`: Increase to be consistent with data…
nalepae Sep 3, 2024
c12b239
`sendPingRequest`: Improve logging.
nalepae Sep 3, 2024
7d02a3c
`FindPeersWithSubnet`: Regularly recheck in our current set of peers …
nalepae Sep 3, 2024
eee87bb
`subscribeDynamicWithSyncSubnets`: Use exactly the same subscription …
nalepae Sep 3, 2024
0af4c2e
Make deepsource happier.
nalepae Sep 3, 2024
bd34800
Nishant's commend: Change peer disconnected log.
nalepae Sep 3, 2024
ab0b0e8
NIshant's comment: Change `Too many incoming subscription` log from e…
nalepae Sep 3, 2024
5e2af9f
`FindPeersWithSubnet`: Address Nishant's comment.
nalepae Sep 3, 2024
0b39d26
`batchSize`: Address Nishant's comment.
nalepae Sep 3, 2024
bd82d71
`pingPeers` ==> `pingPeersAndLogEnr`.
nalepae Sep 3, 2024
5c0cc4c
Update beacon-chain/sync/subscriber.go
nalepae Sep 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions beacon-chain/p2p/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,15 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
disconnectFromPeer()
return
}

validPeerConnection := func() {
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnected)
// Go through the handshake process.
log.WithFields(logrus.Fields{
"direction": conn.Stat().Direction,
"direction": conn.Stat().Direction.String(),
"multiAddr": peerMultiaddrString(conn),
"activePeers": len(s.peers.Active()),
}).Debug("Peer connected")
}).Debug("New peer connection")
}

// Do not perform handshake on inbound dials.
Expand Down Expand Up @@ -173,7 +174,11 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected)
// Only log disconnections if we were fully connected.
if priorState == peers.PeerConnected {
log.WithField("activePeers", len(s.peers.Active())).Debug("Peer disconnected")
log.WithFields(logrus.Fields{
"direction": conn.Stat().Direction.String(),
"multiAddr": peerMultiaddrString(conn),
"activePeers": len(s.peers.Active()),
}).Debug("New peer disconnection")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Peer disconnected works better here. This disconnection handler applies to all peers not just newly joined peers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in bd34800.

}
}()
},
Expand Down
28 changes: 23 additions & 5 deletions beacon-chain/p2p/pubsub_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,27 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/network/forks"
"github.com/sirupsen/logrus"
)

var _ pubsub.SubscriptionFilter = (*Service)(nil)

// It is set at this limit to handle the possibility
// of double topic subscriptions at fork boundaries.
// -> 64 Attestation Subnets * 2.
// -> 4 Sync Committee Subnets * 2.
// -> Block,Aggregate,ProposerSlashing,AttesterSlashing,Exits,SyncContribution * 2.
const pubsubSubscriptionRequestLimit = 200
// -> BeaconBlock * 2 = 2
// -> BeaconAggregateAndProof * 2 = 2
// -> VoluntaryExit * 2 = 2
// -> ProposerSlashing * 2 = 2
// -> AttesterSlashing * 2 = 2
// -> 64 Beacon Attestation * 2 = 128
// -> SyncContributionAndProof * 2 = 2
// -> 4 SyncCommitteeSubnets * 2 = 8
// -> BlsToExecutionChange * 2 = 2
// -> 128 DataColumnSidecar * 2 = 256
// -------------------------------------
// TOTAL = 406
// (Note: BlobSidecar is not included in this list since it is superseded by DataColumnSidecar)
const pubsubSubscriptionRequestLimit = 500

// CanSubscribe returns true if the topic is of interest and we could subscribe to it.
func (s *Service) CanSubscribe(topic string) bool {
Expand Down Expand Up @@ -95,8 +106,15 @@ func (s *Service) CanSubscribe(topic string) bool {
// FilterIncomingSubscriptions is invoked for all RPCs containing subscription notifications.
// This method returns only the topics of interest and may return an error if the subscription
// request contains too many topics.
func (s *Service) FilterIncomingSubscriptions(_ peer.ID, subs []*pubsubpb.RPC_SubOpts) ([]*pubsubpb.RPC_SubOpts, error) {
func (s *Service) FilterIncomingSubscriptions(peerID peer.ID, subs []*pubsubpb.RPC_SubOpts) ([]*pubsubpb.RPC_SubOpts, error) {
if len(subs) > pubsubSubscriptionRequestLimit {
subsCount := len(subs)
log.WithFields(logrus.Fields{
"peerID": peerID,
"subscriptionCounts": subsCount,
"subscriptionLimit": pubsubSubscriptionRequestLimit,
}).Error("Too many incoming subscriptions, filtering them")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make this a debug ? This can be triggered by any peer, so they could simply causes your logs to be spammed/blown up by sending you rpc subscriptions

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in ab0b0e8.


return nil, pubsub.ErrTooManySubscriptions
}

Expand Down
12 changes: 9 additions & 3 deletions beacon-chain/p2p/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,13 @@ func (s *Service) Start() {
s.startupErr = err
return
}
err = s.connectToBootnodes()
if err != nil {
log.WithError(err).Error("Could not add bootnode to the exclusion list")

if err := s.connectToBootnodes(); err != nil {
log.WithError(err).Error("Could not connect to boot nodes")
s.startupErr = err
return
}

s.dv5Listener = listener
go s.listenForNewNodes()
}
Expand Down Expand Up @@ -396,9 +397,14 @@ func (s *Service) AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) er
func (s *Service) pingPeers() {
s.pingMethodLock.RLock()
defer s.pingMethodLock.RUnlock()

localENR := s.dv5Listener.Self()
log.WithField("ENR", localENR).Info("New node record")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we logging this ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not?

We log the very first ENR here:

INFO p2p: Started discovery v5
ENR=enr:-Me4QOjJD9yIdENgUqCEWvF8-i4ptgYCcE2_-585RTOkZmTbH09IuXNW5Y3hX_WG_-DEncEytbRry_tLe31BECrKPRGGAZG3Q7i0h2F0dG5ldHOIAAAAAAAAAACDY3NjBIRldGgykDdW2IZgAAA4AOH1BQAAAACCaWSCdjSCaXCErBAAEolzZWNwMjU2azGhA4Vo-9vK3iXroeg9TmKANgrXuntjWj6atypZL7bbpjBviHN5bmNuZXRzAIN0Y3CCMsiDdW
RwgjLI

But it turns out this ENR is the real one for a very small amount of time.
This ENR is modified shortly after.

Logging the initial ENR without logging modified ones is quite misleading since the user will think that it is it actual ENR, while it is not any more the case.

My opinion is:

  • We should log all ENR changes, or
  • We should NOT log any ENR at all.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you do want to log enr changes, pingPeers is the wrong place to do it. It is more accurate to do it in RefreshPersistentSubnets as that is where the enr is actually modified.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree.
However, currently pingPeers in only called by RefreshPersistentSubnets.
And the way RefreshPersistentSubnets is coded, we should add this log at three different locations, instead of only one.

I can of course do it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can also rename pingPeers into pingPeersAndLogEnr.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renaming would be better then

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in bd82d71.


if s.pingMethod == nil {
return
}

for _, pid := range s.peers.Connected() {
go func(id peer.ID) {
if err := s.pingMethod(s.ctx, id); err != nil {
Expand Down
177 changes: 138 additions & 39 deletions beacon-chain/p2p/subnets.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package p2p

import (
"context"
"math"
"strings"
"sync"
"time"
Expand All @@ -20,9 +21,9 @@ import (
"github.com/prysmaticlabs/prysm/v5/consensus-types/wrapper"
"github.com/prysmaticlabs/prysm/v5/crypto/hash"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
mathutil "github.com/prysmaticlabs/prysm/v5/math"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
pb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/sirupsen/logrus"
)

var attestationSubnetCount = params.BeaconConfig().AttestationSubnetCount
Expand Down Expand Up @@ -53,6 +54,79 @@ const blobSubnetLockerVal = 110
// chosen more than sync, attestation and blob subnet (6) combined.
const dataColumnSubnetVal = 150

// nodeFilter return a function that filters nodes based on the subnet topic and subnet index.
func (s *Service) nodeFilter(topic string, index uint64) (func(node *enode.Node) bool, error) {
switch {
case strings.Contains(topic, GossipAttestationMessage):
return s.filterPeerForAttSubnet(index), nil
case strings.Contains(topic, GossipSyncCommitteeMessage):
return s.filterPeerForSyncSubnet(index), nil
case strings.Contains(topic, GossipDataColumnSidecarMessage):
return s.filterPeerForDataColumnsSubnet(index), nil
default:
return nil, errors.Errorf("no subnet exists for provided topic: %s", topic)
}
}

// searchForPeers performs a network search for peers subscribed to a particular subnet.
// It exits as soon as one of these conditions is met:
// - It looped through `batchSize` nodes.
// - It found `peersToFindCount“ peers corresponding to the `filter` criteria.
// - Iterator is exhausted.
func searchForPeers(
iterator enode.Iterator,
batchSize int,
peersToFindCount int,
filter func(node *enode.Node) bool,
) []*enode.Node {
nodeFromNodeID := make(map[enode.ID]*enode.Node, batchSize)
for i := 0; i < batchSize && len(nodeFromNodeID) <= peersToFindCount && iterator.Next(); i++ {
node := iterator.Node()

// Filter out nodes that do not meet the criteria.
if !filter(node) {
continue
}

// Remove duplicates, keeping the node with higher seq.
prevNode, ok := nodeFromNodeID[node.ID()]
if ok && prevNode.Seq() > node.Seq() {
continue
}

nodeFromNodeID[node.ID()] = node
}

// Convert the map to a slice.
nodes := make([]*enode.Node, 0, len(nodeFromNodeID))
for _, node := range nodeFromNodeID {
nodes = append(nodes, node)
}

return nodes
}

// dialPeer dials a peer in a separate goroutine.
func (s *Service) dialPeer(ctx context.Context, wg *sync.WaitGroup, node *enode.Node) {
info, _, err := convertToAddrInfo(node)
if err != nil {
return
}

if info == nil {
return
}

wg.Add(1)
go func() {
if err := s.connectWithPeer(ctx, *info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
}

wg.Done()
}()
}

// FindPeersWithSubnet performs a network search for peers
// subscribed to a particular subnet. Then it tries to connect
// with those peers. This method will block until either:
Expand All @@ -61,69 +135,88 @@ const dataColumnSubnetVal = 150
// On some edge cases, this method may hang indefinitely while peers
// are actually found. In such a case, the user should cancel the context
// and re-run the method again.
func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string,
index uint64, threshold int) (bool, error) {
func (s *Service) FindPeersWithSubnet(
ctx context.Context,
topic string,
index uint64,
threshold int,
) (bool, error) {
const batchSize = 200
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you make the batch bigger ? Ex: 1000 or 2000

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 0b39d26.


ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet")
defer span.End()

span.AddAttributes(trace.Int64Attribute("index", int64(index))) // lint:ignore uintcast -- It's safe to do this for tracing.

if s.dv5Listener == nil {
// return if discovery isn't set
// Return if discovery isn't set
return false, nil
}

topic += s.Encoding().ProtocolSuffix()
iterator := s.dv5Listener.RandomNodes()
defer iterator.Close()
switch {
case strings.Contains(topic, GossipAttestationMessage):
iterator = filterNodes(ctx, iterator, s.filterPeerForAttSubnet(index))
case strings.Contains(topic, GossipSyncCommitteeMessage):
iterator = filterNodes(ctx, iterator, s.filterPeerForSyncSubnet(index))
case strings.Contains(topic, GossipDataColumnSidecarMessage):
iterator = filterNodes(ctx, iterator, s.filterPeerForDataColumnsSubnet(index))
default:
return false, errors.Errorf("no subnet exists for provided topic: %s", topic)

filter, err := s.nodeFilter(topic, index)
if err != nil {
return false, errors.Wrap(err, "node filter")
}

firstLoop := true

wg := new(sync.WaitGroup)
for {
currNum := len(s.pubsub.ListPeers(topic))
if currNum >= threshold {
// Retrieve how many peers we have for this topic.
peerCountForTopic := len(s.pubsub.ListPeers(topic))

// Compute how many peers we are missing to reach the threshold.
missingPeersCount := max(0, threshold-peerCountForTopic)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overflow if peerCountForTopic is greater than threshold

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both threshold and peerCountForTopic are (signed) int.

==> It will work as intended, since max(0, <negative number>) == 0.


// If we have enough peers, we can exit the loop. This is the happy path.
if missingPeersCount == 0 {
break
}

if firstLoop {
firstLoop = false

log.WithFields(logrus.Fields{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just log this out of the loop without having to use a firstLoop boolean

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the peerCountForTopic is computed inside the loop.

Let me propose a maybe better solution.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 5e2af9f.

"topic": topic,
"currentPeerCount": peerCountForTopic,
"targetPeerCount": threshold,
}).Debug("Searching for new peers in the network - Start")
}

// If the context is done, we can exit the loop. This is the unhappy path.
if err := ctx.Err(); err != nil {
return false, errors.Errorf("unable to find requisite number of peers for topic %s - "+
"only %d out of %d peers were able to be found", topic, currNum, threshold)
return false, errors.Errorf(
"unable to find requisite number of peers for topic %s - only %d out of %d peers available after searching",
topic, peerCountForTopic, threshold,
)
}
nodeCount := int(params.BeaconNetworkConfig().MinimumPeersInSubnetSearch)

// Search for new peers in the network.
nodes := searchForPeers(iterator, batchSize, missingPeersCount, filter)

// Restrict dials if limit is applied.
maxConcurrentDials := math.MaxInt
if flags.MaxDialIsActive() {
nodeCount = min(nodeCount, flags.Get().MaxConcurrentDials)
maxConcurrentDials = flags.Get().MaxConcurrentDials
}
nodes := enode.ReadNodes(iterator, nodeCount)
for _, node := range nodes {
info, _, err := convertToAddrInfo(node)
if err != nil {
continue
}

if info == nil {
continue
// Dial the peers in batches.
for start := 0; start < len(nodes); start += maxConcurrentDials {
stop := min(start+maxConcurrentDials, len(nodes))
for _, node := range nodes[start:stop] {
s.dialPeer(ctx, wg, node)
}

wg.Add(1)
go func() {
if err := s.connectWithPeer(ctx, *info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
}
wg.Done()
}()
// Wait for all dials to be completed.
wg.Wait()
}
// Wait for all dials to be completed.
wg.Wait()
}

log.WithField("topic", topic).Debug("Searching for new peers in the network - Success")
return true, nil
}

Expand Down Expand Up @@ -183,11 +276,17 @@ func (s *Service) filterPeerForDataColumnsSubnet(index uint64) func(node *enode.
// lower threshold to broadcast object compared to searching
// for a subnet. So that even in the event of poor peer
// connectivity, we can still broadcast an attestation.
func (s *Service) hasPeerWithSubnet(topic string) bool {
func (s *Service) hasPeerWithSubnet(subnetTopic string) bool {
// In the event peer threshold is lower, we will choose the lower
// threshold.
minPeers := mathutil.Min(1, uint64(flags.Get().MinimumPeersPerSubnet))
return len(s.pubsub.ListPeers(topic+s.Encoding().ProtocolSuffix())) >= int(minPeers) // lint:ignore uintcast -- Min peers can be safely cast to int.
minPeers := min(1, flags.Get().MinimumPeersPerSubnet)
topic := subnetTopic + s.Encoding().ProtocolSuffix()
peersWithSubnet := s.pubsub.ListPeers(topic)
peersWithSubnetCount := len(peersWithSubnet)

enoughPeers := peersWithSubnetCount >= minPeers

return enoughPeers
}

// Updates the service's discv5 listener record's attestation subnet
Expand Down
12 changes: 8 additions & 4 deletions beacon-chain/sync/data_columns_sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sync
import (
"context"
"fmt"
"slices"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -182,14 +183,17 @@ func (d *dataColumnSampler1D) refreshPeerInfo() {
}
}

columnWithNoPeers := make([]uint64, 0)
columnsWithoutPeers := make([]uint64, 0)
for column, peers := range d.peerFromColumn {
if len(peers) == 0 {
columnWithNoPeers = append(columnWithNoPeers, column)
columnsWithoutPeers = append(columnsWithoutPeers, column)
}
}
if len(columnWithNoPeers) > 0 {
log.WithField("columnWithNoPeers", columnWithNoPeers).Warn("Some columns have no peers responsible for custody")

slices.Sort[[]uint64](columnsWithoutPeers)

if len(columnsWithoutPeers) > 0 {
log.WithField("columns", columnsWithoutPeers).Warn("Some columns have no peers responsible for custody")
}
}

Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/rpc_ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (s *Service) sendPingRequest(ctx context.Context, peerID peer.ID) error {
// If the peer responded with an error, increment the bad responses scorer.
if code != 0 {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
return errors.New(errMsg)
return errors.Errorf("code: %d - %s", code, errMsg)
}

// Decode the sequence number from the peer.
Expand Down
Loading
Loading