Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Swapd: state refactors #839

Merged
merged 9 commits into from
Dec 15, 2022
156 changes: 77 additions & 79 deletions src/swapd/swap_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use farcaster_core::{
blockchain::Blockchain,
role::{SwapRole, TradeRole},
swap::btcxmr::message::BuyProcedureSignature,
transaction::Fundable,
transaction::TxLabel,
};
use microservices::esb::Handler;
Expand All @@ -32,7 +33,7 @@ use crate::{
swapd::{
runtime::aggregate_xmr_spend_view,
syncer_client::{log_tx_created, log_tx_seen},
wallet::{HandleBuyProcedureSignatureRes, HandleRefundProcedureSignaturesRes},
wallet::{BobState, HandleBuyProcedureSignatureRes, HandleRefundProcedureSignaturesRes},
},
syncerd::{
Abort, Boolean, SweepSuccess, Task, TaskTarget, TransactionConfirmations,
Expand Down Expand Up @@ -540,7 +541,7 @@ impl StateMachine<Runtime, Error> for SwapStateMachine {
}
SwapStateMachine::AlicePunish => try_alice_punish_to_swap_end(event, runtime),

_ => Ok(Some(self)),
SwapStateMachine::SwapEnd(_) => Ok(None),
}
}

Expand Down Expand Up @@ -694,7 +695,6 @@ fn attempt_transition_to_init_maker(
commit.clone(),
)?;
let local_params = wallet.local_params();
let funding_address = wallet.funding_address();
runtime.peer_service = peerd;
if runtime.peer_service != ServiceId::Loopback {
runtime.connected = true;
Expand All @@ -715,20 +715,33 @@ fn attempt_transition_to_init_maker(
// send maker commit message to counter-party
runtime.log_trace(format!("sending peer MakerCommit msg {}", &local_commit));
runtime.send_peer(event.endpoints, PeerMsg::MakerCommit(local_commit.clone()))?;
match swap_role {
SwapRole::Bob => Ok(Some(SwapStateMachine::BobInitMaker(BobInitMaker {
local_commit,
local_params,
funding_address: funding_address.unwrap(),
remote_commit: commit,
wallet,
reveal: None,
}))),
SwapRole::Alice => Ok(Some(SwapStateMachine::AliceInitMaker(AliceInitMaker {
local_params,
remote_commit: commit,
wallet,
}))),
match (swap_role, wallet.clone()) {
(SwapRole::Bob, Wallet::Bob(BobState { funding_tx, .. })) => {
Ok(Some(SwapStateMachine::BobInitMaker(BobInitMaker {
local_commit,
local_params,
funding_address: funding_tx
.get_address()
.expect("Funding address should be valid"),
remote_commit: commit,
wallet,
reveal: None,
})))
}
(SwapRole::Alice, Wallet::Alice(_)) => {
Ok(Some(SwapStateMachine::AliceInitMaker(AliceInitMaker {
local_params,
remote_commit: commit,
wallet,
})))
}
_ => {
runtime.log_error(format!(
"Invalid swap role {} for wallet {}",
swap_role, wallet
));
Ok(None)
}
}
}
BusMsg::Ctl(CtlMsg::AbortSwap) => handle_abort_swap(event, runtime),
Expand Down Expand Up @@ -974,14 +987,12 @@ fn try_bob_reveal_to_bob_funded(
.zip([TxLabel::Lock, TxLabel::Cancel, TxLabel::Refund])
{
runtime.log_debug(format!("register watch {} tx", tx_label.label()));
if !runtime.syncer_state.is_watched_tx(&tx_label) {
let txid = tx.clone().extract_tx().txid();
let task = runtime.syncer_state.watch_tx_btc(txid, tx_label);
event.send_sync_service(
runtime.syncer_state.bitcoin_syncer(),
SyncMsg::Task(task),
)?;
}
let txid = tx.clone().extract_tx().txid();
let task = runtime.syncer_state.watch_tx_btc(txid, tx_label);
event.send_sync_service(
runtime.syncer_state.bitcoin_syncer(),
SyncMsg::Task(task),
)?;
}

// Set the monero address creation height for Bob before setting the first checkpoint
Expand Down Expand Up @@ -1066,16 +1077,11 @@ fn try_bob_funded_to_bob_refund_procedure_signature(
// register a watch task for buy tx.
// registration performed now already to ensure it's present in checkpoint.
runtime.log_debug("register watch buy tx task");
if !runtime.syncer_state.is_watched_tx(&TxLabel::Buy) {
let buy_tx = buy_procedure_signature.buy.clone().extract_tx();
let task = runtime
.syncer_state
.watch_tx_btc(buy_tx.txid(), TxLabel::Buy);
event.send_sync_service(
runtime.syncer_state.bitcoin_syncer(),
SyncMsg::Task(task),
)?;
}
let buy_tx = buy_procedure_signature.buy.clone().extract_tx();
let task = runtime
.syncer_state
.watch_tx_btc(buy_tx.txid(), TxLabel::Buy);
event.send_sync_service(runtime.syncer_state.bitcoin_syncer(), SyncMsg::Task(task))?;
// Checkpoint BobRefundProcedureSignatures
let new_ssm =
SwapStateMachine::BobRefundProcedureSignatures(BobRefundProcedureSignatures {
Expand Down Expand Up @@ -1132,13 +1138,11 @@ fn try_bob_refund_procedure_signatures_to_bob_accordant_lock(
}
if let Some(tx_label) = runtime.syncer_state.tasks.watched_addrs.remove(&id) {
let abort_task = runtime.syncer_state.abort_task(id.clone());
if !runtime.syncer_state.is_watched_tx(&tx_label) {
let watch_tx = runtime.syncer_state.watch_tx_xmr(hash.clone(), tx_label);
event.send_sync_service(
runtime.syncer_state.monero_syncer(),
SyncMsg::Task(watch_tx),
)?;
}
let watch_tx = runtime.syncer_state.watch_tx_xmr(hash.clone(), tx_label);
event.send_sync_service(
runtime.syncer_state.monero_syncer(),
SyncMsg::Task(watch_tx),
)?;
event.send_sync_service(
runtime.syncer_state.monero_syncer(),
SyncMsg::Task(abort_task),
Expand Down Expand Up @@ -1457,14 +1461,12 @@ fn try_alice_reveal_to_alice_core_arbitrating_setup(
TxLabel::Refund,
]) {
runtime.log_debug(format!("Register watch {} tx", tx_label));
if !runtime.syncer_state.is_watched_tx(&tx_label) {
let txid = tx.clone().extract_tx().txid();
let task = runtime.syncer_state.watch_tx_btc(txid, tx_label);
event.send_sync_service(
runtime.syncer_state.bitcoin_syncer(),
SyncMsg::Task(task),
)?;
}
let txid = tx.clone().extract_tx().txid();
let task = runtime.syncer_state.watch_tx_btc(txid, tx_label);
event.send_sync_service(
runtime.syncer_state.bitcoin_syncer(),
SyncMsg::Task(task),
)?;
}
// handle the core arbitrating setup message with the wallet
runtime.log_debug("Handling core arb setup with wallet");
Expand Down Expand Up @@ -1653,18 +1655,15 @@ fn try_alice_arbitrating_lock_final_to_alice_accordant_lock(
id, hash, amount, block, tx
));
let txlabel = TxLabel::AccLock;
if !runtime.syncer_state.is_watched_tx(&txlabel) {
let task = runtime.syncer_state.watch_tx_xmr(hash.clone(), txlabel);
if runtime.syncer_state.awaiting_funding {
event.send_ctl_service(
ServiceId::Farcasterd,
CtlMsg::FundingCompleted(Blockchain::Monero),
)?;
runtime.syncer_state.awaiting_funding = false;
}
event
.send_sync_service(runtime.syncer_state.monero_syncer(), SyncMsg::Task(task))?;
let task = runtime.syncer_state.watch_tx_xmr(hash.clone(), txlabel);
if runtime.syncer_state.awaiting_funding {
event.send_ctl_service(
ServiceId::Farcasterd,
CtlMsg::FundingCompleted(Blockchain::Monero),
)?;
runtime.syncer_state.awaiting_funding = false;
}
event.send_sync_service(runtime.syncer_state.monero_syncer(), SyncMsg::Task(task))?;
if runtime
.syncer_state
.tasks
Expand Down Expand Up @@ -1731,14 +1730,9 @@ fn try_alice_accordant_lock_to_alice_buy_procedure_signature(
BusMsg::P2p(PeerMsg::BuyProcedureSignature(buy_procedure_signature)) => {
// register a watch task for buy
runtime.log_debug("Registering watch buy tx task");
if !runtime.syncer_state.is_watched_tx(&TxLabel::Buy) {
let txid = buy_procedure_signature.buy.clone().extract_tx().txid();
let task = runtime.syncer_state.watch_tx_btc(txid, TxLabel::Buy);
event.send_sync_service(
runtime.syncer_state.bitcoin_syncer(),
SyncMsg::Task(task),
)?;
}
let txid = buy_procedure_signature.buy.clone().extract_tx().txid();
let task = runtime.syncer_state.watch_tx_btc(txid, TxLabel::Buy);
event.send_sync_service(runtime.syncer_state.bitcoin_syncer(), SyncMsg::Task(task))?;
// Handle the received buy procedure signature message with the wallet
runtime.log_debug("Handling buy procedure signature with wallet");
let HandleBuyProcedureSignatureRes { cancel_tx, buy_tx } = wallet
Expand Down Expand Up @@ -1873,14 +1867,12 @@ fn try_alice_canceled_to_alice_refund_or_alice_punish(
runtime.log_debug("Publishing punish tx");
let (tx_label, punish_tx) = runtime.txs.remove_entry(&TxLabel::Punish).unwrap();
// syncer's watch punish tx task
if !runtime.syncer_state.is_watched_tx(&tx_label) {
let txid = punish_tx.txid();
let task = runtime.syncer_state.watch_tx_btc(txid, tx_label);
event.send_sync_service(
runtime.syncer_state.bitcoin_syncer(),
SyncMsg::Task(task),
)?;
}
let txid = punish_tx.txid();
let task = runtime.syncer_state.watch_tx_btc(txid, tx_label);
event.send_sync_service(
runtime.syncer_state.bitcoin_syncer(),
SyncMsg::Task(task),
)?;
runtime.broadcast(punish_tx, tx_label, event.endpoints)?;
Ok(Some(SwapStateMachine::AlicePunish))
}
Expand Down Expand Up @@ -2066,8 +2058,14 @@ fn try_alice_punish_to_swap_end(
) -> Result<Option<SwapStateMachine>, Error> {
match event.request {
BusMsg::Sync(SyncMsg::Event(SyncEvent::TransactionConfirmations(
TransactionConfirmations { id, .. },
))) if runtime.syncer_state.tasks.watched_txs.get(&id) == Some(&TxLabel::Punish) => {
TransactionConfirmations {
id,
confirmations: Some(confirmations),
..
},
))) if runtime.syncer_state.tasks.watched_txs.get(&id) == Some(&TxLabel::Punish)
&& confirmations >= runtime.temporal_safety.btc_finality_thr =>
{
let abort_all = Task::Abort(Abort {
task_target: TaskTarget::AllTasks,
respond: Boolean::False,
Expand Down
14 changes: 14 additions & 0 deletions src/swapd/syncer_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ impl SyncerState {
}

pub fn watch_tx_btc(&mut self, txid: Txid, tx_label: TxLabel) -> Task {
if self.is_watched_tx(&tx_label) {
warn!(
"{} | Already watching for tx with label {} - notifications will be repeated",
self.swap_id.swap_id(),
tx_label.label()
);
}
let id = self.tasks.new_taskid();
self.tasks.watched_txs.insert(id, tx_label);
self.tasks.txids.insert(tx_label, txid);
Expand All @@ -136,6 +143,13 @@ impl SyncerState {
self.tasks.watched_txs.values().any(|tx| tx == tx_label)
}
pub fn watch_tx_xmr(&mut self, hash: Vec<u8>, tx_label: TxLabel) -> Task {
if self.is_watched_tx(&tx_label) {
warn!(
"{} | Already watching for tx with label {} - notifications will be repeated",
self.swap_id.swap_id(),
tx_label.label()
);
}
let id = self.tasks.new_taskid();
self.tasks.watched_txs.insert(id, tx_label);
info!(
Expand Down
7 changes: 5 additions & 2 deletions src/swapd/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@ pub struct HandleBuyProcedureSignatureRes {
pub buy_tx: bitcoin::Transaction,
}

#[derive(Clone, Debug, StrictEncode, StrictDecode)]
#[derive(Clone, Display, Debug, StrictEncode, StrictDecode)]
pub enum Wallet {
#[display("Wallet::Alice")]
h4sh3d marked this conversation as resolved.
Show resolved Hide resolved
Alice(AliceState),
#[display("Wallet::Bob")]
Bob(BobState),
}

Expand Down Expand Up @@ -752,8 +754,9 @@ impl Wallet {
}
} else {
error!(
"{} | Wallet not found or not on correct state",
"{} | Not correct wallet, should be Wallet::Bob: {}",
h4sh3d marked this conversation as resolved.
Show resolved Hide resolved
swap_id.swap_id(),
self
);
return Err(Error::Farcaster(
"Needs to be a Bob wallet to process Alice Commit".to_string(),
Expand Down