Start tracking ChannelMonitors by channel ID in ChainMonitor and ChannelManager #3554

Open. Wants to merge 3 commits into base: main. Showing changes from 1 commit.
109 changes: 60 additions & 49 deletions lightning/src/ln/channelmanager.rs
@@ -1309,7 +1309,8 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
/// for broadcast messages, where ordering isn't as strict).
pub(super) pending_msg_events: Vec<MessageSendEvent>,
/// Map from Channel IDs to pending [`ChannelMonitorUpdate`]s which have been passed to the
/// user but which have not yet completed.
/// user but which have not yet completed. We still keep the funding outpoint around to backfill
/// the legacy TLV field to support downgrading.
///
/// Note that the channel may no longer exist. For example if the channel was closed but we
/// later needed to claim an HTLC which is pending on-chain, we may generate a monitor update
@@ -1321,7 +1322,7 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
/// where we complete one [`ChannelMonitorUpdate`] (but there are more pending as background
/// events) but we conclude all pending [`ChannelMonitorUpdate`]s have completed and it's safe
/// to run post-completion actions.
in_flight_monitor_updates: BTreeMap<OutPoint, Vec<ChannelMonitorUpdate>>,
in_flight_monitor_updates: BTreeMap<(OutPoint, ChannelId), Vec<ChannelMonitorUpdate>>,
Review comment (Contributor): Wondering if it would simplify anything if either the ChannelId or the OutPoint were tupled with the value instead of the key.
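For illustration, a minimal sketch of the shape this comment suggests, using stand-in types rather than LDK's real `ChannelId`/`OutPoint`/`ChannelMonitorUpdate`: keying by channel ID alone and carrying the outpoint in the value would confine the outpoint to the serialization path.

```rust
use std::collections::BTreeMap;

// Stand-in types to illustrate the suggested shape; LDK's real
// definitions are more involved.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct ChannelId([u8; 32]);
#[derive(Clone, Copy)]
struct OutPoint { txid: [u8; 32], index: u16 }
struct ChannelMonitorUpdate { update_id: u64 }

// PR as written:  BTreeMap<(OutPoint, ChannelId), Vec<ChannelMonitorUpdate>>
// Suggested here: key by ChannelId alone and carry the outpoint in the
// value, so it is only touched when backfilling the legacy TLV field.
type InFlight = BTreeMap<ChannelId, (OutPoint, Vec<ChannelMonitorUpdate>)>;

fn main() {
    let mut in_flight: InFlight = BTreeMap::new();
    let chan = ChannelId([0u8; 32]);
    let txo = OutPoint { txid: [0u8; 32], index: 0 };
    in_flight
        .entry(chan)
        .or_insert_with(|| (txo, Vec::new()))
        .1
        .push(ChannelMonitorUpdate { update_id: 1 });
    // Lookups need only the channel id; the outpoint rides along for
    // serialization-time backfill.
    let pending = in_flight.get(&chan).map(|(_, upds)| upds.len()).unwrap_or(0);
    assert_eq!(pending, 1);
}
```

The trade-off is that serialization must then walk the values to rebuild the outpoint-keyed legacy map, rather than reading the outpoint straight off the key.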

/// Map from a specific channel to some action(s) that should be taken when all pending
/// [`ChannelMonitorUpdate`]s for the channel complete updating.
///
@@ -3284,7 +3285,7 @@ macro_rules! handle_new_monitor_update {
$chan_id: expr, $counterparty_node_id: expr, $in_flight_updates: ident, $update_idx: ident,
_internal_outer, $completed: expr
) => { {
$in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo)
$in_flight_updates = $peer_state.in_flight_monitor_updates.entry(($funding_txo, $chan_id))
.or_insert_with(Vec::new);
// During startup, we push monitor updates as background events through to here in
// order to replay updates that were in-flight when we shut down. Thus, we have to
@@ -4010,7 +4011,7 @@ where
let per_peer_state = self.per_peer_state.read().unwrap();
if let Some(peer_state_mtx) = per_peer_state.get(&shutdown_res.counterparty_node_id) {
let mut peer_state = peer_state_mtx.lock().unwrap();
if peer_state.in_flight_monitor_updates.get(&funding_txo).map(|l| l.is_empty()).unwrap_or(true) {
if peer_state.in_flight_monitor_updates.get(&(funding_txo, shutdown_res.channel_id)).map(|l| l.is_empty()).unwrap_or(true) {
let update_actions = peer_state.monitor_update_blocked_actions
.remove(&shutdown_res.channel_id).unwrap_or(Vec::new());

@@ -7574,7 +7575,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
let peer_state = &mut *peer_state_lock;

let remaining_in_flight =
if let Some(pending) = peer_state.in_flight_monitor_updates.get_mut(funding_txo) {
if let Some(pending) = peer_state.in_flight_monitor_updates.get_mut(&(*funding_txo, *channel_id)) {
pending.retain(|upd| upd.update_id > highest_applied_update_id);
pending.len()
} else { 0 };
@@ -12986,12 +12987,22 @@ where
pending_claiming_payments = None;
}

let mut in_flight_monitor_updates: Option<HashMap<(&PublicKey, &OutPoint), &Vec<ChannelMonitorUpdate>>> = None;
let mut legacy_in_flight_monitor_updates: Option<HashMap<(&PublicKey, &OutPoint), &Vec<ChannelMonitorUpdate>>> = None;
let mut in_flight_monitor_updates: Option<HashMap<(&PublicKey, &ChannelId), &Vec<ChannelMonitorUpdate>>> = None;
for ((counterparty_id, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) {
for (funding_outpoint, updates) in peer_state.in_flight_monitor_updates.iter() {
for ((funding_txo, channel_id), updates) in peer_state.in_flight_monitor_updates.iter() {
if !updates.is_empty() {
if in_flight_monitor_updates.is_none() { in_flight_monitor_updates = Some(new_hash_map()); }
in_flight_monitor_updates.as_mut().unwrap().insert((counterparty_id, funding_outpoint), updates);
if legacy_in_flight_monitor_updates.is_none() {
legacy_in_flight_monitor_updates = Some(new_hash_map());
}
legacy_in_flight_monitor_updates.as_mut().unwrap()
.insert((counterparty_id, funding_txo), updates);

if in_flight_monitor_updates.is_none() {
in_flight_monitor_updates = Some(new_hash_map());
}
in_flight_monitor_updates.as_mut().unwrap()
.insert((counterparty_id, channel_id), updates);
}
}
}
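The loop above projects the single (OutPoint, ChannelId)-keyed map into the two serialization views: an outpoint-keyed legacy map and a channel-id-keyed map. A stand-in sketch of that projection, with simplified types in place of LDK's:

```rust
use std::collections::{BTreeMap, HashMap};

// Stand-in key types; LDK's PublicKey/OutPoint/ChannelId are richer.
type PubKey = [u8; 33];
type OutPoint = ([u8; 32], u16);
type ChannelId = [u8; 32];
type Update = u64; // stands in for ChannelMonitorUpdate

fn main() {
    // One source of truth, keyed by (OutPoint, ChannelId) as in this PR.
    let mut in_flight: BTreeMap<(OutPoint, ChannelId), Vec<Update>> = BTreeMap::new();
    in_flight.insert((([0; 32], 0), [1; 32]), vec![7]);

    let peer: PubKey = [2; 33];
    // Project the one map into the two serialization views.
    let mut legacy: HashMap<(PubKey, OutPoint), &Vec<Update>> = HashMap::new();
    let mut by_chan: HashMap<(PubKey, ChannelId), &Vec<Update>> = HashMap::new();
    for ((txo, chan_id), updates) in in_flight.iter() {
        if !updates.is_empty() {
            legacy.insert((peer, *txo), updates); // even type 10, for downgrades
            by_chan.insert((peer, *chan_id), updates); // odd type 17, new keying
        }
    }
    assert_eq!(legacy.len(), 1);
    assert_eq!(by_chan.len(), 1);
}
```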
@@ -13006,11 +13017,12 @@ where
(7, self.fake_scid_rand_bytes, required),
(8, if events_not_backwards_compatible { Some(&*events) } else { None }, option),
(9, htlc_purposes, required_vec),
(10, in_flight_monitor_updates, option),
(10, legacy_in_flight_monitor_updates, option),
(11, self.probing_cookie_secret, required),
(13, htlc_onion_fields, optional_vec),
(14, decode_update_add_htlcs_opt, option),
(15, self.inbound_payment_id_secret, required),
(17, in_flight_monitor_updates, required),
});

Ok(())
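Writing the legacy map at even type 10 alongside the new map at odd type 17 leans on the TLV convention LDK follows: a reader must fail on an unknown even type but may skip an unknown odd one, so older versions still parse the legacy field while silently ignoring the new keying. A simplified sketch of that rule (not LDK's actual deserialization code):

```rust
// Minimal sketch of the even/odd TLV convention driving this layout
// (simplified; not LDK's serialization macros). Unknown even types must
// abort a read, unknown odd types may be skipped, so the legacy map stays
// at even type 10 for downgrades while the channel-id-keyed map ships at
// odd type 17, which older readers silently ignore.
fn skippable(tlv_type: u64, understood: &[u64]) -> Result<bool, ()> {
    if understood.contains(&tlv_type) {
        Ok(false) // known field: parse it
    } else if tlv_type % 2 == 1 {
        Ok(true) // unknown odd field: skip
    } else {
        Err(()) // unknown even field: fail the read
    }
}

fn main() {
    // An old reader understands types 10 and 11, but not the new 17.
    let old_reader = [10u64, 11];
    assert_eq!(skippable(10, &old_reader), Ok(false)); // legacy map parsed
    assert_eq!(skippable(17, &old_reader), Ok(true));  // new map ignored
}
```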
@@ -13146,8 +13158,7 @@ where
/// runtime settings which were stored when the ChannelManager was serialized.
pub default_config: UserConfig,

/// A map from channel funding outpoints to ChannelMonitors for those channels (ie
/// value.context.get_funding_txo() should be the key).
/// A map from channel IDs to ChannelMonitors for those channels.
///
/// If a monitor is inconsistent with the channel state during deserialization the channel will
/// be force-closed using the data in the ChannelMonitor and the channel will be dropped. This
@@ -13158,7 +13169,7 @@ where
/// this struct.
///
/// This is not exported to bindings users because we have no HashMap bindings
pub channel_monitors: HashMap<OutPoint, &'a ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>>,
pub channel_monitors: HashMap<ChannelId, &'a ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>>,
}
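On the read side, `ChannelManagerReadArgs` now keys its monitor map by channel ID. A minimal stand-in sketch of the new construction (simplified types, not LDK's generic signatures, though the real `ChannelMonitor` does expose `channel_id()`):

```rust
use std::collections::HashMap;

// Stand-ins to show the new keying; LDK's ChannelMonitor is generic
// over a signer type.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct ChannelId([u8; 32]);
struct ChannelMonitor { channel_id: ChannelId }

impl ChannelMonitor {
    fn channel_id(&self) -> ChannelId { self.channel_id }
}

fn main() {
    let monitors = vec![ChannelMonitor { channel_id: ChannelId([1; 32]) }];
    // Previously keyed by monitor.get_funding_txo().0; now by channel_id().
    let by_id: HashMap<ChannelId, &ChannelMonitor> =
        monitors.iter().map(|m| (m.channel_id(), m)).collect();
    assert!(by_id.contains_key(&ChannelId([1; 32])));
}
```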

impl<'a, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, MR: Deref, L: Deref>
@@ -13187,7 +13198,7 @@ where
entropy_source, node_signer, signer_provider, fee_estimator, chain_monitor,
tx_broadcaster, router, message_router, logger, default_config,
channel_monitors: hash_map_from_iter(
channel_monitors.drain(..).map(|monitor| { (monitor.get_funding_txo().0, monitor) })
channel_monitors.drain(..).map(|monitor| { (monitor.channel_id(), monitor) })
),
}
}
@@ -13250,22 +13261,21 @@ where

let mut failed_htlcs = Vec::new();
let channel_count: u64 = Readable::read(reader)?;
let mut funding_txo_set = hash_set_with_capacity(cmp::min(channel_count as usize, 128));
let mut channel_id_set = hash_set_with_capacity(cmp::min(channel_count as usize, 128));
let mut per_peer_state = hash_map_with_capacity(cmp::min(channel_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(PublicKey, Mutex<PeerState<SP>>)>()));
let mut outpoint_to_peer = hash_map_with_capacity(cmp::min(channel_count as usize, 128));
let mut short_to_chan_info = hash_map_with_capacity(cmp::min(channel_count as usize, 128));
let mut channel_closures = VecDeque::new();
let mut close_background_events = Vec::new();
let mut funding_txo_to_channel_id = hash_map_with_capacity(channel_count as usize);
for _ in 0..channel_count {
let mut channel: FundedChannel<SP> = FundedChannel::read(reader, (
&args.entropy_source, &args.signer_provider, best_block_height, &provided_channel_type_features(&args.default_config)
))?;
let logger = WithChannelContext::from(&args.logger, &channel.context, None);
let channel_id = channel.context.channel_id();
channel_id_set.insert(channel_id.clone());
Review comment (Contributor): Clone not needed?
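Assuming `ChannelId` still derives `Copy`, as it does in current LDK where it wraps a `[u8; 32]`, the clone is indeed redundant: `insert` takes a cheap bitwise copy and the binding remains usable afterwards. A minimal sketch:

```rust
use std::collections::HashSet;

// Assumption: like LDK's real ChannelId, this stand-in wraps [u8; 32]
// and derives Copy, making .clone() on insert redundant.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct ChannelId([u8; 32]);

fn main() {
    let mut channel_id_set = HashSet::new();
    let channel_id = ChannelId([0u8; 32]);
    channel_id_set.insert(channel_id); // a bitwise copy; no .clone() needed
    assert!(channel_id_set.contains(&channel_id)); // still usable afterwards
}
```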

let funding_txo = channel.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?;
funding_txo_to_channel_id.insert(funding_txo, channel.context.channel_id());
funding_txo_set.insert(funding_txo.clone());
if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) {
if let Some(ref mut monitor) = args.channel_monitors.get_mut(&channel_id) {
if channel.get_cur_holder_commitment_transaction_number() > monitor.get_cur_holder_commitment_number() ||
channel.get_revoked_counterparty_commitment_transaction_number() > monitor.get_min_seen_secret() ||
channel.get_cur_counterparty_commitment_transaction_number() > monitor.get_cur_counterparty_commitment_number() ||
@@ -13348,9 +13358,7 @@ where
if let Some(short_channel_id) = channel.context.get_short_channel_id() {
short_to_chan_info.insert(short_channel_id, (channel.context.get_counterparty_node_id(), channel.context.channel_id()));
}
if let Some(funding_txo) = channel.context.get_funding_txo() {
outpoint_to_peer.insert(funding_txo, channel.context.get_counterparty_node_id());
}
outpoint_to_peer.insert(funding_txo, channel.context.get_counterparty_node_id());
per_peer_state.entry(channel.context.get_counterparty_node_id())
.or_insert_with(|| Mutex::new(empty_peer_state()))
.get_mut().unwrap()
@@ -13380,8 +13388,8 @@ where
}
}

for (funding_txo, monitor) in args.channel_monitors.iter() {
if !funding_txo_set.contains(funding_txo) {
for (channel_id, monitor) in args.channel_monitors.iter() {
if !channel_id_set.contains(channel_id) {
let mut should_queue_fc_update = false;
if let Some(counterparty_node_id) = monitor.get_counterparty_node_id() {
// If the ChannelMonitor had any updates, we may need to update it further and
@@ -13419,10 +13427,11 @@ where
updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast: true }],
channel_id: Some(monitor.channel_id()),
};
let funding_txo = monitor.get_funding_txo().0;
if let Some(counterparty_node_id) = monitor.get_counterparty_node_id() {
let update = BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
counterparty_node_id,
funding_txo: *funding_txo,
funding_txo,
channel_id,
update: monitor_update,
};
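For orientation, a stand-in sketch of the event being built here, showing it carrying both identifiers: the funding outpoint for `chain::Watch` calls and the channel ID for the new map keying (simplified types; not LDK's real definitions):

```rust
// Stand-in sketch of the regenerated startup event. The update_id of
// u64::MAX marks the regenerated force-close update, as in the diff above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ChannelId([u8; 32]);
#[derive(Clone, Copy)]
struct OutPoint { txid: [u8; 32], index: u16 }
struct ChannelMonitorUpdate { update_id: u64 }

enum BackgroundEvent {
    MonitorUpdateRegeneratedOnStartup {
        counterparty_node_id: [u8; 33],
        funding_txo: OutPoint,
        channel_id: ChannelId,
        update: ChannelMonitorUpdate,
    },
}

fn main() {
    let ev = BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
        counterparty_node_id: [2; 33],
        funding_txo: OutPoint { txid: [0; 32], index: 0 },
        channel_id: ChannelId([1; 32]),
        update: ChannelMonitorUpdate { update_id: u64::MAX },
    };
    let BackgroundEvent::MonitorUpdateRegeneratedOnStartup { channel_id, .. } = ev;
    assert_eq!(channel_id, ChannelId([1; 32]));
}
```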
@@ -13435,7 +13444,7 @@ where
// generate a `ChannelMonitorUpdate` for it aside from this
// `ChannelForceClosed` one.
monitor_update.update_id = u64::MAX;
close_background_events.push(BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((*funding_txo, channel_id, monitor_update)));
close_background_events.push(BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((funding_txo, channel_id, monitor_update)));
}
}
}
@@ -13535,7 +13544,10 @@ where
let mut pending_claiming_payments = Some(new_hash_map());
let mut monitor_update_blocked_actions_per_peer: Option<Vec<(_, BTreeMap<_, Vec<_>>)>> = Some(Vec::new());
let mut events_override = None;
let mut in_flight_monitor_updates: Option<HashMap<(PublicKey, OutPoint), Vec<ChannelMonitorUpdate>>> = None;
let mut _legacy_in_flight_monitor_updates: Option<HashMap<(PublicKey, OutPoint), Vec<ChannelMonitorUpdate>>> = None;
// We use this one over the legacy since they represent the same data, just with a different
// key. We still need to read the legacy one as it's an even TLV.
let mut in_flight_monitor_updates: Option<HashMap<(PublicKey, ChannelId), Vec<ChannelMonitorUpdate>>> = None;
let mut decode_update_add_htlcs: Option<HashMap<u64, Vec<msgs::UpdateAddHTLC>>> = None;
let mut inbound_payment_id_secret = None;
read_tlv_fields!(reader, {
@@ -13548,11 +13560,12 @@ where
(7, fake_scid_rand_bytes, option),
(8, events_override, option),
(9, claimable_htlc_purposes, optional_vec),
(10, in_flight_monitor_updates, option),
(10, _legacy_in_flight_monitor_updates, option),
(11, probing_cookie_secret, option),
(13, claimable_htlc_onion_fields, optional_vec),
(14, decode_update_add_htlcs, option),
(15, inbound_payment_id_secret, option),
(17, in_flight_monitor_updates, required),
});
let mut decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
if fake_scid_rand_bytes.is_none() {
@@ -13599,19 +13612,20 @@ where
// Because the actual handling of the in-flight updates is the same, it's macro'ized here:
let mut pending_background_events = Vec::new();
macro_rules! handle_in_flight_updates {
($counterparty_node_id: expr, $chan_in_flight_upds: expr, $funding_txo: expr,
$monitor: expr, $peer_state: expr, $logger: expr, $channel_info_log: expr
($counterparty_node_id: expr, $chan_in_flight_upds: expr, $monitor: expr,
$peer_state: expr, $logger: expr, $channel_info_log: expr
) => { {
let mut max_in_flight_update_id = 0;
$chan_in_flight_upds.retain(|upd| upd.update_id > $monitor.get_latest_update_id());
let funding_txo = $monitor.get_funding_txo().0;
for update in $chan_in_flight_upds.iter() {
log_trace!($logger, "Replaying ChannelMonitorUpdate {} for {}channel {}",
update.update_id, $channel_info_log, &$monitor.channel_id());
max_in_flight_update_id = cmp::max(max_in_flight_update_id, update.update_id);
pending_background_events.push(
BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
counterparty_node_id: $counterparty_node_id,
funding_txo: $funding_txo,
funding_txo: funding_txo.clone(),
channel_id: $monitor.channel_id(),
update: update.clone(),
});
@@ -13630,7 +13644,7 @@ where
.and_modify(|v| *v = cmp::max(max_in_flight_update_id, *v))
.or_insert(max_in_flight_update_id);
}
if $peer_state.in_flight_monitor_updates.insert($funding_txo, $chan_in_flight_upds).is_some() {
if $peer_state.in_flight_monitor_updates.insert((funding_txo, $monitor.channel_id()), $chan_in_flight_upds).is_some() {
log_error!($logger, "Duplicate in-flight monitor update set for the same channel!");
return Err(DecodeError::InvalidValue);
}
@@ -13641,28 +13655,27 @@ where
for (counterparty_id, peer_state_mtx) in per_peer_state.iter_mut() {
let mut peer_state_lock = peer_state_mtx.lock().unwrap();
let peer_state = &mut *peer_state_lock;
for phase in peer_state.channel_by_id.values() {
for (channel_id, phase) in &peer_state.channel_by_id {
if let Some(chan) = phase.as_funded() {
let logger = WithChannelContext::from(&args.logger, &chan.context, None);

// Channels that were persisted have to be funded, otherwise they should have been
// discarded.
let funding_txo = chan.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?;
let monitor = args.channel_monitors.get(&funding_txo)
let monitor = args.channel_monitors.get(channel_id)
.expect("We already checked for monitor presence when loading channels");
let mut max_in_flight_update_id = monitor.get_latest_update_id();
if let Some(in_flight_upds) = &mut in_flight_monitor_updates {
if let Some(mut chan_in_flight_upds) = in_flight_upds.remove(&(*counterparty_id, funding_txo)) {
if let Some(mut chan_in_flight_upds) = in_flight_upds.remove(&(*counterparty_id, *channel_id)) {
max_in_flight_update_id = cmp::max(max_in_flight_update_id,
handle_in_flight_updates!(*counterparty_id, chan_in_flight_upds,
funding_txo, monitor, peer_state, logger, ""));
monitor, peer_state, logger, ""));
}
}
if chan.get_latest_unblocked_monitor_update_id() > max_in_flight_update_id {
// If the channel is ahead of the monitor, return DangerousValue:
log_error!(logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!");
log_error!(logger, " The ChannelMonitor for channel {} is at update_id {} with update_id through {} in-flight",
chan.context.channel_id(), monitor.get_latest_update_id(), max_in_flight_update_id);
channel_id, monitor.get_latest_update_id(), max_in_flight_update_id);
log_error!(logger, " but the ChannelManager is at update_id {}.", chan.get_latest_unblocked_monitor_update_id());
log_error!(logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,");
log_error!(logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!");
@@ -13680,23 +13693,21 @@ where
}

if let Some(in_flight_upds) = in_flight_monitor_updates {
for ((counterparty_id, funding_txo), mut chan_in_flight_updates) in in_flight_upds {
let channel_id = funding_txo_to_channel_id.get(&funding_txo).copied();
let logger = WithContext::from(&args.logger, Some(counterparty_id), channel_id, None);
if let Some(monitor) = args.channel_monitors.get(&funding_txo) {
for ((counterparty_id, channel_id), mut chan_in_flight_updates) in in_flight_upds {
let logger = WithContext::from(&args.logger, Some(counterparty_id), Some(channel_id), None);
if let Some(monitor) = args.channel_monitors.get(&channel_id) {
// Now that we've removed all the in-flight monitor updates for channels that are
// still open, we need to replay any monitor updates that are for closed channels,
// creating the necessary peer_state entries as we go.
let peer_state_mutex = per_peer_state.entry(counterparty_id).or_insert_with(|| {
Mutex::new(empty_peer_state())
});
let mut peer_state = peer_state_mutex.lock().unwrap();
handle_in_flight_updates!(counterparty_id, chan_in_flight_updates,
funding_txo, monitor, peer_state, logger, "closed ");
handle_in_flight_updates!(counterparty_id, chan_in_flight_updates, monitor,
peer_state, logger, "closed ");
} else {
log_error!(logger, "A ChannelMonitor is missing even though we have in-flight updates for it! This indicates a potentially-critical violation of the chain::Watch API!");
log_error!(logger, " The ChannelMonitor for channel {} is missing.", if let Some(channel_id) =
channel_id { channel_id.to_string() } else { format!("with outpoint {}", funding_txo) } );
log_error!(logger, " The ChannelMonitor for channel {} is missing.", channel_id.to_string());
log_error!(logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,");
log_error!(logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!");
log_error!(logger, " Without the latest ChannelMonitor we cannot continue without risking funds.");
@@ -13748,7 +13759,7 @@ where
.or_insert(update.update_id);
}
let in_flight_updates = per_peer_state.in_flight_monitor_updates
.entry(*funding_txo)
.entry((*funding_txo, *channel_id))
.or_insert_with(Vec::new);
debug_assert!(!in_flight_updates.iter().any(|upd| upd == update));
in_flight_updates.push(update.clone());
@@ -13873,7 +13884,7 @@ where
.filter_map(|(htlc_source, (htlc, preimage_opt))| {
if let HTLCSource::PreviousHopData(prev_hop) = &htlc_source {
if let Some(payment_preimage) = preimage_opt {
let inbound_edge_monitor = args.channel_monitors.get(&prev_hop.outpoint);
let inbound_edge_monitor = args.channel_monitors.get(&prev_hop.channel_id);
// Note that for channels which have gone to chain,
// `get_all_current_outbound_htlcs` is never pruned and always returns
// a constant set until the monitor is removed/archived. Thus, we
@@ -14361,7 +14372,7 @@ where
);
}
}
if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.outpoint) {
if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.channel_id) {
// Note that this is unsafe as we no longer require the
// `ChannelMonitor`s to be re-persisted prior to this
// `ChannelManager` being persisted after we get started running.