Skip to content

Commit

Permalink
Peerd: Add support for multiple connections on the same port
Browse files Browse the repository at this point in the history
  • Loading branch information
TheCharlatan committed Dec 19, 2021
1 parent b0f8e53 commit 3d28b7d
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 26 deletions.
49 changes: 37 additions & 12 deletions src/bin/peerd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,10 @@ extern crate log;
#[macro_use]
extern crate amplify_derive;

use bitcoin::secp256k1::rand::RngCore;
use clap::Clap;
use internet2::addr::InetSocketAddr;
use internet2::LocalNode;
use nix::unistd::{fork, ForkResult};
use std::convert::TryFrom;
use std::net::TcpListener;
Expand Down Expand Up @@ -184,10 +186,11 @@ fn main() {
local_id.bright_yellow_bold()
);

let peer_socket = PeerSocket::from(opts);
let peer_socket = PeerSocket::from(opts.clone());
debug!("Peer socket parameter interpreted as {}", peer_socket);

let id: NodeAddr;
let external_id: NodeAddr;
let mut internal_id: NodeAddr;
let mut local_socket: Option<InetSocketAddr> = None;
let mut remote_id: Option<PublicKey> = None;
let mut remote_socket: InetSocketAddr;
Expand All @@ -198,7 +201,12 @@ fn main() {

connect = false;
local_socket = Some(inet_addr);
id = NodeAddr::Remote(RemoteNodeAddr {

external_id = NodeAddr::Remote(RemoteNodeAddr {
node_id: local_node.node_id(),
remote_addr: RemoteSocketAddr::Ftcp(inet_addr),
});
internal_id = NodeAddr::Remote(RemoteNodeAddr {
node_id: local_node.node_id(),
remote_addr: RemoteSocketAddr::Ftcp(inet_addr),
});
Expand All @@ -209,40 +217,56 @@ fn main() {
)
.expect("Unable to bind to Lightning network peer socket");

debug!("Running TCP listener event loop");
info!("Running TCP listener event loop");
loop {
debug!("Awaiting for incoming connections...");
info!("Awaiting for incoming connections...");
let (stream, remote_socket_addr) = listener
.accept()
.expect("Error accepting incoming peer connection");
debug!("New connection from {}", remote_socket_addr);
info!("New connection from {}", remote_socket_addr);

remote_socket = remote_socket_addr.into();

// TODO: Support multithread mode
debug!("Forking child process");
info!("Forking child process");
if let ForkResult::Child = unsafe { fork().expect("Unable to fork child process") }
{
stream
.set_read_timeout(Some(Duration::from_secs(30)))
.expect("Unable to set up timeout for TCP connection");

debug!("Establishing session with the remote");
info!("Establishing session with the remote");
let session = session::Raw::with_ftcp_unencrypted(stream, inet_addr)
.expect("Unable to establish session with the remote peer");
use bitcoin::secp256k1::{rand::thread_rng, SecretKey};
let mut rng = thread_rng();
// kick the rng a little to generate us new bytes
rng.next_u64();
let secret_key = SecretKey::new(&mut rng);
info!("secret key: {}", secret_key);
let ephemeral_secret_key = SecretKey::new(&mut rng);
let local_node = LocalNode::from_keys(secret_key, ephemeral_secret_key);
info!("I should generate new keys: {}", local_node.node_id());

internal_id = NodeAddr::Remote(RemoteNodeAddr {
node_id: local_node.node_id(),
remote_addr: RemoteSocketAddr::Ftcp(inet_addr),
});
info!("new session has id: {}", internal_id);

debug!("Session successfully established");
info!("Session successfully established");
break PeerConnection::with(session);
}
trace!("Child forked; returning into main listener event loop");
info!("Child forked; returning into main listener event loop");
continue;
}
}
PeerSocket::Connect(remote_node_addr) => {
debug!("Running in CONNECT mode");

connect = true;
id = NodeAddr::Remote(remote_node_addr.clone());
external_id = NodeAddr::Remote(remote_node_addr.clone());
internal_id = NodeAddr::Remote(remote_node_addr.clone());
remote_id = Some(remote_node_addr.node_id);
remote_socket = remote_node_addr.remote_addr.into();

Expand All @@ -257,7 +281,8 @@ fn main() {
peerd::run(
service_config,
connection,
id,
external_id,
internal_id,
local_id,
remote_id,
local_socket,
Expand Down
18 changes: 14 additions & 4 deletions src/farcasterd/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ pub fn run(
acc_addrs: none!(),
public_offers: none!(),
node_ids: none!(),
peerd_ids: none!(),
wallet_token,
pending_requests: none!(),
syncer_services: none!(),
Expand Down Expand Up @@ -144,6 +145,7 @@ pub struct Runtime {
acc_addrs: HashMap<PublicOfferId, monero::Address>,
consumed_offers: HashMap<OfferId, SwapId>,
node_ids: HashMap<OfferId, PublicKey>, // TODO is it possible? HashMap<SwapId, PublicKey>
peerd_ids: HashMap<OfferId, ServiceId>,
wallet_token: Token,
pending_requests: HashMap<request::RequestId, (Request, ServiceId)>,
syncer_services: HashMap<(Coin, Network), ServiceId>,
Expand Down Expand Up @@ -442,6 +444,9 @@ impl Runtime {
} else {
error!("missing acc_addr")
}
info!("passing request to walletd from {}", source);
self.peerd_ids
.insert(public_offer.offer.id(), source.clone());

senders.send_to(ServiceBus::Msg, source, ServiceId::Wallet, request)?;
}
Expand Down Expand Up @@ -663,6 +668,7 @@ impl Runtime {
let offerid = &public_offer.offer.id();
let listener = self.listens.get(&offerid);
let node_id = self.node_ids.get(&offerid);
let peerd_id = self.peerd_ids.get(&offerid);
let (node_id, peer_address) = match local_trade_role {
// Maker has only one listener, MAYBE for more listeners self.listens may be a
// HashMap<RemoteSocketAddr, Vec<OfferId>>
Expand Down Expand Up @@ -691,10 +697,14 @@ impl Runtime {
node_id,
remote_addr: peer_address,
};
let peer = daemon_service
.to_node_addr(internet2::LIGHTNING_P2P_DEFAULT_PORT)
.ok_or(internet2::presentation::Error::InvalidEndpoint)?
.into();
let peer: ServiceId = if peerd_id.is_none() {
daemon_service
.to_node_addr(internet2::LIGHTNING_P2P_DEFAULT_PORT)
.ok_or(internet2::presentation::Error::InvalidEndpoint)?
.into()
} else {
peerd_id.unwrap().clone()
};

self.consumed_offers
.insert(public_offer.offer.id(), swap_id);
Expand Down
12 changes: 12 additions & 0 deletions src/peerd/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,23 @@ impl PeerKeyOpts {
self.node_secrets().local_node
}

pub fn internal_node(&self) -> LocalNode {
self.ephemeral_secrets().local_node
}

pub fn node_secrets(&self) -> PeerSecrets {
let mut rng = thread_rng();
let secret_key = SecretKey::from_str(&self.peer_secret_key).expect("peer secret key");
let ephemeral_secret_key = SecretKey::new(&mut rng);
let local_node = LocalNode::from_keys(secret_key, ephemeral_secret_key);
PeerSecrets { local_node }
}

pub fn ephemeral_secrets(&self) -> PeerSecrets {
let mut rng = thread_rng();
let secret_key = SecretKey::new(&mut rng);
let ephemeral_secret_key = SecretKey::new(&mut rng);
let local_node = LocalNode::from_keys(secret_key, ephemeral_secret_key);
PeerSecrets { local_node }
}
}
27 changes: 18 additions & 9 deletions src/peerd/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ use crate::{CtlServer, Error, LogStyle, Service, ServiceConfig, ServiceId};
pub fn run(
config: ServiceConfig,
connection: PeerConnection,
id: NodeAddr,
external_id: NodeAddr,
internal_id: NodeAddr,
local_id: PublicKey,
remote_id: Option<PublicKey>,
local_socket: Option<InetSocketAddr>,
Expand All @@ -52,11 +53,13 @@ pub fn run(
tx.connect("inproc://bridge")?;
rx.bind("inproc://bridge")?;

let identity = ServiceId::Peer(id);
let external_identity = ServiceId::Peer(external_id);
let internal_identity = ServiceId::Peer(internal_id);

debug!("Starting thread listening for messages from the remote peer");
let bridge_handler = ListenerRuntime {
identity: identity.clone(),
identity: external_identity.clone(),
internal_identity: internal_identity.clone(),
bridge: esb::Controller::with(
map! {
ServiceBus::Bridge => esb::BusConfig {
Expand All @@ -76,8 +79,9 @@ pub fn run(
// TODO: Use the handle returned by spawn to track the child process

debug!("Starting main service runtime");
info!("Starting peerd with identity: {}", internal_identity);
let runtime = Runtime {
identity,
identity: internal_identity,
local_id,
remote_id,
local_socket,
Expand Down Expand Up @@ -127,6 +131,7 @@ impl esb::Handler<ServiceBus> for BridgeHandler {

pub struct ListenerRuntime {
identity: ServiceId,
internal_identity: ServiceId,
bridge: esb::Controller<ServiceBus, Request, BridgeHandler>,
}

Expand All @@ -136,10 +141,10 @@ impl ListenerRuntime {
&mut self,
req: <Unmarshaller<Msg> as Unmarshall>::Data,
) -> Result<(), Error> {
debug!("Forwarding FWP message over BRIDGE interface to the runtime");
info!("Forwarding FWP message over BRIDGE interface to the runtime");
self.bridge.send_to(
ServiceBus::Bridge,
self.identity.clone(),
self.internal_identity.clone(),
Request::Protocol((&*req).clone()),
)?;
Ok(())
Expand All @@ -155,6 +160,7 @@ impl peer::Handler<Msg> for ListenerRuntime {
) -> Result<(), Self::Error> {
// Forwarding all received messages to the runtime
trace!("FWP message details: {:?}", message);
info!("Sending message over bridge!");
self.send_over_bridge(message)
}

Expand Down Expand Up @@ -291,11 +297,13 @@ impl Runtime {
Request::Protocol(message) => {
// 1. Check permissions
// 2. Forward to the remote peer
debug!("Message type: {}", message.get_type());
debug!(
info!("Message type: {}", message.get_type());
info!(
"Forwarding peer message to the remote peer, request: {}",
&request.get_type()
);
// info!("sending message: {:?}", message);
// info!("sending with: {:?}", self.sender.sender);
self.messages_sent += 1;
self.sender.send_message(message)?;
}
Expand Down Expand Up @@ -363,7 +371,8 @@ impl Runtime {
_source: ServiceId,
request: Request,
) -> Result<(), Error> {
debug!("BRIDGE RPC request: {}", request);
info!("BRIDGE RPC request");
trace!("BRIDGE RPC request: {}", request);

if let Request::Protocol(_) = request {
self.messages_received += 1;
Expand Down
2 changes: 1 addition & 1 deletion src/swapd/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1540,7 +1540,7 @@ impl Runtime {
Some(remote_commit),
);

trace!("sending peer MakerCommit msg {}", &local_commit);
println!("sending peer MakerCommit msg {}", &local_commit);
self.send_peer(senders, Msg::MakerCommit(local_commit))?;
self.state_update(senders, next_state)?;
}
Expand Down

0 comments on commit 3d28b7d

Please sign in to comment.