diff --git a/mm2src/floodsub/src/layer.rs b/mm2src/floodsub/src/layer.rs index 63933114b3..7a076745c6 100644 --- a/mm2src/floodsub/src/layer.rs +++ b/mm2src/floodsub/src/layer.rs @@ -53,7 +53,9 @@ pub struct Floodsub { impl Floodsub { /// Creates a `Floodsub` with default configuration. - pub fn new(local_peer_id: PeerId) -> Self { Self::from_config(FloodsubConfig::new(local_peer_id)) } + pub fn new(local_peer_id: PeerId, forward_messages: bool) -> Self { + Self::from_config(FloodsubConfig::new(local_peer_id, forward_messages)) + } /// Creates a `Floodsub` with the given configuration. pub fn from_config(config: FloodsubConfig) -> Self { @@ -286,23 +288,25 @@ impl NetworkBehaviour for Floodsub { self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); } - // Propagate the message to everyone else who is subscribed to any of the topics. - for (peer_id, subscr_topics) in self.connected_peers.iter() { - if peer_id == &propagation_source { - continue; - } + if self.config.forward_messages { + // Propagate the message to everyone else who is subscribed to any of the topics. + for (peer_id, subscr_topics) in self.connected_peers.iter() { + if peer_id == &propagation_source { + continue; + } - if !subscr_topics.iter().any(|t| message.topics.iter().any(|u| t == u)) { - continue; - } + if !subscr_topics.iter().any(|t| message.topics.iter().any(|u| t == u)) { + continue; + } - if let Some(pos) = rpcs_to_dispatch.iter().position(|(p, _)| p == peer_id) { - rpcs_to_dispatch[pos].1.messages.push(message.clone()); - } else { - rpcs_to_dispatch.push((peer_id.clone(), FloodsubRpc { - subscriptions: Vec::new(), - messages: vec![message.clone()], - })); + if let Some(pos) = rpcs_to_dispatch.iter().position(|(p, _)| p == peer_id) { + rpcs_to_dispatch[pos].1.messages.push(message.clone()); + } else { + rpcs_to_dispatch.push((peer_id.clone(), FloodsubRpc { + subscriptions: Vec::new(), + messages: vec![message.clone()], + })); + } } } } diff --git a/mm2src/floodsub/src/lib.rs b/mm2src/floodsub/src/lib.rs index c848fdd7e0..90592a9fe5 100644 --- a/mm2src/floodsub/src/lib.rs +++ b/mm2src/floodsub/src/lib.rs @@ -46,13 +46,16 @@ pub struct FloodsubConfig { /// `true` if messages published by local node should be propagated as messages received from /// the network, `false` by default. pub subscribe_local_messages: bool, + + pub forward_messages: bool, } impl FloodsubConfig { - pub fn new(local_peer_id: PeerId) -> Self { + pub fn new(local_peer_id: PeerId, forward_messages: bool) -> Self { Self { local_peer_id, subscribe_local_messages: false, + forward_messages, } } } diff --git a/mm2src/lp_native_dex.rs b/mm2src/lp_native_dex.rs index 39976fe93e..4144b296be 100644 --- a/mm2src/lp_native_dex.rs +++ b/mm2src/lp_native_dex.rs @@ -44,6 +44,7 @@ use crate::mm2::lp_ordermatch::{broadcast_maker_orders_keep_alive_loop, lp_order BalanceUpdateOrdermatchHandler}; use crate::mm2::lp_swap::{running_swaps_num, swap_kick_starts}; use crate::mm2::rpc::spawn_rpc; +use bitcrypto::sha256; pub fn lp_ports(netid: u16) -> Result<(u16, u16, u16), String> { const LP_RPCPORT: u16 = 7783; @@ -478,15 +479,29 @@ pub async fn lp_init(mypubport: u16, ctx: MmArc) -> Result<(), String> { }; let ctx_on_poll = ctx.clone(); - let (cmd_tx, event_rx, peer_id) = - start_gossipsub(myipaddr, mypubport, spawn_boxed, seednodes, i_am_seed, move |swarm| { + let force_p2p_key = if i_am_seed { + let key = sha256(&*ctx.secp256k1_key_pair().private().secret); + Some(key.take()) + } else { + None + }; + let (cmd_tx, event_rx, peer_id) = start_gossipsub( + myipaddr, + mypubport, + ctx.netid(), + force_p2p_key, + spawn_boxed, + seednodes, + i_am_seed, + move |swarm| { mm_gauge!( ctx_on_poll.metrics, "p2p.connected_relays.len", swarm.connected_relays_len() as i64 ); mm_gauge!(ctx_on_poll.metrics, "p2p.relay_mesh.len", swarm.relay_mesh_len() as i64); - }); + }, + ); try_s!(ctx.peer_id.pin(peer_id.to_string())); let p2p_context = P2PContext::new(cmd_tx); p2p_context.store_to_mm_arc(&ctx); diff --git a/mm2src/mm2_libp2p/src/atomicdex_behaviour.rs b/mm2src/mm2_libp2p/src/atomicdex_behaviour.rs index 8a933e863d..5427938420 100644 --- a/mm2src/mm2_libp2p/src/atomicdex_behaviour.rs +++ b/mm2src/mm2_libp2p/src/atomicdex_behaviour.rs @@ -20,8 +20,10 @@ use libp2p::{core::{ConnectedPoint, Multiaddr, Transport}, use libp2p_floodsub::{Floodsub, FloodsubEvent, Topic as FloodsubTopic}; use log::{debug, error, info}; use rand::{seq::SliceRandom, thread_rng}; +use std::collections::HashSet; use std::{collections::hash_map::{DefaultHasher, HashMap}, hash::{Hash, Hasher}, + iter::{self, FromIterator}, net::IpAddr, pin::Pin, task::{Context, Poll}, @@ -42,6 +44,7 @@ const CONNECTED_RELAYS_CHECK_INTERVAL: Duration = Duration::from_secs(30); const ANNOUNCE_INTERVAL: Duration = Duration::from_secs(600); const ANNOUNCE_INITIAL_DELAY: Duration = Duration::from_secs(60); const CHANNEL_BUF_SIZE: usize = 1024 * 8; +const NETID_7777: u16 = 7777; impl libp2p::core::Executor for &SwarmRuntime { fn exec(&self, future: Pin + Send>>) { self.0.spawn(future); } @@ -237,6 +240,8 @@ pub struct AtomicDexBehaviour { spawn_fn: fn(Box + Send + Unpin + 'static>) -> (), #[behaviour(ignore)] cmd_rx: Receiver, + #[behaviour(ignore)] + netid: u16, floodsub: Floodsub, gossipsub: Gossipsub, request_response: RequestResponseBehaviour, @@ -397,14 +402,17 @@ impl NetworkBehaviourEventProcess for AtomicDexBehaviour { impl NetworkBehaviourEventProcess for AtomicDexBehaviour { fn inject_event(&mut self, event: FloodsubEvent) { - if let FloodsubEvent::Message(message) = &event { - for topic in &message.topics { - if topic == &FloodsubTopic::new(PEERS_TOPIC) { - let addresses: PeerAddresses = match rmp_serde::from_read_ref(&message.data) { - Ok(a) => a, - Err(_) => return, - }; - self.peers_exchange.add_peer_addresses(&message.source, addresses); + // do not process peer announce on 7777 temporary + if self.netid != NETID_7777 { + if let FloodsubEvent::Message(message) = &event { + for topic in &message.topics { + if topic == &FloodsubTopic::new(PEERS_TOPIC) { + let addresses: PeerAddresses = match rmp_serde::from_read_ref(&message.data) { + Ok(a) => a, + Err(_) => return, + }; + self.peers_exchange.add_peer_addresses(&message.source, addresses); + } } } } @@ -527,6 +535,24 @@ fn announce_my_addresses(swarm: &mut AtomicDexSwarm) { } } +const ALL_NETID_7777_SEEDNODES: &[&str] = &[ + "168.119.236.241", + "168.119.236.249", + "168.119.236.240", + "168.119.236.239", + "168.119.236.251", + "168.119.237.8", + "168.119.236.233", + "168.119.236.243", + "168.119.236.246", + "168.119.237.13", + "195.201.91.96", + "195.201.91.53", + "168.119.174.126", + "46.4.78.11", + "46.4.87.18", +]; + /// Creates and spawns new AdexBehaviour Swarm returning: /// 1. tx to send control commands /// 2. rx emitting gossip events to processing side @@ -534,12 +560,21 @@ fn announce_my_addresses(swarm: &mut AtomicDexSwarm) { pub fn start_gossipsub( ip: IpAddr, port: u16, + netid: u16, + force_key: Option<[u8; 32]>, spawn_fn: fn(Box + Send + Unpin + 'static>) -> (), to_dial: Vec, i_am_relay: bool, on_poll: impl Fn(&AtomicDexSwarm) + Send + 'static, ) -> (Sender, AdexEventRx, PeerId) { - let local_key = identity::Keypair::generate_ed25519(); + let local_key = match force_key { + Some(mut key) => { + let secret = identity::ed25519::SecretKey::from_bytes(&mut key).expect("Secret length is 32 bytes"); + let keypair = identity::ed25519::Keypair::from(secret); + identity::Keypair::Ed25519(keypair) + }, + None => identity::Keypair::generate_ed25519(), + }; let local_peer_id = PeerId::from(local_key.public()); info!("Local peer id: {:?}", local_peer_id); @@ -598,7 +633,15 @@ pub fn start_gossipsub( // build a gossipsub network behaviour let gossipsub = Gossipsub::new(local_peer_id.clone(), gossipsub_config); - let floodsub = Floodsub::new(local_peer_id.clone()); + let floodsub = Floodsub::new(local_peer_id.clone(), netid != NETID_7777); + + let mut peers_exchange = PeersExchange::new(port); + if netid == NETID_7777 { + for address in ALL_NETID_7777_SEEDNODES { + let multiaddr = parse_relay_address((*address).to_owned(), port); + peers_exchange.add_peer_addresses(&PeerId::random(), HashSet::from_iter(iter::once(multiaddr))); + } + } // build a request-response network behaviour let request_response = build_request_response_behaviour(); @@ -613,8 +656,9 @@ pub fn start_gossipsub( gossipsub, floodsub, request_response, - peers_exchange: PeersExchange::new(port), + peers_exchange, ping, + netid, }; libp2p::swarm::SwarmBuilder::new(transport, adex_behavior, local_peer_id.clone()) .executor(Box::new(&*SWARM_RUNTIME)) diff --git a/mm2src/mm2_libp2p/src/atomicdex_behaviour/tests.rs b/mm2src/mm2_libp2p/src/atomicdex_behaviour/tests.rs index 6ac6d58eeb..b0eb680372 100644 --- a/mm2src/mm2_libp2p/src/atomicdex_behaviour/tests.rs +++ b/mm2src/mm2_libp2p/src/atomicdex_behaviour/tests.rs @@ -27,7 +27,8 @@ impl Node { let mut rng = rand::thread_rng(); let secret = SecretKey::new(&mut rng); - let (cmd_tx, mut event_rx, peer_id) = start_gossipsub(my_address, port, spawn_boxed, seednodes, true, |_| {}); + let (cmd_tx, mut event_rx, peer_id) = + start_gossipsub(my_address, port, 333, None, spawn_boxed, seednodes, true, |_| {}); // spawn a response future let cmd_tx_fut = cmd_tx.clone(); diff --git a/mm2src/mm2_libp2p/src/peers_exchange.rs b/mm2src/mm2_libp2p/src/peers_exchange.rs index af22d37227..6d16d47afa 100644 --- a/mm2src/mm2_libp2p/src/peers_exchange.rs +++ b/mm2src/mm2_libp2p/src/peers_exchange.rs @@ -34,7 +34,7 @@ impl ProtocolName for PeersExchangeProtocol { type PeersExchangeCodec = Codec; const DEFAULT_PEERS_NUM: usize = 20; -const REQUEST_PEERS_INITIAL_DELAY: u64 = 60; +const REQUEST_PEERS_INITIAL_DELAY: u64 = 20; const REQUEST_PEERS_INTERVAL: u64 = 300; const MAX_PEERS: usize = 100;