Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit b674bd2

Browse files
authored
Use tokio runtime handle instead of TaskExecutor abstraction (#9737)
* Use tokio runtime handle instead of TaskExecutor abstraction Before this pr we had the `TaskExecutor` abstraction which theoretically allowed that any futures executor could have been used. However, this was never tested and is currently not really required. Anyone running a node currently only used tokio and nothing else (because this was hard coded in CLI). So, this pr removes the `TaskExecutor` abstraction and relies directly on the tokio runtime handle. Besides this changes, this pr also makes sure that the http and ws rpc server use the same tokio runtime. This fixes a panic that occurred when you drop the rpc servers inside an async function (tokio doesn't like that a tokio runtime is dropped in the async context of another tokio runtime). As we don't use any custom runtime in the http rpc server anymore, this pr also removes the `rpc-http-threads` cli argument. If external parties complain that there aren't enough threads for the rpc server, we could bring support for increasing the thread count of the tokio runtime. * FMT * Fix try runtime * Fix integration tests and some other optimizations * Remove warnings
1 parent 9b15da9 commit b674bd2

31 files changed

+198
-303
lines changed

Cargo.lock

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bin/node/cli/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ sc-consensus = { version = "0.10.0-dev", path = "../../../client/consensus/commo
114114
sc-consensus-babe = { version = "0.10.0-dev", path = "../../../client/consensus/babe" }
115115
sc-consensus-epochs = { version = "0.10.0-dev", path = "../../../client/consensus/epochs" }
116116
sc-service-test = { version = "2.0.0", path = "../../../client/service/test" }
117+
sp-tracing = { version = "4.0.0-dev", path = "../../../primitives/tracing" }
117118
futures = "0.3.16"
118119
tempfile = "3.1.0"
119120
assert_cmd = "1.0"

bin/node/cli/src/chain_spec.rs

+2
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,8 @@ pub(crate) mod tests {
463463
#[test]
464464
#[ignore]
465465
fn test_connectivity() {
466+
sp_tracing::try_init_simple();
467+
466468
sc_service_test::connectivity(
467469
integration_test_config_with_two_authorities(),
468470
|config| {

bin/node/cli/src/command.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ pub fn run() -> Result<()> {
156156
// manager to do `async_run`.
157157
let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
158158
let task_manager =
159-
sc_service::TaskManager::new(config.task_executor.clone(), registry)
159+
sc_service::TaskManager::new(config.tokio_handle.clone(), registry)
160160
.map_err(|e| sc_cli::Error::Service(sc_service::Error::Prometheus(e)))?;
161161

162162
Ok((cmd.run::<Block, ExecutorDispatch>(config), task_manager))

bin/node/cli/src/service.rs

+4
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,8 @@ mod tests {
644644
// This can be run locally with `cargo test --release -p node-cli test_sync -- --ignored`.
645645
#[ignore]
646646
fn test_sync() {
647+
sp_tracing::try_init_simple();
648+
647649
let keystore_path = tempfile::tempdir().expect("Creates keystore path");
648650
let keystore: SyncCryptoStorePtr =
649651
Arc::new(LocalKeystore::open(keystore_path.path(), None).expect("Creates keystore"));
@@ -843,6 +845,8 @@ mod tests {
843845
#[test]
844846
#[ignore]
845847
fn test_consensus() {
848+
sp_tracing::try_init_simple();
849+
846850
sc_service_test::consensus(
847851
crate::chain_spec::tests::integration_test_config_with_two_authorities(),
848852
|config| {

bin/node/cli/tests/running_the_node_and_interrupt.rs

+71-10
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,30 @@
1616
// You should have received a copy of the GNU General Public License
1717
// along with this program. If not, see <https://www.gnu.org/licenses/>.
1818

19+
#![cfg(unix)]
20+
1921
use assert_cmd::cargo::cargo_bin;
20-
use std::{convert::TryInto, process::Command, thread, time::Duration};
22+
use nix::{
23+
sys::signal::{
24+
kill,
25+
Signal::{self, SIGINT, SIGTERM},
26+
},
27+
unistd::Pid,
28+
};
29+
use sc_service::Deref;
30+
use std::{
31+
convert::TryInto,
32+
ops::DerefMut,
33+
process::{Child, Command},
34+
thread,
35+
time::Duration,
36+
};
2137
use tempfile::tempdir;
2238

2339
pub mod common;
2440

2541
#[test]
26-
#[cfg(unix)]
2742
fn running_the_node_works_and_can_be_interrupted() {
28-
use nix::{
29-
sys::signal::{
30-
kill,
31-
Signal::{self, SIGINT, SIGTERM},
32-
},
33-
unistd::Pid,
34-
};
35-
3643
fn run_command_and_kill(signal: Signal) {
3744
let base_path = tempdir().expect("could not create a temp dir");
3845
let mut cmd = Command::new(cargo_bin("substrate"))
@@ -55,3 +62,57 @@ fn running_the_node_works_and_can_be_interrupted() {
5562
run_command_and_kill(SIGINT);
5663
run_command_and_kill(SIGTERM);
5764
}
65+
66+
struct KillChildOnDrop(Child);
67+
68+
impl Drop for KillChildOnDrop {
69+
fn drop(&mut self) {
70+
let _ = self.0.kill();
71+
}
72+
}
73+
74+
impl Deref for KillChildOnDrop {
75+
type Target = Child;
76+
77+
fn deref(&self) -> &Self::Target {
78+
&self.0
79+
}
80+
}
81+
82+
impl DerefMut for KillChildOnDrop {
83+
fn deref_mut(&mut self) -> &mut Self::Target {
84+
&mut self.0
85+
}
86+
}
87+
88+
#[test]
89+
fn running_two_nodes_with_the_same_ws_port_should_work() {
90+
fn start_node() -> Child {
91+
Command::new(cargo_bin("substrate"))
92+
.args(&["--dev", "--tmp", "--ws-port=45789"])
93+
.spawn()
94+
.unwrap()
95+
}
96+
97+
let mut first_node = KillChildOnDrop(start_node());
98+
let mut second_node = KillChildOnDrop(start_node());
99+
100+
thread::sleep(Duration::from_secs(30));
101+
102+
assert!(first_node.try_wait().unwrap().is_none(), "The first node should still be running");
103+
assert!(second_node.try_wait().unwrap().is_none(), "The second node should still be running");
104+
105+
kill(Pid::from_raw(first_node.id().try_into().unwrap()), SIGINT).unwrap();
106+
kill(Pid::from_raw(second_node.id().try_into().unwrap()), SIGINT).unwrap();
107+
108+
assert_eq!(
109+
common::wait_for(&mut first_node, 30).map(|x| x.success()),
110+
Some(true),
111+
"The first node must exit gracefully",
112+
);
113+
assert_eq!(
114+
common::wait_for(&mut second_node, 30).map(|x| x.success()),
115+
Some(true),
116+
"The second node must exit gracefully",
117+
);
118+
}

bin/node/test-runner-example/src/lib.rs

+7-8
Original file line numberDiff line numberDiff line change
@@ -88,18 +88,17 @@ mod tests {
8888
use node_cli::chain_spec::development_config;
8989
use sp_keyring::sr25519::Keyring::Alice;
9090
use sp_runtime::{traits::IdentifyAccount, MultiSigner};
91-
use test_runner::{build_runtime, client_parts, task_executor, ConfigOrChainSpec, Node};
91+
use test_runner::{build_runtime, client_parts, ConfigOrChainSpec, Node};
9292

9393
#[test]
9494
fn test_runner() {
9595
let tokio_runtime = build_runtime().unwrap();
96-
let task_executor = task_executor(tokio_runtime.handle().clone());
97-
let (rpc, task_manager, client, pool, command_sink, backend) = client_parts::<
98-
NodeTemplateChainInfo,
99-
>(
100-
ConfigOrChainSpec::ChainSpec(Box::new(development_config()), task_executor),
101-
)
102-
.unwrap();
96+
let (rpc, task_manager, client, pool, command_sink, backend) =
97+
client_parts::<NodeTemplateChainInfo>(ConfigOrChainSpec::ChainSpec(
98+
Box::new(development_config()),
99+
tokio_runtime.handle().clone(),
100+
))
101+
.unwrap();
103102
let node = Node::<NodeTemplateChainInfo>::new(
104103
rpc,
105104
task_manager,

client/cli/src/commands/run_cmd.rs

-8
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,6 @@ pub struct RunCmd {
127127
#[structopt(long = "ws-max-connections", value_name = "COUNT")]
128128
pub ws_max_connections: Option<usize>,
129129

130-
/// Size of the RPC HTTP server thread pool.
131-
#[structopt(long = "rpc-http-threads", value_name = "COUNT")]
132-
pub rpc_http_threads: Option<usize>,
133-
134130
/// Specify browser Origins allowed to access the HTTP & WS RPC servers.
135131
///
136132
/// A comma-separated list of origins (protocol://domain or special `null`
@@ -381,10 +377,6 @@ impl CliConfiguration for RunCmd {
381377
Ok(self.ws_max_connections)
382378
}
383379

384-
fn rpc_http_threads(&self) -> Result<Option<usize>> {
385-
Ok(self.rpc_http_threads)
386-
}
387-
388380
fn rpc_cors(&self, is_dev: bool) -> Result<Option<Vec<String>>> {
389381
Ok(self
390382
.rpc_cors

client/cli/src/config.rs

+3-11
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use sc_service::{
2929
config::{
3030
BasePath, Configuration, DatabaseSource, KeystoreConfig, NetworkConfiguration,
3131
NodeKeyConfig, OffchainWorkerConfig, PrometheusConfig, PruningMode, Role, RpcMethods,
32-
TaskExecutor, TelemetryEndpoints, TransactionPoolOptions, WasmExecutionMethod,
32+
TelemetryEndpoints, TransactionPoolOptions, WasmExecutionMethod,
3333
},
3434
ChainSpec, KeepBlocks, TracingReceiver, TransactionStorageMode,
3535
};
@@ -348,13 +348,6 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
348348
Ok(None)
349349
}
350350

351-
/// Get the RPC HTTP thread pool size (`None` for a default 4-thread pool config).
352-
///
353-
/// By default this is `None`.
354-
fn rpc_http_threads(&self) -> Result<Option<usize>> {
355-
Ok(None)
356-
}
357-
358351
/// Get the RPC cors (`None` if disabled)
359352
///
360353
/// By default this is `Some(Vec::new())`.
@@ -465,7 +458,7 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
465458
fn create_configuration<C: SubstrateCli>(
466459
&self,
467460
cli: &C,
468-
task_executor: TaskExecutor,
461+
tokio_handle: tokio::runtime::Handle,
469462
) -> Result<Configuration> {
470463
let is_dev = self.is_dev()?;
471464
let chain_id = self.chain_id(is_dev)?;
@@ -490,7 +483,7 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
490483
Ok(Configuration {
491484
impl_name: C::impl_name(),
492485
impl_version: C::impl_version(),
493-
task_executor,
486+
tokio_handle,
494487
transaction_pool: self.transaction_pool()?,
495488
network: self.network_config(
496489
&chain_spec,
@@ -518,7 +511,6 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
518511
rpc_ipc: self.rpc_ipc()?,
519512
rpc_methods: self.rpc_methods()?,
520513
rpc_ws_max_connections: self.rpc_ws_max_connections()?,
521-
rpc_http_threads: self.rpc_http_threads()?,
522514
rpc_cors: self.rpc_cors(is_dev)?,
523515
rpc_max_payload: self.rpc_max_payload()?,
524516
prometheus_config: self.prometheus_config(DCV::prometheus_listen_port())?,

client/cli/src/lib.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ pub use config::*;
3535
pub use error::*;
3636
pub use params::*;
3737
pub use runner::*;
38+
use sc_service::Configuration;
3839
pub use sc_service::{ChainSpec, Role};
39-
use sc_service::{Configuration, TaskExecutor};
4040
pub use sc_tracing::logging::LoggerBuilder;
4141
pub use sp_version::RuntimeVersion;
4242
use std::io::Write;
@@ -216,9 +216,9 @@ pub trait SubstrateCli: Sized {
216216
fn create_configuration<T: CliConfiguration<DVC>, DVC: DefaultConfigurationValues>(
217217
&self,
218218
command: &T,
219-
task_executor: TaskExecutor,
219+
tokio_handle: tokio::runtime::Handle,
220220
) -> error::Result<Configuration> {
221-
command.create_configuration(self, task_executor)
221+
command.create_configuration(self, tokio_handle)
222222
}
223223

224224
/// Create a runner for the command provided in argument. This will create a Configuration and

client/cli/src/runner.rs

+2-9
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::{error::Error as CliError, CliConfiguration, Result, SubstrateCli};
2020
use chrono::prelude::*;
2121
use futures::{future, future::FutureExt, pin_mut, select, Future};
2222
use log::info;
23-
use sc_service::{Configuration, Error as ServiceError, TaskManager, TaskType};
23+
use sc_service::{Configuration, Error as ServiceError, TaskManager};
2424
use sc_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL};
2525
use std::marker::PhantomData;
2626

@@ -116,15 +116,8 @@ impl<C: SubstrateCli> Runner<C> {
116116
let tokio_runtime = build_runtime()?;
117117
let runtime_handle = tokio_runtime.handle().clone();
118118

119-
let task_executor = move |fut, task_type| match task_type {
120-
TaskType::Async => runtime_handle.spawn(fut).map(drop),
121-
TaskType::Blocking => runtime_handle
122-
.spawn_blocking(move || futures::executor::block_on(fut))
123-
.map(drop),
124-
};
125-
126119
Ok(Runner {
127-
config: command.create_configuration(cli, task_executor.into())?,
120+
config: command.create_configuration(cli, runtime_handle)?,
128121
tokio_runtime,
129122
phantom: PhantomData,
130123
})

client/rpc-servers/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pubsub = { package = "jsonrpc-pubsub", version = "18.0.0" }
1919
log = "0.4.8"
2020
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.9.0"}
2121
serde_json = "1.0.41"
22+
tokio = "1.10"
2223
http = { package = "jsonrpc-http-server", version = "18.0.0" }
2324
ipc = { package = "jsonrpc-ipc-server", version = "18.0.0" }
2425
ws = { package = "jsonrpc-ws-server", version = "18.0.0" }

client/rpc-servers/src/lib.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,6 @@ pub const RPC_MAX_PAYLOAD_DEFAULT: usize = 15 * MEGABYTE;
3636
/// Default maximum number of connections for WS RPC servers.
3737
const WS_MAX_CONNECTIONS: usize = 100;
3838

39-
/// Default thread pool size for RPC HTTP servers.
40-
const HTTP_THREADS: usize = 4;
41-
4239
/// The RPC IoHandler containing all requested APIs.
4340
pub type RpcHandler<T> = pubsub::PubSubHandler<T, RpcMiddleware>;
4441

@@ -130,17 +127,18 @@ impl ws::SessionStats for ServerMetrics {
130127
/// Start HTTP server listening on given address.
131128
pub fn start_http<M: pubsub::PubSubMetadata + Default + Unpin>(
132129
addr: &std::net::SocketAddr,
133-
thread_pool_size: Option<usize>,
134130
cors: Option<&Vec<String>>,
135131
io: RpcHandler<M>,
136132
maybe_max_payload_mb: Option<usize>,
133+
tokio_handle: tokio::runtime::Handle,
137134
) -> io::Result<http::Server> {
138135
let max_request_body_size = maybe_max_payload_mb
139136
.map(|mb| mb.saturating_mul(MEGABYTE))
140137
.unwrap_or(RPC_MAX_PAYLOAD_DEFAULT);
141138

142139
http::ServerBuilder::new(io)
143-
.threads(thread_pool_size.unwrap_or(HTTP_THREADS))
140+
.threads(1)
141+
.event_loop_executor(tokio_handle)
144142
.health_api(("/health", "system_health"))
145143
.allowed_hosts(hosts_filtering(cors.is_some()))
146144
.rest_api(if cors.is_some() { http::RestApi::Secure } else { http::RestApi::Unsecure })
@@ -175,13 +173,15 @@ pub fn start_ws<
175173
io: RpcHandler<M>,
176174
maybe_max_payload_mb: Option<usize>,
177175
server_metrics: ServerMetrics,
176+
tokio_handle: tokio::runtime::Handle,
178177
) -> io::Result<ws::Server> {
179178
let rpc_max_payload = maybe_max_payload_mb
180179
.map(|mb| mb.saturating_mul(MEGABYTE))
181180
.unwrap_or(RPC_MAX_PAYLOAD_DEFAULT);
182181
ws::ServerBuilder::with_meta_extractor(io, |context: &ws::RequestContext| {
183182
context.sender().into()
184183
})
184+
.event_loop_executor(tokio_handle)
185185
.max_payload(rpc_max_payload)
186186
.max_connections(max_connections.unwrap_or(WS_MAX_CONNECTIONS))
187187
.allowed_origins(map_cors(cors))

client/service/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,11 @@ parity-util-mem = { version = "0.10.0", default-features = false, features = [
7979
"primitive-types",
8080
] }
8181
async-trait = "0.1.50"
82+
tokio = { version = "1.10", features = ["time", "rt-multi-thread"] }
8283
tempfile = "3.1.0"
8384
directories = "3.0.2"
8485

8586
[dev-dependencies]
8687
substrate-test-runtime-client = { version = "2.0.0", path = "../../test-utils/runtime/client" }
8788
substrate-test-runtime = { version = "2.0.0", path = "../../test-utils/runtime/" }
88-
tokio = { version = "1.10", features = ["time"] }
8989
async-std = { version = "1.6.5", default-features = false }

client/service/src/builder.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ where
286286

287287
let task_manager = {
288288
let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
289-
TaskManager::new(config.task_executor.clone(), registry)?
289+
TaskManager::new(config.tokio_handle.clone(), registry)?
290290
};
291291

292292
let chain_spec = &config.chain_spec;
@@ -372,7 +372,7 @@ where
372372
let keystore_container = KeystoreContainer::new(&config.keystore)?;
373373
let task_manager = {
374374
let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
375-
TaskManager::new(config.task_executor.clone(), registry)?
375+
TaskManager::new(config.tokio_handle.clone(), registry)?
376376
};
377377

378378
let db_storage = {

0 commit comments

Comments
 (0)