diff --git a/Cargo.lock b/Cargo.lock index 9381afc45219..23abf35bbaba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3123,6 +3123,12 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hex-literal" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ebdb29d2ea9ed0083cd8cece49bbd968021bd99b0849edb4a9a7ee0fdf6a4e0" + [[package]] name = "hex-literal" version = "0.4.1" @@ -3693,7 +3699,7 @@ dependencies = [ "frame-system-benchmarking", "frame-system-rpc-runtime-api", "frame-try-runtime", - "hex-literal", + "hex-literal 0.4.1", "kusama-runtime-constants", "log", "pallet-authority-discovery", @@ -6580,7 +6586,7 @@ dependencies = [ "nix 0.26.2", "polkadot-cli", "polkadot-core-primitives", - "polkadot-node-core-pvf", + "polkadot-node-core-pvf-worker", "polkadot-overseer", "substrate-rpc-client", "tempfile", @@ -6706,7 +6712,7 @@ dependencies = [ "futures", "log", "polkadot-client", - "polkadot-node-core-pvf", + "polkadot-node-core-pvf-worker", "polkadot-node-metrics", "polkadot-performance-test", "polkadot-service", @@ -6720,6 +6726,7 @@ dependencies = [ "sp-core", "sp-io", "sp-keyring", + "sp-maybe-compressed-blob", "substrate-build-script-utils", "thiserror", "try-runtime-cli", @@ -7175,10 +7182,9 @@ version = "0.9.41" dependencies = [ "always-assert", "assert_matches", - "cpu-time", "futures", "futures-timer", - "hex-literal", + "hex-literal 0.3.4", "libc", "parity-scale-codec", "pin-project", @@ -7188,22 +7194,13 @@ dependencies = [ "polkadot-parachain", "polkadot-primitives", "rand 0.8.5", - "rayon", - "sc-executor", - "sc-executor-common", - "sc-executor-wasmtime", "slotmap", "sp-core", - "sp-externalities", - "sp-io", "sp-maybe-compressed-blob", "sp-tracing", "sp-wasm-interface", "substrate-build-script-utils", "tempfile", - "test-parachain-adder", - "test-parachain-halt", - 
"tikv-jemalloc-ctl", "tokio", "tracing-gum", ] @@ -7231,6 +7228,36 @@ dependencies = [ "tracing-gum", ] +[[package]] +name = "polkadot-node-core-pvf-worker" +version = "0.9.41" +dependencies = [ + "assert_matches", + "cpu-time", + "futures", + "libc", + "parity-scale-codec", + "polkadot-node-core-pvf", + "polkadot-parachain", + "polkadot-primitives", + "rayon", + "sc-executor", + "sc-executor-common", + "sc-executor-wasmtime", + "sp-core", + "sp-externalities", + "sp-io", + "sp-maybe-compressed-blob", + "sp-tracing", + "substrate-build-script-utils", + "tempfile", + "test-parachain-adder", + "test-parachain-halt", + "tikv-jemalloc-ctl", + "tokio", + "tracing-gum", +] + [[package]] name = "polkadot-node-core-runtime-api" version = "0.9.41" @@ -7480,10 +7507,12 @@ dependencies = [ "kusama-runtime", "log", "polkadot-erasure-coding", - "polkadot-node-core-pvf", + "polkadot-node-core-pvf-worker", "polkadot-node-primitives", "polkadot-primitives", "quote", + "sc-executor-common", + "sp-maybe-compressed-blob", "thiserror", ] @@ -7492,7 +7521,7 @@ name = "polkadot-primitives" version = "0.9.41" dependencies = [ "bitvec", - "hex-literal", + "hex-literal 0.4.1", "parity-scale-codec", "polkadot-core-primitives", "polkadot-parachain", @@ -7569,7 +7598,7 @@ dependencies = [ "frame-system-benchmarking", "frame-system-rpc-runtime-api", "frame-try-runtime", - "hex-literal", + "hex-literal 0.4.1", "log", "pallet-authority-discovery", "pallet-authorship", @@ -7666,7 +7695,7 @@ dependencies = [ "frame-support", "frame-support-test", "frame-system", - "hex-literal", + "hex-literal 0.4.1", "impl-trait-for-tuples", "libsecp256k1", "log", @@ -7744,7 +7773,7 @@ dependencies = [ "frame-support-test", "frame-system", "futures", - "hex-literal", + "hex-literal 0.4.1", "log", "pallet-authority-discovery", "pallet-authorship", @@ -7795,7 +7824,7 @@ dependencies = [ "frame-support", "frame-system-rpc-runtime-api", "futures", - "hex-literal", + "hex-literal 0.4.1", "kusama-runtime", 
"kusama-runtime-constants", "kvdb", @@ -7981,7 +8010,7 @@ dependencies = [ "polkadot-node-core-backing", "polkadot-node-core-candidate-validation", "polkadot-node-core-dispute-coordinator", - "polkadot-node-core-pvf", + "polkadot-node-core-pvf-worker", "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", @@ -8004,7 +8033,7 @@ dependencies = [ "frame-support", "frame-system", "frame-system-rpc-runtime-api", - "hex-literal", + "hex-literal 0.4.1", "log", "pallet-authority-discovery", "pallet-authorship", @@ -8887,7 +8916,7 @@ dependencies = [ "frame-system-benchmarking", "frame-system-rpc-runtime-api", "frame-try-runtime", - "hex-literal", + "hex-literal 0.4.1", "log", "pallet-authority-discovery", "pallet-authorship", @@ -12041,7 +12070,7 @@ dependencies = [ "log", "parity-scale-codec", "polkadot-cli", - "polkadot-node-core-pvf", + "polkadot-node-core-pvf-worker", "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-parachain", @@ -12089,7 +12118,7 @@ dependencies = [ "log", "parity-scale-codec", "polkadot-cli", - "polkadot-node-core-pvf", + "polkadot-node-core-pvf-worker", "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-parachain", @@ -13649,7 +13678,7 @@ dependencies = [ "frame-system-benchmarking", "frame-system-rpc-runtime-api", "frame-try-runtime", - "hex-literal", + "hex-literal 0.4.1", "log", "pallet-authority-discovery", "pallet-authorship", @@ -14048,7 +14077,7 @@ dependencies = [ "bounded-collections", "derivative", "hex", - "hex-literal", + "hex-literal 0.4.1", "impl-trait-for-tuples", "log", "parity-scale-codec", diff --git a/Cargo.toml b/Cargo.toml index ded5dda0aa2e..886b489a5024 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ tikv-jemallocator = "0.5.0" # Crates in our workspace, defined as dependencies so we can pass them feature flags. 
polkadot-cli = { path = "cli", features = [ "kusama-native", "westend-native", "rococo-native" ] } -polkadot-node-core-pvf = { path = "node/core/pvf" } +polkadot-node-core-pvf-worker = { path = "node/core/pvf/worker" } polkadot-overseer = { path = "node/overseer" } [dev-dependencies] @@ -80,6 +80,7 @@ members = [ "node/core/parachains-inherent", "node/core/provisioner", "node/core/pvf", + "node/core/pvf/worker", "node/core/pvf-checker", "node/core/runtime-api", "node/network/approval-distribution", @@ -206,7 +207,7 @@ try-runtime = [ "polkadot-cli/try-runtime" ] fast-runtime = [ "polkadot-cli/fast-runtime" ] runtime-metrics = [ "polkadot-cli/runtime-metrics" ] pyroscope = ["polkadot-cli/pyroscope"] -jemalloc-allocator = ["polkadot-node-core-pvf/jemalloc-allocator", "polkadot-overseer/jemalloc-allocator"] +jemalloc-allocator = ["polkadot-node-core-pvf-worker/jemalloc-allocator", "polkadot-overseer/jemalloc-allocator"] # Configuration for building a .deb package - for use with `cargo-deb` [package.metadata.deb] diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 01247bbc996f..4d08ee18ed1b 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -22,12 +22,13 @@ pyro = { package = "pyroscope", version = "0.3.1", optional = true } service = { package = "polkadot-service", path = "../node/service", default-features = false, optional = true } polkadot-client = { path = "../node/client", optional = true } -polkadot-node-core-pvf = { path = "../node/core/pvf", optional = true } +polkadot-node-core-pvf-worker = { path = "../node/core/pvf/worker", optional = true } polkadot-performance-test = { path = "../node/test/performance-test", optional = true } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-io = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = 
"master" } frame-benchmarking-cli = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true } try-runtime-cli = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true } sc-cli = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true } @@ -52,7 +53,7 @@ cli = [ "frame-benchmarking-cli", "try-runtime-cli", "polkadot-client", - "polkadot-node-core-pvf", + "polkadot-node-core-pvf-worker", ] runtime-benchmarks = [ "service/runtime-benchmarks", diff --git a/cli/src/command.rs b/cli/src/command.rs index 2f0bc9e2f856..c0e96de2a54b 100644 --- a/cli/src/command.rs +++ b/cli/src/command.rs @@ -494,7 +494,7 @@ pub fn run() -> Result<()> { #[cfg(not(target_os = "android"))] { - polkadot_node_core_pvf::prepare_worker_entrypoint( + polkadot_node_core_pvf_worker::prepare_worker_entrypoint( &cmd.socket_path, Some(&cmd.node_impl_version), ); @@ -516,7 +516,7 @@ pub fn run() -> Result<()> { #[cfg(not(target_os = "android"))] { - polkadot_node_core_pvf::execute_worker_entrypoint( + polkadot_node_core_pvf_worker::execute_worker_entrypoint( &cmd.socket_path, Some(&cmd.node_impl_version), ); diff --git a/cli/src/host_perf_check.rs b/cli/src/host_perf_check.rs index 1225c4708a3a..adfdebce6779 100644 --- a/cli/src/host_perf_check.rs +++ b/cli/src/host_perf_check.rs @@ -15,7 +15,6 @@ // along with Polkadot. If not, see . 
use log::info; -use polkadot_node_core_pvf::sp_maybe_compressed_blob; use polkadot_performance_test::{ measure_erasure_coding, measure_pvf_prepare, PerfCheckError, ERASURE_CODING_N_VALIDATORS, ERASURE_CODING_TIME_LIMIT, PVF_PREPARE_TIME_LIMIT, VALIDATION_CODE_BOMB_LIMIT, diff --git a/node/core/pvf/Cargo.toml b/node/core/pvf/Cargo.toml index 20dcd5afdfaf..026930758b86 100644 --- a/node/core/pvf/Cargo.toml +++ b/node/core/pvf/Cargo.toml @@ -4,24 +4,15 @@ version.workspace = true authors.workspace = true edition.workspace = true -[[bin]] -name = "puppet_worker" -path = "bin/puppet_worker.rs" - [dependencies] always-assert = "0.1" -assert_matches = "1.4.0" -cpu-time = "1.0.0" futures = "0.3.21" futures-timer = "3.0.2" gum = { package = "tracing-gum", path = "../../gum" } libc = "0.2.139" pin-project = "1.0.9" rand = "0.8.5" -rayon = "1.5.1" slotmap = "1.0" -tempfile = "3.3.0" -tikv-jemalloc-ctl = { version = "0.5.0", optional = true } tokio = { version = "1.24.2", features = ["fs", "process"] } parity-scale-codec = { version = "3.4.0", default-features = false, features = ["derive"] } @@ -30,13 +21,8 @@ polkadot-parachain = { path = "../../../parachain" } polkadot-core-primitives = { path = "../../../core-primitives" } polkadot-node-metrics = { path = "../../metrics" } polkadot-node-primitives = { path = "../../primitives" } - polkadot-primitives = { path = "../../../primitives" } -sc-executor = { git = "https://github.com/paritytech/substrate", branch = "master" } -sc-executor-wasmtime = { git = "https://github.com/paritytech/substrate", branch = "master" } -sc-executor-common = { git = "https://github.com/paritytech/substrate", branch = "master" } -sp-externalities = { git = "https://github.com/paritytech/substrate", branch = "master" } -sp-io = { git = "https://github.com/paritytech/substrate", branch = "master" } + sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-wasm-interface = { git = "https://github.com/paritytech/substrate", 
branch = "master" } sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" } @@ -45,14 +31,7 @@ sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master [build-dependencies] substrate-build-script-utils = { git = "https://github.com/paritytech/substrate", branch = "master" } -[target.'cfg(target_os = "linux")'.dependencies] -tikv-jemalloc-ctl = "0.5.0" - [dev-dependencies] -adder = { package = "test-parachain-adder", path = "../../../parachain/test-parachains/adder" } -halt = { package = "test-parachain-halt", path = "../../../parachain/test-parachains/halt" } -hex-literal = "0.4.1" +assert_matches = "1.4.0" +hex-literal = "0.3.4" tempfile = "3.3.0" - -[features] -jemalloc-allocator = ["dep:tikv-jemalloc-ctl"] diff --git a/node/core/pvf/src/artifacts.rs b/node/core/pvf/src/artifacts.rs index f0f8a0f8af28..d5a660cc3aa5 100644 --- a/node/core/pvf/src/artifacts.rs +++ b/node/core/pvf/src/artifacts.rs @@ -65,9 +65,11 @@ use std::{ time::{Duration, SystemTime}, }; +/// Contains the bytes for a successfully compiled artifact. pub struct CompiledArtifact(Vec); impl CompiledArtifact { + /// Creates a `CompiledArtifact`. pub fn new(code: Vec) -> Self { Self(code) } diff --git a/node/core/pvf/src/error.rs b/node/core/pvf/src/error.rs index 662fcc22cd31..21f23d515fdd 100644 --- a/node/core/pvf/src/error.rs +++ b/node/core/pvf/src/error.rs @@ -16,7 +16,7 @@ use crate::prepare::PrepareStats; use parity_scale_codec::{Decode, Encode}; -use std::{any::Any, fmt}; +use std::fmt; /// Result of PVF preparation performed by the validation host. Contains stats about the preparation if /// successful @@ -126,17 +126,3 @@ impl From for ValidationError { } } } - -/// Attempt to convert an opaque panic payload to a string. -/// -/// This is a best effort, and is not guaranteed to provide the most accurate value. 
-pub(crate) fn stringify_panic_payload(payload: Box) -> String { - match payload.downcast::<&'static str>() { - Ok(msg) => msg.to_string(), - Err(payload) => match payload.downcast::() { - Ok(msg) => *msg, - // At least we tried... - Err(_) => "unknown panic payload".to_string(), - }, - } -} diff --git a/node/core/pvf/src/execute/mod.rs b/node/core/pvf/src/execute/mod.rs index b0e8cc482561..8e3b17d71569 100644 --- a/node/core/pvf/src/execute/mod.rs +++ b/node/core/pvf/src/execute/mod.rs @@ -18,10 +18,10 @@ //! //! The validation host [runs the queue][`start`] communicating with it by sending [`ToQueue`] //! messages. The queue will spawn workers in new processes. Those processes should jump to -//! [`worker_entrypoint`]. +//! `polkadot_node_core_pvf_worker::execute_worker_entrypoint`. mod queue; -mod worker; +mod worker_intf; pub use queue::{start, PendingExecutionRequest, ToQueue}; -pub use worker::{worker_entrypoint, Response as ExecuteResponse}; +pub use worker_intf::{Handshake as ExecuteHandshake, Response as ExecuteResponse}; diff --git a/node/core/pvf/src/execute/queue.rs b/node/core/pvf/src/execute/queue.rs index e1e02205256b..5b3e21cee079 100644 --- a/node/core/pvf/src/execute/queue.rs +++ b/node/core/pvf/src/execute/queue.rs @@ -16,7 +16,7 @@ //! A queue that handles requests for PVF execution. 
-use super::worker::Outcome; +use super::worker_intf::Outcome; use crate::{ artifacts::{ArtifactId, ArtifactPathId}, host::ResultSender, @@ -416,7 +416,8 @@ async fn spawn_worker_task( use futures_timer::Delay; loop { - match super::worker::spawn(&program_path, job.executor_params.clone(), spawn_timeout).await + match super::worker_intf::spawn(&program_path, job.executor_params.clone(), spawn_timeout) + .await { Ok((idle, handle)) => break QueueEvent::Spawn(idle, handle, job), Err(err) => { @@ -460,9 +461,13 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) { queue.mux.push( async move { let _timer = execution_timer; - let outcome = - super::worker::start_work(idle, job.artifact.clone(), job.exec_timeout, job.params) - .await; + let outcome = super::worker_intf::start_work( + idle, + job.artifact.clone(), + job.exec_timeout, + job.params, + ) + .await; QueueEvent::StartWork(worker, outcome, job.artifact.id, job.result_tx) } .boxed(), diff --git a/node/core/pvf/src/execute/worker.rs b/node/core/pvf/src/execute/worker_intf.rs similarity index 55% rename from node/core/pvf/src/execute/worker.rs rename to node/core/pvf/src/execute/worker_intf.rs index f20874083be3..bc467cf90de6 100644 --- a/node/core/pvf/src/execute/worker.rs +++ b/node/core/pvf/src/execute/worker_intf.rs @@ -14,28 +14,23 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +//! Host interface to the execute worker. 
+ use crate::{ artifacts::ArtifactPathId, - executor_intf::Executor, worker_common::{ - bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes, - spawn_with_program_path, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle, - JOB_TIMEOUT_WALL_CLOCK_FACTOR, + framed_recv, framed_send, path_to_bytes, spawn_with_program_path, IdleWorker, SpawnErr, + WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR, }, LOG_TARGET, }; -use cpu_time::ProcessTime; -use futures::{pin_mut, select_biased, FutureExt}; +use futures::FutureExt; use futures_timer::Delay; use parity_scale_codec::{Decode, Encode}; use polkadot_parachain::primitives::ValidationResult; use polkadot_primitives::ExecutorParams; -use std::{ - path::{Path, PathBuf}, - sync::{mpsc::channel, Arc}, - time::Duration, -}; +use std::{path::Path, time::Duration}; use tokio::{io, net::UnixStream}; /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout. @@ -185,17 +180,6 @@ async fn send_handshake(stream: &mut UnixStream, handshake: Handshake) -> io::Re framed_send(stream, &handshake.encode()).await } -async fn recv_handshake(stream: &mut UnixStream) -> io::Result { - let handshake_enc = framed_recv(stream).await?; - let handshake = Handshake::decode(&mut &handshake_enc[..]).map_err(|_| { - io::Error::new( - io::ErrorKind::Other, - "execute pvf recv_handshake: failed to decode Handshake".to_owned(), - ) - })?; - Ok(handshake) -} - async fn send_request( stream: &mut UnixStream, artifact_path: &Path, @@ -207,29 +191,6 @@ async fn send_request( framed_send(stream, &execution_timeout.encode()).await } -async fn recv_request(stream: &mut UnixStream) -> io::Result<(PathBuf, Vec, Duration)> { - let artifact_path = framed_recv(stream).await?; - let artifact_path = bytes_to_path(&artifact_path).ok_or_else(|| { - io::Error::new( - io::ErrorKind::Other, - "execute pvf recv_request: non utf-8 artifact path".to_string(), - ) - })?; - let params = framed_recv(stream).await?; - 
let execution_timeout = framed_recv(stream).await?; - let execution_timeout = Duration::decode(&mut &execution_timeout[..]).map_err(|_| { - io::Error::new( - io::ErrorKind::Other, - "execute pvf recv_request: failed to decode duration".to_string(), - ) - })?; - Ok((artifact_path, params, execution_timeout)) -} - -async fn send_response(stream: &mut UnixStream, response: Response) -> io::Result<()> { - framed_send(stream, &response.encode()).await -} - async fn recv_response(stream: &mut UnixStream) -> io::Result { let response_bytes = framed_recv(stream).await?; Response::decode(&mut &response_bytes[..]).map_err(|e| { @@ -240,28 +201,43 @@ async fn recv_response(stream: &mut UnixStream) -> io::Result { }) } +/// The payload of the one-time handshake that is done when a worker process is created. Carries +/// data from the host to the worker. #[derive(Encode, Decode)] -struct Handshake { - executor_params: ExecutorParams, +pub struct Handshake { + /// The executor parameters. + pub executor_params: ExecutorParams, } +/// The response from an execution job on the worker. #[derive(Encode, Decode)] pub enum Response { - Ok { result_descriptor: ValidationResult, duration: Duration }, + /// The job completed successfully. + Ok { + /// The result of parachain validation. + result_descriptor: ValidationResult, + /// The amount of CPU time taken by the job. + duration: Duration, + }, + /// The candidate is invalid. InvalidCandidate(String), + /// The job timed out. TimedOut, + /// Some internal error occurred. Should only be used for errors independent of the candidate. InternalError(String), } impl Response { - fn format_invalid(ctx: &'static str, msg: &str) -> Self { + /// Creates an invalid response from a context `ctx` and a message `msg` (which can be empty). 
+ pub fn format_invalid(ctx: &'static str, msg: &str) -> Self { if msg.is_empty() { Self::InvalidCandidate(ctx.to_string()) } else { Self::InvalidCandidate(format!("{}: {}", ctx, msg)) } } - fn format_internal(ctx: &'static str, msg: &str) -> Self { + /// Creates an internal response from a context `ctx` and a message `msg` (which can be empty). + pub fn format_internal(ctx: &'static str, msg: &str) -> Self { if msg.is_empty() { Self::InternalError(ctx.to_string()) } else { @@ -269,110 +245,3 @@ impl Response { } } } - -/// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies -/// the path to the socket used to communicate with the host. The `node_version`, if `Some`, -/// is checked against the worker version. A mismatch results in immediate worker termination. -/// `None` is used for tests and in other situations when version check is not necessary. -pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { - worker_event_loop("execute", socket_path, node_version, |rt_handle, mut stream| async move { - let worker_pid = std::process::id(); - - let handshake = recv_handshake(&mut stream).await?; - let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| { - io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e)) - })?); - - loop { - let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?; - gum::debug!( - target: LOG_TARGET, - %worker_pid, - "worker: validating artifact {}", - artifact_path.display(), - ); - - // Used to signal to the cpu time monitor thread that it can finish. - let (finished_tx, finished_rx) = channel::<()>(); - let cpu_time_start = ProcessTime::now(); - - // Spawn a new thread that runs the CPU time monitor. 
- let cpu_time_monitor_fut = rt_handle - .spawn_blocking(move || { - cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx) - }) - .fuse(); - let executor_2 = executor.clone(); - let execute_fut = rt_handle - .spawn_blocking(move || { - validate_using_artifact(&artifact_path, ¶ms, executor_2, cpu_time_start) - }) - .fuse(); - - pin_mut!(cpu_time_monitor_fut); - pin_mut!(execute_fut); - - let response = select_biased! { - // If this future is not selected, the join handle is dropped and the thread will - // finish in the background. - cpu_time_monitor_res = cpu_time_monitor_fut => { - match cpu_time_monitor_res { - Ok(Some(cpu_time_elapsed)) => { - // Log if we exceed the timeout and the other thread hasn't finished. - gum::warn!( - target: LOG_TARGET, - %worker_pid, - "execute job took {}ms cpu time, exceeded execute timeout {}ms", - cpu_time_elapsed.as_millis(), - execution_timeout.as_millis(), - ); - Response::TimedOut - }, - Ok(None) => Response::InternalError("error communicating over finished channel".into()), - Err(e) => Response::format_internal("cpu time monitor thread error", &e.to_string()), - } - }, - execute_res = execute_fut => { - let _ = finished_tx.send(()); - execute_res.unwrap_or_else(|e| Response::format_internal("execute thread error", &e.to_string())) - }, - }; - - send_response(&mut stream, response).await?; - } - }); -} - -fn validate_using_artifact( - artifact_path: &Path, - params: &[u8], - executor: Arc, - cpu_time_start: ProcessTime, -) -> Response { - // Check here if the file exists, because the error from Substrate is not match-able. - // TODO: Re-evaluate after . - let file_metadata = std::fs::metadata(artifact_path); - if let Err(err) = file_metadata { - return Response::format_internal("execute: could not find or open file", &err.to_string()) - } - - let descriptor_bytes = match unsafe { - // SAFETY: this should be safe since the compiled artifact passed here comes from the - // file created by the prepare workers. 
These files are obtained by calling - // [`executor_intf::prepare`]. - executor.execute(artifact_path.as_ref(), params) - } { - Err(err) => return Response::format_invalid("execute", &err), - Ok(d) => d, - }; - - let duration = cpu_time_start.elapsed(); - - let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) { - Err(err) => - return Response::format_invalid("validation result decoding failed", &err.to_string()), - Ok(r) => r, - }; - - Response::Ok { result_descriptor, duration } -} diff --git a/node/core/pvf/src/lib.rs b/node/core/pvf/src/lib.rs index e8f174b89cd2..cdaee3341402 100644 --- a/node/core/pvf/src/lib.rs +++ b/node/core/pvf/src/lib.rs @@ -29,11 +29,11 @@ //! //! Then using the handle the client can send three types of requests: //! -//! (a) PVF pre-checking. This takes the PVF [code][`Pvf`] and tries to prepare it (verify and +//! (a) PVF pre-checking. This takes the `Pvf` code and tries to prepare it (verify and //! compile) in order to pre-check its validity. //! //! (b) PVF execution. This accepts the PVF [`params`][`polkadot_parachain::primitives::ValidationParams`] -//! and the PVF [code][`Pvf`], prepares (verifies and compiles) the code, and then executes PVF +//! and the `Pvf` code, prepares (verifies and compiles) the code, and then executes PVF //! with the `params`. //! //! (c) Heads up. 
This request allows to signal that the given PVF may be needed soon and that it @@ -91,7 +91,6 @@ mod artifacts; mod error; mod execute; -mod executor_intf; mod host; mod metrics; mod prepare; @@ -99,27 +98,22 @@ mod priority; mod pvf; mod worker_common; -#[doc(hidden)] -pub mod testing; - -#[doc(hidden)] -pub use sp_tracing; - +pub use artifacts::CompiledArtifact; pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError}; -pub use prepare::PrepareStats; +pub use execute::{ExecuteHandshake, ExecuteResponse}; +#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] +pub use prepare::MemoryAllocationStats; +pub use prepare::{MemoryStats, PrepareStats}; pub use priority::Priority; pub use pvf::PvfPrepData; pub use host::{start, Config, ValidationHost}; pub use metrics::Metrics; -pub use worker_common::JOB_TIMEOUT_WALL_CLOCK_FACTOR; - -pub use execute::worker_entrypoint as execute_worker_entrypoint; -pub use prepare::worker_entrypoint as prepare_worker_entrypoint; - -pub use executor_intf::{prepare, prevalidate}; - -pub use sc_executor_common; -pub use sp_maybe_compressed_blob; +pub use worker_common::{framed_recv, framed_send, JOB_TIMEOUT_WALL_CLOCK_FACTOR}; const LOG_TARGET: &str = "parachain::pvf"; + +#[doc(hidden)] +pub mod testing { + pub use crate::worker_common::{spawn_with_program_path, SpawnErr}; +} diff --git a/node/core/pvf/src/prepare/mod.rs b/node/core/pvf/src/prepare/mod.rs index d8d036a82238..de40c48464c4 100644 --- a/node/core/pvf/src/prepare/mod.rs +++ b/node/core/pvf/src/prepare/mod.rs @@ -20,23 +20,44 @@ //! (by running [`start_pool`]). //! //! The pool will spawn workers in new processes and those should execute pass control to -//! [`worker_entrypoint`]. +//! `polkadot_node_core_pvf_worker::prepare_worker_entrypoint`. 
-mod memory_stats; mod pool; mod queue; -mod worker; +mod worker_intf; -pub use memory_stats::MemoryStats; pub use pool::start as start_pool; pub use queue::{start as start_queue, FromQueue, ToQueue}; -pub use worker::worker_entrypoint; use parity_scale_codec::{Decode, Encode}; /// Preparation statistics, including the CPU time and memory taken. #[derive(Debug, Clone, Default, Encode, Decode)] pub struct PrepareStats { - cpu_time_elapsed: std::time::Duration, - memory_stats: MemoryStats, + /// The CPU time that elapsed for the preparation job. + pub cpu_time_elapsed: std::time::Duration, + /// The observed memory statistics for the preparation job. + pub memory_stats: MemoryStats, +} + +/// Helper struct to contain all the memory stats, including `MemoryAllocationStats` and, if +/// supported by the OS, `ru_maxrss`. +#[derive(Clone, Debug, Default, Encode, Decode)] +pub struct MemoryStats { + /// Memory stats from `tikv_jemalloc_ctl`. + #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] + pub memory_tracker_stats: Option, + /// `ru_maxrss` from `getrusage`. `None` if an error occurred. + #[cfg(target_os = "linux")] + pub max_rss: Option, +} + +/// Statistics of collected memory metrics. +#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] +#[derive(Clone, Debug, Default, Encode, Decode)] +pub struct MemoryAllocationStats { + /// Total resident memory, in bytes. + pub resident: u64, + /// Total allocated memory, in bytes. + pub allocated: u64, } diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index f8435a40348d..d151f097805e 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . 
-use super::worker::{self, Outcome}; +use super::worker_intf::{self, Outcome}; use crate::{ error::{PrepareError, PrepareResult}, metrics::Metrics, @@ -250,7 +250,7 @@ async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> Po use futures_timer::Delay; loop { - match worker::spawn(&program_path, spawn_timeout).await { + match worker_intf::spawn(&program_path, spawn_timeout).await { Ok((idle, handle)) => break PoolEvent::Spawn(idle, handle), Err(err) => { gum::warn!(target: LOG_TARGET, "failed to spawn a prepare worker: {:?}", err); @@ -271,7 +271,7 @@ async fn start_work_task( artifact_path: PathBuf, _preparation_timer: Option, ) -> PoolEvent { - let outcome = worker::start_work(&metrics, idle, pvf, &cache_path, artifact_path).await; + let outcome = worker_intf::start_work(&metrics, idle, pvf, &cache_path, artifact_path).await; PoolEvent::StartWork(worker, outcome) } diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs index 20ee95a435b2..f84d5ab0e56e 100644 --- a/node/core/pvf/src/prepare/queue.rs +++ b/node/core/pvf/src/prepare/queue.rs @@ -226,7 +226,7 @@ async fn handle_enqueue( target: LOG_TARGET, validation_code_hash = ?pvf.code_hash(), ?priority, - preparation_timeout = ?pvf.prep_timeout, + preparation_timeout = ?pvf.prep_timeout(), "PVF is enqueued for preparation.", ); queue.metrics.prepare_enqueued(); diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker_intf.rs similarity index 55% rename from node/core/pvf/src/prepare/worker.rs rename to node/core/pvf/src/prepare/worker_intf.rs index 3b2ae211e6ca..daf94aadc672 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker_intf.rs @@ -14,33 +14,24 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . 
-#[cfg(target_os = "linux")] -use super::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread}; -#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] -use super::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop}; -use super::memory_stats::MemoryStats; +//! Host interface to the prepare worker. + use crate::{ - artifacts::CompiledArtifact, error::{PrepareError, PrepareResult}, metrics::Metrics, prepare::PrepareStats, pvf::PvfPrepData, worker_common::{ - bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes, - spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle, - JOB_TIMEOUT_WALL_CLOCK_FACTOR, + framed_recv, framed_send, path_to_bytes, spawn_with_program_path, tmpfile_in, IdleWorker, + SpawnErr, WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR, }, LOG_TARGET, }; -use cpu_time::ProcessTime; -use futures::{pin_mut, select_biased, FutureExt}; use parity_scale_codec::{Decode, Encode}; use sp_core::hexdisplay::HexDisplay; use std::{ - panic, path::{Path, PathBuf}, - sync::mpsc::channel, time::Duration, }; use tokio::{io, net::UnixStream}; @@ -104,7 +95,7 @@ pub async fn start_work( ); with_tmp_file(stream, pid, cache_path, |tmp_file, mut stream| async move { - let preparation_timeout = pvf.prep_timeout; + let preparation_timeout = pvf.prep_timeout(); if let Err(err) = send_request(&mut stream, pvf, &tmp_file).await { gum::warn!( target: LOG_TARGET, @@ -285,28 +276,6 @@ async fn send_request( Ok(()) } -async fn recv_request(stream: &mut UnixStream) -> io::Result<(PvfPrepData, PathBuf)> { - let pvf = framed_recv(stream).await?; - let pvf = PvfPrepData::decode(&mut &pvf[..]).map_err(|e| { - io::Error::new( - io::ErrorKind::Other, - format!("prepare pvf recv_request: failed to decode PvfPrepData: {}", e), - ) - })?; - let tmp_file = framed_recv(stream).await?; - let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| { - io::Error::new( - 
io::ErrorKind::Other, - "prepare pvf recv_request: non utf-8 artifact path".to_string(), - ) - })?; - Ok((pvf, tmp_file)) -} - -async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> { - framed_send(stream, &result.encode()).await -} - async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result { let result = framed_recv(stream).await?; let result = PrepareResult::decode(&mut &result[..]).map_err(|e| { @@ -325,158 +294,3 @@ async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result) { - worker_event_loop("prepare", socket_path, node_version, |rt_handle, mut stream| async move { - let worker_pid = std::process::id(); - - loop { - let (pvf, dest) = recv_request(&mut stream).await?; - gum::debug!( - target: LOG_TARGET, - %worker_pid, - "worker: preparing artifact", - ); - - let cpu_time_start = ProcessTime::now(); - let preparation_timeout = pvf.prep_timeout; - - // Run the memory tracker. - #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] - let (memory_tracker_tx, memory_tracker_rx) = channel::<()>(); - #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] - let memory_tracker_fut = rt_handle.spawn_blocking(move || memory_tracker_loop(memory_tracker_rx)); - - // Spawn a new thread that runs the CPU time monitor. - let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>(); - let cpu_time_monitor_fut = rt_handle - .spawn_blocking(move || { - cpu_time_monitor_loop(cpu_time_start, preparation_timeout, cpu_time_monitor_rx) - }) - .fuse(); - // Spawn another thread for preparation. - let prepare_fut = rt_handle - .spawn_blocking(move || { - let result = prepare_artifact(pvf); - - // Get the `ru_maxrss` stat. If supported, call getrusage for the thread. - #[cfg(target_os = "linux")] - let result = result.map(|artifact| (artifact, get_max_rss_thread())); - - result - }) - .fuse(); - - pin_mut!(cpu_time_monitor_fut); - pin_mut!(prepare_fut); - - let result = select_biased! 
{ - // If this future is not selected, the join handle is dropped and the thread will - // finish in the background. - join_res = cpu_time_monitor_fut => { - match join_res { - Ok(Some(cpu_time_elapsed)) => { - // Log if we exceed the timeout and the other thread hasn't finished. - gum::warn!( - target: LOG_TARGET, - %worker_pid, - "prepare job took {}ms cpu time, exceeded prepare timeout {}ms", - cpu_time_elapsed.as_millis(), - preparation_timeout.as_millis(), - ); - Err(PrepareError::TimedOut) - }, - Ok(None) => Err(PrepareError::IoErr("error communicating over finished channel".into())), - Err(err) => Err(PrepareError::IoErr(err.to_string())), - } - }, - prepare_res = prepare_fut => { - let cpu_time_elapsed = cpu_time_start.elapsed(); - let _ = cpu_time_monitor_tx.send(()); - - match prepare_res.unwrap_or_else(|err| Err(PrepareError::IoErr(err.to_string()))) { - Err(err) => { - // Serialized error will be written into the socket. - Err(err) - }, - Ok(ok) => { - // Stop the memory stats worker and get its observed memory stats. - #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] - let memory_tracker_stats = - get_memory_tracker_loop_stats(memory_tracker_fut, memory_tracker_tx, worker_pid).await; - #[cfg(target_os = "linux")] - let (ok, max_rss) = ok; - let memory_stats = MemoryStats { - #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] - memory_tracker_stats, - #[cfg(target_os = "linux")] - max_rss: extract_max_rss_stat(max_rss, worker_pid), - }; - - // Write the serialized artifact into a temp file. - // - // PVF host only keeps artifacts statuses in its memory, successfully - // compiled code gets stored on the disk (and consequently deserialized - // by execute-workers). The prepare worker is only required to send `Ok` - // to the pool to indicate the success. 
- - gum::debug!( - target: LOG_TARGET, - %worker_pid, - "worker: writing artifact to {}", - dest.display(), - ); - tokio::fs::write(&dest, &ok).await?; - - Ok(PrepareStats{cpu_time_elapsed, memory_stats}) - }, - } - }, - }; - - send_response(&mut stream, result).await?; - } - }); -} - -fn prepare_artifact(pvf: PvfPrepData) -> Result { - panic::catch_unwind(|| { - let blob = match crate::executor_intf::prevalidate(&pvf.code()) { - Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))), - Ok(b) => b, - }; - - match crate::executor_intf::prepare(blob, &pvf.executor_params()) { - Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)), - Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))), - } - }) - .map_err(|panic_payload| { - PrepareError::Panic(crate::error::stringify_panic_payload(panic_payload)) - }) - .and_then(|inner_result| inner_result) -} diff --git a/node/core/pvf/src/pvf.rs b/node/core/pvf/src/pvf.rs index ad2dc5fcd918..c134cacb4acf 100644 --- a/node/core/pvf/src/pvf.rs +++ b/node/core/pvf/src/pvf.rs @@ -36,13 +36,13 @@ use crate::host::tests::TEST_PREPARATION_TIMEOUT; #[derive(Clone, Encode, Decode)] pub struct PvfPrepData { /// Wasm code (uncompressed) - pub(crate) code: Arc>, + code: Arc>, /// Wasm code hash - pub(crate) code_hash: ValidationCodeHash, + code_hash: ValidationCodeHash, /// Executor environment parameters for the session for which artifact is prepared - pub(crate) executor_params: Arc, + executor_params: Arc, /// Preparation timeout - pub(crate) prep_timeout: Duration, + prep_timeout: Duration, } impl PvfPrepData { @@ -69,15 +69,20 @@ impl PvfPrepData { } /// Returns PVF code - pub(crate) fn code(&self) -> Arc> { + pub fn code(&self) -> Arc> { self.code.clone() } /// Returns executor params - pub(crate) fn executor_params(&self) -> Arc { + pub fn executor_params(&self) -> Arc { self.executor_params.clone() } + /// Returns preparation timeout. 
+ pub fn prep_timeout(&self) -> Duration { + self.prep_timeout + } + /// Creates a structure for tests #[cfg(test)] pub(crate) fn from_discriminator_and_timeout(num: u32, timeout: Duration) -> Self { diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_common.rs index 3caee34a5d0f..33144616601d 100644 --- a/node/core/pvf/src/worker_common.rs +++ b/node/core/pvf/src/worker_common.rs @@ -17,8 +17,7 @@ //! Common logic for implementation of worker processes. use crate::LOG_TARGET; -use cpu_time::ProcessTime; -use futures::{never::Never, FutureExt as _}; +use futures::FutureExt as _; use futures_timer::Delay; use pin_project::pin_project; use rand::Rng; @@ -26,7 +25,6 @@ use std::{ fmt, mem, path::{Path, PathBuf}, pin::Pin, - sync::mpsc::{Receiver, RecvTimeoutError}, task::{Context, Poll}, time::Duration, }; @@ -34,17 +32,12 @@ use tokio::{ io::{self, AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, ReadBuf}, net::{UnixListener, UnixStream}, process, - runtime::{Handle, Runtime}, }; /// A multiple of the job timeout (in CPU time) for which we are willing to wait on the host (in /// wall clock time). This is lenient because CPU time may go slower than wall clock time. pub const JOB_TIMEOUT_WALL_CLOCK_FACTOR: u32 = 4; -/// Some allowed overhead that we account for in the "CPU time monitor" thread's sleeps, on the -/// child process. -pub const JOB_TIMEOUT_OVERHEAD: Duration = Duration::from_millis(50); - /// This is publicly exposed only for integration tests. 
#[doc(hidden)] pub async fn spawn_with_program_path( @@ -171,92 +164,6 @@ pub async fn tmpfile(prefix: &str) -> io::Result { tmpfile_in(prefix, &temp_dir).await } -pub fn worker_event_loop( - debug_id: &'static str, - socket_path: &str, - node_version: Option<&str>, - mut event_loop: F, -) where - F: FnMut(Handle, UnixStream) -> Fut, - Fut: futures::Future>, -{ - let worker_pid = std::process::id(); - gum::debug!(target: LOG_TARGET, %worker_pid, "starting pvf worker ({})", debug_id); - - // Check for a mismatch between the node and worker versions. - if let Some(version) = node_version { - if version != env!("SUBSTRATE_CLI_IMPL_VERSION") { - gum::error!( - target: LOG_TARGET, - %worker_pid, - "Node and worker version mismatch, node needs restarting, forcing shutdown", - ); - kill_parent_node_in_emergency(); - let err: io::Result = - Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch")); - gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker({}): {:?}", debug_id, err); - return - } - } - - // Run the main worker loop. - let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it."); - let handle = rt.handle(); - let err = rt - .block_on(async move { - let stream = UnixStream::connect(socket_path).await?; - let _ = tokio::fs::remove_file(socket_path).await; - - let result = event_loop(handle.clone(), stream).await; - - result - }) - // It's never `Ok` because it's `Ok(Never)`. - .unwrap_err(); - - gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker ({}): {:?}", debug_id, err); - - // We don't want tokio to wait for the tasks to finish. We want to bring down the worker as fast - // as possible and not wait for stalled validation to finish. This isn't strictly necessary now, - // but may be in the future. - rt.shutdown_background(); -} - -/// Loop that runs in the CPU time monitor thread on prepare and execute jobs. 
Continuously wakes up -/// and then either blocks for the remaining CPU time, or returns if we exceed the CPU timeout. -/// -/// Returning `Some` indicates that we should send a `TimedOut` error to the host. Will return -/// `None` if the other thread finishes first, without us timing out. -/// -/// NOTE: Sending a `TimedOut` error to the host will cause the worker, whether preparation or -/// execution, to be killed by the host. We do not kill the process here because it would interfere -/// with the proper handling of this error. -pub fn cpu_time_monitor_loop( - cpu_time_start: ProcessTime, - timeout: Duration, - finished_rx: Receiver<()>, -) -> Option { - loop { - let cpu_time_elapsed = cpu_time_start.elapsed(); - - // Treat the timeout as CPU time, which is less subject to variance due to load. - if cpu_time_elapsed <= timeout { - // Sleep for the remaining CPU time, plus a bit to account for overhead. Note that the sleep - // is wall clock time. The CPU clock may be slower than the wall clock. - let sleep_interval = timeout.saturating_sub(cpu_time_elapsed) + JOB_TIMEOUT_OVERHEAD; - match finished_rx.recv_timeout(sleep_interval) { - // Received finish signal. - Ok(()) => return None, - // Timed out, restart loop. - Err(RecvTimeoutError::Timeout) => continue, - Err(RecvTimeoutError::Disconnected) => return None, - } - } - - return Some(cpu_time_elapsed) - } -} - /// A struct that represents an idle worker. /// /// This struct is supposed to be used as a token that is passed by move into a subroutine that @@ -405,12 +312,7 @@ pub fn path_to_bytes(path: &Path) -> &[u8] { path.to_str().expect("non-UTF-8 path").as_bytes() } -/// Interprets the given bytes as a path. Returns `None` if the given bytes do not constitute a -/// a proper utf-8 string. -pub fn bytes_to_path(bytes: &[u8]) -> Option { - std::str::from_utf8(bytes).ok().map(PathBuf::from) -} - +/// Write some data prefixed by its length into `w`. 
pub async fn framed_send(w: &mut (impl AsyncWrite + Unpin), buf: &[u8]) -> io::Result<()> { let len_buf = buf.len().to_le_bytes(); w.write_all(&len_buf).await?; @@ -418,6 +320,7 @@ pub async fn framed_send(w: &mut (impl AsyncWrite + Unpin), buf: &[u8]) -> io::R Ok(()) } +/// Read some data prefixed by its length from `r`. pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result> { let mut len_buf = [0u8; mem::size_of::()]; r.read_exact(&mut len_buf).await?; @@ -426,20 +329,3 @@ pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result r.read_exact(&mut buf).await?; Ok(buf) } - -/// In case of node and worker version mismatch (as a result of in-place upgrade), send `SIGTERM` -/// to the node to tear it down and prevent it from raising disputes on valid candidates. Node -/// restart should be handled by the node owner. As node exits, unix sockets opened to workers -/// get closed by the OS and other workers receive error on socket read and also exit. Preparation -/// jobs are written to the temporary files that are renamed to real artifacts on the node side, so -/// no leftover artifacts are possible. -fn kill_parent_node_in_emergency() { - unsafe { - // SAFETY: `getpid()` never fails but may return "no-parent" (0) or "parent-init" (1) in - // some corner cases, which is checked. `kill()` never fails. 
- let ppid = libc::getppid(); - if ppid > 1 { - libc::kill(ppid, libc::SIGTERM); - } - } -} diff --git a/node/core/pvf/worker/Cargo.toml b/node/core/pvf/worker/Cargo.toml new file mode 100644 index 000000000000..260c6217eb67 --- /dev/null +++ b/node/core/pvf/worker/Cargo.toml @@ -0,0 +1,49 @@ +[package] +name = "polkadot-node-core-pvf-worker" +version.workspace = true +authors.workspace = true +edition.workspace = true + +[[bin]] +name = "puppet_worker" +path = "bin/puppet_worker.rs" + +[dependencies] +assert_matches = "1.4.0" +cpu-time = "1.0.0" +futures = "0.3.21" +gum = { package = "tracing-gum", path = "../../../gum" } +libc = "0.2.139" +rayon = "1.5.1" +tempfile = "3.3.0" +tikv-jemalloc-ctl = { version = "0.5.0", optional = true } +tokio = "1.24.2" + +parity-scale-codec = { version = "3.4.0", default-features = false, features = ["derive"] } + +polkadot-node-core-pvf = { path = ".." } +polkadot-parachain = { path = "../../../../parachain" } +polkadot-primitives = { path = "../../../../primitives" } + +sc-executor = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-executor-common = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-executor-wasmtime = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-externalities = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-io = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" } + +[target.'cfg(target_os = "linux")'.dependencies] +tikv-jemalloc-ctl = "0.5.0" + +[build-dependencies] +substrate-build-script-utils = { git = "https://github.com/paritytech/substrate", branch = "master" } + +[dev-dependencies] +adder = { package = 
"test-parachain-adder", path = "../../../../parachain/test-parachains/adder" } +halt = { package = "test-parachain-halt", path = "../../../../parachain/test-parachains/halt" } +tempfile = "3.3.0" + +[features] +jemalloc-allocator = ["dep:tikv-jemalloc-ctl"] diff --git a/node/core/pvf/bin/puppet_worker.rs b/node/core/pvf/worker/bin/puppet_worker.rs similarity index 92% rename from node/core/pvf/bin/puppet_worker.rs rename to node/core/pvf/worker/bin/puppet_worker.rs index 7f93519d8454..ddd81971292b 100644 --- a/node/core/pvf/bin/puppet_worker.rs +++ b/node/core/pvf/worker/bin/puppet_worker.rs @@ -14,4 +14,4 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -polkadot_node_core_pvf::decl_puppet_worker_main!(); +polkadot_node_core_pvf_worker::decl_puppet_worker_main!(); diff --git a/node/core/pvf/worker/build.rs b/node/core/pvf/worker/build.rs new file mode 100644 index 000000000000..40e9f832586e --- /dev/null +++ b/node/core/pvf/worker/build.rs @@ -0,0 +1,19 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . 
+ +fn main() { + substrate_build_script_utils::generate_cargo_keys(); +} diff --git a/node/core/pvf/worker/src/common.rs b/node/core/pvf/worker/src/common.rs new file mode 100644 index 000000000000..84bc88701d62 --- /dev/null +++ b/node/core/pvf/worker/src/common.rs @@ -0,0 +1,142 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use crate::LOG_TARGET; +use cpu_time::ProcessTime; +use futures::never::Never; +use std::{ + path::PathBuf, + sync::mpsc::{Receiver, RecvTimeoutError}, + time::Duration, +}; +use tokio::{ + io, + net::UnixStream, + runtime::{Handle, Runtime}, +}; + +/// Some allowed overhead that we account for in the "CPU time monitor" thread's sleeps, on the +/// child process. +pub const JOB_TIMEOUT_OVERHEAD: Duration = Duration::from_millis(50); + +/// Interprets the given bytes as a path. Returns `None` if the given bytes do not constitute a +/// a proper utf-8 string. 
+pub fn bytes_to_path(bytes: &[u8]) -> Option { + std::str::from_utf8(bytes).ok().map(PathBuf::from) +} + +pub fn worker_event_loop( + debug_id: &'static str, + socket_path: &str, + node_version: Option<&str>, + mut event_loop: F, +) where + F: FnMut(Handle, UnixStream) -> Fut, + Fut: futures::Future>, +{ + let worker_pid = std::process::id(); + gum::debug!(target: LOG_TARGET, %worker_pid, "starting pvf worker ({})", debug_id); + + // Check for a mismatch between the node and worker versions. + if let Some(version) = node_version { + if version != env!("SUBSTRATE_CLI_IMPL_VERSION") { + gum::error!( + target: LOG_TARGET, + %worker_pid, + "Node and worker version mismatch, node needs restarting, forcing shutdown", + ); + kill_parent_node_in_emergency(); + let err: io::Result = + Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch")); + gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker({}): {:?}", debug_id, err); + return + } + } + + // Run the main worker loop. + let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it."); + let handle = rt.handle(); + let err = rt + .block_on(async move { + let stream = UnixStream::connect(socket_path).await?; + let _ = tokio::fs::remove_file(socket_path).await; + + let result = event_loop(handle.clone(), stream).await; + + result + }) + // It's never `Ok` because it's `Ok(Never)`. + .unwrap_err(); + + gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker ({}): {:?}", debug_id, err); + + // We don't want tokio to wait for the tasks to finish. We want to bring down the worker as fast + // as possible and not wait for stalled validation to finish. This isn't strictly necessary now, + // but may be in the future. + rt.shutdown_background(); +} + +/// Loop that runs in the CPU time monitor thread on prepare and execute jobs. 
Continuously wakes up +/// and then either blocks for the remaining CPU time, or returns if we exceed the CPU timeout. +/// +/// Returning `Some` indicates that we should send a `TimedOut` error to the host. Will return +/// `None` if the other thread finishes first, without us timing out. +/// +/// NOTE: Sending a `TimedOut` error to the host will cause the worker, whether preparation or +/// execution, to be killed by the host. We do not kill the process here because it would interfere +/// with the proper handling of this error. +pub fn cpu_time_monitor_loop( + cpu_time_start: ProcessTime, + timeout: Duration, + finished_rx: Receiver<()>, +) -> Option { + loop { + let cpu_time_elapsed = cpu_time_start.elapsed(); + + // Treat the timeout as CPU time, which is less subject to variance due to load. + if cpu_time_elapsed <= timeout { + // Sleep for the remaining CPU time, plus a bit to account for overhead. Note that the sleep + // is wall clock time. The CPU clock may be slower than the wall clock. + let sleep_interval = timeout.saturating_sub(cpu_time_elapsed) + JOB_TIMEOUT_OVERHEAD; + match finished_rx.recv_timeout(sleep_interval) { + // Received finish signal. + Ok(()) => return None, + // Timed out, restart loop. + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => return None, + } + } + + return Some(cpu_time_elapsed) + } +} + +/// In case of node and worker version mismatch (as a result of in-place upgrade), send `SIGTERM` +/// to the node to tear it down and prevent it from raising disputes on valid candidates. Node +/// restart should be handled by the node owner. As node exits, unix sockets opened to workers +/// get closed by the OS and other workers receive error on socket read and also exit. Preparation +/// jobs are written to the temporary files that are renamed to real artifacts on the node side, so +/// no leftover artifacts are possible. 
+fn kill_parent_node_in_emergency() { + unsafe { + // SAFETY: `getpid()` never fails but may return "no-parent" (0) or "parent-init" (1) in + // some corner cases, which is checked. `kill()` never fails. + let ppid = libc::getppid(); + if ppid > 1 { + libc::kill(ppid, libc::SIGTERM); + } + } +} diff --git a/node/core/pvf/worker/src/execute.rs b/node/core/pvf/worker/src/execute.rs new file mode 100644 index 000000000000..9f6ff164a2b6 --- /dev/null +++ b/node/core/pvf/worker/src/execute.rs @@ -0,0 +1,175 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . 
+ +use crate::{ + common::{bytes_to_path, cpu_time_monitor_loop, worker_event_loop}, + executor_intf::Executor, + LOG_TARGET, +}; +use cpu_time::ProcessTime; +use futures::{pin_mut, select_biased, FutureExt}; +use parity_scale_codec::{Decode, Encode}; +use polkadot_node_core_pvf::{ + framed_recv, framed_send, ExecuteHandshake as Handshake, ExecuteResponse as Response, +}; +use polkadot_parachain::primitives::ValidationResult; +use std::{ + path::{Path, PathBuf}, + sync::{mpsc::channel, Arc}, + time::Duration, +}; +use tokio::{io, net::UnixStream}; + +async fn recv_handshake(stream: &mut UnixStream) -> io::Result { + let handshake_enc = framed_recv(stream).await?; + let handshake = Handshake::decode(&mut &handshake_enc[..]).map_err(|_| { + io::Error::new( + io::ErrorKind::Other, + "execute pvf recv_handshake: failed to decode Handshake".to_owned(), + ) + })?; + Ok(handshake) +} + +async fn recv_request(stream: &mut UnixStream) -> io::Result<(PathBuf, Vec, Duration)> { + let artifact_path = framed_recv(stream).await?; + let artifact_path = bytes_to_path(&artifact_path).ok_or_else(|| { + io::Error::new( + io::ErrorKind::Other, + "execute pvf recv_request: non utf-8 artifact path".to_string(), + ) + })?; + let params = framed_recv(stream).await?; + let execution_timeout = framed_recv(stream).await?; + let execution_timeout = Duration::decode(&mut &execution_timeout[..]).map_err(|_| { + io::Error::new( + io::ErrorKind::Other, + "execute pvf recv_request: failed to decode duration".to_string(), + ) + })?; + Ok((artifact_path, params, execution_timeout)) +} + +async fn send_response(stream: &mut UnixStream, response: Response) -> io::Result<()> { + framed_send(stream, &response.encode()).await +} + +/// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies +/// the path to the socket used to communicate with the host. The `node_version`, if `Some`, +/// is checked against the worker version. 
A mismatch results in immediate worker termination. +/// `None` is used for tests and in other situations when version check is not necessary. +pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { + worker_event_loop("execute", socket_path, node_version, |rt_handle, mut stream| async move { + let worker_pid = std::process::id(); + + let handshake = recv_handshake(&mut stream).await?; + let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e)) + })?); + + loop { + let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?; + gum::debug!( + target: LOG_TARGET, + %worker_pid, + "worker: validating artifact {}", + artifact_path.display(), + ); + + // Used to signal to the cpu time monitor thread that it can finish. + let (finished_tx, finished_rx) = channel::<()>(); + let cpu_time_start = ProcessTime::now(); + + // Spawn a new thread that runs the CPU time monitor. + let cpu_time_monitor_fut = rt_handle + .spawn_blocking(move || { + cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx) + }) + .fuse(); + let executor_2 = executor.clone(); + let execute_fut = rt_handle + .spawn_blocking(move || { + validate_using_artifact(&artifact_path, ¶ms, executor_2, cpu_time_start) + }) + .fuse(); + + pin_mut!(cpu_time_monitor_fut); + pin_mut!(execute_fut); + + let response = select_biased! { + // If this future is not selected, the join handle is dropped and the thread will + // finish in the background. + cpu_time_monitor_res = cpu_time_monitor_fut => { + match cpu_time_monitor_res { + Ok(Some(cpu_time_elapsed)) => { + // Log if we exceed the timeout and the other thread hasn't finished. 
+ gum::warn!( + target: LOG_TARGET, + %worker_pid, + "execute job took {}ms cpu time, exceeded execute timeout {}ms", + cpu_time_elapsed.as_millis(), + execution_timeout.as_millis(), + ); + Response::TimedOut + }, + Ok(None) => Response::InternalError("error communicating over finished channel".into()), + Err(e) => Response::format_internal("cpu time monitor thread error", &e.to_string()), + } + }, + execute_res = execute_fut => { + let _ = finished_tx.send(()); + execute_res.unwrap_or_else(|e| Response::format_internal("execute thread error", &e.to_string())) + }, + }; + + send_response(&mut stream, response).await?; + } + }); +} + +fn validate_using_artifact( + artifact_path: &Path, + params: &[u8], + executor: Arc, + cpu_time_start: ProcessTime, +) -> Response { + // Check here if the file exists, because the error from Substrate is not match-able. + // TODO: Re-evaluate after . + let file_metadata = std::fs::metadata(artifact_path); + if let Err(err) = file_metadata { + return Response::format_internal("execute: could not find or open file", &err.to_string()) + } + + let descriptor_bytes = match unsafe { + // SAFETY: this should be safe since the compiled artifact passed here comes from the + // file created by the prepare workers. These files are obtained by calling + // [`executor_intf::prepare`]. 
+ executor.execute(artifact_path.as_ref(), params) + } { + Err(err) => return Response::format_invalid("execute", &err), + Ok(d) => d, + }; + + let duration = cpu_time_start.elapsed(); + + let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) { + Err(err) => + return Response::format_invalid("validation result decoding failed", &err.to_string()), + Ok(r) => r, + }; + + Response::Ok { result_descriptor, duration } +} diff --git a/node/core/pvf/src/executor_intf.rs b/node/core/pvf/worker/src/executor_intf.rs similarity index 100% rename from node/core/pvf/src/executor_intf.rs rename to node/core/pvf/worker/src/executor_intf.rs diff --git a/node/core/pvf/worker/src/lib.rs b/node/core/pvf/worker/src/lib.rs new file mode 100644 index 000000000000..456362cf8f57 --- /dev/null +++ b/node/core/pvf/worker/src/lib.rs @@ -0,0 +1,73 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +mod common; +mod execute; +mod executor_intf; +mod memory_stats; +mod prepare; + +#[doc(hidden)] +pub mod testing; + +#[doc(hidden)] +pub use sp_tracing; + +pub use execute::worker_entrypoint as execute_worker_entrypoint; +pub use prepare::worker_entrypoint as prepare_worker_entrypoint; + +pub use executor_intf::{prepare, prevalidate}; + +// NOTE: Initializing logging in e.g. 
tests will not have an effect in the workers, as they are +// separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-worker=trace`. +const LOG_TARGET: &str = "parachain::pvf-worker"; + +/// Use this macro to declare a `fn main() {}` that will create an executable that can be used for +/// spawning the desired worker. +#[macro_export(local_inner_macros)] +macro_rules! decl_worker_main { + ($command:tt) => { + fn main() { + $crate::sp_tracing::try_init_simple(); + + let args = std::env::args().collect::>(); + + let mut version = None; + let mut socket_path: &str = ""; + + for i in 1..args.len() { + match args[i].as_ref() { + "--socket-path" => socket_path = args[i + 1].as_str(), + "--node-version" => version = Some(args[i + 1].as_str()), + _ => (), + } + } + + decl_worker_main_command!($command, socket_path, version) + } + }; +} + +#[macro_export] +#[doc(hidden)] +macro_rules! decl_worker_main_command { + (prepare, $socket_path:expr, $version: expr) => { + $crate::prepare_worker_entrypoint(&$socket_path, $version) + }; + (execute, $socket_path:expr, $version: expr) => { + $crate::execute_worker_entrypoint(&$socket_path, $version) + }; +} diff --git a/node/core/pvf/src/prepare/memory_stats.rs b/node/core/pvf/worker/src/memory_stats.rs similarity index 89% rename from node/core/pvf/src/prepare/memory_stats.rs rename to node/core/pvf/worker/src/memory_stats.rs index 3513a68c79e5..945c849eb1db 100644 --- a/node/core/pvf/src/prepare/memory_stats.rs +++ b/node/core/pvf/worker/src/memory_stats.rs @@ -27,38 +27,14 @@ //! for more //! background. -use parity_scale_codec::{Decode, Encode}; - -/// Helper struct to contain all the memory stats, including [`MemoryAllocationStats`] and, if -/// supported by the OS, `ru_maxrss`. -#[derive(Clone, Debug, Default, Encode, Decode)] -pub struct MemoryStats { - /// Memory stats from `tikv_jemalloc_ctl`. 
- #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] - pub memory_tracker_stats: Option, - /// `ru_maxrss` from `getrusage`. A string error since `io::Error` is not `Encode`able. - #[cfg(target_os = "linux")] - pub max_rss: Option, -} - -/// Statistics of collected memory metrics. -#[non_exhaustive] -#[derive(Clone, Debug, Default, Encode, Decode)] -pub struct MemoryAllocationStats { - /// Total resident memory, in bytes. - pub resident: u64, - /// Total allocated memory, in bytes. - pub allocated: u64, -} - /// Module for the memory tracker. The memory tracker runs in its own thread, where it polls memory /// usage at an interval. /// /// NOTE: Requires jemalloc enabled. #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] pub mod memory_tracker { - use super::*; use crate::LOG_TARGET; + use polkadot_node_core_pvf::MemoryAllocationStats; use std::{ sync::mpsc::{Receiver, RecvTimeoutError, Sender}, time::Duration, diff --git a/node/core/pvf/worker/src/prepare.rs b/node/core/pvf/worker/src/prepare.rs new file mode 100644 index 000000000000..3cec7439f8df --- /dev/null +++ b/node/core/pvf/worker/src/prepare.rs @@ -0,0 +1,222 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . 
+
+#[cfg(target_os = "linux")]
+use crate::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread};
+#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
+use crate::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop};
+use crate::{
+	common::{bytes_to_path, cpu_time_monitor_loop, worker_event_loop},
+	prepare, prevalidate, LOG_TARGET,
+};
+use cpu_time::ProcessTime;
+use futures::{pin_mut, select_biased, FutureExt};
+use parity_scale_codec::{Decode, Encode};
+use polkadot_node_core_pvf::{
+	framed_recv, framed_send, CompiledArtifact, MemoryStats, PrepareError, PrepareResult,
+	PrepareStats, PvfPrepData,
+};
+use std::{any::Any, panic, path::PathBuf, sync::mpsc::channel};
+use tokio::{io, net::UnixStream};
+
+/// Receives one preparation request from the host.
+///
+/// Two frames are read from `stream`, in order: the encoded [`PvfPrepData`] and the path the
+/// artifact should be written to.
+///
+/// # Errors
+///
+/// Returns an `io::Error` if a frame cannot be received, if the first frame does not decode as
+/// [`PvfPrepData`], or if the second frame cannot be converted to a path by `bytes_to_path`.
+async fn recv_request(stream: &mut UnixStream) -> io::Result<(PvfPrepData, PathBuf)> {
+	let pvf = framed_recv(stream).await?;
+	let pvf = PvfPrepData::decode(&mut &pvf[..]).map_err(|e| {
+		io::Error::new(
+			io::ErrorKind::Other,
+			format!("prepare pvf recv_request: failed to decode PvfPrepData: {}", e),
+		)
+	})?;
+	let tmp_file = framed_recv(stream).await?;
+	let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| {
+		io::Error::new(
+			io::ErrorKind::Other,
+			"prepare pvf recv_request: non utf-8 artifact path".to_string(),
+		)
+	})?;
+	Ok((pvf, tmp_file))
+}
+
+/// Sends the encoded `result` of a preparation job back to the host as a single frame.
+async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> {
+	framed_send(stream, &result.encode()).await
+}
+
+/// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies
+/// the path to the socket used to communicate with the host. The `node_version`, if `Some`,
+/// is checked against the worker version. A mismatch results in immediate worker termination.
+/// `None` is used for tests and in other situations when version check is not necessary.
+///
+/// # Flow
+///
+/// This runs the following in a loop:
+///
+/// 1. Get the code and parameters for preparation from the host.
+///
+/// 2. Start a memory tracker in a separate thread.
+///
+/// 3. Start the CPU time monitor loop and the actual preparation in two separate threads.
+///
+/// 4. Select on the two threads created in step 3. If the CPU timeout was hit, the CPU time monitor
+///    thread will trigger first.
+///
+/// 5. Stop the memory tracker and get the stats.
+///
+/// 6. If compilation succeeded, write the compiled artifact into a temporary file.
+///
+/// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we
+///    send that in the `PrepareResult`.
+pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
+	worker_event_loop("prepare", socket_path, node_version, |rt_handle, mut stream| async move {
+		let worker_pid = std::process::id();
+
+		loop {
+			let (pvf, dest) = recv_request(&mut stream).await?;
+			gum::debug!(
+				target: LOG_TARGET,
+				%worker_pid,
+				"worker: preparing artifact",
+			);
+
+			let cpu_time_start = ProcessTime::now();
+			let preparation_timeout = pvf.prep_timeout();
+
+			// Run the memory tracker.
+			#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
+			let (memory_tracker_tx, memory_tracker_rx) = channel::<()>();
+			#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
+			let memory_tracker_fut = rt_handle.spawn_blocking(move || memory_tracker_loop(memory_tracker_rx));
+
+			// Spawn a new thread that runs the CPU time monitor.
+			let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
+			let cpu_time_monitor_fut = rt_handle
+				.spawn_blocking(move || {
+					cpu_time_monitor_loop(cpu_time_start, preparation_timeout, cpu_time_monitor_rx)
+				})
+				.fuse();
+			// Spawn another thread for preparation.
+			let prepare_fut = rt_handle
+				.spawn_blocking(move || {
+					let result = prepare_artifact(pvf);
+
+					// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
+					#[cfg(target_os = "linux")]
+					let result = result.map(|artifact| (artifact, get_max_rss_thread()));
+
+					result
+				})
+				.fuse();
+
+			pin_mut!(cpu_time_monitor_fut);
+			pin_mut!(prepare_fut);
+
+			let result = select_biased! {
+				// If this future is not selected, the join handle is dropped and the thread will
+				// finish in the background.
+				join_res = cpu_time_monitor_fut => {
+					match join_res {
+						Ok(Some(cpu_time_elapsed)) => {
+							// Log if we exceed the timeout and the other thread hasn't finished.
+							gum::warn!(
+								target: LOG_TARGET,
+								%worker_pid,
+								"prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
+								cpu_time_elapsed.as_millis(),
+								preparation_timeout.as_millis(),
+							);
+							Err(PrepareError::TimedOut)
+						},
+						Ok(None) => Err(PrepareError::IoErr("error communicating over finished channel".into())),
+						Err(err) => Err(PrepareError::IoErr(err.to_string())),
+					}
+				},
+				prepare_res = prepare_fut => {
+					let cpu_time_elapsed = cpu_time_start.elapsed();
+					// Preparation finished first: notify the CPU time monitor thread. A failed
+					// send is deliberately ignored.
+					let _ = cpu_time_monitor_tx.send(());
+
+					match prepare_res.unwrap_or_else(|err| Err(PrepareError::IoErr(err.to_string()))) {
+						Err(err) => {
+							// Serialized error will be written into the socket.
+							Err(err)
+						},
+						Ok(ok) => {
+							// Stop the memory stats worker and get its observed memory stats.
+							#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
+							let memory_tracker_stats =
+								get_memory_tracker_loop_stats(memory_tracker_fut, memory_tracker_tx, worker_pid).await;
+							#[cfg(target_os = "linux")]
+							let (ok, max_rss) = ok;
+							let memory_stats = MemoryStats {
+								#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
+								memory_tracker_stats,
+								#[cfg(target_os = "linux")]
+								max_rss: extract_max_rss_stat(max_rss, worker_pid),
+							};
+
+							// Write the serialized artifact into a temp file.
+							//
+							// PVF host only keeps artifacts statuses in its memory, successfully
+							// compiled code gets stored on the disk (and consequently deserialized
+							// by execute-workers). The prepare worker is only required to send `Ok`
+							// to the pool to indicate the success.
+
+							gum::debug!(
+								target: LOG_TARGET,
+								%worker_pid,
+								"worker: writing artifact to {}",
+								dest.display(),
+							);
+							// NOTE(review): a write failure is propagated with `?`, which returns
+							// from this closure before any response is sent to the host. Confirm
+							// this is intended rather than reporting a `PrepareError`.
+							tokio::fs::write(&dest, &ok).await?;
+
+							Ok(PrepareStats{cpu_time_elapsed, memory_stats})
+						},
+					}
+				},
+			};
+
+			send_response(&mut stream, result).await?;
+		}
+	});
+}
+
+/// Prevalidates and then prepares the code in `pvf`, returning the compiled artifact.
+///
+/// Both steps run inside `catch_unwind`, so a panic in either is reported as
+/// `PrepareError::Panic` instead of unwinding into the caller. Prevalidation and preparation
+/// failures are reported as `PrepareError::Prevalidation` and `PrepareError::Preparation`,
+/// carrying the `Debug` rendering of the underlying error.
+fn prepare_artifact(pvf: PvfPrepData) -> Result<CompiledArtifact, PrepareError> {
+	panic::catch_unwind(|| {
+		let blob = match prevalidate(&pvf.code()) {
+			Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))),
+			Ok(b) => b,
+		};
+
+		match prepare(blob, &pvf.executor_params()) {
+			Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)),
+			Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))),
+		}
+	})
+	.map_err(|panic_payload| PrepareError::Panic(stringify_panic_payload(panic_payload)))
+	.and_then(|inner_result| inner_result)
+}
+
+/// Attempt to convert an opaque panic payload to a string.
+///
+/// This is a best effort, and is not guaranteed to provide the most accurate value: payloads that
+/// are neither a `&'static str` nor a `String` yield a fixed placeholder message.
+fn stringify_panic_payload(payload: Box<dyn Any + Send + 'static>) -> String {
+	match payload.downcast::<&'static str>() {
+		Ok(msg) => msg.to_string(),
+		Err(payload) => match payload.downcast::<String>() {
+			Ok(msg) => *msg,
+			// At least we tried...
+			Err(_) => "unknown panic payload".to_string(),
+		},
+	}
+}
diff --git a/node/core/pvf/src/testing.rs b/node/core/pvf/worker/src/testing.rs
similarity index 96%
rename from node/core/pvf/src/testing.rs
rename to node/core/pvf/worker/src/testing.rs
index fb1b406cdad6..d09b68bf8b33 100644
--- a/node/core/pvf/src/testing.rs
+++ b/node/core/pvf/worker/src/testing.rs
@@ -21,10 +21,6 @@
 
 use polkadot_primitives::ExecutorParams;
 
-pub mod worker_common {
-	pub use crate::worker_common::{spawn_with_program_path, SpawnErr};
-}
-
 /// A function that emulates the stitches together behaviors of the preparation and the execution
 /// worker in a single synchronous function.
pub fn validate_candidate( diff --git a/node/core/pvf/tests/it/adder.rs b/node/core/pvf/worker/tests/it/adder.rs similarity index 100% rename from node/core/pvf/tests/it/adder.rs rename to node/core/pvf/worker/tests/it/adder.rs diff --git a/node/core/pvf/tests/it/main.rs b/node/core/pvf/worker/tests/it/main.rs similarity index 100% rename from node/core/pvf/tests/it/main.rs rename to node/core/pvf/worker/tests/it/main.rs diff --git a/node/core/pvf/tests/it/worker_common.rs b/node/core/pvf/worker/tests/it/worker_common.rs similarity index 94% rename from node/core/pvf/tests/it/worker_common.rs rename to node/core/pvf/worker/tests/it/worker_common.rs index 3a17efc8df5c..439ac8538c95 100644 --- a/node/core/pvf/tests/it/worker_common.rs +++ b/node/core/pvf/worker/tests/it/worker_common.rs @@ -15,7 +15,7 @@ // along with Polkadot. If not, see <http://www.gnu.org/licenses/>. use crate::PUPPET_EXE; -use polkadot_node_core_pvf::testing::worker_common::{spawn_with_program_path, SpawnErr}; +use polkadot_node_core_pvf::testing::{spawn_with_program_path, SpawnErr}; use std::time::Duration; // Test spawning a program that immediately exits with a failure code.
diff --git a/node/malus/Cargo.toml b/node/malus/Cargo.toml index c783693ca527..3c6aa5c2d39e 100644 --- a/node/malus/Cargo.toml +++ b/node/malus/Cargo.toml @@ -20,9 +20,9 @@ polkadot-node-subsystem-types = { path = "../subsystem-types" } polkadot-node-core-dispute-coordinator = { path = "../core/dispute-coordinator" } polkadot-node-core-candidate-validation = { path = "../core/candidate-validation" } polkadot-node-core-backing = { path = "../core/backing" } +polkadot-node-core-pvf-worker = { path = "../core/pvf/worker" } polkadot-node-primitives = { path = "../primitives" } polkadot-primitives = { path = "../../primitives" } -polkadot-node-core-pvf = { path = "../core/pvf" } color-eyre = { version = "0.6.1", default-features = false } assert_matches = "1.5" async-trait = "0.1.57" diff --git a/node/malus/src/malus.rs b/node/malus/src/malus.rs index f202996aca1e..36cf0cca06bf 100644 --- a/node/malus/src/malus.rs +++ b/node/malus/src/malus.rs @@ -97,7 +97,10 @@ impl MalusCli { #[cfg(not(target_os = "android"))] { - polkadot_node_core_pvf::prepare_worker_entrypoint(&cmd.socket_path, None); + polkadot_node_core_pvf_worker::prepare_worker_entrypoint( + &cmd.socket_path, + None, + ); } }, NemesisVariant::PvfExecuteWorker(cmd) => { @@ -108,7 +111,10 @@ impl MalusCli { #[cfg(not(target_os = "android"))] { - polkadot_node_core_pvf::execute_worker_entrypoint(&cmd.socket_path, None); + polkadot_node_core_pvf_worker::execute_worker_entrypoint( + &cmd.socket_path, + None, + ); } }, } diff --git a/node/test/performance-test/Cargo.toml b/node/test/performance-test/Cargo.toml index c83557f124d3..70f072c03ae1 100644 --- a/node/test/performance-test/Cargo.toml +++ b/node/test/performance-test/Cargo.toml @@ -10,11 +10,14 @@ quote = "1.0.26" env_logger = "0.9" log = "0.4" -polkadot-node-core-pvf = { path = "../../core/pvf" } +polkadot-node-core-pvf-worker = { path = "../../core/pvf/worker" } polkadot-erasure-coding = { path = "../../../erasure-coding" } polkadot-node-primitives = { path 
= "../../primitives" } polkadot-primitives = { path = "../../../primitives" } +sc-executor-common = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" } + kusama-runtime = { path = "../../../runtime/kusama" } [[bin]] diff --git a/node/test/performance-test/src/gen_ref_constants.rs b/node/test/performance-test/src/gen_ref_constants.rs index 0f06af1580ef..ba10ed215552 100644 --- a/node/test/performance-test/src/gen_ref_constants.rs +++ b/node/test/performance-test/src/gen_ref_constants.rs @@ -31,7 +31,6 @@ fn main() -> Result<(), PerfCheckError> { #[cfg(build_type = "release")] mod run { - use polkadot_node_core_pvf::sp_maybe_compressed_blob; use polkadot_node_primitives::VALIDATION_CODE_BOMB_LIMIT; use polkadot_performance_test::{ measure_erasure_coding, measure_pvf_prepare, PerfCheckError, ERASURE_CODING_N_VALIDATORS, diff --git a/node/test/performance-test/src/lib.rs b/node/test/performance-test/src/lib.rs index e426cc4e5142..1afa43cc62ba 100644 --- a/node/test/performance-test/src/lib.rs +++ b/node/test/performance-test/src/lib.rs @@ -17,7 +17,6 @@ //! A Polkadot performance tests utilities. use polkadot_erasure_coding::{obtain_chunks, reconstruct}; -use polkadot_node_core_pvf::{sc_executor_common, sp_maybe_compressed_blob}; use polkadot_primitives::ExecutorParams; use std::time::{Duration, Instant}; @@ -66,8 +65,9 @@ pub fn measure_pvf_prepare(wasm_code: &[u8]) -> Result<Duration, PerfCheckError> .or(Err(PerfCheckError::CodeDecompressionFailed))?; // Recreate the pipeline from the pvf prepare worker.
- let blob = polkadot_node_core_pvf::prevalidate(code.as_ref()).map_err(PerfCheckError::from)?; - polkadot_node_core_pvf::prepare(blob, &ExecutorParams::default()) + let blob = + polkadot_node_core_pvf_worker::prevalidate(code.as_ref()).map_err(PerfCheckError::from)?; + polkadot_node_core_pvf_worker::prepare(blob, &ExecutorParams::default()) .map_err(PerfCheckError::from)?; Ok(start.elapsed()) diff --git a/parachain/test-parachains/adder/collator/Cargo.toml b/parachain/test-parachains/adder/collator/Cargo.toml index 7fe4aefc688d..ee20cb0b0d17 100644 --- a/parachain/test-parachains/adder/collator/Cargo.toml +++ b/parachain/test-parachains/adder/collator/Cargo.toml @@ -34,7 +34,7 @@ sc-service = { git = "https://github.com/paritytech/substrate", branch = "master # This one is tricky. Even though it is not used directly by the collator, we still need it for the # `puppet_worker` binary, which is required for the integration test. However, this shouldn't be # a big problem since it is used transitively anyway. -polkadot-node-core-pvf = { path = "../../../../node/core/pvf" } +polkadot-node-core-pvf-worker = { path = "../../../../node/core/pvf/worker" } [dev-dependencies] polkadot-parachain = { path = "../../.." } diff --git a/parachain/test-parachains/adder/collator/bin/puppet_worker.rs b/parachain/test-parachains/adder/collator/bin/puppet_worker.rs index 7f93519d8454..ddd81971292b 100644 --- a/parachain/test-parachains/adder/collator/bin/puppet_worker.rs +++ b/parachain/test-parachains/adder/collator/bin/puppet_worker.rs @@ -14,4 +14,4 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
-polkadot_node_core_pvf::decl_puppet_worker_main!(); +polkadot_node_core_pvf_worker::decl_puppet_worker_main!(); diff --git a/parachain/test-parachains/adder/collator/src/lib.rs b/parachain/test-parachains/adder/collator/src/lib.rs index 02a4598f9e47..4b2b9248de22 100644 --- a/parachain/test-parachains/adder/collator/src/lib.rs +++ b/parachain/test-parachains/adder/collator/src/lib.rs @@ -272,7 +272,7 @@ mod tests { } fn validate_collation(collator: &Collator, parent_head: HeadData, collation: Collation) { - use polkadot_node_core_pvf::testing::validate_candidate; + use polkadot_node_core_pvf_worker::testing::validate_candidate; let block_data = match collation.proof_of_validity { MaybeCompressedPoV::Raw(pov) => pov.block_data, diff --git a/parachain/test-parachains/undying/collator/Cargo.toml b/parachain/test-parachains/undying/collator/Cargo.toml index 2b9d80401f5d..1b2ccf3be0ca 100644 --- a/parachain/test-parachains/undying/collator/Cargo.toml +++ b/parachain/test-parachains/undying/collator/Cargo.toml @@ -34,7 +34,7 @@ sc-service = { git = "https://github.com/paritytech/substrate", branch = "master # This one is tricky. Even though it is not used directly by the collator, we still need it for the # `puppet_worker` binary, which is required for the integration test. However, this shouldn't be # a big problem since it is used transitively anyway. -polkadot-node-core-pvf = { path = "../../../../node/core/pvf" } +polkadot-node-core-pvf-worker = { path = "../../../../node/core/pvf/worker" } [dev-dependencies] polkadot-parachain = { path = "../../.." 
} diff --git a/parachain/test-parachains/undying/collator/bin/puppet_worker.rs b/parachain/test-parachains/undying/collator/bin/puppet_worker.rs index 7f93519d8454..ddd81971292b 100644 --- a/parachain/test-parachains/undying/collator/bin/puppet_worker.rs +++ b/parachain/test-parachains/undying/collator/bin/puppet_worker.rs @@ -14,4 +14,4 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see <http://www.gnu.org/licenses/>. -polkadot_node_core_pvf::decl_puppet_worker_main!(); +polkadot_node_core_pvf_worker::decl_puppet_worker_main!(); diff --git a/parachain/test-parachains/undying/collator/src/lib.rs b/parachain/test-parachains/undying/collator/src/lib.rs index 838590fa16f5..dcaf9b63296d 100644 --- a/parachain/test-parachains/undying/collator/src/lib.rs +++ b/parachain/test-parachains/undying/collator/src/lib.rs @@ -354,7 +354,7 @@ mod tests { } fn validate_collation(collator: &Collator, parent_head: HeadData, collation: Collation) { - use polkadot_node_core_pvf::testing::validate_candidate; + use polkadot_node_core_pvf_worker::testing::validate_candidate; let block_data = match collation.proof_of_validity { MaybeCompressedPoV::Raw(pov) => pov.block_data,