From 20f2dab5c0c7d72d1d9835edf3a043b755b05af8 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 10 May 2024 20:39:53 +0000 Subject: [PATCH 1/8] Create an `AOnionMessenger` trait similar to our other `AStruct`s This allows us to have a bound on any `OnionMessenger` without having to create bounds for every bound in `OnionMessenger`. --- lightning/src/onion_message/messenger.rs | 64 ++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index b10bd185100..80ce86498e6 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -47,6 +47,70 @@ use { pub(super) const MAX_TIMER_TICKS: usize = 2; +/// A trivial trait which describes any [`OnionMessenger`]. +/// +/// This is not exported to bindings users as general cover traits aren't useful in other +/// languages. +pub trait AOnionMessenger { + /// A type implementing [`EntropySource`] + type EntropySource: EntropySource + ?Sized; + /// A type that may be dereferenced to [`Self::EntropySource`] + type ES: Deref; + /// A type implementing [`NodeSigner`] + type NodeSigner: NodeSigner + ?Sized; + /// A type that may be dereferenced to [`Self::NodeSigner`] + type NS: Deref; + /// A type implementing [`Logger`] + type Logger: Logger + ?Sized; + /// A type that may be dereferenced to [`Self::Logger`] + type L: Deref; + /// A type implementing [`NodeIdLookUp`] + type NodeIdLookUp: NodeIdLookUp + ?Sized; + /// A type that may be dereferenced to [`Self::NodeIdLookUp`] + type NL: Deref; + /// A type implementing [`MessageRouter`] + type MessageRouter: MessageRouter + ?Sized; + /// A type that may be dereferenced to [`Self::MessageRouter`] + type MR: Deref; + /// A type implementing [`OffersMessageHandler`] + type OffersMessageHandler: OffersMessageHandler + ?Sized; + /// A type that may be dereferenced to [`Self::OffersMessageHandler`] + type OMH: Deref; + /// A type implementing 
[`CustomOnionMessageHandler`] + type CustomOnionMessageHandler: CustomOnionMessageHandler + ?Sized; + /// A type that may be dereferenced to [`Self::CustomOnionMessageHandler`] + type CMH: Deref; + /// Returns a reference to the actual [`OnionMessenger`] object. + fn get_om(&self) -> &OnionMessenger; +} + +impl AOnionMessenger +for OnionMessenger where + ES::Target: EntropySource, + NS::Target: NodeSigner, + L::Target: Logger, + NL::Target: NodeIdLookUp, + MR::Target: MessageRouter, + OMH::Target: OffersMessageHandler, + CMH::Target: CustomOnionMessageHandler, +{ + type EntropySource = ES::Target; + type ES = ES; + type NodeSigner = NS::Target; + type NS = NS; + type Logger = L::Target; + type L = L; + type NodeIdLookUp = NL::Target; + type NL = NL; + type MessageRouter = MR::Target; + type MR = MR; + type OffersMessageHandler = OMH::Target; + type OMH = OMH; + type CustomOnionMessageHandler = CMH::Target; + type CMH = CMH; + fn get_om(&self) -> &OnionMessenger { self } +} + /// A sender, receiver and forwarder of [`OnionMessage`]s. /// /// # Handling Messages From 98022e6d6d2f8756abd82d1ebcf29f2e292e3983 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 10 May 2024 20:49:30 +0000 Subject: [PATCH 2/8] Add a utility to poll multiple futures simultaneously If we have a handful of futures we want to make progress on simultaneously we need some way to poll all of them in series, which we add here in the form of `MultiFuturePoller`. Its probably not as effecient as some of the options in the `futures` crate, but it is very trivial and not that bad. --- lightning/src/util/async_poll.rs | 34 ++++++++++++++++++++++++++++++++ lightning/src/util/mod.rs | 1 + 2 files changed, 35 insertions(+) create mode 100644 lightning/src/util/async_poll.rs diff --git a/lightning/src/util/async_poll.rs b/lightning/src/util/async_poll.rs new file mode 100644 index 00000000000..979d906392d --- /dev/null +++ b/lightning/src/util/async_poll.rs @@ -0,0 +1,34 @@ +//! 
Somse utilities to make working with std Futures easier + +use crate::prelude::*; +use core::future::Future; +use core::marker::Unpin; +use core::pin::Pin; +use core::task::{Context, Poll}; + +pub(crate) struct MultiFuturePoller + Unpin>(pub Vec>); + +impl + Unpin> Future for MultiFuturePoller { + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + let mut have_pending_futures = false; + for fut_option in self.get_mut().0.iter_mut() { + let mut fut = match fut_option.take() { + None => continue, + Some(fut) => fut, + }; + match Pin::new(&mut fut).poll(cx) { + Poll::Ready(()) => {}, + Poll::Pending => { + have_pending_futures = true; + *fut_option = Some(fut); + }, + } + } + if have_pending_futures { + Poll::Pending + } else { + Poll::Ready(()) + } + } +} diff --git a/lightning/src/util/mod.rs b/lightning/src/util/mod.rs index c1ab8c75c2e..a81a36c5583 100644 --- a/lightning/src/util/mod.rs +++ b/lightning/src/util/mod.rs @@ -30,6 +30,7 @@ pub mod base32; pub(crate) mod base32; pub(crate) mod atomic_counter; +pub(crate) mod async_poll; pub(crate) mod byte_utils; pub(crate) mod transaction_utils; pub(crate) mod time; From c1ea761e70aaa33c9b07d2468160488a0f4e05cc Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 28 May 2024 15:21:38 +0000 Subject: [PATCH 3/8] f docs --- lightning/src/util/async_poll.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lightning/src/util/async_poll.rs b/lightning/src/util/async_poll.rs index 979d906392d..7a368af7bae 100644 --- a/lightning/src/util/async_poll.rs +++ b/lightning/src/util/async_poll.rs @@ -1,4 +1,13 @@ -//! Somse utilities to make working with std Futures easier +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + +//! 
Some utilities to make working with the standard library's [`Future`]s easier use crate::prelude::*; use core::future::Future; From 9ef61748e419e3e993784ee1c7f785ed434f5c67 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 3 Jun 2024 18:28:45 +0000 Subject: [PATCH 4/8] Store `OnionMessenger` events in different `Vec`s In the next commit, `OnionMessenger` events are handled in parallel using rust async. When we do that, we'll want to handle `OnionMessageIntercepted` events prior to `OnionMessagePeerConnected` ones. While we'd generally prefer to handle all events in the order they were generated, if we want to handle them in parallel, we don't want a `OnionMessageIntercepted` event to start being processed, then handle an `OnionMessagePeerConnected` prior to the first completing. This could cause us to store a freshly-intercepted message for a peer in a DB that was just wiped because the peer is now connected. This does run the risk of processing a `OnionMessagePeerConnected` event prior to an `OnionMessageIntercepted` event (because a peer connected, then disconnected, then we received a message for that peer all before any events were handled), that is somewhat less likely and discarding a message in a rare race is better than leaving a message lying around undelivered. Thus, here, we store `OnionMessenger` events in separate `Vec`s which we can pull from in message-type-order. 
--- lightning/src/onion_message/messenger.rs | 41 +++++++++++++++++------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 80ce86498e6..9043eccbe74 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -241,7 +241,12 @@ where offers_handler: OMH, custom_handler: CMH, intercept_messages_for_offline_peers: bool, - pending_events: Mutex>, + pending_events: Mutex, +} + +struct PendingEvents { + intercepted_msgs: Vec, + peer_connecteds: Vec, } /// [`OnionMessage`]s buffered to be sent. @@ -963,7 +968,10 @@ where offers_handler, custom_handler, intercept_messages_for_offline_peers, - pending_events: Mutex::new(Vec::new()), + pending_events: Mutex::new(PendingEvents { + intercepted_msgs: Vec::new(), + peer_connecteds: Vec::new(), + }), } } @@ -1135,18 +1143,16 @@ where msgs } - fn enqueue_event(&self, event: Event) { + fn enqueue_intercepted_event(&self, event: Event) { const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256; let mut pending_events = self.pending_events.lock().unwrap(); - let total_buffered_bytes: usize = pending_events - .iter() - .map(|ev| ev.serialized_length()) - .sum(); + let total_buffered_bytes: usize = + pending_events.intercepted_msgs.iter().map(|ev| ev.serialized_length()).sum(); if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE { log_trace!(self.logger, "Dropping event {:?}: buffer full", event); return } - pending_events.push(event); + pending_events.intercepted_msgs.push(event); } } @@ -1193,7 +1199,20 @@ where } } let mut events = Vec::new(); - core::mem::swap(&mut *self.pending_events.lock().unwrap(), &mut events); + { + let mut pending_events = self.pending_events.lock().unwrap(); + #[cfg(debug_assertions)] { + for ev in pending_events.intercepted_msgs.iter() { + if let Event::OnionMessageIntercepted { .. 
} = ev {} else { panic!(); } + } + for ev in pending_events.peer_connecteds.iter() { + if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); } + } + } + core::mem::swap(&mut pending_events.intercepted_msgs, &mut events); + events.append(&mut pending_events.peer_connecteds); + pending_events.peer_connecteds.shrink_to(10); // Limit total heap usage + } for ev in events { handler.handle_event(ev); } @@ -1271,7 +1290,7 @@ where log_trace!(logger, "Forwarding an onion message to peer {}", next_node_id); }, _ if self.intercept_messages_for_offline_peers => { - self.enqueue_event( + self.enqueue_intercepted_event( Event::OnionMessageIntercepted { peer_node_id: next_node_id, message: onion_message } @@ -1299,7 +1318,7 @@ where .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())) .mark_connected(); if self.intercept_messages_for_offline_peers { - self.enqueue_event( + self.pending_events.lock().unwrap().peer_connecteds.push( Event::OnionMessagePeerConnected { peer_node_id: *their_node_id } ); } From 3abec4f03194d818b72bf377b2086563c56f1812 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 10 May 2024 20:52:10 +0000 Subject: [PATCH 5/8] Add a parallel async event handler to `OnionMessenger` This adds an `OnionMessenger::process_pending_events_async` mirroring the same in `ChannelManager`. However, unlike the one in `ChannelManager`, this processes the events in parallel by spawning all futures and using the new `MultiFuturePoller`. Because `OnionMessenger` just generates a stream of messages to store/fetch, we first process all the events to store new messages, `await` them, then process all the events to fetch stored messages, ensuring reordering shouldn't result in lost messages (unless we race with a peer disconnection, which could happen anyway). 
--- lightning/src/onion_message/messenger.rs | 45 ++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 9043eccbe74..7bea65ab5af 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -1154,6 +1154,51 @@ where } pending_events.intercepted_msgs.push(event); } + + /// Processes any events asynchronously using the given handler. + /// + /// Note that the event handler is called in the order each event was generated, however + /// futures are polled in parallel for some events to allow for parallelism where events do not + /// have an ordering requirement. + /// + /// See the trait-level documentation of [`EventsProvider`] for requirements. + pub async fn process_pending_events_async + core::marker::Unpin, H: Fn(Event) -> Future>( + &self, handler: H + ) { + let mut intercepted_msgs = Vec::new(); + let mut peer_connecteds = Vec::new(); + { + let mut pending_events = self.pending_events.lock().unwrap(); + core::mem::swap(&mut pending_events.intercepted_msgs, &mut intercepted_msgs); + core::mem::swap(&mut pending_events.peer_connecteds, &mut peer_connecteds); + } + + let mut futures = Vec::with_capacity(intercepted_msgs.len()); + for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() { + if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient { + if let Some(addresses) = addresses.take() { + futures.push(Some(handler(Event::ConnectionNeeded { node_id: *node_id, addresses }))); + } + } + } + + for ev in intercepted_msgs { + if let Event::OnionMessageIntercepted { .. 
} = ev {} else { debug_assert!(false); } + futures.push(Some(handler(ev))); + } + // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds + crate::util::async_poll::MultiFuturePoller(futures).await; + + if peer_connecteds.len() <= 1 { + for event in peer_connecteds { handler(event).await; } + } else { + let mut futures = Vec::new(); + for event in peer_connecteds { + futures.push(Some(handler(event))); + } + crate::util::async_poll::MultiFuturePoller(futures).await; + } + } } fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap) -> bool { From 92bf4602927ea6346e81f82ffbddda1a83b1541a Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 10 May 2024 21:07:08 +0000 Subject: [PATCH 6/8] Switch to using the `OnionMessenger` directly in BP When `OnionMessenger` first developed a timer and events interface, we accessed the `OnionMessenger` indirectly via the `PeerManager`. While this is a fairly awkward interface, it avoided a large pile of generics on the background processor interfaces. However, since we now have an `AOnionMessenger` trait, this concern is no longer significant. Further, because we now want to use the built-in `OnionMessenger` async event processing method, we really need a direct reference to the `OnionMessenger` in the background processor, which we add here optionally. 
--- lightning-background-processor/src/lib.rs | 92 ++++++++++++----------- 1 file changed, 47 insertions(+), 45 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index ad1056ea922..b612dd96f6a 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -32,6 +32,7 @@ use lightning::events::EventsProvider; use lightning::ln::channelmanager::AChannelManager; use lightning::ln::msgs::OnionMessageHandler; +use lightning::onion_message::messenger::AOnionMessenger; use lightning::ln::peer_handler::APeerManager; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::utxo::UtxoLookup; @@ -54,6 +55,8 @@ use std::thread::{self, JoinHandle}; #[cfg(feature = "std")] use std::time::Instant; +#[cfg(not(feature = "std"))] +use alloc::boxed::Box; #[cfg(not(feature = "std"))] use alloc::vec::Vec; @@ -281,7 +284,8 @@ macro_rules! define_run_body { ( $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, $channel_manager: ident, $process_channel_manager_events: expr, - $peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident, + $onion_messenger: ident, $process_onion_message_handler_events: expr, + $peer_manager: ident, $gossip_sync: ident, $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, ) => { { @@ -347,8 +351,10 @@ macro_rules! 
define_run_body { last_freshness_call = $get_timer(FRESHNESS_TIMER); } if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) { - log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred"); - $peer_manager.onion_message_handler().timer_tick_occurred(); + if let Some(om) = &$onion_messenger { + log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred"); + om.get_om().timer_tick_occurred(); + } last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER); } if await_slow { @@ -564,6 +570,7 @@ use core::task; /// # type NetworkGraph = lightning::routing::gossip::NetworkGraph>; /// # type P2PGossipSync
    = lightning::routing::gossip::P2PGossipSync, Arc
      , Arc>; /// # type ChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager, B, FE, Logger>; +/// # type OnionMessenger = lightning::onion_message::messenger::OnionMessenger, Arc, Arc, Arc>, Arc, Arc, Arc>>, Arc>, lightning::ln::peer_handler::IgnoringMessageHandler>; /// # type Scorer = RwLock, Arc>>; /// # type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager, B, FE, Arc
        , Logger>; /// # @@ -576,6 +583,7 @@ use core::task; /// # peer_manager: Arc>, /// # event_handler: Arc, /// # channel_manager: Arc>, +/// # onion_messenger: Arc>, /// # chain_monitor: Arc>, /// # gossip_sync: Arc>, /// # persister: Arc, @@ -595,6 +603,7 @@ use core::task; /// let background_chan_man = Arc::clone(&node.channel_manager); /// let background_gossip_sync = GossipSync::p2p(Arc::clone(&node.gossip_sync)); /// let background_peer_man = Arc::clone(&node.peer_manager); +/// let background_onion_messenger = Arc::clone(&node.onion_messenger); /// let background_logger = Arc::clone(&node.logger); /// let background_scorer = Arc::clone(&node.scorer); /// @@ -619,6 +628,7 @@ use core::task; /// |e| background_event_handler.handle_event(e), /// background_chain_mon, /// background_chan_man, +/// Some(background_onion_messenger), /// background_gossip_sync, /// background_peer_man, /// background_logger, @@ -651,6 +661,7 @@ pub async fn process_events_async< PS: 'static + Deref + Send, M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, CM: 'static + Deref + Send + Sync, + OM: 'static + Deref + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, PM: 'static + Deref + Send + Sync, @@ -661,6 +672,7 @@ pub async fn process_events_async< FetchTime: Fn() -> Option, >( persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM, + onion_messenger: Option, gossip_sync: GossipSync, peer_manager: PM, logger: L, scorer: Option, sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime, ) -> Result<(), lightning::io::Error> @@ -673,6 +685,7 @@ where P::Target: 'static + Persist<::Signer>, PS::Target: 'static + Persister<'a, CM, L, SC>, CM::Target: AChannelManager + Send + Sync, + OM::Target: AOnionMessenger + Send + Sync, PM::Target: APeerManager + Send + Sync, { let mut should_break = false; @@ -683,7 +696,7 @@ where let logger = &logger; let persister = &persister; let fetch_time = 
&fetch_time; - async move { + Box::pin(async move { // We should be able to drop the Box once our MSRV is 1.68 if let Some(network_graph) = network_graph { handle_network_graph_update(network_graph, &event) } @@ -698,14 +711,14 @@ where } } event_handler(event).await; - } + }) }; define_run_body!( persister, chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await, channel_manager, channel_manager.get_cm().process_pending_events_async(async_event_handler).await, - peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await, - gossip_sync, logger, scorer, should_break, { + onion_messenger, if let Some(om) = &onion_messenger { om.get_om().process_pending_events_async(async_event_handler).await }, + peer_manager, gossip_sync, logger, scorer, should_break, { let fut = Selector { a: channel_manager.get_cm().get_event_or_persistence_needed_future(), b: chain_monitor.get_update_future(), @@ -729,25 +742,6 @@ where ) } -#[cfg(feature = "futures")] -async fn process_onion_message_handler_events_async< - EventHandlerFuture: core::future::Future, - EventHandler: Fn(Event) -> EventHandlerFuture, - PM: 'static + Deref + Send + Sync, ->( - peer_manager: &PM, handler: EventHandler -) -where - PM::Target: APeerManager + Send + Sync, -{ - let events = core::cell::RefCell::new(Vec::new()); - peer_manager.onion_message_handler().process_pending_events(&|e| events.borrow_mut().push(e)); - - for event in events.into_inner() { - handler(event).await - } -} - #[cfg(feature = "std")] impl BackgroundProcessor { /// Start a background thread that takes care of responsibilities enumerated in the [top-level @@ -807,6 +801,7 @@ impl BackgroundProcessor { PS: 'static + Deref + Send, M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, CM: 'static + Deref + Send + Sync, + OM: 'static + Deref + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, PM: 'static + Deref + Send + Sync, @@ -814,6 +809,7 
@@ impl BackgroundProcessor { SC: for <'b> WriteableScore<'b>, >( persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, + onion_messenger: Option, gossip_sync: GossipSync, peer_manager: PM, logger: L, scorer: Option, ) -> Self where @@ -825,6 +821,7 @@ impl BackgroundProcessor { P::Target: 'static + Persist<::Signer>, PS::Target: 'static + Persister<'a, CM, L, SC>, CM::Target: AChannelManager + Send + Sync, + OM::Target: AOnionMessenger + Send + Sync, PM::Target: APeerManager + Send + Sync, { let stop_thread = Arc::new(AtomicBool::new(false)); @@ -851,9 +848,8 @@ impl BackgroundProcessor { define_run_body!( persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), channel_manager, channel_manager.get_cm().process_pending_events(&event_handler), - peer_manager, - peer_manager.onion_message_handler().process_pending_events(&event_handler), - gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire), + onion_messenger, if let Some(om) = &onion_messenger { om.get_om().process_pending_events(&event_handler) }, + peer_manager, gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire), { Sleeper::from_two_futures( &channel_manager.get_cm().get_event_or_persistence_needed_future(), &chain_monitor.get_update_future() @@ -939,6 +935,7 @@ mod tests { use lightning::ln::functional_test_utils::*; use lightning::ln::msgs::{ChannelMessageHandler, Init}; use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler}; + use lightning::onion_message::messenger::{DefaultMessageRouter, OnionMessenger}; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp, LockableScore}; use lightning::routing::router::{DefaultRouter, Path, RouteHop, CandidateRouteHop}; @@ -1000,11 +997,14 @@ mod tests { type PGS = Arc>>, Arc, Arc>>; type RGS = Arc>>, Arc>>; + type OM = OnionMessenger, Arc, Arc, Arc, Arc>>, Arc, Arc>>, 
IgnoringMessageHandler, IgnoringMessageHandler>; + struct Node { node: Arc, + messenger: Arc, p2p_gossip_sync: PGS, rapid_gossip_sync: RGS, - peer_manager: Arc, Arc, IgnoringMessageHandler, Arc, IgnoringMessageHandler, Arc>>, + peer_manager: Arc, Arc, Arc, Arc, IgnoringMessageHandler, Arc>>, chain_monitor: Arc, kv_store: Arc, tx_broadcaster: Arc, @@ -1283,6 +1283,7 @@ mod tests { let seed = [i as u8; 32]; let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos())); let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), Arc::clone(&keys_manager), scorer.clone(), Default::default())); + let msg_router = Arc::new(DefaultMessageRouter::new(network_graph.clone(), Arc::clone(&keys_manager))); let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin)); let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into())); let now = Duration::from_secs(genesis_block.header.time as u64); @@ -1291,6 +1292,7 @@ mod tests { let best_block = BestBlock::from_network(network); let params = ChainParameters { network, best_block }; let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params, genesis_block.header.time)); + let messenger = Arc::new(OnionMessenger::new(keys_manager.clone(), keys_manager.clone(), logger.clone(), manager.clone(), msg_router.clone(), IgnoringMessageHandler {}, IgnoringMessageHandler {})); let wallet = Arc::new(TestWallet {}); let sweeper = Arc::new(OutputSweeper::new(best_block, Arc::clone(&tx_broadcaster), Arc::clone(&fee_estimator), None::>, Arc::clone(&keys_manager), wallet, Arc::clone(&kv_store), Arc::clone(&logger))); @@ -1299,10 +1301,10 @@ mod tests { let msg_handler = MessageHandler { chan_handler: 
Arc::new(test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet))), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), - onion_message_handler: IgnoringMessageHandler{}, custom_message_handler: IgnoringMessageHandler{} + onion_message_handler: messenger.clone(), custom_message_handler: IgnoringMessageHandler{} }; let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), keys_manager.clone())); - let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, kv_store, tx_broadcaster, network_graph, logger, best_block, scorer, sweeper }; + let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, kv_store, tx_broadcaster, network_graph, logger, best_block, scorer, sweeper, messenger }; nodes.push(node); } @@ -1425,7 +1427,7 @@ mod tests { let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); let event_handler = |_: _| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); macro_rules! 
check_persisted_data { ($node: expr, $filepath: expr) => { @@ -1492,7 +1494,7 @@ mod tests { let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); let event_handler = |_: _| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string(); @@ -1521,7 +1523,7 @@ mod tests { let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test")); let event_handler = |_: _| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); match bg_processor.join() { Ok(_) => panic!("Expected error persisting manager"), Err(e) => { @@ -1542,7 +1544,7 @@ mod tests { let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test")); let bp_future = super::process_events_async( - persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), 
+ persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), move |dur: Duration| { Box::pin(async move { @@ -1567,7 +1569,7 @@ mod tests { let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test")); let event_handler = |_: _| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); match bg_processor.stop() { Ok(_) => panic!("Expected error persisting network graph"), @@ -1585,7 +1587,7 @@ mod tests { let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test")); let event_handler = |_: _| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); match bg_processor.stop() { Ok(_) => panic!("Expected error persisting scorer"), @@ -1613,7 +1615,7 @@ mod tests { _ => panic!("Unexpected event: 
{:?}", event), }; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); // Open a channel and check that the FundingGenerationReady event was handled. begin_open_channel!(nodes[0], nodes[1], channel_value); @@ -1653,7 +1655,7 @@ mod tests { _ => panic!("Unexpected event: {:?}", event), }; let persister = Arc::new(Persister::new(data_dir)); - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); // Force close the channel and check that the SpendableOutputs event was handled. 
nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap(); @@ -1764,7 +1766,7 @@ mod tests { let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); let event_handler = |_: _| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); @@ -1837,7 +1839,7 @@ mod tests { let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender)); let event_handler = |_: _| {}; - let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); + let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, receiver.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5)), @@ -1857,7 +1859,7 @@ mod tests { let (exit_sender, exit_receiver) = tokio::sync::watch::channel(()); let bp_future = super::process_events_async( - persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), + persister, |_: _| 
{async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), move |dur: Duration| { let mut exit_receiver = exit_receiver.clone(); @@ -1995,7 +1997,7 @@ mod tests { let (_, nodes) = create_nodes(1, "test_payment_path_scoring"); let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); do_test_payment_path_scoring!(nodes, receiver.recv_timeout(Duration::from_secs(EVENT_DEADLINE))); @@ -2032,7 +2034,7 @@ mod tests { let (exit_sender, exit_receiver) = tokio::sync::watch::channel(()); let bp_future = super::process_events_async( - persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), + persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), move |dur: Duration| { let mut exit_receiver = exit_receiver.clone(); From f96fbdd3613c81a43c31a1659e732ec18d387332 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 28 May 2024 15:24:59 +0000 Subject: [PATCH 7/8] f fix imports in BP --- lightning-background-processor/src/lib.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lightning-background-processor/src/lib.rs 
b/lightning-background-processor/src/lib.rs index b612dd96f6a..6b7cd5b0567 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -27,7 +27,7 @@ use lightning::chain::chainmonitor::{ChainMonitor, Persist}; use lightning::events::{Event, PathFailure}; #[cfg(feature = "std")] use lightning::events::EventHandler; -#[cfg(any(feature = "std", feature = "futures"))] +#[cfg(feature = "std")] use lightning::events::EventsProvider; use lightning::ln::channelmanager::AChannelManager; @@ -57,8 +57,6 @@ use std::time::Instant; #[cfg(not(feature = "std"))] use alloc::boxed::Box; -#[cfg(not(feature = "std"))] -use alloc::vec::Vec; /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its From fadb26875202c18cdd8fb1bd6eaf3f965a95a314 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 10 May 2024 21:28:50 +0000 Subject: [PATCH 8/8] Drop `EventsProvider` bounds on `OnionMessageHandler`s This never really made a lot of sense from an API perspective, but was required to avoid handing the background processor an explicit `OnionMessenger`, which we are now doing. Thus, we can simply drop these bounds as unnecessary.
--- lightning/src/ln/msgs.rs | 4 ++-- lightning/src/ln/peer_handler.rs | 10 +--------- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/lightning/src/ln/msgs.rs b/lightning/src/ln/msgs.rs index 87e8a814d33..7a6ba09c22c 100644 --- a/lightning/src/ln/msgs.rs +++ b/lightning/src/ln/msgs.rs @@ -52,7 +52,7 @@ use core::fmt::Display; use crate::io::{self, Cursor, Read}; use crate::io_extras::read_to_end; -use crate::events::{EventsProvider, MessageSendEventsProvider}; +use crate::events::MessageSendEventsProvider; use crate::crypto::streams::ChaChaPolyReadAdapter; use crate::util::logger; use crate::util::ser::{LengthReadable, LengthReadableArgs, Readable, ReadableArgs, Writeable, Writer, WithoutLength, FixedLengthReader, HighZeroBytesDroppedBigSize, Hostname, TransactionU16LenLimited, BigSize}; @@ -1623,7 +1623,7 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider { } /// A handler for received [`OnionMessage`]s and for providing generated ones to send. -pub trait OnionMessageHandler: EventsProvider { +pub trait OnionMessageHandler { /// Handle an incoming `onion_message` message from the given peer. 
fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage); diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index f31a8e131e0..81b5c8a5cf8 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -19,7 +19,7 @@ use bitcoin::blockdata::constants::ChainHash; use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey}; use crate::sign::{NodeSigner, Recipient}; -use crate::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; +use crate::events::{MessageSendEvent, MessageSendEventsProvider}; use crate::ln::types::ChannelId; use crate::ln::features::{InitFeatures, NodeFeatures}; use crate::ln::msgs; @@ -97,9 +97,6 @@ pub trait CustomMessageHandler: wire::CustomMessageReader { /// A dummy struct which implements `RoutingMessageHandler` without storing any routing information /// or doing any processing. You can provide one of these as the route_handler in a MessageHandler. pub struct IgnoringMessageHandler{} -impl EventsProvider for IgnoringMessageHandler { - fn process_pending_events(&self, _handler: H) where H::Target: EventHandler {} -} impl MessageSendEventsProvider for IgnoringMessageHandler { fn get_and_clear_pending_msg_events(&self) -> Vec { Vec::new() } } @@ -723,8 +720,6 @@ pub trait APeerManager { type NS: Deref; /// Gets a reference to the underlying [`PeerManager`]. fn as_ref(&self) -> &PeerManager; - /// Returns the peer manager's [`OnionMessageHandler`]. - fn onion_message_handler(&self) -> &Self::OMT; } impl @@ -750,9 +745,6 @@ APeerManager for PeerManager where type NST = ::Target; type NS = NS; fn as_ref(&self) -> &PeerManager { self } - fn onion_message_handler(&self) -> &Self::OMT { - self.message_handler.onion_message_handler.deref() - } } /// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls