Skip to content

Commit

Permalink
Add litep2p network protocol benches (paritytech#6455)
Browse files Browse the repository at this point in the history
# Description

Add support to run networking protocol benchmarks with litep2p backend.

Now we can compare the work of both libp2p and litep2p backends for
notifications and request-response protocols.

Next step: extract worker initialization from the benchmark loop.

### Example run on local machine
<img width="916" alt="image"
src="https://github.com/user-attachments/assets/6bb9f90a-76a4-417e-b9d3-db27aa8a356f">


## Integration

Does not affect downstream projects.

## Review Notes


https://github.com/paritytech/polkadot-sdk/blob/d4d9502538e8a940b809ecc77843af3cea101e19/substrate/client/network/src/litep2p/service.rs#L510-L520

This method should be implemented to run request benchmarks.

---------

Co-authored-by: GitHub Action <action@github.com>
  • Loading branch information
2 people authored and dudo50 committed Jan 4, 2025
1 parent 9073661 commit 2e26e8d
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 73 deletions.
8 changes: 8 additions & 0 deletions prdoc/pr_6455.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
title: Add litep2p network protocol benches
doc:
- audience: Node Dev
description: |-
Adds networking protocol benchmarks with litep2p backend
crates:
- name: sc-network
validate: false
114 changes: 86 additions & 28 deletions substrate/client/network/benches/notifications_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@ use criterion::{
};
use sc_network::{
config::{
FullNetworkConfiguration, MultiaddrWithPeerId, NetworkConfiguration, NonDefaultSetConfig,
NonReservedPeerMode, NotificationHandshake, Params, ProtocolId, Role, SetConfig,
FullNetworkConfiguration, MultiaddrWithPeerId, NetworkConfiguration, NonReservedPeerMode,
NotificationHandshake, Params, ProtocolId, Role, SetConfig,
},
service::traits::NotificationEvent,
NetworkWorker, NotificationMetrics, NotificationService, Roles,
Litep2pNetworkBackend, NetworkBackend, NetworkWorker, NotificationMetrics, NotificationService,
Roles,
};
use sc_network_common::sync::message::BlockAnnouncesHandshake;
use sc_network_common::{sync::message::BlockAnnouncesHandshake, ExHashT};
use sc_network_types::build_multiaddr;
use sp_runtime::traits::Zero;
use sp_core::H256;
use sp_runtime::traits::{Block as BlockT, Zero};
use std::{
net::{IpAddr, Ipv4Addr, TcpListener},
str::FromStr,
Expand Down Expand Up @@ -61,12 +63,20 @@ fn get_listen_address() -> sc_network::Multiaddr {
build_multiaddr!(Ip4(ip), Tcp(port))
}

pub fn create_network_worker(
fn create_network_worker<B, H, N>(
listen_addr: sc_network::Multiaddr,
) -> (NetworkWorker<runtime::Block, runtime::Hash>, Box<dyn NotificationService>) {
) -> (N, Box<dyn NotificationService>)
where
B: BlockT<Hash = H256> + 'static,
H: ExHashT,
N: NetworkBackend<B, H>,
{
let role = Role::Full;
let mut net_conf = NetworkConfiguration::new_local();
net_conf.listen_addresses = vec![listen_addr];
let network_config = FullNetworkConfiguration::<B, H, N>::new(&net_conf, None);
let genesis_hash = runtime::Hash::zero();
let (block_announce_config, notification_service) = NonDefaultSetConfig::new(
let (block_announce_config, notification_service) = N::notification_config(
"/block-announces/1".into(),
vec!["/bench-notifications-protocol/block-announces/1".into()],
MAX_SIZE,
Expand All @@ -82,21 +92,17 @@ pub fn create_network_worker(
reserved_nodes: vec![],
non_reserved_mode: NonReservedPeerMode::Accept,
},
NotificationMetrics::new(None),
network_config.peer_store_handle(),
);
let mut net_conf = NetworkConfiguration::new_local();
net_conf.listen_addresses = vec![listen_addr];
let worker = NetworkWorker::<runtime::Block, runtime::Hash>::new(Params::<
runtime::Block,
runtime::Hash,
NetworkWorker<_, _>,
> {
let worker = N::new(Params::<B, H, N> {
block_announce_config,
role,
executor: Box::new(|f| {
tokio::spawn(f);
}),
genesis_hash,
network_config: FullNetworkConfiguration::new(&net_conf, None),
network_config,
protocol_id: ProtocolId::from("bench-protocol-name"),
fork_id: None,
metrics_registry: None,
Expand All @@ -108,14 +114,21 @@ pub fn create_network_worker(
(worker, notification_service)
}

async fn run_serially(size: usize, limit: usize) {
async fn run_serially<B, H, N>(size: usize, limit: usize)
where
B: BlockT<Hash = H256> + 'static,
H: ExHashT,
N: NetworkBackend<B, H>,
{
let listen_address1 = get_listen_address();
let listen_address2 = get_listen_address();
let (worker1, mut notification_service1) = create_network_worker(listen_address1);
let (worker2, mut notification_service2) = create_network_worker(listen_address2.clone());
let peer_id2: sc_network::PeerId = (*worker2.local_peer_id()).into();
let (worker1, mut notification_service1) = create_network_worker::<B, H, N>(listen_address1);
let (worker2, mut notification_service2) =
create_network_worker::<B, H, N>(listen_address2.clone());
let peer_id2: sc_network::PeerId = worker2.network_service().local_peer_id().into();

worker1
.network_service()
.add_reserved_peer(MultiaddrWithPeerId { multiaddr: listen_address2, peer_id: peer_id2 })
.unwrap();

Expand All @@ -124,24 +137,33 @@ async fn run_serially(size: usize, limit: usize) {
let (tx, rx) = async_channel::bounded(10);

let network1 = tokio::spawn(async move {
let mut sent_counter = 0;
tokio::pin!(network1_run);
loop {
tokio::select! {
_ = &mut network1_run => {},
event = notification_service1.next_event() => {
match event {
Some(NotificationEvent::NotificationStreamOpened { .. }) => {
sent_counter += 1;
notification_service1
.send_async_notification(&peer_id2, vec![0; size])
.await
.unwrap();
},
Some(NotificationEvent::NotificationStreamClosed { .. }) => {
if sent_counter >= limit {
break;
}
panic!("Unexpected stream closure {:?}", event);
}
event => panic!("Unexpected event {:?}", event),
};
},
message = rx.recv() => {
match message {
Ok(Some(_)) => {
sent_counter += 1;
notification_service1
.send_async_notification(&peer_id2, vec![0; size])
.await
Expand Down Expand Up @@ -185,14 +207,21 @@ async fn run_serially(size: usize, limit: usize) {
let _ = tokio::join!(network1, network2);
}

async fn run_with_backpressure(size: usize, limit: usize) {
async fn run_with_backpressure<B, H, N>(size: usize, limit: usize)
where
B: BlockT<Hash = H256> + 'static,
H: ExHashT,
N: NetworkBackend<B, H>,
{
let listen_address1 = get_listen_address();
let listen_address2 = get_listen_address();
let (worker1, mut notification_service1) = create_network_worker(listen_address1);
let (worker2, mut notification_service2) = create_network_worker(listen_address2.clone());
let peer_id2: sc_network::PeerId = (*worker2.local_peer_id()).into();
let (worker1, mut notification_service1) = create_network_worker::<B, H, N>(listen_address1);
let (worker2, mut notification_service2) =
create_network_worker::<B, H, N>(listen_address2.clone());
let peer_id2: sc_network::PeerId = worker2.network_service().local_peer_id().into();

worker1
.network_service()
.add_reserved_peer(MultiaddrWithPeerId { multiaddr: listen_address2, peer_id: peer_id2 })
.unwrap();

Expand Down Expand Up @@ -265,18 +294,47 @@ fn run_benchmark(c: &mut Criterion) {
for &(exponent, label) in EXPONENTS.iter() {
let size = 2usize.pow(exponent);
group.throughput(Throughput::Bytes(NOTIFICATIONS as u64 * size as u64));

group.bench_with_input(
BenchmarkId::new("libp2p/serially", label),
&(size, NOTIFICATIONS),
|b, &(size, limit)| {
b.to_async(&rt).iter(|| {
run_serially::<runtime::Block, runtime::Hash, NetworkWorker<_, _>>(size, limit)
});
},
);
group.bench_with_input(
BenchmarkId::new("litep2p/serially", label),
&(size, NOTIFICATIONS),
|b, &(size, limit)| {
b.to_async(&rt).iter(|| {
run_serially::<runtime::Block, runtime::Hash, Litep2pNetworkBackend>(
size, limit,
)
});
},
);
group.bench_with_input(
BenchmarkId::new("consistently", label),
BenchmarkId::new("libp2p/with_backpressure", label),
&(size, NOTIFICATIONS),
|b, &(size, limit)| {
b.to_async(&rt).iter(|| run_serially(size, limit));
b.to_async(&rt).iter(|| {
run_with_backpressure::<runtime::Block, runtime::Hash, NetworkWorker<_, _>>(
size, limit,
)
});
},
);
group.bench_with_input(
BenchmarkId::new("with_backpressure", label),
BenchmarkId::new("litep2p/with_backpressure", label),
&(size, NOTIFICATIONS),
|b, &(size, limit)| {
b.to_async(&rt).iter(|| run_with_backpressure(size, limit));
b.to_async(&rt).iter(|| {
run_with_backpressure::<runtime::Block, runtime::Hash, Litep2pNetworkBackend>(
size, limit,
)
});
},
);
}
Expand Down
Loading

0 comments on commit 2e26e8d

Please sign in to comment.