Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit f774459

Browse files
committed
Use tokio runtime handle instead of TaskExecutor abstraction
Before this pr we had the `TaskExecutor` abstraction which theoretically allowed that any futures executor could have been used. However, this was never tested and is currently not really required. Anyone running a node currently only used tokio and nothing else (because this was hard coded in CLI). So, this pr removes the `TaskExecutor` abstraction and relies directly on the tokio runtime handle. Besides this changes, this pr also makes sure that the http and ws rpc server use the same tokio runtime. This fixes a panic that occurred when you drop the rpc servers inside an async function (tokio doesn't like that a tokio runtime is dropped in the async context of another tokio runtime). As we don't use any custom runtime in the http rpc server anymore, this pr also removes the `rpc-http-threads` cli argument. If external parties complain that there aren't enough threads for the rpc server, we could bring support for increasing the thread count of the tokio runtime.
1 parent cd21e62 commit f774459

File tree

27 files changed

+174
-295
lines changed

27 files changed

+174
-295
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bin/node/cli/tests/running_the_node_and_interrupt.rs

+65-10
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,24 @@
1616
// You should have received a copy of the GNU General Public License
1717
// along with this program. If not, see <https://www.gnu.org/licenses/>.
1818

19+
#![cfg(unix)]
20+
1921
use assert_cmd::cargo::cargo_bin;
20-
use std::{convert::TryInto, process::Command, thread, time::Duration};
22+
use nix::{
23+
sys::signal::{
24+
kill,
25+
Signal::{self, SIGINT, SIGTERM},
26+
},
27+
unistd::Pid,
28+
};
29+
use sc_service::Deref;
30+
use std::{convert::TryInto, ops::DerefMut, process::{Child, Command}, thread, time::Duration};
2131
use tempfile::tempdir;
2232

2333
pub mod common;
2434

2535
#[test]
26-
#[cfg(unix)]
2736
fn running_the_node_works_and_can_be_interrupted() {
28-
use nix::{
29-
sys::signal::{
30-
kill,
31-
Signal::{self, SIGINT, SIGTERM},
32-
},
33-
unistd::Pid,
34-
};
35-
3637
fn run_command_and_kill(signal: Signal) {
3738
let base_path = tempdir().expect("could not create a temp dir");
3839
let mut cmd = Command::new(cargo_bin("substrate"))
@@ -55,3 +56,57 @@ fn running_the_node_works_and_can_be_interrupted() {
5556
run_command_and_kill(SIGINT);
5657
run_command_and_kill(SIGTERM);
5758
}
59+
60+
struct KillChildOnDrop(Child);
61+
62+
impl Drop for KillChildOnDrop {
63+
fn drop(&mut self) {
64+
let _ = self.0.kill();
65+
}
66+
}
67+
68+
impl Deref for KillChildOnDrop {
69+
type Target = Child;
70+
71+
fn deref(&self) -> &Self::Target {
72+
&self.0
73+
}
74+
}
75+
76+
impl DerefMut for KillChildOnDrop {
77+
fn deref_mut(&mut self) -> &mut Self::Target {
78+
&mut self.0
79+
}
80+
}
81+
82+
#[test]
83+
fn running_two_nodes_with_the_same_ws_port_should_work() {
84+
fn start_node() -> Child {
85+
Command::new(cargo_bin("substrate"))
86+
.args(&["--dev", "--tmp", "--ws-port=45789"])
87+
.spawn()
88+
.unwrap()
89+
}
90+
91+
let mut first_node = KillChildOnDrop(start_node());
92+
let mut second_node = KillChildOnDrop(start_node());
93+
94+
thread::sleep(Duration::from_secs(30));
95+
96+
assert!(first_node.try_wait().unwrap().is_none(), "The first node should still be running");
97+
assert!(second_node.try_wait().unwrap().is_none(), "The second node should still be running");
98+
99+
kill(Pid::from_raw(first_node.id().try_into().unwrap()), SIGINT).unwrap();
100+
kill(Pid::from_raw(second_node.id().try_into().unwrap()), SIGINT).unwrap();
101+
102+
assert_eq!(
103+
common::wait_for(&mut first_node, 30).map(|x| x.success()),
104+
Some(true),
105+
"The first node must exit gracefully",
106+
);
107+
assert_eq!(
108+
common::wait_for(&mut second_node, 30).map(|x| x.success()),
109+
Some(true),
110+
"The second node must exit gracefully",
111+
);
112+
}

bin/node/test-runner-example/src/lib.rs

+7-8
Original file line numberDiff line numberDiff line change
@@ -88,18 +88,17 @@ mod tests {
8888
use node_cli::chain_spec::development_config;
8989
use sp_keyring::sr25519::Keyring::Alice;
9090
use sp_runtime::{traits::IdentifyAccount, MultiSigner};
91-
use test_runner::{build_runtime, client_parts, task_executor, ConfigOrChainSpec, Node};
91+
use test_runner::{build_runtime, client_parts, ConfigOrChainSpec, Node};
9292

9393
#[test]
9494
fn test_runner() {
9595
let tokio_runtime = build_runtime().unwrap();
96-
let task_executor = task_executor(tokio_runtime.handle().clone());
97-
let (rpc, task_manager, client, pool, command_sink, backend) = client_parts::<
98-
NodeTemplateChainInfo,
99-
>(
100-
ConfigOrChainSpec::ChainSpec(Box::new(development_config()), task_executor),
101-
)
102-
.unwrap();
96+
let (rpc, task_manager, client, pool, command_sink, backend) =
97+
client_parts::<NodeTemplateChainInfo>(ConfigOrChainSpec::ChainSpec(
98+
Box::new(development_config()),
99+
tokio_runtime.handle().clone(),
100+
))
101+
.unwrap();
103102
let node = Node::<NodeTemplateChainInfo>::new(
104103
rpc,
105104
task_manager,

client/cli/src/commands/run_cmd.rs

-8
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,6 @@ pub struct RunCmd {
127127
#[structopt(long = "ws-max-connections", value_name = "COUNT")]
128128
pub ws_max_connections: Option<usize>,
129129

130-
/// Size of the RPC HTTP server thread pool.
131-
#[structopt(long = "rpc-http-threads", value_name = "COUNT")]
132-
pub rpc_http_threads: Option<usize>,
133-
134130
/// Specify browser Origins allowed to access the HTTP & WS RPC servers.
135131
///
136132
/// A comma-separated list of origins (protocol://domain or special `null`
@@ -381,10 +377,6 @@ impl CliConfiguration for RunCmd {
381377
Ok(self.ws_max_connections)
382378
}
383379

384-
fn rpc_http_threads(&self) -> Result<Option<usize>> {
385-
Ok(self.rpc_http_threads)
386-
}
387-
388380
fn rpc_cors(&self, is_dev: bool) -> Result<Option<Vec<String>>> {
389381
Ok(self
390382
.rpc_cors

client/cli/src/config.rs

+3-11
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use sc_service::{
2929
config::{
3030
BasePath, Configuration, DatabaseSource, KeystoreConfig, NetworkConfiguration,
3131
NodeKeyConfig, OffchainWorkerConfig, PrometheusConfig, PruningMode, Role, RpcMethods,
32-
TaskExecutor, TelemetryEndpoints, TransactionPoolOptions, WasmExecutionMethod,
32+
TelemetryEndpoints, TransactionPoolOptions, WasmExecutionMethod,
3333
},
3434
ChainSpec, KeepBlocks, TracingReceiver, TransactionStorageMode,
3535
};
@@ -343,13 +343,6 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
343343
Ok(None)
344344
}
345345

346-
/// Get the RPC HTTP thread pool size (`None` for a default 4-thread pool config).
347-
///
348-
/// By default this is `None`.
349-
fn rpc_http_threads(&self) -> Result<Option<usize>> {
350-
Ok(None)
351-
}
352-
353346
/// Get the RPC cors (`None` if disabled)
354347
///
355348
/// By default this is `Some(Vec::new())`.
@@ -460,7 +453,7 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
460453
fn create_configuration<C: SubstrateCli>(
461454
&self,
462455
cli: &C,
463-
task_executor: TaskExecutor,
456+
tokio_handle: tokio::runtime::Handle,
464457
) -> Result<Configuration> {
465458
let is_dev = self.is_dev()?;
466459
let chain_id = self.chain_id(is_dev)?;
@@ -485,7 +478,7 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
485478
Ok(Configuration {
486479
impl_name: C::impl_name(),
487480
impl_version: C::impl_version(),
488-
task_executor,
481+
tokio_handle,
489482
transaction_pool: self.transaction_pool()?,
490483
network: self.network_config(
491484
&chain_spec,
@@ -513,7 +506,6 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
513506
rpc_ipc: self.rpc_ipc()?,
514507
rpc_methods: self.rpc_methods()?,
515508
rpc_ws_max_connections: self.rpc_ws_max_connections()?,
516-
rpc_http_threads: self.rpc_http_threads()?,
517509
rpc_cors: self.rpc_cors(is_dev)?,
518510
rpc_max_payload: self.rpc_max_payload()?,
519511
prometheus_config: self.prometheus_config(DCV::prometheus_listen_port())?,

client/cli/src/lib.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ pub use config::*;
3535
pub use error::*;
3636
pub use params::*;
3737
pub use runner::*;
38+
use sc_service::Configuration;
3839
pub use sc_service::{ChainSpec, Role};
39-
use sc_service::{Configuration, TaskExecutor};
4040
pub use sc_tracing::logging::LoggerBuilder;
4141
pub use sp_version::RuntimeVersion;
4242
use std::io::Write;
@@ -216,9 +216,9 @@ pub trait SubstrateCli: Sized {
216216
fn create_configuration<T: CliConfiguration<DVC>, DVC: DefaultConfigurationValues>(
217217
&self,
218218
command: &T,
219-
task_executor: TaskExecutor,
219+
tokio_handle: tokio::runtime::Handle,
220220
) -> error::Result<Configuration> {
221-
command.create_configuration(self, task_executor)
221+
command.create_configuration(self, tokio_handle)
222222
}
223223

224224
/// Create a runner for the command provided in argument. This will create a Configuration and

client/cli/src/runner.rs

+2-9
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::{error::Error as CliError, CliConfiguration, Result, SubstrateCli};
2020
use chrono::prelude::*;
2121
use futures::{future, future::FutureExt, pin_mut, select, Future};
2222
use log::info;
23-
use sc_service::{Configuration, Error as ServiceError, TaskManager, TaskType};
23+
use sc_service::{Configuration, Error as ServiceError, TaskManager};
2424
use sc_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL};
2525
use std::marker::PhantomData;
2626

@@ -116,15 +116,8 @@ impl<C: SubstrateCli> Runner<C> {
116116
let tokio_runtime = build_runtime()?;
117117
let runtime_handle = tokio_runtime.handle().clone();
118118

119-
let task_executor = move |fut, task_type| match task_type {
120-
TaskType::Async => runtime_handle.spawn(fut).map(drop),
121-
TaskType::Blocking => runtime_handle
122-
.spawn_blocking(move || futures::executor::block_on(fut))
123-
.map(drop),
124-
};
125-
126119
Ok(Runner {
127-
config: command.create_configuration(cli, task_executor.into())?,
120+
config: command.create_configuration(cli, runtime_handle)?,
128121
tokio_runtime,
129122
phantom: PhantomData,
130123
})

client/rpc-servers/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pubsub = { package = "jsonrpc-pubsub", version = "18.0.0" }
1919
log = "0.4.8"
2020
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.9.0"}
2121
serde_json = "1.0.41"
22+
tokio = "1.10"
2223

2324
[target.'cfg(not(target_os = "unknown"))'.dependencies]
2425
http = { package = "jsonrpc-http-server", version = "18.0.0" }

client/rpc-servers/src/lib.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,6 @@ pub const RPC_MAX_PAYLOAD_DEFAULT: usize = 15 * MEGABYTE;
3636
/// Default maximum number of connections for WS RPC servers.
3737
const WS_MAX_CONNECTIONS: usize = 100;
3838

39-
/// Default thread pool size for RPC HTTP servers.
40-
const HTTP_THREADS: usize = 4;
41-
4239
/// The RPC IoHandler containing all requested APIs.
4340
pub type RpcHandler<T> = pubsub::PubSubHandler<T, RpcMiddleware>;
4441

@@ -137,17 +134,18 @@ mod inner {
137134
/// **Note**: Only available if `not(target_os = "unknown")`.
138135
pub fn start_http<M: pubsub::PubSubMetadata + Default + Unpin>(
139136
addr: &std::net::SocketAddr,
140-
thread_pool_size: Option<usize>,
141137
cors: Option<&Vec<String>>,
142138
io: RpcHandler<M>,
143139
maybe_max_payload_mb: Option<usize>,
140+
tokio_handle: tokio::runtime::Handle,
144141
) -> io::Result<http::Server> {
145142
let max_request_body_size = maybe_max_payload_mb
146143
.map(|mb| mb.saturating_mul(MEGABYTE))
147144
.unwrap_or(RPC_MAX_PAYLOAD_DEFAULT);
148145

149146
http::ServerBuilder::new(io)
150-
.threads(thread_pool_size.unwrap_or(HTTP_THREADS))
147+
.threads(1)
148+
.event_loop_executor(tokio_handle)
151149
.health_api(("/health", "system_health"))
152150
.allowed_hosts(hosts_filtering(cors.is_some()))
153151
.rest_api(if cors.is_some() { http::RestApi::Secure } else { http::RestApi::Unsecure })
@@ -186,13 +184,15 @@ mod inner {
186184
io: RpcHandler<M>,
187185
maybe_max_payload_mb: Option<usize>,
188186
server_metrics: ServerMetrics,
187+
tokio_handle: tokio::runtime::Handle,
189188
) -> io::Result<ws::Server> {
190189
let rpc_max_payload = maybe_max_payload_mb
191190
.map(|mb| mb.saturating_mul(MEGABYTE))
192191
.unwrap_or(RPC_MAX_PAYLOAD_DEFAULT);
193192
ws::ServerBuilder::with_meta_extractor(io, |context: &ws::RequestContext| {
194193
context.sender().into()
195194
})
195+
.event_loop_executor(tokio_handle)
196196
.max_payload(rpc_max_payload)
197197
.max_connections(max_connections.unwrap_or(WS_MAX_CONNECTIONS))
198198
.allowed_origins(map_cors(cors))

client/service/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ parity-util-mem = { version = "0.10.0", default-features = false, features = [
7979
"primitive-types",
8080
] }
8181
async-trait = "0.1.50"
82+
tokio = { version = "1.10", features = ["time", "rt-multi-thread"] }
8283

8384
[target.'cfg(not(target_os = "unknown"))'.dependencies]
8485
tempfile = "3.1.0"
@@ -87,5 +88,4 @@ directories = "3.0.2"
8788
[dev-dependencies]
8889
substrate-test-runtime-client = { version = "2.0.0", path = "../../test-utils/runtime/client" }
8990
substrate-test-runtime = { version = "2.0.0", path = "../../test-utils/runtime/" }
90-
tokio = { version = "1.10", features = ["time"] }
9191
async-std = { version = "1.6.5", default-features = false }

client/service/src/builder.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ where
287287

288288
let task_manager = {
289289
let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
290-
TaskManager::new(config.task_executor.clone(), registry)?
290+
TaskManager::new(config.tokio_handle.clone(), registry)?
291291
};
292292

293293
let chain_spec = &config.chain_spec;
@@ -373,7 +373,7 @@ where
373373
let keystore_container = KeystoreContainer::new(&config.keystore)?;
374374
let task_manager = {
375375
let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
376-
TaskManager::new(config.task_executor.clone(), registry)?
376+
TaskManager::new(config.tokio_handle.clone(), registry)?
377377
};
378378

379379
let db_storage = {

0 commit comments

Comments
 (0)