From 2aca333e1832595295a9cbc9973ab9871ec82317 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Wed, 6 Mar 2024 13:50:12 +1100 Subject: [PATCH 1/4] Attempt to publish to mesh_n peers --- .../src/gossipsub/behaviour.rs | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/beacon_node/lighthouse_network/src/gossipsub/behaviour.rs b/beacon_node/lighthouse_network/src/gossipsub/behaviour.rs index 9769adca278..1136b92c400 100644 --- a/beacon_node/lighthouse_network/src/gossipsub/behaviour.rs +++ b/beacon_node/lighthouse_network/src/gossipsub/behaviour.rs @@ -635,9 +635,33 @@ where || !self.score_below_threshold(p, |ts| ts.publish_threshold).0 })); } else { - match self.mesh.get(&raw_message.topic) { + match self.mesh.get(&topic_hash) { // Mesh peers Some(mesh_peers) => { + // We have a mesh set. We want to make sure to publish to at least `mesh_n` + // peers (if possible). + let needed_extra_peers = self.config.mesh_n().saturating_sub(mesh_peers.len()); + + if needed_extra_peers > 0 { + // We don't have `mesh_n` peers in our mesh, we will randomly select extras + // and publish to them. + + let explicit_peers = &self.explicit_peers; + let scores = self.peer_score.as_ref().map(|(score, ..)| score); + // Get a random set of peers that are appropriate to send messages too. + let peer_list = get_random_peers( + &self.connected_peers, + &topic_hash, + needed_extra_peers, + |peer| { + !mesh_peers.contains(peer) + && !explicit_peers.contains(peer) + && scores.map(|score| score.score(peer)).unwrap_or(0.0) >= 0.0 + }, + ); + recipient_peers.extend(peer_list); + } + recipient_peers.extend(mesh_peers); } // Gossipsub peers From cd73ca4e0bd77de9a798078d690ac6a96dd8b435 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Wed, 6 Mar 2024 14:04:17 +1100 Subject: [PATCH 2/4] Differentiate errors and better scoring --- .../src/gossipsub/behaviour.rs | 17 ++++++++++++++--- .../lighthouse_network/src/gossipsub/error.rs | 3 +++ 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/beacon_node/lighthouse_network/src/gossipsub/behaviour.rs b/beacon_node/lighthouse_network/src/gossipsub/behaviour.rs index 1136b92c400..c14f8e0141f 100644 --- a/beacon_node/lighthouse_network/src/gossipsub/behaviour.rs +++ b/beacon_node/lighthouse_network/src/gossipsub/behaviour.rs @@ -647,7 +647,10 @@ where // and publish to them. let explicit_peers = &self.explicit_peers; - let scores = self.peer_score.as_ref().map(|(score, ..)| score); + let scores = self + .peer_score + .as_ref() + .map(|(score, thresholds, ..)| (score, thresholds)); // Get a random set of peers that are appropriate to send messages too. let peer_list = get_random_peers( &self.connected_peers, @@ -656,7 +659,11 @@ where |peer| { !mesh_peers.contains(peer) && !explicit_peers.contains(peer) - && scores.map(|score| score.score(peer)).unwrap_or(0.0) >= 0.0 + && scores + .map(|(score, thresholds)| { + score.score(peer) > thresholds.publish_threshold + }) + .unwrap_or(true) }, ); recipient_peers.extend(peer_list); @@ -753,10 +760,14 @@ where } } - if publish_failed { + if recipient_peers.is_empty() { return Err(PublishError::InsufficientPeers); } + if publish_failed { + return Err(PublishError::AllQueuesFull(recipient_peers.len())); + } + tracing::debug!(message=%msg_id, "Published message"); if let Some(metrics) = self.metrics.as_mut() { diff --git a/beacon_node/lighthouse_network/src/gossipsub/error.rs b/beacon_node/lighthouse_network/src/gossipsub/error.rs index d00e1ec6d22..df3332bc923 100644 --- a/beacon_node/lighthouse_network/src/gossipsub/error.rs +++ b/beacon_node/lighthouse_network/src/gossipsub/error.rs @@ -36,6 +36,9 @@ pub enum PublishError { MessageTooLarge, /// The compression algorithm failed. TransformFailed(std::io::Error), + /// Messages could not be sent because all queues for peers were full. The usize represents the + /// number of peers that have full queues. + AllQueuesFull(usize), } impl std::fmt::Display for PublishError { From c4237455562435f423f190fe08ad64280d2d62d0 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Wed, 6 Mar 2024 16:01:37 +1100 Subject: [PATCH 3/4] Fix gossipsub tests --- .../lighthouse_network/src/gossipsub/behaviour/tests.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/beacon_node/lighthouse_network/src/gossipsub/behaviour/tests.rs b/beacon_node/lighthouse_network/src/gossipsub/behaviour/tests.rs index eb006e52928..f191d38f515 100644 --- a/beacon_node/lighthouse_network/src/gossipsub/behaviour/tests.rs +++ b/beacon_node/lighthouse_network/src/gossipsub/behaviour/tests.rs @@ -741,8 +741,8 @@ fn test_publish_without_flood_publishing() { let config: Config = Config::default(); assert_eq!( publishes.len(), - config.mesh_n_low(), - "Should send a publish message to all known peers" + config.mesh_n(), + "Should send a publish message to at least mesh_n peers" ); assert!( From c7e1cfe6aeaace82099e893148d0642ee18f8737 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 7 Mar 2024 16:09:25 +1100 Subject: [PATCH 4/4] Code improvements --- .../src/gossipsub/behaviour.rs | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/beacon_node/lighthouse_network/src/gossipsub/behaviour.rs b/beacon_node/lighthouse_network/src/gossipsub/behaviour.rs index c14f8e0141f..455c285c811 100644 --- a/beacon_node/lighthouse_network/src/gossipsub/behaviour.rs +++ b/beacon_node/lighthouse_network/src/gossipsub/behaviour.rs @@ -646,11 +646,6 @@ where // We don't have `mesh_n` peers in our mesh, we will randomly select extras // and publish to them. - let explicit_peers = &self.explicit_peers; - let scores = self - .peer_score - .as_ref() - .map(|(score, thresholds, ..)| (score, thresholds)); // Get a random set of peers that are appropriate to send messages too. let peer_list = get_random_peers( &self.connected_peers, @@ -658,12 +653,10 @@ where needed_extra_peers, |peer| { !mesh_peers.contains(peer) - && !explicit_peers.contains(peer) - && scores - .map(|(score, thresholds)| { - score.score(peer) > thresholds.publish_threshold - }) - .unwrap_or(true) + && !self.explicit_peers.contains(peer) + && !self + .score_below_threshold(peer, |pst| pst.publish_threshold) + .0 }, ); recipient_peers.extend(peer_list); @@ -2238,10 +2231,9 @@ where if outbound <= self.config.mesh_outbound_min() { // do not remove anymore outbound peers continue; - } else { - // an outbound peer gets removed - outbound -= 1; } + // an outbound peer gets removed + outbound -= 1; } // remove the peer