Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] Kusama Litep2p Deployed Version #6638

Open
wants to merge 31 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
a0fca05
authority-discovery: Filter out empty addresses after removing `/p2p/..`
lexnv Nov 19, 2024
3e3e461
authority-discovery: Ignore empty addresses from DHT
lexnv Nov 19, 2024
0724372
authority-discovery/tests: Add test for is empty address
lexnv Nov 19, 2024
7e5aa7b
litep2p: Ignore empty addresses
lexnv Nov 19, 2024
5e71861
litep2p/discovery: Validate empty / no-IP multiaddresses
lexnv Nov 19, 2024
f36d749
libp2p/peer_info: Confirm address only if it is valid
lexnv Nov 19, 2024
9e7c867
Add PRdoc
lexnv Nov 19, 2024
109bb1c
peer_info: Downgrade warn to debug logs
lexnv Nov 20, 2024
281e1eb
network/types: Move verification of external addr
lexnv Nov 21, 2024
f09e25c
network/types: Ensure peerID functionality
lexnv Nov 21, 2024
3906c57
Use crate::type::Multiaddr shared functionality
lexnv Nov 21, 2024
1dccdb3
litep2p/req-resp: Always provide main protocol name in responses
lexnv Nov 21, 2024
9061359
authority-discovery: Use is_valid_external_address method
lexnv Nov 22, 2024
52acb47
network/types: Enhance address validation by TCP / UDP protocols
lexnv Nov 22, 2024
6389be9
network: Rename is_valid_external_address
lexnv Nov 22, 2024
d109dd7
types/tests: Check empty addresses
lexnv Nov 22, 2024
d6da5d4
types/tests: Extensive tests to check the address is valid
lexnv Nov 22, 2024
6372b5f
litep2p: Use new multiaddr functionality
lexnv Nov 22, 2024
0144119
Merge remote-tracking branch 'origin/master' into lexnv/authorit-empt…
lexnv Nov 22, 2024
335e0e8
authority-discovery: Extra checks before publishing addresses
lexnv Nov 22, 2024
19a3c4e
authority-discovery/tests: Test are moved to network/types
lexnv Nov 22, 2024
a0243b7
network/types: Move is_global directly on the multiaddr
lexnv Nov 22, 2024
9fe8cff
Merge branch 'master' into lexnv/authorit-empty-addr
lexnv Nov 25, 2024
bb5653e
authorit-discovery: Add more logs
lexnv Nov 25, 2024
36ff0c7
types: Do not check peerID implicitely
lexnv Nov 25, 2024
457b4f1
types/tests: Adjust testing
lexnv Nov 25, 2024
658154b
Add PR doc
lexnv Nov 25, 2024
99a8d1b
Merge remote-tracking branch 'origin/master' into lexnv/fix-sync-panic2
lexnv Nov 25, 2024
a5c0379
Merge remote-tracking branch 'origin/lexnv/authorit-empty-addr' into …
lexnv Nov 25, 2024
ff8042d
cargo: Udpdate litep2p to specific branch
lexnv Nov 26, 2024
68872d9
litep2p: Increment incoming connections metric
lexnv Nov 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,7 @@ linked-hash-map = { version = "0.5.4" }
linked_hash_set = { version = "0.1.4" }
linregress = { version = "0.5.1" }
lite-json = { version = "0.2.0", default-features = false }
litep2p = { version = "0.8.1", features = ["websocket"] }
litep2p = { git = "https://github.com/paritytech/litep2p.git", branch = "lexnv/investigate-mismatch", features = ["websocket"] }
log = { version = "0.4.22", default-features = false }
macro_magic = { version = "0.5.1" }
maplit = { version = "1.0.2" }
Expand Down
12 changes: 12 additions & 0 deletions prdoc/pr_6545.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
title: Ignore multi-addresses without IP from DHT records

doc:
- audience: [ Node Dev, Node Operator ]
description: |
This PR ensures that multiaddresses without an IP protocol are not published or utilized from DHT records.

crates:
- name: sc-network
bump: patch
- name: sc-authority-discovery
bump: patch
16 changes: 16 additions & 0 deletions prdoc/pr_6603.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: Always provide main protocol name in litep2p responses

doc:
- audience: [ Node Dev, Node Operator ]
description: |
This PR aligns litep2p behavior with libp2p. Previously, litep2p network backend
would provide the actual negotiated request-response protocol that produced a
response message. After this PR, only the main protocol name is reported to other
subsystems.

crates:
- name: sc-network
bump: patch
1 change: 0 additions & 1 deletion substrate/client/authority-discovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ prost-build = { workspace = true }
codec = { workspace = true }
futures = { workspace = true }
futures-timer = { workspace = true }
ip_network = { workspace = true }
multihash = { workspace = true }
linked_hash_set = { workspace = true }
log = { workspace = true, default-features = true }
Expand Down
49 changes: 30 additions & 19 deletions substrate/client/authority-discovery/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use futures::{channel::mpsc, future, stream::Fuse, FutureExt, Stream, StreamExt}

use addr_cache::AddrCache;
use codec::{Decode, Encode};
use ip_network::IpNetwork;
use linked_hash_set::LinkedHashSet;
use sc_network_types::kad::{Key, PeerRecord, Record};

Expand Down Expand Up @@ -280,7 +279,9 @@ where
config
.public_addresses
.into_iter()
.map(|address| AddressType::PublicAddress(address).without_p2p(local_peer_id))
.filter_map(|address| {
AddressType::PublicAddress(address).without_p2p(local_peer_id)
})
.collect()
};

Expand Down Expand Up @@ -374,17 +375,6 @@ where
let local_peer_id = self.network.local_peer_id();
let publish_non_global_ips = self.publish_non_global_ips;

// Checks that the address is global.
let address_is_global = |address: &Multiaddr| {
address.iter().all(|protocol| match protocol {
// The `ip_network` library is used because its `is_global()` method is stable,
// while `is_global()` in the standard library currently isn't.
multiaddr::Protocol::Ip4(ip) => IpNetwork::from(ip).is_global(),
multiaddr::Protocol::Ip6(ip) => IpNetwork::from(ip).is_global(),
_ => true,
})
};

// These are the addresses the node is listening for incoming connections,
// as reported by installed protocols (tcp / websocket etc).
//
Expand All @@ -397,8 +387,9 @@ where
.listen_addresses()
.into_iter()
.filter_map(|address| {
address_is_global(&address)
(address.is_external_address_valid() && address.is_global())
.then(|| AddressType::GlobalListenAddress(address).without_p2p(local_peer_id))
.flatten()
})
.take(MAX_GLOBAL_LISTEN_ADDRESSES)
.peekable();
Expand All @@ -409,8 +400,10 @@ where
.external_addresses()
.into_iter()
.filter_map(|address| {
(publish_non_global_ips || address_is_global(&address))
.then(|| AddressType::ExternalAddress(address).without_p2p(local_peer_id))
(publish_non_global_ips ||
(address.is_external_address_valid() && address.is_global()))
.then(|| AddressType::ExternalAddress(address).without_p2p(local_peer_id))
.flatten()
})
.peekable();

Expand Down Expand Up @@ -843,10 +836,20 @@ where
_ => None,
};

log::debug!(
target: LOG_TARGET,
"Received DHT record for authority {:?} with addresses {:?}",
authority_id,
addresses
);

// Ignore [`Multiaddr`]s without [`PeerId`] or with own addresses.
let addresses: Vec<Multiaddr> = addresses
.into_iter()
.filter(|a| get_peer_id(&a).filter(|p| *p != local_peer_id).is_some())
.filter(|addr| {
addr.is_external_address_valid() &&
get_peer_id(&addr).filter(|p| *p != local_peer_id).is_some()
})
.collect();

let remote_peer_id = single(addresses.iter().map(|a| get_peer_id(&a)))
Expand Down Expand Up @@ -998,7 +1001,9 @@ impl AddressType {
///
/// In case the peer id in the address does not match the local peer id, an error is logged for
/// `ExternalAddress` and `GlobalListenAddress`.
fn without_p2p(self, local_peer_id: PeerId) -> Multiaddr {
///
/// Returns `None` if the address is empty after removing the `/p2p/..` part.
fn without_p2p(self, local_peer_id: PeerId) -> Option<Multiaddr> {
// Get the address and the source str for logging.
let (mut address, source) = match self {
AddressType::PublicAddress(address) => (address, "public address"),
Expand All @@ -1016,7 +1021,13 @@ impl AddressType {
}
address.pop();
}
address

// Empty addresses cannot be published.
if address.is_empty() {
None
} else {
Some(address)
}
}
}

Expand Down
81 changes: 43 additions & 38 deletions substrate/client/network/src/litep2p/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,48 @@ impl Discovery {

(false, None)
}

/// Handle the observed address from the network.
fn handle_observed_addresses(&mut self, observed_address: Multiaddr, peer: PeerId) {
// Converting to and from `sc_network_types::multiaddr::Multiaddr` is cheap considering
// it is a small wrapper over litep2p `Multiaddr`.
let observed_address: sc_network_types::multiaddr::Multiaddr = observed_address.into();
if !observed_address.is_external_address_valid() {
log::debug!(
target: LOG_TARGET,
"Ignoring invalid external address {observed_address} from {peer:?}",
);
return;
}

let Some(observed_address) = observed_address.ensure_peer_id(self.local_peer_id.into())
else {
log::debug!(
target: LOG_TARGET,
"Ignoring external address with different peer ID from {peer:?}",
);
return
};
let observed_address = observed_address.into();

let (is_new, expired_address) = self.is_new_external_address(&observed_address, peer);

if let Some(expired_address) = expired_address {
log::trace!(
target: LOG_TARGET,
"Removing expired external address expired={expired_address} is_new={is_new} observed={observed_address}",
);

self.pending_events
.push_back(DiscoveryEvent::ExternalAddressExpired { address: expired_address });
}

if is_new {
self.pending_events.push_back(DiscoveryEvent::ExternalAddressDiscovered {
address: observed_address.clone(),
});
}
}
}

impl Stream for Discovery {
Expand Down Expand Up @@ -596,44 +638,7 @@ impl Stream for Discovery {
observed_address,
..
})) => {
let observed_address =
if let Some(Protocol::P2p(peer_id)) = observed_address.iter().last() {
if peer_id != *this.local_peer_id.as_ref() {
log::warn!(
target: LOG_TARGET,
"Discovered external address for a peer that is not us: {observed_address}",
);
None
} else {
Some(observed_address)
}
} else {
Some(observed_address.with(Protocol::P2p(this.local_peer_id.into())))
};

// Ensure that an external address with a different peer ID does not have
// side effects of evicting other external addresses via `ExternalAddressExpired`.
if let Some(observed_address) = observed_address {
let (is_new, expired_address) =
this.is_new_external_address(&observed_address, peer);

if let Some(expired_address) = expired_address {
log::trace!(
target: LOG_TARGET,
"Removing expired external address expired={expired_address} is_new={is_new} observed={observed_address}",
);

this.pending_events.push_back(DiscoveryEvent::ExternalAddressExpired {
address: expired_address,
});
}

if is_new {
this.pending_events.push_back(DiscoveryEvent::ExternalAddressDiscovered {
address: observed_address.clone(),
});
}
}
this.handle_observed_addresses(observed_address, peer);

return Poll::Ready(Some(DiscoveryEvent::Identified {
peer,
Expand Down
32 changes: 25 additions & 7 deletions substrate/client/network/src/litep2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,13 +746,21 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
};
}
NetworkServiceCommand::AddKnownAddress { peer, address } => {
let mut address: Multiaddr = address.into();

if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) {
address.push(Protocol::P2p(litep2p::PeerId::from(peer).into()));
if !address.is_external_address_valid() {
log::warn!(
target: LOG_TARGET,
"ignoring invalid external address {address} for {peer:?}",
);
continue
}

if self.litep2p.add_known_address(peer.into(), iter::once(address.clone())) == 0usize {
let Some(address) = address.clone().ensure_peer_id(peer) else {
log::warn!(
target: LOG_TARGET,
"ignoring address ({address}) with different peer ID {peer:?}",
);
continue
};
if self.litep2p.add_known_address(peer.into(), iter::once(address.clone().into())) == 0usize {
log::warn!(
target: LOG_TARGET,
"couldn't add known address ({address}) for {peer:?}, unsupported transport"
Expand Down Expand Up @@ -986,7 +994,16 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac

let direction = match endpoint {
Endpoint::Dialer { .. } => "out",
Endpoint::Listener { .. } => "in",
Endpoint::Listener { .. } => {
// Increment incoming connections counter.
//
// Note: For litep2p these are represented by established connections,
// while in libp2p (legacy) these are not-yet-negotiated connections. However,
// wasting CPU cycles does not justify a slight difference in the metric.
metrics.incoming_connections_total.inc();

"in"
},
};
metrics.connections_opened_total.with_label_values(&[direction]).inc();

Expand Down Expand Up @@ -1058,6 +1075,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
NegotiationError::ParseError(_) => "parse-error",
NegotiationError::IoError(_) => "io-error",
NegotiationError::WebSocket(_) => "webscoket-error",
NegotiationError::BadSignature => "bad-signature",
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ impl RequestResponseProtocol {
&mut self,
peer: litep2p::PeerId,
request_id: RequestId,
fallback: Option<litep2p::ProtocolName>,
_fallback: Option<litep2p::ProtocolName>,
response: Vec<u8>,
) {
match self.pending_inbound_responses.remove(&request_id) {
Expand All @@ -337,10 +337,7 @@ impl RequestResponseProtocol {
response.len(),
);

let _ = tx.send(Ok((
response,
fallback.map_or_else(|| self.protocol.clone(), Into::into),
)));
let _ = tx.send(Ok((response, self.protocol.clone())));
self.metrics.register_outbound_request_success(started.elapsed());
},
}
Expand Down
Loading
Loading