diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index ab327d3093b2..97f691e0bcf0 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -205,7 +205,7 @@ func (s *Service) RefreshPersistentSubnets() { s.updateSubnetRecordWithMetadata(bitV) // Ping all peers. - s.pingPeers() + s.pingPeersAndLogEnr() return } @@ -243,7 +243,7 @@ func (s *Service) RefreshPersistentSubnets() { s.updateSubnetRecordWithMetadataV2(bitV, bitS) // Ping all peers to inform them of new metadata - s.pingPeers() + s.pingPeersAndLogEnr() return } @@ -272,7 +272,7 @@ func (s *Service) RefreshPersistentSubnets() { s.updateSubnetRecordWithMetadataV3(bitV, bitS, custodySubnetCount) // Ping all peers. - s.pingPeers() + s.pingPeersAndLogEnr() } // listen for new nodes watches for new nodes in the network and adds them to the peerstore. diff --git a/beacon-chain/p2p/handshake.go b/beacon-chain/p2p/handshake.go index 97d2af8eed72..3bf9bd7e7ddb 100644 --- a/beacon-chain/p2p/handshake.go +++ b/beacon-chain/p2p/handshake.go @@ -90,14 +90,15 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con disconnectFromPeer() return } + validPeerConnection := func() { s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnected) // Go through the handshake process. log.WithFields(logrus.Fields{ - "direction": conn.Stat().Direction, + "direction": conn.Stat().Direction.String(), "multiAddr": peerMultiaddrString(conn), "activePeers": len(s.peers.Active()), - }).Debug("Peer connected") + }).Debug("New peer connection") } // Do not perform handshake on inbound dials. @@ -173,7 +174,11 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected) // Only log disconnections if we were fully connected. if priorState == peers.PeerConnected { - log.WithField("activePeers", len(s.peers.Active())).Debug("Peer disconnected") + log.WithFields(logrus.Fields{ + "direction": conn.Stat().Direction.String(), + "multiAddr": peerMultiaddrString(conn), + "activePeers": len(s.peers.Active()), + }).Debug("Peer disconnected") } }() }, diff --git a/beacon-chain/p2p/pubsub_filter.go b/beacon-chain/p2p/pubsub_filter.go index e02371c587f9..2239f972bac6 100644 --- a/beacon-chain/p2p/pubsub_filter.go +++ b/beacon-chain/p2p/pubsub_filter.go @@ -10,16 +10,27 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/network/forks" + "github.com/sirupsen/logrus" ) var _ pubsub.SubscriptionFilter = (*Service)(nil) // It is set at this limit to handle the possibility // of double topic subscriptions at fork boundaries. -// -> 64 Attestation Subnets * 2. -// -> 4 Sync Committee Subnets * 2. -// -> Block,Aggregate,ProposerSlashing,AttesterSlashing,Exits,SyncContribution * 2. -const pubsubSubscriptionRequestLimit = 200 +// -> BeaconBlock * 2 = 2 +// -> BeaconAggregateAndProof * 2 = 2 +// -> VoluntaryExit * 2 = 2 +// -> ProposerSlashing * 2 = 2 +// -> AttesterSlashing * 2 = 2 +// -> 64 Beacon Attestation * 2 = 128 +// -> SyncContributionAndProof * 2 = 2 +// -> 4 SyncCommitteeSubnets * 2 = 8 +// -> BlsToExecutionChange * 2 = 2 +// -> 128 DataColumnSidecar * 2 = 256 +// ------------------------------------- +// TOTAL = 406 +// (Note: BlobSidecar is not included in this list since it is superseded by DataColumnSidecar) +const pubsubSubscriptionRequestLimit = 500 // CanSubscribe returns true if the topic is of interest and we could subscribe to it. func (s *Service) CanSubscribe(topic string) bool { @@ -95,8 +106,15 @@ func (s *Service) CanSubscribe(topic string) bool { // FilterIncomingSubscriptions is invoked for all RPCs containing subscription notifications. // This method returns only the topics of interest and may return an error if the subscription // request contains too many topics. -func (s *Service) FilterIncomingSubscriptions(_ peer.ID, subs []*pubsubpb.RPC_SubOpts) ([]*pubsubpb.RPC_SubOpts, error) { +func (s *Service) FilterIncomingSubscriptions(peerID peer.ID, subs []*pubsubpb.RPC_SubOpts) ([]*pubsubpb.RPC_SubOpts, error) { if len(subs) > pubsubSubscriptionRequestLimit { + subsCount := len(subs) + log.WithFields(logrus.Fields{ + "peerID": peerID, + "subscriptionCounts": subsCount, + "subscriptionLimit": pubsubSubscriptionRequestLimit, + }).Debug("Too many incoming subscriptions, filtering them") + return nil, pubsub.ErrTooManySubscriptions } diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index 4c1fa7769f42..122e58a2b894 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -202,12 +202,13 @@ func (s *Service) Start() { s.startupErr = err return } - err = s.connectToBootnodes() - if err != nil { - log.WithError(err).Error("Could not add bootnode to the exclusion list") + + if err := s.connectToBootnodes(); err != nil { + log.WithError(err).Error("Could not connect to boot nodes") s.startupErr = err return } + s.dv5Listener = listener go s.listenForNewNodes() } @@ -393,12 +394,17 @@ func (s *Service) AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) er s.pingMethodLock.Unlock() } -func (s *Service) pingPeers() { +func (s *Service) pingPeersAndLogEnr() { s.pingMethodLock.RLock() defer s.pingMethodLock.RUnlock() + + localENR := s.dv5Listener.Self() + log.WithField("ENR", localENR).Info("New node record") + if s.pingMethod == nil { return } + for _, pid := range s.peers.Connected() { go func(id peer.ID) { if err := s.pingMethod(s.ctx, id); err != nil { diff --git a/beacon-chain/p2p/subnets.go b/beacon-chain/p2p/subnets.go index cd3370cf3b89..5c270f7c0535 100644 --- a/beacon-chain/p2p/subnets.go +++ b/beacon-chain/p2p/subnets.go @@ -2,6 +2,7 @@ package p2p import ( "context" + "math" "strings" "sync" "time" @@ -20,9 +21,9 @@ import ( "github.com/prysmaticlabs/prysm/v5/consensus-types/wrapper" "github.com/prysmaticlabs/prysm/v5/crypto/hash" "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" - mathutil "github.com/prysmaticlabs/prysm/v5/math" "github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace" pb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "github.com/sirupsen/logrus" ) var attestationSubnetCount = params.BeaconConfig().AttestationSubnetCount @@ -53,6 +54,79 @@ const blobSubnetLockerVal = 110 // chosen more than sync, attestation and blob subnet (6) combined. const dataColumnSubnetVal = 150 +// nodeFilter return a function that filters nodes based on the subnet topic and subnet index. +func (s *Service) nodeFilter(topic string, index uint64) (func(node *enode.Node) bool, error) { + switch { + case strings.Contains(topic, GossipAttestationMessage): + return s.filterPeerForAttSubnet(index), nil + case strings.Contains(topic, GossipSyncCommitteeMessage): + return s.filterPeerForSyncSubnet(index), nil + case strings.Contains(topic, GossipDataColumnSidecarMessage): + return s.filterPeerForDataColumnsSubnet(index), nil + default: + return nil, errors.Errorf("no subnet exists for provided topic: %s", topic) + } +} + +// searchForPeers performs a network search for peers subscribed to a particular subnet. +// It exits as soon as one of these conditions is met: +// - It looped through `batchSize` nodes. +// - It found `peersToFindCount“ peers corresponding to the `filter` criteria. +// - Iterator is exhausted. +func searchForPeers( + iterator enode.Iterator, + batchSize int, + peersToFindCount int, + filter func(node *enode.Node) bool, +) []*enode.Node { + nodeFromNodeID := make(map[enode.ID]*enode.Node, batchSize) + for i := 0; i < batchSize && len(nodeFromNodeID) <= peersToFindCount && iterator.Next(); i++ { + node := iterator.Node() + + // Filter out nodes that do not meet the criteria. + if !filter(node) { + continue + } + + // Remove duplicates, keeping the node with higher seq. + prevNode, ok := nodeFromNodeID[node.ID()] + if ok && prevNode.Seq() > node.Seq() { + continue + } + + nodeFromNodeID[node.ID()] = node + } + + // Convert the map to a slice. + nodes := make([]*enode.Node, 0, len(nodeFromNodeID)) + for _, node := range nodeFromNodeID { + nodes = append(nodes, node) + } + + return nodes +} + +// dialPeer dials a peer in a separate goroutine. +func (s *Service) dialPeer(ctx context.Context, wg *sync.WaitGroup, node *enode.Node) { + info, _, err := convertToAddrInfo(node) + if err != nil { + return + } + + if info == nil { + return + } + + wg.Add(1) + go func() { + if err := s.connectWithPeer(ctx, *info); err != nil { + log.WithError(err).Tracef("Could not connect with peer %s", info.String()) + } + + wg.Done() + }() +} + // FindPeersWithSubnet performs a network search for peers // subscribed to a particular subnet. Then it tries to connect // with those peers. This method will block until either: @@ -61,69 +135,96 @@ const dataColumnSubnetVal = 150 // On some edge cases, this method may hang indefinitely while peers // are actually found. In such a case, the user should cancel the context // and re-run the method again. -func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string, - index uint64, threshold int) (bool, error) { +func (s *Service) FindPeersWithSubnet( + ctx context.Context, + topic string, + index uint64, + threshold int, +) (bool, error) { + const batchSize = 2000 + ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet") defer span.End() span.SetAttributes(trace.Int64Attribute("index", int64(index))) // lint:ignore uintcast -- It's safe to do this for tracing. if s.dv5Listener == nil { - // return if discovery isn't set + // Return if discovery isn't set return false, nil } topic += s.Encoding().ProtocolSuffix() iterator := s.dv5Listener.RandomNodes() defer iterator.Close() - switch { - case strings.Contains(topic, GossipAttestationMessage): - iterator = filterNodes(ctx, iterator, s.filterPeerForAttSubnet(index)) - case strings.Contains(topic, GossipSyncCommitteeMessage): - iterator = filterNodes(ctx, iterator, s.filterPeerForSyncSubnet(index)) - case strings.Contains(topic, GossipDataColumnSidecarMessage): - iterator = filterNodes(ctx, iterator, s.filterPeerForDataColumnsSubnet(index)) - default: - return false, errors.Errorf("no subnet exists for provided topic: %s", topic) + + filter, err := s.nodeFilter(topic, index) + if err != nil { + return false, errors.Wrap(err, "node filter") + } + + peersSummary := func(topic string, threshold int) (int, int) { + // Retrieve how many peers we have for this topic. + peerCountForTopic := len(s.pubsub.ListPeers(topic)) + + // Compute how many peers we are missing to reach the threshold. + missingPeerCountForTopic := max(0, threshold-peerCountForTopic) + + return peerCountForTopic, missingPeerCountForTopic } + // Compute how many peers we are missing to reach the threshold. + peerCountForTopic, missingPeerCountForTopic := peersSummary(topic, threshold) + + // Exit early if we have enough peers. + if missingPeerCountForTopic == 0 { + return true, nil + } + + log.WithFields(logrus.Fields{ + "topic": topic, + "currentPeerCount": peerCountForTopic, + "targetPeerCount": threshold, + }).Debug("Searching for new peers in the network - Start") + wg := new(sync.WaitGroup) for { - currNum := len(s.pubsub.ListPeers(topic)) - if currNum >= threshold { + // If we have enough peers, we can exit the loop. This is the happy path. + if missingPeerCountForTopic == 0 { break } + + // If the context is done, we can exit the loop. This is the unhappy path. if err := ctx.Err(); err != nil { - return false, errors.Errorf("unable to find requisite number of peers for topic %s - "+ - "only %d out of %d peers were able to be found", topic, currNum, threshold) + return false, errors.Errorf( + "unable to find requisite number of peers for topic %s - only %d out of %d peers available after searching", + topic, peerCountForTopic, threshold, + ) } - nodeCount := int(params.BeaconNetworkConfig().MinimumPeersInSubnetSearch) + + // Search for new peers in the network. + nodes := searchForPeers(iterator, batchSize, missingPeerCountForTopic, filter) + // Restrict dials if limit is applied. + maxConcurrentDials := math.MaxInt if flags.MaxDialIsActive() { - nodeCount = min(nodeCount, flags.Get().MaxConcurrentDials) + maxConcurrentDials = flags.Get().MaxConcurrentDials } - nodes := enode.ReadNodes(iterator, nodeCount) - for _, node := range nodes { - info, _, err := convertToAddrInfo(node) - if err != nil { - continue - } - if info == nil { - continue + // Dial the peers in batches. + for start := 0; start < len(nodes); start += maxConcurrentDials { + stop := min(start+maxConcurrentDials, len(nodes)) + for _, node := range nodes[start:stop] { + s.dialPeer(ctx, wg, node) } - wg.Add(1) - go func() { - if err := s.connectWithPeer(ctx, *info); err != nil { - log.WithError(err).Tracef("Could not connect with peer %s", info.String()) - } - wg.Done() - }() + // Wait for all dials to be completed. + wg.Wait() } - // Wait for all dials to be completed. - wg.Wait() + + _, missingPeerCountForTopic = peersSummary(topic, threshold) } + + log.WithField("topic", topic).Debug("Searching for new peers in the network - Success") return true, nil } @@ -183,11 +284,17 @@ func (s *Service) filterPeerForDataColumnsSubnet(index uint64) func(node *enode. // lower threshold to broadcast object compared to searching // for a subnet. So that even in the event of poor peer // connectivity, we can still broadcast an attestation. -func (s *Service) hasPeerWithSubnet(topic string) bool { +func (s *Service) hasPeerWithSubnet(subnetTopic string) bool { // In the event peer threshold is lower, we will choose the lower // threshold. - minPeers := mathutil.Min(1, uint64(flags.Get().MinimumPeersPerSubnet)) - return len(s.pubsub.ListPeers(topic+s.Encoding().ProtocolSuffix())) >= int(minPeers) // lint:ignore uintcast -- Min peers can be safely cast to int. + minPeers := min(1, flags.Get().MinimumPeersPerSubnet) + topic := subnetTopic + s.Encoding().ProtocolSuffix() + peersWithSubnet := s.pubsub.ListPeers(topic) + peersWithSubnetCount := len(peersWithSubnet) + + enoughPeers := peersWithSubnetCount >= minPeers + + return enoughPeers } // Updates the service's discv5 listener record's attestation subnet diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index 842b490cef8e..9c4ec5521c1f 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -305,7 +305,7 @@ func (*TestP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { // AddConnectionHandler handles the connection with a newly connected peer. func (p *TestP2P) AddConnectionHandler(f, _ func(ctx context.Context, id peer.ID) error) { p.BHost.Network().Notify(&network.NotifyBundle{ - ConnectedF: func(net network.Network, conn network.Conn) { + ConnectedF: func(_ network.Network, conn network.Conn) { // Must be handled in a goroutine as this callback cannot be blocking. go func() { p.peers.Add(new(enr.Record), conn.RemotePeer(), conn.RemoteMultiaddr(), conn.Stat().Direction) @@ -329,7 +329,7 @@ func (p *TestP2P) AddConnectionHandler(f, _ func(ctx context.Context, id peer.ID // AddDisconnectionHandler -- func (p *TestP2P) AddDisconnectionHandler(f func(ctx context.Context, id peer.ID) error) { p.BHost.Network().Notify(&network.NotifyBundle{ - DisconnectedF: func(net network.Network, conn network.Conn) { + DisconnectedF: func(_ network.Network, conn network.Conn) { // Must be handled in a goroutine as this callback cannot be blocking. go func() { p.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnecting) diff --git a/beacon-chain/sync/data_columns_sampling.go b/beacon-chain/sync/data_columns_sampling.go index 27c16fc0f2bf..d83b119bbe8f 100644 --- a/beacon-chain/sync/data_columns_sampling.go +++ b/beacon-chain/sync/data_columns_sampling.go @@ -3,6 +3,7 @@ package sync import ( "context" "fmt" + "slices" "sort" "sync" "time" @@ -182,14 +183,17 @@ func (d *dataColumnSampler1D) refreshPeerInfo() { } } - columnWithNoPeers := make([]uint64, 0) + columnsWithoutPeers := make([]uint64, 0) for column, peers := range d.peerFromColumn { if len(peers) == 0 { - columnWithNoPeers = append(columnWithNoPeers, column) + columnsWithoutPeers = append(columnsWithoutPeers, column) } } - if len(columnWithNoPeers) > 0 { - log.WithField("columnWithNoPeers", columnWithNoPeers).Warn("Some columns have no peers responsible for custody") + + slices.Sort[[]uint64](columnsWithoutPeers) + + if len(columnsWithoutPeers) > 0 { + log.WithField("columns", columnsWithoutPeers).Warn("Some columns have no peers responsible for custody") } } diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index 2097b776ca5c..942d8422ceb2 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -21,6 +21,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers" "github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags" "github.com/prysmaticlabs/prysm/v5/config/features" + fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v5/container/slice" @@ -191,7 +192,7 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s // Do not resubscribe already seen subscriptions. ok := s.subHandler.topicExists(topic) if ok { - log.Debugf("Provided topic already has an active subscription running: %s", topic) + log.WithField("topic", topic).Debug("Provided topic already has an active subscription running") return nil } @@ -208,6 +209,7 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s log.WithError(err).Error("Could not subscribe topic") return nil } + s.subHandler.addTopic(sub.Topic(), sub) // Pipeline decodes the incoming subscription data, runs the validation, and handles the @@ -215,6 +217,7 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s pipeline := func(msg *pubsub.Message) { ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout) defer cancel() + ctx, span := trace.StartSpan(ctx, "sync.pubsub") defer span.End() @@ -389,8 +392,6 @@ func (s *Service) subscribeStaticWithSubnets(topic string, validator wrappedVal, // Check every slot that there are enough peers for i := uint64(0); i < subnetCount; i++ { if !s.validPeersExist(s.addDigestAndIndexToTopic(topic, digest, i)) { - log.Debugf("No peers found subscribed to attestation gossip subnet with "+ - "committee index %d. Searching network for peers subscribed to the subnet.", i) _, err := s.cfg.p2p.FindPeersWithSubnet( s.ctx, s.addDigestAndIndexToTopic(topic, digest, i), @@ -454,10 +455,8 @@ func (s *Service) subscribeDynamicWithSubnets( return } wantedSubs := s.retrievePersistentSubs(currentSlot) - // Resize as appropriate. s.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest) - // subscribe desired aggregator subnets. for _, idx := range wantedSubs { s.subscribeAggregatorSubnet(subscriptions, idx, digest, validate, handle) } @@ -471,9 +470,15 @@ func (s *Service) subscribeDynamicWithSubnets( }() } -// revalidate that our currently connected subnets are valid. -func (s *Service) reValidateSubscriptions(subscriptions map[uint64]*pubsub.Subscription, - wantedSubs []uint64, topicFormat string, digest [4]byte) { +// reValidateSubscriptions unsubscribe from topics we are currently subscribed to but that are +// not in the list of wanted subnets. +// TODO: Rename this functions as it does not only revalidate subscriptions. +func (s *Service) reValidateSubscriptions( + subscriptions map[uint64]*pubsub.Subscription, + wantedSubs []uint64, + topicFormat string, + digest [4]byte, +) { for k, v := range subscriptions { var wanted bool for _, idx := range wantedSubs { @@ -482,6 +487,7 @@ func (s *Service) reValidateSubscriptions(subscriptions map[uint64]*pubsub.Subsc break } } + if !wanted && v != nil { v.Cancel() fullTopic := fmt.Sprintf(topicFormat, digest, k) + s.cfg.p2p.Encoding().ProtocolSuffix() @@ -508,34 +514,6 @@ func (s *Service) subscribeAggregatorSubnet( subscriptions[idx] = s.subscribeWithBase(subnetTopic, validate, handle) } if !s.validPeersExist(subnetTopic) { - log.Debugf("No peers found subscribed to attestation gossip subnet with "+ - "committee index %d. Searching network for peers subscribed to the subnet.", idx) - _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet) - if err != nil { - log.WithError(err).Debug("Could not search for peers") - } - } -} - -// subscribe missing subnets for our sync committee members. -func (s *Service) subscribeSyncSubnet( - subscriptions map[uint64]*pubsub.Subscription, - idx uint64, - digest [4]byte, - validate wrappedVal, - handle subHandler, -) { - // do not subscribe if we have no peers in the same - // subnet - topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.SyncCommitteeMessage{})] - subnetTopic := fmt.Sprintf(topic, digest, idx) - // check if subscription exists and if not subscribe the relevant subnet. - if _, exists := subscriptions[idx]; !exists { - subscriptions[idx] = s.subscribeWithBase(subnetTopic, validate, handle) - } - if !s.validPeersExist(subnetTopic) { - log.Debugf("No peers found subscribed to sync gossip subnet with "+ - "committee index %d. Searching network for peers subscribed to the subnet.", idx) _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet) if err != nil { log.WithError(err).Debug("Could not search for peers") @@ -589,8 +567,6 @@ func (s *Service) subscribeStaticWithSyncSubnets(topic string, validator wrapped // Check every slot that there are enough peers for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ { if !s.validPeersExist(s.addDigestAndIndexToTopic(topic, digest, i)) { - log.Debugf("No peers found subscribed to sync gossip subnet with "+ - "committee index %d. Searching network for peers subscribed to the subnet.", i) _, err := s.cfg.p2p.FindPeersWithSubnet( s.ctx, s.addDigestAndIndexToTopic(topic, digest, i), @@ -608,59 +584,138 @@ func (s *Service) subscribeStaticWithSyncSubnets(topic string, validator wrapped }() } -// subscribe to a dynamically changing list of subnets. This method expects a fmt compatible -// string for the topic name and the list of subnets for subscribed topics that should be -// maintained. +// subscribeToSyncSubnets subscribes to needed sync subnets, unsubscribe from unneeded ones and search for more peers if needed. +// Returns `true` if the digest is valid (wrt. the current epoch), `false` otherwise. +func (s *Service) subscribeToSyncSubnets( + topicFormat string, + digest [4]byte, + genesisValidatorsRoot [fieldparams.RootLength]byte, + genesisTime time.Time, + subscriptions map[uint64]*pubsub.Subscription, + currentSlot primitives.Slot, + validate wrappedVal, + handle subHandler, +) bool { + // Get sync subnets topic. + topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.SyncCommitteeMessage{})] + + // Do not subscribe if not synced. + if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() { + return true + } + + // Do not subscribe is the digest is not valid. + valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot) + if err != nil { + log.Error(err) + return true + } + + // Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork. + if !valid { + log.WithField("digest", fmt.Sprintf("%#x", digest)).Warn("Sync subnets with this digest are no longer valid, unsubscribing from all of them.") + s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest) + return false + } + + // Get the current epoch. + currentEpoch := slots.ToEpoch(currentSlot) + + // Retrieve the subnets we want to subscribe to. + wantedSubnetsIndex := s.retrieveActiveSyncSubnets(currentEpoch) + + // Remove subscriptions that are no longer wanted. + s.reValidateSubscriptions(subscriptions, wantedSubnetsIndex, topicFormat, digest) + + // Subscribe to wanted subnets. + for _, subnetIndex := range wantedSubnetsIndex { + subnetTopic := fmt.Sprintf(topic, digest, subnetIndex) + + // Check if subscription exists. + if _, exists := subscriptions[subnetIndex]; exists { + continue + } + + // We need to subscribe to the subnet. + subscription := s.subscribeWithBase(subnetTopic, validate, handle) + subscriptions[subnetIndex] = subscription + } + + // Find new peers for wanted subnets if needed. + for _, subnetIndex := range wantedSubnetsIndex { + subnetTopic := fmt.Sprintf(topic, digest, subnetIndex) + + // Check if we have enough peers in the subnet. Skip if we do. + if s.validPeersExist(subnetTopic) { + continue + } + + // Not enough peers in the subnet, we need to search for more. + _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, subnetIndex, flags.Get().MinimumPeersPerSubnet) + if err != nil { + log.WithError(err).Debug("Could not search for peers") + } + } + + return true +} + +// subscribeDynamicWithSyncSubnets subscribes to a dynamically changing list of subnets. func (s *Service) subscribeDynamicWithSyncSubnets( topicFormat string, validate wrappedVal, handle subHandler, digest [4]byte, ) { - genRoot := s.cfg.clock.GenesisValidatorsRoot() - _, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:]) + // Retrieve the number of committee subnets we need to subscribe to. + syncCommiteeSubnetsCount := params.BeaconConfig().SyncCommitteeSubnetCount + + // Initialize the subscriptions map. + subscriptions := make(map[uint64]*pubsub.Subscription, syncCommiteeSubnetsCount) + + // Retrieve the genesis validators root. + genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot() + + // Retrieve the epoch of the fork corresponding to the digest. + _, e, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:]) if err != nil { panic(err) } + + // Retrieve the base protobuf message. base := p2p.GossipTopicMappings(topicFormat, e) if base == nil { panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat)) } - subscriptions := make(map[uint64]*pubsub.Subscription, params.BeaconConfig().SyncCommitteeSubnetCount) - genesis := s.cfg.clock.GenesisTime() - ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot) + + // Retrieve the genesis time. + genesisTime := s.cfg.clock.GenesisTime() + + // Define a ticker ticking every slot. + secondsPerSlot := params.BeaconConfig().SecondsPerSlot + ticker := slots.NewSlotTicker(genesisTime, secondsPerSlot) + + // Retrieve the current slot. + currentSlot := s.cfg.clock.CurrentSlot() + + // Subscribe to the sync subnets. + s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle) go func() { for { select { - case <-s.ctx.Done(): - ticker.Done() - return case currentSlot := <-ticker.C(): - if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() { - continue - } - valid, err := isDigestValid(digest, genesis, genRoot) - if err != nil { - log.Error(err) - continue - } - if !valid { - log.Warnf("Sync subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest) - // Unsubscribes from all our current subnets. - s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest) + isDigestValid := s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle) + + // Stop the ticker if the digest is not valid. Likely to happen after a hard fork. + if !isDigestValid { ticker.Done() return } - wantedSubs := s.retrieveActiveSyncSubnets(slots.ToEpoch(currentSlot)) - // Resize as appropriate. - s.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest) - - // subscribe desired aggregator subnets. - for _, idx := range wantedSubs { - s.subscribeSyncSubnet(subscriptions, idx, digest, validate, handle) - } + case <-s.ctx.Done(): + ticker.Done() + return } } }() @@ -686,12 +741,6 @@ func (s *Service) subscribeColumnSubnet( minimumPeersPerSubnet := flags.Get().MinimumPeersPerSubnet if !s.validPeersExist(subnetTopic) { - log.WithFields(logrus.Fields{ - "columnSubnet": idx, - "minimumPeersPerSubnet": minimumPeersPerSubnet, - "topic": subnetTopic, - }).Debug("No peers found subscribed to column gossip subnet. Searching network for peers subscribed to it") - _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, minimumPeersPerSubnet) if err != nil { log.WithError(err).Debug("Could not search for peers") @@ -763,8 +812,6 @@ func (s *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) { topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.Attestation{})] subnetTopic := fmt.Sprintf(topic, digest, idx) if !s.validPeersExist(subnetTopic) { - log.Debugf("No peers found subscribed to attestation gossip subnet with "+ - "committee index %d. Searching network for peers subscribed to the subnet.", idx) // perform a search for peers with the desired committee index. _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet) if err != nil { @@ -790,8 +837,13 @@ func (s *Service) unSubscribeFromTopic(topic string) { // find if we have peers who are subscribed to the same subnet func (s *Service) validPeersExist(subnetTopic string) bool { - numOfPeers := s.cfg.p2p.PubSub().ListPeers(subnetTopic + s.cfg.p2p.Encoding().ProtocolSuffix()) - return len(numOfPeers) >= flags.Get().MinimumPeersPerSubnet + topic := subnetTopic + s.cfg.p2p.Encoding().ProtocolSuffix() + threshold := flags.Get().MinimumPeersPerSubnet + + peersWithSubnet := s.cfg.p2p.PubSub().ListPeers(topic) + peersWithSubnetCount := len(peersWithSubnet) + + return peersWithSubnetCount >= threshold } func (s *Service) retrievePersistentSubs(currSlot primitives.Slot) []uint64 { diff --git a/cmd/beacon-chain/main.go b/cmd/beacon-chain/main.go index fb2654c27f56..69c7283d8f82 100644 --- a/cmd/beacon-chain/main.go +++ b/cmd/beacon-chain/main.go @@ -168,7 +168,7 @@ func before(ctx *cli.Context) error { switch format { case "text": formatter := new(prefixed.TextFormatter) - formatter.TimestampFormat = "2006-01-02 15:04:05" + formatter.TimestampFormat = "2006-01-02 15:04:05.00" formatter.FullTimestamp = true // If persistent log files are written - we disable the log messages coloring because @@ -184,7 +184,9 @@ func before(ctx *cli.Context) error { logrus.SetFormatter(f) case "json": - logrus.SetFormatter(&logrus.JSONFormatter{}) + logrus.SetFormatter(&logrus.JSONFormatter{ + TimestampFormat: "2006-01-02 15:04:05.00", + }) case "journald": if err := journald.Enable(); err != nil { return err diff --git a/config/features/config.go b/config/features/config.go index 869415ec36c5..a8ec0a97f6b3 100644 --- a/config/features/config.go +++ b/config/features/config.go @@ -147,6 +147,7 @@ func configureTestnet(ctx *cli.Context) error { } else { if ctx.IsSet(cmd.ChainConfigFileFlag.Name) { log.Warn("Running on custom Ethereum network specified in a chain configuration yaml file") + params.UseCustomNetworkConfig() } else { log.Info("Running on Ethereum Mainnet") } @@ -158,11 +159,11 @@ func configureTestnet(ctx *cli.Context) error { } // Insert feature flags within the function to be enabled for Sepolia testnet. -func applySepoliaFeatureFlags(ctx *cli.Context) { +func applySepoliaFeatureFlags(_ *cli.Context) { } // Insert feature flags within the function to be enabled for Holesky testnet. -func applyHoleskyFeatureFlags(ctx *cli.Context) { +func applyHoleskyFeatureFlags(_ *cli.Context) { } // ConfigureBeaconChain sets the global config based diff --git a/config/params/BUILD.bazel b/config/params/BUILD.bazel index eed7a596fe11..574b491d61a6 100644 --- a/config/params/BUILD.bazel +++ b/config/params/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "mainnet_config.go", "minimal_config.go", "network_config.go", + "testnet_custom_network_config.go", "testnet_e2e_config.go", "testnet_holesky_config.go", "testnet_sepolia_config.go", diff --git a/config/params/testnet_custom_network_config.go b/config/params/testnet_custom_network_config.go new file mode 100644 index 000000000000..7ce6780fd59a --- /dev/null +++ b/config/params/testnet_custom_network_config.go @@ -0,0 +1,9 @@ +package params + +func UseCustomNetworkConfig() { + cfg := BeaconNetworkConfig().Copy() + cfg.ContractDeploymentBlock = 0 + cfg.BootstrapNodes = []string{} + + OverrideBeaconNetworkConfig(cfg) +}