Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Make NetworkService callable for ChainSync #12542

Merged
merged 2 commits into from
Oct 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 179 additions & 3 deletions client/network/src/service/tests/chain_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,34 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::service::tests::TestNetworkBuilder;
use crate::{
config,
service::tests::{TestNetworkBuilder, BLOCK_ANNOUNCE_PROTO_NAME},
};

use futures::prelude::*;
use libp2p::PeerId;
use sc_block_builder::BlockBuilderProvider;
use sc_client_api::HeaderBackend;
use sc_consensus::JustificationSyncLink;
use sc_network_common::{
config::{MultiaddrWithPeerId, SetConfig},
protocol::event::Event,
service::NetworkSyncForkRequest,
sync::{SyncState, SyncStatus},
};
use sc_network_sync::{mock::MockChainSync, service::mock::MockChainSyncInterface};
use sc_network_sync::{mock::MockChainSync, service::mock::MockChainSyncInterface, ChainSync};
use sp_core::H256;
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Header as _},
};
use std::{iter, sync::Arc, task::Poll};
use std::{
iter,
sync::{Arc, RwLock},
task::Poll,
time::Duration,
};
use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _};

fn set_default_expecations_no_peers(
Expand Down Expand Up @@ -224,3 +234,169 @@ async fn on_block_finalized() {
})
.await;
}

// report from mock import queue that importing a justification was not successful
// and verify that connection to the peer is closed
#[async_std::test]
async fn invalid_justification_imported() {
struct DummyImportQueue(
Arc<
RwLock<
Option<(
PeerId,
substrate_test_runtime_client::runtime::Hash,
sp_runtime::traits::NumberFor<substrate_test_runtime_client::runtime::Block>,
)>,
>,
>,
);

impl sc_consensus::ImportQueue<substrate_test_runtime_client::runtime::Block> for DummyImportQueue {
fn import_blocks(
&mut self,
_origin: sp_consensus::BlockOrigin,
_blocks: Vec<
sc_consensus::IncomingBlock<substrate_test_runtime_client::runtime::Block>,
>,
) {
}

fn import_justifications(
&mut self,
_who: sc_consensus::import_queue::RuntimeOrigin,
_hash: substrate_test_runtime_client::runtime::Hash,
_number: sp_runtime::traits::NumberFor<substrate_test_runtime_client::runtime::Block>,
_justifications: sp_runtime::Justifications,
) {
}

fn poll_actions(
&mut self,
_cx: &mut futures::task::Context,
link: &mut dyn sc_consensus::Link<substrate_test_runtime_client::runtime::Block>,
) {
if let Some((peer, hash, number)) = *self.0.read().unwrap() {
link.justification_imported(peer, &hash, number, false);
}
}
}

let justification_info = Arc::new(RwLock::new(None));
let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())];

let (service1, mut event_stream1) = TestNetworkBuilder::new()
.with_import_queue(Box::new(DummyImportQueue(justification_info.clone())))
.with_listen_addresses(vec![listen_addr.clone()])
.build()
.start_network();

let (service2, mut event_stream2) = TestNetworkBuilder::new()
.with_set_config(SetConfig {
reserved_nodes: vec![MultiaddrWithPeerId {
multiaddr: listen_addr,
peer_id: service1.local_peer_id,
}],
..Default::default()
})
.build()
.start_network();

async fn wait_for_events(stream: &mut (impl Stream<Item = Event> + std::marker::Unpin)) {
let mut notif_received = false;
let mut sync_received = false;
while !notif_received || !sync_received {
match stream.next().await.unwrap() {
Event::NotificationStreamOpened { .. } => notif_received = true,
Event::SyncConnected { .. } => sync_received = true,
_ => {},
};
}
}

wait_for_events(&mut event_stream1).await;
wait_for_events(&mut event_stream2).await;

{
let mut info = justification_info.write().unwrap();
*info = Some((service2.local_peer_id, H256::random(), 1337u64));
}

let wait_disconnection = async {
while !std::matches!(event_stream1.next().await, Some(Event::SyncDisconnected { .. })) {}
};

if async_std::future::timeout(Duration::from_secs(5), wait_disconnection)
.await
.is_err()
{
panic!("did not receive disconnection event in time");
}
}

#[async_std::test]
async fn disconnect_peer_using_chain_sync_handle() {
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);
let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())];

let (chain_sync_network_provider, chain_sync_network_handle) =
sc_network_sync::service::network::NetworkServiceProvider::new();
let handle_clone = chain_sync_network_handle.clone();

let (chain_sync, chain_sync_service) = ChainSync::new(
sc_network_common::sync::SyncMode::Full,
client.clone(),
Box::new(sp_consensus::block_validation::DefaultBlockAnnounceValidator),
1u32,
None,
chain_sync_network_handle.clone(),
)
.unwrap();

let (node1, mut event_stream1) = TestNetworkBuilder::new()
.with_listen_addresses(vec![listen_addr.clone()])
.with_chain_sync((Box::new(chain_sync), chain_sync_service))
.with_chain_sync_network((chain_sync_network_provider, chain_sync_network_handle))
.with_client(client.clone())
.build()
.start_network();

let (node2, mut event_stream2) = TestNetworkBuilder::new()
.with_set_config(SetConfig {
reserved_nodes: vec![MultiaddrWithPeerId {
multiaddr: listen_addr,
peer_id: node1.local_peer_id,
}],
..Default::default()
})
.with_client(client.clone())
.build()
.start_network();

async fn wait_for_events(stream: &mut (impl Stream<Item = Event> + std::marker::Unpin)) {
let mut notif_received = false;
let mut sync_received = false;
while !notif_received || !sync_received {
match stream.next().await.unwrap() {
Event::NotificationStreamOpened { .. } => notif_received = true,
Event::SyncConnected { .. } => sync_received = true,
_ => {},
};
}
}

wait_for_events(&mut event_stream1).await;
wait_for_events(&mut event_stream2).await;

handle_clone.disconnect_peer(node2.local_peer_id, BLOCK_ANNOUNCE_PROTO_NAME.into());

let wait_disconnection = async {
while !std::matches!(event_stream1.next().await, Some(Event::SyncDisconnected { .. })) {}
};

if async_std::future::timeout(Duration::from_secs(5), wait_disconnection)
.await
.is_err()
{
panic!("did not receive disconnection event in time");
}
}
28 changes: 27 additions & 1 deletion client/network/src/service/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ use sc_network_common::{
};
use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
use sc_network_sync::{
block_request_handler::BlockRequestHandler, state_request_handler::StateRequestHandler,
block_request_handler::BlockRequestHandler,
service::network::{NetworkServiceHandle, NetworkServiceProvider},
state_request_handler::StateRequestHandler,
ChainSync,
};
use sp_runtime::traits::{Block as BlockT, Header as _, Zero};
Expand Down Expand Up @@ -93,6 +95,7 @@ struct TestNetworkBuilder {
listen_addresses: Vec<Multiaddr>,
set_config: Option<SetConfig>,
chain_sync: Option<(Box<dyn ChainSyncT<TestBlock>>, Box<dyn ChainSyncInterface<TestBlock>>)>,
chain_sync_network: Option<(NetworkServiceProvider, NetworkServiceHandle)>,
config: Option<config::NetworkConfiguration>,
}

Expand All @@ -104,6 +107,7 @@ impl TestNetworkBuilder {
listen_addresses: Vec::new(),
set_config: None,
chain_sync: None,
chain_sync_network: None,
config: None,
}
}
Expand Down Expand Up @@ -136,6 +140,19 @@ impl TestNetworkBuilder {
self
}

pub fn with_chain_sync_network(
mut self,
chain_sync_network: (NetworkServiceProvider, NetworkServiceHandle),
) -> Self {
self.chain_sync_network = Some(chain_sync_network);
self
}

pub fn with_import_queue(mut self, import_queue: Box<dyn ImportQueue<TestBlock>>) -> Self {
self.import_queue = Some(import_queue);
self
}

pub fn build(mut self) -> TestNetwork {
let client = self.client.as_mut().map_or(
Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0),
Expand Down Expand Up @@ -199,6 +216,9 @@ impl TestNetworkBuilder {
None,
)));

let (chain_sync_network_provider, chain_sync_network_handle) =
self.chain_sync_network.unwrap_or(NetworkServiceProvider::new());

let (chain_sync, chain_sync_service) = self.chain_sync.unwrap_or({
let (chain_sync, chain_sync_service) = ChainSync::new(
match network_config.sync_mode {
Expand All @@ -214,6 +234,7 @@ impl TestNetworkBuilder {
Box::new(sp_consensus::block_validation::DefaultBlockAnnounceValidator),
network_config.max_parallel_downloads,
None,
chain_sync_network_handle,
)
.unwrap();

Expand Down Expand Up @@ -292,6 +313,11 @@ impl TestNetworkBuilder {
})
.unwrap();

let service = worker.service().clone();
async_std::task::spawn(async move {
let _ = chain_sync_network_provider.run(service).await;
});

TestNetwork::new(worker)
}
}
Loading