Skip to content

Commit

Permalink
Merge branch 'master' into heartbeat-check-todo
Browse files Browse the repository at this point in the history
  • Loading branch information
xgreenx authored Sep 25, 2023
2 parents 3f1d7a1 + 2558946 commit 35a4cfd
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 98 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ Description of the upcoming release here.

### Changed

- [#1380](https://github.com/FuelLabs/fuel-core/pull/1380): Add preliminary, hard-coded config values for heartbeat peer reputation, removing `todo`
- [#1380](https://github.com/FuelLabs/fuel-core/pull/1380): Add preliminary, hard-coded config values for heartbeat peer reputation, removing `todo`.
- [#1377](https://github.com/FuelLabs/fuel-core/pull/1377): Remove `DiscoveryEvent` and use `KademliaEvent` directly in `DiscoveryBehavior`.
- [#1366](https://github.com/FuelLabs/fuel-core/pull/1366): Improve caching during docker builds in CI by replacing gha
- [#1358](https://github.com/FuelLabs/fuel-core/pull/1358): Upgraded the Rust version used in CI to 1.72.0. Also includes associated Clippy changes.
- [#1318](https://github.com/FuelLabs/fuel-core/pull/1318): Modified block synchronization to use asynchronous task execution when retrieving block headers.
Expand Down
8 changes: 4 additions & 4 deletions crates/services/p2p/src/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::{
discovery::{
DiscoveryBehaviour,
DiscoveryConfig,
DiscoveryEvent,
},
gossipsub::{
config::build_gossipsub_behaviour,
Expand Down Expand Up @@ -40,10 +39,11 @@ use libp2p::{
Multiaddr,
PeerId,
};
use libp2p_kad::KademliaEvent;

#[derive(Debug)]
pub enum FuelBehaviourEvent {
Discovery(DiscoveryEvent),
Discovery(KademliaEvent),
PeerReport(PeerReportEvent),
Gossipsub(GossipsubEvent),
RequestResponse(RequestResponseEvent<RequestMessage, NetworkResponse>),
Expand Down Expand Up @@ -190,8 +190,8 @@ impl<Codec: NetworkCodec> FuelBehaviour<Codec> {
}
}

impl From<DiscoveryEvent> for FuelBehaviourEvent {
fn from(event: DiscoveryEvent) -> Self {
impl From<KademliaEvent> for FuelBehaviourEvent {
fn from(event: KademliaEvent) -> Self {
FuelBehaviourEvent::Discovery(event)
}
}
Expand Down
101 changes: 13 additions & 88 deletions crates/services/p2p/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use libp2p::{
handler::KademliaHandlerProto,
store::MemoryStore,
Kademlia,
KademliaEvent,
QueryId,
},
mdns::Event as MdnsEvent,
Expand All @@ -27,11 +26,9 @@ use libp2p::{
Multiaddr,
PeerId,
};
use libp2p_kad::KademliaEvent;
use std::{
collections::{
HashSet,
VecDeque,
},
collections::HashSet,
pin::Pin,
task::{
Context,
Expand All @@ -46,19 +43,6 @@ pub use discovery_config::DiscoveryConfig;

const SIXTY_SECONDS: Duration = Duration::from_secs(60);

/// Event generated by the `DiscoveryBehaviour`.
#[derive(Debug)]
pub enum DiscoveryEvent {
/// Notify the swarm of an UnroutablePeer
UnroutablePeer(PeerId),

/// Notify the swarm of a connected peer and its addresses
PeerInfoOnConnect {
peer_id: PeerId,
addresses: Vec<Multiaddr>,
},
}

/// NetworkBehavior for discovery of nodes
pub struct DiscoveryBehaviour {
/// List of bootstrap nodes and their addresses
Expand All @@ -70,9 +54,6 @@ pub struct DiscoveryBehaviour {
/// Track the connected peers
connected_peers: HashSet<PeerId>,

/// Events to report to the swarm
pending_events: VecDeque<DiscoveryEvent>,

/// For discovery on local network, optionally available
mdns: MdnsWrapper,

Expand Down Expand Up @@ -103,7 +84,7 @@ impl DiscoveryBehaviour {

impl NetworkBehaviour for DiscoveryBehaviour {
type ConnectionHandler = KademliaHandlerProto<QueryId>;
type OutEvent = DiscoveryEvent;
type OutEvent = KademliaEvent;

// Initializes new handler on a new opened connection
fn new_handler(&mut self) -> Self::ConnectionHandler {
Expand Down Expand Up @@ -131,13 +112,6 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}) => {
if *other_established == 0 {
self.connected_peers.insert(*peer_id);
let addresses = self.addresses_of_peer(peer_id);

self.pending_events
.push_back(DiscoveryEvent::PeerInfoOnConnect {
peer_id: *peer_id,
addresses,
});

trace!("Connected to a peer {:?}", peer_id);
}
Expand All @@ -163,10 +137,6 @@ impl NetworkBehaviour for DiscoveryBehaviour {
cx: &mut Context<'_>,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
if let Some(next_event) = self.pending_events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(next_event))
}

// if random walk is enabled poll the stream that will fire when random walk is scheduled
if let Some(next_kad_random_query) = self.next_kad_random_walk.as_mut() {
while next_kad_random_query.poll_unpin(cx).is_ready() {
Expand All @@ -184,59 +154,15 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
}

// poll Kademlia behaviour
while let Poll::Ready(kad_action) = self.kademlia.poll(cx, params) {
match kad_action {
NetworkBehaviourAction::GenerateEvent(
KademliaEvent::UnroutablePeer { peer },
) => {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(
DiscoveryEvent::UnroutablePeer(peer),
))
}

NetworkBehaviourAction::Dial { handler, opts } => {
return Poll::Ready(NetworkBehaviourAction::Dial { handler, opts })
}
NetworkBehaviourAction::CloseConnection {
peer_id,
connection,
} => {
return Poll::Ready(NetworkBehaviourAction::CloseConnection {
peer_id,
connection,
})
}
NetworkBehaviourAction::NotifyHandler {
peer_id,
handler,
event,
} => {
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
handler,
event,
})
}
NetworkBehaviourAction::ReportObservedAddr { address, score } => {
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr {
address,
score,
})
}
_ => {}
}
}

// poll sub-behaviors
if let Poll::Ready(kad_action) = self.kademlia.poll(cx, params) {
return Poll::Ready(kad_action)
};
while let Poll::Ready(mdns_event) = self.mdns.poll(cx, params) {
match mdns_event {
NetworkBehaviourAction::GenerateEvent(MdnsEvent::Discovered(list)) => {
// inform kademlia of newly discovered local peers
// only if there aren't enough peers already connected
if self.connected_peers.len() < self.max_peers_connected {
for (peer_id, multiaddr) in list {
self.kademlia.add_address(&peer_id, multiaddr);
}
for (peer_id, multiaddr) in list {
self.kademlia.add_address(&peer_id, multiaddr);
}
}
NetworkBehaviourAction::ReportObservedAddr { address, score } => {
Expand All @@ -257,7 +183,6 @@ impl NetworkBehaviour for DiscoveryBehaviour {
_ => {}
}
}

Poll::Pending
}

Expand Down Expand Up @@ -311,8 +236,8 @@ mod tests {
use super::{
DiscoveryBehaviour,
DiscoveryConfig,
KademliaEvent,
};
use crate::discovery::DiscoveryEvent;
use futures::{
future::poll_fn,
StreamExt,
Expand Down Expand Up @@ -443,9 +368,9 @@ mod tests {
// if peer has connected - remove it from the set
left_to_discover[swarm_index].remove(&peer_id);
}
SwarmEvent::Behaviour(DiscoveryEvent::UnroutablePeer(
peer_id,
)) => {
SwarmEvent::Behaviour(KademliaEvent::UnroutablePeer {
peer: peer_id,
}) => {
// kademlia discovered a peer but does not have it's address
// we simulate Identify happening and provide the address
let unroutable_peer_addr = discovery_swarms
Expand Down
6 changes: 1 addition & 5 deletions crates/services/p2p/src/discovery/discovery_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ use libp2p::{
PeerId,
};
use std::{
collections::{
HashSet,
VecDeque,
},
collections::HashSet,
time::Duration,
};
use tracing::warn;
Expand Down Expand Up @@ -183,7 +180,6 @@ impl DiscoveryConfig {
bootstrap_nodes,
reserved_nodes,
connected_peers: HashSet::new(),
pending_events: VecDeque::new(),
kademlia,
next_kad_random_walk,
duration_to_next_kad: Duration::from_secs(1),
Expand Down

0 comments on commit 35a4cfd

Please sign in to comment.