Skip to content

Commit

Permalink
Fix handling of transactions_confirmed and channel_ready
Browse files Browse the repository at this point in the history
  • Loading branch information
optout21 committed Sep 16, 2024
1 parent cdb6ab0 commit 0dfca33
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 103 deletions.
17 changes: 17 additions & 0 deletions lightning/src/ln/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,23 @@ impl<SP: Deref> FundedAndVariants<SP> where SP::Target: SignerProvider {
let n = self.funded_channels.len();
&mut self.funded_channels[n - 1]
}
/// Return all the funded channels
pub fn all_funded(&mut self) -> Vec<&mut Channel<SP>> {
// self.funded_channels
self.funded_channels.iter_mut().collect::<Vec<_>>()
}
/// Keep only the one confirmed channel, drop the other variants
pub fn keep_one_confirmed(&mut self, channel_index: usize) {
self.debug();
if self.funded_channels.len() > 1 {
println!("QQQ FundedAndVariants keep_one_confirmed collapsing {} to {}", self.funded_channels.len(), channel_index);
debug_assert!(channel_index < self.funded_channels.len());
self.funded_channels = vec![self.funded_channels.remove(channel_index)];
self.unfunded_channel_out = None;
self.unfunded_channel_in = None;
self.debug();
}
}
}

/// The `ChannelPhase` enum describes the current phase in life of a lightning channel with each of
Expand Down
232 changes: 129 additions & 103 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8344,34 +8344,40 @@ where
match peer_state.channel_by_id.entry(msg.channel_id) {
hash_map::Entry::Occupied(mut chan_phase_entry) => {
if let ChannelPhase::Funded(ch) = chan_phase_entry.get_mut() {
let chan = ch.channel_mut();
let logger = WithChannelContext::from(&self.logger, &chan.context, None);
let announcement_sigs_opt = try_chan_phase_entry!(self, chan.channel_ready(&msg, &self.node_signer,
self.chain_hash, &self.default_configuration, &self.best_block.read().unwrap(), &&logger), chan_phase_entry);
if let Some(announcement_sigs) = announcement_sigs_opt {
log_trace!(logger, "Sending announcement_signatures for channel {}", chan.context.channel_id());
peer_state.pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures {
node_id: counterparty_node_id.clone(),
msg: announcement_sigs,
});
} else if chan.context.is_usable() {
// If we're sending an announcement_signatures, we'll send the (public)
// channel_update after sending a channel_announcement when we receive our
// counterparty's announcement_signatures. Thus, we only bother to send a
// channel_update here if the channel is not public, i.e. we're not sending an
// announcement_signatures.
log_trace!(logger, "Sending private initial channel_update for our counterparty on channel {}", chan.context.channel_id());
if let Ok(msg) = self.get_channel_update_for_unicast(chan) {
peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate {
// handle on all channels
// TODO move to channel, FundedAndVariants
// let chan = ch.channel_mut();
for ch_idx in 0..ch.all_funded().len() {
let chan = &mut ch.all_funded()[ch_idx];

let logger = WithChannelContext::from(&self.logger, &chan.context, None);
let announcement_sigs_opt = try_chan_phase_entry!(self, chan.channel_ready(&msg, &self.node_signer,
self.chain_hash, &self.default_configuration, &self.best_block.read().unwrap(), &&logger), chan_phase_entry);
if let Some(announcement_sigs) = announcement_sigs_opt {
log_trace!(logger, "Sending announcement_signatures for channel {}", chan.context.channel_id());
peer_state.pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures {
node_id: counterparty_node_id.clone(),
msg,
msg: announcement_sigs,
});
} else if chan.context.is_usable() {
// If we're sending an announcement_signatures, we'll send the (public)
// channel_update after sending a channel_announcement when we receive our
// counterparty's announcement_signatures. Thus, we only bother to send a
// channel_update here if the channel is not public, i.e. we're not sending an
// announcement_signatures.
log_trace!(logger, "Sending private initial channel_update for our counterparty on channel {}", chan.context.channel_id());
if let Ok(msg) = self.get_channel_update_for_unicast(chan) {
peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate {
node_id: counterparty_node_id.clone(),
msg,
});
}
}
}

{
let mut pending_events = self.pending_events.lock().unwrap();
emit_channel_ready_event!(pending_events, chan);
{
let mut pending_events = self.pending_events.lock().unwrap();
emit_channel_ready_event!(pending_events, chan);
}
}

Ok(())
Expand Down Expand Up @@ -10541,96 +10547,116 @@ where
#[cfg(any(dual_funding, splicing))]
ChannelPhase::UnfundedOutboundV2(_) | ChannelPhase::UnfundedInboundV2(_) => true,
ChannelPhase::Funded(ch) => {
let channel = ch.channel_mut(); // TODO on all?
// let was_ready = channel.context.is_ready();

let res = f(channel);
if let Ok((channel_ready_opt, mut timed_out_pending_htlcs, announcement_sigs)) = res {
for (source, payment_hash) in timed_out_pending_htlcs.drain(..) {
let (failure_code, data) = self.get_htlc_inbound_temp_fail_err_and_data(0x1000|14 /* expiry_too_soon */, &channel);
timed_out_htlcs.push((source, payment_hash, HTLCFailReason::reason(failure_code, data),
HTLCDestination::NextHopChannel { node_id: Some(channel.context.get_counterparty_node_id()), channel_id: channel.context.channel_id() }));
// we need to iterate on all channel variants
// TODO move this to channel, to FundedAndVariants, move the collapsing logic there
// let channel = ch.channel_mut();
let mut channel_ready_index = None;
// for (ch_idx, _c) in ch.all_funded().iter().enumerate() {
for ch_idx in 0..ch.all_funded().len() {
let channel = &mut ch.all_funded()[ch_idx];

if channel_ready_index.is_some() {
// If we have one confirmation, stop, as it will be the only one remaining after collapse
// break;
}
let logger = WithChannelContext::from(&self.logger, &channel.context, None);
if let Some(channel_ready) = channel_ready_opt {
send_channel_ready!(self, pending_msg_events, channel, channel_ready);
if channel.context.is_usable() {
log_trace!(logger, "Sending channel_ready with private initial channel_update for our counterparty on channel {}", channel.context.channel_id());
if let Ok(msg) = self.get_channel_update_for_unicast(channel) {
pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate {
node_id: channel.context.get_counterparty_node_id(),
msg,
});

// let was_ready = channel.context.is_ready();

let res = f(channel);
if let Ok((channel_ready_opt, mut timed_out_pending_htlcs, announcement_sigs)) = res {
for (source, payment_hash) in timed_out_pending_htlcs.drain(..) {
let (failure_code, data) = self.get_htlc_inbound_temp_fail_err_and_data(0x1000|14 /* expiry_too_soon */, &channel);
timed_out_htlcs.push((source, payment_hash, HTLCFailReason::reason(failure_code, data),
HTLCDestination::NextHopChannel { node_id: Some(channel.context.get_counterparty_node_id()), channel_id: channel.context.channel_id() }));
}
let logger = WithChannelContext::from(&self.logger, &channel.context, None);
if let Some(channel_ready) = channel_ready_opt {

channel_ready_index = Some(ch_idx);

send_channel_ready!(self, pending_msg_events, channel, channel_ready);
if channel.context.is_usable() {
log_trace!(logger, "Sending channel_ready with private initial channel_update for our counterparty on channel {}", channel.context.channel_id());
if let Ok(msg) = self.get_channel_update_for_unicast(channel) {
pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate {
node_id: channel.context.get_counterparty_node_id(),
msg,
});
}
} else {
log_trace!(logger, "Sending channel_ready WITHOUT channel_update for {}", channel.context.channel_id());
}
} else {
log_trace!(logger, "Sending channel_ready WITHOUT channel_update for {}", channel.context.channel_id());
}
}

{
let mut pending_events = self.pending_events.lock().unwrap();
emit_channel_ready_event!(pending_events, channel);
}
{
let mut pending_events = self.pending_events.lock().unwrap();
emit_channel_ready_event!(pending_events, channel);
}

if let Some(announcement_sigs) = announcement_sigs {
log_trace!(logger, "Sending announcement_signatures for channel {}", channel.context.channel_id());
pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures {
node_id: channel.context.get_counterparty_node_id(),
msg: announcement_sigs,
});
if let Some(height) = height_opt {
if let Some(announcement) = channel.get_signed_channel_announcement(&self.node_signer, self.chain_hash, height, &self.default_configuration) {
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement {
msg: announcement,
// Note that announcement_signatures fails if the channel cannot be announced,
// so get_channel_update_for_broadcast will never fail by the time we get here.
update_msg: Some(self.get_channel_update_for_broadcast(channel).unwrap()),
});
if let Some(announcement_sigs) = announcement_sigs {
log_trace!(logger, "Sending announcement_signatures for channel {}", channel.context.channel_id());
pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures {
node_id: channel.context.get_counterparty_node_id(),
msg: announcement_sigs,
});
if let Some(height) = height_opt {
if let Some(announcement) = channel.get_signed_channel_announcement(&self.node_signer, self.chain_hash, height, &self.default_configuration) {
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement {
msg: announcement,
// Note that announcement_signatures fails if the channel cannot be announced,
// so get_channel_update_for_broadcast will never fail by the time we get here.
update_msg: Some(self.get_channel_update_for_broadcast(channel).unwrap()),
});
}
}
}
}
if channel.is_our_channel_ready() {
if let Some(real_scid) = channel.context.get_short_channel_id() {
// If we sent a 0conf channel_ready, and now have an SCID, we add it
// to the short_to_chan_info map here. Note that we check whether we
// can relay using the real SCID at relay-time (i.e.
// enforce option_scid_alias then), and if the funding tx is ever
// un-confirmed we force-close the channel, ensuring short_to_chan_info
// is always consistent.
let mut short_to_chan_info = self.short_to_chan_info.write().unwrap();
let scid_insert = short_to_chan_info.insert(real_scid, (channel.context.get_counterparty_node_id(), channel.context.channel_id()));
assert!(scid_insert.is_none() || scid_insert.unwrap() == (channel.context.get_counterparty_node_id(), channel.context.channel_id()),
"SCIDs should never collide - ensure you weren't behind by a full {} blocks when creating channels",
fake_scid::MAX_SCID_BLOCKS_FROM_NOW);
if channel.is_our_channel_ready() {
if let Some(real_scid) = channel.context.get_short_channel_id() {
// If we sent a 0conf channel_ready, and now have an SCID, we add it
// to the short_to_chan_info map here. Note that we check whether we
// can relay using the real SCID at relay-time (i.e.
// enforce option_scid_alias then), and if the funding tx is ever
// un-confirmed we force-close the channel, ensuring short_to_chan_info
// is always consistent.
let mut short_to_chan_info = self.short_to_chan_info.write().unwrap();
let scid_insert = short_to_chan_info.insert(real_scid, (channel.context.get_counterparty_node_id(), channel.context.channel_id()));
assert!(scid_insert.is_none() || scid_insert.unwrap() == (channel.context.get_counterparty_node_id(), channel.context.channel_id()),
"SCIDs should never collide - ensure you weren't behind by a full {} blocks when creating channels",
fake_scid::MAX_SCID_BLOCKS_FROM_NOW);
}
}
}

// let is_ready = channel.context.is_ready();
// if !was_ready && is_ready {
// println!("QQQ Channel became READY {}", channel.context.channel_id());
// }
} else if let Err(reason) = res {
update_maps_on_chan_removal!(self, &channel.context);
// It looks like our counterparty went on-chain or funding transaction was
// reorged out of the main chain. Close the channel.
let reason_message = format!("{}", reason);
failed_channels.push(channel.context.force_shutdown(true, reason));
if let Ok(update) = self.get_channel_update_for_broadcast(&channel) {
let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
// let is_ready = channel.context.is_ready();
// if !was_ready && is_ready {
// println!("QQQ Channel became READY {}", channel.context.channel_id());
// }
} else if let Err(reason) = res {
update_maps_on_chan_removal!(self, &channel.context);
// It looks like our counterparty went on-chain or funding transaction was
// reorged out of the main chain. Close the channel.
let reason_message = format!("{}", reason);
failed_channels.push(channel.context.force_shutdown(true, reason));
if let Ok(update) = self.get_channel_update_for_broadcast(&channel) {
let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
pending_msg_events.push(events::MessageSendEvent::HandleError {
node_id: channel.context.get_counterparty_node_id(),
action: msgs::ErrorAction::DisconnectPeer {
msg: Some(msgs::ErrorMessage {
channel_id: channel.context.channel_id(),
data: reason_message,
})
},
});
return false;
}
pending_msg_events.push(events::MessageSendEvent::HandleError {
node_id: channel.context.get_counterparty_node_id(),
action: msgs::ErrorAction::DisconnectPeer {
msg: Some(msgs::ErrorMessage {
channel_id: channel.context.channel_id(),
data: reason_message,
})
},
});
return false;
} // for channel
if let Some(channel_ready_idx) = channel_ready_index {
// we have a channel ready, collapse variants
ch.keep_one_confirmed(channel_ready_idx);
}
true
}
Expand Down

0 comments on commit 0dfca33

Please sign in to comment.