Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Use tokio runtime handle instead of TaskExecutor abstraction #9737

Merged
7 commits merged into from
Sep 12, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

81 changes: 71 additions & 10 deletions bin/node/cli/tests/running_the_node_and_interrupt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,30 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

#![cfg(unix)]

use assert_cmd::cargo::cargo_bin;
use std::{convert::TryInto, process::Command, thread, time::Duration};
use nix::{
sys::signal::{
kill,
Signal::{self, SIGINT, SIGTERM},
},
unistd::Pid,
};
use sc_service::Deref;
use std::{
convert::TryInto,
ops::DerefMut,
process::{Child, Command},
thread,
time::Duration,
};
use tempfile::tempdir;

pub mod common;

#[test]
#[cfg(unix)]
fn running_the_node_works_and_can_be_interrupted() {
use nix::{
sys::signal::{
kill,
Signal::{self, SIGINT, SIGTERM},
},
unistd::Pid,
};

fn run_command_and_kill(signal: Signal) {
let base_path = tempdir().expect("could not create a temp dir");
let mut cmd = Command::new(cargo_bin("substrate"))
Expand All @@ -55,3 +62,57 @@ fn running_the_node_works_and_can_be_interrupted() {
run_command_and_kill(SIGINT);
run_command_and_kill(SIGTERM);
}

struct KillChildOnDrop(Child);

impl Drop for KillChildOnDrop {
fn drop(&mut self) {
let _ = self.0.kill();
}
}

impl Deref for KillChildOnDrop {
type Target = Child;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl DerefMut for KillChildOnDrop {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

#[test]
fn running_two_nodes_with_the_same_ws_port_should_work() {
fn start_node() -> Child {
Command::new(cargo_bin("substrate"))
.args(&["--dev", "--tmp", "--ws-port=45789"])
.spawn()
.unwrap()
}

let mut first_node = KillChildOnDrop(start_node());
let mut second_node = KillChildOnDrop(start_node());

thread::sleep(Duration::from_secs(30));

assert!(first_node.try_wait().unwrap().is_none(), "The first node should still be running");
assert!(second_node.try_wait().unwrap().is_none(), "The second node should still be running");

kill(Pid::from_raw(first_node.id().try_into().unwrap()), SIGINT).unwrap();
kill(Pid::from_raw(second_node.id().try_into().unwrap()), SIGINT).unwrap();

assert_eq!(
common::wait_for(&mut first_node, 30).map(|x| x.success()),
Some(true),
"The first node must exit gracefully",
);
assert_eq!(
common::wait_for(&mut second_node, 30).map(|x| x.success()),
Some(true),
"The second node must exit gracefully",
);
}
15 changes: 7 additions & 8 deletions bin/node/test-runner-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,17 @@ mod tests {
use node_cli::chain_spec::development_config;
use sp_keyring::sr25519::Keyring::Alice;
use sp_runtime::{traits::IdentifyAccount, MultiSigner};
use test_runner::{build_runtime, client_parts, task_executor, ConfigOrChainSpec, Node};
use test_runner::{build_runtime, client_parts, ConfigOrChainSpec, Node};

#[test]
fn test_runner() {
let tokio_runtime = build_runtime().unwrap();
let task_executor = task_executor(tokio_runtime.handle().clone());
let (rpc, task_manager, client, pool, command_sink, backend) = client_parts::<
NodeTemplateChainInfo,
>(
ConfigOrChainSpec::ChainSpec(Box::new(development_config()), task_executor),
)
.unwrap();
let (rpc, task_manager, client, pool, command_sink, backend) =
client_parts::<NodeTemplateChainInfo>(ConfigOrChainSpec::ChainSpec(
Box::new(development_config()),
tokio_runtime.handle().clone(),
))
.unwrap();
let node = Node::<NodeTemplateChainInfo>::new(
rpc,
task_manager,
Expand Down
8 changes: 0 additions & 8 deletions client/cli/src/commands/run_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,6 @@ pub struct RunCmd {
#[structopt(long = "ws-max-connections", value_name = "COUNT")]
pub ws_max_connections: Option<usize>,

/// Size of the RPC HTTP server thread pool.
#[structopt(long = "rpc-http-threads", value_name = "COUNT")]
pub rpc_http_threads: Option<usize>,

/// Specify browser Origins allowed to access the HTTP & WS RPC servers.
///
/// A comma-separated list of origins (protocol://domain or special `null`
Expand Down Expand Up @@ -381,10 +377,6 @@ impl CliConfiguration for RunCmd {
Ok(self.ws_max_connections)
}

fn rpc_http_threads(&self) -> Result<Option<usize>> {
Ok(self.rpc_http_threads)
}

fn rpc_cors(&self, is_dev: bool) -> Result<Option<Vec<String>>> {
Ok(self
.rpc_cors
Expand Down
14 changes: 3 additions & 11 deletions client/cli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use sc_service::{
config::{
BasePath, Configuration, DatabaseSource, KeystoreConfig, NetworkConfiguration,
NodeKeyConfig, OffchainWorkerConfig, PrometheusConfig, PruningMode, Role, RpcMethods,
TaskExecutor, TelemetryEndpoints, TransactionPoolOptions, WasmExecutionMethod,
TelemetryEndpoints, TransactionPoolOptions, WasmExecutionMethod,
},
ChainSpec, KeepBlocks, TracingReceiver, TransactionStorageMode,
};
Expand Down Expand Up @@ -348,13 +348,6 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
Ok(None)
}

/// Get the RPC HTTP thread pool size (`None` for a default 4-thread pool config).
///
/// By default this is `None`.
fn rpc_http_threads(&self) -> Result<Option<usize>> {
Ok(None)
}

/// Get the RPC cors (`None` if disabled)
///
/// By default this is `Some(Vec::new())`.
Expand Down Expand Up @@ -465,7 +458,7 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
fn create_configuration<C: SubstrateCli>(
&self,
cli: &C,
task_executor: TaskExecutor,
tokio_handle: tokio::runtime::Handle,
) -> Result<Configuration> {
let is_dev = self.is_dev()?;
let chain_id = self.chain_id(is_dev)?;
Expand All @@ -490,7 +483,7 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
Ok(Configuration {
impl_name: C::impl_name(),
impl_version: C::impl_version(),
task_executor,
tokio_handle,
transaction_pool: self.transaction_pool()?,
network: self.network_config(
&chain_spec,
Expand Down Expand Up @@ -518,7 +511,6 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
rpc_ipc: self.rpc_ipc()?,
rpc_methods: self.rpc_methods()?,
rpc_ws_max_connections: self.rpc_ws_max_connections()?,
rpc_http_threads: self.rpc_http_threads()?,
rpc_cors: self.rpc_cors(is_dev)?,
rpc_max_payload: self.rpc_max_payload()?,
prometheus_config: self.prometheus_config(DCV::prometheus_listen_port())?,
Expand Down
6 changes: 3 additions & 3 deletions client/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ pub use config::*;
pub use error::*;
pub use params::*;
pub use runner::*;
use sc_service::Configuration;
pub use sc_service::{ChainSpec, Role};
use sc_service::{Configuration, TaskExecutor};
pub use sc_tracing::logging::LoggerBuilder;
pub use sp_version::RuntimeVersion;
use std::io::Write;
Expand Down Expand Up @@ -216,9 +216,9 @@ pub trait SubstrateCli: Sized {
fn create_configuration<T: CliConfiguration<DVC>, DVC: DefaultConfigurationValues>(
&self,
command: &T,
task_executor: TaskExecutor,
tokio_handle: tokio::runtime::Handle,
) -> error::Result<Configuration> {
command.create_configuration(self, task_executor)
command.create_configuration(self, tokio_handle)
}

/// Create a runner for the command provided in argument. This will create a Configuration and
Expand Down
11 changes: 2 additions & 9 deletions client/cli/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{error::Error as CliError, CliConfiguration, Result, SubstrateCli};
use chrono::prelude::*;
use futures::{future, future::FutureExt, pin_mut, select, Future};
use log::info;
use sc_service::{Configuration, Error as ServiceError, TaskManager, TaskType};
use sc_service::{Configuration, Error as ServiceError, TaskManager};
use sc_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL};
use std::marker::PhantomData;

Expand Down Expand Up @@ -116,15 +116,8 @@ impl<C: SubstrateCli> Runner<C> {
let tokio_runtime = build_runtime()?;
let runtime_handle = tokio_runtime.handle().clone();

let task_executor = move |fut, task_type| match task_type {
TaskType::Async => runtime_handle.spawn(fut).map(drop),
TaskType::Blocking => runtime_handle
.spawn_blocking(move || futures::executor::block_on(fut))
.map(drop),
};

Ok(Runner {
config: command.create_configuration(cli, task_executor.into())?,
config: command.create_configuration(cli, runtime_handle)?,
tokio_runtime,
phantom: PhantomData,
})
Expand Down
1 change: 1 addition & 0 deletions client/rpc-servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pubsub = { package = "jsonrpc-pubsub", version = "18.0.0" }
log = "0.4.8"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.9.0"}
serde_json = "1.0.41"
tokio = "1.10"
http = { package = "jsonrpc-http-server", version = "18.0.0" }
ipc = { package = "jsonrpc-ipc-server", version = "18.0.0" }
ws = { package = "jsonrpc-ws-server", version = "18.0.0" }
10 changes: 5 additions & 5 deletions client/rpc-servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ pub const RPC_MAX_PAYLOAD_DEFAULT: usize = 15 * MEGABYTE;
/// Default maximum number of connections for WS RPC servers.
const WS_MAX_CONNECTIONS: usize = 100;

/// Default thread pool size for RPC HTTP servers.
const HTTP_THREADS: usize = 4;

/// The RPC IoHandler containing all requested APIs.
pub type RpcHandler<T> = pubsub::PubSubHandler<T, RpcMiddleware>;

Expand Down Expand Up @@ -130,17 +127,18 @@ impl ws::SessionStats for ServerMetrics {
/// Start HTTP server listening on given address.
pub fn start_http<M: pubsub::PubSubMetadata + Default + Unpin>(
addr: &std::net::SocketAddr,
thread_pool_size: Option<usize>,
cors: Option<&Vec<String>>,
io: RpcHandler<M>,
maybe_max_payload_mb: Option<usize>,
tokio_handle: tokio::runtime::Handle,
) -> io::Result<http::Server> {
let max_request_body_size = maybe_max_payload_mb
.map(|mb| mb.saturating_mul(MEGABYTE))
.unwrap_or(RPC_MAX_PAYLOAD_DEFAULT);

http::ServerBuilder::new(io)
.threads(thread_pool_size.unwrap_or(HTTP_THREADS))
.threads(1)
.event_loop_executor(tokio_handle)
.health_api(("/health", "system_health"))
.allowed_hosts(hosts_filtering(cors.is_some()))
.rest_api(if cors.is_some() { http::RestApi::Secure } else { http::RestApi::Unsecure })
Expand Down Expand Up @@ -175,13 +173,15 @@ pub fn start_ws<
io: RpcHandler<M>,
maybe_max_payload_mb: Option<usize>,
server_metrics: ServerMetrics,
tokio_handle: tokio::runtime::Handle,
) -> io::Result<ws::Server> {
let rpc_max_payload = maybe_max_payload_mb
.map(|mb| mb.saturating_mul(MEGABYTE))
.unwrap_or(RPC_MAX_PAYLOAD_DEFAULT);
ws::ServerBuilder::with_meta_extractor(io, |context: &ws::RequestContext| {
context.sender().into()
})
.event_loop_executor(tokio_handle)
.max_payload(rpc_max_payload)
.max_connections(max_connections.unwrap_or(WS_MAX_CONNECTIONS))
.allowed_origins(map_cors(cors))
Expand Down
2 changes: 1 addition & 1 deletion client/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ parity-util-mem = { version = "0.10.0", default-features = false, features = [
"primitive-types",
] }
async-trait = "0.1.50"
tokio = { version = "1.10", features = ["time", "rt-multi-thread"] }
tempfile = "3.1.0"
directories = "3.0.2"

[dev-dependencies]
substrate-test-runtime-client = { version = "2.0.0", path = "../../test-utils/runtime/client" }
substrate-test-runtime = { version = "2.0.0", path = "../../test-utils/runtime/" }
tokio = { version = "1.10", features = ["time"] }
async-std = { version = "1.6.5", default-features = false }
4 changes: 2 additions & 2 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ where

let task_manager = {
let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
TaskManager::new(config.task_executor.clone(), registry)?
TaskManager::new(config.tokio_handle.clone(), registry)?
};

let chain_spec = &config.chain_spec;
Expand Down Expand Up @@ -372,7 +372,7 @@ where
let keystore_container = KeystoreContainer::new(&config.keystore)?;
let task_manager = {
let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
TaskManager::new(config.task_executor.clone(), registry)?
TaskManager::new(config.tokio_handle.clone(), registry)?
};

let db_storage = {
Expand Down
Loading