Skip to content

Commit

Permalink
TBC
Browse files Browse the repository at this point in the history
  • Loading branch information
nalepae committed Sep 2, 2024
1 parent 2d446a2 commit 47f70d7
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 115 deletions.
166 changes: 124 additions & 42 deletions beacon-chain/p2p/subnets.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package p2p

import (
"context"
"math"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -53,6 +54,79 @@ const blobSubnetLockerVal = 110
// chosen more than sync, attestation and blob subnet (6) combined.
const dataColumnSubnetVal = 150

// nodeFilter return a function that filters nodes based on the subnet topic and subnet index.
func (s *Service) nodeFilter(topic string, index uint64) (func(node *enode.Node) bool, error) {
switch {
case strings.Contains(topic, GossipAttestationMessage):
return s.filterPeerForAttSubnet(index), nil
case strings.Contains(topic, GossipSyncCommitteeMessage):
return s.filterPeerForSyncSubnet(index), nil
case strings.Contains(topic, GossipDataColumnSidecarMessage):
return s.filterPeerForDataColumnsSubnet(index), nil
default:
return nil, errors.Errorf("no subnet exists for provided topic: %s", topic)
}
}

// searchForPeers performs a network search for peers subscribed to a particular subnet.
// It exits as soon as one of these conditions is met:
// - It looped through `batchSize` nodes.
// - It found `peersToFindCount“ peers corresponding to the `filter` criteria.
// - Iterator is exhausted.
func searchForPeers(
iterator enode.Iterator,
batchSize int,
peersToFindCount int,
filter func(node *enode.Node) bool,
) []*enode.Node {
nodeFromNodeID := make(map[enode.ID]*enode.Node, batchSize)
for i := 0; i < batchSize && len(nodeFromNodeID) <= peersToFindCount && iterator.Next(); i++ {
node := iterator.Node()

// Filter out nodes that do not meet the criteria.
if !filter(node) {
continue
}

// Remove duplicates, keeping the node with higher seq.
prevNode, ok := nodeFromNodeID[node.ID()]
if ok && prevNode.Seq() > node.Seq() {
continue
}

nodeFromNodeID[node.ID()] = node
}

// Convert the map to a slice.
nodes := make([]*enode.Node, 0, len(nodeFromNodeID))
for _, node := range nodeFromNodeID {
nodes = append(nodes, node)
}

return nodes
}

// dialPeer dials a peer in a separate goroutine.
func (s *Service) dialPeer(ctx context.Context, wg *sync.WaitGroup, node *enode.Node) {
info, _, err := convertToAddrInfo(node)
if err != nil {
return
}

if info == nil {
return
}

wg.Add(1)
go func() {
if err := s.connectWithPeer(ctx, *info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
}

wg.Done()
}()
}

// FindPeersWithSubnet performs a network search for peers
// subscribed to a particular subnet. Then it tries to connect
// with those peers. This method will block until either:
Expand All @@ -67,67 +141,83 @@ func (s *Service) FindPeersWithSubnet(
index uint64,
threshold int,
) (bool, error) {
const batchSize = 200

ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet")
defer span.End()

span.AddAttributes(trace.Int64Attribute("index", int64(index))) // lint:ignore uintcast -- It's safe to do this for tracing.

if s.dv5Listener == nil {
// return if discovery isn't set
// Return if discovery isn't set
return false, nil
}

topic += s.Encoding().ProtocolSuffix()
iterator := s.dv5Listener.RandomNodes()
defer iterator.Close()
switch {
case strings.Contains(topic, GossipAttestationMessage):
iterator = filterNodes(ctx, iterator, s.filterPeerForAttSubnet(index))
case strings.Contains(topic, GossipSyncCommitteeMessage):
iterator = filterNodes(ctx, iterator, s.filterPeerForSyncSubnet(index))
case strings.Contains(topic, GossipDataColumnSidecarMessage):
iterator = filterNodes(ctx, iterator, s.filterPeerForDataColumnsSubnet(index))
default:
return false, errors.Errorf("no subnet exists for provided topic: %s", topic)

filter, err := s.nodeFilter(topic, index)
if err != nil {
return false, errors.Wrap(err, "node filter")
}

firstLoop := true

wg := new(sync.WaitGroup)
for {
currNum := len(s.pubsub.ListPeers(topic))
if currNum >= threshold {
for searchCount := 0; ; /*no exit condition*/ searchCount++ {
// Retrieve how many peers we have for this topic.
peerCountForTopic := len(s.pubsub.ListPeers(topic))

// Compute how many peers we are missing to reach the threshold.
missingPeersCount := max(0, threshold-peerCountForTopic)

// If we have enough peers, we can exit the loop. This is the happy path.
if missingPeersCount == 0 {
break
}

if firstLoop {
firstLoop = false

log.WithFields(logrus.Fields{
"topic": topic,
"currentPeerCount": peerCountForTopic,
"targetPeerCount": threshold,
"searchCount": searchCount,
}).Debug("Searching for new peers in the network - Start")
}

// If the context is done, we can exit the loop. This is the unhappy path.
if err := ctx.Err(); err != nil {
return false, errors.Errorf("unable to find requisite number of peers for topic %s - "+
"only %d out of %d peers were able to be found", topic, currNum, threshold)
return false, errors.Errorf(
"unable to find requisite number of peers for topic %s - only %d out of %d peers available after searching",
topic, peerCountForTopic, threshold,
)
}
nodeCount := int(params.BeaconNetworkConfig().MinimumPeersInSubnetSearch)

// Search for new peers in the network.
nodes := searchForPeers(iterator, batchSize, missingPeersCount, filter)

// Restrict dials if limit is applied.
maxConcurrentDials := math.MaxInt
if flags.MaxDialIsActive() {
nodeCount = min(nodeCount, flags.Get().MaxConcurrentDials)
maxConcurrentDials = flags.Get().MaxConcurrentDials
}
nodes := enode.ReadNodes(iterator, nodeCount)
for _, node := range nodes {
info, _, err := convertToAddrInfo(node)
if err != nil {
continue
}

if info == nil {
continue
// Dial the peers in batches.
for start := 0; start < len(nodes); start += maxConcurrentDials {
stop := min(start+maxConcurrentDials, len(nodes))
for _, node := range nodes[start:stop] {
s.dialPeer(ctx, wg, node)
}

wg.Add(1)
go func() {
if err := s.connectWithPeer(ctx, *info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
}
wg.Done()
}()
// Wait for all dials to be completed.
wg.Wait()
}
// Wait for all dials to be completed.
wg.Wait()
}

log.WithField("topic", topic).Debug("Searching for new peers in the network - Success")
return true, nil
}

Expand Down Expand Up @@ -197,14 +287,6 @@ func (s *Service) hasPeerWithSubnet(subnetTopic string) bool {

enoughPeers := peersWithSubnetCount >= minPeers

if !enoughPeers {
log.WithFields(logrus.Fields{
"topic": topic,
"peersCount": peersWithSubnetCount,
"ctxError": s.ctx.Err(),
}).Debug("No valid peers, starting network search")
}

return enoughPeers
}

Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/rpc_ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (s *Service) sendPingRequest(ctx context.Context, peerID peer.ID) error {
// If the peer responded with an error, increment the bad responses scorer.
if code != 0 {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
return errors.New(errMsg)
return errors.Errorf("code: %d - %s", code, errMsg)
}

// Decode the sequence number from the peer.
Expand Down
Loading

0 comments on commit 47f70d7

Please sign in to comment.