Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Move spawning tasks from thread pools to Service's TaskManager for block importing #5647

Merged
merged 25 commits into from
Apr 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
037989c
Move tasks from tx pool to Service for basic_queue
Apr 15, 2020
894c74f
Add spawner to tests
Apr 15, 2020
b8fae5d
Update tests
Apr 15, 2020
05c6c6e
Merge branch 'master' of github.com:paritytech/substrate into scott_m…
Apr 22, 2020
5ae2652
Change usage of TaskManagerBuilder to TaskManager
Apr 22, 2020
f51dc2b
Renove dbg!
Apr 22, 2020
225e6c1
Revert exposing TaskManager in public API
Apr 22, 2020
5b9975e
Move client and import queue inside the import closure
Apr 22, 2020
58351b6
Move client inside the export closure
Apr 22, 2020
0edfdce
Move comments outside of closure
Apr 22, 2020
13e0f23
Add spawn_blocking method to SpawnTaskHandle; WIP
Apr 23, 2020
7dc8aa9
Use futures::executor::block_on instead of || wrapping the future
Apr 23, 2020
786811c
Add comments to code
pscott Apr 24, 2020
b891cbe
Remove useless block on task_executor declaration
Apr 24, 2020
ddd28a1
Add spawn_inner private function
Apr 24, 2020
497c540
Merge branch 'master' of github.com:paritytech/substrate into scott_m…
Apr 24, 2020
a69a69f
Merge branch 'master' of github.com:paritytech/substrate into scott_m…
Apr 25, 2020
e347a80
Merge branch 'master' of github.com:paritytech/substrate into scott_m…
Apr 27, 2020
1abcf8c
Merge branch 'master' of github.com:paritytech/substrate into scott_m…
Apr 27, 2020
2ac200d
Merge branch 'master' of github.com:paritytech/substrate into scott_m…
Apr 27, 2020
3857f7a
Merge branch 'master' of github.com:paritytech/substrate into scott_m…
Apr 27, 2020
1989c75
Merge branch 'master' of github.com:paritytech/substrate into scott_m…
Apr 28, 2020
7b9f700
Add Send to block_announce_validator_builder
Apr 28, 2020
c93ecf2
Merge branch 'master' of github.com:paritytech/substrate into scott_m…
Apr 29, 2020
e7ef065
Merge branch 'master' of github.com:paritytech/substrate into scott_m…
Apr 29, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ macro_rules! new_full_start {
let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry))
})?
.with_import_queue(|_config, client, mut select_chain, _transaction_pool| {
.with_import_queue(|_config, client, mut select_chain, _transaction_pool, spawn_task_handle| {
let select_chain = select_chain.take()
.ok_or_else(|| sc_service::Error::SelectChainRequired)?;

Expand All @@ -52,13 +52,16 @@ macro_rules! new_full_start {
grandpa_block_import.clone(), client.clone(),
);

let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair>(
let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future);

let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>(
sc_consensus_aura::slot_duration(&*client)?,
aura_block_import,
Some(Box::new(grandpa_block_import.clone())),
None,
client,
inherent_data_providers.clone(),
spawner,
)?;

import_setup = Some((grandpa_block_import, grandpa_link));
Expand Down Expand Up @@ -191,7 +194,7 @@ pub fn new_light(config: Configuration) -> Result<impl AbstractService, ServiceE
);
Ok(pool)
})?
.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| {
.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool, spawn_task_handle| {
let fetch_checker = fetcher
.map(|fetcher| fetcher.checker().clone())
.ok_or_else(|| "Trying to start light import queue without active fetch checker")?;
Expand All @@ -205,13 +208,16 @@ pub fn new_light(config: Configuration) -> Result<impl AbstractService, ServiceE
let finality_proof_request_builder =
finality_proof_import.create_finality_proof_request_builder();

let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair>(
let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future);

let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>(
sc_consensus_aura::slot_duration(&*client)?,
grandpa_block_import,
None,
Some(Box::new(finality_proof_import)),
client,
inherent_data_providers.clone(),
spawner,
)?;

Ok((import_queue, finality_proof_request_builder))
Expand Down
10 changes: 8 additions & 2 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ macro_rules! new_full_start {
let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry))
})?
.with_import_queue(|_config, client, mut select_chain, _transaction_pool| {
.with_import_queue(|_config, client, mut select_chain, _transaction_pool, spawn_task_handle| {
let select_chain = select_chain.take()
.ok_or_else(|| sc_service::Error::SelectChainRequired)?;
let (grandpa_block_import, grandpa_link) = grandpa::block_import(
Expand All @@ -68,13 +68,16 @@ macro_rules! new_full_start {
client.clone(),
)?;

let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future);

let import_queue = sc_consensus_babe::import_queue(
babe_link.clone(),
block_import.clone(),
Some(Box::new(justification_import)),
None,
client,
inherent_data_providers.clone(),
spawner,
)?;

import_setup = Some((block_import, grandpa_link, babe_link));
Expand Down Expand Up @@ -284,7 +287,7 @@ pub fn new_light(config: Configuration)
);
Ok(pool)
})?
.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| {
.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool, spawn_task_handle| {
let fetch_checker = fetcher
.map(|fetcher| fetcher.checker().clone())
.ok_or_else(|| "Trying to start light import queue without active fetch checker")?;
Expand All @@ -305,13 +308,16 @@ pub fn new_light(config: Configuration)
client.clone(),
)?;

let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future);

let import_queue = sc_consensus_babe::import_queue(
babe_link,
babe_block_import,
None,
Some(Box::new(finality_proof_import)),
client.clone(),
inherent_data_providers.clone(),
spawner,
)?;

Ok((import_queue, finality_proof_request_builder))
Expand Down
4 changes: 2 additions & 2 deletions client/cli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use names::{Generator, Name};
use sc_service::config::{
WasmExecutionMethod, Role, OffchainWorkerConfig,
Configuration, DatabaseConfig, ExtTransport, KeystoreConfig, NetworkConfiguration,
NodeKeyConfig, PrometheusConfig, PruningMode, TelemetryEndpoints, TransactionPoolOptions,
NodeKeyConfig, PrometheusConfig, PruningMode, TelemetryEndpoints, TransactionPoolOptions, TaskType
};
use sc_client_api::execution_extensions::ExecutionStrategies;
use sc_service::{ChainSpec, TracingReceiver};
Expand Down Expand Up @@ -385,7 +385,7 @@ pub trait CliConfiguration: Sized {
fn create_configuration<C: SubstrateCli>(
&self,
cli: &C,
task_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
task_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync>,
) -> Result<Configuration> {
let is_dev = self.is_dev()?;
let chain_id = self.chain_id(is_dev)?;
Expand Down
4 changes: 2 additions & 2 deletions client/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use log::info;
pub use params::*;
use regex::Regex;
pub use runner::*;
use sc_service::{ChainSpec, Configuration};
use sc_service::{ChainSpec, Configuration, TaskType};
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
Expand Down Expand Up @@ -177,7 +177,7 @@ pub trait SubstrateCli: Sized {
fn create_configuration<T: CliConfiguration>(
&self,
command: &T,
task_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
task_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync>,
) -> error::Result<Configuration> {
command.create_configuration(self, task_executor)
}
Expand Down
25 changes: 17 additions & 8 deletions client/cli/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use futures::pin_mut;
use futures::select;
use futures::{future, future::FutureExt, Future};
use log::info;
use sc_service::{AbstractService, Configuration, Role, ServiceBuilderCommand};
use sc_service::{AbstractService, Configuration, Role, ServiceBuilderCommand, TaskType};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use sp_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL};
use std::fmt::Debug;
Expand Down Expand Up @@ -116,13 +116,22 @@ impl<C: SubstrateCli> Runner<C> {
/// Create a new runtime with the command provided in argument
pub fn new<T: CliConfiguration>(cli: &C, command: &T) -> Result<Runner<C>> {
let tokio_runtime = build_runtime()?;

let task_executor = {
let runtime_handle = tokio_runtime.handle().clone();
Arc::new(move |fut| {
runtime_handle.spawn(fut);
})
};
let runtime_handle = tokio_runtime.handle().clone();

let task_executor = Arc::new(
move |fut, task_type| {
match task_type {
TaskType::Async => { runtime_handle.spawn(fut); }
TaskType::Blocking => {
runtime_handle.spawn( async move {
// `spawn_blocking` is looking for the current runtime, and as such has to be called
// from within `spawn`.
tokio::task::spawn_blocking(move || futures::executor::block_on(fut))
pscott marked this conversation as resolved.
Show resolved Hide resolved
});
}
}
}
);

Ok(Runner {
config: command.create_configuration(cli, task_executor)?,
Expand Down
7 changes: 5 additions & 2 deletions client/consensus/aura/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use std::{
collections::HashMap
};

use futures::prelude::*;
use futures::{prelude::*, future::BoxFuture};
use parking_lot::Mutex;
use log::{debug, info, trace};

Expand Down Expand Up @@ -788,13 +788,14 @@ impl<Block: BlockT, C, I, P> BlockImport<Block> for AuraBlockImport<Block, C, I,
}

/// Start an import queue for the Aura consensus algorithm.
pub fn import_queue<B, I, C, P>(
pub fn import_queue<B, I, C, P, F>(
slot_duration: SlotDuration,
block_import: I,
justification_import: Option<BoxJustificationImport<B>>,
finality_proof_import: Option<BoxFinalityProofImport<B>>,
client: Arc<C>,
inherent_data_providers: InherentDataProviders,
spawner: F,
rphmeier marked this conversation as resolved.
Show resolved Hide resolved
) -> Result<AuraImportQueue<B, sp_api::TransactionFor<C, B>>, sp_consensus::Error> where
B: BlockT,
C::Api: BlockBuilderApi<B> + AuraApi<B, AuthorityId<P>> + ApiExt<B, Error = sp_blockchain::Error>,
Expand All @@ -804,6 +805,7 @@ pub fn import_queue<B, I, C, P>(
P: Pair + Send + Sync + 'static,
P::Public: Clone + Eq + Send + Sync + Hash + Debug + Encode + Decode,
P::Signature: Encode + Decode,
F: Fn(BoxFuture<'static, ()>) -> (),
{
register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.get())?;
initialize_authorities_cache(&*client)?;
Expand All @@ -818,6 +820,7 @@ pub fn import_queue<B, I, C, P>(
Box::new(block_import),
justification_import,
finality_proof_import,
spawner,
))
}

Expand Down
4 changes: 3 additions & 1 deletion client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ use sc_client_api::{
};
use sp_block_builder::BlockBuilder as BlockBuilderApi;

use futures::prelude::*;
use futures::{prelude::*, future::BoxFuture};
use log::{debug, info, log, trace, warn};
use sc_consensus_slots::{
SlotWorker, SlotInfo, SlotCompatible, StorageChanges, CheckedHeader, check_equivocation,
Expand Down Expand Up @@ -1272,6 +1272,7 @@ pub fn import_queue<Block: BlockT, Client, Inner>(
finality_proof_import: Option<BoxFinalityProofImport<Block>>,
client: Arc<Client>,
inherent_data_providers: InherentDataProviders,
spawner: impl Fn(BoxFuture<'static, ()>) -> (),
) -> ClientResult<BabeImportQueue<Block, sp_api::TransactionFor<Client, Block>>> where
Inner: BlockImport<Block, Error = ConsensusError, Transaction = sp_api::TransactionFor<Client, Block>>
+ Send + Sync + 'static,
Expand All @@ -1294,6 +1295,7 @@ pub fn import_queue<Block: BlockT, Client, Inner>(
Box::new(block_import),
justification_import,
finality_proof_import,
spawner,
))
}

Expand Down
6 changes: 4 additions & 2 deletions client/consensus/manual-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! A manual sealing engine: the engine listens for rpc calls to seal blocks and create forks.
//! This is suitable for a testing environment.

use futures::prelude::*;
use futures::{prelude::*, future::BoxFuture};
use sp_consensus::{
Environment, Proposer, ForkChoiceStrategy, BlockImportParams, BlockOrigin, SelectChain,
import_queue::{BasicQueue, CacheKeyId, Verifier, BoxBlockImport},
Expand Down Expand Up @@ -67,7 +67,8 @@ impl<B: BlockT> Verifier<B> for ManualSealVerifier {

/// Instantiate the import queue for the manual seal consensus engine.
pub fn import_queue<Block, B>(
block_import: BoxBlockImport<Block, TransactionFor<B, Block>>
block_import: BoxBlockImport<Block, TransactionFor<B, Block>>,
spawner: impl Fn(BoxFuture<'static, ()>) -> ()
) -> BasicQueue<Block, TransactionFor<B, Block>>
where
Block: BlockT,
Expand All @@ -78,6 +79,7 @@ pub fn import_queue<Block, B>(
Box::new(block_import),
None,
None,
spawner,
)
}

Expand Down
5 changes: 4 additions & 1 deletion client/consensus/pow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use codec::{Encode, Decode};
use sc_client_api;
use log::*;
use sp_timestamp::{InherentError as TIError, TimestampInherentData};
use futures::future::BoxFuture;

#[derive(derive_more::Display, Debug)]
pub enum Error<B: BlockT> {
Expand Down Expand Up @@ -461,6 +462,7 @@ pub fn import_queue<B, Transaction, Algorithm>(
finality_proof_import: Option<BoxFinalityProofImport<B>>,
algorithm: Algorithm,
inherent_data_providers: InherentDataProviders,
spawner: impl Fn(BoxFuture<'static, ()>) -> (),
) -> Result<
PowImportQueue<B, Transaction>,
sp_consensus::Error
Expand All @@ -477,7 +479,8 @@ pub fn import_queue<B, Transaction, Algorithm>(
verifier,
block_import,
justification_import,
finality_proof_import
finality_proof_import,
spawner,
))
}

Expand Down
4 changes: 4 additions & 0 deletions client/network/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,15 @@ fn build_test_full_node(config: config::NetworkConfiguration)
}
}

let threads_pool = futures::executor::ThreadPool::new().unwrap();
let spawner = |future| threads_pool.spawn_ok(future);

let import_queue = Box::new(sp_consensus::import_queue::BasicQueue::new(
PassThroughVerifier(false),
Box::new(client.clone()),
None,
None,
spawner,
));

let worker = NetworkWorker::new(config::Params {
Expand Down
5 changes: 4 additions & 1 deletion client/network/test/src/block_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ fn async_import_queue_drops() {
// Perform this test multiple times since it exhibits non-deterministic behavior.
for _ in 0..100 {
let verifier = PassThroughVerifier(true);
let queue = BasicQueue::new(verifier, Box::new(substrate_test_runtime_client::new()), None, None);

let threads_pool = futures::executor::ThreadPool::new().unwrap();
let spawner = |future| threads_pool.spawn_ok(future);
let queue = BasicQueue::new(verifier, Box::new(substrate_test_runtime_client::new()), None, None, spawner);
drop(queue);
}
}
8 changes: 8 additions & 0 deletions client/network/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,11 +606,15 @@ pub trait TestNetFactory: Sized {
);
let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>)));

let threads_pool = futures::executor::ThreadPool::new().unwrap();
let spawner = |future| threads_pool.spawn_ok(future);

let import_queue = Box::new(BasicQueue::new(
verifier.clone(),
Box::new(block_import.clone()),
justification_import,
finality_proof_import,
spawner,
));

let listen_addr = build_multiaddr![Memory(rand::random::<u64>())];
Expand Down Expand Up @@ -683,11 +687,15 @@ pub trait TestNetFactory: Sized {
);
let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>)));

let threads_pool = futures::executor::ThreadPool::new().unwrap();
let spawner = |future| threads_pool.spawn_ok(future);

let import_queue = Box::new(BasicQueue::new(
verifier.clone(),
Box::new(block_import.clone()),
justification_import,
finality_proof_import,
spawner,
));

let listen_addr = build_multiaddr![Memory(rand::random::<u64>())];
Expand Down
Loading