diff --git a/src/swapd/mod.rs b/src/swapd/mod.rs index 2279350e2..baf9251aa 100644 --- a/src/swapd/mod.rs +++ b/src/swapd/mod.rs @@ -17,6 +17,7 @@ mod opts; mod runtime; #[allow(dead_code)] pub(self) mod storage; +mod syncer_client; #[cfg(feature = "shell")] pub use opts::Opts; diff --git a/src/swapd/runtime.rs b/src/swapd/runtime.rs index 0a6106558..e8019f301 100644 --- a/src/swapd/runtime.rs +++ b/src/swapd/runtime.rs @@ -17,6 +17,7 @@ use crate::service::Endpoints; use crate::{ rpc::request::Outcome, rpc::request::{BitcoinFundingInfo, FundingInfo, MoneroFundingInfo}, + swapd::syncer_client::log_tx_seen, syncerd::{ opts::Coin, Abort, GetTx, HeightChanged, SweepAddress, SweepAddressAddendum, SweepSuccess, SweepXmrAddress, TaskId, TaskTarget, TransactionRetrieved, WatchHeight, XmrAddressAddendum, @@ -33,7 +34,10 @@ use std::{ time::{Duration, SystemTime}, }; -use super::storage::{self, Driver}; +use super::{ + storage::{self, Driver}, + syncer_client::{log_tx_received, SyncerState, SyncerTasks}, +}; use crate::rpc::{ request::{self, Msg}, Request, ServiceBus, @@ -270,40 +274,6 @@ impl TemporalSafety { } } -struct SyncerTasks { - counter: u32, - watched_txs: HashMap, - final_txs: HashMap, - watched_addrs: HashMap, - retrieving_txs: HashMap, - sweeping_addr: Option, - // external address: needed to subscribe for buy (bob) or refund (alice) address_txs - txids: HashMap, -} - -impl SyncerTasks { - fn new_taskid(&mut self) -> TaskId { - self.counter += 1; - TaskId(self.counter) - } -} - -struct SyncerState { - swap_id: SwapId, - tasks: SyncerTasks, - bitcoin_height: u64, - monero_height: u64, - confirmation_bound: u32, - lock_tx_confs: Option, - cancel_tx_confs: Option, - network: farcaster_core::blockchain::Network, - bitcoin_syncer: ServiceId, - monero_syncer: ServiceId, - monero_amount: monero::Amount, - bitcoin_amount: bitcoin::Amount, - awaiting_funding: bool, -} - #[derive(Display, Clone)] pub enum AliceState { // #[display("Start: {0:#?} {1:#?}")] @@ -764,260 +734,6 @@ impl esb::Handler for Runtime { } } -impl SyncerState { - fn task_lifetime(&self, coin: Coin) -> u64 { - let height = self.height(coin); - if height > 0 { - height + 500 - } else { - u64::MAX - } - } - fn bitcoin_syncer(&self) -> ServiceId { - self.bitcoin_syncer.clone() - } - fn monero_syncer(&self) -> ServiceId { - self.monero_syncer.clone() - } - fn height(&self, coin: Coin) -> u64 { - match coin { - Coin::Bitcoin => self.bitcoin_height, - Coin::Monero => self.monero_height, - } - } - fn handle_height_change(&mut self, new_height: u64, coin: Coin) { - let height = match coin { - Coin::Bitcoin => &mut self.bitcoin_height, - Coin::Monero => &mut self.monero_height, - }; - if &new_height > height { - debug!("{} new height {}", coin, &new_height); - *height = new_height; - } else { - warn!("block height did not increment, maybe syncer sends multiple events"); - } - } - fn watch_tx_btc(&mut self, txid: Txid, tx_label: TxLabel) -> Task { - let id = self.tasks.new_taskid(); - self.tasks.watched_txs.insert(id, tx_label); - info!( - "{} | Watching {} transaction ({})", - self.swap_id.bright_blue_italic(), - tx_label.bright_white_bold(), - txid.bright_yellow_italic() - ); - Task::WatchTransaction(WatchTransaction { - id, - lifetime: self.task_lifetime(Coin::Bitcoin), - hash: txid.to_vec(), - confirmation_bound: self.confirmation_bound, - }) - } - fn is_watched_tx(&self, tx_label: &TxLabel) -> bool { - self.tasks.watched_txs.values().any(|tx| tx == tx_label) - } - fn watch_tx_xmr(&mut self, hash: Vec, tx_label: TxLabel) -> Task { - let id = self.tasks.new_taskid(); - self.tasks.watched_txs.insert(id, tx_label); - info!( - "{} | Watching {} transaction ({})", - self.swap_id.bright_blue_italic(), - tx_label.bright_white_bold(), - hex::encode(&hash).bright_yellow_italic(), - ); - debug!("Watching transaction {} with {}", hex::encode(&hash), id); - Task::WatchTransaction(WatchTransaction { - id, - lifetime: self.task_lifetime(Coin::Monero), - hash, - confirmation_bound: self.confirmation_bound, - }) - } - fn retrieve_tx_btc(&mut self, txid: Txid, tx_label: TxLabel) -> Task { - let id = self.tasks.new_taskid(); - let task = Task::GetTx(GetTx { - id, - hash: txid.to_vec(), - }); - self.tasks - .retrieving_txs - .insert(id, (tx_label, task.clone())); - task - } - fn watch_addr_btc(&mut self, script_pubkey: Script, tx_label: TxLabel) -> Task { - let id = self.tasks.new_taskid(); - let from_height = self.from_height(Coin::Bitcoin, 6); - self.tasks.watched_addrs.insert(id, tx_label); - info!( - "{} | Watching {} transaction with scriptPubkey: {}", - self.swap_id.bright_blue_italic(), - tx_label.bright_white_bold(), - script_pubkey.asm().bright_blue_italic() - ); - let addendum = BtcAddressAddendum { - address: None, - from_height, - script_pubkey, - }; - Task::WatchAddress(WatchAddress { - id, - lifetime: self.task_lifetime(Coin::Bitcoin), - addendum: AddressAddendum::Bitcoin(addendum), - include_tx: Boolean::True, - }) - } - - fn is_watched_addr(&self, tx_label: &TxLabel) -> bool { - self.tasks.watched_addrs.values().any(|tx| tx == tx_label) - } - fn from_height(&self, coin: Coin, delta: u64) -> u64 { - let height = self.height(coin); - let delta = if height > delta { delta } else { height }; - height - delta - } - - fn watch_addr_xmr( - &mut self, - spend: monero::PublicKey, - view: monero::PrivateKey, - tx_label: TxLabel, - ) -> Task { - debug!( - "{} | Address's secret view key for {}: {}", - self.swap_id.bright_blue_italic(), - tx_label.bright_white_bold(), - view.bright_white_italic() - ); - debug!( - "{} | Address's public spend key for {}: {}", - self.swap_id.bright_blue_italic(), - tx_label.bright_white_bold(), - spend.bright_white_italic() - ); - let viewpair = monero::ViewPair { spend, view }; - let address = monero::Address::from_viewpair(self.network.into(), &viewpair); - let from_height = self.from_height(Coin::Monero, 20); - let addendum = AddressAddendum::Monero(XmrAddressAddendum { - spend_key: spend, - view_key: view, - from_height, - }); - - let id = self.tasks.new_taskid(); - self.tasks.watched_addrs.insert(id, tx_label); - - info!( - "{} | Watching {} on address {}", - self.swap_id.bright_blue_italic(), - tx_label.bright_white_bold(), - address - ); - - let watch_addr = WatchAddress { - id, - lifetime: self.task_lifetime(Coin::Monero), - addendum, - include_tx: Boolean::False, - }; - Task::WatchAddress(watch_addr) - } - fn sweep_xmr( - &mut self, - view_key: monero::PrivateKey, - spend_key: monero::PrivateKey, - address: monero::Address, - from_height: Option, - minimum_balance: monero::Amount, - ) -> Task { - let id = self.tasks.new_taskid(); - self.tasks.sweeping_addr = Some(id); - let lifetime = self.task_lifetime(Coin::Monero); - let addendum = SweepAddressAddendum::Monero(SweepXmrAddress { - view_key, - spend_key, - address, - minimum_balance, - }); - let sweep_task = SweepAddress { - id, - lifetime, - addendum, - from_height, - }; - Task::SweepAddress(sweep_task) - } - - fn acc_lock_watched(&self) -> bool { - self.tasks - .watched_addrs - .values() - .any(|&x| x == TxLabel::AccLock) - } - fn handle_tx_confs( - &mut self, - id: &TaskId, - confirmations: &Option, - swapid: SwapId, - finality_thr: u32, - ) { - if let Some(txlabel) = self.tasks.watched_txs.get(id) { - if !self.tasks.final_txs.contains_key(txlabel) - && confirmations.is_some() - && confirmations.unwrap() >= finality_thr - { - info!( - "{} | Tx {} {} with {} {}", - self.swap_id.bright_blue_italic(), - txlabel.bright_white_bold(), - "final".bright_green_bold(), - confirmations.unwrap().bright_green_bold(), - "confirmations".bright_green_bold() - ); - self.tasks.final_txs.insert(*txlabel, true); - } else if self.tasks.final_txs.contains_key(txlabel) { - debug!( - "{} | Tx {} {} with {} {}", - self.swap_id.bright_blue_italic(), - txlabel.bright_white_bold(), - "final".bright_green_bold(), - confirmations.unwrap().bright_green_bold(), - "confirmations".bright_green_bold() - ); - } else { - match confirmations { - Some(0) => { - info!( - "{} | Tx {} on mempool but hasn't been mined", - swapid.bright_blue_italic(), - txlabel.bright_white_bold() - ); - } - Some(confs) => { - info!( - "{} | Tx {} mined with {} {}", - swapid.bright_blue_italic(), - txlabel.bright_white_bold(), - confs.bright_green_bold(), - "confirmations".bright_green_bold(), - ) - } - None => { - info!( - "{} | Tx {} not on the mempool", - swapid.bright_blue_italic(), - txlabel.bright_white_bold() - ); - } - } - } - } else { - error!( - "received event with unknown transaction and task id {}", - &id - ) - } - } -} impl Runtime { fn send_peer(&mut self, endpoints: &mut Endpoints, msg: request::Msg) -> Result<(), Error> { trace!("sending peer message {}", msg.bright_yellow_bold()); @@ -2777,24 +2493,6 @@ pub fn get_swap_id(source: &ServiceId) -> Result { } } -fn log_tx_seen(swap_id: SwapId, txlabel: &TxLabel, txid: &Txid) { - info!( - "{} | {} transaction ({}) in mempool or blockchain, forward to walletd", - swap_id.bright_blue_italic(), - txlabel.bright_white_bold(), - txid.bright_yellow_italic(), - ); -} - -fn log_tx_received(swap_id: SwapId, txlabel: TxLabel) { - info!( - "{} | {} transaction received from {}", - swap_id.bright_blue_italic(), - txlabel.bright_white_bold(), - ServiceId::Wallet - ); -} - fn aggregate_xmr_spend_view( alice_params: &AliceParameters, bob_params: &BobParameters, diff --git a/src/swapd/syncer_client.rs b/src/swapd/syncer_client.rs new file mode 100644 index 000000000..75f6a7484 --- /dev/null +++ b/src/swapd/syncer_client.rs @@ -0,0 +1,323 @@ +use crate::{ + service::LogStyle, + syncerd::{ + AddressAddendum, Boolean, BtcAddressAddendum, Coin, GetTx, SweepAddress, + SweepAddressAddendum, SweepXmrAddress, WatchAddress, WatchTransaction, XmrAddressAddendum, + }, +}; +use bitcoin::{Script, Txid}; +use farcaster_core::{swap::SwapId, transaction::TxLabel}; +use std::collections::HashMap; + +use crate::{ + rpc::Request, + syncerd::{Task, TaskId}, + ServiceId, +}; +use strict_encoding::{StrictDecode, StrictEncode}; + +pub struct SyncerTasks { + pub counter: u32, + pub watched_txs: HashMap, + pub final_txs: HashMap, + pub watched_addrs: HashMap, + pub retrieving_txs: HashMap, + pub sweeping_addr: Option, + // external address: needed to subscribe for buy (bob) or refund (alice) address_txs + pub txids: HashMap, +} + +impl SyncerTasks { + pub fn new_taskid(&mut self) -> TaskId { + self.counter += 1; + TaskId(self.counter) + } +} + +pub struct SyncerState { + pub swap_id: SwapId, + pub tasks: SyncerTasks, + pub bitcoin_height: u64, + pub monero_height: u64, + pub confirmation_bound: u32, + pub lock_tx_confs: Option, + pub cancel_tx_confs: Option, + pub network: farcaster_core::blockchain::Network, + pub bitcoin_syncer: ServiceId, + pub monero_syncer: ServiceId, + pub monero_amount: monero::Amount, + pub bitcoin_amount: bitcoin::Amount, + pub awaiting_funding: bool, +} +impl SyncerState { + pub fn task_lifetime(&self, coin: Coin) -> u64 { + let height = self.height(coin); + if height > 0 { + height + 500 + } else { + u64::MAX + } + } + pub fn bitcoin_syncer(&self) -> ServiceId { + self.bitcoin_syncer.clone() + } + pub fn monero_syncer(&self) -> ServiceId { + self.monero_syncer.clone() + } + pub fn height(&self, coin: Coin) -> u64 { + match coin { + Coin::Bitcoin => self.bitcoin_height, + Coin::Monero => self.monero_height, + } + } + pub fn handle_height_change(&mut self, new_height: u64, coin: Coin) { + let height = match coin { + Coin::Bitcoin => &mut self.bitcoin_height, + Coin::Monero => &mut self.monero_height, + }; + if &new_height > height { + debug!("{} new height {}", coin, &new_height); + *height = new_height; + } else { + warn!("block height did not increment, maybe syncer sends multiple events"); + } + } + pub fn watch_tx_btc(&mut self, txid: Txid, tx_label: TxLabel) -> Task { + let id = self.tasks.new_taskid(); + self.tasks.watched_txs.insert(id, tx_label); + info!( + "{} | Watching {} transaction ({})", + self.swap_id.bright_blue_italic(), + tx_label.bright_white_bold(), + txid.bright_yellow_italic() + ); + Task::WatchTransaction(WatchTransaction { + id, + lifetime: self.task_lifetime(Coin::Bitcoin), + hash: txid.to_vec(), + confirmation_bound: self.confirmation_bound, + }) + } + pub fn is_watched_tx(&self, tx_label: &TxLabel) -> bool { + self.tasks.watched_txs.values().any(|tx| tx == tx_label) + } + pub fn watch_tx_xmr(&mut self, hash: Vec, tx_label: TxLabel) -> Task { + let id = self.tasks.new_taskid(); + self.tasks.watched_txs.insert(id, tx_label); + info!( + "{} | Watching {} transaction ({})", + self.swap_id.bright_blue_italic(), + tx_label.bright_white_bold(), + hex::encode(&hash).bright_yellow_italic(), + ); + debug!("Watching transaction {} with {}", hex::encode(&hash), id); + Task::WatchTransaction(WatchTransaction { + id, + lifetime: self.task_lifetime(Coin::Monero), + hash, + confirmation_bound: self.confirmation_bound, + }) + } + pub fn retrieve_tx_btc(&mut self, txid: Txid, tx_label: TxLabel) -> Task { + let id = self.tasks.new_taskid(); + let task = Task::GetTx(GetTx { + id, + hash: txid.to_vec(), + }); + self.tasks + .retrieving_txs + .insert(id, (tx_label, task.clone())); + task + } + pub fn watch_addr_btc(&mut self, script_pubkey: Script, tx_label: TxLabel) -> Task { + let id = self.tasks.new_taskid(); + let from_height = self.from_height(Coin::Bitcoin, 6); + self.tasks.watched_addrs.insert(id, tx_label); + info!( + "{} | Watching {} transaction with scriptPubkey: {}", + self.swap_id.bright_blue_italic(), + tx_label.bright_white_bold(), + script_pubkey.asm().bright_blue_italic() + ); + let addendum = BtcAddressAddendum { + address: None, + from_height, + script_pubkey, + }; + Task::WatchAddress(WatchAddress { + id, + lifetime: self.task_lifetime(Coin::Bitcoin), + addendum: AddressAddendum::Bitcoin(addendum), + include_tx: Boolean::True, + }) + } + + pub fn is_watched_addr(&self, tx_label: &TxLabel) -> bool { + self.tasks.watched_addrs.values().any(|tx| tx == tx_label) + } + pub fn from_height(&self, coin: Coin, delta: u64) -> u64 { + let height = self.height(coin); + let delta = if height > delta { delta } else { height }; + height - delta + } + + pub fn watch_addr_xmr( + &mut self, + spend: monero::PublicKey, + view: monero::PrivateKey, + tx_label: TxLabel, + ) -> Task { + debug!( + "{} | Address's secret view key for {}: {}", + self.swap_id.bright_blue_italic(), + tx_label.bright_white_bold(), + view.bright_white_italic() + ); + debug!( + "{} | Address's public spend key for {}: {}", + self.swap_id.bright_blue_italic(), + tx_label.bright_white_bold(), + spend.bright_white_italic() + ); + let viewpair = monero::ViewPair { spend, view }; + let address = monero::Address::from_viewpair(self.network.into(), &viewpair); + let from_height = self.from_height(Coin::Monero, 20); + let addendum = AddressAddendum::Monero(XmrAddressAddendum { + spend_key: spend, + view_key: view, + from_height, + }); + + let id = self.tasks.new_taskid(); + self.tasks.watched_addrs.insert(id, tx_label); + + info!( + "{} | Watching {} on address {}", + self.swap_id.bright_blue_italic(), + tx_label.bright_white_bold(), + address + ); + + let watch_addr = WatchAddress { + id, + lifetime: self.task_lifetime(Coin::Monero), + addendum, + include_tx: Boolean::False, + }; + Task::WatchAddress(watch_addr) + } + pub fn sweep_xmr( + &mut self, + view_key: monero::PrivateKey, + spend_key: monero::PrivateKey, + address: monero::Address, + from_height: Option, + minimum_balance: monero::Amount, + ) -> Task { + let id = self.tasks.new_taskid(); + self.tasks.sweeping_addr = Some(id); + let lifetime = self.task_lifetime(Coin::Monero); + let addendum = SweepAddressAddendum::Monero(SweepXmrAddress { + view_key, + spend_key, + address, + minimum_balance, + }); + let sweep_task = SweepAddress { + id, + lifetime, + addendum, + from_height, + }; + Task::SweepAddress(sweep_task) + } + + pub fn acc_lock_watched(&self) -> bool { + self.tasks + .watched_addrs + .values() + .any(|&x| x == TxLabel::AccLock) + } + pub fn handle_tx_confs( + &mut self, + id: &TaskId, + confirmations: &Option, + swapid: SwapId, + finality_thr: u32, + ) { + if let Some(txlabel) = self.tasks.watched_txs.get(id) { + if !self.tasks.final_txs.contains_key(txlabel) + && confirmations.is_some() + && confirmations.unwrap() >= finality_thr + { + info!( + "{} | Tx {} {} with {} {}", + self.swap_id.bright_blue_italic(), + txlabel.bright_white_bold(), + "final".bright_green_bold(), + confirmations.unwrap().bright_green_bold(), + "confirmations".bright_green_bold() + ); + self.tasks.final_txs.insert(*txlabel, true); + } else if self.tasks.final_txs.contains_key(txlabel) { + debug!( + "{} | Tx {} {} with {} {}", + self.swap_id.bright_blue_italic(), + txlabel.bright_white_bold(), + "final".bright_green_bold(), + confirmations.unwrap().bright_green_bold(), + "confirmations".bright_green_bold() + ); + } else { + match confirmations { + Some(0) => { + info!( + "{} | Tx {} on mempool but hasn't been mined", + swapid.bright_blue_italic(), + txlabel.bright_white_bold() + ); + } + Some(confs) => { + info!( + "{} | Tx {} mined with {} {}", + swapid.bright_blue_italic(), + txlabel.bright_white_bold(), + confs.bright_green_bold(), + "confirmations".bright_green_bold(), + ) + } + None => { + info!( + "{} | Tx {} not on the mempool", + swapid.bright_blue_italic(), + txlabel.bright_white_bold() + ); + } + } + } + } else { + error!( + "received event with unknown transaction and task id {}", + &id + ) + } + } +} + +pub fn log_tx_seen(swap_id: SwapId, txlabel: &TxLabel, txid: &Txid) { + info!( + "{} | {} transaction ({}) in mempool or blockchain, forward to walletd", + swap_id.bright_blue_italic(), + txlabel.bright_white_bold(), + txid.bright_yellow_italic(), + ); +} + +pub fn log_tx_received(swap_id: SwapId, txlabel: TxLabel) { + info!( + "{} | {} transaction received from {}", + swap_id.bright_blue_italic(), + txlabel.bright_white_bold(), + ServiceId::Wallet + ); +}