Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make progress serializable #438

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/cli/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ impl Exec for Command {

Command::Progress { swapid } => {
runtime.request(ServiceId::Farcasterd, Request::ReadProgress(swapid))?;
runtime.report_progress()?;
runtime.report_response()?;
}

Command::NeedsFunding { coin } => {
Expand Down
47 changes: 32 additions & 15 deletions src/farcasterd/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// If not, see <https://opensource.org/licenses/MIT>.

use crate::farcasterd::runtime::request::MadeOffer;
use crate::farcasterd::runtime::request::SwapProgress;
use crate::farcasterd::runtime::request::TookOffer;
use crate::{
clap::Parser,
Expand Down Expand Up @@ -554,7 +555,10 @@ impl Runtime {
);
report_to.push((
swap_params.report_to.clone(), // walletd
Request::Progress(format!("Swap daemon {} operational", source)),
Request::Progress(request::Progress::ProgressMessage(format!(
"Swap daemon {} operational",
source
))),
));
let swapid = get_swap_id(&source)?;
// when online, Syncers say Hello, then they get registered to self.syncers
Expand Down Expand Up @@ -597,7 +601,10 @@ impl Runtime {
);
report_to.push((
swap_params.report_to.clone(), // walletd
Request::Progress(format!("Swap daemon {} operational", source)),
Request::Progress(request::Progress::ProgressMessage(format!(
"Swap daemon {} operational",
source
))),
));
match swap_params.local_params {
Params::Alice(_) => {}
Expand Down Expand Up @@ -1030,22 +1037,32 @@ impl Runtime {

Request::ReadProgress(swapid) => {
if let Some(queue) = self.progress.get_mut(&ServiceId::Swap(swapid)) {
let n = queue.len();

for (i, req) in queue.iter().enumerate() {
let x = match req {
Request::Progress(x)
| Request::Success(OptionDetails(Some(x)))
| Request::Failure(Failure { code: _, info: x }) => x,
let mut swap_progress = SwapProgress {
failure: None,
messages: vec![],
state_transitions: vec![],
};
for (_i, req) in queue.iter().enumerate() {
match req {
Request::Progress(request::Progress::ProgressMessage(x))
| Request::Success(OptionDetails(Some(x))) => {
swap_progress.messages.push(x.clone());
}
Request::Progress(request::Progress::StateTransition(x)) => {
swap_progress.state_transitions.push(x.clone());
}
Request::Failure(Failure { code: _, info: x }) => {
swap_progress.failure = Some(x.clone());
}
_ => unreachable!("not handled here"),
};
let req = if i < n - 1 {
Request::Progress(x.clone())
} else {
Request::Success(OptionDetails(Some(x.clone())))
};
report_to.push((Some(source.clone()), req));
}
endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
source,
Request::SwapProgress(swap_progress),
)?;
} else {
let info = if self.making_swaps.contains_key(&ServiceId::Swap(swapid))
|| self.taking_swaps.contains_key(&ServiceId::Swap(swapid))
Expand Down
39 changes: 35 additions & 4 deletions src/rpc/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use lazy_static::lazy_static;
use lightning_encoding::{strategies::AsStrict, LightningDecode, LightningEncode};
use monero::consensus::{Decodable as MoneroDecodable, Encodable as MoneroEncodable};
#[cfg(feature = "serde")]
use serde_with::{DisplayFromStr, DurationSeconds, Same};
use serde_with::{skip_serializing_none, DisplayFromStr, DurationSeconds, Same};
use std::{collections::BTreeMap, convert::TryInto};
use std::{
fmt::{self, Debug, Display, Formatter},
Expand Down Expand Up @@ -484,10 +484,17 @@ pub enum Request {
#[api(type = 1004)]
#[display("{0}")]
String(String),

#[api(type = 1002)]
#[display("progress({0})")]
Progress(String),

Progress(Progress),

#[api(type = 1005)]
#[display("swap_progress({0})", alt = "{0:#}")]
SwapProgress(SwapProgress),
// #[api(type = 207)]
// #[display("took_offer({0})", alt = "{0:#}")]
// TookOffer(TookOffer),
#[api(type = 1003)]
#[display("read_progress({0})")]
ReadProgress(SwapId),
Expand Down Expand Up @@ -740,6 +747,13 @@ pub struct InitSwap {
pub funding_address: Option<bitcoin::Address>,
}

#[derive(Clone, Debug, Display, StrictEncode, StrictDecode)]
#[display("progress {}")]
pub enum Progress {
ProgressMessage(String),
StateTransition(String),
}

#[cfg_attr(feature = "serde", serde_as)]
#[derive(Clone, PartialEq, Eq, Debug, Display, StrictEncode, StrictDecode)]
#[cfg_attr(
Expand Down Expand Up @@ -836,6 +850,21 @@ pub struct TookOffer {
pub message: String,
}

#[cfg_attr(feature = "serde", serde_as)]
#[derive(Clone, PartialEq, Eq, Debug, Display, Default, StrictEncode, StrictDecode)]
#[cfg_attr(
feature = "serde",
derive(Serialize, Deserialize),
serde(crate = "serde_crate")
)]
#[display(SwapProgress::to_yaml_string)]
pub struct SwapProgress {
#[serde(skip_serializing_if = "Option::is_none")]
pub failure: Option<String>,
pub state_transitions: Vec<String>,
pub messages: Vec<String>,
}

#[cfg_attr(feature = "serde", serde_as)]
#[derive(Clone, PartialEq, Eq, Debug, Display, StrictEncode, StrictDecode)]
#[cfg_attr(
Expand Down Expand Up @@ -915,6 +944,8 @@ impl ToYamlString for SyncerInfo {}
impl ToYamlString for MadeOffer {}
#[cfg(feature = "serde")]
impl ToYamlString for TookOffer {}
#[cfg(feature = "serde")]
impl ToYamlString for SwapProgress {}

#[derive(Wrapper, Clone, PartialEq, Eq, Debug, From, StrictEncode, StrictDecode)]
#[wrapper(IndexRange)]
Expand Down Expand Up @@ -995,7 +1026,7 @@ pub trait IntoSuccessOrFailure {
impl IntoProgressOrFailure for Result<String, crate::Error> {
fn into_progress_or_failure(self) -> Request {
match self {
Ok(val) => Request::Progress(val),
Ok(val) => Request::Progress(Progress::ProgressMessage(val)),
Err(err) => Request::from(err),
}
}
Expand Down
22 changes: 20 additions & 2 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// along with this software.
// If not, see <https://opensource.org/licenses/MIT>.

use crate::rpc::request::Progress;
use crate::syncerd::opts::Coin;
use std::convert::TryInto;
use std::fmt::{self, Display, Formatter};
Expand Down Expand Up @@ -304,7 +305,7 @@ where
Ok(())
}

fn report_progress_to(
fn report_progress_message_to(
&mut self,
senders: &mut Endpoints,
dest: impl TryToServiceId,
Expand All @@ -315,7 +316,24 @@ where
ServiceBus::Ctl,
self.identity(),
dest,
Request::Progress(msg.to_string()),
Request::Progress(Progress::ProgressMessage(msg.to_string())),
)?;
}
Ok(())
}

fn report_state_transition_progress_message_to(
&mut self,
senders: &mut Endpoints,
dest: impl TryToServiceId,
msg: impl ToString,
) -> Result<(), Error> {
if let Some(dest) = dest.try_to_service_id() {
senders.send_to(
ServiceBus::Ctl,
self.identity(),
dest,
Request::Progress(Progress::StateTransition(msg.to_string())),
)?;
}
Ok(())
Expand Down
40 changes: 18 additions & 22 deletions src/swapd/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1038,14 +1038,15 @@ impl Runtime {
}

fn state_update(&mut self, endpoints: &mut Endpoints, next_state: State) -> Result<(), Error> {
let msg = format!(
"State transition: {} -> {}",
info!(
"{} | State transition: {} -> {}",
self.swap_id.bright_blue_italic(),
self.state.bright_white_bold(),
next_state.bright_white_bold()
next_state.bright_white_bold(),
);
info!("{} | {}", self.swap_id.bright_blue_italic(), &msg);
let msg = format!("{} -> {}", self.state, next_state,);
self.state = next_state;
self.report_success_to(endpoints, self.enquirer.clone(), Some(msg))?;
self.report_state_transition_progress_message_to(endpoints, self.enquirer.clone(), msg)?;
Ok(())
}

Expand Down Expand Up @@ -2681,12 +2682,11 @@ impl Runtime {
endpoints: &mut Endpoints,
params: Params,
) -> Result<request::Commit, Error> {
let msg = format!(
"{} {} to Maker remote peer",
info!(
"{} | {} to Maker remote peer",
self.swap_id().bright_blue_italic(),
"Proposing to take swap".bright_white_bold(),
self.swap_id().bright_blue_italic()
);
info!("{} | {}", self.swap_id.bright_blue_italic(), &msg);
let engine = CommitmentEngine;
let commitment = match params {
Params::Bob(params) => request::Commit::BobParameters(
Expand All @@ -2699,7 +2699,8 @@ impl Runtime {
// Ignoring possible reporting errors here and after: do not want to
// halt the swap just because the client disconnected
let enquirer = self.enquirer.clone();
let _ = self.report_progress_to(endpoints, &enquirer, msg);
let msg = format!("Proposing to take swap to Maker remote peer",);
let _ = self.report_progress_message_to(endpoints, &enquirer, msg);

Ok(commitment)
}
Expand All @@ -2711,18 +2712,21 @@ impl Runtime {
swap_id: SwapId,
params: &Params,
) -> Result<request::Commit, Error> {
let msg = format!(
"{} {} as Maker from Taker remote peer {}",
info!(
"{} | {} as Maker from Taker through peerd {}",
"Accepting swap".bright_white_bold(),
swap_id.bright_blue_italic(),
peerd.bright_blue_italic()
);
info!("{} | {}", self.swap_id.bright_blue_italic(), msg);

// Ignoring possible reporting errors here and after: do not want to
// halt the channel just because the client disconnected
let msg = format!(
"Accepting swap {} as Maker from Taker through peerd {}",
swap_id, peerd
);
let enquirer = self.enquirer.clone();
let _ = self.report_progress_to(endpoints, &enquirer, msg);
let _ = self.report_progress_message_to(endpoints, &enquirer, msg);

let engine = CommitmentEngine;
let commitment = match params.clone() {
Expand All @@ -2733,14 +2737,6 @@ impl Runtime {
CommitAliceParameters::commit_to_bundle(self.swap_id(), &engine, params),
),
};

let msg = format!(
"{} swap {:#} from remote peer Taker {}",
"Making".bright_green_bold(),
swap_id.bright_green_italic(),
peerd.bright_green_italic()
);
let _ = self.report_success_to(endpoints, &enquirer, Some(msg));
Ok(commitment)
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/syncerd/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::service::Endpoints;
use crate::syncerd::bitcoin_syncer::BitcoinSyncer;
use crate::syncerd::monero_syncer::MoneroSyncer;
use crate::syncerd::opts::{Coin, Opts};
use crate::syncerd::runtime::request::Progress;
use amplify::Wrapper;
use farcaster_core::blockchain::Network;
use std::collections::{HashMap, HashSet};
Expand Down Expand Up @@ -206,7 +207,7 @@ impl Runtime {
source.clone(),
Request::TaskList(self.tasks.iter().cloned().collect()),
)?;
let resp = Request::Progress("ListedTasks?".to_string());
let resp = Request::Progress(Progress::ProgressMessage("ListedTasks?".to_string()));
notify_cli = Some((Some(source), resp));
}

Expand Down