diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index f0f8054624d..b712bfbaa1c 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -164,7 +164,7 @@ type ChannelMan = ChannelManager< EnforcingSigner, Arc, Arc, Arc, Arc, Arc>>, Arc, Arc, Arc, Arc>; -type PeerMan<'a> = PeerManager, Arc, Arc>>, Arc, Arc>>, Arc, IgnoringMessageHandler>; +type PeerMan<'a> = PeerManager, Arc, Arc>>, Arc, Arc>>, IgnoringMessageHandler, Arc, IgnoringMessageHandler>; struct MoneyLossDetector<'a> { manager: Arc, @@ -412,6 +412,7 @@ pub fn do_test(data: &[u8], logger: &Arc) { let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler { chan_handler: channelmanager.clone(), route_handler: gossip_sync.clone(), + onion_message_handler: IgnoringMessageHandler {}, }, our_network_key, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger), IgnoringMessageHandler{})); let mut should_forward = false; diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 484439b3907..c27d9f9c7d6 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -16,7 +16,7 @@ use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use lightning::chain::chainmonitor::{ChainMonitor, Persist}; use lightning::chain::keysinterface::{Sign, KeysInterface}; use lightning::ln::channelmanager::ChannelManager; -use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; +use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler}; use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor}; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::scoring::WriteableScore; @@ -278,6 +278,7 @@ impl BackgroundProcessor { P: 'static + Deref + Send + Sync, Descriptor: 'static + SocketDescriptor + Send + Sync, CMH: 'static + Deref + Send + Sync, + OMH: 'static + Deref + Send + Sync, RMH: 'static + Deref + Send + Sync, EH: 'static + EventHandler + Send, PS: 'static + Deref + Send, @@ -286,7 +287,7 @@ impl BackgroundProcessor { PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, UMH: 'static + Deref + Send + Sync, - PM: 'static + Deref> + Send + Sync, + PM: 'static + Deref> + Send + Sync, S: 'static + Deref + Send + Sync, SC: WriteableScore<'a>, >( @@ -303,6 +304,7 @@ impl BackgroundProcessor { L::Target: 'static + Logger, P::Target: 'static + Persist, CMH::Target: 'static + ChannelMessageHandler, + OMH::Target: 'static + OnionMessageHandler, RMH::Target: 'static + RoutingMessageHandler, UMH::Target: 'static + CustomMessageHandler, PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>, @@ -538,7 +540,7 @@ mod tests { node: Arc>, p2p_gossip_sync: PGS, rapid_gossip_sync: RGS, - peer_manager: Arc, Arc, Arc, IgnoringMessageHandler>>, + peer_manager: Arc, Arc, IgnoringMessageHandler, Arc, IgnoringMessageHandler>>, chain_monitor: Arc, persister: Arc, tx_broadcaster: Arc, @@ -657,7 +659,7 @@ mod tests { let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone())); let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone())); let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone())); - let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )}; + let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}}; let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{})); let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0))); let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer }; diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 645a7434e45..aba82491161 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -81,7 +81,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt}; use lightning::ln::peer_handler; use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait; use lightning::ln::peer_handler::CustomMessageHandler; -use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler, NetAddress}; +use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, NetAddress, RoutingMessageHandler}; use lightning::util::logger::Logger; use std::ops::Deref; @@ -121,13 +121,15 @@ struct Connection { id: u64, } impl Connection { - async fn poll_event_process(peer_manager: Arc>, mut event_receiver: mpsc::Receiver<()>) where + async fn poll_event_process(peer_manager: Arc>, mut event_receiver: mpsc::Receiver<()>) where CMH: Deref + 'static + Send + Sync, RMH: Deref + 'static + Send + Sync, + OMH: Deref + 'static + Send + Sync, L: Deref + 'static + Send + Sync, UMH: Deref + 'static + Send + Sync, CMH::Target: ChannelMessageHandler + Send + Sync, RMH::Target: RoutingMessageHandler + Send + Sync, + OMH::Target: OnionMessageHandler + Send + Sync, L::Target: Logger + Send + Sync, UMH::Target: CustomMessageHandler + Send + Sync, { @@ -139,13 +141,15 @@ impl Connection { } } - async fn schedule_read(peer_manager: Arc>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where + async fn schedule_read(peer_manager: Arc>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where CMH: Deref + 'static + Send + Sync, RMH: Deref + 'static + Send + Sync, + OMH: Deref + 'static + Send + Sync, L: Deref + 'static + Send + Sync, UMH: Deref + 'static + Send + Sync, CMH::Target: ChannelMessageHandler + 'static + Send + Sync, RMH::Target: RoutingMessageHandler + 'static + Send + Sync, + OMH::Target: OnionMessageHandler + 'static + Send + Sync, L::Target: Logger + 'static + Send + Sync, UMH::Target: CustomMessageHandler + 'static + Send + Sync, { @@ -266,13 +270,15 @@ fn get_addr_from_stream(stream: &StdTcpStream) -> Option { /// The returned future will complete when the peer is disconnected and associated handling /// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do /// not need to poll the provided future in order to make progress. -pub fn setup_inbound(peer_manager: Arc>, stream: StdTcpStream) -> impl std::future::Future where +pub fn setup_inbound(peer_manager: Arc>, stream: StdTcpStream) -> impl std::future::Future where CMH: Deref + 'static + Send + Sync, RMH: Deref + 'static + Send + Sync, + OMH: Deref + 'static + Send + Sync, L: Deref + 'static + Send + Sync, UMH: Deref + 'static + Send + Sync, CMH::Target: ChannelMessageHandler + Send + Sync, RMH::Target: RoutingMessageHandler + Send + Sync, + OMH::Target: OnionMessageHandler + Send + Sync, L::Target: Logger + Send + Sync, UMH::Target: CustomMessageHandler + Send + Sync, { @@ -313,13 +319,15 @@ pub fn setup_inbound(peer_manager: Arc(peer_manager: Arc>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where +pub fn setup_outbound(peer_manager: Arc>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where CMH: Deref + 'static + Send + Sync, RMH: Deref + 'static + Send + Sync, + OMH: Deref + 'static + Send + Sync, L: Deref + 'static + Send + Sync, UMH: Deref + 'static + Send + Sync, CMH::Target: ChannelMessageHandler + Send + Sync, RMH::Target: RoutingMessageHandler + Send + Sync, + OMH::Target: OnionMessageHandler + Send + Sync, L::Target: Logger + Send + Sync, UMH::Target: CustomMessageHandler + Send + Sync, { @@ -389,13 +397,15 @@ pub fn setup_outbound(peer_manager: Arc(peer_manager: Arc>, their_node_id: PublicKey, addr: SocketAddr) -> Option> where +pub async fn connect_outbound(peer_manager: Arc>, their_node_id: PublicKey, addr: SocketAddr) -> Option> where CMH: Deref + 'static + Send + Sync, RMH: Deref + 'static + Send + Sync, + OMH: Deref + 'static + Send + Sync, L: Deref + 'static + Send + Sync, UMH: Deref + 'static + Send + Sync, CMH::Target: ChannelMessageHandler + Send + Sync, RMH::Target: RoutingMessageHandler + Send + Sync, + OMH::Target: OnionMessageHandler + Send + Sync, L::Target: Logger + Send + Sync, UMH::Target: CustomMessageHandler + Send + Sync, { @@ -644,6 +654,7 @@ mod tests { let a_manager = Arc::new(PeerManager::new(MessageHandler { chan_handler: Arc::clone(&a_handler), route_handler: Arc::clone(&a_handler), + onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}), }, a_key.clone(), &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}))); let (b_connected_sender, mut b_connected) = mpsc::channel(1); @@ -658,6 +669,7 @@ mod tests { let b_manager = Arc::new(PeerManager::new(MessageHandler { chan_handler: Arc::clone(&b_handler), route_handler: Arc::clone(&b_handler), + onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}), }, b_key.clone(), &[2; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}))); // We bind on localhost, hoping the environment is properly configured with a local @@ -709,6 +721,7 @@ mod tests { let a_manager = Arc::new(PeerManager::new(MessageHandler { chan_handler: Arc::new(lightning::ln::peer_handler::ErroringMessageHandler::new()), + onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}), route_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}), }, a_key, &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}))); diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index a02f5f13f43..66da5d82028 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -6153,6 +6153,7 @@ impl &events::MessageSendEvent::SendShortIdsQuery { .. } => false, &events::MessageSendEvent::SendReplyChannelRange { .. } => false, &events::MessageSendEvent::SendGossipTimestampFilter { .. } => false, + &events::MessageSendEvent::SendOnionMessage { .. } => false, } }); } diff --git a/lightning/src/ln/msgs.rs b/lightning/src/ln/msgs.rs index 05a336c18ef..7dcc4b6577f 100644 --- a/lightning/src/ln/msgs.rs +++ b/lightning/src/ln/msgs.rs @@ -40,7 +40,7 @@ use core::fmt::Debug; use io::{self, Read}; use io_extras::read_to_end; -use util::events::MessageSendEventsProvider; +use util::events::{MessageSendEventsProvider, OnionMessageProvider}; use util::logger; use util::ser::{LengthReadable, Readable, ReadableArgs, Writeable, Writer, FixedLengthReader, HighZeroBytesDroppedVarInt, Hostname}; @@ -945,6 +945,12 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider { fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>; } +/// A trait to describe an object that can receive onion messages. +pub trait OnionMessageHandler : OnionMessageProvider { + /// Handle an incoming onion_message message from the given peer. + fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage); +} + mod fuzzy_internal_msgs { use prelude::*; use ln::{PaymentPreimage, PaymentSecret}; diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 32e036d105a..f37a08336c2 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -19,15 +19,16 @@ use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey}; use ln::features::InitFeatures; use ln::msgs; -use ln::msgs::{ChannelMessageHandler, LightningError, NetAddress, RoutingMessageHandler}; +use ln::msgs::{ChannelMessageHandler, LightningError, OnionMessageHandler, NetAddress, RoutingMessageHandler}; use ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager}; use util::ser::{VecWriter, Writeable, Writer}; use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep}; use ln::wire; use ln::wire::Encode; +use onion_message::{SimpleArcOnionMessenger, SimpleRefOnionMessenger}; use routing::gossip::{NetworkGraph, P2PGossipSync}; use util::atomic_counter::AtomicCounter; -use util::events::{MessageSendEvent, MessageSendEventsProvider}; +use util::events::{MessageSendEvent, MessageSendEventsProvider, OnionMessageProvider}; use util::logger::Logger; use prelude::*; @@ -76,6 +77,12 @@ impl RoutingMessageHandler for IgnoringMessageHandler { fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::QueryChannelRange) -> Result<(), LightningError> { Ok(()) } fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: msgs::QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) } } +impl OnionMessageProvider for IgnoringMessageHandler { + fn next_onion_messages_for_peer(&self, _peer_node_id: PublicKey, _max_messages: usize) -> Vec { Vec::new() } +} +impl OnionMessageHandler for IgnoringMessageHandler { + fn handle_onion_message(&self, _their_node_id: &PublicKey, _msg: &msgs::OnionMessage) {} +} impl Deref for IgnoringMessageHandler { type Target = IgnoringMessageHandler; fn deref(&self) -> &Self { self } @@ -199,9 +206,11 @@ impl Deref for ErroringMessageHandler { } /// Provides references to trait impls which handle different types of messages. -pub struct MessageHandler where +pub struct MessageHandler where CM::Target: ChannelMessageHandler, - RM::Target: RoutingMessageHandler { + RM::Target: RoutingMessageHandler, + OM::Target: OnionMessageHandler, +{ /// A message handler which handles messages specific to channels. Usually this is just a /// [`ChannelManager`] object or an [`ErroringMessageHandler`]. /// @@ -212,6 +221,12 @@ pub struct MessageHandler where /// /// [`P2PGossipSync`]: crate::routing::gossip::P2PGossipSync pub route_handler: RM, + + /// A message handler which handles onion messages. Using this is just an [`OnionMessenger`] object + /// or an [`IgnoringMessageHandler`]. + /// + /// [`OnionMessenger`]: crate::onion_message::OnionMessenger + pub onion_message_handler: OM, } /// Provides an object which can be used to send data to and which uniquely identifies a connection @@ -395,7 +410,7 @@ impl Peer { /// issues such as overly long function definitions. /// /// (C-not exported) as Arcs don't make sense in bindings -pub type SimpleArcPeerManager = PeerManager>, Arc>>, Arc, Arc>>, Arc, Arc>; +pub type SimpleArcPeerManager = PeerManager>, Arc>>, Arc, Arc>>, Arc>, Arc, Arc>; /// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference /// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't @@ -405,7 +420,7 @@ pub type SimpleArcPeerManager = PeerManager = PeerManager, &'e P2PGossipSync<&'g NetworkGraph<&'f L>, &'h C, &'f L>, &'f L, IgnoringMessageHandler>; +pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, SD, M, T, F, C, L> = PeerManager, &'e P2PGossipSync<&'g NetworkGraph<&'f L>, &'h C, &'f L>, SimpleRefOnionMessenger<'c, 'f, L>, &'f L, IgnoringMessageHandler>; /// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls /// socket events into messages which it passes on to its [`MessageHandler`]. @@ -426,12 +441,13 @@ pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, SD, M, T, F, C, L> /// you're using lightning-net-tokio. /// /// [`read_event`]: PeerManager::read_event -pub struct PeerManager where +pub struct PeerManager where CM::Target: ChannelMessageHandler, RM::Target: RoutingMessageHandler, + OM::Target: OnionMessageHandler, L::Target: Logger, CMH::Target: CustomMessageHandler { - message_handler: MessageHandler, + message_handler: MessageHandler, /// Connection state for each connected peer - we have an outer read-write lock which is taken /// as read while we're doing processing for a peer and taken write when a peer is being added /// or removed. @@ -490,31 +506,33 @@ macro_rules! encode_msg { }} } -impl PeerManager where +impl PeerManager where CM::Target: ChannelMessageHandler, + OM::Target: OnionMessageHandler, L::Target: Logger { - /// Constructs a new PeerManager with the given ChannelMessageHandler. No routing message - /// handler is used and network graph messages are ignored. + /// Constructs a new PeerManager with the given ChannelMessageHandler and OnionMessageHandler. No + /// routing message handler is used and network graph messages are ignored. /// /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be /// cryptographically secure random bytes. /// /// (C-not exported) as we can't export a PeerManager with a dummy route handler - pub fn new_channel_only(channel_message_handler: CM, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> Self { + pub fn new_channel_only(channel_message_handler: CM, onion_message_handler: OM, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> Self { Self::new(MessageHandler { chan_handler: channel_message_handler, route_handler: IgnoringMessageHandler{}, + onion_message_handler, }, our_node_secret, ephemeral_random_data, logger, IgnoringMessageHandler{}) } } -impl PeerManager where +impl PeerManager where RM::Target: RoutingMessageHandler, L::Target: Logger { - /// Constructs a new PeerManager with the given RoutingMessageHandler. No channel message - /// handler is used and messages related to channels will be ignored (or generate error - /// messages). Note that some other lightning implementations time-out connections after some - /// time if no channel is built with the peer. + /// Constructs a new PeerManager with the given RoutingMessageHandler. No channel message handler + /// or onion message handler is used and messages related to channels will be ignored (or generate + /// error messages). Note that some other lightning implementations time-out connections after + /// some time if no channel is built with the peer. /// /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be /// cryptographically secure random bytes. @@ -524,6 +542,7 @@ impl PeerManager) -> Option { } } -impl PeerManager where +impl PeerManager where CM::Target: ChannelMessageHandler, RM::Target: RoutingMessageHandler, + OM::Target: OnionMessageHandler, L::Target: Logger, CMH::Target: CustomMessageHandler { /// Constructs a new PeerManager with the given message handlers and node_id secret key /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be /// cryptographically secure random bytes. - pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L, custom_message_handler: CMH) -> Self { + pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L, custom_message_handler: CMH) -> Self { let mut ephemeral_key_midstate = Sha256::engine(); ephemeral_key_midstate.input(ephemeral_random_data); @@ -1288,6 +1308,11 @@ impl P self.message_handler.route_handler.handle_reply_channel_range(&their_node_id, msg)?; }, + // Onion message: + wire::Message::OnionMessage(msg) => { + self.message_handler.onion_message_handler.handle_onion_message(&their_node_id, &msg); + }, + // Unknown messages: wire::Message::Unknown(type_id) if message.is_even() => { log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", type_id); @@ -1627,6 +1652,10 @@ impl P MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => { self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } + MessageSendEvent::SendOnionMessage { ref node_id, msg: _ } => { + log_trace!(self.logger, "Unexpected onion message for peer {} in events queue", log_pubkey!(node_id)); + debug_assert!(false); + } } } @@ -1904,12 +1933,12 @@ mod tests { cfgs } - fn create_network<'a>(peer_count: usize, cfgs: &'a Vec) -> Vec> { + fn create_network<'a>(peer_count: usize, cfgs: &'a Vec) -> Vec> { let mut peers = Vec::new(); for i in 0..peer_count { let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap(); let ephemeral_bytes = [i as u8; 32]; - let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler }; + let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler, onion_message_handler: IgnoringMessageHandler {} }; let peer = PeerManager::new(msg_handler, node_secret, &ephemeral_bytes, &cfgs[i].logger, IgnoringMessageHandler {}); peers.push(peer); } @@ -1917,7 +1946,7 @@ mod tests { peers } - fn establish_connection<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { + fn establish_connection<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { let secp_ctx = Secp256k1::new(); let a_id = PublicKey::from_secret_key(&secp_ctx, &peer_a.our_node_secret); let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) }; diff --git a/lightning/src/ln/wire.rs b/lightning/src/ln/wire.rs index cbf5c77d600..1191a8d3d53 100644 --- a/lightning/src/ln/wire.rs +++ b/lightning/src/ln/wire.rs @@ -9,7 +9,7 @@ //! Wire encoding/decoding for Lightning messages according to [BOLT #1], and for //! custom message through the [`CustomMessageReader`] trait. -//! +//! //! [BOLT #1]: https://github.com/lightning/bolts/blob/master/01-messaging.md use io; @@ -60,6 +60,7 @@ pub(crate) enum Message where T: core::fmt::Debug + Type + TestEq { ChannelReady(msgs::ChannelReady), Shutdown(msgs::Shutdown), ClosingSigned(msgs::ClosingSigned), + OnionMessage(msgs::OnionMessage), UpdateAddHTLC(msgs::UpdateAddHTLC), UpdateFulfillHTLC(msgs::UpdateFulfillHTLC), UpdateFailHTLC(msgs::UpdateFailHTLC), @@ -100,6 +101,7 @@ impl Message where T: core::fmt::Debug + Type + TestEq { &Message::ChannelReady(ref msg) => msg.type_id(), &Message::Shutdown(ref msg) => msg.type_id(), &Message::ClosingSigned(ref msg) => msg.type_id(), + &Message::OnionMessage(ref msg) => msg.type_id(), &Message::UpdateAddHTLC(ref msg) => msg.type_id(), &Message::UpdateFulfillHTLC(ref msg) => msg.type_id(), &Message::UpdateFailHTLC(ref msg) => msg.type_id(), @@ -185,6 +187,9 @@ fn do_read(buffer: &mut R, message_type: u1 msgs::ClosingSigned::TYPE => { Ok(Message::ClosingSigned(Readable::read(buffer)?)) }, + msgs::OnionMessage::TYPE => { + Ok(Message::OnionMessage(Readable::read(buffer)?)) + }, msgs::UpdateAddHTLC::TYPE => { Ok(Message::UpdateAddHTLC(Readable::read(buffer)?)) }, @@ -344,6 +349,10 @@ impl Encode for msgs::ClosingSigned { const TYPE: u16 = 39; } +impl Encode for msgs::OnionMessage { + const TYPE: u16 = 513; +} + impl Encode for msgs::UpdateAddHTLC { const TYPE: u16 = 128; } diff --git a/lightning/src/onion_message/functional_tests.rs b/lightning/src/onion_message/functional_tests.rs index 695064e467c..f6f508c418f 100644 --- a/lightning/src/onion_message/functional_tests.rs +++ b/lightning/src/onion_message/functional_tests.rs @@ -10,8 +10,10 @@ //! Onion message testing and test utilities live here. use chain::keysinterface::{KeysInterface, Recipient}; +use ln::msgs::OnionMessageHandler; use super::{BlindedRoute, Destination, OnionMessenger, SendError}; use util::enforcing_trait_impls::EnforcingSigner; +use util::events::MessageSendEvent; use util::test_utils; use bitcoin::network::constants::Network; @@ -54,9 +56,12 @@ fn pass_along_path(mut path: Vec, expected_path_id: Option<[u8; 3 let events = prev_node.messenger.release_pending_msgs(); assert_eq!(events.len(), 1); let onion_msg = { - let msgs = events.get(&node.get_node_pk()).unwrap(); - assert_eq!(msgs.len(), 1); - msgs[0].clone() + let events = events.get(&node.get_node_pk()).unwrap(); + assert_eq!(events.len(), 1); + match &events[0] { + MessageSendEvent::SendOnionMessage { msg, .. } => msg.clone(), + _ => panic!() + } }; node.messenger.handle_onion_message(&prev_node.get_node_pk(), &onion_msg); if idx == num_nodes - 1 { diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 7eba3cdd254..aeb635349dc 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -16,13 +16,15 @@ use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::secp256k1::{self, PublicKey, Secp256k1, SecretKey}; use chain::keysinterface::{InMemorySigner, KeysInterface, KeysManager, Recipient, Sign}; -use ln::msgs; +use ln::msgs::{self, OnionMessageHandler}; use ln::onion_utils; use super::blinded_route::{BlindedRoute, ForwardTlvs, ReceiveTlvs}; use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload, ReceiveControlTlvs, SMALL_PACKET_HOP_DATA_LEN}; use super::utils; +use util::events::{MessageSendEvent, OnionMessageProvider}; use util::logger::Logger; +use core::mem; use core::ops::Deref; use sync::{Arc, Mutex}; use prelude::*; @@ -84,7 +86,7 @@ pub struct OnionMessenger { keys_manager: K, logger: L, - pending_messages: Mutex>>, + pending_messages: Mutex>>, secp_ctx: Secp256k1, // Coming soon: // invoice_handler: InvoiceHandler, @@ -168,20 +170,33 @@ impl OnionMessenger packet_payloads, packet_keys, prng_seed).map_err(|()| SendError::TooBigPacket)?; let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap(); - let pending_msgs = pending_per_peer_msgs.entry(introduction_node_id).or_insert(Vec::new()); - pending_msgs.push( - msgs::OnionMessage { + let pending_msgs = pending_per_peer_msgs.entry(introduction_node_id.clone()).or_insert(Vec::new()); + pending_msgs.push(MessageSendEvent::SendOnionMessage { + node_id: introduction_node_id, + msg: msgs::OnionMessage { blinding_point, onion_routing_packet: onion_packet, - } - ); + }, + }); Ok(()) } + #[cfg(test)] + pub(super) fn release_pending_msgs(&self) -> HashMap> { + let mut pending_msgs = self.pending_messages.lock().unwrap(); + let mut msgs = HashMap::new(); + mem::swap(&mut *pending_msgs, &mut msgs); + msgs + } +} + +impl OnionMessageHandler for OnionMessenger + where K::Target: KeysInterface, + L::Target: Logger, +{ /// Handle an incoming onion message. Currently, if a message was destined for us we will log, but - /// soon we'll delegate the onion message to a handler that can generate invoices or send - /// payments. - pub fn handle_onion_message(&self, _peer_node_id: &PublicKey, msg: &msgs::OnionMessage) { + /// soon we'll delegate the onion message to a handler that can generate invoices or send payments. + fn handle_onion_message(&self, _peer_node_id: &PublicKey, msg: &msgs::OnionMessage) { let control_tlvs_ss = match self.keys_manager.ecdh(Recipient::Node, &msg.blinding_point, None) { Ok(ss) => ss, Err(e) => { @@ -236,9 +251,10 @@ impl OnionMessenger }; let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap(); - let pending_msgs = pending_per_peer_msgs.entry(next_node_id).or_insert(Vec::new()); - pending_msgs.push( - msgs::OnionMessage { + let pending_msgs = pending_per_peer_msgs.entry(next_node_id.clone()).or_insert(Vec::new()); + pending_msgs.push(MessageSendEvent::SendOnionMessage { + node_id: next_node_id, + msg: msgs::OnionMessage { blinding_point: match next_blinding_override { Some(blinding_point) => blinding_point, None => { @@ -258,7 +274,7 @@ impl OnionMessenger }, onion_routing_packet: outgoing_packet, }, - ); + }); }, Err(e) => { log_trace!(self.logger, "Errored decoding onion message packet: {:?}", e); @@ -268,18 +284,27 @@ impl OnionMessenger }, }; } +} - #[cfg(test)] - pub(super) fn release_pending_msgs(&self) -> HashMap> { +impl OnionMessageProvider for OnionMessenger + where K::Target: KeysInterface, + L::Target: Logger, +{ + fn next_onion_messages_for_peer(&self, peer_node_id: PublicKey, max_messages: usize) -> Vec { let mut pending_msgs = self.pending_messages.lock().unwrap(); - let mut msgs = HashMap::new(); - core::mem::swap(&mut *pending_msgs, &mut msgs); - msgs + if let Some(msgs) = pending_msgs.get_mut(&peer_node_id) { + if max_messages >= msgs.len() { + let mut peer_pending_msgs = Vec::new(); + mem::swap(msgs, &mut peer_pending_msgs); + return peer_pending_msgs + } else { + return msgs.split_off(max_messages) + } + } + Vec::new() } } -// TODO: parameterize the below Simple* types with OnionMessenger and handle the messages it -// produces /// Useful for simplifying the parameters of [`SimpleArcChannelManager`] and /// [`SimpleArcPeerManager`]. See their docs for more details. /// diff --git a/lightning/src/util/events.rs b/lightning/src/util/events.rs index 42b55163325..5c46fdc1068 100644 --- a/lightning/src/util/events.rs +++ b/lightning/src/util/events.rs @@ -1186,6 +1186,13 @@ pub enum MessageSendEvent { /// The gossip_timestamp_filter which should be sent. msg: msgs::GossipTimestampFilter, }, + /// Sends an onion message. + SendOnionMessage { + /// The node_id of this message recipient + node_id: PublicKey, + /// The onion message which should be sent. + msg: msgs::OnionMessage, + }, } /// A trait indicating an object may generate message send events @@ -1195,6 +1202,12 @@ pub trait MessageSendEventsProvider { fn get_and_clear_pending_msg_events(&self) -> Vec; } +/// A trait indicating an object may generate onion message send events +pub trait OnionMessageProvider { + /// Gets up to `max_messages` pending onion message events. + fn next_onion_messages_for_peer(&self, peer_node_id: PublicKey, max_messages: usize) -> Vec; +} + /// A trait indicating an object may generate events. /// /// Events are processed by passing an [`EventHandler`] to [`process_pending_events`].