From 54f84285bf863c75d9383f7d9d3e7612410f23a4 Mon Sep 17 00:00:00 2001 From: jserrat <35823283+Jpserrat@users.noreply.github.com> Date: Tue, 14 Nov 2023 14:50:18 -0300 Subject: [PATCH] change prepare worker to use fork instead of threads (#1685) Co-authored-by: Marcin S --- Cargo.lock | 26 + .../node/core/candidate-validation/src/lib.rs | 24 +- .../core/candidate-validation/src/tests.rs | 12 +- .../benches/host_prepare_rococo_runtime.rs | 2 +- polkadot/node/core/pvf/common/Cargo.toml | 2 +- polkadot/node/core/pvf/common/src/error.rs | 55 +- polkadot/node/core/pvf/common/src/execute.rs | 51 +- .../node/core/pvf/common/src/worker/mod.rs | 12 +- .../node/core/pvf/execute-worker/Cargo.toml | 3 + .../node/core/pvf/execute-worker/src/lib.rs | 399 ++++++++--- .../node/core/pvf/prepare-worker/Cargo.toml | 2 + .../node/core/pvf/prepare-worker/src/lib.rs | 634 ++++++++++++------ polkadot/node/core/pvf/src/error.rs | 53 +- polkadot/node/core/pvf/src/execute/queue.rs | 15 +- .../node/core/pvf/src/execute/worker_intf.rs | 49 +- polkadot/node/core/pvf/src/prepare/pool.rs | 21 +- .../node/core/pvf/src/prepare/worker_intf.rs | 24 +- polkadot/node/core/pvf/src/testing.rs | 34 +- polkadot/node/core/pvf/tests/it/main.rs | 135 +--- polkadot/node/core/pvf/tests/it/process.rs | 383 +++++++++++ .../node/core/pvf/tests/it/worker_common.rs | 8 +- .../src/node/utility/pvf-host-and-workers.md | 47 +- .../list-syscalls/execute-worker-syscalls | 6 + .../list-syscalls/prepare-worker-syscalls | 5 + 24 files changed, 1468 insertions(+), 534 deletions(-) create mode 100644 polkadot/node/core/pvf/tests/it/process.rs diff --git a/Cargo.lock b/Cargo.lock index c57a5ce1393d..d83080868221 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8627,6 +8627,17 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "nix" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" +dependencies = [ + "bitflags 2.4.0", + "cfg-if", + "libc", +] + [[package]] name = "no-std-net" version = "0.6.0" @@ -9103,6 +9114,16 @@ dependencies = [ "num-traits", ] +[[package]] +name = "os_pipe" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ae859aa07428ca9a929b936690f8b12dc5f11dd8c6992a18ca93919f28bc177" +dependencies = [ + "libc", + "windows-sys 0.48.0", +] + [[package]] name = "os_str_bytes" version = "6.5.1" @@ -12525,6 +12546,9 @@ name = "polkadot-node-core-pvf-execute-worker" version = "1.0.0" dependencies = [ "cpu-time", + "libc", + "nix 0.27.1", + "os_pipe", "parity-scale-codec", "polkadot-node-core-pvf-common", "polkadot-parachain-primitives", @@ -12539,6 +12563,8 @@ dependencies = [ "cfg-if", "criterion 0.4.0", "libc", + "nix 0.27.1", + "os_pipe", "parity-scale-codec", "polkadot-node-core-pvf-common", "polkadot-primitives", diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index 4232e5f1cdd8..a3d6f0473136 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -642,14 +642,19 @@ async fn validate_candidate_exhaustive( }, Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout)) => Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)), - Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::WorkerReportedError(e))) => + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::WorkerReportedInvalid(e))) => Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(e))), Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)) => Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError( "ambiguous worker death".to_string(), ))), - Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic(err))) => + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::JobError(err))) => Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err))), + + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousJobDeath(err))) => + Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(format!( + "ambiguous job death: {err}" + )))), Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::PrepareError(e))) => { // In principle if preparation of the `WASM` fails, the current candidate can not be the // reason for that. So we can't say whether it is invalid or not. In addition, with @@ -741,9 +746,9 @@ trait ValidationBackend { }; // Allow limited retries for each kind of error. + let mut num_death_retries_left = 1; + let mut num_job_error_retries_left = 1; let mut num_internal_retries_left = 1; - let mut num_awd_retries_left = 1; - let mut num_panic_retries_left = 1; loop { // Stop retrying if we exceeded the timeout. if total_time_start.elapsed() + retry_delay > exec_timeout { @@ -752,11 +757,12 @@ trait ValidationBackend { match validation_result { Err(ValidationError::InvalidCandidate( - WasmInvalidCandidate::AmbiguousWorkerDeath, - )) if num_awd_retries_left > 0 => num_awd_retries_left -= 1, - Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic(_))) - if num_panic_retries_left > 0 => - num_panic_retries_left -= 1, + WasmInvalidCandidate::AmbiguousWorkerDeath | + WasmInvalidCandidate::AmbiguousJobDeath(_), + )) if num_death_retries_left > 0 => num_death_retries_left -= 1, + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::JobError(_))) + if num_job_error_retries_left > 0 => + num_job_error_retries_left -= 1, Err(ValidationError::InternalError(_)) if num_internal_retries_left > 0 => num_internal_retries_left -= 1, _ => break, diff --git a/polkadot/node/core/candidate-validation/src/tests.rs b/polkadot/node/core/candidate-validation/src/tests.rs index af530a20c4e0..cab823e1e637 100644 --- a/polkadot/node/core/candidate-validation/src/tests.rs +++ b/polkadot/node/core/candidate-validation/src/tests.rs @@ -695,11 +695,13 @@ fn candidate_validation_retry_panic_errors() { let v = executor::block_on(validate_candidate_exhaustive( MockValidateCandidateBackend::with_hardcoded_result_list(vec![ - Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic("foo".into()))), - // Throw an AWD error, we should still retry again. - Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::JobError("foo".into()))), + // Throw an AJD error, we should still retry again. + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousJobDeath( + "baz".into(), + ))), // Throw another panic error. - Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic("bar".into()))), + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::JobError("bar".into()))), ]), validation_data, validation_code, @@ -1216,7 +1218,7 @@ fn precheck_properly_classifies_outcomes() { inner(Err(PrepareError::Prevalidation("foo".to_owned())), PreCheckOutcome::Invalid); inner(Err(PrepareError::Preparation("bar".to_owned())), PreCheckOutcome::Invalid); - inner(Err(PrepareError::Panic("baz".to_owned())), PreCheckOutcome::Invalid); + inner(Err(PrepareError::JobError("baz".to_owned())), PreCheckOutcome::Invalid); inner(Err(PrepareError::TimedOut), PreCheckOutcome::Failed); inner(Err(PrepareError::IoErr("fizz".to_owned())), PreCheckOutcome::Failed); diff --git a/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs b/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs index d0cefae6cdbf..378374a10b39 100644 --- a/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs +++ b/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs @@ -37,7 +37,7 @@ impl TestHost { where F: FnOnce(&mut Config), { - let (prepare_worker_path, execute_worker_path) = testing::get_and_check_worker_paths(); + let (prepare_worker_path, execute_worker_path) = testing::build_workers_and_get_paths(true); let cache_dir = tempfile::tempdir().unwrap(); let mut config = Config::new( diff --git a/polkadot/node/core/pvf/common/Cargo.toml b/polkadot/node/core/pvf/common/Cargo.toml index 7dc8d307026e..e3fda06963e3 100644 --- a/polkadot/node/core/pvf/common/Cargo.toml +++ b/polkadot/node/core/pvf/common/Cargo.toml @@ -12,6 +12,7 @@ cpu-time = "1.0.0" futures = "0.3.21" gum = { package = "tracing-gum", path = "../../../gum" } libc = "0.2.139" +thiserror = "1.0.31" parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] } @@ -30,7 +31,6 @@ sp-tracing = { path = "../../../../../substrate/primitives/tracing" } [target.'cfg(target_os = "linux")'.dependencies] landlock = "0.3.0" seccompiler = "0.4.0" -thiserror = "1.0.31" [dev-dependencies] assert_matches = "1.4.0" diff --git a/polkadot/node/core/pvf/common/src/error.rs b/polkadot/node/core/pvf/common/src/error.rs index 82b56562d8cc..34475c481f73 100644 --- a/polkadot/node/core/pvf/common/src/error.rs +++ b/polkadot/node/core/pvf/common/src/error.rs @@ -35,9 +35,9 @@ pub enum PrepareError { /// Instantiation of the WASM module instance failed. #[codec(index = 2)] RuntimeConstruction(String), - /// An unexpected panic has occurred in the preparation worker. + /// An unexpected error has occurred in the preparation job. #[codec(index = 3)] - Panic(String), + JobError(String), /// Failed to prepare the PVF due to the time limit. #[codec(index = 4)] TimedOut, @@ -48,12 +48,12 @@ pub enum PrepareError { /// The temporary file for the artifact could not be created at the given cache path. This /// state is reported by the validation host (not by the worker). #[codec(index = 6)] - CreateTmpFileErr(String), + CreateTmpFile(String), /// The response from the worker is received, but the file cannot be renamed (moved) to the /// final destination location. This state is reported by the validation host (not by the /// worker). #[codec(index = 7)] - RenameTmpFileErr { + RenameTmpFile { err: String, // Unfortunately `PathBuf` doesn't implement `Encode`/`Decode`, so we do a fallible // conversion to `Option`. @@ -68,11 +68,14 @@ pub enum PrepareError { /// reported by the validation host (not by the worker). #[codec(index = 9)] ClearWorkerDir(String), + /// The preparation job process died, due to OOM, a seccomp violation, or some other factor. + JobDied(String), + #[codec(index = 10)] + /// Some error occurred when interfacing with the kernel. + #[codec(index = 11)] + Kernel(String), } -/// Pre-encoded length-prefixed `PrepareResult::Err(PrepareError::OutOfMemory)` -pub const OOM_PAYLOAD: &[u8] = b"\x02\x00\x00\x00\x00\x00\x00\x00\x01\x08"; - impl PrepareError { /// Returns whether this is a deterministic error, i.e. one that should trigger reliably. Those /// errors depend on the PVF itself and the sc-executor/wasmtime logic. @@ -83,12 +86,15 @@ impl PrepareError { pub fn is_deterministic(&self) -> bool { use PrepareError::*; match self { - Prevalidation(_) | Preparation(_) | Panic(_) | OutOfMemory => true, - TimedOut | + Prevalidation(_) | Preparation(_) | JobError(_) | OutOfMemory => true, IoErr(_) | - CreateTmpFileErr(_) | - RenameTmpFileErr { .. } | - ClearWorkerDir(_) => false, + JobDied(_) | + CreateTmpFile(_) | + RenameTmpFile { .. } | + ClearWorkerDir(_) | + Kernel(_) => false, + // Can occur due to issues with the PVF, but also due to factors like local load. + TimedOut => false, // Can occur due to issues with the PVF, but also due to local errors. RuntimeConstruction(_) => false, } @@ -102,14 +108,16 @@ impl fmt::Display for PrepareError { Prevalidation(err) => write!(f, "prevalidation: {}", err), Preparation(err) => write!(f, "preparation: {}", err), RuntimeConstruction(err) => write!(f, "runtime construction: {}", err), - Panic(err) => write!(f, "panic: {}", err), + JobError(err) => write!(f, "panic: {}", err), TimedOut => write!(f, "prepare: timeout"), IoErr(err) => write!(f, "prepare: io error while receiving response: {}", err), - CreateTmpFileErr(err) => write!(f, "prepare: error creating tmp file: {}", err), - RenameTmpFileErr { err, src, dest } => + JobDied(err) => write!(f, "prepare: prepare job died: {}", err), + CreateTmpFile(err) => write!(f, "prepare: error creating tmp file: {}", err), + RenameTmpFile { err, src, dest } => write!(f, "prepare: error renaming tmp file ({:?} -> {:?}): {}", src, dest, err), OutOfMemory => write!(f, "prepare: out of memory"), ClearWorkerDir(err) => write!(f, "prepare: error clearing worker cache: {}", err), + Kernel(err) => write!(f, "prepare: error interfacing with the kernel: {}", err), } } } @@ -133,9 +141,9 @@ pub enum InternalValidationError { // conversion to `Option`. path: Option, }, - /// An error occurred in the CPU time monitor thread. Should be totally unrelated to - /// validation. - CpuTimeMonitorThread(String), + /// Some error occurred when interfacing with the kernel. + Kernel(String), + /// Some non-deterministic preparation error occurred. NonDeterministicPrepareError(PrepareError), } @@ -158,17 +166,8 @@ impl fmt::Display for InternalValidationError { "validation: host could not clear the worker cache ({:?}) after a job: {}", path, err ), - CpuTimeMonitorThread(err) => - write!(f, "validation: an error occurred in the CPU time monitor thread: {}", err), + Kernel(err) => write!(f, "validation: error interfacing with the kernel: {}", err), NonDeterministicPrepareError(err) => write!(f, "validation: prepare: {}", err), } } } - -#[test] -fn pre_encoded_payloads() { - let oom_enc = PrepareResult::Err(PrepareError::OutOfMemory).encode(); - let mut oom_payload = oom_enc.len().to_le_bytes().to_vec(); - oom_payload.extend(oom_enc); - assert_eq!(oom_payload, OOM_PAYLOAD); -} diff --git a/polkadot/node/core/pvf/common/src/execute.rs b/polkadot/node/core/pvf/common/src/execute.rs index b89ab089af1c..89e7c8e471a5 100644 --- a/polkadot/node/core/pvf/common/src/execute.rs +++ b/polkadot/node/core/pvf/common/src/execute.rs @@ -28,9 +28,9 @@ pub struct Handshake { pub executor_params: ExecutorParams, } -/// The response from an execution job on the worker. +/// The response from the execution worker. #[derive(Debug, Encode, Decode)] -pub enum Response { +pub enum WorkerResponse { /// The job completed successfully. Ok { /// The result of parachain validation. @@ -41,14 +41,38 @@ pub enum Response { /// The candidate is invalid. InvalidCandidate(String), /// The job timed out. - TimedOut, - /// An unexpected panic has occurred in the execution worker. - Panic(String), + JobTimedOut, + /// The job process has died. We must kill the worker just in case. + /// + /// We cannot treat this as an internal error because malicious code may have killed the job. + /// We still retry it, because in the non-malicious case it is likely spurious. + JobDied(String), + /// An unexpected error occurred in the job process, e.g. failing to spawn a thread, panic, + /// etc. + /// + /// Because malicious code can cause a job error, we must not treat it as an internal error. We + /// still retry it, because in the non-malicious case it is likely spurious. + JobError(String), + /// Some internal error occurred. InternalError(InternalValidationError), } -impl Response { +/// The result of a job on the execution worker. +pub type JobResult = Result; + +/// The successful response from a job on the execution worker. +#[derive(Debug, Encode, Decode)] +pub enum JobResponse { + Ok { + /// The result of parachain validation. + result_descriptor: ValidationResult, + }, + /// The candidate is invalid. + InvalidCandidate(String), +} + +impl JobResponse { /// Creates an invalid response from a context `ctx` and a message `msg` (which can be empty). pub fn format_invalid(ctx: &'static str, msg: &str) -> Self { if msg.is_empty() { @@ -58,3 +82,18 @@ impl Response { } } } + +/// An unexpected error occurred in the execution job process. Because this comes from the job, +/// which executes untrusted code, this error must likewise be treated as untrusted. That is, we +/// cannot raise an internal error based on this. +#[derive(thiserror::Error, Debug, Encode, Decode)] +pub enum JobError { + #[error("The job timed out")] + TimedOut, + #[error("An unexpected panic has occurred in the execution job: {0}")] + Panic(String), + #[error("Could not spawn the requested thread: {0}")] + CouldNotSpawnThread(String), + #[error("An error occurred in the CPU time monitor thread: {0}")] + CpuTimeMonitorThread(String), +} diff --git a/polkadot/node/core/pvf/common/src/worker/mod.rs b/polkadot/node/core/pvf/common/src/worker/mod.rs index f6a67b98321a..86f47acccac6 100644 --- a/polkadot/node/core/pvf/common/src/worker/mod.rs +++ b/polkadot/node/core/pvf/common/src/worker/mod.rs @@ -205,9 +205,15 @@ impl fmt::Display for WorkerKind { } } -// The worker version must be passed in so that we accurately get the version of the worker, and not -// the version that this crate was compiled with. -pub fn worker_event_loop( +// NOTE: The worker version must be passed in so that we accurately get the version of the worker, +// and not the version that this crate was compiled with. +// +// NOTE: This must not spawn any threads due to safety requirements in `event_loop` and to avoid +// errors in [`security::unshare_user_namespace_and_change_root`]. +// +/// Initializes the worker process, then runs the given event loop, which spawns a new job process +/// to securely handle each incoming request. +pub fn run_worker( worker_kind: WorkerKind, socket_path: PathBuf, #[cfg_attr(not(target_os = "linux"), allow(unused_mut))] mut worker_dir_path: PathBuf, diff --git a/polkadot/node/core/pvf/execute-worker/Cargo.toml b/polkadot/node/core/pvf/execute-worker/Cargo.toml index 77a9420961c0..40e0ff4f0a19 100644 --- a/polkadot/node/core/pvf/execute-worker/Cargo.toml +++ b/polkadot/node/core/pvf/execute-worker/Cargo.toml @@ -9,6 +9,9 @@ license.workspace = true [dependencies] cpu-time = "1.0.0" gum = { package = "tracing-gum", path = "../../../gum" } +os_pipe = "1.1.4" +nix = { version = "0.27.1", features = ["resource", "process"]} +libc = "0.2.139" parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] } diff --git a/polkadot/node/core/pvf/execute-worker/src/lib.rs b/polkadot/node/core/pvf/execute-worker/src/lib.rs index 8872f9bc8dd3..9ec811686b89 100644 --- a/polkadot/node/core/pvf/execute-worker/src/lib.rs +++ b/polkadot/node/core/pvf/execute-worker/src/lib.rs @@ -25,23 +25,33 @@ pub use polkadot_node_core_pvf_common::{ const LOG_TARGET: &str = "parachain::pvf-execute-worker"; use cpu_time::ProcessTime; +use nix::{ + errno::Errno, + sys::{ + resource::{Usage, UsageWho}, + wait::WaitStatus, + }, + unistd::{ForkResult, Pid}, +}; +use os_pipe::{self, PipeReader, PipeWriter}; use parity_scale_codec::{Decode, Encode}; use polkadot_node_core_pvf_common::{ error::InternalValidationError, - execute::{Handshake, Response}, + execute::{Handshake, JobError, JobResponse, JobResult, WorkerResponse}, framed_recv_blocking, framed_send_blocking, worker::{ - cpu_time_monitor_loop, stringify_panic_payload, + cpu_time_monitor_loop, run_worker, stringify_panic_payload, thread::{self, WaitOutcome}, - worker_event_loop, WorkerKind, + WorkerKind, }, }; use polkadot_parachain_primitives::primitives::ValidationResult; use polkadot_primitives::{executor_params::DEFAULT_NATIVE_STACK_MAX, ExecutorParams}; use std::{ - io, + io::{self, Read}, os::unix::net::UnixStream, path::PathBuf, + process, sync::{mpsc::channel, Arc}, time::Duration, }; @@ -105,7 +115,7 @@ fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec, Duration)> { Ok((params, execution_timeout)) } -fn send_response(stream: &mut UnixStream, response: Response) -> io::Result<()> { +fn send_response(stream: &mut UnixStream, response: WorkerResponse) -> io::Result<()> { framed_send_blocking(stream, &response.encode()) } @@ -131,7 +141,7 @@ pub fn worker_entrypoint( worker_version: Option<&str>, security_status: SecurityStatus, ) { - worker_event_loop( + run_worker( WorkerKind::Execute, socket_path, worker_dir_path, @@ -139,7 +149,7 @@ pub fn worker_entrypoint( worker_version, &security_status, |mut stream, worker_dir_path| { - let worker_pid = std::process::id(); + let worker_pid = process::id(); let artifact_path = worker_dir::execute_artifact(&worker_dir_path); let Handshake { executor_params } = recv_handshake(&mut stream)?; @@ -157,7 +167,7 @@ pub fn worker_entrypoint( let compiled_artifact_blob = match std::fs::read(&artifact_path) { Ok(bytes) => bytes, Err(err) => { - let response = Response::InternalError( + let response = WorkerResponse::InternalError( InternalValidationError::CouldNotOpenFile(err.to_string()), ); send_response(&mut stream, response)?; @@ -165,82 +175,51 @@ pub fn worker_entrypoint( }, }; - // Conditional variable to notify us when a thread is done. - let condvar = thread::get_condvar(); + let (pipe_reader, pipe_writer) = os_pipe::pipe()?; - let cpu_time_start = ProcessTime::now(); + let usage_before = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) { + Ok(usage) => usage, + Err(errno) => { + let response = internal_error_from_errno("getrusage before", errno); + send_response(&mut stream, response)?; + continue + }, + }; + + // SAFETY: new process is spawned within a single threaded process. This invariant + // is enforced by tests. + let response = match unsafe { nix::unistd::fork() } { + Err(errno) => internal_error_from_errno("fork", errno), + Ok(ForkResult::Child) => { + // Dropping the stream closes the underlying socket. We want to make sure + // that the sandboxed child can't get any kind of information from the + // outside world. The only IPC it should be able to do is sending its + // response over the pipe. + drop(stream); + // Drop the read end so we don't have too many FDs open. + drop(pipe_reader); - // Spawn a new thread that runs the CPU time monitor. - let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>(); - let cpu_time_monitor_thread = thread::spawn_worker_thread( - "cpu time monitor thread", - move || { - cpu_time_monitor_loop( - cpu_time_start, + handle_child_process( + pipe_writer, + compiled_artifact_blob, + executor_params, + params, execution_timeout, - cpu_time_monitor_rx, - ) - }, - Arc::clone(&condvar), - WaitOutcome::TimedOut, - )?; - - let executor_params_2 = executor_params.clone(); - let execute_thread = thread::spawn_worker_thread_with_stack_size( - "execute thread", - move || { - validate_using_artifact( - &compiled_artifact_blob, - &executor_params_2, - ¶ms, - cpu_time_start, ) }, - Arc::clone(&condvar), - WaitOutcome::Finished, - EXECUTE_THREAD_STACK_SIZE, - )?; - - let outcome = thread::wait_for_threads(condvar); - - let response = match outcome { - WaitOutcome::Finished => { - let _ = cpu_time_monitor_tx.send(()); - execute_thread - .join() - .unwrap_or_else(|e| Response::Panic(stringify_panic_payload(e))) - }, - // If the CPU thread is not selected, we signal it to end, the join handle is - // dropped and the thread will finish in the background. - WaitOutcome::TimedOut => { - match cpu_time_monitor_thread.join() { - Ok(Some(cpu_time_elapsed)) => { - // Log if we exceed the timeout and the other thread hasn't - // finished. - gum::warn!( - target: LOG_TARGET, - %worker_pid, - "execute job took {}ms cpu time, exceeded execute timeout {}ms", - cpu_time_elapsed.as_millis(), - execution_timeout.as_millis(), - ); - Response::TimedOut - }, - Ok(None) => Response::InternalError( - InternalValidationError::CpuTimeMonitorThread( - "error communicating over finished channel".into(), - ), - ), - Err(e) => Response::InternalError( - InternalValidationError::CpuTimeMonitorThread( - stringify_panic_payload(e), - ), - ), - } + Ok(ForkResult::Parent { child }) => { + // the read end will wait until all write ends have been closed, + // this drop is necessary to avoid deadlock + drop(pipe_writer); + + handle_parent_process( + pipe_reader, + child, + worker_pid, + usage_before, + execution_timeout, + )? }, - WaitOutcome::Pending => unreachable!( - "we run wait_while until the outcome is no longer pending; qed" - ), }; gum::trace!( @@ -259,27 +238,275 @@ fn validate_using_artifact( compiled_artifact_blob: &[u8], executor_params: &ExecutorParams, params: &[u8], - cpu_time_start: ProcessTime, -) -> Response { +) -> JobResponse { let descriptor_bytes = match unsafe { // SAFETY: this should be safe since the compiled artifact passed here comes from the // file created by the prepare workers. These files are obtained by calling // [`executor_intf::prepare`]. execute_artifact(compiled_artifact_blob, executor_params, params) } { - Err(err) => return Response::format_invalid("execute", &err), + Err(err) => return JobResponse::format_invalid("execute", &err), Ok(d) => d, }; let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) { Err(err) => - return Response::format_invalid("validation result decoding failed", &err.to_string()), + return JobResponse::format_invalid( + "validation result decoding failed", + &err.to_string(), + ), Ok(r) => r, }; - // Include the decoding in the measured time, to prevent any potential attacks exploiting some - // bug in decoding. - let duration = cpu_time_start.elapsed(); + JobResponse::Ok { result_descriptor } +} + +/// This is used to handle child process during pvf execute worker. +/// It execute the artifact and pipes back the response to the parent process +/// +/// # Arguments +/// +/// - `pipe_write`: A `PipeWriter` structure, the writing end of a pipe. +/// +/// - `compiled_artifact_blob`: The artifact bytes from compiled by the prepare worker`. +/// +/// - `executor_params`: Deterministically serialized execution environment semantics. +/// +/// - `params`: Validation parameters. +/// +/// - `execution_timeout`: The timeout in `Duration`. +/// +/// # Returns +/// +/// - pipe back `JobResponse` to the parent process. +fn handle_child_process( + mut pipe_write: PipeWriter, + compiled_artifact_blob: Vec, + executor_params: ExecutorParams, + params: Vec, + execution_timeout: Duration, +) -> ! { + gum::debug!( + target: LOG_TARGET, + worker_job_pid = %process::id(), + "worker job: executing artifact", + ); + + // Conditional variable to notify us when a thread is done. + let condvar = thread::get_condvar(); + let cpu_time_start = ProcessTime::now(); + + // Spawn a new thread that runs the CPU time monitor. + let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>(); + let cpu_time_monitor_thread = thread::spawn_worker_thread( + "cpu time monitor thread", + move || cpu_time_monitor_loop(cpu_time_start, execution_timeout, cpu_time_monitor_rx), + Arc::clone(&condvar), + WaitOutcome::TimedOut, + ) + .unwrap_or_else(|err| { + send_child_response(&mut pipe_write, Err(JobError::CouldNotSpawnThread(err.to_string()))) + }); + + let executor_params_2 = executor_params.clone(); + let execute_thread = thread::spawn_worker_thread_with_stack_size( + "execute thread", + move || validate_using_artifact(&compiled_artifact_blob, &executor_params_2, ¶ms), + Arc::clone(&condvar), + WaitOutcome::Finished, + EXECUTE_THREAD_STACK_SIZE, + ) + .unwrap_or_else(|err| { + send_child_response(&mut pipe_write, Err(JobError::CouldNotSpawnThread(err.to_string()))) + }); + + let outcome = thread::wait_for_threads(condvar); + + let response = match outcome { + WaitOutcome::Finished => { + let _ = cpu_time_monitor_tx.send(()); + execute_thread.join().map_err(|e| JobError::Panic(stringify_panic_payload(e))) + }, + // If the CPU thread is not selected, we signal it to end, the join handle is + // dropped and the thread will finish in the background. + WaitOutcome::TimedOut => match cpu_time_monitor_thread.join() { + Ok(Some(_cpu_time_elapsed)) => Err(JobError::TimedOut), + Ok(None) => Err(JobError::CpuTimeMonitorThread( + "error communicating over finished channel".into(), + )), + Err(e) => Err(JobError::CpuTimeMonitorThread(stringify_panic_payload(e))), + }, + WaitOutcome::Pending => + unreachable!("we run wait_while until the outcome is no longer pending; qed"), + }; + + send_child_response(&mut pipe_write, response); +} + +/// Waits for child process to finish and handle child response from pipe. +/// +/// # Arguments +/// +/// - `pipe_read`: A `PipeReader` used to read data from the child process. +/// +/// - `child`: The child pid. +/// +/// - `usage_before`: Resource usage statistics before executing the child process. +/// +/// - `timeout`: The maximum allowed time for the child process to finish, in `Duration`. +/// +/// # Returns +/// +/// - The response, either `Ok` or some error state. +fn handle_parent_process( + mut pipe_read: PipeReader, + child: Pid, + worker_pid: u32, + usage_before: Usage, + timeout: Duration, +) -> io::Result { + // Read from the child. Don't decode unless the process exited normally, which we check later. + let mut received_data = Vec::new(); + pipe_read + .read_to_end(&mut received_data) + // Could not decode job response. There is either a bug or the job was hijacked. + // Should retry at any rate. + .map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?; + + let status = nix::sys::wait::waitpid(child, None); + gum::trace!( + target: LOG_TARGET, + %worker_pid, + "execute worker received wait status from job: {:?}", + status, + ); + + let usage_after = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) { + Ok(usage) => usage, + Err(errno) => return Ok(internal_error_from_errno("getrusage after", errno)), + }; + + // Using `getrusage` is needed to check whether child has timedout since we cannot rely on + // child to report its own time. + // As `getrusage` returns resource usage from all terminated child processes, + // it is necessary to subtract the usage before the current child process to isolate its cpu + // time + let cpu_tv = get_total_cpu_usage(usage_after) - get_total_cpu_usage(usage_before); + if cpu_tv >= timeout { + gum::warn!( + target: LOG_TARGET, + %worker_pid, + "execute job took {}ms cpu time, exceeded execute timeout {}ms", + cpu_tv.as_millis(), + timeout.as_millis(), + ); + return Ok(WorkerResponse::JobTimedOut) + } + + match status { + Ok(WaitStatus::Exited(_, exit_status)) => { + let mut reader = io::BufReader::new(received_data.as_slice()); + let result = match recv_child_response(&mut reader) { + Ok(result) => result, + Err(err) => return Ok(WorkerResponse::JobError(err.to_string())), + }; + + match result { + Ok(JobResponse::Ok { result_descriptor }) => { + // The exit status should have been zero if no error occurred. + if exit_status != 0 { + return Ok(WorkerResponse::JobError(format!( + "unexpected exit status: {}", + exit_status + ))) + } + + Ok(WorkerResponse::Ok { result_descriptor, duration: cpu_tv }) + }, + Ok(JobResponse::InvalidCandidate(err)) => Ok(WorkerResponse::InvalidCandidate(err)), + Err(job_error) => { + gum::warn!( + target: LOG_TARGET, + %worker_pid, + "execute job error: {}", + job_error, + ); + if matches!(job_error, JobError::TimedOut) { + Ok(WorkerResponse::JobTimedOut) + } else { + Ok(WorkerResponse::JobError(job_error.to_string())) + } + }, + } + }, + // The job was killed by the given signal. + // + // The job gets SIGSYS on seccomp violations, but this signal may have been sent for some + // other reason, so we still need to check for seccomp violations elsewhere. + Ok(WaitStatus::Signaled(_pid, signal, _core_dump)) => + Ok(WorkerResponse::JobDied(format!("received signal: {signal:?}"))), + Err(errno) => Ok(internal_error_from_errno("waitpid", errno)), + + // It is within an attacker's power to send an unexpected exit status. So we cannot treat + // this as an internal error (which would make us abstain), but must vote against. + Ok(unexpected_wait_status) => Ok(WorkerResponse::JobDied(format!( + "unexpected status from wait: {unexpected_wait_status:?}" + ))), + } +} + +/// Calculate the total CPU time from the given `usage` structure, returned from +/// [`nix::sys::resource::getrusage`], and calculates the total CPU time spent, including both user +/// and system time. +/// +/// # Arguments +/// +/// - `rusage`: Contains resource usage information. +/// +/// # Returns +/// +/// Returns a `Duration` representing the total CPU time. +fn get_total_cpu_usage(rusage: Usage) -> Duration { + let micros = (((rusage.user_time().tv_sec() + rusage.system_time().tv_sec()) * 1_000_000) + + (rusage.system_time().tv_usec() + rusage.user_time().tv_usec()) as i64) as u64; + + return Duration::from_micros(micros) +} + +/// Get a job response. +fn recv_child_response(received_data: &mut io::BufReader<&[u8]>) -> io::Result { + let response_bytes = framed_recv_blocking(received_data)?; + JobResult::decode(&mut response_bytes.as_slice()).map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!("execute pvf recv_child_response: decode error: {:?}", e), + ) + }) +} + +/// Write response to the pipe and exit process after. +/// +/// # Arguments +/// +/// - `pipe_write`: A `PipeWriter` structure, the writing end of a pipe. +/// +/// - `response`: Child process response, or error. +fn send_child_response(pipe_write: &mut PipeWriter, response: JobResult) -> ! { + framed_send_blocking(pipe_write, response.encode().as_slice()) + .unwrap_or_else(|_| process::exit(libc::EXIT_FAILURE)); + + if response.is_ok() { + process::exit(libc::EXIT_SUCCESS) + } else { + process::exit(libc::EXIT_FAILURE) + } +} - Response::Ok { result_descriptor, duration } +fn internal_error_from_errno(context: &'static str, errno: Errno) -> WorkerResponse { + WorkerResponse::InternalError(InternalValidationError::Kernel(format!( + "{}: {}: {}", + context, + errno, + io::Error::last_os_error() + ))) } diff --git a/polkadot/node/core/pvf/prepare-worker/Cargo.toml b/polkadot/node/core/pvf/prepare-worker/Cargo.toml index e21583ecc8b7..1cd221533f48 100644 --- a/polkadot/node/core/pvf/prepare-worker/Cargo.toml +++ b/polkadot/node/core/pvf/prepare-worker/Cargo.toml @@ -14,6 +14,8 @@ rayon = "1.5.1" tracking-allocator = { package = "staging-tracking-allocator", path = "../../../tracking-allocator" } tikv-jemalloc-ctl = { version = "0.5.0", optional = true } tikv-jemallocator = { version = "0.5.0", optional = true } +os_pipe = "1.1.4" +nix = { version = "0.27.1", features = ["resource", "process"]} parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] } diff --git a/polkadot/node/core/pvf/prepare-worker/src/lib.rs b/polkadot/node/core/pvf/prepare-worker/src/lib.rs index 37a4dd06075e..151b54efc2d1 100644 --- a/polkadot/node/core/pvf/prepare-worker/src/lib.rs +++ b/polkadot/node/core/pvf/prepare-worker/src/lib.rs @@ -28,28 +28,40 @@ const LOG_TARGET: &str = "parachain::pvf-prepare-worker"; use crate::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread}; #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] use crate::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop}; +use libc; +use nix::{ + errno::Errno, + sys::{ + resource::{Usage, UsageWho}, + wait::WaitStatus, + }, + unistd::{ForkResult, Pid}, +}; +use os_pipe::{self, PipeReader, PipeWriter}; use parity_scale_codec::{Decode, Encode}; use polkadot_node_core_pvf_common::{ - error::{PrepareError, PrepareResult, OOM_PAYLOAD}, + error::{PrepareError, PrepareResult}, executor_intf::create_runtime_from_artifact_bytes, framed_recv_blocking, framed_send_blocking, prepare::{MemoryStats, PrepareJobKind, PrepareStats}, pvf::PvfPrepData, worker::{ - cpu_time_monitor_loop, stringify_panic_payload, - thread::{self, WaitOutcome}, - worker_event_loop, WorkerKind, + cpu_time_monitor_loop, run_worker, stringify_panic_payload, + thread::{self, spawn_worker_thread, WaitOutcome}, + WorkerKind, }, worker_dir, ProcessTime, SecurityStatus, }; use polkadot_primitives::ExecutorParams; use std::{ - fs, io, + fs, + io::{self, Read}, os::{ fd::{AsRawFd, RawFd}, unix::net::UnixStream, }, path::PathBuf, + process, sync::{mpsc::channel, Arc}, time::Duration, }; @@ -65,6 +77,7 @@ static ALLOC: TrackingAllocator = static ALLOC: TrackingAllocator = TrackingAllocator(std::alloc::System); /// Contains the bytes for a successfully compiled artifact. +#[derive(Encode, Decode)] pub struct CompiledArtifact(Vec); impl CompiledArtifact { @@ -80,6 +93,7 @@ impl AsRef<[u8]> for CompiledArtifact { } } +/// Get a worker request. fn recv_request(stream: &mut UnixStream) -> io::Result { let pvf = framed_recv_blocking(stream)?; let pvf = PvfPrepData::decode(&mut &pvf[..]).map_err(|e| { @@ -91,6 +105,7 @@ fn recv_request(stream: &mut UnixStream) -> io::Result { Ok(pvf) } +/// Send a worker response. fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> { framed_send_blocking(stream, &result.encode()) } @@ -111,18 +126,22 @@ fn start_memory_tracking(fd: RawFd, limit: Option) { // Syscalls never allocate or deallocate, so this is safe. libc::syscall(libc::SYS_write, fd, OOM_PAYLOAD.as_ptr(), OOM_PAYLOAD.len()); libc::syscall(libc::SYS_close, fd); - libc::syscall(libc::SYS_exit, 1); + // Make sure we exit from all threads. Copied from glibc. + libc::syscall(libc::SYS_exit_group, 1); + loop { + libc::syscall(libc::SYS_exit, 1); + } } #[cfg(not(target_os = "linux"))] { // Syscalls are not available on MacOS, so we have to use `libc` wrappers. - // Technicaly, there may be allocations inside, although they shouldn't be + // Technically, there may be allocations inside, although they shouldn't be // there. In that case, we'll see deadlocks on MacOS after the OOM condition // triggered. As we consider running a validator on MacOS unsafe, and this // code is only run by a validator, it's a lesser evil. libc::write(fd, OOM_PAYLOAD.as_ptr().cast(), OOM_PAYLOAD.len()); libc::close(fd); - std::process::exit(1); + libc::_exit(1); } })), ); @@ -155,17 +174,19 @@ fn end_memory_tracking() -> isize { /// /// 1. Get the code and parameters for preparation from the host. /// -/// 2. Start a memory tracker in a separate thread. +/// 2. Start a new child process /// -/// 3. Start the CPU time monitor loop and the actual preparation in two separate threads. +/// 3. Start the memory tracker and the actual preparation in two separate threads. /// /// 4. Wait on the two threads created in step 3. /// /// 5. Stop the memory tracker and get the stats. /// -/// 6. If compilation succeeded, write the compiled artifact into a temporary file. +/// 6. Pipe the result back to the parent process and exit from child process. /// -/// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we +/// 7. If compilation succeeded, write the compiled artifact into a temporary file. +/// +/// 8. Send the result of preparation back to the host. If any error occurred in the above steps, we /// send that in the `PrepareResult`. pub fn worker_entrypoint( socket_path: PathBuf, @@ -174,7 +195,7 @@ pub fn worker_entrypoint( worker_version: Option<&str>, security_status: SecurityStatus, ) { - worker_event_loop( + run_worker( WorkerKind::Prepare, socket_path, worker_dir_path, @@ -182,7 +203,7 @@ pub fn worker_entrypoint( worker_version, &security_status, |mut stream, worker_dir_path| { - let worker_pid = std::process::id(); + let worker_pid = process::id(); let temp_artifact_dest = worker_dir::prepare_tmp_artifact(&worker_dir_path); loop { @@ -197,186 +218,58 @@ pub fn worker_entrypoint( let prepare_job_kind = pvf.prep_kind(); let executor_params = pvf.executor_params(); - // Conditional variable to notify us when a thread is done. - let condvar = thread::get_condvar(); - - // Run the memory tracker in a regular, non-worker thread. - #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] - let condvar_memory = Arc::clone(&condvar); - #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] - let memory_tracker_thread = std::thread::spawn(|| memory_tracker_loop(condvar_memory)); + let (pipe_reader, pipe_writer) = os_pipe::pipe()?; - let cpu_time_start = ProcessTime::now(); - - // Spawn a new thread that runs the CPU time monitor. - let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>(); - let cpu_time_monitor_thread = thread::spawn_worker_thread( - "cpu time monitor thread", - move || { - cpu_time_monitor_loop( - cpu_time_start, - preparation_timeout, - cpu_time_monitor_rx, - ) + let usage_before = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) { + Ok(usage) => usage, + Err(errno) => { + let result = Err(error_from_errno("getrusage before", errno)); + send_response(&mut stream, result)?; + continue }, - Arc::clone(&condvar), - WaitOutcome::TimedOut, - )?; - - start_memory_tracking( - stream.as_raw_fd(), - executor_params.prechecking_max_memory().map(|v| { - v.try_into().unwrap_or_else(|_| { - gum::warn!( - LOG_TARGET, - %worker_pid, - "Illegal pre-checking max memory value {} discarded", - v, - ); - 0 - }) - }), - ); - - // Spawn another thread for preparation. - let prepare_thread = thread::spawn_worker_thread( - "prepare thread", - move || { - #[allow(unused_mut)] - let mut result = prepare_artifact(pvf, cpu_time_start); - - // Get the `ru_maxrss` stat. If supported, call getrusage for the thread. - #[cfg(target_os = "linux")] - let mut result = result - .map(|(artifact, elapsed)| (artifact, elapsed, get_max_rss_thread())); - - // If we are pre-checking, check for runtime construction errors. - // - // As pre-checking is more strict than just preparation in terms of memory - // and time, it is okay to do extra checks here. This takes negligible time - // anyway. - if let PrepareJobKind::Prechecking = prepare_job_kind { - result = result.and_then(|output| { - runtime_construction_check( - output.0.as_ref(), - executor_params.as_ref(), - )?; - Ok(output) - }); - } - - result - }, - Arc::clone(&condvar), - WaitOutcome::Finished, - )?; - - let outcome = thread::wait_for_threads(condvar); - - let peak_alloc = { - let peak = end_memory_tracking(); - gum::debug!( - target: LOG_TARGET, - %worker_pid, - "prepare job peak allocation is {} bytes", - peak, - ); - peak }; - let result = match outcome { - WaitOutcome::Finished => { - let _ = cpu_time_monitor_tx.send(()); - - match prepare_thread.join().unwrap_or_else(|err| { - Err(PrepareError::Panic(stringify_panic_payload(err))) - }) { - Err(err) => { - // Serialized error will be written into the socket. - Err(err) - }, - Ok(ok) => { - cfg_if::cfg_if! { - if #[cfg(target_os = "linux")] { - let (artifact, cpu_time_elapsed, max_rss) = ok; - } else { - let (artifact, cpu_time_elapsed) = ok; - } - } - - // Stop the memory stats worker and get its observed memory stats. - #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] - let memory_tracker_stats = get_memory_tracker_loop_stats(memory_tracker_thread, worker_pid); - let memory_stats = MemoryStats { - #[cfg(any( - target_os = "linux", - feature = "jemalloc-allocator" - ))] - memory_tracker_stats, - #[cfg(target_os = "linux")] - max_rss: extract_max_rss_stat(max_rss, worker_pid), - // Negative peak allocation values are legit; they are narrow - // corner cases and shouldn't affect overall statistics - // significantly - peak_tracked_alloc: if peak_alloc > 0 { - peak_alloc as u64 - } else { - 0u64 - }, - }; - - // Write the serialized artifact into a temp file. - // - // PVF host only keeps artifacts statuses in its memory, - // successfully compiled code gets stored on the disk (and - // consequently deserialized by execute-workers). The prepare worker - // is only required to send `Ok` to the pool to indicate the - // success. - - gum::debug!( - target: LOG_TARGET, - %worker_pid, - "worker: writing artifact to {}", - temp_artifact_dest.display(), - ); - fs::write(&temp_artifact_dest, &artifact)?; - - Ok(PrepareStats { cpu_time_elapsed, memory_stats }) - }, - } + // SAFETY: new process is spawned within a single threaded process. This invariant + // is enforced by tests. + let result = match unsafe { nix::unistd::fork() } { + Err(errno) => Err(error_from_errno("fork", errno)), + Ok(ForkResult::Child) => { + // Dropping the stream closes the underlying socket. We want to make sure + // that the sandboxed child can't get any kind of information from the + // outside world. The only IPC it should be able to do is sending its + // response over the pipe. + drop(stream); + // Drop the read end so we don't have too many FDs open. + drop(pipe_reader); + + handle_child_process( + pvf, + pipe_writer, + preparation_timeout, + prepare_job_kind, + executor_params, + ) }, - // If the CPU thread is not selected, we signal it to end, the join handle is - // dropped and the thread will finish in the background. - WaitOutcome::TimedOut => { - match cpu_time_monitor_thread.join() { - Ok(Some(cpu_time_elapsed)) => { - // Log if we exceed the timeout and the other thread hasn't - // finished. - gum::warn!( - target: LOG_TARGET, - %worker_pid, - "prepare job took {}ms cpu time, exceeded prepare timeout {}ms", - cpu_time_elapsed.as_millis(), - preparation_timeout.as_millis(), - ); - Err(PrepareError::TimedOut) - }, - Ok(None) => Err(PrepareError::IoErr( - "error communicating over closed channel".into(), - )), - // Errors in this thread are independent of the PVF. - Err(err) => Err(PrepareError::IoErr(stringify_panic_payload(err))), - } + Ok(ForkResult::Parent { child }) => { + // the read end will wait until all write ends have been closed, + // this drop is necessary to avoid deadlock + drop(pipe_writer); + + handle_parent_process( + pipe_reader, + child, + temp_artifact_dest.clone(), + worker_pid, + usage_before, + preparation_timeout, + ) }, - WaitOutcome::Pending => unreachable!( - "we run wait_while until the outcome is no longer pending; qed" - ), }; gum::trace!( target: LOG_TARGET, %worker_pid, - "worker: sending response to host: {:?}", + "worker: sending result to host: {:?}", result ); send_response(&mut stream, result)?; @@ -385,10 +278,7 @@ pub fn worker_entrypoint( ); } -fn prepare_artifact( - pvf: PvfPrepData, - cpu_time_start: ProcessTime, -) -> Result<(CompiledArtifact, Duration), PrepareError> { +fn prepare_artifact(pvf: PvfPrepData) -> Result { let blob = match prevalidate(&pvf.code()) { Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))), Ok(b) => b, @@ -398,7 +288,6 @@ fn prepare_artifact( Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)), Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))), } - .map(|artifact| (artifact, cpu_time_start.elapsed())) } /// Try constructing the runtime to catch any instantiation errors during pre-checking. @@ -412,3 +301,372 @@ fn runtime_construction_check( .map(|_runtime| ()) .map_err(|err| PrepareError::RuntimeConstruction(format!("{:?}", err))) } + +#[derive(Encode, Decode)] +struct JobResponse { + artifact: CompiledArtifact, + memory_stats: MemoryStats, +} + +/// This is used to handle child process during pvf prepare worker. +/// It prepares the artifact and tracks memory stats during preparation +/// and pipes back the response to the parent process +/// +/// # Arguments +/// +/// - `pvf`: `PvfPrepData` structure, containing data to prepare the artifact +/// +/// - `pipe_write`: A `PipeWriter` structure, the writing end of a pipe. +/// +/// - `preparation_timeout`: The timeout in `Duration`. +/// +/// - `prepare_job_kind`: The kind of prepare job. +/// +/// - `executor_params`: Deterministically serialized execution environment semantics. +/// +/// # Returns +/// +/// - If any error occur, pipe response back with `PrepareError`. +/// +/// - If success, pipe back `JobResponse`. +fn handle_child_process( + pvf: PvfPrepData, + mut pipe_write: PipeWriter, + preparation_timeout: Duration, + prepare_job_kind: PrepareJobKind, + executor_params: Arc, +) -> ! { + let worker_job_pid = process::id(); + gum::debug!( + target: LOG_TARGET, + %worker_job_pid, + ?prepare_job_kind, + ?preparation_timeout, + "worker job: preparing artifact", + ); + + // Conditional variable to notify us when a thread is done. + let condvar = thread::get_condvar(); + + // Run the memory tracker in a regular, non-worker thread. + #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] + let condvar_memory = Arc::clone(&condvar); + #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] + let memory_tracker_thread = std::thread::spawn(|| memory_tracker_loop(condvar_memory)); + + start_memory_tracking( + pipe_write.as_raw_fd(), + executor_params.prechecking_max_memory().map(|v| { + v.try_into().unwrap_or_else(|_| { + gum::warn!( + LOG_TARGET, + %worker_job_pid, + "Illegal pre-checking max memory value {} discarded", + v, + ); + 0 + }) + }), + ); + + let cpu_time_start = ProcessTime::now(); + + // Spawn a new thread that runs the CPU time monitor. + let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>(); + let cpu_time_monitor_thread = thread::spawn_worker_thread( + "cpu time monitor thread", + move || cpu_time_monitor_loop(cpu_time_start, preparation_timeout, cpu_time_monitor_rx), + Arc::clone(&condvar), + WaitOutcome::TimedOut, + ) + .unwrap_or_else(|err| { + send_child_response(&mut pipe_write, Err(PrepareError::IoErr(err.to_string()))) + }); + + let prepare_thread = spawn_worker_thread( + "prepare worker", + move || { + #[allow(unused_mut)] + let mut result = prepare_artifact(pvf); + + // Get the `ru_maxrss` stat. If supported, call getrusage for the thread. + #[cfg(target_os = "linux")] + let mut result = result.map(|artifact| (artifact, get_max_rss_thread())); + + // If we are pre-checking, check for runtime construction errors. + // + // As pre-checking is more strict than just preparation in terms of memory + // and time, it is okay to do extra checks here. This takes negligible time + // anyway. + if let PrepareJobKind::Prechecking = prepare_job_kind { + result = result.and_then(|output| { + runtime_construction_check(output.0.as_ref(), &executor_params)?; + Ok(output) + }); + } + result + }, + Arc::clone(&condvar), + WaitOutcome::Finished, + ) + .unwrap_or_else(|err| { + send_child_response(&mut pipe_write, Err(PrepareError::IoErr(err.to_string()))) + }); + + let outcome = thread::wait_for_threads(condvar); + + let peak_alloc = { + let peak = end_memory_tracking(); + gum::debug!( + target: LOG_TARGET, + %worker_job_pid, + "prepare job peak allocation is {} bytes", + peak, + ); + peak + }; + + let result = match outcome { + WaitOutcome::Finished => { + let _ = cpu_time_monitor_tx.send(()); + + match prepare_thread.join().unwrap_or_else(|err| { + send_child_response( + &mut pipe_write, + Err(PrepareError::JobError(stringify_panic_payload(err))), + ) + }) { + Err(err) => Err(err), + Ok(ok) => { + cfg_if::cfg_if! { + if #[cfg(target_os = "linux")] { + let (artifact, max_rss) = ok; + } else { + let artifact = ok; + } + } + + // Stop the memory stats worker and get its observed memory stats. + #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] + let memory_tracker_stats = get_memory_tracker_loop_stats(memory_tracker_thread, process::id()); + + let memory_stats = MemoryStats { + #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] + memory_tracker_stats, + #[cfg(target_os = "linux")] + max_rss: extract_max_rss_stat(max_rss, process::id()), + // Negative peak allocation values are legit; they are narrow + // corner cases and shouldn't affect overall statistics + // significantly + peak_tracked_alloc: if peak_alloc > 0 { peak_alloc as u64 } else { 0u64 }, + }; + + Ok(JobResponse { artifact, memory_stats }) + }, + } + }, + + // If the CPU thread is not selected, we signal it to end, the join handle is + // dropped and the thread will finish in the background. + WaitOutcome::TimedOut => match cpu_time_monitor_thread.join() { + Ok(Some(_cpu_time_elapsed)) => Err(PrepareError::TimedOut), + Ok(None) => Err(PrepareError::IoErr("error communicating over closed channel".into())), + Err(err) => Err(PrepareError::IoErr(stringify_panic_payload(err))), + }, + WaitOutcome::Pending => + unreachable!("we run wait_while until the outcome is no longer pending; qed"), + }; + + send_child_response(&mut pipe_write, result); +} + +/// Waits for child process to finish and handle child response from pipe. +/// +/// # Arguments +/// +/// - `pipe_read`: A `PipeReader` used to read data from the child process. +/// +/// - `child`: The child pid. +/// +/// - `temp_artifact_dest`: The destination `PathBuf` to write the temporary artifact file. +/// +/// - `worker_pid`: The PID of the child process. +/// +/// - `usage_before`: Resource usage statistics before executing the child process. +/// +/// - `timeout`: The maximum allowed time for the child process to finish, in `Duration`. +/// +/// # Returns +/// +/// - If the child send response without an error, this function returns `Ok(PrepareStats)` +/// containing memory and CPU usage statistics. +/// +/// - If the child send response with an error, it returns a `PrepareError` with that error. +/// +/// - If the child process timeout, it returns `PrepareError::TimedOut`. +fn handle_parent_process( + mut pipe_read: PipeReader, + child: Pid, + temp_artifact_dest: PathBuf, + worker_pid: u32, + usage_before: Usage, + timeout: Duration, +) -> Result { + // Read from the child. Don't decode unless the process exited normally, which we check later. + let mut received_data = Vec::new(); + pipe_read + .read_to_end(&mut received_data) + .map_err(|err| PrepareError::IoErr(err.to_string()))?; + + let status = nix::sys::wait::waitpid(child, None); + gum::trace!( + target: LOG_TARGET, + %worker_pid, + "prepare worker received wait status from job: {:?}", + status, + ); + + let usage_after = nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) + .map_err(|errno| error_from_errno("getrusage after", errno))?; + + // Using `getrusage` is needed to check whether child has timedout since we cannot rely on + // child to report its own time. + // As `getrusage` returns resource usage from all terminated child processes, + // it is necessary to subtract the usage before the current child process to isolate its cpu + // time + let cpu_tv = get_total_cpu_usage(usage_after) - get_total_cpu_usage(usage_before); + if cpu_tv >= timeout { + gum::warn!( + target: LOG_TARGET, + %worker_pid, + "prepare job took {}ms cpu time, exceeded prepare timeout {}ms", + cpu_tv.as_millis(), + timeout.as_millis(), + ); + return Err(PrepareError::TimedOut) + } + + match status { + Ok(WaitStatus::Exited(_pid, exit_status)) => { + let mut reader = io::BufReader::new(received_data.as_slice()); + let result = recv_child_response(&mut reader) + .map_err(|err| PrepareError::JobError(err.to_string()))?; + + match result { + Err(err) => Err(err), + Ok(response) => { + // The exit status should have been zero if no error occurred. + if exit_status != 0 { + return Err(PrepareError::JobError(format!( + "unexpected exit status: {}", + exit_status + ))) + } + + // Write the serialized artifact into a temp file. + // + // PVF host only keeps artifacts statuses in its memory, + // successfully compiled code gets stored on the disk (and + // consequently deserialized by execute-workers). The prepare worker + // is only required to send `Ok` to the pool to indicate the + // success. + gum::debug!( + target: LOG_TARGET, + %worker_pid, + "worker: writing artifact to {}", + temp_artifact_dest.display(), + ); + // Write to the temp file created by the host. + if let Err(err) = fs::write(&temp_artifact_dest, &response.artifact) { + return Err(PrepareError::IoErr(err.to_string())) + }; + + Ok(PrepareStats { + memory_stats: response.memory_stats, + cpu_time_elapsed: cpu_tv, + }) + }, + } + }, + // The job was killed by the given signal. + // + // The job gets SIGSYS on seccomp violations, but this signal may have been sent for some + // other reason, so we still need to check for seccomp violations elsewhere. + Ok(WaitStatus::Signaled(_pid, signal, _core_dump)) => + Err(PrepareError::JobDied(format!("received signal: {signal:?}"))), + Err(errno) => Err(error_from_errno("waitpid", errno)), + + // An attacker can make the child process return any exit status it wants. So we can treat + // all unexpected cases the same way. + Ok(unexpected_wait_status) => Err(PrepareError::JobDied(format!( + "unexpected status from wait: {unexpected_wait_status:?}" + ))), + } +} + +/// Calculate the total CPU time from the given `usage` structure, returned from +/// [`nix::sys::resource::getrusage`], and calculates the total CPU time spent, including both user +/// and system time. +/// +/// # Arguments +/// +/// - `rusage`: Contains resource usage information. +/// +/// # Returns +/// +/// Returns a `Duration` representing the total CPU time. +fn get_total_cpu_usage(rusage: Usage) -> Duration { + let micros = (((rusage.user_time().tv_sec() + rusage.system_time().tv_sec()) * 1_000_000) + + (rusage.system_time().tv_usec() + rusage.user_time().tv_usec()) as i64) as u64; + + return Duration::from_micros(micros) +} + +/// Get a job response. +fn recv_child_response(received_data: &mut io::BufReader<&[u8]>) -> io::Result { + let response_bytes = framed_recv_blocking(received_data)?; + JobResult::decode(&mut response_bytes.as_slice()).map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!("prepare pvf recv_child_response: decode error: {:?}", e), + ) + }) +} + +/// Write a job response to the pipe and exit process after. +/// +/// # Arguments +/// +/// - `pipe_write`: A `PipeWriter` structure, the writing end of a pipe. +/// +/// - `response`: Child process response +fn send_child_response(pipe_write: &mut PipeWriter, response: JobResult) -> ! { + framed_send_blocking(pipe_write, response.encode().as_slice()) + .unwrap_or_else(|_| process::exit(libc::EXIT_FAILURE)); + + if response.is_ok() { + process::exit(libc::EXIT_SUCCESS) + } else { + process::exit(libc::EXIT_FAILURE) + } +} + +fn error_from_errno(context: &'static str, errno: Errno) -> PrepareError { + PrepareError::Kernel(format!("{}: {}: {}", context, errno, io::Error::last_os_error())) +} + +type JobResult = Result; + +/// Pre-encoded length-prefixed `Result::Err(PrepareError::OutOfMemory)` +const OOM_PAYLOAD: &[u8] = b"\x02\x00\x00\x00\x00\x00\x00\x00\x01\x08"; + +#[test] +fn pre_encoded_payloads() { + // NOTE: This must match the type of `response` in `send_child_response`. + let oom_unencoded: JobResult = Result::Err(PrepareError::OutOfMemory); + let oom_encoded = oom_unencoded.encode(); + // The payload is prefixed with its length in `framed_send`. + let mut oom_payload = oom_encoded.len().to_le_bytes().to_vec(); + oom_payload.extend(oom_encoded); + assert_eq!(oom_payload, OOM_PAYLOAD); +} diff --git a/polkadot/node/core/pvf/src/error.rs b/polkadot/node/core/pvf/src/error.rs index 87ef0b54a040..7fdb8c56ec92 100644 --- a/polkadot/node/core/pvf/src/error.rs +++ b/polkadot/node/core/pvf/src/error.rs @@ -33,36 +33,37 @@ pub enum ValidationError { pub enum InvalidCandidate { /// PVF preparation ended up with a deterministic error. PrepareError(String), - /// The failure is reported by the execution worker. The string contains the error message. - WorkerReportedError(String), - /// The worker has died during validation of a candidate. That may fall in one of the following - /// categories, which we cannot distinguish programmatically: + /// The candidate is reported to be invalid by the execution worker. The string contains the + /// error message. + WorkerReportedInvalid(String), + /// The worker process (not the job) has died during validation of a candidate. /// - /// (a) Some sort of transient glitch caused the worker process to abort. An example would be - /// that the host machine ran out of free memory and the OOM killer started killing the - /// processes, and in order to save the parent it will "sacrifice child" first. - /// - /// (b) The candidate triggered a code path that has lead to the process death. For example, - /// the PVF found a way to consume unbounded amount of resources and then it either - /// exceeded an `rlimit` (if set) or, again, invited OOM killer. Another possibility is a - /// bug in wasmtime allowed the PVF to gain control over the execution worker. - /// - /// We attribute such an event to an *invalid candidate* in either case. - /// - /// The rationale for this is that a glitch may lead to unfair rejecting candidate by a single - /// validator. If the glitch is somewhat more persistent the validator will reject all - /// candidate thrown at it and hopefully the operator notices it by decreased reward - /// performance of the validator. On the other hand, if the worker died because of (b) we would - /// have better chances to stop the attack. + /// It's unlikely that this is caused by malicious code since workers spawn separate job + /// processes, and those job processes are sandboxed. But, it is possible. We retry in this + /// case, and if the error persists, we assume it's caused by the candidate and vote against. AmbiguousWorkerDeath, /// PVF execution (compilation is not included) took more time than was allotted. HardTimeout, - /// A panic occurred and we can't be sure whether the candidate is really invalid or some - /// internal glitch occurred. Whenever we are unsure, we can never treat an error as internal - /// as we would abstain from voting. This is bad because if the issue was due to the candidate, - /// then all validators would abstain, stalling finality on the chain. So we will first retry - /// the candidate, and if the issue persists we are forced to vote invalid. - Panic(String), + /// The job process (not the worker) has died for one of the following reasons: + /// + /// (a) A seccomp violation occurred, most likely due to an attempt by malicious code to + /// execute arbitrary code. Note that there is no foolproof way to detect this if the operator + /// has seccomp auditing disabled. + /// + /// (b) The host machine ran out of free memory and the OOM killer started killing the + /// processes, and in order to save the parent it will "sacrifice child" first. + /// + /// (c) Some other reason, perhaps transient or perhaps caused by malicious code. + /// + /// We cannot treat this as an internal error because malicious code may have caused this. + AmbiguousJobDeath(String), + /// An unexpected error occurred in the job process and we can't be sure whether the candidate + /// is really invalid or some internal glitch occurred. Whenever we are unsure, we can never + /// treat an error as internal as we would abstain from voting. This is bad because if the + /// issue was due to the candidate, then all validators would abstain, stalling finality on the + /// chain. So we will first retry the candidate, and if the issue persists we are forced to + /// vote invalid. + JobError(String), } impl From for ValidationError { diff --git a/polkadot/node/core/pvf/src/execute/queue.rs b/polkadot/node/core/pvf/src/execute/queue.rs index aca604f0de21..257377df3f48 100644 --- a/polkadot/node/core/pvf/src/execute/queue.rs +++ b/polkadot/node/core/pvf/src/execute/queue.rs @@ -342,20 +342,27 @@ fn handle_job_finish( }, Outcome::InvalidCandidate { err, idle_worker } => ( Some(idle_worker), - Err(ValidationError::InvalidCandidate(InvalidCandidate::WorkerReportedError(err))), + Err(ValidationError::InvalidCandidate(InvalidCandidate::WorkerReportedInvalid(err))), None, ), Outcome::InternalError { err } => (None, Err(ValidationError::InternalError(err)), None), + // Either the worker or the job timed out. Kill the worker in either case. Treated as + // definitely-invalid, because if we timed out, there's no time left for a retry. Outcome::HardTimeout => (None, Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)), None), // "Maybe invalid" errors (will retry). - Outcome::IoErr => ( + Outcome::WorkerIntfErr => ( None, Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath)), None, ), - Outcome::Panic { err } => - (None, Err(ValidationError::InvalidCandidate(InvalidCandidate::Panic(err))), None), + Outcome::JobDied { err } => ( + None, + Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousJobDeath(err))), + None, + ), + Outcome::JobError { err } => + (None, Err(ValidationError::InvalidCandidate(InvalidCandidate::JobError(err))), None), }; queue.metrics.execute_finished(); diff --git a/polkadot/node/core/pvf/src/execute/worker_intf.rs b/polkadot/node/core/pvf/src/execute/worker_intf.rs index 61264f7d517d..bf44ba017250 100644 --- a/polkadot/node/core/pvf/src/execute/worker_intf.rs +++ b/polkadot/node/core/pvf/src/execute/worker_intf.rs @@ -30,7 +30,7 @@ use futures_timer::Delay; use parity_scale_codec::{Decode, Encode}; use polkadot_node_core_pvf_common::{ error::InternalValidationError, - execute::{Handshake, Response}, + execute::{Handshake, WorkerResponse}, worker_dir, SecurityStatus, }; use polkadot_parachain_primitives::primitives::ValidationResult; @@ -88,19 +88,26 @@ pub enum Outcome { /// a trap. Errors related to the preparation process are not expected to be encountered by the /// execution workers. InvalidCandidate { err: String, idle_worker: IdleWorker }, + /// The execution time exceeded the hard limit. The worker is terminated. + HardTimeout, + /// An I/O error happened during communication with the worker. This may mean that the worker + /// process already died. The token is not returned in any case. + WorkerIntfErr, + /// The job process has died. We must kill the worker just in case. + /// + /// We cannot treat this as an internal error because malicious code may have caused this. + JobDied { err: String }, + /// An unexpected error occurred in the job process. + /// + /// Because malicious code can cause a job error, we must not treat it as an internal error. + JobError { err: String }, + /// An internal error happened during the validation. Such an error is most likely related to /// some transient glitch. /// /// Should only ever be used for errors independent of the candidate and PVF. Therefore it may /// be a problem with the worker, so we terminate it. InternalError { err: InternalValidationError }, - /// The execution time exceeded the hard limit. The worker is terminated. - HardTimeout, - /// An I/O error happened during communication with the worker. This may mean that the worker - /// process already died. The token is not returned in any case. - IoErr, - /// An unexpected panic has occurred in the execution worker. - Panic { err: String }, } /// Given the idle token of a worker and parameters of work, communicates with the worker and @@ -137,7 +144,7 @@ pub async fn start_work( ?error, "failed to send an execute request", ); - return Outcome::IoErr + return Outcome::WorkerIntfErr } // We use a generous timeout here. This is in addition to the one in the child process, in @@ -173,7 +180,7 @@ pub async fn start_work( ); } - return Outcome::IoErr + return Outcome::WorkerIntfErr }, Ok(response) => { // Check if any syscall violations occurred during the job. For now this is @@ -189,7 +196,7 @@ pub async fn start_work( ); } - if let Response::Ok{duration, ..} = response { + if let WorkerResponse::Ok{duration, ..} = response { if duration > execution_timeout { // The job didn't complete within the timeout. gum::warn!( @@ -201,7 +208,7 @@ pub async fn start_work( ); // Return a timeout error. - return Outcome::HardTimeout; + return Outcome::HardTimeout } } @@ -216,23 +223,25 @@ pub async fn start_work( validation_code_hash = ?artifact.id.code_hash, "execution worker exceeded lenient timeout for execution, child worker likely stalled", ); - Response::TimedOut + WorkerResponse::JobTimedOut }, }; match response { - Response::Ok { result_descriptor, duration } => Outcome::Ok { + WorkerResponse::Ok { result_descriptor, duration } => Outcome::Ok { result_descriptor, duration, idle_worker: IdleWorker { stream, pid, worker_dir }, }, - Response::InvalidCandidate(err) => Outcome::InvalidCandidate { + WorkerResponse::InvalidCandidate(err) => Outcome::InvalidCandidate { err, idle_worker: IdleWorker { stream, pid, worker_dir }, }, - Response::TimedOut => Outcome::HardTimeout, - Response::Panic(err) => Outcome::Panic { err }, - Response::InternalError(err) => Outcome::InternalError { err }, + WorkerResponse::JobTimedOut => Outcome::HardTimeout, + WorkerResponse::JobDied(err) => Outcome::JobDied { err }, + WorkerResponse::JobError(err) => Outcome::JobError { err }, + + WorkerResponse::InternalError(err) => Outcome::InternalError { err }, } }) .await @@ -306,9 +315,9 @@ async fn send_request( framed_send(stream, &execution_timeout.encode()).await } -async fn recv_response(stream: &mut UnixStream) -> io::Result { +async fn recv_response(stream: &mut UnixStream) -> io::Result { let response_bytes = framed_recv(stream).await?; - Response::decode(&mut &response_bytes[..]).map_err(|e| { + WorkerResponse::decode(&mut response_bytes.as_slice()).map_err(|e| { io::Error::new( io::ErrorKind::Other, format!("execute pvf recv_response: decode error: {:?}", e), diff --git a/polkadot/node/core/pvf/src/prepare/pool.rs b/polkadot/node/core/pvf/src/prepare/pool.rs index 6bb6ca5b6445..8e02f540d321 100644 --- a/polkadot/node/core/pvf/src/prepare/pool.rs +++ b/polkadot/node/core/pvf/src/prepare/pool.rs @@ -339,17 +339,17 @@ fn handle_mux( spawned, worker, idle, - Err(PrepareError::CreateTmpFileErr(err)), + Err(PrepareError::CreateTmpFile(err)), ), // Return `Concluded`, but do not kill the worker since the error was on the host // side. - Outcome::RenameTmpFileErr { worker: idle, result: _, err, src, dest } => + Outcome::RenameTmpFile { worker: idle, result: _, err, src, dest } => handle_concluded_no_rip( from_pool, spawned, worker, idle, - Err(PrepareError::RenameTmpFileErr { err, src, dest }), + Err(PrepareError::RenameTmpFile { err, src, dest }), ), // Could not clear worker cache. Kill the worker so other jobs can't see the data. Outcome::ClearWorkerDir { err } => { @@ -387,6 +387,21 @@ fn handle_mux( Ok(()) }, + // The worker might still be usable, but we kill it just in case. + Outcome::JobDied(err) => { + if attempt_retire(metrics, spawned, worker) { + reply( + from_pool, + FromPool::Concluded { + worker, + rip: true, + result: Err(PrepareError::JobDied(err)), + }, + )?; + } + + Ok(()) + }, Outcome::TimedOut => { if attempt_retire(metrics, spawned, worker) { reply( diff --git a/polkadot/node/core/pvf/src/prepare/worker_intf.rs b/polkadot/node/core/pvf/src/prepare/worker_intf.rs index 0e50caf1feb5..a22fa74b2fe1 100644 --- a/polkadot/node/core/pvf/src/prepare/worker_intf.rs +++ b/polkadot/node/core/pvf/src/prepare/worker_intf.rs @@ -79,7 +79,7 @@ pub enum Outcome { CreateTmpFileErr { worker: IdleWorker, err: String }, /// The response from the worker is received, but the tmp file cannot be renamed (moved) to the /// final destination location. - RenameTmpFileErr { + RenameTmpFile { worker: IdleWorker, result: PrepareResult, err: String, @@ -100,6 +100,10 @@ pub enum Outcome { IoErr(String), /// The worker ran out of memory and is aborting. The worker should be ripped. OutOfMemory, + /// The preparation job process died, due to OOM, a seccomp violation, or some other factor. + /// + /// The worker might still be usable, but we kill it just in case. + JobDied(String), } /// Given the idle token of a worker and parameters of work, communicates with the worker and @@ -187,21 +191,6 @@ pub async fn start_work( "failed to recv a prepare response: {:?}", err, ); - - // The worker died. Check if it was due to a seccomp violation. - // - // NOTE: Log, but don't change the outcome. Not all validators may have auditing - // enabled, so we don't want attackers to abuse a non-deterministic outcome. - for syscall in security::check_seccomp_violations_for_worker(audit_log_file, pid).await { - gum::error!( - target: LOG_TARGET, - worker_pid = %pid, - %syscall, - ?pvf, - "A forbidden syscall was attempted! This is a violation of our seccomp security policy. Report an issue ASAP!" - ); - } - Outcome::IoErr(err.to_string()) }, Err(_) => { @@ -236,6 +225,7 @@ async fn handle_response( Ok(result) => result, // Timed out on the child. This should already be logged by the child. Err(PrepareError::TimedOut) => return Outcome::TimedOut, + Err(PrepareError::JobDied(err)) => return Outcome::JobDied(err), Err(PrepareError::OutOfMemory) => return Outcome::OutOfMemory, Err(_) => return Outcome::Concluded { worker, result }, }; @@ -272,7 +262,7 @@ async fn handle_response( artifact_path.display(), err, ); - Outcome::RenameTmpFileErr { + Outcome::RenameTmpFile { worker, result, err: format!("{:?}", err), diff --git a/polkadot/node/core/pvf/src/testing.rs b/polkadot/node/core/pvf/src/testing.rs index 4c038896f7f9..400b65bfe7d8 100644 --- a/polkadot/node/core/pvf/src/testing.rs +++ b/polkadot/node/core/pvf/src/testing.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -//! Various things for testing other crates. +//! Various utilities for testing. pub use crate::{ host::{EXECUTE_BINARY_NAME, PREPARE_BINARY_NAME}, @@ -59,27 +59,33 @@ pub fn validate_candidate( /// /// NOTE: This should only be called in dev code (tests, benchmarks) as it relies on the relative /// paths of the built workers. -pub fn get_and_check_worker_paths() -> (PathBuf, PathBuf) { +pub fn build_workers_and_get_paths(is_bench: bool) -> (PathBuf, PathBuf) { // Only needs to be called once for the current process. static WORKER_PATHS: OnceLock> = OnceLock::new(); - fn build_workers() { - let build_args = vec![ + fn build_workers(is_bench: bool) { + let mut build_args = vec![ "build", "--package=polkadot", "--bin=polkadot-prepare-worker", "--bin=polkadot-execute-worker", ]; - let exit_status = std::process::Command::new("cargo") + if is_bench { + // Benches require --release. Regular tests are debug (no flag needed). + build_args.push("--release"); + } + let mut cargo = std::process::Command::new("cargo"); + let cmd = cargo // wasm runtime not needed .env("SKIP_WASM_BUILD", "1") .args(build_args) - .stdout(std::process::Stdio::piped()) - .status() - .expect("Failed to run the build program"); + .stdout(std::process::Stdio::piped()); + + println!("INFO: calling `{cmd:?}`"); + let exit_status = cmd.status().expect("Failed to run the build program"); if !exit_status.success() { - eprintln!("Failed to build workers: {}", exit_status.code().unwrap()); + eprintln!("ERROR: Failed to build workers: {}", exit_status.code().unwrap()); std::process::exit(1); } } @@ -95,23 +101,23 @@ pub fn get_and_check_worker_paths() -> (PathBuf, PathBuf) { // explain why a build happens if !prepare_worker_path.is_executable() { - eprintln!("Prepare worker does not exist or is not executable. Workers directory: {:?}", workers_path); + println!("WARN: Prepare worker does not exist or is not executable. Workers directory: {:?}", workers_path); } if !execute_worker_path.is_executable() { - eprintln!("Execute worker does not exist or is not executable. Workers directory: {:?}", workers_path); + println!("WARN: Execute worker does not exist or is not executable. Workers directory: {:?}", workers_path); } if let Ok(ver) = get_worker_version(&prepare_worker_path) { if ver != NODE_VERSION { - eprintln!("Prepare worker version {ver} does not match node version {NODE_VERSION}; worker path: {prepare_worker_path:?}"); + println!("WARN: Prepare worker version {ver} does not match node version {NODE_VERSION}; worker path: {prepare_worker_path:?}"); } } if let Ok(ver) = get_worker_version(&execute_worker_path) { if ver != NODE_VERSION { - eprintln!("Execute worker version {ver} does not match node version {NODE_VERSION}; worker path: {execute_worker_path:?}"); + println!("WARN: Execute worker version {ver} does not match node version {NODE_VERSION}; worker path: {execute_worker_path:?}"); } } - build_workers(); + build_workers(is_bench); Mutex::new((prepare_worker_path, execute_worker_path)) }); diff --git a/polkadot/node/core/pvf/tests/it/main.rs b/polkadot/node/core/pvf/tests/it/main.rs index 801b60884fa3..d2d842cf84a3 100644 --- a/polkadot/node/core/pvf/tests/it/main.rs +++ b/polkadot/node/core/pvf/tests/it/main.rs @@ -19,23 +19,23 @@ use assert_matches::assert_matches; use parity_scale_codec::Encode as _; use polkadot_node_core_pvf::{ - start, testing::get_and_check_worker_paths, Config, InvalidCandidate, Metrics, PrepareError, + start, testing::build_workers_and_get_paths, Config, InvalidCandidate, Metrics, PrepareError, PrepareJobKind, PrepareStats, PvfPrepData, ValidationError, ValidationHost, JOB_TIMEOUT_WALL_CLOCK_FACTOR, }; use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams, ValidationResult}; use polkadot_primitives::{ExecutorParam, ExecutorParams}; -#[cfg(target_os = "linux")] -use rusty_fork::rusty_fork_test; use std::time::Duration; use tokio::sync::Mutex; mod adder; +#[cfg(target_os = "linux")] +mod process; mod worker_common; -const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); -const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(3); +const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(6); +const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(6); struct TestHost { cache_dir: tempfile::TempDir, @@ -51,7 +51,7 @@ impl TestHost { where F: FnOnce(&mut Config), { - let (prepare_worker_path, execute_worker_path) = get_and_check_worker_paths(); + let (prepare_worker_path, execute_worker_path) = build_workers_and_get_paths(false); let cache_dir = tempfile::tempdir().unwrap(); let mut config = Config::new( @@ -126,7 +126,26 @@ impl TestHost { } #[tokio::test] -async fn terminates_on_timeout() { +async fn prepare_job_terminates_on_timeout() { + let host = TestHost::new().await; + + let start = std::time::Instant::now(); + let result = host + .precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()) + .await; + + match result { + Err(PrepareError::TimedOut) => {}, + r => panic!("{:?}", r), + } + + let duration = std::time::Instant::now().duration_since(start); + assert!(duration >= TEST_PREPARATION_TIMEOUT); + assert!(duration < TEST_PREPARATION_TIMEOUT * JOB_TIMEOUT_WALL_CLOCK_FACTOR); +} + +#[tokio::test] +async fn execute_job_terminates_on_timeout() { let host = TestHost::new().await; let start = std::time::Instant::now(); @@ -153,108 +172,6 @@ async fn terminates_on_timeout() { assert!(duration < TEST_EXECUTION_TIMEOUT * JOB_TIMEOUT_WALL_CLOCK_FACTOR); } -#[cfg(target_os = "linux")] -fn kill_by_sid_and_name(sid: i32, exe_name: &'static str) { - use procfs::process; - - let all_processes: Vec = process::all_processes() - .expect("Can't read /proc") - .filter_map(|p| match p { - Ok(p) => Some(p), // happy path - Err(e) => match e { - // process vanished during iteration, ignore it - procfs::ProcError::NotFound(_) => None, - x => { - panic!("some unknown error: {}", x); - }, - }, - }) - .collect(); - - for process in all_processes { - if process.stat().unwrap().session == sid && - process.exe().unwrap().to_str().unwrap().contains(exe_name) - { - assert_eq!(unsafe { libc::kill(process.pid(), 9) }, 0); - } - } -} - -// Run these tests in their own processes with rusty-fork. They work by each creating a new session, -// then killing the worker process that matches the session ID and expected worker name. -#[cfg(target_os = "linux")] -rusty_fork_test! { - // What happens when the prepare worker dies in the middle of a job? - #[test] - fn prepare_worker_killed_during_job() { - const PROCESS_NAME: &'static str = "polkadot-prepare-worker"; - - let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - let host = TestHost::new().await; - - // Create a new session and get the session ID. - let sid = unsafe { libc::setsid() }; - assert!(sid > 0); - - let (result, _) = futures::join!( - // Choose a job that would normally take the entire timeout. - host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()), - // Run a future that kills the job in the middle of the timeout. - async { - tokio::time::sleep(TEST_PREPARATION_TIMEOUT / 2).await; - kill_by_sid_and_name(sid, PROCESS_NAME); - } - ); - - assert_matches!(result, Err(PrepareError::IoErr(_))); - }) - } - - // What happens when the execute worker dies in the middle of a job? - #[test] - fn execute_worker_killed_during_job() { - const PROCESS_NAME: &'static str = "polkadot-execute-worker"; - - let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - let host = TestHost::new().await; - - // Create a new session and get the session ID. - let sid = unsafe { libc::setsid() }; - assert!(sid > 0); - - // Prepare the artifact ahead of time. - let binary = halt::wasm_binary_unwrap(); - host.precheck_pvf(binary, Default::default()).await.unwrap(); - - let (result, _) = futures::join!( - // Choose an job that would normally take the entire timeout. - host.validate_candidate( - binary, - ValidationParams { - block_data: BlockData(Vec::new()), - parent_head: Default::default(), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, - Default::default(), - ), - // Run a future that kills the job in the middle of the timeout. - async { - tokio::time::sleep(TEST_EXECUTION_TIMEOUT / 2).await; - kill_by_sid_and_name(sid, PROCESS_NAME); - } - ); - - assert_matches!( - result, - Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath)) - ); - }) - } -} - #[cfg(feature = "ci-only-tests")] #[tokio::test] async fn ensure_parallel_execution() { diff --git a/polkadot/node/core/pvf/tests/it/process.rs b/polkadot/node/core/pvf/tests/it/process.rs new file mode 100644 index 000000000000..725d060ab916 --- /dev/null +++ b/polkadot/node/core/pvf/tests/it/process.rs @@ -0,0 +1,383 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Test unexpected behaviors of the spawned processes. We test both worker processes (directly +//! spawned by the host) and job processes (spawned by the workers to securely perform PVF jobs). + +use super::TestHost; +use assert_matches::assert_matches; +use polkadot_node_core_pvf::{InvalidCandidate, PrepareError, ValidationError}; +use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams}; +use procfs::process; +use rusty_fork::rusty_fork_test; +use std::time::Duration; + +const PREPARE_PROCESS_NAME: &'static str = "polkadot-prepare-worker"; +const EXECUTE_PROCESS_NAME: &'static str = "polkadot-execute-worker"; + +const SIGNAL_KILL: i32 = 9; +const SIGNAL_STOP: i32 = 19; + +fn send_signal_by_sid_and_name( + sid: i32, + exe_name: &'static str, + is_direct_child: bool, + signal: i32, +) { + let process = find_process_by_sid_and_name(sid, exe_name, is_direct_child); + assert_eq!(unsafe { libc::kill(process.pid(), signal) }, 0); +} +fn get_num_threads_by_sid_and_name(sid: i32, exe_name: &'static str, is_direct_child: bool) -> i64 { + let process = find_process_by_sid_and_name(sid, exe_name, is_direct_child); + process.stat().unwrap().num_threads +} + +fn find_process_by_sid_and_name( + sid: i32, + exe_name: &'static str, + is_direct_child: bool, +) -> process::Process { + let all_processes: Vec = process::all_processes() + .expect("Can't read /proc") + .filter_map(|p| match p { + Ok(p) => Some(p), // happy path + Err(e) => match e { + // process vanished during iteration, ignore it + procfs::ProcError::NotFound(_) => None, + x => { + panic!("some unknown error: {}", x); + }, + }, + }) + .collect(); + + let mut found = None; + for process in all_processes { + let stat = process.stat().unwrap(); + + if stat.session != sid || !process.exe().unwrap().to_str().unwrap().contains(exe_name) { + continue + } + // The workers are direct children of the current process, the worker job processes are not + // (they are children of the workers). + let process_is_direct_child = stat.ppid as u32 == std::process::id(); + if is_direct_child != process_is_direct_child { + continue + } + + if found.is_some() { + panic!("Found more than one process") + } + found = Some(process); + } + found.expect("Should have found the expected process") +} + +// Run these tests in their own processes with rusty-fork. They work by each creating a new session, +// then doing something with the child process that matches the session ID and expected process +// name. +rusty_fork_test! { + // What happens when the prepare worker (not the job) times out? + #[test] + fn prepare_worker_timeout() { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let host = TestHost::new().await; + + // Create a new session and get the session ID. + let sid = unsafe { libc::setsid() }; + assert!(sid > 0); + + let (result, _) = futures::join!( + // Choose a job that would normally take the entire timeout. + host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()), + // Send a stop signal to pause the worker. + async { + tokio::time::sleep(Duration::from_secs(1)).await; + send_signal_by_sid_and_name(sid, PREPARE_PROCESS_NAME, true, SIGNAL_STOP); + } + ); + + assert_matches!(result, Err(PrepareError::TimedOut)); + }) + } + + // What happens when the execute worker (not the job) times out? + #[test] + fn execute_worker_timeout() { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let host = TestHost::new().await; + + // Create a new session and get the session ID. + let sid = unsafe { libc::setsid() }; + assert!(sid > 0); + + // Prepare the artifact ahead of time. + let binary = halt::wasm_binary_unwrap(); + host.precheck_pvf(binary, Default::default()).await.unwrap(); + + let (result, _) = futures::join!( + // Choose an job that would normally take the entire timeout. + host.validate_candidate( + binary, + ValidationParams { + block_data: BlockData(Vec::new()), + parent_head: Default::default(), + relay_parent_number: 1, + relay_parent_storage_root: Default::default(), + }, + Default::default(), + ), + // Send a stop signal to pause the worker. + async { + tokio::time::sleep(Duration::from_secs(1)).await; + send_signal_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, true, SIGNAL_STOP); + } + ); + + assert_matches!( + result, + Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)) + ); + }) + } + + // What happens when the prepare worker dies in the middle of a job? + #[test] + fn prepare_worker_killed_during_job() { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let host = TestHost::new().await; + + // Create a new session and get the session ID. + let sid = unsafe { libc::setsid() }; + assert!(sid > 0); + + let (result, _) = futures::join!( + // Choose a job that would normally take the entire timeout. + host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()), + // Run a future that kills the job while it's running. + async { + tokio::time::sleep(Duration::from_secs(1)).await; + send_signal_by_sid_and_name(sid, PREPARE_PROCESS_NAME, true, SIGNAL_KILL); + } + ); + + assert_matches!(result, Err(PrepareError::IoErr(_))); + }) + } + + // What happens when the execute worker dies in the middle of a job? + #[test] + fn execute_worker_killed_during_job() { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let host = TestHost::new().await; + + // Create a new session and get the session ID. + let sid = unsafe { libc::setsid() }; + assert!(sid > 0); + + // Prepare the artifact ahead of time. + let binary = halt::wasm_binary_unwrap(); + host.precheck_pvf(binary, Default::default()).await.unwrap(); + + let (result, _) = futures::join!( + // Choose an job that would normally take the entire timeout. + host.validate_candidate( + binary, + ValidationParams { + block_data: BlockData(Vec::new()), + parent_head: Default::default(), + relay_parent_number: 1, + relay_parent_storage_root: Default::default(), + }, + Default::default(), + ), + // Run a future that kills the job while it's running. + async { + tokio::time::sleep(Duration::from_secs(1)).await; + send_signal_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, true, SIGNAL_KILL); + } + ); + + assert_matches!( + result, + Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath)) + ); + }) + } + + // What happens when the forked prepare job dies in the middle of its job? + #[test] + fn forked_prepare_job_killed_during_job() { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let host = TestHost::new().await; + + // Create a new session and get the session ID. + let sid = unsafe { libc::setsid() }; + assert!(sid > 0); + + let (result, _) = futures::join!( + // Choose a job that would normally take the entire timeout. + host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()), + // Run a future that kills the job while it's running. + async { + tokio::time::sleep(Duration::from_secs(1)).await; + send_signal_by_sid_and_name(sid, PREPARE_PROCESS_NAME, false, SIGNAL_KILL); + } + ); + + // Note that we get a more specific error if the job died than if the whole worker died. + assert_matches!( + result, + Err(PrepareError::JobDied(err)) if err == "received signal: SIGKILL" + ); + }) + } + + // What happens when the forked execute job dies in the middle of its job? + #[test] + fn forked_execute_job_killed_during_job() { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let host = TestHost::new().await; + + // Create a new session and get the session ID. + let sid = unsafe { libc::setsid() }; + assert!(sid > 0); + + // Prepare the artifact ahead of time. + let binary = halt::wasm_binary_unwrap(); + host.precheck_pvf(binary, Default::default()).await.unwrap(); + + let (result, _) = futures::join!( + // Choose a job that would normally take the entire timeout. + host.validate_candidate( + binary, + ValidationParams { + block_data: BlockData(Vec::new()), + parent_head: Default::default(), + relay_parent_number: 1, + relay_parent_storage_root: Default::default(), + }, + Default::default(), + ), + // Run a future that kills the job while it's running. + async { + tokio::time::sleep(Duration::from_secs(1)).await; + send_signal_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, false, SIGNAL_KILL); + } + ); + + // Note that we get a more specific error if the job died than if the whole worker died. + assert_matches!( + result, + Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousJobDeath(err))) + if err == "received signal: SIGKILL" + ); + }) + } + + // Ensure that the spawned prepare worker is single-threaded. + // + // See `run_worker` for why we need this invariant. + #[test] + fn ensure_prepare_processes_have_correct_num_threads() { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let host = TestHost::new().await; + + // Create a new session and get the session ID. + let sid = unsafe { libc::setsid() }; + assert!(sid > 0); + + let _ = futures::join!( + // Choose a job that would normally take the entire timeout. + host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()), + // Run a future that kills the job while it's running. + async { + tokio::time::sleep(Duration::from_secs(1)).await; + assert_eq!( + get_num_threads_by_sid_and_name(sid, PREPARE_PROCESS_NAME, true), + 1 + ); + // Child job should have three threads: main thread, execute thread, CPU time + // monitor, and memory tracking. + assert_eq!( + get_num_threads_by_sid_and_name(sid, PREPARE_PROCESS_NAME, false), + 4 + ); + + // End the test. + send_signal_by_sid_and_name(sid, PREPARE_PROCESS_NAME, true, SIGNAL_KILL); + } + ); + }) + } + + // Ensure that the spawned execute worker is single-threaded. + // + // See `run_worker` for why we need this invariant. + #[test] + fn ensure_execute_processes_have_correct_num_threads() { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let host = TestHost::new().await; + + // Create a new session and get the session ID. + let sid = unsafe { libc::setsid() }; + assert!(sid > 0); + + // Prepare the artifact ahead of time. + let binary = halt::wasm_binary_unwrap(); + host.precheck_pvf(binary, Default::default()).await.unwrap(); + + let _ = futures::join!( + // Choose a job that would normally take the entire timeout. + host.validate_candidate( + binary, + ValidationParams { + block_data: BlockData(Vec::new()), + parent_head: Default::default(), + relay_parent_number: 1, + relay_parent_storage_root: Default::default(), + }, + Default::default(), + ), + // Run a future that tests the thread count while the worker is running. + async { + tokio::time::sleep(Duration::from_secs(1)).await; + assert_eq!( + get_num_threads_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, true), + 1 + ); + // Child job should have three threads: main thread, execute thread, and CPU + // time monitor. + assert_eq!( + get_num_threads_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, false), + 3 + ); + + // End the test. + send_signal_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, true, SIGNAL_KILL); + } + ); + }) + } +} diff --git a/polkadot/node/core/pvf/tests/it/worker_common.rs b/polkadot/node/core/pvf/tests/it/worker_common.rs index df64980dc806..0d33af7e096c 100644 --- a/polkadot/node/core/pvf/tests/it/worker_common.rs +++ b/polkadot/node/core/pvf/tests/it/worker_common.rs @@ -15,7 +15,7 @@ // along with Polkadot. If not, see . use polkadot_node_core_pvf::{ - testing::{get_and_check_worker_paths, spawn_with_program_path, SpawnErr}, + testing::{build_workers_and_get_paths, spawn_with_program_path, SpawnErr}, SecurityStatus, }; use std::{env, time::Duration}; @@ -23,7 +23,7 @@ use std::{env, time::Duration}; // Test spawning a program that immediately exits with a failure code. #[tokio::test] async fn spawn_immediate_exit() { - let (prepare_worker_path, _) = get_and_check_worker_paths(); + let (prepare_worker_path, _) = build_workers_and_get_paths(false); // There's no explicit `exit` subcommand in the worker; it will panic on an unknown // subcommand anyway @@ -41,7 +41,7 @@ async fn spawn_immediate_exit() { #[tokio::test] async fn spawn_timeout() { - let (_, execute_worker_path) = get_and_check_worker_paths(); + let (_, execute_worker_path) = build_workers_and_get_paths(false); let result = spawn_with_program_path( "integration-test", @@ -57,7 +57,7 @@ async fn spawn_timeout() { #[tokio::test] async fn should_connect() { - let (prepare_worker_path, _) = get_and_check_worker_paths(); + let (prepare_worker_path, _) = build_workers_and_get_paths(false); let _ = spawn_with_program_path( "integration-test", diff --git a/polkadot/roadmap/implementers-guide/src/node/utility/pvf-host-and-workers.md b/polkadot/roadmap/implementers-guide/src/node/utility/pvf-host-and-workers.md index 52129f9eb80a..4dbb7980c1be 100644 --- a/polkadot/roadmap/implementers-guide/src/node/utility/pvf-host-and-workers.md +++ b/polkadot/roadmap/implementers-guide/src/node/utility/pvf-host-and-workers.md @@ -1,7 +1,11 @@ # PVF Host and Workers The PVF host is responsible for handling requests to prepare and execute PVF -code blobs, which it sends to PVF workers running in their own child processes. +code blobs, which it sends to PVF **workers** running in their own child +processes. + +While the workers are generally long-living, they also spawn one-off secure +**job processes** that perform the jobs. See "Job Processes" section below. This system has two high-levels goals that we will touch on here: *determinism* and *security*. @@ -36,8 +40,11 @@ execution request: not successful. 2. **Artifact missing:** The prepared artifact might have been deleted due to operator error or some bug in the system. -3. **Panic:** The worker thread panicked for some indeterminate reason, which - may or may not be independent of the candidate or PVF. +3. **Job errors:** For example, the worker thread panicked for some + indeterminate reason, which may or may not be independent of the candidate or + PVF. +4. **Internal errors:** See "Internal Errors" section. In this case, after the + retry we abstain from voting. ### Preparation timeouts @@ -62,10 +69,16 @@ more than the CPU time. ### Internal errors +An internal, or local, error is one that we treat as independent of the PVF +and/or candidate, i.e. local to the running machine. If this happens, then we +will first retry the job and if the errors persists, then we simply do not vote. +This prevents slashes, since otherwise our vote may not agree with that of the +other validators. + In general, for errors not raising a dispute we have to be very careful. This is -only sound, if we either: +only sound, if either: -1. Ruled out that error in pre-checking. If something is not checked in +1. We ruled out that error in pre-checking. If something is not checked in pre-checking, even if independent of the candidate and PVF, we must raise a dispute. 2. We are 100% confident that it is a hardware/local issue: Like corrupted file, @@ -75,11 +88,11 @@ Reasoning: Otherwise it would be possible to register a PVF where candidates can not be checked, but we don't get a dispute - so nobody gets punished. Second, we end up with a finality stall that is not going to resolve! -There are some error conditions where we can't be sure whether the candidate is -really invalid or some internal glitch occurred, e.g. panics. Whenever we are -unsure, we can never treat an error as internal as we would abstain from voting. -So we will first retry the candidate, and if the issue persists we are forced to -vote invalid. +Note that any error from the job process we cannot treat as internal. The job +runs untrusted code and an attacker can therefore return arbitrary errors. If +they were to return errors that we treat as internal, they could make us abstain +from voting. Since we are unsure if such errors are legitimate, we will first +retry the candidate, and if the issue persists we are forced to vote invalid. ## Security @@ -119,6 +132,20 @@ So what are we actually worried about? Things that come to mind: 6. **Intercepting and manipulating packages** - Effect very similar to the above, hard to do without also being able to do 4 or 5. +### Job Processes + +As mentioned above, our architecture includes long-living **worker processes** +and one-off **job processes*. This separation is important so that the handling +of untrusted code can be limited to the job processes. A hijacked job process +can therefore not interfere with other jobs running in separate processes. + +Furthermore, if an unexpected execution error occurred in the worker and not the +job, we generally can be confident that it has nothing to do with the candidate, +so we can abstain from voting. On the other hand, a hijacked job can send back +erroneous responses for candidates, so we know that we should not abstain from +voting on such errors from jobs. Otherwise, an attacker could trigger a finality +stall. (See "Internal Errors" section above.) + ### Restricting file-system access A basic security mechanism is to make sure that any process directly interfacing diff --git a/polkadot/scripts/list-syscalls/execute-worker-syscalls b/polkadot/scripts/list-syscalls/execute-worker-syscalls index 4a7a66181299..349af783cf1a 100644 --- a/polkadot/scripts/list-syscalls/execute-worker-syscalls +++ b/polkadot/scripts/list-syscalls/execute-worker-syscalls @@ -16,6 +16,7 @@ 16 (ioctl) 19 (readv) 20 (writev) +22 (pipe) 24 (sched_yield) 25 (mremap) 28 (madvise) @@ -25,7 +26,9 @@ 45 (recvfrom) 46 (sendmsg) 56 (clone) +57 (fork) 60 (exit) +61 (wait4) 62 (kill) 72 (fcntl) 79 (getcwd) @@ -36,6 +39,7 @@ 89 (readlink) 96 (gettimeofday) 97 (getrlimit) +98 (getrusage) 99 (sysinfo) 102 (getuid) 110 (getppid) @@ -47,6 +51,7 @@ 158 (arch_prctl) 165 (mount) 166 (umount2) +186 (gettid) 200 (tkill) 202 (futex) 204 (sched_getaffinity) @@ -60,6 +65,7 @@ 263 (unlinkat) 272 (unshare) 273 (set_robust_list) +293 (pipe2) 302 (prlimit64) 318 (getrandom) 319 (memfd_create) diff --git a/polkadot/scripts/list-syscalls/prepare-worker-syscalls b/polkadot/scripts/list-syscalls/prepare-worker-syscalls index cab58e06692b..05281b61591a 100644 --- a/polkadot/scripts/list-syscalls/prepare-worker-syscalls +++ b/polkadot/scripts/list-syscalls/prepare-worker-syscalls @@ -16,6 +16,7 @@ 16 (ioctl) 19 (readv) 20 (writev) +22 (pipe) 24 (sched_yield) 25 (mremap) 28 (madvise) @@ -25,7 +26,9 @@ 45 (recvfrom) 46 (sendmsg) 56 (clone) +57 (fork) 60 (exit) +61 (wait4) 62 (kill) 72 (fcntl) 79 (getcwd) @@ -48,6 +51,7 @@ 158 (arch_prctl) 165 (mount) 166 (umount2) +186 (gettid) 200 (tkill) 202 (futex) 203 (sched_setaffinity) @@ -62,6 +66,7 @@ 263 (unlinkat) 272 (unshare) 273 (set_robust_list) +293 (pipe2) 302 (prlimit64) 309 (getcpu) 318 (getrandom)