Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Service: Implement Reporter trait #848

Merged
merged 3 commits into from
Dec 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 44 additions & 27 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,19 +356,20 @@ impl TryToServiceId for Option<ServiceId> {
}
}

pub trait CtlServer
pub trait Reporter
where
Self: esb::Handler<ServiceBus>,
esb::Error<ServiceId>: From<Self::Error>,
{
fn report_success_to(
fn report_to(&self) -> Option<ServiceId>;

fn report_success_message(
&mut self,
senders: &mut Endpoints,
dest: impl TryToServiceId,
endpoints: &mut Endpoints,
msg: Option<impl ToString>,
) -> Result<(), Error> {
if let Some(dest) = dest.try_to_service_id() {
senders.send_to(
if let Some(dest) = self.report_to() {
endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
dest,
Expand All @@ -378,14 +379,29 @@ where
Ok(())
}

fn report_progress_message_to(
fn report_progress(
&mut self,
senders: &mut Endpoints,
dest: impl TryToServiceId,
endpoints: &mut Endpoints,
progress: Progress,
) -> Result<(), Error> {
if let Some(dest) = self.report_to() {
endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
dest,
BusMsg::Ctl(CtlMsg::Progress(progress)),
)?;
}
Ok(())
}

fn report_progress_message(
&mut self,
endpoints: &mut Endpoints,
msg: impl ToString,
) -> Result<(), Error> {
if let Some(dest) = dest.try_to_service_id() {
senders.send_to(
if let Some(dest) = self.report_to() {
endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
dest,
Expand All @@ -395,15 +411,10 @@ where
Ok(())
}

fn report_failure_to(
&mut self,
senders: &mut Endpoints,
dest: impl TryToServiceId,
failure: Failure,
) -> Error {
if let Some(dest) = dest.try_to_service_id() {
fn report_failure(&mut self, endpoints: &mut Endpoints, failure: Failure) -> Error {
if let Some(dest) = self.report_to() {
// Even if we fail, we still have to terminate :)
let _ = senders.send_to(
let _ = endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
dest,
Expand All @@ -412,45 +423,51 @@ where
}
Error::Terminate(failure.to_string())
}
}

pub trait CtlServer
where
Self: esb::Handler<ServiceBus>,
esb::Error<ServiceId>: From<Self::Error>,
{
fn send_ctl(
&mut self,
senders: &mut Endpoints,
endpoints: &mut Endpoints,
dest: impl TryToServiceId,
request: BusMsg,
) -> Result<(), Error> {
if let Some(dest) = dest.try_to_service_id() {
senders.send_to(ServiceBus::Ctl, self.identity(), dest, request)?;
endpoints.send_to(ServiceBus::Ctl, self.identity(), dest, request)?;
}
Ok(())
}

fn send_client_ctl(
&mut self,
senders: &mut Endpoints,
endpoints: &mut Endpoints,
dest: ServiceId,
request: CtlMsg,
) -> Result<(), Error> {
let bus = ServiceBus::Ctl;
if let ServiceId::GrpcdClient(_) = dest {
senders.send_to(bus, dest, ServiceId::Grpcd, BusMsg::Ctl(request))?;
endpoints.send_to(bus, dest, ServiceId::Grpcd, BusMsg::Ctl(request))?;
} else {
senders.send_to(bus, self.identity(), dest, BusMsg::Ctl(request))?;
endpoints.send_to(bus, self.identity(), dest, BusMsg::Ctl(request))?;
}
Ok(())
}

fn send_client_info(
&mut self,
senders: &mut Endpoints,
endpoints: &mut Endpoints,
dest: ServiceId,
request: InfoMsg,
) -> Result<(), Error> {
let bus = ServiceBus::Info;
if let ServiceId::GrpcdClient(_) = dest {
senders.send_to(bus, dest, ServiceId::Grpcd, BusMsg::Info(request))?;
endpoints.send_to(bus, dest, ServiceId::Grpcd, BusMsg::Info(request))?;
} else {
senders.send_to(bus, self.identity(), dest, BusMsg::Info(request))?;
endpoints.send_to(bus, self.identity(), dest, BusMsg::Info(request))?;
}
Ok(())
}
Expand Down
23 changes: 10 additions & 13 deletions src/swapd/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::{
temporal_safety::TemporalSafety,
StateReport,
};
use crate::service::Endpoints;
use crate::service::{Endpoints, Reporter};
use crate::swapd::Opts;
use crate::syncerd::bitcoin_syncer::p2wpkh_signed_tx_fee;
use crate::syncerd::types::{Event, TransactionConfirmations};
Expand Down Expand Up @@ -186,6 +186,11 @@ pub struct CheckpointSwapd {
}

impl CtlServer for Runtime {}
impl Reporter for Runtime {
fn report_to(&self) -> Option<ServiceId> {
self.enquirer.clone()
}
}

impl esb::Handler<ServiceBus> for Runtime {
type Request = BusMsg;
Expand Down Expand Up @@ -728,14 +733,7 @@ impl Runtime {
.latest_state_report
.generate_progress_update_or_transition(&new_state_report);
self.latest_state_report = new_state_report;
if let Some(enquirer) = self.enquirer.clone() {
endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
enquirer,
BusMsg::Ctl(CtlMsg::Progress(progress)),
)?;
}
self.report_progress(endpoints, progress)?;
}
Ok(())
}
Expand Down Expand Up @@ -789,7 +787,7 @@ impl Runtime {
);
// Ignoring possible reporting errors here and after: do not want to
// halt the swap just because the client disconnected
let _ = self.report_progress_message_to(endpoints, &self.enquirer.clone(), msg);
let _ = self.report_progress_message(endpoints, msg);

let engine = CommitmentEngine;
let commitment = match params {
Expand Down Expand Up @@ -820,10 +818,9 @@ impl Runtime {
"Accepting swap {} as Maker from Taker through peerd {}",
swap_id, self.peer_service
);
let enquirer = self.enquirer.clone();
// Ignoring possible reporting errors here and after: do not want to
// halt the swap just because the client disconnected
let _ = self.report_progress_message_to(endpoints, &enquirer, msg);
// halt the swap just because the enquirer (farcasterd) disconnected
let _ = self.report_progress_message(endpoints, msg);

let engine = CommitmentEngine;
let commitment = match params.clone() {
Expand Down
23 changes: 7 additions & 16 deletions src/swapd/swap_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ use crate::{
BusMsg, Failure, FailureCode,
},
event::{Event, StateMachine},
service::Reporter,
syncerd::{FeeEstimation, FeeEstimations, SweepAddress, TaskAborted},
CtlServer, ServiceId,
ServiceId,
};
use crate::{
bus::{
Expand Down Expand Up @@ -638,9 +639,8 @@ fn attempt_transition_to_init_taker(
.taker_commit(event.endpoints, local_params.clone())
.map_err(|err| {
runtime.log_error(&err);
runtime.report_failure_to(
runtime.report_failure(
event.endpoints,
&runtime.enquirer.clone(),
Failure {
code: FailureCode::Unknown,
info: err.to_string(),
Expand Down Expand Up @@ -709,9 +709,8 @@ fn attempt_transition_to_init_maker(
let local_commit = runtime
.maker_commit(event.endpoints, swap_id, local_params.clone())
.map_err(|err| {
runtime.report_failure_to(
runtime.report_failure(
event.endpoints,
&runtime.enquirer.clone(),
Failure {
code: FailureCode::Unknown,
info: err.to_string(),
Expand Down Expand Up @@ -957,7 +956,7 @@ fn try_bob_fee_estimated_to_bob_funded(
// incorrect funding, start aborting procedure
let msg = format!("Incorrect amount funded. Required: {}, Funded: {}. Do not fund this swap anymore, will abort and atttempt to sweep the Bitcoin to the provided address.", amount, required_funding_amount);
runtime.log_error(&msg);
runtime.report_progress_message_to(event.endpoints, ServiceId::Farcasterd, msg)?;
runtime.report_progress_message(event.endpoints, msg)?;
return handle_bob_abort_swap(event, runtime, wallet);
} else {
// funding completed, amount is correct
Expand Down Expand Up @@ -1642,11 +1641,7 @@ fn try_alice_arbitrating_lock_final_to_alice_accordant_lock(
monero::Amount::from_pico(amount.clone())
);
runtime.log_error(&msg);
runtime.report_progress_message_to(
event.endpoints,
runtime.enquirer.clone(),
msg,
)?;
runtime.report_progress_message(event.endpoints, msg)?;
} else if amount.clone() > required_funding_amount.as_pico() {
// Alice overfunded. To ensure that she does not publish the buy transaction
// if Bob gives her the BuySig, go straight to AliceCanceled
Expand All @@ -1656,11 +1651,7 @@ fn try_alice_arbitrating_lock_final_to_alice_accordant_lock(
monero::Amount::from_pico(amount.clone())
);
runtime.log_error(&msg);
runtime.report_progress_message_to(
event.endpoints,
runtime.enquirer.clone(),
msg,
)?;
runtime.report_progress_message(event.endpoints, msg)?;

// Alice moves on to AliceCanceled despite not broadcasting the cancel transaction.
return Ok(Some(SwapStateMachine::AliceCanceled(AliceCanceled {
Expand Down