From 40366931d5930bb72aaba68466c0acf7eb439d03 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Wed, 22 Jun 2022 17:07:48 -0400 Subject: [PATCH] Forward onion messages in PeerManager. We want to prioritize channel messages over onion messages, so we only pull an onion message to forward if the peer's outbound buffer is empty enough. --- fuzz/src/full_stack.rs | 3 +- lightning-background-processor/src/lib.rs | 10 +- lightning-net-tokio/src/lib.rs | 20 +++- lightning/src/ln/channelmanager.rs | 1 + lightning/src/ln/msgs.rs | 8 +- lightning/src/ln/peer_handler.rs | 107 ++++++++++++++---- lightning/src/ln/wire.rs | 11 +- .../src/onion_message/functional_tests.rs | 1 + lightning/src/onion_message/messenger.rs | 42 +++++-- lightning/src/util/events.rs | 12 ++ 10 files changed, 168 insertions(+), 47 deletions(-) diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index b4ca316ed0d..be99427ed32 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -163,7 +163,7 @@ type ChannelMan = ChannelManager< EnforcingSigner, Arc, Arc, Arc, Arc, Arc>>, Arc, Arc, Arc, Arc>; -type PeerMan<'a> = PeerManager, Arc, Arc>>, Arc, Arc>>, Arc, IgnoringMessageHandler>; +type PeerMan<'a> = PeerManager, Arc, Arc>>, Arc, Arc>>, IgnoringMessageHandler, Arc, IgnoringMessageHandler>; struct MoneyLossDetector<'a> { manager: Arc, @@ -403,6 +403,7 @@ pub fn do_test(data: &[u8], logger: &Arc) { let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler { chan_handler: channelmanager.clone(), route_handler: gossip_sync.clone(), + onion_message_handler: IgnoringMessageHandler {}, }, our_network_key, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger), IgnoringMessageHandler{})); let mut should_forward = false; diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 11dca44bbab..95e4f3039ce 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -16,7 +16,7 @@ use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use lightning::chain::chainmonitor::{ChainMonitor, Persist}; use lightning::chain::keysinterface::{Sign, KeysInterface}; use lightning::ln::channelmanager::ChannelManager; -use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; +use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler}; use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor}; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::scoring::WriteableScore; @@ -229,6 +229,7 @@ impl BackgroundProcessor { P: 'static + Deref + Send + Sync, Descriptor: 'static + SocketDescriptor + Send + Sync, CMH: 'static + Deref + Send + Sync, + OMH: 'static + Deref + Send + Sync, RMH: 'static + Deref + Send + Sync, EH: 'static + EventHandler + Send, PS: 'static + Deref + Send, @@ -237,7 +238,7 @@ impl BackgroundProcessor { PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, UMH: 'static + Deref + Send + Sync, - PM: 'static + Deref> + Send + Sync, + PM: 'static + Deref> + Send + Sync, S: 'static + Deref + Send + Sync, SC: WriteableScore<'a>, >( @@ -254,6 +255,7 @@ impl BackgroundProcessor { L::Target: 'static + Logger, P::Target: 'static + Persist, CMH::Target: 'static + ChannelMessageHandler, + OMH::Target: 'static + OnionMessageHandler, RMH::Target: 'static + RoutingMessageHandler, UMH::Target: 'static + CustomMessageHandler, PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>, @@ -489,7 +491,7 @@ mod tests { node: Arc>, p2p_gossip_sync: PGS, rapid_gossip_sync: RGS, - peer_manager: Arc, Arc, Arc, IgnoringMessageHandler>>, + peer_manager: Arc, Arc, IgnoringMessageHandler, Arc, IgnoringMessageHandler>>, chain_monitor: Arc, persister: Arc, tx_broadcaster: Arc, @@ -608,7 +610,7 @@ mod tests { let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone())); let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone())); let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone())); - let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )}; + let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}}; let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{})); let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0))); let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer }; diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 2ac10762b04..16a5a0485d9 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -81,7 +81,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt}; use lightning::ln::peer_handler; use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait; use lightning::ln::peer_handler::CustomMessageHandler; -use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler, NetAddress}; +use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, NetAddress, RoutingMessageHandler}; use lightning::util::logger::Logger; use std::task; @@ -120,9 +120,10 @@ struct Connection { id: u64, } impl Connection { - async fn poll_event_process(peer_manager: Arc, Arc, Arc, Arc>>, mut event_receiver: mpsc::Receiver<()>) where + async fn poll_event_process(peer_manager: Arc, Arc, Arc, Arc, Arc>>, mut event_receiver: mpsc::Receiver<()>) where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, + OMH: OnionMessageHandler + 'static + Send + Sync, L: Logger + 'static + ?Sized + Send + Sync, UMH: CustomMessageHandler + 'static + Send + Sync { loop { @@ -133,9 +134,10 @@ impl Connection { } } - async fn schedule_read(peer_manager: Arc, Arc, Arc, Arc>>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where + async fn schedule_read(peer_manager: Arc, Arc, Arc, Arc, Arc>>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, + OMH: OnionMessageHandler + 'static + Send + Sync, L: Logger + 'static + ?Sized + Send + Sync, UMH: CustomMessageHandler + 'static + Send + Sync { // Create a waker to wake up poll_event_process, above @@ -255,9 +257,10 @@ fn get_addr_from_stream(stream: &StdTcpStream) -> Option { /// The returned future will complete when the peer is disconnected and associated handling /// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do /// not need to poll the provided future in order to make progress. -pub fn setup_inbound(peer_manager: Arc, Arc, Arc, Arc>>, stream: StdTcpStream) -> impl std::future::Future where +pub fn setup_inbound(peer_manager: Arc, Arc, Arc, Arc, Arc>>, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, + OMH: OnionMessageHandler + 'static + Send + Sync, L: Logger + 'static + ?Sized + Send + Sync, UMH: CustomMessageHandler + 'static + Send + Sync { let remote_addr = get_addr_from_stream(&stream); @@ -297,9 +300,10 @@ pub fn setup_inbound(peer_manager: Arc(peer_manager: Arc, Arc, Arc, Arc>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where +pub fn setup_outbound(peer_manager: Arc, Arc, Arc, Arc, Arc>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, + OMH: OnionMessageHandler + 'static + Send + Sync, L: Logger + 'static + ?Sized + Send + Sync, UMH: CustomMessageHandler + 'static + Send + Sync { let remote_addr = get_addr_from_stream(&stream); @@ -368,9 +372,10 @@ pub fn setup_outbound(peer_manager: Arc(peer_manager: Arc, Arc, Arc, Arc>>, their_node_id: PublicKey, addr: SocketAddr) -> Option> where +pub async fn connect_outbound(peer_manager: Arc, Arc, Arc, Arc, Arc>>, their_node_id: PublicKey, addr: SocketAddr) -> Option> where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, + OMH: OnionMessageHandler + 'static + Send + Sync, L: Logger + 'static + ?Sized + Send + Sync, UMH: CustomMessageHandler + 'static + Send + Sync { if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await { @@ -618,6 +623,7 @@ mod tests { let a_manager = Arc::new(PeerManager::new(MessageHandler { chan_handler: Arc::clone(&a_handler), route_handler: Arc::clone(&a_handler), + onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}), }, a_key.clone(), &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}))); let (b_connected_sender, mut b_connected) = mpsc::channel(1); @@ -632,6 +638,7 @@ mod tests { let b_manager = Arc::new(PeerManager::new(MessageHandler { chan_handler: Arc::clone(&b_handler), route_handler: Arc::clone(&b_handler), + onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}), }, b_key.clone(), &[2; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}))); // We bind on localhost, hoping the environment is properly configured with a local @@ -683,6 +690,7 @@ mod tests { let a_manager = Arc::new(PeerManager::new(MessageHandler { chan_handler: Arc::new(lightning::ln::peer_handler::ErroringMessageHandler::new()), + onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}), route_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}), }, a_key, &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}))); diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 40218089e3e..29449efa8e9 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -6005,6 +6005,7 @@ impl &events::MessageSendEvent::SendShortIdsQuery { .. } => false, &events::MessageSendEvent::SendReplyChannelRange { .. } => false, &events::MessageSendEvent::SendGossipTimestampFilter { .. } => false, + &events::MessageSendEvent::SendOnionMessage { .. } => false, } }); } diff --git a/lightning/src/ln/msgs.rs b/lightning/src/ln/msgs.rs index ccba9a7e4ea..f413c968f07 100644 --- a/lightning/src/ln/msgs.rs +++ b/lightning/src/ln/msgs.rs @@ -39,7 +39,7 @@ use core::fmt::Debug; use io::{self, Read}; use io_extras::read_to_end; -use util::events::MessageSendEventsProvider; +use util::events::{MessageSendEventsProvider, OnionMessageProvider}; use util::logger; use util::ser::{LengthReadable, Readable, Writeable, Writer, FixedLengthReader, HighZeroBytesDroppedVarInt, Hostname}; @@ -945,6 +945,12 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider { fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>; } +/// A trait to describe an object that can receive onion messages. +pub trait OnionMessageHandler : OnionMessageProvider { + /// Handle an incoming onion_message message from the given peer. + fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage); +} + mod fuzzy_internal_msgs { use prelude::*; use ln::{PaymentPreimage, PaymentSecret}; diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 2a026821f8a..bdabb00f501 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -19,15 +19,16 @@ use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey}; use ln::features::InitFeatures; use ln::msgs; -use ln::msgs::{ChannelMessageHandler, LightningError, NetAddress, RoutingMessageHandler}; +use ln::msgs::{ChannelMessageHandler, LightningError, OnionMessageHandler, NetAddress, RoutingMessageHandler}; use ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager}; use util::ser::{VecWriter, Writeable, Writer}; use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep}; use ln::wire; use ln::wire::Encode; +use onion_message::{SimpleArcOnionMessenger, SimpleRefOnionMessenger}; use routing::gossip::{NetworkGraph, P2PGossipSync}; use util::atomic_counter::AtomicCounter; -use util::events::{MessageSendEvent, MessageSendEventsProvider}; +use util::events::{MessageSendEvent, MessageSendEventsProvider, OnionMessageProvider}; use util::logger::Logger; use prelude::*; @@ -76,6 +77,12 @@ impl RoutingMessageHandler for IgnoringMessageHandler { fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::QueryChannelRange) -> Result<(), LightningError> { Ok(()) } fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: msgs::QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) } } +impl OnionMessageProvider for IgnoringMessageHandler { + fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option { None } +} +impl OnionMessageHandler for IgnoringMessageHandler { + fn handle_onion_message(&self, _their_node_id: &PublicKey, _msg: &msgs::OnionMessage) {} +} impl Deref for IgnoringMessageHandler { type Target = IgnoringMessageHandler; fn deref(&self) -> &Self { self } @@ -199,9 +206,11 @@ impl Deref for ErroringMessageHandler { } /// Provides references to trait impls which handle different types of messages. -pub struct MessageHandler where +pub struct MessageHandler where CM::Target: ChannelMessageHandler, - RM::Target: RoutingMessageHandler { + RM::Target: RoutingMessageHandler, + OM::Target: OnionMessageHandler, +{ /// A message handler which handles messages specific to channels. Usually this is just a /// [`ChannelManager`] object or an [`ErroringMessageHandler`]. /// @@ -212,6 +221,12 @@ pub struct MessageHandler where /// /// [`P2PGossipSync`]: crate::routing::gossip::P2PGossipSync pub route_handler: RM, + + /// A message handler which handles onion messages. Using this is just an [`OnionMessenger`] object + /// or an [`IgnoringMessageHandler`]. + /// + /// [`OnionMessenger`]: crate::onion_message::OnionMessenger + pub onion_message_handler: OM, } /// Provides an object which can be used to send data to and which uniquely identifies a connection @@ -294,15 +309,23 @@ enum InitSyncTracker{ /// forwarding gossip messages to peers altogether. const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2; +/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we pause +/// forwarding onion messages to peers altogether. +const OM_BUFFER_LIMIT_RATIO: usize = 3; + /// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until /// we have fewer than this many messages in the outbound buffer again. -/// We also use this as the target number of outbound gossip messages to keep in the write buffer, -/// refilled as we send bytes. +/// We also use this as the target number of outbound gossip and onion messages to keep in the write +/// buffer, refilled as we send bytes. const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10; /// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to /// the peer. const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO; +/// When the outbound buffer has this many messages, we won't poll for new onion messages for this +/// peer. +const OUTBOUND_BUFFER_LIMIT_PAUSE_OMS: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * OM_BUFFER_LIMIT_RATIO; + /// If we've sent a ping, and are still awaiting a response, we may need to churn our way through /// the socket receive buffer before receiving the ping. /// @@ -378,6 +401,17 @@ impl Peer { InitSyncTracker::NodesSyncing(pk) => pk < node_id, } } + + /// We want to prioritize channel messages over onion messages, so we only forward them when + /// there's enough room in the peer's outbound buffer. + fn onion_msg_buffer_space_available(&self) -> bool { + if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_PAUSE_OMS + || self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * OM_BUFFER_LIMIT_RATIO + { + return false + } + true + } } /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g. @@ -387,7 +421,7 @@ impl Peer { /// issues such as overly long function definitions. /// /// (C-not exported) as Arcs don't make sense in bindings -pub type SimpleArcPeerManager = PeerManager>, Arc>>, Arc, Arc>>, Arc, Arc>; +pub type SimpleArcPeerManager = PeerManager>, Arc>>, Arc, Arc>>, Arc>, Arc, Arc>; /// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference /// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't @@ -397,7 +431,7 @@ pub type SimpleArcPeerManager = PeerManager = PeerManager, &'e P2PGossipSync<&'g NetworkGraph<&'f L>, &'h C, &'f L>, &'f L, IgnoringMessageHandler>; +pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, SD, M, T, F, C, L> = PeerManager, &'e P2PGossipSync<&'g NetworkGraph<&'f L>, &'h C, &'f L>, SimpleRefOnionMessenger<'c, 'f, L>, &'f L, IgnoringMessageHandler>; /// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls /// socket events into messages which it passes on to its [`MessageHandler`]. @@ -418,12 +452,13 @@ pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, SD, M, T, F, C, L> /// you're using lightning-net-tokio. /// /// [`read_event`]: PeerManager::read_event -pub struct PeerManager where +pub struct PeerManager where CM::Target: ChannelMessageHandler, RM::Target: RoutingMessageHandler, + OM::Target: OnionMessageHandler, L::Target: Logger, CMH::Target: CustomMessageHandler { - message_handler: MessageHandler, + message_handler: MessageHandler, /// Connection state for each connected peer - we have an outer read-write lock which is taken /// as read while we're doing processing for a peer and taken write when a peer is being added /// or removed. @@ -482,31 +517,33 @@ macro_rules! encode_msg { }} } -impl PeerManager where +impl PeerManager where CM::Target: ChannelMessageHandler, + OM::Target: OnionMessageHandler, L::Target: Logger { - /// Constructs a new PeerManager with the given ChannelMessageHandler. No routing message - /// handler is used and network graph messages are ignored. + /// Constructs a new PeerManager with the given ChannelMessageHandler and OnionMessageHandler. No + /// routing message handler is used and network graph messages are ignored. /// /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be /// cryptographically secure random bytes. /// /// (C-not exported) as we can't export a PeerManager with a dummy route handler - pub fn new_channel_only(channel_message_handler: CM, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> Self { + pub fn new_channel_only(channel_message_handler: CM, onion_message_handler: OM, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> Self { Self::new(MessageHandler { chan_handler: channel_message_handler, route_handler: IgnoringMessageHandler{}, + onion_message_handler, }, our_node_secret, ephemeral_random_data, logger, IgnoringMessageHandler{}) } } -impl PeerManager where +impl PeerManager where RM::Target: RoutingMessageHandler, L::Target: Logger { - /// Constructs a new PeerManager with the given RoutingMessageHandler. No channel message - /// handler is used and messages related to channels will be ignored (or generate error - /// messages). Note that some other lightning implementations time-out connections after some - /// time if no channel is built with the peer. + /// Constructs a new PeerManager with the given RoutingMessageHandler. No channel message handler + /// or onion message handler is used and messages related to channels will be ignored (or generate + /// error messages). Note that some other lightning implementations time-out connections after + /// some time if no channel is built with the peer. /// /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be /// cryptographically secure random bytes. @@ -516,6 +553,7 @@ impl PeerManager) -> Option { } } -impl PeerManager where +impl PeerManager where CM::Target: ChannelMessageHandler, RM::Target: RoutingMessageHandler, + OM::Target: OnionMessageHandler, L::Target: Logger, CMH::Target: CustomMessageHandler { /// Constructs a new PeerManager with the given message handlers and node_id secret key /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be /// cryptographically secure random bytes. - pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L, custom_message_handler: CMH) -> Self { + pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L, custom_message_handler: CMH) -> Self { let mut ephemeral_key_midstate = Sha256::engine(); ephemeral_key_midstate.input(ephemeral_random_data); @@ -1282,6 +1321,11 @@ impl P self.message_handler.route_handler.handle_reply_channel_range(&their_node_id, msg)?; }, + // Onion message: + wire::Message::OnionMessage(msg) => { + self.message_handler.onion_message_handler.handle_onion_message(&their_node_id, &msg); + }, + // Unknown messages: wire::Message::Unknown(type_id) if message.is_even() => { log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", type_id); @@ -1427,6 +1471,17 @@ impl P let peers_lock = self.peers.read().unwrap(); let peers = &*peers_lock; + + for (_, peer_mutex) in peers.iter() { + let peer = peer_mutex.lock().unwrap(); + if let Some(peer_node_id) = peer.their_node_id { + if peer.onion_msg_buffer_space_available() { + if let Some(msg) = self.message_handler.onion_message_handler.next_onion_message_for_peer(peer_node_id) { + events_generated.push(msg); + } + } + } + } macro_rules! get_peer_for_forwarding { ($node_id: expr) => { { @@ -1627,6 +1682,10 @@ impl P MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => { self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } + MessageSendEvent::SendOnionMessage { ref node_id, ref msg } => { + log_trace!(self.logger, "Handling SendOnionMessage event in peer_handler for node {}", log_pubkey!(node_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + } } } @@ -1904,12 +1963,12 @@ mod tests { cfgs } - fn create_network<'a>(peer_count: usize, cfgs: &'a Vec) -> Vec> { + fn create_network<'a>(peer_count: usize, cfgs: &'a Vec) -> Vec> { let mut peers = Vec::new(); for i in 0..peer_count { let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap(); let ephemeral_bytes = [i as u8; 32]; - let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler }; + let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler, onion_message_handler: IgnoringMessageHandler {} }; let peer = PeerManager::new(msg_handler, node_secret, &ephemeral_bytes, &cfgs[i].logger, IgnoringMessageHandler {}); peers.push(peer); } @@ -1917,7 +1976,7 @@ mod tests { peers } - fn establish_connection<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { + fn establish_connection<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { let secp_ctx = Secp256k1::new(); let a_id = PublicKey::from_secret_key(&secp_ctx, &peer_a.our_node_secret); let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) }; diff --git a/lightning/src/ln/wire.rs b/lightning/src/ln/wire.rs index cbf5c77d600..1191a8d3d53 100644 --- a/lightning/src/ln/wire.rs +++ b/lightning/src/ln/wire.rs @@ -9,7 +9,7 @@ //! Wire encoding/decoding for Lightning messages according to [BOLT #1], and for //! custom message through the [`CustomMessageReader`] trait. -//! +//! //! [BOLT #1]: https://github.com/lightning/bolts/blob/master/01-messaging.md use io; @@ -60,6 +60,7 @@ pub(crate) enum Message where T: core::fmt::Debug + Type + TestEq { ChannelReady(msgs::ChannelReady), Shutdown(msgs::Shutdown), ClosingSigned(msgs::ClosingSigned), + OnionMessage(msgs::OnionMessage), UpdateAddHTLC(msgs::UpdateAddHTLC), UpdateFulfillHTLC(msgs::UpdateFulfillHTLC), UpdateFailHTLC(msgs::UpdateFailHTLC), @@ -100,6 +101,7 @@ impl Message where T: core::fmt::Debug + Type + TestEq { &Message::ChannelReady(ref msg) => msg.type_id(), &Message::Shutdown(ref msg) => msg.type_id(), &Message::ClosingSigned(ref msg) => msg.type_id(), + &Message::OnionMessage(ref msg) => msg.type_id(), &Message::UpdateAddHTLC(ref msg) => msg.type_id(), &Message::UpdateFulfillHTLC(ref msg) => msg.type_id(), &Message::UpdateFailHTLC(ref msg) => msg.type_id(), @@ -185,6 +187,9 @@ fn do_read(buffer: &mut R, message_type: u1 msgs::ClosingSigned::TYPE => { Ok(Message::ClosingSigned(Readable::read(buffer)?)) }, + msgs::OnionMessage::TYPE => { + Ok(Message::OnionMessage(Readable::read(buffer)?)) + }, msgs::UpdateAddHTLC::TYPE => { Ok(Message::UpdateAddHTLC(Readable::read(buffer)?)) }, @@ -344,6 +349,10 @@ impl Encode for msgs::ClosingSigned { const TYPE: u16 = 39; } +impl Encode for msgs::OnionMessage { + const TYPE: u16 = 513; +} + impl Encode for msgs::UpdateAddHTLC { const TYPE: u16 = 128; } diff --git a/lightning/src/onion_message/functional_tests.rs b/lightning/src/onion_message/functional_tests.rs index 996d0bf50e9..b444f4fb4b8 100644 --- a/lightning/src/onion_message/functional_tests.rs +++ b/lightning/src/onion_message/functional_tests.rs @@ -10,6 +10,7 @@ //! Onion message testing and test utilities live here. use chain::keysinterface::{KeysInterface, Recipient}; +use ln::msgs::OnionMessageHandler; use super::{BlindedRoute, Destination, OnionMessenger, SendError}; use util::enforcing_trait_impls::EnforcingSigner; use util::test_utils; diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 724dd4b31d1..06ccd6661c3 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -17,12 +17,13 @@ use bitcoin::secp256k1::{self, PublicKey, Secp256k1, SecretKey}; use bitcoin::secp256k1::ecdh::SharedSecret; use chain::keysinterface::{InMemorySigner, KeysInterface, KeysManager, Recipient, Sign}; -use ln::msgs; +use ln::msgs::{self, OnionMessageHandler}; use ln::onion_utils; use super::blinded_route::{BlindedRoute, ForwardTlvs, ReceiveTlvs}; use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload, ReceiveControlTlvs, SMALL_PACKET_HOP_DATA_LEN}; use super::utils; use util::chacha20::ChaCha20; +use util::events::{MessageSendEvent, OnionMessageProvider}; use util::logger::Logger; use util::ser::Writeable; @@ -181,10 +182,23 @@ impl OnionMessenger Ok(()) } + #[cfg(test)] + pub(super) fn get_and_clear_pending_msgs(&self) -> HashMap> { + let mut pending_msgs = self.pending_messages.lock().unwrap(); + let mut msgs = HashMap::new(); + core::mem::swap(&mut *pending_msgs, &mut msgs); + msgs + } +} + +impl OnionMessageHandler for OnionMessenger + where K::Target: KeysInterface, + L::Target: Logger, +{ /// Handle an incoming onion message. Currently, if a message was destined for us we will log, but /// soon we'll delegate the onion message to a handler that can generate invoices or send /// payments. - pub fn handle_onion_message(&self, _peer_node_id: &PublicKey, msg: &msgs::OnionMessage) { + fn handle_onion_message(&self, _peer_node_id: &PublicKey, msg: &msgs::OnionMessage) { let node_secret = match self.keys_manager.get_node_secret(Recipient::Node) { Ok(secret) => secret, Err(e) => { @@ -271,18 +285,26 @@ impl OnionMessenger }, }; } +} - #[cfg(test)] - pub(super) fn get_and_clear_pending_msgs(&self) -> HashMap> { +impl OnionMessageProvider for OnionMessenger + where K::Target: KeysInterface, + L::Target: Logger, +{ + fn next_onion_message_for_peer(&self, peer_node_id: PublicKey) -> Option { let mut pending_msgs = self.pending_messages.lock().unwrap(); - let mut msgs = HashMap::new(); - core::mem::swap(&mut *pending_msgs, &mut msgs); - msgs + if let Some(msgs) = pending_msgs.get_mut(&peer_node_id) { + if let Some(msg) = msgs.pop() { + return Some(MessageSendEvent::SendOnionMessage { + node_id: peer_node_id, + msg, + }) + } + } + None } } -// TODO: parameterize the below Simple* types with OnionMessenger and handle the messages it -// produces /// Useful for simplifying the parameters of [`SimpleArcChannelManager`] and /// [`SimpleArcPeerManager`]. See their docs for more details. /// @@ -399,4 +421,4 @@ fn construct_onion_message_packet(payloads: Vec<(Payload, [u8; 32])>, onion_keys Ok(onion_utils::construct_onion_packet_with_init_noise::<_, _>( payloads, onion_keys, packet_data, None)) -} \ No newline at end of file +} diff --git a/lightning/src/util/events.rs b/lightning/src/util/events.rs index caba7753f3f..70b909147ff 100644 --- a/lightning/src/util/events.rs +++ b/lightning/src/util/events.rs @@ -1029,6 +1029,13 @@ pub enum MessageSendEvent { /// The gossip_timestamp_filter which should be sent. msg: msgs::GossipTimestampFilter, }, + /// Sends an onion message. + SendOnionMessage { + /// The node_id of this message recipient + node_id: PublicKey, + /// The onion message which should be sent. + msg: msgs::OnionMessage, + }, } /// A trait indicating an object may generate message send events @@ -1038,6 +1045,11 @@ pub trait MessageSendEventsProvider { fn get_and_clear_pending_msg_events(&self) -> Vec; } +/// A trait indicating an object may generate onion message send events +pub trait OnionMessageProvider { + fn next_onion_message_for_peer(&self, peer_node_id: PublicKey) -> Option; +} + /// A trait indicating an object may generate events. /// /// Events are processed by passing an [`EventHandler`] to [`process_pending_events`].