From b89a3c2c7a4b7127d5f67da52b77585a490c2200 Mon Sep 17 00:00:00 2001 From: TheCharlatan Date: Thu, 15 Dec 2022 15:15:06 +0100 Subject: [PATCH 1/2] Service: Implement Reporter trait The Reporter trait is used to implement all report_ functions instead of inside the CtlServer trait. The Reporter allows an implementing service to define a target service to which the reports should be sent to. In swapd, the only service where we use these functions, we define the enquirer as the service to report to. --- src/service.rs | 45 ++++++++++++++++++++++++++++------------- src/swapd/runtime.rs | 23 +++++++++------------ src/swapd/swap_state.rs | 23 +++++++-------------- 3 files changed, 48 insertions(+), 43 deletions(-) diff --git a/src/service.rs b/src/service.rs index 0b1a37019..92c73ff4a 100644 --- a/src/service.rs +++ b/src/service.rs @@ -356,18 +356,19 @@ impl TryToServiceId for Option { } } -pub trait CtlServer +pub trait Reporter where Self: esb::Handler, esb::Error: From, { - fn report_success_to( + fn report_to(&self) -> Option; + + fn report_success_message( &mut self, senders: &mut Endpoints, - dest: impl TryToServiceId, msg: Option, ) -> Result<(), Error> { - if let Some(dest) = dest.try_to_service_id() { + if let Some(dest) = self.report_to() { senders.send_to( ServiceBus::Ctl, self.identity(), @@ -378,30 +379,40 @@ where Ok(()) } - fn report_progress_message_to( + fn report_progress( &mut self, senders: &mut Endpoints, - dest: impl TryToServiceId, - msg: impl ToString, + progress: Progress, ) -> Result<(), Error> { - if let Some(dest) = dest.try_to_service_id() { + if let Some(dest) = self.report_to() { senders.send_to( ServiceBus::Ctl, self.identity(), dest, - BusMsg::Ctl(CtlMsg::Progress(Progress::Message(msg.to_string()))), + BusMsg::Ctl(CtlMsg::Progress(progress)), )?; } Ok(()) } - fn report_failure_to( + fn report_progress_message( &mut self, senders: &mut Endpoints, - dest: impl TryToServiceId, - failure: Failure, - ) -> Error { - if let Some(dest) = dest.try_to_service_id() { + msg: impl ToString, + ) -> Result<(), Error> { + if let Some(dest) = self.report_to() { + senders.send_to( + ServiceBus::Ctl, + self.identity(), + dest, + BusMsg::Ctl(CtlMsg::Progress(Progress::Message(msg.to_string()))), + )?; + } + Ok(()) + } + + fn report_failure(&mut self, senders: &mut Endpoints, failure: Failure) -> Error { + if let Some(dest) = self.report_to() { // Even if we fail, we still have to terminate :) let _ = senders.send_to( ServiceBus::Ctl, @@ -412,7 +423,13 @@ where } Error::Terminate(failure.to_string()) } +} +pub trait CtlServer +where + Self: esb::Handler, + esb::Error: From, +{ fn send_ctl( &mut self, senders: &mut Endpoints, diff --git a/src/swapd/runtime.rs b/src/swapd/runtime.rs index f62f15cb7..f925c9a4e 100644 --- a/src/swapd/runtime.rs +++ b/src/swapd/runtime.rs @@ -10,7 +10,7 @@ use super::{ temporal_safety::TemporalSafety, StateReport, }; -use crate::service::Endpoints; +use crate::service::{Endpoints, Reporter}; use crate::swapd::Opts; use crate::syncerd::bitcoin_syncer::p2wpkh_signed_tx_fee; use crate::syncerd::types::{Event, TransactionConfirmations}; @@ -188,6 +188,11 @@ pub struct CheckpointSwapd { } impl CtlServer for Runtime {} +impl Reporter for Runtime { + fn report_to(&self) -> Option { + self.enquirer.clone() + } +} impl esb::Handler for Runtime { type Request = BusMsg; @@ -753,14 +758,7 @@ impl Runtime { .latest_state_report .generate_progress_update_or_transition(&new_state_report); self.latest_state_report = new_state_report; - if let Some(enquirer) = self.enquirer.clone() { - endpoints.send_to( - ServiceBus::Ctl, - self.identity(), - enquirer, - BusMsg::Ctl(CtlMsg::Progress(progress)), - )?; - } + self.report_progress(endpoints, progress)?; } Ok(()) } @@ -814,7 +812,7 @@ impl Runtime { ); // Ignoring possible reporting errors here and after: do not want to // halt the swap just because the client disconnected - let _ = self.report_progress_message_to(endpoints, &self.enquirer.clone(), msg); + let _ = self.report_progress_message(endpoints, msg); let engine = CommitmentEngine; let commitment = match params { @@ -845,10 +843,9 @@ impl Runtime { "Accepting swap {} as Maker from Taker through peerd {}", swap_id, self.peer_service ); - let enquirer = self.enquirer.clone(); // Ignoring possible reporting errors here and after: do not want to - // halt the swap just because the client disconnected - let _ = self.report_progress_message_to(endpoints, &enquirer, msg); + // halt the swap just because the enquirer (farcasterd) disconnected + let _ = self.report_progress_message(endpoints, msg); let engine = CommitmentEngine; let commitment = match params.clone() { diff --git a/src/swapd/swap_state.rs b/src/swapd/swap_state.rs index b5cfd84c9..52db04989 100644 --- a/src/swapd/swap_state.rs +++ b/src/swapd/swap_state.rs @@ -22,8 +22,9 @@ use crate::{ BusMsg, Failure, FailureCode, }, event::{Event, StateMachine}, + service::Reporter, syncerd::{SweepAddress, TaskAborted}, - CtlServer, ServiceId, + ServiceId, }; use crate::{ bus::{ @@ -635,9 +636,8 @@ fn attempt_transition_to_init_taker( .taker_commit(event.endpoints, local_params.clone()) .map_err(|err| { runtime.log_error(&err); - runtime.report_failure_to( + runtime.report_failure( event.endpoints, - &runtime.enquirer.clone(), Failure { code: FailureCode::Unknown, info: err.to_string(), @@ -703,9 +703,8 @@ fn attempt_transition_to_init_maker( let local_commit = runtime .maker_commit(event.endpoints, swap_id, local_params.clone()) .map_err(|err| { - runtime.report_failure_to( + runtime.report_failure( event.endpoints, - &runtime.enquirer.clone(), Failure { code: FailureCode::Unknown, info: err.to_string(), @@ -957,7 +956,7 @@ fn try_bob_reveal_to_bob_funded( // incorrect funding, start aborting procedure let msg = format!("Incorrect amount funded. Required: {}, Funded: {}. Do not fund this swap anymore, will abort and atttempt to sweep the Bitcoin to the provided address.", amount, required_funding_amount); runtime.log_error(&msg); - runtime.report_progress_message_to(event.endpoints, ServiceId::Farcasterd, msg)?; + runtime.report_progress_message(event.endpoints, msg)?; return handle_bob_abort_swap(event, runtime, wallet, funding_address); } else { // funding completed, amount is correct @@ -1686,11 +1685,7 @@ fn try_alice_arbitrating_lock_final_to_alice_accordant_lock( monero::Amount::from_pico(amount.clone()) ); runtime.log_error(&msg); - runtime.report_progress_message_to( - event.endpoints, - runtime.enquirer.clone(), - msg, - )?; + runtime.report_progress_message(event.endpoints, msg)?; } else if amount.clone() > required_funding_amount.as_pico() { // Alice overfunded. To ensure that she does not publish the buy transaction // if Bob gives her the BuySig, go straight to AliceCanceled @@ -1700,11 +1695,7 @@ fn try_alice_arbitrating_lock_final_to_alice_accordant_lock( monero::Amount::from_pico(amount.clone()) ); runtime.log_error(&msg); - runtime.report_progress_message_to( - event.endpoints, - runtime.enquirer.clone(), - msg, - )?; + runtime.report_progress_message(event.endpoints, msg)?; // Alice moves on to AliceCanceled despite not broadcasting the cancel transaction. return Ok(Some(SwapStateMachine::AliceCanceled(AliceCanceled { From 9e4b0589c340c25346b9b9cb60c8f88376bdf3bf Mon Sep 17 00:00:00 2001 From: TheCharlatan Date: Thu, 15 Dec 2022 15:17:02 +0100 Subject: [PATCH 2/2] Service: s/senders/endpoints/ --- src/service.rs | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/src/service.rs b/src/service.rs index 92c73ff4a..09255ace6 100644 --- a/src/service.rs +++ b/src/service.rs @@ -365,11 +365,11 @@ where fn report_success_message( &mut self, - senders: &mut Endpoints, + endpoints: &mut Endpoints, msg: Option, ) -> Result<(), Error> { if let Some(dest) = self.report_to() { - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), dest, @@ -381,11 +381,11 @@ where fn report_progress( &mut self, - senders: &mut Endpoints, + endpoints: &mut Endpoints, progress: Progress, ) -> Result<(), Error> { if let Some(dest) = self.report_to() { - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), dest, @@ -397,11 +397,11 @@ where fn report_progress_message( &mut self, - senders: &mut Endpoints, + endpoints: &mut Endpoints, msg: impl ToString, ) -> Result<(), Error> { if let Some(dest) = self.report_to() { - senders.send_to( + endpoints.send_to( ServiceBus::Ctl, self.identity(), dest, @@ -411,10 +411,10 @@ where Ok(()) } - fn report_failure(&mut self, senders: &mut Endpoints, failure: Failure) -> Error { + fn report_failure(&mut self, endpoints: &mut Endpoints, failure: Failure) -> Error { if let Some(dest) = self.report_to() { // Even if we fail, we still have to terminate :) - let _ = senders.send_to( + let _ = endpoints.send_to( ServiceBus::Ctl, self.identity(), dest, @@ -432,42 +432,42 @@ where { fn send_ctl( &mut self, - senders: &mut Endpoints, + endpoints: &mut Endpoints, dest: impl TryToServiceId, request: BusMsg, ) -> Result<(), Error> { if let Some(dest) = dest.try_to_service_id() { - senders.send_to(ServiceBus::Ctl, self.identity(), dest, request)?; + endpoints.send_to(ServiceBus::Ctl, self.identity(), dest, request)?; } Ok(()) } fn send_client_ctl( &mut self, - senders: &mut Endpoints, + endpoints: &mut Endpoints, dest: ServiceId, request: CtlMsg, ) -> Result<(), Error> { let bus = ServiceBus::Ctl; if let ServiceId::GrpcdClient(_) = dest { - senders.send_to(bus, dest, ServiceId::Grpcd, BusMsg::Ctl(request))?; + endpoints.send_to(bus, dest, ServiceId::Grpcd, BusMsg::Ctl(request))?; } else { - senders.send_to(bus, self.identity(), dest, BusMsg::Ctl(request))?; + endpoints.send_to(bus, self.identity(), dest, BusMsg::Ctl(request))?; } Ok(()) } fn send_client_info( &mut self, - senders: &mut Endpoints, + endpoints: &mut Endpoints, dest: ServiceId, request: InfoMsg, ) -> Result<(), Error> { let bus = ServiceBus::Info; if let ServiceId::GrpcdClient(_) = dest { - senders.send_to(bus, dest, ServiceId::Grpcd, BusMsg::Info(request))?; + endpoints.send_to(bus, dest, ServiceId::Grpcd, BusMsg::Info(request))?; } else { - senders.send_to(bus, self.identity(), dest, BusMsg::Info(request))?; + endpoints.send_to(bus, self.identity(), dest, BusMsg::Info(request))?; } Ok(()) }