From 1a4b05176a46ed1ae21dc4ee0f7cfa2009f840e3 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 30 Jan 2025 14:10:03 +0100 Subject: [PATCH] add functionality to periodically update routing scores from an external http source --- bindings/ldk_node.udl | 2 + src/builder.rs | 44 ++++++++++++++++---- src/config.rs | 6 +++ src/lib.rs | 28 ++++++++++++- src/scoring.rs | 96 +++++++++++++++++++++++++++++++++++++++++++ src/types.rs | 5 ++- 6 files changed, 169 insertions(+), 12 deletions(-) create mode 100644 src/scoring.rs diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 632daa7df..7e1210a08 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -58,6 +58,7 @@ interface Builder { void set_chain_source_bitcoind_rpc(string rpc_host, u16 rpc_port, string rpc_user, string rpc_password); void set_gossip_source_p2p(); void set_gossip_source_rgs(string rgs_server_url); + void set_pathfinding_scores_source(string url); void set_liquidity_source_lsps2(SocketAddress address, PublicKey node_id, string? token); void set_storage_dir_path(string storage_dir_path); void set_filesystem_logger(string? log_file_path, LogLevel? max_log_level); @@ -275,6 +276,7 @@ dictionary NodeStatus { u64? latest_onchain_wallet_sync_timestamp; u64? latest_fee_rate_cache_update_timestamp; u64? latest_rgs_snapshot_timestamp; + u64? latest_pathfinding_scores_sync_timestamp; u64? latest_node_announcement_broadcast_timestamp; u32? latest_channel_monitor_archival_height; }; diff --git a/src/builder.rs b/src/builder.rs index 1729ece5f..7bc6fdbfb 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -41,7 +41,8 @@ use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler}; use lightning::routing::gossip::NodeAlias; use lightning::routing::router::DefaultRouter; use lightning::routing::scoring::{ - ProbabilisticScorer, ProbabilisticScoringDecayParameters, ProbabilisticScoringFeeParameters, + CombinedScorer, ProbabilisticScorer, ProbabilisticScoringDecayParameters, + ProbabilisticScoringFeeParameters, }; use lightning::sign::EntropySource; @@ -97,6 +98,11 @@ enum GossipSourceConfig { RapidGossipSync(String), } +#[derive(Debug, Clone)] +struct PathfindingScoresSyncConfig { + url: String, +} + #[derive(Debug, Clone)] struct LiquiditySourceConfig { // LSPS2 service's (address, node_id, token) @@ -211,6 +217,7 @@ pub struct NodeBuilder { gossip_source_config: Option, liquidity_source_config: Option, log_writer_config: Option, + pathfinding_scores_sync_config: Option, } impl NodeBuilder { @@ -227,6 +234,7 @@ impl NodeBuilder { let gossip_source_config = None; let liquidity_source_config = None; let log_writer_config = None; + let pathfinding_scores_sync_config = None; Self { config, entropy_source_config, @@ -234,6 +242,7 @@ impl NodeBuilder { gossip_source_config, liquidity_source_config, log_writer_config, + pathfinding_scores_sync_config, } } @@ -304,6 +313,14 @@ impl NodeBuilder { self } + /// Configures the [`Node`] instance to source its external scores from the given URL. + /// + /// The external scores are merged into the local scoring system to improve routing. + pub fn set_pathfinding_scores_source(&mut self, url: String) -> &mut Self { + self.pathfinding_scores_sync_config = Some(PathfindingScoresSyncConfig { url }); + self + } + /// Configures the [`Node`] instance to source its inbound liquidity from the given /// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md) /// service. @@ -529,6 +546,7 @@ impl NodeBuilder { config, self.chain_data_source_config.as_ref(), self.gossip_source_config.as_ref(), + self.pathfinding_scores_sync_config.as_ref(), self.liquidity_source_config.as_ref(), seed_bytes, logger, @@ -551,6 +569,7 @@ impl NodeBuilder { config, self.chain_data_source_config.as_ref(), self.gossip_source_config.as_ref(), + self.pathfinding_scores_sync_config.as_ref(), self.liquidity_source_config.as_ref(), seed_bytes, logger, @@ -643,6 +662,13 @@ impl ArcedNodeBuilder { self.inner.write().unwrap().set_gossip_source_rgs(rgs_server_url); } + /// Configures the [`Node`] instance to source its external scores from the given URL. + /// + /// The external scores are merged into the local scoring system to improve routing. + pub fn set_pathfinding_scores_source(&self, url: String) { + self.inner.write().unwrap().set_pathfinding_scores_source(url); + } + /// Configures the [`Node`] instance to source its inbound liquidity from the given /// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md) /// service. @@ -802,6 +828,7 @@ impl ArcedNodeBuilder { fn build_with_store_internal( config: Arc, chain_data_source_config: Option<&ChainDataSourceConfig>, gossip_source_config: Option<&GossipSourceConfig>, + pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>, liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64], logger: Arc, kv_store: Arc, ) -> Result { @@ -957,26 +984,24 @@ fn build_with_store_internal( }, }; - let scorer = match io::utils::read_scorer( + let local_scorer = match io::utils::read_scorer( Arc::clone(&kv_store), Arc::clone(&network_graph), Arc::clone(&logger), ) { - Ok(scorer) => Arc::new(Mutex::new(scorer)), + Ok(scorer) => scorer, Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { let params = ProbabilisticScoringDecayParameters::default(); - Arc::new(Mutex::new(ProbabilisticScorer::new( - params, - Arc::clone(&network_graph), - Arc::clone(&logger), - ))) + ProbabilisticScorer::new(params, Arc::clone(&network_graph), Arc::clone(&logger)) } else { return Err(BuildError::ReadFailed); } }, }; + let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer))); + let scoring_fee_params = ProbabilisticScoringFeeParameters::default(); let router = Arc::new(DefaultRouter::new( Arc::clone(&network_graph), @@ -1282,6 +1307,8 @@ fn build_with_store_internal( let (stop_sender, _) = tokio::sync::watch::channel(()); let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(()); + let pathfinding_scores_sync_url = pathfinding_scores_sync_config.map(|c| c.url.clone()); + Ok(Node { runtime, stop_sender, @@ -1300,6 +1327,7 @@ fn build_with_store_internal( keys_manager, network_graph, gossip_source, + pathfinding_scores_sync_url, liquidity_source, kv_store, logger, diff --git a/src/config.rs b/src/config.rs index fc2ac8a78..249e31773 100644 --- a/src/config.rs +++ b/src/config.rs @@ -57,6 +57,9 @@ pub(crate) const PEER_RECONNECTION_INTERVAL: Duration = Duration::from_secs(10); // The time in-between RGS sync attempts. pub(crate) const RGS_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60); +// The time in-between external scores sync attempts. +pub(crate) const EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60); + // The time in-between node announcement broadcast attempts. pub(crate) const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60); @@ -78,6 +81,9 @@ pub(crate) const TX_BROADCAST_TIMEOUT_SECS: u64 = 5; // The timeout after which we abort a RGS sync operation. pub(crate) const RGS_SYNC_TIMEOUT_SECS: u64 = 5; +// The timeout after which we abort a external scores sync operation. +pub(crate) const EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS: u64 = 5; + // The length in bytes of our wallets' keys seed. pub(crate) const WALLET_KEYS_SEED_LEN: usize = 64; diff --git a/src/lib.rs b/src/lib.rs index 9ddd4ab7f..05fae97f1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -89,6 +89,7 @@ pub mod logger; mod message_handler; pub mod payment; mod peer_store; +mod scoring; mod sweep; mod tx_broadcaster; mod types; @@ -122,8 +123,9 @@ pub use builder::NodeBuilder as Builder; use chain::ChainSource; use config::{ - default_user_config, may_announce_channel, ChannelConfig, Config, NODE_ANN_BCAST_INTERVAL, - PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL, + default_user_config, may_announce_channel, ChannelConfig, Config, + EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL, EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS, + NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL, }; use connection::ConnectionManager; use event::{EventHandler, EventQueue}; @@ -137,6 +139,7 @@ use payment::{ UnifiedQrPayment, }; use peer_store::{PeerInfo, PeerStore}; +use scoring::setup_background_pathfinding_scores_sync; use types::{ Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, KeysManager, OnionMessenger, PeerManager, Router, Scorer, Sweeper, Wallet, @@ -189,6 +192,7 @@ pub struct Node { keys_manager: Arc, network_graph: Arc, gossip_source: Arc, + pathfinding_scores_sync_url: Option, liquidity_source: Option>>>, kv_store: Arc, logger: Arc, @@ -303,6 +307,18 @@ impl Node { }); } + if let Some(pathfinding_scores_sync_url) = self.pathfinding_scores_sync_url.as_ref() { + setup_background_pathfinding_scores_sync( + pathfinding_scores_sync_url.clone(), + Arc::clone(&self.scorer), + Arc::clone(&self.node_metrics), + Arc::clone(&self.kv_store), + Arc::clone(&self.logger), + &runtime, + self.stop_sender.subscribe(), + ); + } + if let Some(listening_addresses) = &self.config.listening_addresses { // Setup networking let peer_manager_connection_handler = Arc::clone(&self.peer_manager); @@ -724,6 +740,8 @@ impl Node { locked_node_metrics.latest_fee_rate_cache_update_timestamp; let latest_rgs_snapshot_timestamp = locked_node_metrics.latest_rgs_snapshot_timestamp.map(|val| val as u64); + let latest_pathfinding_scores_sync_timestamp = + locked_node_metrics.latest_pathfinding_scores_sync_timestamp; let latest_node_announcement_broadcast_timestamp = locked_node_metrics.latest_node_announcement_broadcast_timestamp; let latest_channel_monitor_archival_height = @@ -737,6 +755,7 @@ impl Node { latest_onchain_wallet_sync_timestamp, latest_fee_rate_cache_update_timestamp, latest_rgs_snapshot_timestamp, + latest_pathfinding_scores_sync_timestamp, latest_node_announcement_broadcast_timestamp, latest_channel_monitor_archival_height, } @@ -1565,6 +1584,8 @@ pub struct NodeStatus { /// /// Will be `None` if RGS isn't configured or the snapshot hasn't been updated yet. pub latest_rgs_snapshot_timestamp: Option, + /// The timestamp, in seconds since start of the UNIX epoch, when we last successfully merged external scores. + pub latest_pathfinding_scores_sync_timestamp: Option, /// The timestamp, in seconds since start of the UNIX epoch, when we last broadcasted a node /// announcement. /// @@ -1583,6 +1604,7 @@ pub(crate) struct NodeMetrics { latest_onchain_wallet_sync_timestamp: Option, latest_fee_rate_cache_update_timestamp: Option, latest_rgs_snapshot_timestamp: Option, + latest_pathfinding_scores_sync_timestamp: Option, latest_node_announcement_broadcast_timestamp: Option, latest_channel_monitor_archival_height: Option, } @@ -1594,6 +1616,7 @@ impl Default for NodeMetrics { latest_onchain_wallet_sync_timestamp: None, latest_fee_rate_cache_update_timestamp: None, latest_rgs_snapshot_timestamp: None, + latest_pathfinding_scores_sync_timestamp: None, latest_node_announcement_broadcast_timestamp: None, latest_channel_monitor_archival_height: None, } @@ -1602,6 +1625,7 @@ impl Default for NodeMetrics { impl_writeable_tlv_based!(NodeMetrics, { (0, latest_lightning_wallet_sync_timestamp, option), + (1, latest_pathfinding_scores_sync_timestamp, option), (2, latest_onchain_wallet_sync_timestamp, option), (4, latest_fee_rate_cache_update_timestamp, option), (6, latest_rgs_snapshot_timestamp, option), diff --git a/src/scoring.rs b/src/scoring.rs new file mode 100644 index 000000000..2c22f4c4c --- /dev/null +++ b/src/scoring.rs @@ -0,0 +1,96 @@ +use std::{ + io::Cursor, + sync::{Arc, Mutex, RwLock}, + time::{Duration, SystemTime}, +}; + +use crate::{logger::LdkLogger, NodeMetrics, Scorer}; +use crate::{ + write_node_metrics, DynStore, Logger, EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL, + EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS, +}; +use lightning::{ + log_error, log_info, log_trace, routing::scoring::ChannelLiquidities, util::ser::Readable, +}; + +/// Start a background task that periodically downloads scores via an external url and merges them into the local +/// pathfinding scores. +pub fn setup_background_pathfinding_scores_sync( + url: String, scorer: Arc>, node_metrics: Arc>, + kv_store: Arc, logger: Arc, runtime: &tokio::runtime::Runtime, + mut stop_receiver: tokio::sync::watch::Receiver<()>, +) { + log_info!(logger, "External scores background syncing enabled from {}", url); + + let logger = Arc::clone(&logger); + + runtime.spawn(async move { + let mut interval = tokio::time::interval(EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL); + loop { + tokio::select! { + _ = stop_receiver.changed() => { + log_trace!( + logger, + "Stopping background syncing external scores.", + ); + return; + } + _ = interval.tick() => { + log_trace!( + logger, + "Background sync of external scores started.", + ); + + sync_external_scores(logger.as_ref(), scorer.as_ref(), node_metrics.as_ref(), Arc::clone(&kv_store), &url).await; + } + } + } + }); +} + +async fn sync_external_scores( + logger: &Logger, scorer: &Mutex, node_metrics: &RwLock, + kv_store: Arc, url: &String, +) -> () { + let response = tokio::time::timeout( + Duration::from_secs(EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS), + reqwest::get(url), + ) + .await; + + if let Err(e) = response { + log_error!(logger, "Retrieving external scores timed out: {}", e); + return; + } + let response = response.unwrap(); + + if let Err(e) = response { + log_error!(logger, "Failed to retrieve external scores update: {}", e); + return; + } + + let body = response.unwrap().bytes().await; + if let Err(e) = body { + log_error!(logger, "Failed to read external scores update: {}", e); + return; + } + + let mut reader = Cursor::new(body.unwrap()); + match ChannelLiquidities::read(&mut reader) { + Ok(liquidities) => { + let duration_since_epoch = + SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(); + scorer.lock().unwrap().merge(liquidities, duration_since_epoch); + let mut locked_node_metrics = node_metrics.write().unwrap(); + locked_node_metrics.latest_pathfinding_scores_sync_timestamp = + Some(duration_since_epoch.as_secs()); + write_node_metrics(&*locked_node_metrics, kv_store, logger).unwrap_or_else(|e| { + log_error!(logger, "Persisting node metrics failed: {}", e); + }); + log_trace!(logger, "External scores merged successfully"); + }, + Err(e) => { + log_error!(logger, "Failed to parse external scores update: {}", e); + }, + } +} diff --git a/src/types.rs b/src/types.rs index 1c9ab64b9..738203127 100644 --- a/src/types.rs +++ b/src/types.rs @@ -21,7 +21,8 @@ use lightning::ln::peer_handler::IgnoringMessageHandler; use lightning::ln::types::ChannelId; use lightning::routing::gossip; use lightning::routing::router::DefaultRouter; -use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters}; +use lightning::routing::scoring::CombinedScorer; +use lightning::routing::scoring::ProbabilisticScoringFeeParameters; use lightning::sign::InMemorySigner; use lightning::util::persist::KVStore; use lightning::util::ser::{Readable, Writeable, Writer}; @@ -88,7 +89,7 @@ pub(crate) type Router = DefaultRouter< ProbabilisticScoringFeeParameters, Scorer, >; -pub(crate) type Scorer = ProbabilisticScorer, Arc>; +pub(crate) type Scorer = CombinedScorer, Arc>; pub(crate) type Graph = gossip::NetworkGraph>;