Skip to content

Commit

Permalink
Swap: Ensure a checkpoint is only set once and in the correct order
Browse files Browse the repository at this point in the history
This also solves the problem where a checkpoint of the same state would
be written to the database again on restore.
  • Loading branch information
TheCharlatan committed Jul 3, 2022
1 parent 482985f commit f97c2ad
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 77 deletions.
1 change: 1 addition & 0 deletions src/swapd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ pub use runtime::get_swap_id;
pub use runtime::run;
pub use runtime::CheckpointSwapd;
pub use swap_state::State;
pub use swap_state::SwapCheckpointType;
156 changes: 82 additions & 74 deletions src/swapd/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use std::{

use super::{
storage::{self, Driver},
swap_state::{AliceState, BobState, State},
swap_state::{AliceState, BobState, State, SwapCheckpointType},
syncer_client::{log_tx_received, log_tx_seen, SyncerState, SyncerTasks},
temporal_safety::TemporalSafety,
};
Expand Down Expand Up @@ -771,22 +771,24 @@ impl Runtime {
"{} | checkpointing alice pre buy swapd state",
self.swap_id.bright_blue_italic()
);
checkpoint_send(
endpoints,
self.swap_id,
self.identity(),
ServiceId::Database,
request::CheckpointState::CheckpointSwapd(CheckpointSwapd {
state: self.state.clone(),
last_msg: Msg::BuyProcedureSignature(buy_proc_sig.clone()),
enquirer: self.enquirer.clone(),
temporal_safety: self.temporal_safety.clone(),
txs: self.txs.clone(),
txids: self.syncer_state.tasks.txids.clone(),
pending_requests: self.pending_requests.clone(),
xmr_addr_addendum: self.syncer_state.xmr_addr_addendum.clone(),
}),
)?;
if self.state.a_sup_checkpoint_pre_buy() {
checkpoint_send(
endpoints,
self.swap_id,
self.identity(),
ServiceId::Database,
request::CheckpointState::CheckpointSwapd(CheckpointSwapd {
state: self.state.clone(),
last_msg: Msg::BuyProcedureSignature(buy_proc_sig.clone()),
enquirer: self.enquirer.clone(),
temporal_safety: self.temporal_safety.clone(),
txs: self.txs.clone(),
txids: self.syncer_state.tasks.txids.clone(),
pending_requests: self.pending_requests.clone(),
xmr_addr_addendum: self.syncer_state.xmr_addr_addendum.clone(),
}),
)?;
}

self.send_wallet(msg_bus, endpoints, request)?
}
Expand Down Expand Up @@ -1240,7 +1242,10 @@ impl Runtime {
{
endpoints.send_to(bus_id, self.identity(), dest, request)?;
debug!("sent buyproceduresignature at state {}", &self.state);
let next_state = State::Bob(BobState::BuySigB { buy_tx_seen: false });
let next_state = State::Bob(BobState::BuySigB {
buy_tx_seen: false,
checkpoint: self.state.checkpoint().unwrap(),
});
self.state_update(endpoints, next_state)?;
} else {
error!(
Expand Down Expand Up @@ -1586,6 +1591,7 @@ impl Runtime {
cancel_seen: false,
refund_seen: false,
remote_params: self.state.remote_params().unwrap(),
checkpoint: self.state.checkpoint().unwrap(),
});
} else {
warn!(
Expand Down Expand Up @@ -1911,22 +1917,24 @@ impl Runtime {
"{} | checkpointing bob pre lock swapd state",
self.swap_id.bright_blue_italic()
);
checkpoint_send(
endpoints,
self.swap_id,
self.identity(),
ServiceId::Database,
request::CheckpointState::CheckpointSwapd(CheckpointSwapd {
state: self.state.clone(),
last_msg: Msg::CoreArbitratingSetup(core_arb_setup.clone()),
enquirer: self.enquirer.clone(),
temporal_safety: self.temporal_safety.clone(),
txs: self.txs.clone(),
txids: self.syncer_state.tasks.txids.clone(),
pending_requests: self.pending_requests.clone(),
xmr_addr_addendum: self.syncer_state.xmr_addr_addendum.clone(),
}),
)?;
if self.state.b_sup_checkpoint_pre_lock() {
checkpoint_send(
endpoints,
self.swap_id,
self.identity(),
ServiceId::Database,
request::CheckpointState::CheckpointSwapd(CheckpointSwapd {
state: self.state.clone(),
last_msg: Msg::CoreArbitratingSetup(core_arb_setup.clone()),
enquirer: self.enquirer.clone(),
temporal_safety: self.temporal_safety.clone(),
txs: self.txs.clone(),
txids: self.syncer_state.tasks.txids.clone(),
pending_requests: self.pending_requests.clone(),
xmr_addr_addendum: self.syncer_state.xmr_addr_addendum.clone(),
}),
)?;
}
let CoreArbitratingSetup {
swap_id: _,
lock,
Expand Down Expand Up @@ -1958,6 +1966,7 @@ impl Runtime {
cancel_seen: false,
remote_params: self.state.remote_params().unwrap(),
b_address: self.state.b_address().cloned().unwrap(),
checkpoint: self.state.checkpoint().unwrap(),
});
self.state_update(endpoints, next_state)?;
}
Expand Down Expand Up @@ -2054,26 +2063,29 @@ impl Runtime {
"{} | checkpointing alice pre lock swapd state",
self.swap_id.bright_blue_italic()
);
checkpoint_send(
endpoints,
self.swap_id,
self.identity(),
ServiceId::Database,
request::CheckpointState::CheckpointSwapd(CheckpointSwapd {
state: self.state.clone(),
last_msg: Msg::RefundProcedureSignatures(refund_proc_sigs.clone()),
enquirer: self.enquirer.clone(),
temporal_safety: self.temporal_safety.clone(),
txs: self.txs.clone(),
txids: self.syncer_state.tasks.txids.clone(),
pending_requests: self.pending_requests.clone(),
xmr_addr_addendum: self.syncer_state.xmr_addr_addendum.clone(),
}),
)?;
if self.state.a_sup_checkpoint_pre_lock() {
checkpoint_send(
endpoints,
self.swap_id,
self.identity(),
ServiceId::Database,
request::CheckpointState::CheckpointSwapd(CheckpointSwapd {
state: self.state.clone(),
last_msg: Msg::RefundProcedureSignatures(refund_proc_sigs.clone()),
enquirer: self.enquirer.clone(),
temporal_safety: self.temporal_safety.clone(),
txs: self.txs.clone(),
txids: self.syncer_state.tasks.txids.clone(),
pending_requests: self.pending_requests.clone(),
xmr_addr_addendum: self.syncer_state.xmr_addr_addendum.clone(),
}),
)?;
}

self.send_peer(endpoints, Msg::RefundProcedureSignatures(refund_proc_sigs))?;
trace!("sent peer RefundProcedureSignatures msg");
let next_state = State::Alice(AliceState::RefundSigA {
checkpoint: SwapCheckpointType::CheckpointAlicePreLock,
local_params: self.state.local_params().cloned().unwrap(),
xmr_locked: false,
buy_published: false,
Expand All @@ -2093,22 +2105,24 @@ impl Runtime {
"{} | checkpointing bob pre buy swapd state",
self.swap_id.bright_blue_italic()
);
checkpoint_send(
endpoints,
self.swap_id,
self.identity(),
ServiceId::Database,
request::CheckpointState::CheckpointSwapd(CheckpointSwapd {
state: self.state.clone(),
last_msg: Msg::BuyProcedureSignature(buy_proc_sig.clone()),
enquirer: self.enquirer.clone(),
temporal_safety: self.temporal_safety.clone(),
txs: self.txs.clone(),
txids: self.syncer_state.tasks.txids.clone(),
pending_requests: self.pending_requests.clone(),
xmr_addr_addendum: self.syncer_state.xmr_addr_addendum.clone(),
}),
)?;
if self.state.b_sup_checkpoint_pre_buy() {
checkpoint_send(
endpoints,
self.swap_id,
self.identity(),
ServiceId::Database,
request::CheckpointState::CheckpointSwapd(CheckpointSwapd {
state: self.state.clone(),
last_msg: Msg::BuyProcedureSignature(buy_proc_sig.clone()),
enquirer: self.enquirer.clone(),
temporal_safety: self.temporal_safety.clone(),
txs: self.txs.clone(),
txids: self.syncer_state.tasks.txids.clone(),
pending_requests: self.pending_requests.clone(),
xmr_addr_addendum: self.syncer_state.xmr_addr_addendum.clone(),
}),
)?;
}

debug!("subscribing with syncer for receiving raw buy tx ");

Expand Down Expand Up @@ -2274,10 +2288,7 @@ impl Runtime {
self.pending_requests = pending_requests;
self.txs = txs.clone();
trace!("Watch height bitcoin");
let watch_height_bitcoin = Task::WatchHeight(WatchHeight {
id: self.syncer_state.tasks.new_taskid(),
lifetime: self.syncer_state.task_lifetime(Coin::Bitcoin),
});
let watch_height_bitcoin = self.syncer_state.watch_height(Coin::Bitcoin);
endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
Expand All @@ -2286,10 +2297,7 @@ impl Runtime {
)?;

trace!("Watch height monero");
let watch_height_monero = Task::WatchHeight(WatchHeight {
id: self.syncer_state.tasks.new_taskid(),
lifetime: self.syncer_state.task_lifetime(Coin::Monero),
});
let watch_height_monero = self.syncer_state.watch_height(Coin::Monero);
endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
Expand Down
Loading

0 comments on commit f97c2ad

Please sign in to comment.