Skip to content

Commit

Permalink
Merge pull request #881 from TheCharlatan/cacheLatestSyncerEvents
Browse files Browse the repository at this point in the history
Swapd: Cache all transaction related syncer events
  • Loading branch information
h4sh3d authored Dec 27, 2022
2 parents 2fbb26f + d4b5d3e commit 26be457
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 32 deletions.
69 changes: 42 additions & 27 deletions src/swapd/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use super::{
temporal_safety::TemporalSafety,
StateReport,
};
use crate::service::{Endpoints, Reporter};
use crate::swapd::Opts;
use crate::syncerd::bitcoin_syncer::p2wpkh_signed_tx_fee;
use crate::syncerd::types::{Event, TransactionConfirmations};
Expand All @@ -22,6 +21,10 @@ use crate::{
bus::{BusMsg, Outcome, ServiceBus},
syncerd::{HeightChanged, TransactionRetrieved, XmrAddressAddendum},
};
use crate::{
service::{Endpoints, Reporter},
syncerd::AddressTransaction,
};
use crate::{CtlServer, Error, LogStyle, Service, ServiceConfig, ServiceId};

use std::time::{Duration, SystemTime};
Expand Down Expand Up @@ -107,9 +110,7 @@ pub fn run(config: ServiceConfig, opts: Opts) -> Result<(), Error> {
monero_height: 0,
bitcoin_height: 0,
confirmation_bound: 50000,
lock_tx_confs: None,
cancel_tx_confs: None,
buy_tx_confs: None,
last_tx_event: none!(),
network,
bitcoin_syncer: ServiceId::Syncer(Blockchain::Bitcoin, network),
monero_syncer: ServiceId::Syncer(Blockchain::Monero, network),
Expand Down Expand Up @@ -542,9 +543,23 @@ impl Runtime {
self.temporal_safety.xmr_finality_thr,
endpoints,
);

// saving requests of interest for later replaying latest event
if let Some(txlabel) = self.syncer_state.tasks.watched_txs.get(id) {
self.syncer_state
.last_tx_event
.insert(*txlabel, request.clone());
}
}

Event::AddressTransaction(_) => {}
Event::AddressTransaction(AddressTransaction { id, .. }) => {
// saving requests of interest for later replaying latest event
if let Some(txlabel) = self.syncer_state.tasks.watched_addrs.get(id) {
self.syncer_state
.last_tx_event
.insert(*txlabel, request.clone());
}
}

Event::SweepSuccess(_) => {}

Expand Down Expand Up @@ -596,19 +611,11 @@ impl Runtime {
self.temporal_safety.btc_finality_thr,
endpoints,
);
let txlabel = self.syncer_state.tasks.watched_txs.get(id);
// saving requests of interest for later replaying latest event
match txlabel {
Some(&TxLabel::Lock) => {
self.syncer_state.lock_tx_confs = Some(request.clone());
}
Some(&TxLabel::Cancel) => {
self.syncer_state.cancel_tx_confs = Some(request.clone());
}
Some(&TxLabel::Buy) => {
self.syncer_state.buy_tx_confs = Some(request.clone())
}
_ => {}
if let Some(txlabel) = self.syncer_state.tasks.watched_txs.get(id) {
self.syncer_state
.last_tx_event
.insert(*txlabel, request.clone());
}
}

Expand All @@ -624,13 +631,27 @@ impl Runtime {
self.temporal_safety.btc_finality_thr,
endpoints,
);
// saving requests of interest for later replaying latest event
if let Some(txlabel) = self.syncer_state.tasks.watched_txs.get(id) {
self.syncer_state
.last_tx_event
.insert(*txlabel, request.clone());
}
}

Event::TransactionBroadcasted(event) => {
self.syncer_state.transaction_broadcasted(event);
}

Event::AddressTransaction(_) => {}
Event::AddressTransaction(AddressTransaction { id, .. }) => {
// saving requests of interest for later replaying latest event
if let Some(txlabel) = self.syncer_state.tasks.watched_addrs.get(id) {
self.syncer_state
.last_tx_event
.insert(*txlabel, request.clone());
}
self.log_debug(event);
}

Event::TaskAborted(event) => {
self.log_debug(event);
Expand Down Expand Up @@ -705,15 +726,9 @@ impl Runtime {
if let Some(peer_msg) = self.unhandled_peer_message.clone() {
self.handle_msg(endpoints, source.clone(), peer_msg)?;
}
// Replay confirmation events to ensure we immediately advance through states that can be skipped
if let Some(buy_tx_confs_req) = self.syncer_state.buy_tx_confs.clone() {
self.handle_sync(endpoints, source.clone(), buy_tx_confs_req)?;
}
if let Some(lock_tx_confs_req) = self.syncer_state.lock_tx_confs.clone() {
self.handle_sync(endpoints, source.clone(), lock_tx_confs_req)?;
}
if let Some(cancel_tx_confs_req) = self.syncer_state.cancel_tx_confs.clone() {
self.handle_sync(endpoints, source, cancel_tx_confs_req)?;
// Replay syncer events to ensure we immediately advance through states that can be skipped
for event in self.syncer_state.last_tx_event.clone().values() {
self.handle_sync(endpoints, source.clone(), event.clone())?;
}
} else if let BusMsg::P2p(peer_msg) = msg {
self.unhandled_peer_message = Some(peer_msg);
Expand Down
4 changes: 2 additions & 2 deletions src/swapd/swap_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1644,9 +1644,9 @@ fn try_alice_accordant_lock_to_alice_buy_procedure_signature(
confirmations: Some(confirmations),
..
},
))) = runtime.syncer_state.lock_tx_confs.clone()
))) = runtime.syncer_state.last_tx_event.get(&TxLabel::Lock)
{
if runtime.temporal_safety.valid_cancel(confirmations) {
if runtime.temporal_safety.valid_cancel(*confirmations) {
runtime.broadcast(cancel_tx, TxLabel::Cancel, event.endpoints)?;
return Ok(Some(SwapStateMachine::AliceCanceled(AliceCanceled {
wallet,
Expand Down
4 changes: 1 addition & 3 deletions src/swapd/syncer_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ pub struct SyncerState {
pub bitcoin_height: u64,
pub monero_height: u64,
pub confirmation_bound: u32,
pub lock_tx_confs: Option<SyncMsg>,
pub cancel_tx_confs: Option<SyncMsg>,
pub buy_tx_confs: Option<SyncMsg>,
pub last_tx_event: HashMap<TxLabel, SyncMsg>,
pub network: farcaster_core::blockchain::Network,
pub bitcoin_syncer: ServiceId,
pub monero_syncer: ServiceId,
Expand Down

0 comments on commit 26be457

Please sign in to comment.