Skip to content

Commit

Permalink
add functionality to periodically update routing scores from an exter…
Browse files Browse the repository at this point in the history
…nal http source
  • Loading branch information
joostjager committed Feb 7, 2025
1 parent 16d1a11 commit 1a4b051
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 12 deletions.
2 changes: 2 additions & 0 deletions bindings/ldk_node.udl
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ interface Builder {
void set_chain_source_bitcoind_rpc(string rpc_host, u16 rpc_port, string rpc_user, string rpc_password);
void set_gossip_source_p2p();
void set_gossip_source_rgs(string rgs_server_url);
void set_pathfinding_scores_source(string url);
void set_liquidity_source_lsps2(SocketAddress address, PublicKey node_id, string? token);
void set_storage_dir_path(string storage_dir_path);
void set_filesystem_logger(string? log_file_path, LogLevel? max_log_level);
Expand Down Expand Up @@ -275,6 +276,7 @@ dictionary NodeStatus {
u64? latest_onchain_wallet_sync_timestamp;
u64? latest_fee_rate_cache_update_timestamp;
u64? latest_rgs_snapshot_timestamp;
u64? latest_pathfinding_scores_sync_timestamp;
u64? latest_node_announcement_broadcast_timestamp;
u32? latest_channel_monitor_archival_height;
};
Expand Down
44 changes: 36 additions & 8 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler};
use lightning::routing::gossip::NodeAlias;
use lightning::routing::router::DefaultRouter;
use lightning::routing::scoring::{
ProbabilisticScorer, ProbabilisticScoringDecayParameters, ProbabilisticScoringFeeParameters,
CombinedScorer, ProbabilisticScorer, ProbabilisticScoringDecayParameters,
ProbabilisticScoringFeeParameters,
};
use lightning::sign::EntropySource;

Expand Down Expand Up @@ -97,6 +98,11 @@ enum GossipSourceConfig {
RapidGossipSync(String),
}

#[derive(Debug, Clone)]
struct PathfindingScoresSyncConfig {
url: String,
}

#[derive(Debug, Clone)]
struct LiquiditySourceConfig {
// LSPS2 service's (address, node_id, token)
Expand Down Expand Up @@ -211,6 +217,7 @@ pub struct NodeBuilder {
gossip_source_config: Option<GossipSourceConfig>,
liquidity_source_config: Option<LiquiditySourceConfig>,
log_writer_config: Option<LogWriterConfig>,
pathfinding_scores_sync_config: Option<PathfindingScoresSyncConfig>,
}

impl NodeBuilder {
Expand All @@ -227,13 +234,15 @@ impl NodeBuilder {
let gossip_source_config = None;
let liquidity_source_config = None;
let log_writer_config = None;
let pathfinding_scores_sync_config = None;
Self {
config,
entropy_source_config,
chain_data_source_config,
gossip_source_config,
liquidity_source_config,
log_writer_config,
pathfinding_scores_sync_config,
}
}

Expand Down Expand Up @@ -304,6 +313,14 @@ impl NodeBuilder {
self
}

/// Configures the [`Node`] instance to source its external scores from the given URL.
///
/// The external scores are merged into the local scoring system to improve routing.
pub fn set_pathfinding_scores_source(&mut self, url: String) -> &mut Self {
self.pathfinding_scores_sync_config = Some(PathfindingScoresSyncConfig { url });
self
}

/// Configures the [`Node`] instance to source its inbound liquidity from the given
/// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md)
/// service.
Expand Down Expand Up @@ -529,6 +546,7 @@ impl NodeBuilder {
config,
self.chain_data_source_config.as_ref(),
self.gossip_source_config.as_ref(),
self.pathfinding_scores_sync_config.as_ref(),
self.liquidity_source_config.as_ref(),
seed_bytes,
logger,
Expand All @@ -551,6 +569,7 @@ impl NodeBuilder {
config,
self.chain_data_source_config.as_ref(),
self.gossip_source_config.as_ref(),
self.pathfinding_scores_sync_config.as_ref(),
self.liquidity_source_config.as_ref(),
seed_bytes,
logger,
Expand Down Expand Up @@ -643,6 +662,13 @@ impl ArcedNodeBuilder {
self.inner.write().unwrap().set_gossip_source_rgs(rgs_server_url);
}

/// Configures the [`Node`] instance to source its external scores from the given URL.
///
/// The external scores are merged into the local scoring system to improve routing.
pub fn set_pathfinding_scores_source(&self, url: String) {
self.inner.write().unwrap().set_pathfinding_scores_source(url);
}

/// Configures the [`Node`] instance to source its inbound liquidity from the given
/// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md)
/// service.
Expand Down Expand Up @@ -802,6 +828,7 @@ impl ArcedNodeBuilder {
fn build_with_store_internal(
config: Arc<Config>, chain_data_source_config: Option<&ChainDataSourceConfig>,
gossip_source_config: Option<&GossipSourceConfig>,
pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>,
liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64],
logger: Arc<Logger>, kv_store: Arc<DynStore>,
) -> Result<Node, BuildError> {
Expand Down Expand Up @@ -957,26 +984,24 @@ fn build_with_store_internal(
},
};

let scorer = match io::utils::read_scorer(
let local_scorer = match io::utils::read_scorer(
Arc::clone(&kv_store),
Arc::clone(&network_graph),
Arc::clone(&logger),
) {
Ok(scorer) => Arc::new(Mutex::new(scorer)),
Ok(scorer) => scorer,
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
let params = ProbabilisticScoringDecayParameters::default();
Arc::new(Mutex::new(ProbabilisticScorer::new(
params,
Arc::clone(&network_graph),
Arc::clone(&logger),
)))
ProbabilisticScorer::new(params, Arc::clone(&network_graph), Arc::clone(&logger))
} else {
return Err(BuildError::ReadFailed);
}
},
};

let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer)));

let scoring_fee_params = ProbabilisticScoringFeeParameters::default();
let router = Arc::new(DefaultRouter::new(
Arc::clone(&network_graph),
Expand Down Expand Up @@ -1282,6 +1307,8 @@ fn build_with_store_internal(
let (stop_sender, _) = tokio::sync::watch::channel(());
let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(());

let pathfinding_scores_sync_url = pathfinding_scores_sync_config.map(|c| c.url.clone());

Ok(Node {
runtime,
stop_sender,
Expand All @@ -1300,6 +1327,7 @@ fn build_with_store_internal(
keys_manager,
network_graph,
gossip_source,
pathfinding_scores_sync_url,
liquidity_source,
kv_store,
logger,
Expand Down
6 changes: 6 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ pub(crate) const PEER_RECONNECTION_INTERVAL: Duration = Duration::from_secs(10);
// The time in-between RGS sync attempts.
pub(crate) const RGS_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);

// The time in-between external scores sync attempts.
pub(crate) const EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);

// The time in-between node announcement broadcast attempts.
pub(crate) const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60);

Expand All @@ -78,6 +81,9 @@ pub(crate) const TX_BROADCAST_TIMEOUT_SECS: u64 = 5;
// The timeout after which we abort a RGS sync operation.
pub(crate) const RGS_SYNC_TIMEOUT_SECS: u64 = 5;

// The timeout after which we abort a external scores sync operation.
pub(crate) const EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS: u64 = 5;

// The length in bytes of our wallets' keys seed.
pub(crate) const WALLET_KEYS_SEED_LEN: usize = 64;

Expand Down
28 changes: 26 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ pub mod logger;
mod message_handler;
pub mod payment;
mod peer_store;
mod scoring;
mod sweep;
mod tx_broadcaster;
mod types;
Expand Down Expand Up @@ -122,8 +123,9 @@ pub use builder::NodeBuilder as Builder;

use chain::ChainSource;
use config::{
default_user_config, may_announce_channel, ChannelConfig, Config, NODE_ANN_BCAST_INTERVAL,
PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL,
default_user_config, may_announce_channel, ChannelConfig, Config,
EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL, EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS,
NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL,
};
use connection::ConnectionManager;
use event::{EventHandler, EventQueue};
Expand All @@ -137,6 +139,7 @@ use payment::{
UnifiedQrPayment,
};
use peer_store::{PeerInfo, PeerStore};
use scoring::setup_background_pathfinding_scores_sync;
use types::{
Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph,
KeysManager, OnionMessenger, PeerManager, Router, Scorer, Sweeper, Wallet,
Expand Down Expand Up @@ -189,6 +192,7 @@ pub struct Node {
keys_manager: Arc<KeysManager>,
network_graph: Arc<Graph>,
gossip_source: Arc<GossipSource>,
pathfinding_scores_sync_url: Option<String>,
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
kv_store: Arc<DynStore>,
logger: Arc<Logger>,
Expand Down Expand Up @@ -303,6 +307,18 @@ impl Node {
});
}

if let Some(pathfinding_scores_sync_url) = self.pathfinding_scores_sync_url.as_ref() {
setup_background_pathfinding_scores_sync(
pathfinding_scores_sync_url.clone(),
Arc::clone(&self.scorer),
Arc::clone(&self.node_metrics),
Arc::clone(&self.kv_store),
Arc::clone(&self.logger),
&runtime,
self.stop_sender.subscribe(),
);
}

if let Some(listening_addresses) = &self.config.listening_addresses {
// Setup networking
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
Expand Down Expand Up @@ -724,6 +740,8 @@ impl Node {
locked_node_metrics.latest_fee_rate_cache_update_timestamp;
let latest_rgs_snapshot_timestamp =
locked_node_metrics.latest_rgs_snapshot_timestamp.map(|val| val as u64);
let latest_pathfinding_scores_sync_timestamp =
locked_node_metrics.latest_pathfinding_scores_sync_timestamp;
let latest_node_announcement_broadcast_timestamp =
locked_node_metrics.latest_node_announcement_broadcast_timestamp;
let latest_channel_monitor_archival_height =
Expand All @@ -737,6 +755,7 @@ impl Node {
latest_onchain_wallet_sync_timestamp,
latest_fee_rate_cache_update_timestamp,
latest_rgs_snapshot_timestamp,
latest_pathfinding_scores_sync_timestamp,
latest_node_announcement_broadcast_timestamp,
latest_channel_monitor_archival_height,
}
Expand Down Expand Up @@ -1565,6 +1584,8 @@ pub struct NodeStatus {
///
/// Will be `None` if RGS isn't configured or the snapshot hasn't been updated yet.
pub latest_rgs_snapshot_timestamp: Option<u64>,
/// The timestamp, in seconds since start of the UNIX epoch, when we last successfully merged external scores.
pub latest_pathfinding_scores_sync_timestamp: Option<u64>,
/// The timestamp, in seconds since start of the UNIX epoch, when we last broadcasted a node
/// announcement.
///
Expand All @@ -1583,6 +1604,7 @@ pub(crate) struct NodeMetrics {
latest_onchain_wallet_sync_timestamp: Option<u64>,
latest_fee_rate_cache_update_timestamp: Option<u64>,
latest_rgs_snapshot_timestamp: Option<u32>,
latest_pathfinding_scores_sync_timestamp: Option<u64>,
latest_node_announcement_broadcast_timestamp: Option<u64>,
latest_channel_monitor_archival_height: Option<u32>,
}
Expand All @@ -1594,6 +1616,7 @@ impl Default for NodeMetrics {
latest_onchain_wallet_sync_timestamp: None,
latest_fee_rate_cache_update_timestamp: None,
latest_rgs_snapshot_timestamp: None,
latest_pathfinding_scores_sync_timestamp: None,
latest_node_announcement_broadcast_timestamp: None,
latest_channel_monitor_archival_height: None,
}
Expand All @@ -1602,6 +1625,7 @@ impl Default for NodeMetrics {

impl_writeable_tlv_based!(NodeMetrics, {
(0, latest_lightning_wallet_sync_timestamp, option),
(1, latest_pathfinding_scores_sync_timestamp, option),
(2, latest_onchain_wallet_sync_timestamp, option),
(4, latest_fee_rate_cache_update_timestamp, option),
(6, latest_rgs_snapshot_timestamp, option),
Expand Down
96 changes: 96 additions & 0 deletions src/scoring.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use std::{
io::Cursor,
sync::{Arc, Mutex, RwLock},
time::{Duration, SystemTime},
};

use crate::{logger::LdkLogger, NodeMetrics, Scorer};
use crate::{
write_node_metrics, DynStore, Logger, EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL,
EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS,
};
use lightning::{
log_error, log_info, log_trace, routing::scoring::ChannelLiquidities, util::ser::Readable,
};

/// Start a background task that periodically downloads scores via an external url and merges them into the local
/// pathfinding scores.
pub fn setup_background_pathfinding_scores_sync(
url: String, scorer: Arc<Mutex<crate::types::Scorer>>, node_metrics: Arc<RwLock<NodeMetrics>>,
kv_store: Arc<DynStore>, logger: Arc<Logger>, runtime: &tokio::runtime::Runtime,
mut stop_receiver: tokio::sync::watch::Receiver<()>,
) {
log_info!(logger, "External scores background syncing enabled from {}", url);

let logger = Arc::clone(&logger);

runtime.spawn(async move {
let mut interval = tokio::time::interval(EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL);
loop {
tokio::select! {
_ = stop_receiver.changed() => {
log_trace!(
logger,
"Stopping background syncing external scores.",
);
return;
}
_ = interval.tick() => {
log_trace!(
logger,
"Background sync of external scores started.",
);

sync_external_scores(logger.as_ref(), scorer.as_ref(), node_metrics.as_ref(), Arc::clone(&kv_store), &url).await;
}
}
}
});
}

async fn sync_external_scores(
logger: &Logger, scorer: &Mutex<Scorer>, node_metrics: &RwLock<NodeMetrics>,
kv_store: Arc<DynStore>, url: &String,
) -> () {
let response = tokio::time::timeout(
Duration::from_secs(EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS),
reqwest::get(url),
)
.await;

if let Err(e) = response {
log_error!(logger, "Retrieving external scores timed out: {}", e);
return;
}
let response = response.unwrap();

if let Err(e) = response {
log_error!(logger, "Failed to retrieve external scores update: {}", e);
return;
}

let body = response.unwrap().bytes().await;
if let Err(e) = body {
log_error!(logger, "Failed to read external scores update: {}", e);
return;
}

let mut reader = Cursor::new(body.unwrap());
match ChannelLiquidities::read(&mut reader) {
Ok(liquidities) => {
let duration_since_epoch =
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap();
scorer.lock().unwrap().merge(liquidities, duration_since_epoch);
let mut locked_node_metrics = node_metrics.write().unwrap();
locked_node_metrics.latest_pathfinding_scores_sync_timestamp =
Some(duration_since_epoch.as_secs());
write_node_metrics(&*locked_node_metrics, kv_store, logger).unwrap_or_else(|e| {
log_error!(logger, "Persisting node metrics failed: {}", e);
});
log_trace!(logger, "External scores merged successfully");
},
Err(e) => {
log_error!(logger, "Failed to parse external scores update: {}", e);
},
}
}
Loading

0 comments on commit 1a4b051

Please sign in to comment.