Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PeerManager: add separate queue for gossip broadcasts #1683

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 42 additions & 25 deletions lightning/src/ln/peer_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,9 @@ struct Peer {

pending_outbound_buffer: LinkedList<Vec<u8>>,
pending_outbound_buffer_first_msg_offset: usize,
// Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily prioritize
// channel messages over them.
gossip_broadcast_buffer: LinkedList<Vec<u8>>,
awaiting_write_event: bool,

pending_read_buffer: Vec<u8>,
Expand Down Expand Up @@ -389,21 +392,27 @@ impl Peer {
self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
}

/// Determines if we should push additional gossip messages onto a peer's outbound buffer for
/// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
/// been drained.
/// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's
/// outbound buffer. This is checked every time the peer's buffer may have been drained.
fn should_buffer_gossip_backfill(&self) -> bool {
// NOTE(review): the next two lines are the PRE-change body retained by the diff
// extraction; the two lines following them are the current implementation. Only one
// body exists in the real file — verify against the full PR diff.
self.pending_outbound_buffer.is_empty() &&
self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
// Backfill only when BOTH outbound queues are fully drained and the peer has been
// responsive since the last pong, so backfill never starves channel traffic or
// queued gossip broadcasts.
self.pending_outbound_buffer.is_empty() && self.gossip_broadcast_buffer.is_empty()
&& self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
}

/// Returns whether this peer's buffer is full and we should drop gossip messages.
// NOTE(review): pre-change implementation, removed by this PR (its closing brace is cut
// off by the diff view). Its polarity looks inverted relative to its name/doc — it
// returns `false` when the buffer is over the limit — while the replacement
// `buffer_full_drop_gossip_broadcast` returns `true` when full, matching the
// `if peer.buffer_full_drop_gossip_broadcast() { continue; }` call sites below.
// Confirm the old call sites used the opposite sense before drawing conclusions.
fn buffer_full_drop_gossip(&self) -> bool {
if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
|| self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO {
return false
}
true
/// Checks whether the next queued gossip broadcast may be promoted onto this peer's main
/// outbound buffer. Re-evaluated every time the peer's write buffer may have been drained.
fn should_buffer_gossip_broadcast(&self) -> bool {
	// Broadcasts are lower priority than channel messages: only feed one in once the
	// outbound buffer is empty, and only while the peer is keeping up with our pings.
	let peer_responsive = self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK;
	peer_responsive && self.pending_outbound_buffer.is_empty()
}

/// Returns `true` when this peer's combined outbound queues are saturated — or the peer
/// has fallen too far behind on pong responses — so newly-broadcast gossip destined for
/// it should simply be dropped rather than queued.
fn buffer_full_drop_gossip_broadcast(&self) -> bool {
	// Both queues count toward the limit: a slow peer must not accumulate unbounded
	// gossip in either buffer.
	let queued_msgs = self.gossip_broadcast_buffer.len() + self.pending_outbound_buffer.len();
	let unacked_limit = BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
	queued_msgs > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP || self.msgs_sent_since_pong > unacked_limit
}
}

Expand Down Expand Up @@ -671,6 +680,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P

pending_outbound_buffer: LinkedList::new(),
pending_outbound_buffer_first_msg_offset: 0,
gossip_broadcast_buffer: LinkedList::new(),
awaiting_write_event: false,

pending_read_buffer,
Expand Down Expand Up @@ -717,6 +727,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P

pending_outbound_buffer: LinkedList::new(),
pending_outbound_buffer_first_msg_offset: 0,
gossip_broadcast_buffer: LinkedList::new(),
awaiting_write_event: false,

pending_read_buffer,
Expand All @@ -737,6 +748,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P

fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
while !peer.awaiting_write_event {
if peer.should_buffer_gossip_broadcast() {
if let Some(msg) = peer.gossip_broadcast_buffer.pop_front() {
peer.pending_outbound_buffer.push_back(msg);
}
}
if peer.should_buffer_gossip_backfill() {
match peer.sync_status {
InitSyncTracker::NoSyncRequested => {},
Expand Down Expand Up @@ -851,12 +867,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
}
}

/// Append a message to a peer's pending outbound/write buffer
// NOTE(review): helper removed by this PR — its encrypt-and-push body was inlined into
// `enqueue_message`, and the new `enqueue_encoded_gossip_broadcast` handles the
// pre-encoded gossip path into the separate broadcast queue instead.
fn enqueue_encoded_message(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
peer.msgs_sent_since_pong += 1;
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
}

/// Append a message to a peer's pending outbound/write buffer
fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
let mut buffer = VecWriter(Vec::with_capacity(2048));
Expand All @@ -867,7 +877,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
} else {
log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap()))
}
self.enqueue_encoded_message(peer, &buffer.0);
peer.msgs_sent_since_pong += 1;
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&buffer.0[..]));
}

/// Encrypts an already-encoded gossip message and appends it to the peer's dedicated
/// gossip-broadcast queue, which is drained into the main outbound buffer only when no
/// channel messages are pending (see `do_attempt_write_data`).
fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
	// Count it against the per-pong budget now, even though it sits in the lower
	// priority queue until the outbound buffer drains.
	peer.msgs_sent_since_pong += 1;
	let ciphertext = peer.channel_encryptor.encrypt_message(&encoded_message[..]);
	peer.gossip_broadcast_buffer.push_back(ciphertext);
}

fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
Expand Down Expand Up @@ -1325,7 +1342,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
continue
}
if peer.buffer_full_drop_gossip() {
if peer.buffer_full_drop_gossip_broadcast() {
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
Expand All @@ -1336,7 +1353,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
continue;
}
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
}
},
wire::Message::NodeAnnouncement(ref msg) => {
Expand All @@ -1349,7 +1366,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
!peer.should_forward_node_announcement(msg.contents.node_id) {
continue
}
if peer.buffer_full_drop_gossip() {
if peer.buffer_full_drop_gossip_broadcast() {
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
Expand All @@ -1359,7 +1376,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
continue;
}
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
}
},
wire::Message::ChannelUpdate(ref msg) => {
Expand All @@ -1372,14 +1389,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
continue
}
if peer.buffer_full_drop_gossip() {
if peer.buffer_full_drop_gossip_broadcast() {
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
continue;
}
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
}
},
_ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),
Expand Down