Skip to content

Commit

Permalink
Extract syncing protocol from sc-network (paritytech#12828)
Browse files Browse the repository at this point in the history
* Move import queue out of `sc-network`

Add supplementary asynchronous API for the import queue which means
it can be run as an independent task and communicated with through
the `ImportQueueService`.

This commit removes removes block and justification imports from
`sc-network` and provides `ChainSync` with a handle to import queue so
it can import blocks and justifications. Polling of the import queue is
moved complete out of `sc-network` and `sc_consensus::Link` is
implemented for `ChainSyncInterfaceHandled` so the import queue
can still influence the syncing process.

* Move stuff to SyncingEngine

* Move `ChainSync` instanation to `SyncingEngine`

Some of the tests have to be rewritten

* Move peer hashmap to `SyncingEngine`

* Let `SyncingEngine` to implement `ChainSyncInterface`

* Introduce `SyncStatusProvider`

* Move `sync_peer_(connected|disconnected)` to `SyncingEngine`

* Implement `SyncEventStream`

Remove `SyncConnected`/`SyncDisconnected` events from
`NetworkEvenStream` and provide those events through
`ChainSyncInterface` instead.

Modify BEEFY/GRANDPA/transactions protocol and `NetworkGossip` to take
`SyncEventStream` object which they listen to for incoming sync peer
events.

* Introduce `ChainSyncInterface`

This interface provides a set of miscellaneous functions that other
subsystems can use to query, for example, the syncing status.

* Move event stream polling to `SyncingEngine`

Subscribe to `NetworkStreamEvent` and poll the incoming notifications
and substream events from `SyncingEngine`.

The code needs refactoring.

* Make `SyncingEngine` into an asynchronous runner

This commits removes the last hard dependency of syncing from
`sc-network` meaning the protocol now lives completely outside of
`sc-network`, ignoring the hardcoded peerset entry which will be
addressed in the future.

Code needs a lot of refactoring.

* Fix warnings

* Code refactoring

* Use `SyncingService` for BEEFY

* Use `SyncingService` for GRANDPA

* Remove call delegation from `NetworkService`

* Remove `ChainSyncService`

* Remove `ChainSync` service tests

They were written for the sole purpose of verifying that `NetworWorker`
continues to function while the calls are being dispatched to
`ChainSync`.

* Refactor code

* Refactor code

* Update client/finality-grandpa/src/communication/tests.rs

Co-authored-by: Anton <anton.kalyaev@gmail.com>

* Fix warnings

* Apply review comments

* Fix docs

* Fix test

* cargo-fmt

* Update client/network/sync/src/engine.rs

Co-authored-by: Anton <anton.kalyaev@gmail.com>

* Update client/network/sync/src/engine.rs

Co-authored-by: Anton <anton.kalyaev@gmail.com>

* Add missing docs

* Refactor code

---------

Co-authored-by: Anton <anton.kalyaev@gmail.com>
  • Loading branch information
2 people authored and nathanwhit committed Jul 19, 2023
1 parent 9dd4cd1 commit bb4d9ab
Show file tree
Hide file tree
Showing 57 changed files with 2,886 additions and 2,859 deletions.
666 changes: 397 additions & 269 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
Vec::default(),
));

let (network, system_rpc_tx, tx_handler_controller, network_starter) =
let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
client: client.clone(),
Expand Down Expand Up @@ -240,6 +240,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
backend,
system_rpc_tx,
tx_handler_controller,
sync_service: sync_service.clone(),
config,
telemetry: telemetry.as_mut(),
})?;
Expand Down Expand Up @@ -276,8 +277,8 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
force_authoring,
backoff_authoring_blocks,
keystore: keystore_container.sync_keystore(),
sync_oracle: network.clone(),
justification_sync_link: network.clone(),
sync_oracle: sync_service.clone(),
justification_sync_link: sync_service.clone(),
block_proposal_slot_portion: SlotProportion::new(2f32 / 3f32),
max_block_proposal_slot_portion: None,
telemetry: telemetry.as_ref().map(|x| x.handle()),
Expand Down Expand Up @@ -320,6 +321,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
config: grandpa_config,
link: grandpa_link,
network,
sync: Arc::new(sync_service),
voting_rule: sc_consensus_grandpa::VotingRulesBuilder::default().build(),
prometheus_registry,
shared_voter_state: SharedVoterState::empty(),
Expand Down
1 change: 1 addition & 0 deletions bin/node/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ sc-transaction-pool = { version = "4.0.0-dev", path = "../../../client/transacti
sc-transaction-pool-api = { version = "4.0.0-dev", path = "../../../client/transaction-pool/api" }
sc-network = { version = "0.10.0-dev", path = "../../../client/network" }
sc-network-common = { version = "0.10.0-dev", path = "../../../client/network/common" }
sc-network-sync = { version = "0.10.0-dev", path = "../../../client/network/sync" }
sc-consensus-slots = { version = "0.10.0-dev", path = "../../../client/consensus/slots" }
sc-consensus-babe = { version = "0.10.0-dev", path = "../../../client/consensus/babe" }
grandpa = { version = "0.10.0-dev", package = "sc-consensus-grandpa", path = "../../../client/consensus/grandpa" }
Expand Down
3 changes: 2 additions & 1 deletion bin/node/cli/src/chain_spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,12 +479,13 @@ pub(crate) mod tests {
sp_tracing::try_init_simple();

sc_service_test::connectivity(integration_test_config_with_two_authorities(), |config| {
let NewFullBase { task_manager, client, network, transaction_pool, .. } =
let NewFullBase { task_manager, client, network, sync, transaction_pool, .. } =
new_full_base(config, false, |_, _| ())?;
Ok(sc_service_test::TestNetComponents::new(
task_manager,
client,
network,
sync,
transaction_pool,
))
});
Expand Down
26 changes: 20 additions & 6 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use sc_network::NetworkService;
use sc_network_common::{
protocol::event::Event, service::NetworkEventStream, sync::warp::WarpSyncParams,
};
use sc_network_sync::SyncingService;
use sc_service::{config::Configuration, error::Error as ServiceError, RpcHandlers, TaskManager};
use sc_telemetry::{Telemetry, TelemetryWorker};
use sp_api::ProvideRuntimeApi;
Expand Down Expand Up @@ -303,6 +304,8 @@ pub struct NewFullBase {
pub client: Arc<FullClient>,
/// The networking service of the node.
pub network: Arc<NetworkService<Block, <Block as BlockT>::Hash>>,
/// The syncing service of the node.
pub sync: Arc<SyncingService<Block>>,
/// The transaction pool of the node.
pub transaction_pool: Arc<TransactionPool>,
/// The rpc handlers of the node.
Expand Down Expand Up @@ -353,7 +356,7 @@ pub fn new_full_base(
Vec::default(),
));

let (network, system_rpc_tx, tx_handler_controller, network_starter) =
let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
client: client.clone(),
Expand Down Expand Up @@ -392,6 +395,7 @@ pub fn new_full_base(
task_manager: &mut task_manager,
system_rpc_tx,
tx_handler_controller,
sync_service: sync_service.clone(),
telemetry: telemetry.as_mut(),
})?;

Expand Down Expand Up @@ -434,8 +438,8 @@ pub fn new_full_base(
select_chain,
env: proposer,
block_import,
sync_oracle: network.clone(),
justification_sync_link: network.clone(),
sync_oracle: sync_service.clone(),
justification_sync_link: sync_service.clone(),
create_inherent_data_providers: move |parent, ()| {
let client_clone = client_clone.clone();
async move {
Expand Down Expand Up @@ -531,6 +535,7 @@ pub fn new_full_base(
config,
link: grandpa_link,
network: network.clone(),
sync: Arc::new(sync_service.clone()),
telemetry: telemetry.as_ref().map(|x| x.handle()),
voting_rule: grandpa::VotingRulesBuilder::default().build(),
prometheus_registry,
Expand All @@ -547,7 +552,14 @@ pub fn new_full_base(
}

network_starter.start_network();
Ok(NewFullBase { task_manager, client, network, transaction_pool, rpc_handlers })
Ok(NewFullBase {
task_manager,
client,
network,
sync: sync_service,
transaction_pool,
rpc_handlers,
})
}

/// Builds a new service for a full client.
Expand Down Expand Up @@ -627,7 +639,7 @@ mod tests {
chain_spec,
|config| {
let mut setup_handles = None;
let NewFullBase { task_manager, client, network, transaction_pool, .. } =
let NewFullBase { task_manager, client, network, sync, transaction_pool, .. } =
new_full_base(
config,
false,
Expand All @@ -641,6 +653,7 @@ mod tests {
task_manager,
client,
network,
sync,
transaction_pool,
);
Ok((node, setup_handles.unwrap()))
Expand Down Expand Up @@ -807,12 +820,13 @@ mod tests {
sc_service_test::consensus(
crate::chain_spec::tests::integration_test_config_with_two_authorities(),
|config| {
let NewFullBase { task_manager, client, network, transaction_pool, .. } =
let NewFullBase { task_manager, client, network, sync, transaction_pool, .. } =
new_full_base(config, false, |_, _| ())?;
Ok(sc_service_test::TestNetComponents::new(
task_manager,
client,
network,
sync,
transaction_pool,
))
},
Expand Down
20 changes: 12 additions & 8 deletions client/cli/src/arg_enums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,15 +251,19 @@ pub enum SyncMode {
Warp,
}

impl Into<sc_network::config::SyncMode> for SyncMode {
fn into(self) -> sc_network::config::SyncMode {
impl Into<sc_network_common::config::SyncMode> for SyncMode {
fn into(self) -> sc_network_common::config::SyncMode {
match self {
SyncMode::Full => sc_network::config::SyncMode::Full,
SyncMode::Fast =>
sc_network::config::SyncMode::Fast { skip_proofs: false, storage_chain_mode: false },
SyncMode::FastUnsafe =>
sc_network::config::SyncMode::Fast { skip_proofs: true, storage_chain_mode: false },
SyncMode::Warp => sc_network::config::SyncMode::Warp,
SyncMode::Full => sc_network_common::config::SyncMode::Full,
SyncMode::Fast => sc_network_common::config::SyncMode::Fast {
skip_proofs: false,
storage_chain_mode: false,
},
SyncMode::FastUnsafe => sc_network_common::config::SyncMode::Fast {
skip_proofs: true,
storage_chain_mode: false,
},
SyncMode::Warp => sc_network_common::config::SyncMode::Warp,
}
}
}
Expand Down
7 changes: 2 additions & 5 deletions client/cli/src/params/network_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@

use crate::{arg_enums::SyncMode, params::node_key_params::NodeKeyParams};
use clap::Args;
use sc_network::{
config::{NetworkConfiguration, NodeKeyConfig},
multiaddr::Protocol,
};
use sc_network_common::config::{NonReservedPeerMode, SetConfig, TransportConfig};
use sc_network::{config::NetworkConfiguration, multiaddr::Protocol};
use sc_network_common::config::{NodeKeyConfig, NonReservedPeerMode, SetConfig, TransportConfig};
use sc_service::{
config::{Multiaddr, MultiaddrWithPeerId},
ChainSpec, ChainType,
Expand Down
14 changes: 7 additions & 7 deletions client/cli/src/params/node_key_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use clap::Args;
use sc_network::config::{identity::ed25519, NodeKeyConfig};
use sc_network_common::config::{identity::ed25519, NodeKeyConfig};
use sp_core::H256;
use std::{path::PathBuf, str::FromStr};

Expand Down Expand Up @@ -92,7 +92,7 @@ impl NodeKeyParams {
let secret = if let Some(node_key) = self.node_key.as_ref() {
parse_ed25519_secret(node_key)?
} else {
sc_network::config::Secret::File(
sc_network_common::config::Secret::File(
self.node_key_file
.clone()
.unwrap_or_else(|| net_config_dir.join(NODE_KEY_ED25519_FILE)),
Expand All @@ -111,10 +111,10 @@ fn invalid_node_key(e: impl std::fmt::Display) -> error::Error {
}

/// Parse a Ed25519 secret key from a hex string into a `sc_network::Secret`.
fn parse_ed25519_secret(hex: &str) -> error::Result<sc_network::config::Ed25519Secret> {
fn parse_ed25519_secret(hex: &str) -> error::Result<sc_network_common::config::Ed25519Secret> {
H256::from_str(hex).map_err(invalid_node_key).and_then(|bytes| {
ed25519::SecretKey::from_bytes(bytes)
.map(sc_network::config::Secret::Input)
.map(sc_network_common::config::Secret::Input)
.map_err(invalid_node_key)
})
}
Expand All @@ -123,7 +123,7 @@ fn parse_ed25519_secret(hex: &str) -> error::Result<sc_network::config::Ed25519S
mod tests {
use super::*;
use clap::ValueEnum;
use sc_network::config::identity::{ed25519, Keypair};
use sc_network_common::config::identity::{ed25519, Keypair};
use std::fs;

#[test]
Expand All @@ -140,7 +140,7 @@ mod tests {
node_key_file: None,
};
params.node_key(net_config_dir).and_then(|c| match c {
NodeKeyConfig::Ed25519(sc_network::config::Secret::Input(ref ski))
NodeKeyConfig::Ed25519(sc_network_common::config::Secret::Input(ref ski))
if node_key_type == NodeKeyType::Ed25519 && &sk[..] == ski.as_ref() =>
Ok(()),
_ => Err(error::Error::Input("Unexpected node key config".into())),
Expand Down Expand Up @@ -200,7 +200,7 @@ mod tests {
let dir = PathBuf::from(net_config_dir.clone());
let typ = params.node_key_type;
params.node_key(net_config_dir).and_then(move |c| match c {
NodeKeyConfig::Ed25519(sc_network::config::Secret::File(ref f))
NodeKeyConfig::Ed25519(sc_network_common::config::Secret::File(ref f))
if typ == NodeKeyType::Ed25519 && f == &dir.join(NODE_KEY_ED25519_FILE) =>
Ok(()),
_ => Err(error::Error::Input("Unexpected node key config".into())),
Expand Down
5 changes: 5 additions & 0 deletions client/consensus/aura/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,11 @@ mod tests {
fn peers(&self) -> &Vec<AuraPeer> {
&self.peers
}

fn peers_mut(&mut self) -> &mut Vec<AuraPeer> {
&mut self.peers
}

fn mut_peers<F: FnOnce(&mut Vec<AuraPeer>)>(&mut self, closure: F) {
closure(&mut self.peers);
}
Expand Down
5 changes: 5 additions & 0 deletions client/consensus/babe/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,11 @@ impl TestNetFactory for BabeTestNet {
&self.peers
}

fn peers_mut(&mut self) -> &mut Vec<BabePeer> {
trace!(target: "babe", "Retrieving peers, mutable");
&mut self.peers
}

fn mut_peers<F: FnOnce(&mut Vec<BabePeer>)>(&mut self, closure: F) {
closure(&mut self.peers);
}
Expand Down
1 change: 1 addition & 0 deletions client/consensus/beefy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ sc-keystore = { version = "4.0.0-dev", path = "../../keystore" }
sc-network = { version = "0.10.0-dev", path = "../../network" }
sc-network-common = { version = "0.10.0-dev", path = "../../network/common" }
sc-network-gossip = { version = "0.10.0-dev", path = "../../network-gossip" }
sc-network-sync = { version = "0.10.0-dev", path = "../../network/sync" }
sc-utils = { version = "4.0.0-dev", path = "../../utils" }
sp-api = { version = "4.0.0-dev", path = "../../../primitives/api" }
sp-application-crypto = { version = "7.0.0", path = "../../../primitives/application-crypto" }
Expand Down
30 changes: 20 additions & 10 deletions client/consensus/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotificatio
use sc_consensus::BlockImport;
use sc_network::ProtocolName;
use sc_network_common::service::NetworkRequest;
use sc_network_gossip::{GossipEngine, Network as GossipNetwork};
use sc_network_gossip::{GossipEngine, Network as GossipNetwork, Syncing as GossipSyncing};
use sp_api::{HeaderT, NumberFor, ProvideRuntimeApi};
use sp_blockchain::{
Backend as BlockchainBackend, Error as ClientError, HeaderBackend, Result as ClientResult,
Expand Down Expand Up @@ -172,9 +172,11 @@ where
}

/// BEEFY gadget network parameters.
pub struct BeefyNetworkParams<B: Block, N> {
pub struct BeefyNetworkParams<B: Block, N, S> {
/// Network implementing gossip, requests and sync-oracle.
pub network: Arc<N>,
/// Syncing service implementing a sync oracle and an event stream for peers.
pub sync: Arc<S>,
/// Chain specific BEEFY gossip protocol name. See
/// [`communication::beefy_protocol_name::gossip_protocol_name`].
pub gossip_protocol_name: ProtocolName,
Expand All @@ -186,7 +188,7 @@ pub struct BeefyNetworkParams<B: Block, N> {
}

/// BEEFY gadget initialization parameters.
pub struct BeefyParams<B: Block, BE, C, N, P, R> {
pub struct BeefyParams<B: Block, BE, C, N, P, R, S> {
/// BEEFY client
pub client: Arc<C>,
/// Client Backend
Expand All @@ -198,7 +200,7 @@ pub struct BeefyParams<B: Block, BE, C, N, P, R> {
/// Local key store
pub key_store: Option<SyncCryptoStorePtr>,
/// BEEFY voter network params
pub network_params: BeefyNetworkParams<B, N>,
pub network_params: BeefyNetworkParams<B, N, S>,
/// Minimal delta between blocks, BEEFY should vote for
pub min_block_delta: u32,
/// Prometheus metric registry
Expand All @@ -212,15 +214,17 @@ pub struct BeefyParams<B: Block, BE, C, N, P, R> {
/// Start the BEEFY gadget.
///
/// This is a thin shim around running and awaiting a BEEFY worker.
pub async fn start_beefy_gadget<B, BE, C, N, P, R>(beefy_params: BeefyParams<B, BE, C, N, P, R>)
where
pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
beefy_params: BeefyParams<B, BE, C, N, P, R, S>,
) where
B: Block,
BE: Backend<B>,
C: Client<B, BE> + BlockBackend<B>,
P: PayloadProvider<B>,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B> + MmrApi<B, MmrRootHash, NumberFor<B>>,
N: GossipNetwork<B> + NetworkRequest + SyncOracle + Send + Sync + 'static,
N: GossipNetwork<B> + NetworkRequest + Send + Sync + 'static,
S: GossipSyncing<B> + SyncOracle + 'static,
{
let BeefyParams {
client,
Expand All @@ -235,14 +239,20 @@ where
on_demand_justifications_handler,
} = beefy_params;

let BeefyNetworkParams { network, gossip_protocol_name, justifications_protocol_name, .. } =
network_params;
let BeefyNetworkParams {
network,
sync,
gossip_protocol_name,
justifications_protocol_name,
..
} = network_params;

let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
let gossip_validator =
Arc::new(communication::gossip::GossipValidator::new(known_peers.clone()));
let mut gossip_engine = sc_network_gossip::GossipEngine::new(
network.clone(),
sync.clone(),
gossip_protocol_name,
gossip_validator.clone(),
None,
Expand Down Expand Up @@ -280,7 +290,7 @@ where
backend,
payload_provider,
runtime,
network,
sync,
key_store: key_store.into(),
gossip_engine,
gossip_validator,
Expand Down
Loading

0 comments on commit bb4d9ab

Please sign in to comment.