Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a parallel async event handler to OnionMessenger and pass it directly to BackgroundProcessor #3060

Merged
94 changes: 47 additions & 47 deletions lightning-background-processor/src/lib.rs

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions lightning/src/ln/msgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use core::fmt::Display;
use crate::io::{self, Cursor, Read};
use crate::io_extras::read_to_end;

use crate::events::{EventsProvider, MessageSendEventsProvider};
use crate::events::MessageSendEventsProvider;
use crate::crypto::streams::ChaChaPolyReadAdapter;
use crate::util::logger;
use crate::util::ser::{LengthReadable, LengthReadableArgs, Readable, ReadableArgs, Writeable, Writer, WithoutLength, FixedLengthReader, HighZeroBytesDroppedBigSize, Hostname, TransactionU16LenLimited, BigSize};
Expand Down Expand Up @@ -1623,7 +1623,7 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
}

/// A handler for received [`OnionMessage`]s and for providing generated ones to send.
pub trait OnionMessageHandler: EventsProvider {
pub trait OnionMessageHandler {
/// Handle an incoming `onion_message` message from the given peer.
fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage);

Expand Down
10 changes: 1 addition & 9 deletions lightning/src/ln/peer_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use bitcoin::blockdata::constants::ChainHash;
use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey};

use crate::sign::{NodeSigner, Recipient};
use crate::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
use crate::events::{MessageSendEvent, MessageSendEventsProvider};
use crate::ln::types::ChannelId;
use crate::ln::features::{InitFeatures, NodeFeatures};
use crate::ln::msgs;
Expand Down Expand Up @@ -97,9 +97,6 @@ pub trait CustomMessageHandler: wire::CustomMessageReader {
/// A dummy struct which implements `RoutingMessageHandler` without storing any routing information
/// or doing any processing. You can provide one of these as the route_handler in a MessageHandler.
pub struct IgnoringMessageHandler{}
impl EventsProvider for IgnoringMessageHandler {
fn process_pending_events<H: Deref>(&self, _handler: H) where H::Target: EventHandler {}
}
impl MessageSendEventsProvider for IgnoringMessageHandler {
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> { Vec::new() }
}
Expand Down Expand Up @@ -723,8 +720,6 @@ pub trait APeerManager {
type NS: Deref<Target=Self::NST>;
/// Gets a reference to the underlying [`PeerManager`].
fn as_ref(&self) -> &PeerManager<Self::Descriptor, Self::CM, Self::RM, Self::OM, Self::L, Self::CMH, Self::NS>;
/// Returns the peer manager's [`OnionMessageHandler`].
fn onion_message_handler(&self) -> &Self::OMT;
}

impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CMH: Deref, NS: Deref>
Expand All @@ -750,9 +745,6 @@ APeerManager for PeerManager<Descriptor, CM, RM, OM, L, CMH, NS> where
type NST = <NS as Deref>::Target;
type NS = NS;
fn as_ref(&self) -> &PeerManager<Descriptor, CM, RM, OM, L, CMH, NS> { self }
fn onion_message_handler(&self) -> &Self::OMT {
self.message_handler.onion_message_handler.deref()
}
}

/// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls
Expand Down
150 changes: 139 additions & 11 deletions lightning/src/onion_message/messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,70 @@ use {

pub(super) const MAX_TIMER_TICKS: usize = 2;

/// A trivial trait which describes any [`OnionMessenger`].
///
/// This is not exported to bindings users as general cover traits aren't useful in other
/// languages.
pub trait AOnionMessenger {
/// A type implementing [`EntropySource`]
type EntropySource: EntropySource + ?Sized;
/// A type that may be dereferenced to [`Self::EntropySource`]
type ES: Deref<Target = Self::EntropySource>;
/// A type implementing [`NodeSigner`]
type NodeSigner: NodeSigner + ?Sized;
/// A type that may be dereferenced to [`Self::NodeSigner`]
type NS: Deref<Target = Self::NodeSigner>;
/// A type implementing [`Logger`]
type Logger: Logger + ?Sized;
/// A type that may be dereferenced to [`Self::Logger`]
type L: Deref<Target = Self::Logger>;
/// A type implementing [`NodeIdLookUp`]
type NodeIdLookUp: NodeIdLookUp + ?Sized;
/// A type that may be dereferenced to [`Self::NodeIdLookUp`]
type NL: Deref<Target = Self::NodeIdLookUp>;
/// A type implementing [`MessageRouter`]
type MessageRouter: MessageRouter + ?Sized;
/// A type that may be dereferenced to [`Self::MessageRouter`]
type MR: Deref<Target = Self::MessageRouter>;
/// A type implementing [`OffersMessageHandler`]
type OffersMessageHandler: OffersMessageHandler + ?Sized;
/// A type that may be dereferenced to [`Self::OffersMessageHandler`]
type OMH: Deref<Target = Self::OffersMessageHandler>;
/// A type implementing [`CustomOnionMessageHandler`]
type CustomOnionMessageHandler: CustomOnionMessageHandler + ?Sized;
/// A type that may be dereferenced to [`Self::CustomOnionMessageHandler`]
Comment on lines +55 to +81
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: end sentences in periods

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think they're full sentences?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AChannelManager's docs have the same sentence structure and use periods. Feel free to ignore, though.

type CMH: Deref<Target = Self::CustomOnionMessageHandler>;
/// Returns a reference to the actual [`OnionMessenger`] object.
fn get_om(&self) -> &OnionMessenger<Self::ES, Self::NS, Self::L, Self::NL, Self::MR, Self::OMH, Self::CMH>;
}

impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, CMH: Deref> AOnionMessenger
for OnionMessenger<ES, NS, L, NL, MR, OMH, CMH> where
ES::Target: EntropySource,
NS::Target: NodeSigner,
L::Target: Logger,
NL::Target: NodeIdLookUp,
MR::Target: MessageRouter,
OMH::Target: OffersMessageHandler,
CMH::Target: CustomOnionMessageHandler,
{
type EntropySource = ES::Target;
type ES = ES;
type NodeSigner = NS::Target;
type NS = NS;
type Logger = L::Target;
type L = L;
type NodeIdLookUp = NL::Target;
type NL = NL;
type MessageRouter = MR::Target;
type MR = MR;
type OffersMessageHandler = OMH::Target;
type OMH = OMH;
type CustomOnionMessageHandler = CMH::Target;
type CMH = CMH;
fn get_om(&self) -> &OnionMessenger<ES, NS, L, NL, MR, OMH, CMH> { self }
}

/// A sender, receiver and forwarder of [`OnionMessage`]s.
///
/// # Handling Messages
Expand Down Expand Up @@ -177,7 +241,12 @@ where
offers_handler: OMH,
custom_handler: CMH,
intercept_messages_for_offline_peers: bool,
pending_events: Mutex<Vec<Event>>,
pending_events: Mutex<PendingEvents>,
}

struct PendingEvents {
intercepted_msgs: Vec<Event>,
peer_connecteds: Vec<Event>,
}

/// [`OnionMessage`]s buffered to be sent.
Expand Down Expand Up @@ -899,7 +968,10 @@ where
offers_handler,
custom_handler,
intercept_messages_for_offline_peers,
pending_events: Mutex::new(Vec::new()),
pending_events: Mutex::new(PendingEvents {
intercepted_msgs: Vec::new(),
peer_connecteds: Vec::new(),
}),
}
}

Expand Down Expand Up @@ -1071,18 +1143,61 @@ where
msgs
}

fn enqueue_event(&self, event: Event) {
fn enqueue_intercepted_event(&self, event: Event) {
const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256;
let mut pending_events = self.pending_events.lock().unwrap();
let total_buffered_bytes: usize = pending_events
.iter()
.map(|ev| ev.serialized_length())
.sum();
let total_buffered_bytes: usize =
pending_events.intercepted_msgs.iter().map(|ev| ev.serialized_length()).sum();
if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE {
log_trace!(self.logger, "Dropping event {:?}: buffer full", event);
return
}
pending_events.push(event);
pending_events.intercepted_msgs.push(event);
}

/// Processes any events asynchronously using the given handler.
///
/// Note that the event handler is called in the order each event was generated, however
/// futures are polled in parallel for some events to allow for parallelism where events do not
/// have an ordering requirement.
///
/// See the trait-level documentation of [`EventsProvider`] for requirements.
pub async fn process_pending_events_async<Future: core::future::Future<Output = ()> + core::marker::Unpin, H: Fn(Event) -> Future>(
&self, handler: H
) {
let mut intercepted_msgs = Vec::new();
let mut peer_connecteds = Vec::new();
{
let mut pending_events = self.pending_events.lock().unwrap();
core::mem::swap(&mut pending_events.intercepted_msgs, &mut intercepted_msgs);
core::mem::swap(&mut pending_events.peer_connecteds, &mut peer_connecteds);
}

let mut futures = Vec::with_capacity(intercepted_msgs.len());
for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
if let Some(addresses) = addresses.take() {
futures.push(Some(handler(Event::ConnectionNeeded { node_id: *node_id, addresses })));
}
}
}

for ev in intercepted_msgs {
if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
futures.push(Some(handler(ev)));
}
// Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
crate::util::async_poll::MultiFuturePoller(futures).await;

if peer_connecteds.len() <= 1 {
for event in peer_connecteds { handler(event).await; }
} else {
let mut futures = Vec::new();
for event in peer_connecteds {
futures.push(Some(handler(event)));
}
crate::util::async_poll::MultiFuturePoller(futures).await;
}
}
}

Expand Down Expand Up @@ -1129,7 +1244,20 @@ where
}
}
let mut events = Vec::new();
core::mem::swap(&mut *self.pending_events.lock().unwrap(), &mut events);
{
let mut pending_events = self.pending_events.lock().unwrap();
#[cfg(debug_assertions)] {
for ev in pending_events.intercepted_msgs.iter() {
if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); }
}
for ev in pending_events.peer_connecteds.iter() {
if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
}
}
core::mem::swap(&mut pending_events.intercepted_msgs, &mut events);
events.append(&mut pending_events.peer_connecteds);
pending_events.peer_connecteds.shrink_to(10); // Limit total heap usage
}
for ev in events {
handler.handle_event(ev);
}
Expand Down Expand Up @@ -1207,7 +1335,7 @@ where
log_trace!(logger, "Forwarding an onion message to peer {}", next_node_id);
},
_ if self.intercept_messages_for_offline_peers => {
self.enqueue_event(
self.enqueue_intercepted_event(
Event::OnionMessageIntercepted {
peer_node_id: next_node_id, message: onion_message
}
Expand Down Expand Up @@ -1235,7 +1363,7 @@ where
.or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()))
.mark_connected();
if self.intercept_messages_for_offline_peers {
self.enqueue_event(
self.pending_events.lock().unwrap().peer_connecteds.push(
Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
);
}
Expand Down
43 changes: 43 additions & 0 deletions lightning/src/util/async_poll.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// This file is Copyright its original authors, visible in version control
// history.
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
// You may not use this file except in accordance with one or both of these
// licenses.

//! Some utilities to make working with the standard library's [`Future`]s easier

use crate::prelude::*;
use core::future::Future;
use core::marker::Unpin;
use core::pin::Pin;
use core::task::{Context, Poll};

pub(crate) struct MultiFuturePoller<F: Future<Output = ()> + Unpin>(pub Vec<Option<F>>);

impl<F: Future<Output = ()> + Unpin> Future for MultiFuturePoller<F> {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let mut have_pending_futures = false;
for fut_option in self.get_mut().0.iter_mut() {
let mut fut = match fut_option.take() {
None => continue,
Some(fut) => fut,
};
match Pin::new(&mut fut).poll(cx) {
Poll::Ready(()) => {},
Poll::Pending => {
have_pending_futures = true;
*fut_option = Some(fut);
},
}
}
if have_pending_futures {
Poll::Pending
} else {
Poll::Ready(())
}
}
}
1 change: 1 addition & 0 deletions lightning/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub mod base32;
pub(crate) mod base32;

pub(crate) mod atomic_counter;
pub(crate) mod async_poll;
pub(crate) mod byte_utils;
pub(crate) mod transaction_utils;
pub(crate) mod time;
Expand Down
Loading