From f97c2ad45be0bf927e7a0f33afb0af3d290cccd2 Mon Sep 17 00:00:00 2001 From: TheCharlatan Date: Sat, 2 Jul 2022 14:33:15 +0200 Subject: [PATCH] Swap: Ensure a checkpoint is only set once and in the correct order This also solves the problem where a checkpoint of the same state would be written to the database again on restore. --- src/swapd/mod.rs | 1 + src/swapd/runtime.rs | 156 +++++++++++++++++++++------------------- src/swapd/swap_state.rs | 96 ++++++++++++++++++++++++- 3 files changed, 176 insertions(+), 77 deletions(-) diff --git a/src/swapd/mod.rs b/src/swapd/mod.rs index a2ce8b344..660f68adb 100644 --- a/src/swapd/mod.rs +++ b/src/swapd/mod.rs @@ -27,3 +27,4 @@ pub use runtime::get_swap_id; pub use runtime::run; pub use runtime::CheckpointSwapd; pub use swap_state::State; +pub use swap_state::SwapCheckpointType; diff --git a/src/swapd/runtime.rs b/src/swapd/runtime.rs index 29619446d..4de6af72a 100644 --- a/src/swapd/runtime.rs +++ b/src/swapd/runtime.rs @@ -38,7 +38,7 @@ use std::{ use super::{ storage::{self, Driver}, - swap_state::{AliceState, BobState, State}, + swap_state::{AliceState, BobState, State, SwapCheckpointType}, syncer_client::{log_tx_received, log_tx_seen, SyncerState, SyncerTasks}, temporal_safety::TemporalSafety, }; @@ -771,22 +771,24 @@ impl Runtime { "{} | checkpointing alice pre buy swapd state", self.swap_id.bright_blue_italic() ); - checkpoint_send( - endpoints, - self.swap_id, - self.identity(), - ServiceId::Database, - request::CheckpointState::CheckpointSwapd(CheckpointSwapd { - state: self.state.clone(), - last_msg: Msg::BuyProcedureSignature(buy_proc_sig.clone()), - enquirer: self.enquirer.clone(), - temporal_safety: self.temporal_safety.clone(), - txs: self.txs.clone(), - txids: self.syncer_state.tasks.txids.clone(), - pending_requests: self.pending_requests.clone(), - xmr_addr_addendum: self.syncer_state.xmr_addr_addendum.clone(), - }), - )?; + if self.state.a_sup_checkpoint_pre_buy() { + checkpoint_send( + endpoints, + self.swap_id, + self.identity(), + ServiceId::Database, + request::CheckpointState::CheckpointSwapd(CheckpointSwapd { + state: self.state.clone(), + last_msg: Msg::BuyProcedureSignature(buy_proc_sig.clone()), + enquirer: self.enquirer.clone(), + temporal_safety: self.temporal_safety.clone(), + txs: self.txs.clone(), + txids: self.syncer_state.tasks.txids.clone(), + pending_requests: self.pending_requests.clone(), + xmr_addr_addendum: self.syncer_state.xmr_addr_addendum.clone(), + }), + )?; + } self.send_wallet(msg_bus, endpoints, request)? } @@ -1240,7 +1242,10 @@ impl Runtime { { endpoints.send_to(bus_id, self.identity(), dest, request)?; debug!("sent buyproceduresignature at state {}", &self.state); - let next_state = State::Bob(BobState::BuySigB { buy_tx_seen: false }); + let next_state = State::Bob(BobState::BuySigB { + buy_tx_seen: false, + checkpoint: self.state.checkpoint().unwrap(), + }); self.state_update(endpoints, next_state)?; } else { error!( @@ -1586,6 +1591,7 @@ impl Runtime { cancel_seen: false, refund_seen: false, remote_params: self.state.remote_params().unwrap(), + checkpoint: self.state.checkpoint().unwrap(), }); } else { warn!( @@ -1911,22 +1917,24 @@ impl Runtime { "{} | checkpointing bob pre lock swapd state", self.swap_id.bright_blue_italic() ); - checkpoint_send( - endpoints, - self.swap_id, - self.identity(), - ServiceId::Database, - request::CheckpointState::CheckpointSwapd(CheckpointSwapd { - state: self.state.clone(), - last_msg: Msg::CoreArbitratingSetup(core_arb_setup.clone()), - enquirer: self.enquirer.clone(), - temporal_safety: self.temporal_safety.clone(), - txs: self.txs.clone(), - txids: self.syncer_state.tasks.txids.clone(), - pending_requests: self.pending_requests.clone(), - xmr_addr_addendum: self.syncer_state.xmr_addr_addendum.clone(), - }), - )?; + if self.state.b_sup_checkpoint_pre_lock() { + checkpoint_send( + endpoints, + self.swap_id, + self.identity(), + ServiceId::Database, + request::CheckpointState::CheckpointSwapd(CheckpointSwapd { + state: self.state.clone(), + last_msg: Msg::CoreArbitratingSetup(core_arb_setup.clone()), + enquirer: self.enquirer.clone(), + temporal_safety: self.temporal_safety.clone(), + txs: self.txs.clone(), + txids: self.syncer_state.tasks.txids.clone(), + pending_requests: self.pending_requests.clone(), + xmr_addr_addendum: self.syncer_state.xmr_addr_addendum.clone(), + }), + )?; + } let CoreArbitratingSetup { swap_id: _, lock, @@ -1958,6 +1966,7 @@ impl Runtime { cancel_seen: false, remote_params: self.state.remote_params().unwrap(), b_address: self.state.b_address().cloned().unwrap(), + checkpoint: self.state.checkpoint().unwrap(), }); self.state_update(endpoints, next_state)?; } @@ -2054,26 +2063,29 @@ impl Runtime { "{} | checkpointing alice pre lock swapd state", self.swap_id.bright_blue_italic() ); - checkpoint_send( - endpoints, - self.swap_id, - self.identity(), - ServiceId::Database, - request::CheckpointState::CheckpointSwapd(CheckpointSwapd { - state: self.state.clone(), - last_msg: Msg::RefundProcedureSignatures(refund_proc_sigs.clone()), - enquirer: self.enquirer.clone(), - temporal_safety: self.temporal_safety.clone(), - txs: self.txs.clone(), - txids: self.syncer_state.tasks.txids.clone(), - pending_requests: self.pending_requests.clone(), - xmr_addr_addendum: self.syncer_state.xmr_addr_addendum.clone(), - }), - )?; + if self.state.a_sup_checkpoint_pre_lock() { + checkpoint_send( + endpoints, + self.swap_id, + self.identity(), + ServiceId::Database, + request::CheckpointState::CheckpointSwapd(CheckpointSwapd { + state: self.state.clone(), + last_msg: Msg::RefundProcedureSignatures(refund_proc_sigs.clone()), + enquirer: self.enquirer.clone(), + temporal_safety: self.temporal_safety.clone(), + txs: self.txs.clone(), + txids: self.syncer_state.tasks.txids.clone(), + pending_requests: self.pending_requests.clone(), + xmr_addr_addendum: self.syncer_state.xmr_addr_addendum.clone(), + }), + )?; + } self.send_peer(endpoints, Msg::RefundProcedureSignatures(refund_proc_sigs))?; trace!("sent peer RefundProcedureSignatures msg"); let next_state = State::Alice(AliceState::RefundSigA { + checkpoint: SwapCheckpointType::CheckpointAlicePreLock, local_params: self.state.local_params().cloned().unwrap(), xmr_locked: false, buy_published: false, @@ -2093,22 +2105,24 @@ impl Runtime { "{} | checkpointing bob pre buy swapd state", self.swap_id.bright_blue_italic() ); - checkpoint_send( - endpoints, - self.swap_id, - self.identity(), - ServiceId::Database, - request::CheckpointState::CheckpointSwapd(CheckpointSwapd { - state: self.state.clone(), - last_msg: Msg::BuyProcedureSignature(buy_proc_sig.clone()), - enquirer: self.enquirer.clone(), - temporal_safety: self.temporal_safety.clone(), - txs: self.txs.clone(), - txids: self.syncer_state.tasks.txids.clone(), - pending_requests: self.pending_requests.clone(), - xmr_addr_addendum: self.syncer_state.xmr_addr_addendum.clone(), - }), - )?; + if self.state.b_sup_checkpoint_pre_buy() { + checkpoint_send( + endpoints, + self.swap_id, + self.identity(), + ServiceId::Database, + request::CheckpointState::CheckpointSwapd(CheckpointSwapd { + state: self.state.clone(), + last_msg: Msg::BuyProcedureSignature(buy_proc_sig.clone()), + enquirer: self.enquirer.clone(), + temporal_safety: self.temporal_safety.clone(), + txs: self.txs.clone(), + txids: self.syncer_state.tasks.txids.clone(), + pending_requests: self.pending_requests.clone(), + xmr_addr_addendum: self.syncer_state.xmr_addr_addendum.clone(), + }), + )?; + } debug!("subscribing with syncer for receiving raw buy tx "); @@ -2274,10 +2288,7 @@ impl Runtime { self.pending_requests = pending_requests; self.txs = txs.clone(); trace!("Watch height bitcoin"); - let watch_height_bitcoin = Task::WatchHeight(WatchHeight { - id: self.syncer_state.tasks.new_taskid(), - lifetime: self.syncer_state.task_lifetime(Coin::Bitcoin), - }); + let watch_height_bitcoin = self.syncer_state.watch_height(Coin::Bitcoin); endpoints.send_to( ServiceBus::Ctl, self.identity(), @@ -2286,10 +2297,7 @@ impl Runtime { )?; trace!("Watch height monero"); - let watch_height_monero = Task::WatchHeight(WatchHeight { - id: self.syncer_state.tasks.new_taskid(), - lifetime: self.syncer_state.task_lifetime(Coin::Monero), - }); + let watch_height_monero = self.syncer_state.watch_height(Coin::Monero); endpoints.send_to( ServiceBus::Ctl, self.identity(), diff --git a/src/swapd/swap_state.rs b/src/swapd/swap_state.rs index 2548a6da7..5d95b8fef 100644 --- a/src/swapd/swap_state.rs +++ b/src/swapd/swap_state.rs @@ -30,9 +30,11 @@ pub enum AliceState { local_params: Params, remote_commit: Commit, remote_params: Option, + checkpoint: Option, }, // local, remote, remote #[display("RefundSigs(xmr_locked({xmr_locked}), buy_pub({buy_published}), cancel_seen({cancel_seen}), refund_seen({refund_seen}))")] RefundSigA { + checkpoint: SwapCheckpointType, xmr_locked: bool, buy_published: bool, cancel_seen: bool, @@ -66,6 +68,7 @@ pub enum BobState { // #[display("Reveal: {0:#?}")] #[display("Reveal")] RevealB { + checkpoint: Option, local_params: Params, remote_commit: Commit, b_address: bitcoin::Address, @@ -75,6 +78,7 @@ pub enum BobState { // #[display("CoreArb: {0:#?}")] #[display("CoreArb")] CorearbB { + checkpoint: SwapCheckpointType, received_refund_procedure_signatures: bool, cancel_seen: bool, remote_params: Params, @@ -82,7 +86,10 @@ pub enum BobState { b_address: bitcoin::Address, }, // lock (not signed), cancel_seen, remote #[display("BuySig")] - BuySigB { buy_tx_seen: bool }, + BuySigB { + buy_tx_seen: bool, + checkpoint: SwapCheckpointType, + }, #[display("Finish({0})")] FinishB(Outcome), } @@ -96,6 +103,19 @@ pub enum State { Bob(BobState), } +#[derive(Display, Debug, Clone, StrictEncode, StrictDecode)] +#[display(inner)] +pub enum SwapCheckpointType { + #[display("CheckpointBobPreLock")] + CheckpointBobPreLock, + #[display("CheckpointBobPreBuy")] + CheckpointBobPreBuy, + #[display("CheckpointAlicePreLock")] + CheckpointAlicePreLock, + #[display("CheckpointAlicePreBuy")] + CheckpointAlicePreBuy, +} + // The state impl is not public and may contain code not used yet, we can relax the linter and // allow dead code. #[allow(dead_code)] @@ -277,6 +297,16 @@ impl State { State::Alice(AliceState::RevealA { .. }) | State::Bob(BobState::RevealB { .. }) ) } + pub fn checkpoint(&self) -> Option { + match self { + State::Alice(AliceState::RevealA { checkpoint, .. }) + | State::Bob(BobState::RevealB { checkpoint, .. }) => checkpoint.clone(), + State::Alice(AliceState::RefundSigA { checkpoint, .. }) + | State::Bob(BobState::CorearbB { checkpoint, .. }) + | State::Bob(BobState::BuySigB { checkpoint, .. }) => Some(checkpoint.clone()), + _ => None, + } + } pub fn a_refundsig(&self) -> bool { matches!(self, State::Alice(AliceState::RefundSigA { .. })) } @@ -285,7 +315,7 @@ impl State { return false; } match self { - State::Bob(BobState::BuySigB { buy_tx_seen }) => *buy_tx_seen, + State::Bob(BobState::BuySigB { buy_tx_seen, .. }) => *buy_tx_seen, _ => unreachable!("conditional early return"), } } @@ -393,6 +423,7 @@ impl State { local_params, remote_commit, remote_params, + checkpoint: None, }), State::Bob(BobState::CommitB { @@ -408,6 +439,7 @@ impl State { b_address, local_trade_role, remote_params, + checkpoint: None, }), _ => unreachable!("checked state on pattern to be Commit"), @@ -446,7 +478,9 @@ impl State { return; } match self { - State::Bob(BobState::BuySigB { buy_tx_seen }) if !(*buy_tx_seen) => *buy_tx_seen = true, + State::Bob(BobState::BuySigB { buy_tx_seen, .. }) if !(*buy_tx_seen) => { + *buy_tx_seen = true + } _ => unreachable!("checked state"), } } @@ -481,4 +515,60 @@ impl State { false } } + pub fn a_sup_checkpoint_pre_lock(&mut self) -> bool { + if let State::Alice(AliceState::RevealA { checkpoint, .. }) = self { + if checkpoint.is_none() { + *checkpoint = Some(SwapCheckpointType::CheckpointAlicePreLock); + true + } else { + debug!("checkpoint alice pre lock already set"); + false + } + } else { + error!("Not on RevealA state"); + false + } + } + pub fn a_sup_checkpoint_pre_buy(&mut self) -> bool { + if let State::Alice(AliceState::RefundSigA { checkpoint, .. }) = self { + if let SwapCheckpointType::CheckpointAlicePreLock = *checkpoint { + *checkpoint = SwapCheckpointType::CheckpointAlicePreBuy; + true + } else { + debug!("checkpoint alice pre buy already set"); + false + } + } else { + error!("Not on RefundSigA state"); + false + } + } + pub fn b_sup_checkpoint_pre_lock(&mut self) -> bool { + if let State::Bob(BobState::RevealB { checkpoint, .. }) = self { + if checkpoint.is_none() { + *checkpoint = Some(SwapCheckpointType::CheckpointBobPreLock); + true + } else { + debug!("checkpoint bob pre lock already set"); + false + } + } else { + error!("Not on RevealB state"); + false + } + } + pub fn b_sup_checkpoint_pre_buy(&mut self) -> bool { + if let State::Bob(BobState::CorearbB { checkpoint, .. }) = self { + if let SwapCheckpointType::CheckpointBobPreLock = *checkpoint { + *checkpoint = SwapCheckpointType::CheckpointBobPreBuy; + true + } else { + debug!("checkpoint bob pre buy already set"); + false + } + } else { + error!("Not on CoreArbB state"); + false + } + } }