diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index b4ca316ed0d..be99427ed32 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -163,7 +163,7 @@ type ChannelMan = ChannelManager< EnforcingSigner, Arc, Arc, Arc, Arc, Arc>>, Arc, Arc, Arc, Arc>; -type PeerMan<'a> = PeerManager, Arc, Arc>>, Arc, Arc>>, Arc, IgnoringMessageHandler>; +type PeerMan<'a> = PeerManager, Arc, Arc>>, Arc, Arc>>, IgnoringMessageHandler, Arc, IgnoringMessageHandler>; struct MoneyLossDetector<'a> { manager: Arc, @@ -403,6 +403,7 @@ pub fn do_test(data: &[u8], logger: &Arc) { let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler { chan_handler: channelmanager.clone(), route_handler: gossip_sync.clone(), + onion_message_handler: IgnoringMessageHandler {}, }, our_network_key, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger), IgnoringMessageHandler{})); let mut should_forward = false; diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 11dca44bbab..95e4f3039ce 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -16,7 +16,7 @@ use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use lightning::chain::chainmonitor::{ChainMonitor, Persist}; use lightning::chain::keysinterface::{Sign, KeysInterface}; use lightning::ln::channelmanager::ChannelManager; -use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; +use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler}; use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor}; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::scoring::WriteableScore; @@ -229,6 +229,7 @@ impl BackgroundProcessor { P: 'static + Deref + Send + Sync, Descriptor: 'static + SocketDescriptor + Send + Sync, CMH: 'static + Deref + Send + Sync, + OMH: 'static + Deref + Send + Sync, RMH: 'static + Deref + Send + Sync, EH: 'static + EventHandler + Send, PS: 'static + Deref + Send, @@ -237,7 +238,7 @@ impl BackgroundProcessor { PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, UMH: 'static + Deref + Send + Sync, - PM: 'static + Deref> + Send + Sync, + PM: 'static + Deref> + Send + Sync, S: 'static + Deref + Send + Sync, SC: WriteableScore<'a>, >( @@ -254,6 +255,7 @@ impl BackgroundProcessor { L::Target: 'static + Logger, P::Target: 'static + Persist, CMH::Target: 'static + ChannelMessageHandler, + OMH::Target: 'static + OnionMessageHandler, RMH::Target: 'static + RoutingMessageHandler, UMH::Target: 'static + CustomMessageHandler, PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>, @@ -489,7 +491,7 @@ mod tests { node: Arc>, p2p_gossip_sync: PGS, rapid_gossip_sync: RGS, - peer_manager: Arc, Arc, Arc, IgnoringMessageHandler>>, + peer_manager: Arc, Arc, IgnoringMessageHandler, Arc, IgnoringMessageHandler>>, chain_monitor: Arc, persister: Arc, tx_broadcaster: Arc, @@ -608,7 +610,7 @@ mod tests { let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone())); let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone())); let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone())); - let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )}; + let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}}; let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{})); let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0))); let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer }; diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 2ac10762b04..16a5a0485d9 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -81,7 +81,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt}; use lightning::ln::peer_handler; use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait; use lightning::ln::peer_handler::CustomMessageHandler; -use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler, NetAddress}; +use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, NetAddress, RoutingMessageHandler}; use lightning::util::logger::Logger; use std::task; @@ -120,9 +120,10 @@ struct Connection { id: u64, } impl Connection { - async fn poll_event_process(peer_manager: Arc, Arc, Arc, Arc>>, mut event_receiver: mpsc::Receiver<()>) where + async fn poll_event_process(peer_manager: Arc, Arc, Arc, Arc, Arc>>, mut event_receiver: mpsc::Receiver<()>) where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, + OMH: OnionMessageHandler + 'static + Send + Sync, L: Logger + 'static + ?Sized + Send + Sync, UMH: CustomMessageHandler + 'static + Send + Sync { loop { @@ -133,9 +134,10 @@ impl Connection { } } - async fn schedule_read(peer_manager: Arc, Arc, Arc, Arc>>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where + async fn schedule_read(peer_manager: Arc, Arc, Arc, Arc, Arc>>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, + OMH: OnionMessageHandler + 'static + Send + Sync, L: Logger + 'static + ?Sized + Send + Sync, UMH: CustomMessageHandler + 'static + Send + Sync { // Create a waker to wake up poll_event_process, above @@ -255,9 +257,10 @@ fn get_addr_from_stream(stream: &StdTcpStream) -> Option { /// The returned future will complete when the peer is disconnected and associated handling /// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do /// not need to poll the provided future in order to make progress. -pub fn setup_inbound(peer_manager: Arc, Arc, Arc, Arc>>, stream: StdTcpStream) -> impl std::future::Future where +pub fn setup_inbound(peer_manager: Arc, Arc, Arc, Arc, Arc>>, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, + OMH: OnionMessageHandler + 'static + Send + Sync, L: Logger + 'static + ?Sized + Send + Sync, UMH: CustomMessageHandler + 'static + Send + Sync { let remote_addr = get_addr_from_stream(&stream); @@ -297,9 +300,10 @@ pub fn setup_inbound(peer_manager: Arc(peer_manager: Arc, Arc, Arc, Arc>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where +pub fn setup_outbound(peer_manager: Arc, Arc, Arc, Arc, Arc>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, + OMH: OnionMessageHandler + 'static + Send + Sync, L: Logger + 'static + ?Sized + Send + Sync, UMH: CustomMessageHandler + 'static + Send + Sync { let remote_addr = get_addr_from_stream(&stream); @@ -368,9 +372,10 @@ pub fn setup_outbound(peer_manager: Arc(peer_manager: Arc, Arc, Arc, Arc>>, their_node_id: PublicKey, addr: SocketAddr) -> Option> where +pub async fn connect_outbound(peer_manager: Arc, Arc, Arc, Arc, Arc>>, their_node_id: PublicKey, addr: SocketAddr) -> Option> where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, + OMH: OnionMessageHandler + 'static + Send + Sync, L: Logger + 'static + ?Sized + Send + Sync, UMH: CustomMessageHandler + 'static + Send + Sync { if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await { @@ -618,6 +623,7 @@ mod tests { let a_manager = Arc::new(PeerManager::new(MessageHandler { chan_handler: Arc::clone(&a_handler), route_handler: Arc::clone(&a_handler), + onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}), }, a_key.clone(), &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}))); let (b_connected_sender, mut b_connected) = mpsc::channel(1); @@ -632,6 +638,7 @@ mod tests { let b_manager = Arc::new(PeerManager::new(MessageHandler { chan_handler: Arc::clone(&b_handler), route_handler: Arc::clone(&b_handler), + onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}), }, b_key.clone(), &[2; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}))); // We bind on localhost, hoping the environment is properly configured with a local @@ -683,6 +690,7 @@ mod tests { let a_manager = Arc::new(PeerManager::new(MessageHandler { chan_handler: Arc::new(lightning::ln::peer_handler::ErroringMessageHandler::new()), + onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}), route_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}), }, a_key, &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}))); diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 40218089e3e..29449efa8e9 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -6005,6 +6005,7 @@ impl &events::MessageSendEvent::SendShortIdsQuery { .. } => false, &events::MessageSendEvent::SendReplyChannelRange { .. } => false, &events::MessageSendEvent::SendGossipTimestampFilter { .. } => false, + &events::MessageSendEvent::SendOnionMessage { .. } => false, } }); } diff --git a/lightning/src/ln/msgs.rs b/lightning/src/ln/msgs.rs index ccba9a7e4ea..f413c968f07 100644 --- a/lightning/src/ln/msgs.rs +++ b/lightning/src/ln/msgs.rs @@ -39,7 +39,7 @@ use core::fmt::Debug; use io::{self, Read}; use io_extras::read_to_end; -use util::events::MessageSendEventsProvider; +use util::events::{MessageSendEventsProvider, OnionMessageProvider}; use util::logger; use util::ser::{LengthReadable, Readable, Writeable, Writer, FixedLengthReader, HighZeroBytesDroppedVarInt, Hostname}; @@ -945,6 +945,12 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider { fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>; } +/// A trait to describe an object that can receive onion messages. +pub trait OnionMessageHandler : OnionMessageProvider { + /// Handle an incoming onion_message message from the given peer. + fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage); +} + mod fuzzy_internal_msgs { use prelude::*; use ln::{PaymentPreimage, PaymentSecret}; diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 2a026821f8a..bdabb00f501 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -19,15 +19,16 @@ use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey}; use ln::features::InitFeatures; use ln::msgs; -use ln::msgs::{ChannelMessageHandler, LightningError, NetAddress, RoutingMessageHandler}; +use ln::msgs::{ChannelMessageHandler, LightningError, OnionMessageHandler, NetAddress, RoutingMessageHandler}; use ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager}; use util::ser::{VecWriter, Writeable, Writer}; use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep}; use ln::wire; use ln::wire::Encode; +use onion_message::{SimpleArcOnionMessenger, SimpleRefOnionMessenger}; use routing::gossip::{NetworkGraph, P2PGossipSync}; use util::atomic_counter::AtomicCounter; -use util::events::{MessageSendEvent, MessageSendEventsProvider}; +use util::events::{MessageSendEvent, MessageSendEventsProvider, OnionMessageProvider}; use util::logger::Logger; use prelude::*; @@ -76,6 +77,12 @@ impl RoutingMessageHandler for IgnoringMessageHandler { fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::QueryChannelRange) -> Result<(), LightningError> { Ok(()) } fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: msgs::QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) } } +impl OnionMessageProvider for IgnoringMessageHandler { + fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option { None } +} +impl OnionMessageHandler for IgnoringMessageHandler { + fn handle_onion_message(&self, _their_node_id: &PublicKey, _msg: &msgs::OnionMessage) {} +} impl Deref for IgnoringMessageHandler { type Target = IgnoringMessageHandler; fn deref(&self) -> &Self { self } @@ -199,9 +206,11 @@ impl Deref for ErroringMessageHandler { } /// Provides references to trait impls which handle different types of messages. -pub struct MessageHandler where +pub struct MessageHandler where CM::Target: ChannelMessageHandler, - RM::Target: RoutingMessageHandler { + RM::Target: RoutingMessageHandler, + OM::Target: OnionMessageHandler, +{ /// A message handler which handles messages specific to channels. Usually this is just a /// [`ChannelManager`] object or an [`ErroringMessageHandler`]. /// @@ -212,6 +221,12 @@ pub struct MessageHandler where /// /// [`P2PGossipSync`]: crate::routing::gossip::P2PGossipSync pub route_handler: RM, + + /// A message handler which handles onion messages. Using this is just an [`OnionMessenger`] object + /// or an [`IgnoringMessageHandler`]. + /// + /// [`OnionMessenger`]: crate::onion_message::OnionMessenger + pub onion_message_handler: OM, } /// Provides an object which can be used to send data to and which uniquely identifies a connection @@ -294,15 +309,23 @@ enum InitSyncTracker{ /// forwarding gossip messages to peers altogether. const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2; +/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we pause +/// forwarding onion messages to peers altogether. +const OM_BUFFER_LIMIT_RATIO: usize = 3; + /// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until /// we have fewer than this many messages in the outbound buffer again. -/// We also use this as the target number of outbound gossip messages to keep in the write buffer, -/// refilled as we send bytes. +/// We also use this as the target number of outbound gossip and onion messages to keep in the write +/// buffer, refilled as we send bytes. const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10; /// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to /// the peer. const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO; +/// When the outbound buffer has this many messages, we won't poll for new onion messages for this +/// peer. +const OUTBOUND_BUFFER_LIMIT_PAUSE_OMS: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * OM_BUFFER_LIMIT_RATIO; + /// If we've sent a ping, and are still awaiting a response, we may need to churn our way through /// the socket receive buffer before receiving the ping. /// @@ -378,6 +401,17 @@ impl Peer { InitSyncTracker::NodesSyncing(pk) => pk < node_id, } } + + /// We want to prioritize channel messages over onion messages, so we only forward them when + /// there's enough room in the peer's outbound buffer. + fn onion_msg_buffer_space_available(&self) -> bool { + if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_PAUSE_OMS + || self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * OM_BUFFER_LIMIT_RATIO + { + return false + } + true + } } /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g. @@ -387,7 +421,7 @@ impl Peer { /// issues such as overly long function definitions. /// /// (C-not exported) as Arcs don't make sense in bindings -pub type SimpleArcPeerManager = PeerManager>, Arc>>, Arc, Arc>>, Arc, Arc>; +pub type SimpleArcPeerManager = PeerManager>, Arc>>, Arc, Arc>>, Arc>, Arc, Arc>; /// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference /// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't @@ -397,7 +431,7 @@ pub type SimpleArcPeerManager = PeerManager = PeerManager, &'e P2PGossipSync<&'g NetworkGraph<&'f L>, &'h C, &'f L>, &'f L, IgnoringMessageHandler>; +pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, SD, M, T, F, C, L> = PeerManager, &'e P2PGossipSync<&'g NetworkGraph<&'f L>, &'h C, &'f L>, SimpleRefOnionMessenger<'c, 'f, L>, &'f L, IgnoringMessageHandler>; /// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls /// socket events into messages which it passes on to its [`MessageHandler`]. @@ -418,12 +452,13 @@ pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, SD, M, T, F, C, L> /// you're using lightning-net-tokio. /// /// [`read_event`]: PeerManager::read_event -pub struct PeerManager where +pub struct PeerManager where CM::Target: ChannelMessageHandler, RM::Target: RoutingMessageHandler, + OM::Target: OnionMessageHandler, L::Target: Logger, CMH::Target: CustomMessageHandler { - message_handler: MessageHandler, + message_handler: MessageHandler, /// Connection state for each connected peer - we have an outer read-write lock which is taken /// as read while we're doing processing for a peer and taken write when a peer is being added /// or removed. @@ -482,31 +517,33 @@ macro_rules! encode_msg { }} } -impl PeerManager where +impl PeerManager where CM::Target: ChannelMessageHandler, + OM::Target: OnionMessageHandler, L::Target: Logger { - /// Constructs a new PeerManager with the given ChannelMessageHandler. No routing message - /// handler is used and network graph messages are ignored. + /// Constructs a new PeerManager with the given ChannelMessageHandler and OnionMessageHandler. No + /// routing message handler is used and network graph messages are ignored. /// /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be /// cryptographically secure random bytes. /// /// (C-not exported) as we can't export a PeerManager with a dummy route handler - pub fn new_channel_only(channel_message_handler: CM, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> Self { + pub fn new_channel_only(channel_message_handler: CM, onion_message_handler: OM, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> Self { Self::new(MessageHandler { chan_handler: channel_message_handler, route_handler: IgnoringMessageHandler{}, + onion_message_handler, }, our_node_secret, ephemeral_random_data, logger, IgnoringMessageHandler{}) } } -impl PeerManager where +impl PeerManager where RM::Target: RoutingMessageHandler, L::Target: Logger { - /// Constructs a new PeerManager with the given RoutingMessageHandler. No channel message - /// handler is used and messages related to channels will be ignored (or generate error - /// messages). Note that some other lightning implementations time-out connections after some - /// time if no channel is built with the peer. + /// Constructs a new PeerManager with the given RoutingMessageHandler. No channel message handler + /// or onion message handler is used and messages related to channels will be ignored (or generate + /// error messages). Note that some other lightning implementations time-out connections after + /// some time if no channel is built with the peer. /// /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be /// cryptographically secure random bytes. @@ -516,6 +553,7 @@ impl PeerManager) -> Option { } } -impl PeerManager where +impl PeerManager where CM::Target: ChannelMessageHandler, RM::Target: RoutingMessageHandler, + OM::Target: OnionMessageHandler, L::Target: Logger, CMH::Target: CustomMessageHandler { /// Constructs a new PeerManager with the given message handlers and node_id secret key /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be /// cryptographically secure random bytes. - pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L, custom_message_handler: CMH) -> Self { + pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L, custom_message_handler: CMH) -> Self { let mut ephemeral_key_midstate = Sha256::engine(); ephemeral_key_midstate.input(ephemeral_random_data); @@ -1282,6 +1321,11 @@ impl P self.message_handler.route_handler.handle_reply_channel_range(&their_node_id, msg)?; }, + // Onion message: + wire::Message::OnionMessage(msg) => { + self.message_handler.onion_message_handler.handle_onion_message(&their_node_id, &msg); + }, + // Unknown messages: wire::Message::Unknown(type_id) if message.is_even() => { log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", type_id); @@ -1427,6 +1471,17 @@ impl P let peers_lock = self.peers.read().unwrap(); let peers = &*peers_lock; + + for (_, peer_mutex) in peers.iter() { + let peer = peer_mutex.lock().unwrap(); + if let Some(peer_node_id) = peer.their_node_id { + if peer.onion_msg_buffer_space_available() { + if let Some(msg) = self.message_handler.onion_message_handler.next_onion_message_for_peer(peer_node_id) { + events_generated.push(msg); + } + } + } + } macro_rules! get_peer_for_forwarding { ($node_id: expr) => { { @@ -1627,6 +1682,10 @@ impl P MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => { self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } + MessageSendEvent::SendOnionMessage { ref node_id, ref msg } => { + log_trace!(self.logger, "Handling SendOnionMessage event in peer_handler for node {}", log_pubkey!(node_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + } } } @@ -1904,12 +1963,12 @@ mod tests { cfgs } - fn create_network<'a>(peer_count: usize, cfgs: &'a Vec) -> Vec> { + fn create_network<'a>(peer_count: usize, cfgs: &'a Vec) -> Vec> { let mut peers = Vec::new(); for i in 0..peer_count { let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap(); let ephemeral_bytes = [i as u8; 32]; - let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler }; + let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler, onion_message_handler: IgnoringMessageHandler {} }; let peer = PeerManager::new(msg_handler, node_secret, &ephemeral_bytes, &cfgs[i].logger, IgnoringMessageHandler {}); peers.push(peer); } @@ -1917,7 +1976,7 @@ mod tests { peers } - fn establish_connection<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { + fn establish_connection<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { let secp_ctx = Secp256k1::new(); let a_id = PublicKey::from_secret_key(&secp_ctx, &peer_a.our_node_secret); let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) }; diff --git a/lightning/src/ln/wire.rs b/lightning/src/ln/wire.rs index cbf5c77d600..1191a8d3d53 100644 --- a/lightning/src/ln/wire.rs +++ b/lightning/src/ln/wire.rs @@ -9,7 +9,7 @@ //! Wire encoding/decoding for Lightning messages according to [BOLT #1], and for //! custom message through the [`CustomMessageReader`] trait. -//! +//! //! [BOLT #1]: https://github.com/lightning/bolts/blob/master/01-messaging.md use io; @@ -60,6 +60,7 @@ pub(crate) enum Message where T: core::fmt::Debug + Type + TestEq { ChannelReady(msgs::ChannelReady), Shutdown(msgs::Shutdown), ClosingSigned(msgs::ClosingSigned), + OnionMessage(msgs::OnionMessage), UpdateAddHTLC(msgs::UpdateAddHTLC), UpdateFulfillHTLC(msgs::UpdateFulfillHTLC), UpdateFailHTLC(msgs::UpdateFailHTLC), @@ -100,6 +101,7 @@ impl Message where T: core::fmt::Debug + Type + TestEq { &Message::ChannelReady(ref msg) => msg.type_id(), &Message::Shutdown(ref msg) => msg.type_id(), &Message::ClosingSigned(ref msg) => msg.type_id(), + &Message::OnionMessage(ref msg) => msg.type_id(), &Message::UpdateAddHTLC(ref msg) => msg.type_id(), &Message::UpdateFulfillHTLC(ref msg) => msg.type_id(), &Message::UpdateFailHTLC(ref msg) => msg.type_id(), @@ -185,6 +187,9 @@ fn do_read(buffer: &mut R, message_type: u1 msgs::ClosingSigned::TYPE => { Ok(Message::ClosingSigned(Readable::read(buffer)?)) }, + msgs::OnionMessage::TYPE => { + Ok(Message::OnionMessage(Readable::read(buffer)?)) + }, msgs::UpdateAddHTLC::TYPE => { Ok(Message::UpdateAddHTLC(Readable::read(buffer)?)) }, @@ -344,6 +349,10 @@ impl Encode for msgs::ClosingSigned { const TYPE: u16 = 39; } +impl Encode for msgs::OnionMessage { + const TYPE: u16 = 513; +} + impl Encode for msgs::UpdateAddHTLC { const TYPE: u16 = 128; } diff --git a/lightning/src/onion_message/functional_tests.rs b/lightning/src/onion_message/functional_tests.rs index 996d0bf50e9..b444f4fb4b8 100644 --- a/lightning/src/onion_message/functional_tests.rs +++ b/lightning/src/onion_message/functional_tests.rs @@ -10,6 +10,7 @@ //! Onion message testing and test utilities live here. use chain::keysinterface::{KeysInterface, Recipient}; +use ln::msgs::OnionMessageHandler; use super::{BlindedRoute, Destination, OnionMessenger, SendError}; use util::enforcing_trait_impls::EnforcingSigner; use util::test_utils; diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 724dd4b31d1..06ccd6661c3 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -17,12 +17,13 @@ use bitcoin::secp256k1::{self, PublicKey, Secp256k1, SecretKey}; use bitcoin::secp256k1::ecdh::SharedSecret; use chain::keysinterface::{InMemorySigner, KeysInterface, KeysManager, Recipient, Sign}; -use ln::msgs; +use ln::msgs::{self, OnionMessageHandler}; use ln::onion_utils; use super::blinded_route::{BlindedRoute, ForwardTlvs, ReceiveTlvs}; use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload, ReceiveControlTlvs, SMALL_PACKET_HOP_DATA_LEN}; use super::utils; use util::chacha20::ChaCha20; +use util::events::{MessageSendEvent, OnionMessageProvider}; use util::logger::Logger; use util::ser::Writeable; @@ -181,10 +182,23 @@ impl OnionMessenger Ok(()) } + #[cfg(test)] + pub(super) fn get_and_clear_pending_msgs(&self) -> HashMap> { + let mut pending_msgs = self.pending_messages.lock().unwrap(); + let mut msgs = HashMap::new(); + core::mem::swap(&mut *pending_msgs, &mut msgs); + msgs + } +} + +impl OnionMessageHandler for OnionMessenger + where K::Target: KeysInterface, + L::Target: Logger, +{ /// Handle an incoming onion message. Currently, if a message was destined for us we will log, but /// soon we'll delegate the onion message to a handler that can generate invoices or send /// payments. - pub fn handle_onion_message(&self, _peer_node_id: &PublicKey, msg: &msgs::OnionMessage) { + fn handle_onion_message(&self, _peer_node_id: &PublicKey, msg: &msgs::OnionMessage) { let node_secret = match self.keys_manager.get_node_secret(Recipient::Node) { Ok(secret) => secret, Err(e) => { @@ -271,18 +285,26 @@ impl OnionMessenger }, }; } +} - #[cfg(test)] - pub(super) fn get_and_clear_pending_msgs(&self) -> HashMap> { +impl OnionMessageProvider for OnionMessenger + where K::Target: KeysInterface, + L::Target: Logger, +{ + fn next_onion_message_for_peer(&self, peer_node_id: PublicKey) -> Option { let mut pending_msgs = self.pending_messages.lock().unwrap(); - let mut msgs = HashMap::new(); - core::mem::swap(&mut *pending_msgs, &mut msgs); - msgs + if let Some(msgs) = pending_msgs.get_mut(&peer_node_id) { + if let Some(msg) = msgs.pop() { + return Some(MessageSendEvent::SendOnionMessage { + node_id: peer_node_id, + msg, + }) + } + } + None } } -// TODO: parameterize the below Simple* types with OnionMessenger and handle the messages it -// produces /// Useful for simplifying the parameters of [`SimpleArcChannelManager`] and /// [`SimpleArcPeerManager`]. See their docs for more details. /// @@ -399,4 +421,4 @@ fn construct_onion_message_packet(payloads: Vec<(Payload, [u8; 32])>, onion_keys Ok(onion_utils::construct_onion_packet_with_init_noise::<_, _>( payloads, onion_keys, packet_data, None)) -} \ No newline at end of file +} diff --git a/lightning/src/util/events.rs b/lightning/src/util/events.rs index caba7753f3f..70b909147ff 100644 --- a/lightning/src/util/events.rs +++ b/lightning/src/util/events.rs @@ -1029,6 +1029,13 @@ pub enum MessageSendEvent { /// The gossip_timestamp_filter which should be sent. msg: msgs::GossipTimestampFilter, }, + /// Sends an onion message. + SendOnionMessage { + /// The node_id of this message recipient + node_id: PublicKey, + /// The onion message which should be sent. + msg: msgs::OnionMessage, + }, } /// A trait indicating an object may generate message send events @@ -1038,6 +1045,11 @@ pub trait MessageSendEventsProvider { fn get_and_clear_pending_msg_events(&self) -> Vec; } +/// A trait indicating an object may generate onion message send events +pub trait OnionMessageProvider { + fn next_onion_message_for_peer(&self, peer_node_id: PublicKey) -> Option; +} + /// A trait indicating an object may generate events. /// /// Events are processed by passing an [`EventHandler`] to [`process_pending_events`].