Skip to content

Commit

Permalink
Merge pull request #792 from TheCharlatan/peerdConnectTreatmentRebase
Browse files Browse the repository at this point in the history
Refactor: Improve reliability of peerd connections
  • Loading branch information
h4sh3d authored Dec 1, 2022
2 parents fc0de2f + ed4247f commit 5d8a61f
Show file tree
Hide file tree
Showing 24 changed files with 1,640 additions and 483 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ jobs:
swap_bob_maker_punish_kill_bob,
swap_bob_maker_refund,
swap_bob_maker_restore_checkpoint,
swap_bob_maker_restore_reconnect_alice_pre_lock,
swap_bob_maker_user_abort_sweep_btc,
swap_parallel_execution,
swap_revoke_offer_bob_maker_normal,
Expand Down
2 changes: 1 addition & 1 deletion diagrams/reconnect_sequencediagram.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
9 changes: 7 additions & 2 deletions diagrams/reconnect_sequencediagram.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
title Peer Reconnect Procedure

participant t_swapd
participant t_farcasterd
participant t_peerd_run
participant t_peerd_recv
participant m_peerd_recv
Expand All @@ -13,12 +15,15 @@ m_farcasterd -> m_peerd_run : Ctl Terminate
m_peerd_run -> m_farcasterd : Ctl PeerdTerminated
m_peerd_recv -> m_peerd_recv: terminate
m_peerd_run -> m_peerd_run: terminate
t_peerd_recv -> t_peerd_run : Ctl PeerdListenerRuntimeShutdown
t_peerd_recv -> t_peerd_run : Ctl PeerReceiverRuntimeShutdown
t_peerd_recv -> t_peerd_recv: terminate
t_peerd_run -> t_farcasterd: Ctl Disconnected
t_farcasterd -> t_swapd: Ctl Disconnected
==Taker peerd restarts connection, connects to maker forked peerd==
m_peerd_run -> m_peerd_run : forked from listener
t_peerd_run -> m_peerd_run : NodeId (raw)
t_peerd_run -> t_peerd_recv: spawn
m_peerd_run -> m_farcasterd: Ctl Hello
m_farcasterd -> m_swapd: Ctl PeerdReconnected

t_peerd_run -> t_farcasterd: Ctl Reconnected
t_farcasterd -> t_swapd: Ctl Reconnected
2 changes: 1 addition & 1 deletion diagrams/sequencediagram.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 2 additions & 1 deletion diagrams/sequencediagram.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ m_cli -> m_farcasterd : MakeOffer
m_farcasterd -> m_farcasterd : launch\npeerd listen
t_farcasterd <- t_cli : TakeOffer
t_farcasterd -> t_farcasterd : launch\npeerd connect
peerd -> t_farcasterd: Ctl ConnectSuccess
peerd -> m_farcasterd: Ctl Hello
t_wallet <- t_farcasterd : Ctl TakeOffer
t_wallet -> t_wallet : create taker wallet
t_wallet -> t_farcasterd : Ctl LaunchSwap
Expand Down Expand Up @@ -70,7 +72,6 @@ end
group for Bob swap role
m_swap -> m_swap : ADD PENDING Msg Reveal
m_swap -> m_farcasterd : Ctl FundingInfo

m_swap -> m_syncer: Watch Arbitrating Funding Address
m_swap <- m_syncer: Arbitrating Funding event
m_farcasterd<-m_swap: Ctl FundingCompleted Bitcoin
Expand Down
22 changes: 9 additions & 13 deletions src/bin/peerd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,22 +156,22 @@ fn main() {

let local_node = opts.peer_key_opts.local_node();
info!(
"{}: {}",
"{}: {}, {}: {}",
"Local node id".bright_green_bold(),
local_node.node_id().bright_yellow_bold()
local_node.node_id().bright_yellow_bold(),
"PID".bright_green_bold(),
std::process::id().bright_yellow_bold(),
);

let peer_socket = PeerSocket::from(opts.clone());
debug!("Peer socket parameter interpreted as {}", peer_socket);

let mut local_socket: Option<InetSocketAddr> = None;
let mut remote_node_addr: Option<NodeAddr> = None;
let forked_from_listener: bool;
let remote_node_addr: Option<NodeAddr> = None;
let connection = match peer_socket {
PeerSocket::Listen(inet_addr) => {
debug!("Running in LISTEN mode");

forked_from_listener = true;
local_socket = Some(inet_addr);

debug!("Binding TCP socket {}", inet_addr);
Expand Down Expand Up @@ -220,13 +220,10 @@ fn main() {
}
PeerSocket::Connect(remote_node) => {
debug!("Peerd running in CONNECT mode");

forked_from_listener = false;
remote_node_addr = Some(remote_node);

debug!("Connecting to {}", &remote_node.addr());
PeerConnection::connect_brontozaur(local_node, remote_node)
.expect("Unable to connect to the remote peer")
peerd::run_from_connect(service_config, remote_node, local_socket, local_node)
.expect("Error running peerd runtime");
unreachable!()
}
};

Expand All @@ -241,13 +238,12 @@ fn main() {
remote_node_addr: full internet2 remote node address
local_socket: None
connect: true */
peerd::run(
peerd::run_from_listener(
service_config,
connection,
remote_node_addr,
local_socket,
local_node,
forked_from_listener,
)
.expect("Error running peerd runtime");

Expand Down
17 changes: 16 additions & 1 deletion src/bus/ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ pub enum CtlMsg {
#[display("peerd_terminated()")]
PeerdTerminated,

#[display("disconnected")]
Disconnected,

#[display("re-connected")]
Reconnected,

#[display("connect({0})")]
Connect(SwapId),

#[display("Connect success")]
ConnectSuccess,

#[display("restore_checkpoint({0})", alt = "{0:#}")]
RestoreCheckpoint(CheckpointEntry),

Expand Down Expand Up @@ -143,6 +155,9 @@ pub enum CtlMsg {

#[display("failed peer message")]
FailedPeerMessage(PeerMsg),

#[display("connect failed")]
ConnectFailed,
}

#[derive(Clone, Debug, Display, NetworkEncode, NetworkDecode)]
Expand Down Expand Up @@ -203,7 +218,7 @@ pub enum Params {
}

#[derive(Clone, Debug, Display, NetworkEncode, NetworkDecode)]
#[display("{peerd}, {swap_id}, ..")]
#[display("{swap_id}, ..")]
pub struct InitSwap {
pub peerd: ServiceId,
pub report_to: Option<ServiceId>,
Expand Down
6 changes: 4 additions & 2 deletions src/bus/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,10 @@ pub struct PeerInfo {
pub struct SwapInfo {
#[serde_as(as = "Option<DisplayFromStr>")]
pub swap_id: Option<SwapId>,
#[serde_as(as = "Vec<DisplayFromStr>")]
pub maker_peer: Vec<NodeAddr>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub connection: Option<NodeAddr>,
pub connected: bool,
pub state: String,
#[serde_as(as = "DurationSeconds")]
pub uptime: Duration,
pub since: u64,
Expand Down
12 changes: 9 additions & 3 deletions src/cli/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ impl Exec for Command {
1 => {
let subj = subject.get(0).expect("vec of lenght 1");
if let Ok(node_addr) = NodeAddr::from_str(&subj) {
runtime.request_info(ServiceId::Peer(node_addr), InfoMsg::GetInfo)?;
ServiceId::Peer(node_addr)
runtime
.request_info(ServiceId::Peer(0, node_addr), InfoMsg::GetInfo)?;
ServiceId::Peer(0, node_addr)
} else if let Ok(swap_id) = SwapId::from_str(&subj) {
runtime.request_info(ServiceId::Swap(swap_id), InfoMsg::GetInfo)?;
ServiceId::Swap(swap_id)
Expand Down Expand Up @@ -94,7 +95,7 @@ impl Exec for Command {
if code == FailureCode::TargetServiceNotFound =>
{
match target_service_id {
ServiceId::Peer(node_addr) => {
ServiceId::Peer(_, node_addr) => {
return Err(Error::Farcaster(format!(
"No connected peerd with address {}",
node_addr
Expand Down Expand Up @@ -172,6 +173,11 @@ impl Exec for Command {
}
}

Command::Connect { swap_id } => {
runtime.request_ctl(ServiceId::Farcasterd, CtlMsg::Connect(swap_id))?;
runtime.report_response_or_fail()?;
}

Command::Make {
network,
arbitrating_blockchain,
Expand Down
7 changes: 7 additions & 0 deletions src/cli/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ pub enum Command {
swap_id: SwapId,
},

/// Connects a running swap to its counterparty
#[clap(aliases = &["c"])]
Connect {
// The swap id of the swap we wish to connect again
swap_id: SwapId,
},

/// Maker creates offer and start listening for incoming connections. Command used to to print
/// the resulting public offer that shall be shared with Taker. Additionally it spins up the
/// listener awaiting for connection related to this offer.
Expand Down
17 changes: 17 additions & 0 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,23 @@ impl<'esb> Event<'esb> {
Ok(())
}

/// Sends reply request via RPC message bus to a client
pub fn send_client_ctl(
&mut self,
service: ServiceId,
request: ctl::CtlMsg,
) -> Result<(), esb::Error<ServiceId>> {
let bus = ServiceBus::Ctl;
if let ServiceId::GrpcdClient(_) = service {
self.endpoints
.send_to(bus, service, ServiceId::Grpcd, BusMsg::Ctl(request))?;
} else {
self.endpoints
.send_to(bus, self.service.clone(), service, BusMsg::Ctl(request))?;
}
Ok(())
}

/// Send reply request via MSG message bus to a specific service (different from the event originating service)
pub fn send_msg_service(
&mut self,
Expand Down
Loading

0 comments on commit 5d8a61f

Please sign in to comment.