Skip to content

Commit

Permalink
Merge pull request #848 from TheCharlatan/progressReporter
Browse files Browse the repository at this point in the history
Service: Implement Reporter trait
  • Loading branch information
h4sh3d authored Dec 19, 2022
2 parents 67b96e5 + 7fc5521 commit 61b7c6c
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 56 deletions.
71 changes: 44 additions & 27 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,19 +356,20 @@ impl TryToServiceId for Option<ServiceId> {
}
}

pub trait CtlServer
pub trait Reporter
where
Self: esb::Handler<ServiceBus>,
esb::Error<ServiceId>: From<Self::Error>,
{
fn report_success_to(
fn report_to(&self) -> Option<ServiceId>;

fn report_success_message(
&mut self,
senders: &mut Endpoints,
dest: impl TryToServiceId,
endpoints: &mut Endpoints,
msg: Option<impl ToString>,
) -> Result<(), Error> {
if let Some(dest) = dest.try_to_service_id() {
senders.send_to(
if let Some(dest) = self.report_to() {
endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
dest,
Expand All @@ -378,14 +379,29 @@ where
Ok(())
}

fn report_progress_message_to(
fn report_progress(
&mut self,
senders: &mut Endpoints,
dest: impl TryToServiceId,
endpoints: &mut Endpoints,
progress: Progress,
) -> Result<(), Error> {
if let Some(dest) = self.report_to() {
endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
dest,
BusMsg::Ctl(CtlMsg::Progress(progress)),
)?;
}
Ok(())
}

fn report_progress_message(
&mut self,
endpoints: &mut Endpoints,
msg: impl ToString,
) -> Result<(), Error> {
if let Some(dest) = dest.try_to_service_id() {
senders.send_to(
if let Some(dest) = self.report_to() {
endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
dest,
Expand All @@ -395,15 +411,10 @@ where
Ok(())
}

fn report_failure_to(
&mut self,
senders: &mut Endpoints,
dest: impl TryToServiceId,
failure: Failure,
) -> Error {
if let Some(dest) = dest.try_to_service_id() {
fn report_failure(&mut self, endpoints: &mut Endpoints, failure: Failure) -> Error {
if let Some(dest) = self.report_to() {
// Even if we fail, we still have to terminate :)
let _ = senders.send_to(
let _ = endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
dest,
Expand All @@ -412,45 +423,51 @@ where
}
Error::Terminate(failure.to_string())
}
}

pub trait CtlServer
where
Self: esb::Handler<ServiceBus>,
esb::Error<ServiceId>: From<Self::Error>,
{
fn send_ctl(
&mut self,
senders: &mut Endpoints,
endpoints: &mut Endpoints,
dest: impl TryToServiceId,
request: BusMsg,
) -> Result<(), Error> {
if let Some(dest) = dest.try_to_service_id() {
senders.send_to(ServiceBus::Ctl, self.identity(), dest, request)?;
endpoints.send_to(ServiceBus::Ctl, self.identity(), dest, request)?;
}
Ok(())
}

fn send_client_ctl(
&mut self,
senders: &mut Endpoints,
endpoints: &mut Endpoints,
dest: ServiceId,
request: CtlMsg,
) -> Result<(), Error> {
let bus = ServiceBus::Ctl;
if let ServiceId::GrpcdClient(_) = dest {
senders.send_to(bus, dest, ServiceId::Grpcd, BusMsg::Ctl(request))?;
endpoints.send_to(bus, dest, ServiceId::Grpcd, BusMsg::Ctl(request))?;
} else {
senders.send_to(bus, self.identity(), dest, BusMsg::Ctl(request))?;
endpoints.send_to(bus, self.identity(), dest, BusMsg::Ctl(request))?;
}
Ok(())
}

fn send_client_info(
&mut self,
senders: &mut Endpoints,
endpoints: &mut Endpoints,
dest: ServiceId,
request: InfoMsg,
) -> Result<(), Error> {
let bus = ServiceBus::Info;
if let ServiceId::GrpcdClient(_) = dest {
senders.send_to(bus, dest, ServiceId::Grpcd, BusMsg::Info(request))?;
endpoints.send_to(bus, dest, ServiceId::Grpcd, BusMsg::Info(request))?;
} else {
senders.send_to(bus, self.identity(), dest, BusMsg::Info(request))?;
endpoints.send_to(bus, self.identity(), dest, BusMsg::Info(request))?;
}
Ok(())
}
Expand Down
23 changes: 10 additions & 13 deletions src/swapd/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::{
temporal_safety::TemporalSafety,
StateReport,
};
use crate::service::Endpoints;
use crate::service::{Endpoints, Reporter};
use crate::swapd::Opts;
use crate::syncerd::bitcoin_syncer::p2wpkh_signed_tx_fee;
use crate::syncerd::types::{Event, TransactionConfirmations};
Expand Down Expand Up @@ -186,6 +186,11 @@ pub struct CheckpointSwapd {
}

impl CtlServer for Runtime {}
impl Reporter for Runtime {
fn report_to(&self) -> Option<ServiceId> {
self.enquirer.clone()
}
}

impl esb::Handler<ServiceBus> for Runtime {
type Request = BusMsg;
Expand Down Expand Up @@ -728,14 +733,7 @@ impl Runtime {
.latest_state_report
.generate_progress_update_or_transition(&new_state_report);
self.latest_state_report = new_state_report;
if let Some(enquirer) = self.enquirer.clone() {
endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
enquirer,
BusMsg::Ctl(CtlMsg::Progress(progress)),
)?;
}
self.report_progress(endpoints, progress)?;
}
Ok(())
}
Expand Down Expand Up @@ -789,7 +787,7 @@ impl Runtime {
);
// Ignoring possible reporting errors here and after: do not want to
// halt the swap just because the client disconnected
let _ = self.report_progress_message_to(endpoints, &self.enquirer.clone(), msg);
let _ = self.report_progress_message(endpoints, msg);

let engine = CommitmentEngine;
let commitment = match params {
Expand Down Expand Up @@ -820,10 +818,9 @@ impl Runtime {
"Accepting swap {} as Maker from Taker through peerd {}",
swap_id, self.peer_service
);
let enquirer = self.enquirer.clone();
// Ignoring possible reporting errors here and after: do not want to
// halt the swap just because the client disconnected
let _ = self.report_progress_message_to(endpoints, &enquirer, msg);
// halt the swap just because the enquirer (farcasterd) disconnected
let _ = self.report_progress_message(endpoints, msg);

let engine = CommitmentEngine;
let commitment = match params.clone() {
Expand Down
23 changes: 7 additions & 16 deletions src/swapd/swap_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ use crate::{
BusMsg, Failure, FailureCode,
},
event::{Event, StateMachine},
service::Reporter,
syncerd::{FeeEstimation, FeeEstimations, SweepAddress, TaskAborted},
CtlServer, ServiceId,
ServiceId,
};
use crate::{
bus::{
Expand Down Expand Up @@ -631,9 +632,8 @@ fn attempt_transition_to_init_taker(
.taker_commit(event.endpoints, local_params.clone())
.map_err(|err| {
runtime.log_error(&err);
runtime.report_failure_to(
runtime.report_failure(
event.endpoints,
&runtime.enquirer.clone(),
Failure {
code: FailureCode::Unknown,
info: err.to_string(),
Expand Down Expand Up @@ -702,9 +702,8 @@ fn attempt_transition_to_init_maker(
let local_commit = runtime
.maker_commit(event.endpoints, swap_id, local_params.clone())
.map_err(|err| {
runtime.report_failure_to(
runtime.report_failure(
event.endpoints,
&runtime.enquirer.clone(),
Failure {
code: FailureCode::Unknown,
info: err.to_string(),
Expand Down Expand Up @@ -950,7 +949,7 @@ fn try_bob_fee_estimated_to_bob_funded(
// incorrect funding, start aborting procedure
let msg = format!("Incorrect amount funded. Required: {}, Funded: {}. Do not fund this swap anymore, will abort and atttempt to sweep the Bitcoin to the provided address.", amount, required_funding_amount);
runtime.log_error(&msg);
runtime.report_progress_message_to(event.endpoints, ServiceId::Farcasterd, msg)?;
runtime.report_progress_message(event.endpoints, msg)?;
return handle_bob_abort_swap(event, runtime, wallet);
} else {
// funding completed, amount is correct
Expand Down Expand Up @@ -1640,11 +1639,7 @@ fn try_alice_arbitrating_lock_final_to_alice_accordant_lock(
monero::Amount::from_pico(amount.clone())
);
runtime.log_error(&msg);
runtime.report_progress_message_to(
event.endpoints,
runtime.enquirer.clone(),
msg,
)?;
runtime.report_progress_message(event.endpoints, msg)?;
} else if amount.clone() > required_funding_amount.as_pico() {
// Alice overfunded. To ensure that she does not publish the buy transaction
// if Bob gives her the BuySig, go straight to AliceCanceled
Expand All @@ -1654,11 +1649,7 @@ fn try_alice_arbitrating_lock_final_to_alice_accordant_lock(
monero::Amount::from_pico(amount.clone())
);
runtime.log_error(&msg);
runtime.report_progress_message_to(
event.endpoints,
runtime.enquirer.clone(),
msg,
)?;
runtime.report_progress_message(event.endpoints, msg)?;

// Alice moves on to AliceCanceled despite not broadcasting the cancel transaction.
return Ok(Some(SwapStateMachine::AliceCanceled(AliceCanceled {
Expand Down

0 comments on commit 61b7c6c

Please sign in to comment.