Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Peer Storage Feature – Part 2 #3623

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b133e80
Add method to derive Peer Storage encryption key
Jan 17, 2025
3778b81
Add OurPeerStorage for serialized Peer Storage backups
adi2011 Feb 26, 2025
28d60ec
Enable ChainMonitor to distribute PeerStorage
Jan 18, 2025
ebefcd1
Distribute PeerStorage from ChainMonitor
Jan 19, 2025
65aee4d
Handle PeerStorageRetrieval in ChannelManager
Jan 19, 2025
35863ce
test: update test_peer_storage to validate latest changes
adi2011 Mar 7, 2025
777f6b1
fixup: Add OurPeerStorage for serialized Peer Storage backups
adi2011 Mar 10, 2025
c3a3859
fixup: Add method to derive Peer Storage encryption key
adi2011 Mar 10, 2025
ef2b8a6
fixup: Add to Enable ChainMonitor to distribute PeerStorage
adi2011 Mar 10, 2025
162b6c4
fixup: Add method to derive Peer Storage encryption key
adi2011 Mar 10, 2025
5a881a3
fixup: Add OurPeerStorage for serialized Peer Storage backups
adi2011 Mar 11, 2025
30d9068
fixup: Add method to derive Peer Storage encryption key
adi2011 Mar 11, 2025
5deb6ad
fixup: Distribute PeerStorage from ChainMonitor
adi2011 Mar 11, 2025
569caa4
fixup: Enable ChainMonitor to distribute PeerStorage
adi2011 Mar 11, 2025
58affb5
fixup: Handle PeerStorageRetrieval in ChannelManager
adi2011 Mar 11, 2025
4730cdc
fixup: Enable ChainMonitor to distribute PeerStorage
adi2011 Mar 11, 2025
5ef58e9
fixup: Add method to derive Peer Storage encryption key
adi2011 Mar 11, 2025
f6d6204
fixup: Add method to derive Peer Storage encryption key
adi2011 Mar 11, 2025
57341a1
fixup: Add method to derive Peer Storage encryption key
adi2011 Mar 11, 2025
42af2ab
fixup: Add OurPeerStorage for serialized Peer Storage backups
adi2011 Mar 11, 2025
0fd40de
fixup: Distribute PeerStorage from ChainMonitor
adi2011 Mar 11, 2025
0c10bd5
fixup: Handle PeerStorageRetrieval in ChannelManager
adi2011 Mar 11, 2025
4adeeee
fixup: Add OurPeerStorage for serialized Peer Storage backups
adi2011 Mar 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ impl TestChainMonitor {
logger.clone(),
feeest,
Arc::clone(&persister),
keys.get_peer_storage_key(),
)),
logger,
keys,
Expand Down Expand Up @@ -337,6 +338,10 @@ impl NodeSigner for KeyProvider {
unreachable!()
}

fn get_peer_storage_key(&self) -> SecretKey {
SecretKey::from_slice(&[42; 32]).unwrap()
}

fn sign_bolt12_invoice(
&self, _invoice: &UnsignedBolt12Invoice,
) -> Result<schnorr::Signature, ()> {
Expand Down
21 changes: 15 additions & 6 deletions fuzz/src/full_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ type PeerMan<'a> = PeerManager<
Arc<dyn Logger>,
IgnoringMessageHandler,
Arc<KeyProvider>,
IgnoringMessageHandler,
>;

struct MoneyLossDetector<'a> {
Expand Down Expand Up @@ -420,6 +421,10 @@ impl NodeSigner for KeyProvider {
let secp_ctx = Secp256k1::signing_only();
Ok(secp_ctx.sign_ecdsa(&msg_hash, &self.node_secret))
}

fn get_peer_storage_key(&self) -> SecretKey {
SecretKey::from_slice(&[42; 32]).unwrap()
}
}

impl SignerProvider for KeyProvider {
Expand Down Expand Up @@ -608,20 +613,23 @@ pub fn do_test(mut data: &[u8], logger: &Arc<dyn Logger>) {
];

let broadcast = Arc::new(TestBroadcaster { txn_broadcasted: Mutex::new(Vec::new()) });

let keys_manager = Arc::new(KeyProvider {
node_secret: our_network_key.clone(),
inbound_payment_key: ExpandedKey::new(inbound_payment_key),
counter: AtomicU64::new(0),
signer_state: RefCell::new(new_hash_map()),
});

let monitor = Arc::new(chainmonitor::ChainMonitor::new(
None,
broadcast.clone(),
Arc::clone(&logger),
fee_est.clone(),
Arc::new(TestPersister { update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed) }),
keys_manager.get_peer_storage_key(),
));

let keys_manager = Arc::new(KeyProvider {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not quite sure why this code move is necessary?

Copy link
Author

@adi2011 adi2011 Mar 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So that we can use keys_manager.get_peer_storage_key() inside ChainMonitor::new().

node_secret: our_network_key.clone(),
inbound_payment_key: ExpandedKey::new(inbound_payment_key),
counter: AtomicU64::new(0),
signer_state: RefCell::new(new_hash_map()),
});
let network = Network::Bitcoin;
let best_block_timestamp = genesis_block(network).header.time;
let params = ChainParameters { network, best_block: BestBlock::from_network(network) };
Expand Down Expand Up @@ -653,6 +661,7 @@ pub fn do_test(mut data: &[u8], logger: &Arc<dyn Logger>) {
route_handler: gossip_sync.clone(),
onion_message_handler: IgnoringMessageHandler {},
custom_message_handler: IgnoringMessageHandler {},
send_only_message_handler: IgnoringMessageHandler {},
};
let random_data = [
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
Expand Down
4 changes: 4 additions & 0 deletions fuzz/src/onion_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ impl NodeSigner for KeyProvider {
) -> Result<bitcoin::secp256k1::ecdsa::Signature, ()> {
unreachable!()
}

fn get_peer_storage_key(&self) -> SecretKey {
unreachable!()
}
}

impl SignerProvider for KeyProvider {
Expand Down
7 changes: 5 additions & 2 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
/// # type OnionMessenger<B, F, FE> = lightning::onion_message::messenger::OnionMessenger<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<Logger>, Arc<ChannelManager<B, F, FE>>, Arc<lightning::onion_message::messenger::DefaultMessageRouter<Arc<NetworkGraph>, Arc<Logger>, Arc<lightning::sign::KeysManager>>>, Arc<ChannelManager<B, F, FE>>, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler>;
/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger>;
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, Store>;
/// #
/// # struct Node<
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
Expand Down Expand Up @@ -1085,7 +1085,7 @@ mod tests {
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::router::{CandidateRouteHop, DefaultRouter, Path, RouteHop};
use lightning::routing::scoring::{ChannelUsage, LockableScore, ScoreLookUp, ScoreUpdate};
use lightning::sign::{ChangeDestinationSource, InMemorySigner, KeysManager};
use lightning::sign::{ChangeDestinationSource, InMemorySigner, KeysManager, NodeSigner};
use lightning::types::features::{ChannelFeatures, NodeFeatures};
use lightning::types::payment::PaymentHash;
use lightning::util::config::UserConfig;
Expand Down Expand Up @@ -1208,6 +1208,7 @@ mod tests {
Arc<test_utils::TestLogger>,
IgnoringMessageHandler,
Arc<KeysManager>,
IgnoringMessageHandler,
>,
>,
chain_monitor: Arc<ChainMonitor>,
Expand Down Expand Up @@ -1568,6 +1569,7 @@ mod tests {
logger.clone(),
fee_estimator.clone(),
kv_store.clone(),
keys_manager.get_peer_storage_key(),
));
let best_block = BestBlock::from_network(network);
let params = ChainParameters { network, best_block };
Expand Down Expand Up @@ -1621,6 +1623,7 @@ mod tests {
route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()),
onion_message_handler: messenger.clone(),
custom_message_handler: IgnoringMessageHandler {},
send_only_message_handler: IgnoringMessageHandler {},
};
let peer_manager = Arc::new(PeerManager::new(
msg_handler,
Expand Down
2 changes: 1 addition & 1 deletion lightning-liquidity/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ where {
/// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
/// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyStore>>;
/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, Arc<MyUtxoLookup>, MyLogger>;
/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, Arc<MyUtxoLookup>, MyLogger, MyFilter, MyStore>;
/// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
/// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
/// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
Expand Down
5 changes: 4 additions & 1 deletion lightning-liquidity/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#![allow(unused_macros)]

use lightning::chain::Filter;
use lightning::sign::EntropySource;
use lightning::sign::{EntropySource, NodeSigner};

use bitcoin::blockdata::constants::{genesis_block, ChainHash};
use bitcoin::blockdata::transaction::Transaction;
Expand Down Expand Up @@ -130,6 +130,7 @@ pub(crate) struct Node {
>,
>,
Arc<KeysManager>,
Arc<ChainMonitor>,
>,
>,
pub(crate) liquidity_manager:
Expand Down Expand Up @@ -430,6 +431,7 @@ pub(crate) fn create_liquidity_node(
logger.clone(),
fee_estimator.clone(),
kv_store.clone(),
keys_manager.get_peer_storage_key(),
));
let best_block = BestBlock::from_network(network);
let chain_params = ChainParameters { network, best_block };
Expand Down Expand Up @@ -465,6 +467,7 @@ pub(crate) fn create_liquidity_node(
chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new(
ChainHash::using_genesis_block(Network::Testnet),
)),
send_only_message_handler: Arc::clone(&chain_monitor),
route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()),
onion_message_handler: IgnoringMessageHandler {},
custom_message_handler: Arc::clone(&liquidity_manager),
Expand Down
3 changes: 3 additions & 0 deletions lightning-net-tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,7 @@ mod tests {
route_handler: Arc::clone(&a_handler),
onion_message_handler: Arc::new(IgnoringMessageHandler {}),
custom_message_handler: Arc::new(IgnoringMessageHandler {}),
send_only_message_handler: Arc::new(IgnoringMessageHandler {}),
};
let a_manager = Arc::new(PeerManager::new(
a_msg_handler,
Expand All @@ -855,6 +856,7 @@ mod tests {
route_handler: Arc::clone(&b_handler),
onion_message_handler: Arc::new(IgnoringMessageHandler {}),
custom_message_handler: Arc::new(IgnoringMessageHandler {}),
send_only_message_handler: Arc::new(IgnoringMessageHandler {}),
};
let b_manager = Arc::new(PeerManager::new(
b_msg_handler,
Expand Down Expand Up @@ -917,6 +919,7 @@ mod tests {
onion_message_handler: Arc::new(IgnoringMessageHandler {}),
route_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler {}),
custom_message_handler: Arc::new(IgnoringMessageHandler {}),
send_only_message_handler: Arc::new(IgnoringMessageHandler {}),
};
let a_manager = Arc::new(PeerManager::new(
a_msg_handler,
Expand Down
79 changes: 76 additions & 3 deletions lightning/src/chain/chainmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,21 @@ use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor};
use crate::chain::transaction::{OutPoint, TransactionData};
use crate::ln::types::ChannelId;
use crate::ln::msgs::{self, BaseMessageHandler, Init, MessageSendEvent};
use crate::ln::our_peer_storage::OurPeerStorage;
use crate::sign::ecdsa::EcdsaChannelSigner;
use crate::events::{self, Event, EventHandler, ReplayEvent};
use crate::util::logger::{Logger, WithContext};
use crate::util::errors::APIError;
use crate::util::persist::MonitorName;
use crate::util::wakers::{Future, Notifier};
use crate::ln::channel_state::ChannelDetails;

use crate::prelude::*;
use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
use crate::types::features::{InitFeatures, NodeFeatures};
use core::ops::Deref;
use core::sync::atomic::{AtomicUsize, Ordering};
use bitcoin::secp256k1::PublicKey;
use bitcoin::secp256k1::{PublicKey, SecretKey};

/// `Persist` defines behavior for persisting channel monitors: this could mean
/// writing once to disk, and/or uploading to one or more backup services.
Expand Down Expand Up @@ -253,6 +255,9 @@ pub struct ChainMonitor<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F
/// A [`Notifier`] used to wake up the background processor in case we have any [`Event`]s for
/// it to give to users (or [`MonitorEvent`]s for `ChannelManager` to process).
event_notifier: Notifier,
pending_send_only_events: Mutex<Vec<MessageSendEvent>>,

our_peerstorage_encryption_key: SecretKey,
}

impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
Expand Down Expand Up @@ -386,7 +391,15 @@ where C::Target: chain::Filter,
/// pre-filter blocks or only fetch blocks matching a compact filter. Otherwise, clients may
/// always need to fetch full blocks absent another means for determining which blocks contain
/// transactions relevant to the watched channels.
pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P) -> Self {
///
/// # Note
/// `our_peerstorage_encryption_key` must be obtained from [`crate::sign::NodeSigner::get_peer_storage_key()`].
/// This key is used to encrypt peer storage backups.
///
/// **Important**: This key should not be set arbitrarily or changed after initialization. The same key
/// is obtained by the `ChannelManager` through `KeyMananger` to decrypt peer backups.
/// Using an inconsistent or incorrect key will result in the inability to decrypt previously encrypted backups.
pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P, our_peerstorage_encryption_key: SecretKey) -> Self {
Self {
monitors: RwLock::new(new_hash_map()),
chain_source,
Expand All @@ -397,6 +410,8 @@ where C::Target: chain::Filter,
pending_monitor_events: Mutex::new(Vec::new()),
highest_chain_height: AtomicUsize::new(0),
event_notifier: Notifier::new(),
pending_send_only_events: Mutex::new(Vec::new()),
our_peerstorage_encryption_key
}
}

Expand Down Expand Up @@ -665,6 +680,52 @@ where C::Target: chain::Filter,
});
}
}

/// Retrieves all node IDs associated with the monitors.
///
/// This function collects the counterparty node IDs from all monitors into a `HashSet`,
/// ensuring unique IDs are returned.
fn get_peer_node_ids(&self) -> HashSet<PublicKey> {
let mon = self.monitors.read().unwrap();
mon
.values()
.map(|monitor| monitor.monitor.get_counterparty_node_id())
.collect()
}

fn send_peer_storage(&self, their_node_id: PublicKey) {
// TODO: Serialize `ChannelMonitor`s inside `our_peer_storage`.

let encrypted_data = OurPeerStorage::create_from_data(self.our_peerstorage_encryption_key, Vec::new());
log_debug!(self.logger, "Sending Peer Storage from chainmonitor");
self.pending_send_only_events.lock().unwrap().push(MessageSendEvent::SendPeerStorage { node_id: their_node_id,
msg: msgs::PeerStorage { data: encrypted_data } })
}
}

impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> BaseMessageHandler for ChainMonitor<ChannelSigner, C, T, F, L, P>
where C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: Persist<ChannelSigner>,
{
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
let mut pending_events = self.pending_send_only_events.lock().unwrap();
core::mem::take(&mut *pending_events)
}

fn peer_disconnected(&self, _their_node_id: PublicKey) {}

fn provided_node_features(&self) -> NodeFeatures {
NodeFeatures::empty()
}

fn provided_init_features(&self, _their_node_id: PublicKey) -> InitFeatures {
InitFeatures::empty()
}

fn peer_connected(&self, _their_node_id: PublicKey, _msg: &Init, _inbound: bool) -> Result<(), ()> { Ok(()) }
}

impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
Expand All @@ -682,6 +743,12 @@ where
monitor.block_connected(
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
});

// Send peer storage everytime a new block arrives.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How did we choose that interval? Do we only want to send if the data actually changed?

for node_id in self.get_peer_node_ids() {
self.send_peer_storage(node_id);
}

// Assume we may have some new events and wake the event processor
self.event_notifier.notify();
}
Expand Down Expand Up @@ -733,6 +800,12 @@ where
header, height, &*self.broadcaster, &*self.fee_estimator, &self.logger
)
});

// Send peer storage everytime a new block arrives.
for node_id in self.get_peer_node_ids() {
self.send_peer_storage(node_id);
}

// Assume we may have some new events and wake the event processor
self.event_notifier.notify();
}
Expand Down
2 changes: 2 additions & 0 deletions lightning/src/ln/blinded_payment_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,7 @@ fn route_blinding_spec_test_vector() {
fn sign_invoice(
&self, _invoice: &RawBolt11Invoice, _recipient: Recipient,
) -> Result<RecoverableSignature, ()> { unreachable!() }
fn get_peer_storage_key(&self) -> SecretKey { unreachable!() }
fn sign_bolt12_invoice(
&self, _invoice: &UnsignedBolt12Invoice,
) -> Result<schnorr::Signature, ()> { unreachable!() }
Expand Down Expand Up @@ -1918,6 +1919,7 @@ fn test_trampoline_inbound_payment_decoding() {
fn sign_invoice(
&self, _invoice: &RawBolt11Invoice, _recipient: Recipient,
) -> Result<RecoverableSignature, ()> { unreachable!() }
fn get_peer_storage_key(&self) -> SecretKey { unreachable!() }
fn sign_bolt12_invoice(
&self, _invoice: &UnsignedBolt12Invoice,
) -> Result<schnorr::Signature, ()> { unreachable!() }
Expand Down
Loading
Loading