Backfill gossip without buffering directly in LDK
Instead of backfilling gossip by buffering (up to) ten messages at
a time, only buffer one message at a time, as each peer's outbound
socket buffer drains. This moves the outbound backfill messages out
of `PeerHandler` and into the operating system's socket buffer, where
they arguably belong.

Not buffering causes us to walk the gossip B-Trees somewhat more
often, but avoids allocating vecs for the responses. While it's
probably (without having benchmarked it) a net performance loss, it
simplifies buffer tracking and leaves us more room to play with the
buffer sizing constants as we add onion message forwarding, which is
an important win.

Note that because we change how often we check if we're out of
messages to send before pinging, we slightly change how many
messages are exchanged at once, impacting the
`test_do_attempt_write_data` constants.
TheBlueMatt committed Aug 10, 2022
1 parent 22c1857 commit 4aaa673
Showing 6 changed files with 84 additions and 109 deletions.
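
The heart of the change is the `RoutingMessageHandler` trait: its batched accessors become single-item ones, so each call returns at most one entry and `None` once the table is exhausted (signatures as they appear in the msgs.rs diff below):

    fn get_next_channel_announcements(&self, starting_point: u64)
        -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)>;
    fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>)
        -> Option<NodeAnnouncement>;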
4 changes: 2 additions & 2 deletions lightning-net-tokio/src/lib.rs
@@ -562,8 +562,8 @@ mod tests {
     fn handle_node_announcement(&self, _msg: &NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
     fn handle_channel_announcement(&self, _msg: &ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
     fn handle_channel_update(&self, _msg: &ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
-    fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> { Vec::new() }
-    fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<NodeAnnouncement> { Vec::new() }
+    fn get_next_channel_announcements(&self, _starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> { None }
+    fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>) -> Option<NodeAnnouncement> { None }
     fn peer_connected(&self, _their_node_id: &PublicKey, _init_msg: &Init) { }
     fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
     fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
12 changes: 6 additions & 6 deletions lightning/src/ln/functional_test_utils.rs
@@ -318,20 +318,20 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> {
             );
             let mut chan_progress = 0;
             loop {
-                let orig_announcements = self.gossip_sync.get_next_channel_announcements(chan_progress, 255);
-                let deserialized_announcements = gossip_sync.get_next_channel_announcements(chan_progress, 255);
+                let orig_announcements = self.gossip_sync.get_next_channel_announcements(chan_progress);
+                let deserialized_announcements = gossip_sync.get_next_channel_announcements(chan_progress);
                 assert!(orig_announcements == deserialized_announcements);
-                chan_progress = match orig_announcements.last() {
+                chan_progress = match orig_announcements {
                     Some(announcement) => announcement.0.contents.short_channel_id + 1,
                     None => break,
                 };
             }
             let mut node_progress = None;
             loop {
-                let orig_announcements = self.gossip_sync.get_next_node_announcements(node_progress.as_ref(), 255);
-                let deserialized_announcements = gossip_sync.get_next_node_announcements(node_progress.as_ref(), 255);
+                let orig_announcements = self.gossip_sync.get_next_node_announcements(node_progress.as_ref());
+                let deserialized_announcements = gossip_sync.get_next_node_announcements(node_progress.as_ref());
                 assert!(orig_announcements == deserialized_announcements);
-                node_progress = match orig_announcements.last() {
+                node_progress = match orig_announcements {
                     Some(announcement) => Some(announcement.contents.node_id),
                     None => break,
                 };
16 changes: 8 additions & 8 deletions lightning/src/ln/msgs.rs
@@ -915,15 +915,15 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
     /// Handle an incoming channel_update message, returning true if it should be forwarded on,
     /// false or returning an Err otherwise.
     fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError>;
-    /// Gets a subset of the channel announcements and updates required to dump our routing table
-    /// to a remote node, starting at the short_channel_id indicated by starting_point and
-    /// including the batch_amount entries immediately higher in numerical value than starting_point.
-    fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)>;
-    /// Gets a subset of the node announcements required to dump our routing table to a remote node,
-    /// starting at the node *after* the provided publickey and including batch_amount entries
-    /// immediately higher (as defined by <PublicKey as Ord>::cmp) than starting_point.
+    /// Gets channel announcements and updates required to dump our routing table to a remote node,
+    /// starting at the short_channel_id indicated by starting_point and including announcements
+    /// for a single channel.
+    fn get_next_channel_announcements(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)>;
+    /// Gets a node announcement required to dump our routing table to a remote node, starting at
+    /// the node *after* the provided publickey and including up to one announcement immediately
+    /// higher (as defined by <PublicKey as Ord>::cmp) than starting_point.
     /// If None is provided for starting_point, we start at the first node.
-    fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement>;
+    fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>) -> Option<NodeAnnouncement>;
     /// Called when a connection is established with a peer. This can be used to
     /// perform routing table synchronization using a strategy defined by the
     /// implementor.
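
With the batch size gone, a caller dumps the full table by advancing its own cursor one channel at a time, exactly as the functional-test loop above does. A minimal sketch of that pattern, where `handler` (any `RoutingMessageHandler`) and the `send` helper are assumed names for illustration:

    // Hypothetical driver loop; `handler` and `send` are stand-ins.
    let mut starting_point = 0u64;
    while let Some((ann, upd_a, upd_b)) = handler.get_next_channel_announcements(starting_point) {
        // Resume strictly after the channel we just fetched.
        starting_point = ann.contents.short_channel_id + 1;
        send(&ann);
        if let Some(upd) = upd_a { send(&upd); }
        if let Some(upd) = upd_b { send(&upd); }
    }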
65 changes: 28 additions & 37 deletions lightning/src/ln/peer_handler.rs
@@ -67,9 +67,9 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
     fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
     fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
     fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
-    fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) ->
-        Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { Vec::new() }
-    fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> { Vec::new() }
+    fn get_next_channel_announcements(&self, _starting_point: u64) ->
+        Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { None }
+    fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>) -> Option<msgs::NodeAnnouncement> { None }
     fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) {}
     fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
     fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
@@ -383,19 +383,17 @@ impl Peer {
         }
     }
 
-    /// Returns the number of gossip messages we can fit in this peer's buffer.
-    fn gossip_buffer_slots_available(&self) -> usize {
-        OUTBOUND_BUFFER_LIMIT_READ_PAUSE.saturating_sub(self.pending_outbound_buffer.len())
-    }
-
     /// Returns whether we should be reading bytes from this peer, based on whether its outbound
     /// buffer still has space and we don't need to pause reads to get some writes out.
     fn should_read(&self) -> bool {
         self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
     }
 
-    fn should_backfill_gossip(&self) -> bool {
-        self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE &&
+    /// Determines if we should push additional gossip messages onto a peer's outbound buffer for
+    /// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
+    /// been drained.
+    fn should_buffer_gossip_backfill(&self) -> bool {
+        self.pending_outbound_buffer.is_empty() &&
             self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
     }
 
@@ -739,46 +737,39 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
     fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
         while !peer.awaiting_write_event {
-            if peer.should_backfill_gossip() {
+            if peer.should_buffer_gossip_backfill() {
                 match peer.sync_status {
                     InitSyncTracker::NoSyncRequested => {},
                     InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
-                        let steps = ((peer.gossip_buffer_slots_available() + 2) / 3) as u8;
-                        let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
-                        for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() {
-                            self.enqueue_message(peer, announce);
-                            if let &Some(ref update_a) = update_a_option {
-                                self.enqueue_message(peer, update_a);
+                        if let Some((announce, update_a_option, update_b_option)) =
+                            self.message_handler.route_handler.get_next_channel_announcements(c)
+                        {
+                            self.enqueue_message(peer, &announce);
+                            if let Some(update_a) = update_a_option {
+                                self.enqueue_message(peer, &update_a);
                             }
-                            if let &Some(ref update_b) = update_b_option {
-                                self.enqueue_message(peer, update_b);
+                            if let Some(update_b) = update_b_option {
+                                self.enqueue_message(peer, &update_b);
                             }
                             peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1);
-                        }
-                        if all_messages.is_empty() || all_messages.len() != steps as usize {
+                        } else {
                             peer.sync_status = InitSyncTracker::ChannelsSyncing(0xffff_ffff_ffff_ffff);
                         }
                     },
                     InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
-                        let steps = peer.gossip_buffer_slots_available() as u8;
-                        let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
-                        for msg in all_messages.iter() {
-                            self.enqueue_message(peer, msg);
+                        if let Some(msg) = self.message_handler.route_handler.get_next_node_announcements(None) {
+                            self.enqueue_message(peer, &msg);
                             peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
-                        }
-                        if all_messages.is_empty() || all_messages.len() != steps as usize {
+                        } else {
                             peer.sync_status = InitSyncTracker::NoSyncRequested;
                         }
                     },
                     InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
                     InitSyncTracker::NodesSyncing(key) => {
-                        let steps = peer.gossip_buffer_slots_available() as u8;
-                        let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
-                        for msg in all_messages.iter() {
-                            self.enqueue_message(peer, msg);
+                        if let Some(msg) = self.message_handler.route_handler.get_next_node_announcements(Some(&key)) {
+                            self.enqueue_message(peer, &msg);
                             peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
-                        }
-                        if all_messages.is_empty() || all_messages.len() != steps as usize {
+                        } else {
                             peer.sync_status = InitSyncTracker::NoSyncRequested;
                         }
                     },
@@ -2082,10 +2073,10 @@ mod tests {
 
         // Check that each peer has received the expected number of channel updates and channel
         // announcements.
-        assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
-        assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
-        assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
-        assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
+        assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
+        assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
+        assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
+        assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
     }
 
     #[test]
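
The pacing rule at the core of this file's changes is easy to state in isolation: only refill gossip once everything previously queued has drained to the socket, and never past the per-tick message budget. A standalone model of that check (the struct shape and the constant's value here are illustrative stand-ins, not LDK's actual definitions):

    use std::collections::VecDeque;

    // Simplified stand-in for the peer state LDK tracks internally.
    struct OutboundState {
        pending_outbound_buffer: VecDeque<Vec<u8>>,
        msgs_sent_since_pong: usize,
    }

    const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32; // assumed value for the sketch

    impl OutboundState {
        fn should_buffer_gossip_backfill(&self) -> bool {
            // Refill only when the buffer has fully drained, so the OS socket
            // buffer, not PeerHandler, paces the backfill.
            self.pending_outbound_buffer.is_empty()
                && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
        }
    }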
73 changes: 32 additions & 41 deletions lightning/src/routing/gossip.rs
@@ -318,11 +318,10 @@ where C::Target: chain::Access, L::Target: Logger
         Ok(msg.contents.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY)
     }
 
-    fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
-        let mut result = Vec::with_capacity(batch_amount as usize);
+    fn get_next_channel_announcements(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
         let channels = self.network_graph.channels.read().unwrap();
         let mut iter = channels.range(starting_point..);
-        while result.len() < batch_amount as usize {
+        loop {
             if let Some((_, ref chan)) = iter.next() {
                 if chan.announcement_message.is_some() {
                     let chan_announcement = chan.announcement_message.clone().unwrap();
@@ -334,20 +333,18 @@ where C::Target: chain::Access, L::Target: Logger
                     if let Some(two_to_one) = chan.two_to_one.as_ref() {
                         two_to_one_announcement = two_to_one.last_update_message.clone();
                     }
-                    result.push((chan_announcement, one_to_two_announcement, two_to_one_announcement));
+                    return Some((chan_announcement, one_to_two_announcement, two_to_one_announcement));
                 } else {
                     // TODO: We may end up sending un-announced channel_updates if we are sending
                     // initial sync data while receiving announce/updates for this channel.
                 }
             } else {
-                return result;
+                return None;
             }
         }
-        result
     }
 
-    fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement> {
-        let mut result = Vec::with_capacity(batch_amount as usize);
+    fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>) -> Option<NodeAnnouncement> {
         let nodes = self.network_graph.nodes.read().unwrap();
         let mut iter = if let Some(pubkey) = starting_point {
                 let mut iter = nodes.range(NodeId::from_pubkey(pubkey)..);
@@ -356,18 +353,17 @@ where C::Target: chain::Access, L::Target: Logger
             } else {
                 nodes.range::<NodeId, _>(..)
             };
-        while result.len() < batch_amount as usize {
+        loop {
             if let Some((_, ref node)) = iter.next() {
                 if let Some(node_info) = node.announcement_info.as_ref() {
-                    if node_info.announcement_message.is_some() {
-                        result.push(node_info.announcement_message.clone().unwrap());
+                    if let Some(msg) = node_info.announcement_message.clone() {
+                        return Some(msg);
                     }
                 }
             } else {
-                return result;
+                return None;
             }
         }
-        result
     }
 
     /// Initiates a stateless sync of routing gossip information with a peer
@@ -2412,8 +2408,8 @@ mod tests {
         let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
 
         // Channels were not announced yet.
-        let channels_with_announcements = gossip_sync.get_next_channel_announcements(0, 1);
-        assert_eq!(channels_with_announcements.len(), 0);
+        let channels_with_announcements = gossip_sync.get_next_channel_announcements(0);
+        assert!(channels_with_announcements.is_none());
 
         let short_channel_id;
         {
@@ -2427,17 +2423,15 @@ mod tests {
         }
 
         // Contains initial channel announcement now.
-        let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id, 1);
-        assert_eq!(channels_with_announcements.len(), 1);
-        if let Some(channel_announcements) = channels_with_announcements.first() {
-            let &(_, ref update_1, ref update_2) = channel_announcements;
+        let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id);
+        if let Some(channel_announcements) = channels_with_announcements {
+            let (_, ref update_1, ref update_2) = channel_announcements;
             assert_eq!(update_1, &None);
             assert_eq!(update_2, &None);
         } else {
             panic!();
         }
 
-
         {
             // Valid channel update
             let valid_channel_update = get_signed_channel_update(|unsigned_channel_update| {
@@ -2450,10 +2444,9 @@ mod tests {
         }
 
         // Now contains an initial announcement and an update.
-        let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id, 1);
-        assert_eq!(channels_with_announcements.len(), 1);
-        if let Some(channel_announcements) = channels_with_announcements.first() {
-            let &(_, ref update_1, ref update_2) = channel_announcements;
+        let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id);
+        if let Some(channel_announcements) = channels_with_announcements {
+            let (_, ref update_1, ref update_2) = channel_announcements;
             assert_ne!(update_1, &None);
             assert_eq!(update_2, &None);
         } else {
@@ -2473,19 +2466,18 @@ mod tests {
         }
 
         // Test that announcements with excess data won't be returned
-        let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id, 1);
-        assert_eq!(channels_with_announcements.len(), 1);
-        if let Some(channel_announcements) = channels_with_announcements.first() {
-            let &(_, ref update_1, ref update_2) = channel_announcements;
+        let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id);
+        if let Some(channel_announcements) = channels_with_announcements {
+            let (_, ref update_1, ref update_2) = channel_announcements;
             assert_eq!(update_1, &None);
             assert_eq!(update_2, &None);
         } else {
             panic!();
         }
 
         // Further starting point have no channels after it
-        let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id + 1000, 1);
-        assert_eq!(channels_with_announcements.len(), 0);
+        let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id + 1000);
+        assert!(channels_with_announcements.is_none());
     }
 
     #[test]
@@ -2497,8 +2489,8 @@ mod tests {
         let node_id_1 = PublicKey::from_secret_key(&secp_ctx, node_1_privkey);
 
         // No nodes yet.
-        let next_announcements = gossip_sync.get_next_node_announcements(None, 10);
-        assert_eq!(next_announcements.len(), 0);
+        let next_announcements = gossip_sync.get_next_node_announcements(None);
+        assert!(next_announcements.is_none());
 
         {
             // Announce a channel to add 2 nodes
@@ -2509,10 +2501,9 @@ mod tests {
             };
         }
 
-
         // Nodes were never announced
-        let next_announcements = gossip_sync.get_next_node_announcements(None, 3);
-        assert_eq!(next_announcements.len(), 0);
+        let next_announcements = gossip_sync.get_next_node_announcements(None);
+        assert!(next_announcements.is_none());
 
         {
             let valid_announcement = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx);
@@ -2528,12 +2519,12 @@ mod tests {
             };
         }
 
-        let next_announcements = gossip_sync.get_next_node_announcements(None, 3);
-        assert_eq!(next_announcements.len(), 2);
+        let next_announcements = gossip_sync.get_next_node_announcements(None);
+        assert!(next_announcements.is_some());
 
         // Skip the first node.
-        let next_announcements = gossip_sync.get_next_node_announcements(Some(&node_id_1), 2);
-        assert_eq!(next_announcements.len(), 1);
+        let next_announcements = gossip_sync.get_next_node_announcements(Some(&node_id_1));
+        assert!(next_announcements.is_some());
 
         {
             // Later announcement which should not be relayed (excess data) prevent us from sharing a node
@@ -2547,8 +2538,8 @@ mod tests {
             };
         }
 
-        let next_announcements = gossip_sync.get_next_node_announcements(Some(&node_id_1), 2);
-        assert_eq!(next_announcements.len(), 0);
+        let next_announcements = gossip_sync.get_next_node_announcements(Some(&node_id_1));
+        assert!(next_announcements.is_none());
     }
 
     #[test]
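
As with channels, node announcements are paged by feeding each returned node_id back in as the next starting point; since the implementation begins strictly after the provided key, the walk always makes progress. A sketch of the caller side, with `gossip_sync` and `send` as assumed names:

    // Hypothetical loop over all announced nodes.
    let mut cursor: Option<PublicKey> = None;
    while let Some(msg) = gossip_sync.get_next_node_announcements(cursor.as_ref()) {
        cursor = Some(msg.contents.node_id);
        send(&msg);
    }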