Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Peerd single port multi connections #393

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions src/bin/peerd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ fn main() {
local_id.bright_yellow_bold()
);

let peer_socket = PeerSocket::from(opts);
let peer_socket = PeerSocket::from(opts.clone());
debug!("Peer socket parameter interpreted as {}", peer_socket);

let id: NodeAddr;
let internal_id: NodeAddr;
let mut local_socket: Option<InetSocketAddr> = None;
let mut remote_id: Option<PublicKey> = None;
let mut remote_socket: InetSocketAddr;
Expand All @@ -198,10 +198,6 @@ fn main() {

connect = false;
local_socket = Some(inet_addr);
id = NodeAddr::Remote(RemoteNodeAddr {
node_id: local_node.node_id(),
remote_addr: RemoteSocketAddr::Ftcp(inet_addr),
});

debug!("Binding TCP socket {}", inet_addr);
let listener = TcpListener::bind(
Expand Down Expand Up @@ -231,22 +227,29 @@ fn main() {
let session = session::Raw::with_ftcp_unencrypted(stream, inet_addr)
.expect("Unable to establish session with the remote peer");

debug!("Session successfully established");
internal_id = NodeAddr::Remote(RemoteNodeAddr {
node_id: opts.peer_key_opts.internal_node().node_id(),
remote_addr: RemoteSocketAddr::Ftcp(inet_addr),
});
debug!(
"Session successfully established with new unique id: {}",
internal_id
);
break PeerConnection::with(session);
}
trace!("Child forked; returning into main listener event loop");
debug!("Child forked; returning into main listener event loop");
continue;
}
}
PeerSocket::Connect(remote_node_addr) => {
debug!("Running in CONNECT mode");

connect = true;
id = NodeAddr::Remote(remote_node_addr.clone());
internal_id = NodeAddr::Remote(remote_node_addr.clone());
remote_id = Some(remote_node_addr.node_id);
remote_socket = remote_node_addr.remote_addr.into();

info!("Connecting to {}", &remote_node_addr.addr());
debug!("Connecting to {}", &remote_node_addr.addr());
PeerConnection::connect(remote_node_addr, &local_node)
.expect("Unable to connect to the remote peer")
}
Expand All @@ -257,7 +260,7 @@ fn main() {
peerd::run(
service_config,
connection,
id,
internal_id,
local_id,
remote_id,
local_socket,
Expand Down
21 changes: 16 additions & 5 deletions src/farcasterd/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ pub fn run(
acc_addrs: none!(),
public_offers: none!(),
node_ids: none!(),
peerd_ids: none!(),
wallet_token,
pending_requests: none!(),
syncer_services: none!(),
Expand Down Expand Up @@ -144,6 +145,7 @@ pub struct Runtime {
acc_addrs: HashMap<PublicOfferId, monero::Address>,
consumed_offers: HashMap<OfferId, SwapId>,
node_ids: HashMap<OfferId, PublicKey>, // TODO is it possible? HashMap<SwapId, PublicKey>
peerd_ids: HashMap<OfferId, ServiceId>,
wallet_token: Token,
pending_requests: HashMap<request::RequestId, (Request, ServiceId)>,
syncer_services: HashMap<(Coin, Network), ServiceId>,
Expand Down Expand Up @@ -297,6 +299,7 @@ impl Runtime {
let identity = self.identity();
if let Some(offerid) = &offerid {
if self.listens.contains_key(offerid) && self.node_ids.contains_key(offerid) {
self.peerd_ids.remove(offerid);
let node_id = self.node_ids.remove(offerid).unwrap();
let remote_addr = self.listens.remove(offerid).unwrap();
// nr of offers using that peerd
Expand Down Expand Up @@ -410,7 +413,7 @@ impl Runtime {
// public offer gets removed on LaunchSwap
if !self.public_offers.contains(&public_offer) {
warn!(
"Unknow offer {}, you are not the maker of that offer, ignoring it",
"Unknown (or already taken) offer {}, you are not the maker of that offer (or you already had a taker for it), ignoring it",
&public_offer
);
} else {
Expand Down Expand Up @@ -442,6 +445,9 @@ impl Runtime {
} else {
error!("missing acc_addr")
}
info!("passing request to walletd from {}", source);
self.peerd_ids
.insert(public_offer.offer.id(), source.clone());

senders.send_to(ServiceBus::Msg, source, ServiceId::Wallet, request)?;
}
Expand Down Expand Up @@ -663,6 +669,7 @@ impl Runtime {
let offerid = &public_offer.offer.id();
let listener = self.listens.get(&offerid);
let node_id = self.node_ids.get(&offerid);
let peerd_id = self.peerd_ids.get(&offerid);
let (node_id, peer_address) = match local_trade_role {
// Maker has only one listener, MAYBE for more listeners self.listens may be a
// HashMap<RemoteSocketAddr, Vec<OfferId>>
Expand Down Expand Up @@ -691,10 +698,14 @@ impl Runtime {
node_id,
remote_addr: peer_address,
};
let peer = daemon_service
.to_node_addr(internet2::LIGHTNING_P2P_DEFAULT_PORT)
.ok_or(internet2::presentation::Error::InvalidEndpoint)?
.into();
let peer: ServiceId = if peerd_id.is_none() {
daemon_service
.to_node_addr(internet2::LIGHTNING_P2P_DEFAULT_PORT)
.ok_or(internet2::presentation::Error::InvalidEndpoint)?
.into()
} else {
peerd_id.unwrap().clone()
};

self.consumed_offers
.insert(public_offer.offer.id(), swap_id);
Expand Down
18 changes: 15 additions & 3 deletions src/peerd/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ pub struct PeerKeyOpts {
pub peer_secret_key: String,
}

use bitcoin::secp256k1::rand::RngCore;
use bitcoin::secp256k1::{rand::thread_rng, SecretKey};
use std::str::FromStr;

Expand All @@ -117,12 +118,23 @@ pub struct PeerSecrets {

impl PeerKeyOpts {
pub fn local_node(&self) -> LocalNode {
self.node_secrets().local_node
self.node_secrets(Some(
SecretKey::from_str(&self.peer_secret_key).expect("peer secret key"),
))
.local_node
}

pub fn node_secrets(&self) -> PeerSecrets {
pub fn internal_node(&self) -> LocalNode {
self.node_secrets(None).local_node
}

pub fn node_secrets(&self, opt_secret_key: Option<SecretKey>) -> PeerSecrets {
let mut rng = thread_rng();
let secret_key = SecretKey::from_str(&self.peer_secret_key).expect("peer secret key");
rng.next_u64();
let secret_key = match opt_secret_key {
Some(secret_key) => secret_key,
None => SecretKey::new(&mut rng),
};
let ephemeral_secret_key = SecretKey::new(&mut rng);
let local_node = LocalNode::from_keys(secret_key, ephemeral_secret_key);
PeerSecrets { local_node }
Expand Down
36 changes: 29 additions & 7 deletions src/peerd/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::{CtlServer, Error, LogStyle, Service, ServiceConfig, ServiceId};
pub fn run(
config: ServiceConfig,
connection: PeerConnection,
id: NodeAddr,
internal_id: NodeAddr,
local_id: PublicKey,
remote_id: Option<PublicKey>,
local_socket: Option<InetSocketAddr>,
Expand All @@ -52,11 +52,11 @@ pub fn run(
tx.connect("inproc://bridge")?;
rx.bind("inproc://bridge")?;

let identity = ServiceId::Peer(id);
let internal_identity = ServiceId::Peer(internal_id);

debug!("Starting thread listening for messages from the remote peer");
let bridge_handler = ListenerRuntime {
identity: identity.clone(),
internal_identity: internal_identity.clone(),
bridge: esb::Controller::with(
map! {
ServiceBus::Bridge => esb::BusConfig {
Expand All @@ -75,9 +75,12 @@ pub fn run(
spawn(move || listener.run_or_panic("peerd-listener"));
// TODO: Use the handle returned by spawn to track the child process

debug!("Starting main service runtime");
debug!(
"Starting main service runtime with identity: {}",
internal_identity
);
let runtime = Runtime {
identity,
identity: internal_identity,
local_id,
remote_id,
local_socket,
Expand Down Expand Up @@ -126,7 +129,7 @@ impl esb::Handler<ServiceBus> for BridgeHandler {
}

pub struct ListenerRuntime {
identity: ServiceId,
internal_identity: ServiceId,
bridge: esb::Controller<ServiceBus, Request, BridgeHandler>,
}

Expand All @@ -139,7 +142,7 @@ impl ListenerRuntime {
debug!("Forwarding FWP message over BRIDGE interface to the runtime");
self.bridge.send_to(
ServiceBus::Bridge,
self.identity.clone(),
self.internal_identity.clone(),
Request::Protocol((&*req).clone()),
)?;
Ok(())
Expand Down Expand Up @@ -169,6 +172,19 @@ impl peer::Handler<Msg> for ListenerRuntime {
self.send_over_bridge(Arc::new(Msg::PingPeer))?;
Ok(())
}
Error::Peer(presentation::Error::Transport(transport::Error::SocketIo(
std::io::ErrorKind::UnexpectedEof,
))) => {
error!(
"The remote peer has hung up, notifying that peerd has halted: {}",
err
);
self.send_over_bridge(Arc::new(Msg::PeerdShutdown))?;
// park this thread, the process exit is supposed to be handled by the parent
// the socket will continue spamming this error until peerd is shutdown, this ensures it is only handled once
std::thread::park();
Ok(())
}
// for all other error types, indicating internal errors, we
// propagate error to the upper level
_ => {
Expand Down Expand Up @@ -374,6 +390,12 @@ impl Runtime {
self.awaited_pong = None;
}

Request::Protocol(Msg::PeerdShutdown) => {
warn!("Exiting peerd");
// handle further side effects here, e.g. sending a message to farcasterd to cleanup the peer connection, before exiting.
std::process::exit(0);
}

// swap initiation message
Request::Protocol(Msg::TakerCommit(_)) => {
senders.send_to(
Expand Down
7 changes: 5 additions & 2 deletions src/rpc/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ pub enum Msg {
#[api(type = 33)]
#[display("ping_peer")]
PingPeer,
#[api(type = 34)]
#[display("error_shutdown")]
PeerdShutdown,
}

impl Msg {
Expand All @@ -137,8 +140,8 @@ impl Msg {
Msg::BuyProcedureSignature(protocol_message::BuyProcedureSignature {
swap_id, ..
}) => *swap_id,
Msg::Ping(_) | Msg::Pong(_) | Msg::PingPeer => {
unreachable!("Ping and Pong does not containt swapid")
Msg::Ping(_) | Msg::Pong(_) | Msg::PingPeer | Msg::PeerdShutdown => {
unreachable!("Ping and Pong does not contain swapid")
}
}
}
Expand Down