From f155f5c99b7b880a8df32dae9495efad4f132b6b Mon Sep 17 00:00:00 2001 From: TheCharlatan Date: Thu, 7 Jul 2022 14:21:40 +0200 Subject: [PATCH] Syncer: Cache estimate fee values Adds a polling loop and watch behavior for the FeeEstimation Task and Event. This is done in the hope of making the fee estimation response faster and to continuously stream updates to the estimated fee. --- src/syncerd/bitcoin_syncer.rs | 86 ++++++++++++++++------------------- src/syncerd/syncer_state.rs | 52 ++++++++++++++++++++- src/syncerd/types.rs | 13 +++++- tests/functional.rs | 16 +++++-- 4 files changed, 115 insertions(+), 52 deletions(-) diff --git a/src/syncerd/bitcoin_syncer.rs b/src/syncerd/bitcoin_syncer.rs index a6b5376b7..c7e9c4105 100644 --- a/src/syncerd/bitcoin_syncer.rs +++ b/src/syncerd/bitcoin_syncer.rs @@ -13,7 +13,7 @@ use crate::syncerd::BroadcastTransaction; use crate::syncerd::BtcAddressAddendum; use crate::syncerd::EstimateFee; use crate::syncerd::Event; -use crate::syncerd::FeeEstimation; +use crate::syncerd::FeeEstimations; use crate::syncerd::GetTx; use crate::syncerd::TaskTarget; use crate::syncerd::TransactionBroadcasted; @@ -565,7 +565,6 @@ async fn run_syncerd_task_receiver( state: Arc>, transaction_broadcast_tx: TokioSender<(BroadcastTransaction, ServiceId)>, transaction_get_tx: TokioSender<(GetTx, ServiceId)>, - estimate_fee_tx: TokioSender<(EstimateFee, ServiceId)>, ) { let task_receiver = Arc::new(Mutex::new(receive_task_channel)); tokio::spawn(async move { @@ -584,10 +583,9 @@ async fn run_syncerd_task_receiver( .expect("failed on transaction_get sender"); } Task::EstimateFee(task) => { - estimate_fee_tx - .send((task, syncerd_task.source)) - .await - .expect("failed on estimate fee sender"); + let mut state_guard = state.lock().await; + state_guard.estimate_fee(task, syncerd_task.source).await; + drop(state_guard); } Task::SweepAddress(task) => match task.addendum.clone() { SweepAddressAddendum::Bitcoin(sweep) => { @@ -877,45 +875,46 @@ fn transaction_broadcasting( }) } -fn estimate_fee( +fn estimate_fee_polling( electrum_server: String, - mut estimate_fee_rx: TokioReceiver<(EstimateFee, ServiceId)>, - tx_event: TokioSender, + state: Arc>, ) -> tokio::task::JoinHandle<()> { tokio::task::spawn(async move { - while let Some((estimate_fee_task, source)) = estimate_fee_rx.recv().await { - match Client::new(&electrum_server).and_then(|estimate_fee_client| { - estimate_fee_client.estimate_fee(estimate_fee_task.blocks_until_confirmation) - }) { - Ok(fee) => { - let fee = format!("{:.8} BTC", fee); - let amount = bitcoin::Amount::from_str_with_denomination(&fee).ok(); - tx_event - .send(SyncerdBridgeEvent { - event: Event::FeeEstimation(FeeEstimation { - id: estimate_fee_task.id, - btc_per_kbyte: amount, - }), - source, - }) - .await - .expect("error sending fee estimation event"); - debug!("Successfully sent fee estimation"); - } - Err(e) => { - println!("failed to retrieve fee estimation: {}", e.err()); - tx_event - .send(SyncerdBridgeEvent { - event: Event::FeeEstimation(FeeEstimation { - id: estimate_fee_task.id, - btc_per_kbyte: None, - }), - source, - }) - .await - .expect("error sending fee estimation event"); + let high_priority_confs = 2; + let low_priority_confs = 6; + loop { + if let Ok(client) = Client::new(&electrum_server) { + loop { + match client + .estimate_fee(high_priority_confs) + .and_then( + |high_priority| match client.estimate_fee(low_priority_confs) { + Ok(low_priority) => Ok((high_priority, low_priority)), + Err(err) => Err(err), + }, + ) { + Ok((high_priority, low_priority)) => { + let mut state_guard = state.lock().await; + state_guard + .fee_estimated(FeeEstimations::BitcoinFeeEstimation { + high_priority_sats_per_vbyte: (high_priority * 1e5 as f64) + .ceil() + as u64, + low_priority_sats_per_vbyte: (low_priority * 1e5 as f64).ceil() + as u64, + }) + .await; + drop(state_guard); + } + Err(err) => { + error!("Failed to retrieve fee estimation: {}", err); + break; + } + } + tokio::time::sleep(std::time::Duration::from_secs(20)).await; } } + tokio::time::sleep(std::time::Duration::from_secs(20)).await; } }) } @@ -1052,10 +1051,6 @@ impl Synclet for BitcoinSyncer { TokioSender<(GetTx, ServiceId)>, TokioReceiver<(GetTx, ServiceId)>, ) = tokio::sync::mpsc::channel(200); - let (estimate_fee_tx, estimate_fee_rx): ( - TokioSender<(EstimateFee, ServiceId)>, - TokioReceiver<(EstimateFee, ServiceId)>, - ) = tokio::sync::mpsc::channel(200); let state = Arc::new(Mutex::new(SyncerState::new( event_tx.clone(), Coin::Bitcoin, @@ -1066,7 +1061,6 @@ impl Synclet for BitcoinSyncer { Arc::clone(&state), transaction_broadcast_tx, transaction_get_tx, - estimate_fee_tx, ) .await; run_syncerd_bridge_event_sender(tx, event_rx, syncer_address).await; @@ -1096,7 +1090,7 @@ impl Synclet for BitcoinSyncer { ); let estimate_fee_handle = - estimate_fee(electrum_server.clone(), estimate_fee_rx, event_tx); + estimate_fee_polling(electrum_server.clone(), Arc::clone(&state)); let sweep_handle = sweep_polling(Arc::clone(&state), electrum_server, btc_network); diff --git a/src/syncerd/syncer_state.rs b/src/syncerd/syncer_state.rs index 0233463ea..ad67fba96 100644 --- a/src/syncerd/syncer_state.rs +++ b/src/syncerd/syncer_state.rs @@ -43,6 +43,7 @@ pub struct SyncerState { block_hash: Vec, tasks_sources: HashMap, watch_height: HashMap, + watch_fee_estimation: HashMap, lifetimes: HashMap>, pub addresses: HashMap, pub transactions: HashMap, @@ -51,6 +52,7 @@ pub struct SyncerState { tx_event: TokioSender, task_count: TaskCounter, pub subscribed_addresses: HashSet, + pub fee_estimation: Option, } #[derive(Clone, Debug)] @@ -83,6 +85,7 @@ impl SyncerState { block_hash: vec![0], tasks_sources: HashMap::new(), watch_height: HashMap::new(), + watch_fee_estimation: HashMap::new(), lifetimes: HashMap::new(), addresses: HashMap::new(), transactions: HashMap::new(), @@ -92,6 +95,7 @@ impl SyncerState { task_count: TaskCounter(0), coin: id, subscribed_addresses: HashSet::new(), + fee_estimation: None, } } @@ -330,6 +334,30 @@ impl SyncerState { self.unseen_transactions.insert(self.task_count.into()); } + pub async fn estimate_fee(&mut self, task: EstimateFee, source: ServiceId) { + // increment the count to use it as a unique internal id + self.task_count.increment(); + self.watch_fee_estimation + .insert(self.task_count.into(), task.clone()); + self.tasks_sources + .insert(self.task_count.into(), source.clone()); + + // try to emit an event immediately from the cached values + if let Some(fee_estimations) = self.fee_estimation.clone() { + send_event( + &self.tx_event, + &mut vec![( + Event::FeeEstimation(FeeEstimation { + id: task.id, + fee_estimations, + }), + source, + )], + ) + .await; + } + } + pub fn sweep_address(&mut self, task: SweepAddress, source: ServiceId) { self.task_count.increment(); // This is technically valid behavior; immediately prune the task for being past @@ -341,7 +369,6 @@ impl SyncerState { self.sweep_addresses.insert(self.task_count.into(), task); self.tasks_sources.insert(self.task_count.into(), source); } - pub async fn change_height(&mut self, new_height: u64, block: Vec) -> bool { if self.block_height != new_height || self.block_hash != block { self.handle_change_height(new_height, block.clone()); @@ -593,6 +620,28 @@ impl SyncerState { } } + pub async fn fee_estimated(&mut self, fee_estimations: FeeEstimations) { + // Emit fee estimation events + if self.fee_estimation != Some(fee_estimations.clone()) { + for (id, task) in self.watch_fee_estimation.iter() { + send_event( + &self.tx_event, + &mut vec![( + Event::FeeEstimation(FeeEstimation { + id: task.id, + fee_estimations: fee_estimations.clone(), + }), + self.tasks_sources.get(id).unwrap().clone(), + )], + ) + .await; + } + + self.fee_estimation = Some(fee_estimations); + } + self.drop_lifetimes(); + } + fn drop_lifetimes(&mut self) { let lifetimes: Vec = Iterator::collect(self.lifetimes.keys().map(|&x| x.to_owned())); for lifetime in lifetimes { @@ -625,6 +674,7 @@ impl SyncerState { self.transactions.remove(task); self.unseen_transactions.remove(task); self.watch_height.remove(task); + self.watch_fee_estimation.remove(task); self.sweep_addresses.remove(task); self.tasks_sources.remove(task); } diff --git a/src/syncerd/types.rs b/src/syncerd/types.rs index d265a9b6c..7edddc531 100644 --- a/src/syncerd/types.rs +++ b/src/syncerd/types.rs @@ -219,7 +219,7 @@ pub struct GetTx { #[display(Debug)] pub struct EstimateFee { pub id: TaskId, - pub blocks_until_confirmation: usize, + pub lifetime: u64, } /// Tasks created by the daemon and handle by syncers to process a blockchain @@ -300,7 +300,16 @@ pub struct TransactionRetrieved { #[display(Debug)] pub struct FeeEstimation { pub id: TaskId, - pub btc_per_kbyte: Option, + pub fee_estimations: FeeEstimations, +} + +#[derive(Clone, Debug, Display, StrictEncode, StrictDecode, Eq, PartialEq, Hash)] +#[display(Debug)] +pub enum FeeEstimations { + BitcoinFeeEstimation { + high_priority_sats_per_vbyte: u64, + low_priority_sats_per_vbyte: u64, + }, } /// Events returned by syncers to the daemon to update the blockchain states. diff --git a/tests/functional.rs b/tests/functional.rs index 16f38161a..cca1f3e68 100644 --- a/tests/functional.rs +++ b/tests/functional.rs @@ -11,6 +11,7 @@ use farcaster_node::syncerd::{ runtime::Synclet, SweepAddress, SweepAddressAddendum, SweepXmrAddress, TaskId, TaskTarget, XmrAddressAddendum, }; +use farcaster_node::syncerd::{FeeEstimation, FeeEstimations}; use farcaster_node::ServiceId; use internet2::transport::MAX_FRAME_SIZE; use internet2::Decrypt; @@ -145,7 +146,7 @@ fn bitcoin_syncer_estimate_fee_test() { let task = SyncerdTask { task: Task::EstimateFee(EstimateFee { id: TaskId(1), - blocks_until_confirmation: 1, + lifetime: 0, }), source: SOURCE1.clone(), }; @@ -158,10 +159,19 @@ fn bitcoin_syncer_estimate_fee_test() { fn assert_fee_estimation_received(request: Request) { match request { Request::SyncerdBridgeEvent(farcaster_node::rpc::request::SyncerdBridgeEvent { - event: Event::FeeEstimation(fee_estimation), + event: + Event::FeeEstimation(FeeEstimation { + fee_estimations: + FeeEstimations::BitcoinFeeEstimation { + high_priority_sats_per_vbyte, + low_priority_sats_per_vbyte, + }, + .. + }), .. }) => { - assert!(fee_estimation.btc_per_kbyte.is_some()); + assert!(high_priority_sats_per_vbyte >= 1); + assert!(low_priority_sats_per_vbyte >= 1); } _ => { panic!("expected syncerd bridge event");