Skip to content

Commit

Permalink
Peerd: Add support for multiple connections on the same port
Browse files Browse the repository at this point in the history
  • Loading branch information
TheCharlatan committed Dec 19, 2021
1 parent b0f8e53 commit 8ca5dda
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 22 deletions.
25 changes: 14 additions & 11 deletions src/bin/peerd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ fn main() {
local_id.bright_yellow_bold()
);

let peer_socket = PeerSocket::from(opts);
let peer_socket = PeerSocket::from(opts.clone());
debug!("Peer socket parameter interpreted as {}", peer_socket);

let id: NodeAddr;
let internal_id: NodeAddr;
let mut local_socket: Option<InetSocketAddr> = None;
let mut remote_id: Option<PublicKey> = None;
let mut remote_socket: InetSocketAddr;
Expand All @@ -198,10 +198,6 @@ fn main() {

connect = false;
local_socket = Some(inet_addr);
id = NodeAddr::Remote(RemoteNodeAddr {
node_id: local_node.node_id(),
remote_addr: RemoteSocketAddr::Ftcp(inet_addr),
});

debug!("Binding TCP socket {}", inet_addr);
let listener = TcpListener::bind(
Expand Down Expand Up @@ -231,22 +227,29 @@ fn main() {
let session = session::Raw::with_ftcp_unencrypted(stream, inet_addr)
.expect("Unable to establish session with the remote peer");

debug!("Session successfully established");
internal_id = NodeAddr::Remote(RemoteNodeAddr {
node_id: opts.peer_key_opts.internal_node().node_id(),
remote_addr: RemoteSocketAddr::Ftcp(inet_addr),
});
debug!(
"Session successfully established with new unique id: {}",
internal_id
);
break PeerConnection::with(session);
}
trace!("Child forked; returning into main listener event loop");
debug!("Child forked; returning into main listener event loop");
continue;
}
}
PeerSocket::Connect(remote_node_addr) => {
debug!("Running in CONNECT mode");

connect = true;
id = NodeAddr::Remote(remote_node_addr.clone());
internal_id = NodeAddr::Remote(remote_node_addr.clone());
remote_id = Some(remote_node_addr.node_id);
remote_socket = remote_node_addr.remote_addr.into();

info!("Connecting to {}", &remote_node_addr.addr());
debug!("Connecting to {}", &remote_node_addr.addr());
PeerConnection::connect(remote_node_addr, &local_node)
.expect("Unable to connect to the remote peer")
}
Expand All @@ -257,7 +260,7 @@ fn main() {
peerd::run(
service_config,
connection,
id,
internal_id,
local_id,
remote_id,
local_socket,
Expand Down
19 changes: 15 additions & 4 deletions src/farcasterd/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ pub fn run(
acc_addrs: none!(),
public_offers: none!(),
node_ids: none!(),
peerd_ids: none!(),
wallet_token,
pending_requests: none!(),
syncer_services: none!(),
Expand Down Expand Up @@ -144,6 +145,7 @@ pub struct Runtime {
acc_addrs: HashMap<PublicOfferId, monero::Address>,
consumed_offers: HashMap<OfferId, SwapId>,
node_ids: HashMap<OfferId, PublicKey>, // TODO is it possible? HashMap<SwapId, PublicKey>
peerd_ids: HashMap<OfferId, ServiceId>,
wallet_token: Token,
pending_requests: HashMap<request::RequestId, (Request, ServiceId)>,
syncer_services: HashMap<(Coin, Network), ServiceId>,
Expand Down Expand Up @@ -297,6 +299,7 @@ impl Runtime {
let identity = self.identity();
if let Some(offerid) = &offerid {
if self.listens.contains_key(offerid) && self.node_ids.contains_key(offerid) {
self.peerd_ids.remove(offerid);
let node_id = self.node_ids.remove(offerid).unwrap();
let remote_addr = self.listens.remove(offerid).unwrap();
// nr of offers using that peerd
Expand Down Expand Up @@ -442,6 +445,9 @@ impl Runtime {
} else {
error!("missing acc_addr")
}
info!("passing request to walletd from {}", source);
self.peerd_ids
.insert(public_offer.offer.id(), source.clone());

senders.send_to(ServiceBus::Msg, source, ServiceId::Wallet, request)?;
}
Expand Down Expand Up @@ -663,6 +669,7 @@ impl Runtime {
let offerid = &public_offer.offer.id();
let listener = self.listens.get(&offerid);
let node_id = self.node_ids.get(&offerid);
let peerd_id = self.peerd_ids.get(&offerid);
let (node_id, peer_address) = match local_trade_role {
// Maker has only one listener, MAYBE for more listeners self.listens may be a
// HashMap<RemoteSocketAddr, Vec<OfferId>>
Expand Down Expand Up @@ -691,10 +698,14 @@ impl Runtime {
node_id,
remote_addr: peer_address,
};
let peer = daemon_service
.to_node_addr(internet2::LIGHTNING_P2P_DEFAULT_PORT)
.ok_or(internet2::presentation::Error::InvalidEndpoint)?
.into();
let peer: ServiceId = if peerd_id.is_none() {
daemon_service
.to_node_addr(internet2::LIGHTNING_P2P_DEFAULT_PORT)
.ok_or(internet2::presentation::Error::InvalidEndpoint)?
.into()
} else {
peerd_id.unwrap().clone()
};

self.consumed_offers
.insert(public_offer.offer.id(), swap_id);
Expand Down
14 changes: 14 additions & 0 deletions src/peerd/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ pub struct PeerKeyOpts {
pub peer_secret_key: String,
}

use bitcoin::secp256k1::rand::RngCore;
use bitcoin::secp256k1::{rand::thread_rng, SecretKey};
use std::str::FromStr;

Expand All @@ -120,11 +121,24 @@ impl PeerKeyOpts {
self.node_secrets().local_node
}

pub fn internal_node(&self) -> LocalNode {
self.ephemeral_secrets().local_node
}

pub fn node_secrets(&self) -> PeerSecrets {
let mut rng = thread_rng();
let secret_key = SecretKey::from_str(&self.peer_secret_key).expect("peer secret key");
let ephemeral_secret_key = SecretKey::new(&mut rng);
let local_node = LocalNode::from_keys(secret_key, ephemeral_secret_key);
PeerSecrets { local_node }
}

pub fn ephemeral_secrets(&self) -> PeerSecrets {
let mut rng = thread_rng();
rng.next_u64();
let secret_key = SecretKey::new(&mut rng);
let ephemeral_secret_key = SecretKey::new(&mut rng);
let local_node = LocalNode::from_keys(secret_key, ephemeral_secret_key);
PeerSecrets { local_node }
}
}
17 changes: 10 additions & 7 deletions src/peerd/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::{CtlServer, Error, LogStyle, Service, ServiceConfig, ServiceId};
pub fn run(
config: ServiceConfig,
connection: PeerConnection,
id: NodeAddr,
internal_id: NodeAddr,
local_id: PublicKey,
remote_id: Option<PublicKey>,
local_socket: Option<InetSocketAddr>,
Expand All @@ -52,11 +52,11 @@ pub fn run(
tx.connect("inproc://bridge")?;
rx.bind("inproc://bridge")?;

let identity = ServiceId::Peer(id);
let internal_identity = ServiceId::Peer(internal_id);

debug!("Starting thread listening for messages from the remote peer");
let bridge_handler = ListenerRuntime {
identity: identity.clone(),
internal_identity: internal_identity.clone(),
bridge: esb::Controller::with(
map! {
ServiceBus::Bridge => esb::BusConfig {
Expand All @@ -75,9 +75,12 @@ pub fn run(
spawn(move || listener.run_or_panic("peerd-listener"));
// TODO: Use the handle returned by spawn to track the child process

debug!("Starting main service runtime");
debug!(
"Starting main service runtime with identity: {}",
internal_identity
);
let runtime = Runtime {
identity,
identity: internal_identity,
local_id,
remote_id,
local_socket,
Expand Down Expand Up @@ -126,7 +129,7 @@ impl esb::Handler<ServiceBus> for BridgeHandler {
}

pub struct ListenerRuntime {
identity: ServiceId,
internal_identity: ServiceId,
bridge: esb::Controller<ServiceBus, Request, BridgeHandler>,
}

Expand All @@ -139,7 +142,7 @@ impl ListenerRuntime {
debug!("Forwarding FWP message over BRIDGE interface to the runtime");
self.bridge.send_to(
ServiceBus::Bridge,
self.identity.clone(),
self.internal_identity.clone(),
Request::Protocol((&*req).clone()),
)?;
Ok(())
Expand Down

0 comments on commit 8ca5dda

Please sign in to comment.