Skip to content

Commit

Permalink
Add ALL_NETID_7777_SEEDNODES to known peers on init.
Browse files Browse the repository at this point in the history
  • Loading branch information
artemii235 committed Dec 12, 2020
1 parent e78ba12 commit d488b39
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 33 deletions.
36 changes: 20 additions & 16 deletions mm2src/floodsub/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ pub struct Floodsub {

impl Floodsub {
/// Creates a `Floodsub` with default configuration.
pub fn new(local_peer_id: PeerId) -> Self { Self::from_config(FloodsubConfig::new(local_peer_id)) }
pub fn new(local_peer_id: PeerId, forward_messages: bool) -> Self {
Self::from_config(FloodsubConfig::new(local_peer_id, forward_messages))
}

/// Creates a `Floodsub` with the given configuration.
pub fn from_config(config: FloodsubConfig) -> Self {
Expand Down Expand Up @@ -286,23 +288,25 @@ impl NetworkBehaviour for Floodsub {
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
}

// Propagate the message to everyone else who is subscribed to any of the topics.
for (peer_id, subscr_topics) in self.connected_peers.iter() {
if peer_id == &propagation_source {
continue;
}
if self.config.forward_messages {
// Propagate the message to everyone else who is subscribed to any of the topics.
for (peer_id, subscr_topics) in self.connected_peers.iter() {
if peer_id == &propagation_source {
continue;
}

if !subscr_topics.iter().any(|t| message.topics.iter().any(|u| t == u)) {
continue;
}
if !subscr_topics.iter().any(|t| message.topics.iter().any(|u| t == u)) {
continue;
}

if let Some(pos) = rpcs_to_dispatch.iter().position(|(p, _)| p == peer_id) {
rpcs_to_dispatch[pos].1.messages.push(message.clone());
} else {
rpcs_to_dispatch.push((peer_id.clone(), FloodsubRpc {
subscriptions: Vec::new(),
messages: vec![message.clone()],
}));
if let Some(pos) = rpcs_to_dispatch.iter().position(|(p, _)| p == peer_id) {
rpcs_to_dispatch[pos].1.messages.push(message.clone());
} else {
rpcs_to_dispatch.push((peer_id.clone(), FloodsubRpc {
subscriptions: Vec::new(),
messages: vec![message.clone()],
}));
}
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion mm2src/floodsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,16 @@ pub struct FloodsubConfig {
/// `true` if messages published by local node should be propagated as messages received from
/// the network, `false` by default.
pub subscribe_local_messages: bool,

pub forward_messages: bool,
}

impl FloodsubConfig {
pub fn new(local_peer_id: PeerId) -> Self {
pub fn new(local_peer_id: PeerId, forward_messages: bool) -> Self {
Self {
local_peer_id,
subscribe_local_messages: false,
forward_messages,
}
}
}
21 changes: 18 additions & 3 deletions mm2src/lp_native_dex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use crate::mm2::lp_ordermatch::{broadcast_maker_orders_keep_alive_loop, lp_order
BalanceUpdateOrdermatchHandler};
use crate::mm2::lp_swap::{running_swaps_num, swap_kick_starts};
use crate::mm2::rpc::spawn_rpc;
use bitcrypto::sha256;

pub fn lp_ports(netid: u16) -> Result<(u16, u16, u16), String> {
const LP_RPCPORT: u16 = 7783;
Expand Down Expand Up @@ -478,15 +479,29 @@ pub async fn lp_init(mypubport: u16, ctx: MmArc) -> Result<(), String> {
};

let ctx_on_poll = ctx.clone();
let (cmd_tx, event_rx, peer_id) =
start_gossipsub(myipaddr, mypubport, spawn_boxed, seednodes, i_am_seed, move |swarm| {
let force_p2p_key = if i_am_seed {
let key = sha256(&*ctx.secp256k1_key_pair().private().secret);
Some(key.take())
} else {
None
};
let (cmd_tx, event_rx, peer_id) = start_gossipsub(
myipaddr,
mypubport,
ctx.netid(),
force_p2p_key,
spawn_boxed,
seednodes,
i_am_seed,
move |swarm| {
mm_gauge!(
ctx_on_poll.metrics,
"p2p.connected_relays.len",
swarm.connected_relays_len() as i64
);
mm_gauge!(ctx_on_poll.metrics, "p2p.relay_mesh.len", swarm.relay_mesh_len() as i64);
});
},
);
try_s!(ctx.peer_id.pin(peer_id.to_string()));
let p2p_context = P2PContext::new(cmd_tx);
p2p_context.store_to_mm_arc(&ctx);
Expand Down
66 changes: 55 additions & 11 deletions mm2src/mm2_libp2p/src/atomicdex_behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ use libp2p::{core::{ConnectedPoint, Multiaddr, Transport},
use libp2p_floodsub::{Floodsub, FloodsubEvent, Topic as FloodsubTopic};
use log::{debug, error, info};
use rand::{seq::SliceRandom, thread_rng};
use std::collections::HashSet;
use std::{collections::hash_map::{DefaultHasher, HashMap},
hash::{Hash, Hasher},
iter::{self, FromIterator},
net::IpAddr,
pin::Pin,
task::{Context, Poll},
Expand All @@ -42,6 +44,7 @@ const CONNECTED_RELAYS_CHECK_INTERVAL: Duration = Duration::from_secs(30);
const ANNOUNCE_INTERVAL: Duration = Duration::from_secs(600);
const ANNOUNCE_INITIAL_DELAY: Duration = Duration::from_secs(60);
const CHANNEL_BUF_SIZE: usize = 1024 * 8;
const NETID_7777: u16 = 7777;

impl libp2p::core::Executor for &SwarmRuntime {
fn exec(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) { self.0.spawn(future); }
Expand Down Expand Up @@ -237,6 +240,8 @@ pub struct AtomicDexBehaviour {
spawn_fn: fn(Box<dyn Future<Output = ()> + Send + Unpin + 'static>) -> (),
#[behaviour(ignore)]
cmd_rx: Receiver<AdexBehaviourCmd>,
#[behaviour(ignore)]
netid: u16,
floodsub: Floodsub,
gossipsub: Gossipsub,
request_response: RequestResponseBehaviour,
Expand Down Expand Up @@ -397,14 +402,17 @@ impl NetworkBehaviourEventProcess<GossipsubEvent> for AtomicDexBehaviour {

impl NetworkBehaviourEventProcess<FloodsubEvent> for AtomicDexBehaviour {
fn inject_event(&mut self, event: FloodsubEvent) {
if let FloodsubEvent::Message(message) = &event {
for topic in &message.topics {
if topic == &FloodsubTopic::new(PEERS_TOPIC) {
let addresses: PeerAddresses = match rmp_serde::from_read_ref(&message.data) {
Ok(a) => a,
Err(_) => return,
};
self.peers_exchange.add_peer_addresses(&message.source, addresses);
// do not process peer announce on 7777 temporary
if self.netid != NETID_7777 {
if let FloodsubEvent::Message(message) = &event {
for topic in &message.topics {
if topic == &FloodsubTopic::new(PEERS_TOPIC) {
let addresses: PeerAddresses = match rmp_serde::from_read_ref(&message.data) {
Ok(a) => a,
Err(_) => return,
};
self.peers_exchange.add_peer_addresses(&message.source, addresses);
}
}
}
}
Expand Down Expand Up @@ -527,19 +535,46 @@ fn announce_my_addresses(swarm: &mut AtomicDexSwarm) {
}
}

const ALL_NETID_7777_SEEDNODES: &[&str] = &[
"168.119.236.241",
"168.119.236.249",
"168.119.236.240",
"168.119.236.239",
"168.119.236.251",
"168.119.237.8",
"168.119.236.233",
"168.119.236.243",
"168.119.236.246",
"168.119.237.13",
"195.201.91.96",
"195.201.91.53",
"168.119.174.126",
"46.4.78.11",
"46.4.87.18",
];

/// Creates and spawns new AdexBehaviour Swarm returning:
/// 1. tx to send control commands
/// 2. rx emitting gossip events to processing side
/// 3. our peer_id
pub fn start_gossipsub(
ip: IpAddr,
port: u16,
netid: u16,
force_key: Option<[u8; 32]>,
spawn_fn: fn(Box<dyn Future<Output = ()> + Send + Unpin + 'static>) -> (),
to_dial: Vec<String>,
i_am_relay: bool,
on_poll: impl Fn(&AtomicDexSwarm) + Send + 'static,
) -> (Sender<AdexBehaviourCmd>, AdexEventRx, PeerId) {
let local_key = identity::Keypair::generate_ed25519();
let local_key = match force_key {
Some(mut key) => {
let secret = identity::ed25519::SecretKey::from_bytes(&mut key).expect("Secret length is 32 bytes");
let keypair = identity::ed25519::Keypair::from(secret);
identity::Keypair::Ed25519(keypair)
},
None => identity::Keypair::generate_ed25519(),
};
let local_peer_id = PeerId::from(local_key.public());
info!("Local peer id: {:?}", local_peer_id);

Expand Down Expand Up @@ -598,7 +633,15 @@ pub fn start_gossipsub(
// build a gossipsub network behaviour
let gossipsub = Gossipsub::new(local_peer_id.clone(), gossipsub_config);

let floodsub = Floodsub::new(local_peer_id.clone());
let floodsub = Floodsub::new(local_peer_id.clone(), netid != NETID_7777);

let mut peers_exchange = PeersExchange::new(port);
if netid == NETID_7777 {
for address in ALL_NETID_7777_SEEDNODES {
let multiaddr = parse_relay_address((*address).to_owned(), port);
peers_exchange.add_peer_addresses(&PeerId::random(), HashSet::from_iter(iter::once(multiaddr)));
}
}

// build a request-response network behaviour
let request_response = build_request_response_behaviour();
Expand All @@ -613,8 +656,9 @@ pub fn start_gossipsub(
gossipsub,
floodsub,
request_response,
peers_exchange: PeersExchange::new(port),
peers_exchange,
ping,
netid,
};
libp2p::swarm::SwarmBuilder::new(transport, adex_behavior, local_peer_id.clone())
.executor(Box::new(&*SWARM_RUNTIME))
Expand Down
3 changes: 2 additions & 1 deletion mm2src/mm2_libp2p/src/atomicdex_behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ impl Node {

let mut rng = rand::thread_rng();
let secret = SecretKey::new(&mut rng);
let (cmd_tx, mut event_rx, peer_id) = start_gossipsub(my_address, port, spawn_boxed, seednodes, true, |_| {});
let (cmd_tx, mut event_rx, peer_id) =
start_gossipsub(my_address, port, 333, None, spawn_boxed, seednodes, true, |_| {});

// spawn a response future
let cmd_tx_fut = cmd_tx.clone();
Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_libp2p/src/peers_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl ProtocolName for PeersExchangeProtocol {
type PeersExchangeCodec = Codec<PeersExchangeProtocol, PeersExchangeRequest, PeersExchangeResponse>;

const DEFAULT_PEERS_NUM: usize = 20;
const REQUEST_PEERS_INITIAL_DELAY: u64 = 60;
const REQUEST_PEERS_INITIAL_DELAY: u64 = 20;
const REQUEST_PEERS_INTERVAL: u64 = 300;
const MAX_PEERS: usize = 100;

Expand Down

0 comments on commit d488b39

Please sign in to comment.