diff --git a/Cargo.lock b/Cargo.lock
index 9381afc45219..23abf35bbaba 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3123,6 +3123,12 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
+[[package]]
+name = "hex-literal"
+version = "0.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7ebdb29d2ea9ed0083cd8cece49bbd968021bd99b0849edb4a9a7ee0fdf6a4e0"
+
[[package]]
name = "hex-literal"
version = "0.4.1"
@@ -3693,7 +3699,7 @@ dependencies = [
"frame-system-benchmarking",
"frame-system-rpc-runtime-api",
"frame-try-runtime",
- "hex-literal",
+ "hex-literal 0.4.1",
"kusama-runtime-constants",
"log",
"pallet-authority-discovery",
@@ -6580,7 +6586,7 @@ dependencies = [
"nix 0.26.2",
"polkadot-cli",
"polkadot-core-primitives",
- "polkadot-node-core-pvf",
+ "polkadot-node-core-pvf-worker",
"polkadot-overseer",
"substrate-rpc-client",
"tempfile",
@@ -6706,7 +6712,7 @@ dependencies = [
"futures",
"log",
"polkadot-client",
- "polkadot-node-core-pvf",
+ "polkadot-node-core-pvf-worker",
"polkadot-node-metrics",
"polkadot-performance-test",
"polkadot-service",
@@ -6720,6 +6726,7 @@ dependencies = [
"sp-core",
"sp-io",
"sp-keyring",
+ "sp-maybe-compressed-blob",
"substrate-build-script-utils",
"thiserror",
"try-runtime-cli",
@@ -7175,10 +7182,9 @@ version = "0.9.41"
dependencies = [
"always-assert",
"assert_matches",
- "cpu-time",
"futures",
"futures-timer",
- "hex-literal",
+ "hex-literal 0.3.4",
"libc",
"parity-scale-codec",
"pin-project",
@@ -7188,22 +7194,13 @@ dependencies = [
"polkadot-parachain",
"polkadot-primitives",
"rand 0.8.5",
- "rayon",
- "sc-executor",
- "sc-executor-common",
- "sc-executor-wasmtime",
"slotmap",
"sp-core",
- "sp-externalities",
- "sp-io",
"sp-maybe-compressed-blob",
"sp-tracing",
"sp-wasm-interface",
"substrate-build-script-utils",
"tempfile",
- "test-parachain-adder",
- "test-parachain-halt",
- "tikv-jemalloc-ctl",
"tokio",
"tracing-gum",
]
@@ -7231,6 +7228,36 @@ dependencies = [
"tracing-gum",
]
+[[package]]
+name = "polkadot-node-core-pvf-worker"
+version = "0.9.41"
+dependencies = [
+ "assert_matches",
+ "cpu-time",
+ "futures",
+ "libc",
+ "parity-scale-codec",
+ "polkadot-node-core-pvf",
+ "polkadot-parachain",
+ "polkadot-primitives",
+ "rayon",
+ "sc-executor",
+ "sc-executor-common",
+ "sc-executor-wasmtime",
+ "sp-core",
+ "sp-externalities",
+ "sp-io",
+ "sp-maybe-compressed-blob",
+ "sp-tracing",
+ "substrate-build-script-utils",
+ "tempfile",
+ "test-parachain-adder",
+ "test-parachain-halt",
+ "tikv-jemalloc-ctl",
+ "tokio",
+ "tracing-gum",
+]
+
[[package]]
name = "polkadot-node-core-runtime-api"
version = "0.9.41"
@@ -7480,10 +7507,12 @@ dependencies = [
"kusama-runtime",
"log",
"polkadot-erasure-coding",
- "polkadot-node-core-pvf",
+ "polkadot-node-core-pvf-worker",
"polkadot-node-primitives",
"polkadot-primitives",
"quote",
+ "sc-executor-common",
+ "sp-maybe-compressed-blob",
"thiserror",
]
@@ -7492,7 +7521,7 @@ name = "polkadot-primitives"
version = "0.9.41"
dependencies = [
"bitvec",
- "hex-literal",
+ "hex-literal 0.4.1",
"parity-scale-codec",
"polkadot-core-primitives",
"polkadot-parachain",
@@ -7569,7 +7598,7 @@ dependencies = [
"frame-system-benchmarking",
"frame-system-rpc-runtime-api",
"frame-try-runtime",
- "hex-literal",
+ "hex-literal 0.4.1",
"log",
"pallet-authority-discovery",
"pallet-authorship",
@@ -7666,7 +7695,7 @@ dependencies = [
"frame-support",
"frame-support-test",
"frame-system",
- "hex-literal",
+ "hex-literal 0.4.1",
"impl-trait-for-tuples",
"libsecp256k1",
"log",
@@ -7744,7 +7773,7 @@ dependencies = [
"frame-support-test",
"frame-system",
"futures",
- "hex-literal",
+ "hex-literal 0.4.1",
"log",
"pallet-authority-discovery",
"pallet-authorship",
@@ -7795,7 +7824,7 @@ dependencies = [
"frame-support",
"frame-system-rpc-runtime-api",
"futures",
- "hex-literal",
+ "hex-literal 0.4.1",
"kusama-runtime",
"kusama-runtime-constants",
"kvdb",
@@ -7981,7 +8010,7 @@ dependencies = [
"polkadot-node-core-backing",
"polkadot-node-core-candidate-validation",
"polkadot-node-core-dispute-coordinator",
- "polkadot-node-core-pvf",
+ "polkadot-node-core-pvf-worker",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
@@ -8004,7 +8033,7 @@ dependencies = [
"frame-support",
"frame-system",
"frame-system-rpc-runtime-api",
- "hex-literal",
+ "hex-literal 0.4.1",
"log",
"pallet-authority-discovery",
"pallet-authorship",
@@ -8887,7 +8916,7 @@ dependencies = [
"frame-system-benchmarking",
"frame-system-rpc-runtime-api",
"frame-try-runtime",
- "hex-literal",
+ "hex-literal 0.4.1",
"log",
"pallet-authority-discovery",
"pallet-authorship",
@@ -12041,7 +12070,7 @@ dependencies = [
"log",
"parity-scale-codec",
"polkadot-cli",
- "polkadot-node-core-pvf",
+ "polkadot-node-core-pvf-worker",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-parachain",
@@ -12089,7 +12118,7 @@ dependencies = [
"log",
"parity-scale-codec",
"polkadot-cli",
- "polkadot-node-core-pvf",
+ "polkadot-node-core-pvf-worker",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-parachain",
@@ -13649,7 +13678,7 @@ dependencies = [
"frame-system-benchmarking",
"frame-system-rpc-runtime-api",
"frame-try-runtime",
- "hex-literal",
+ "hex-literal 0.4.1",
"log",
"pallet-authority-discovery",
"pallet-authorship",
@@ -14048,7 +14077,7 @@ dependencies = [
"bounded-collections",
"derivative",
"hex",
- "hex-literal",
+ "hex-literal 0.4.1",
"impl-trait-for-tuples",
"log",
"parity-scale-codec",
diff --git a/Cargo.toml b/Cargo.toml
index ded5dda0aa2e..886b489a5024 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -24,7 +24,7 @@ tikv-jemallocator = "0.5.0"
# Crates in our workspace, defined as dependencies so we can pass them feature flags.
polkadot-cli = { path = "cli", features = [ "kusama-native", "westend-native", "rococo-native" ] }
-polkadot-node-core-pvf = { path = "node/core/pvf" }
+polkadot-node-core-pvf-worker = { path = "node/core/pvf/worker" }
polkadot-overseer = { path = "node/overseer" }
[dev-dependencies]
@@ -80,6 +80,7 @@ members = [
"node/core/parachains-inherent",
"node/core/provisioner",
"node/core/pvf",
+ "node/core/pvf/worker",
"node/core/pvf-checker",
"node/core/runtime-api",
"node/network/approval-distribution",
@@ -206,7 +207,7 @@ try-runtime = [ "polkadot-cli/try-runtime" ]
fast-runtime = [ "polkadot-cli/fast-runtime" ]
runtime-metrics = [ "polkadot-cli/runtime-metrics" ]
pyroscope = ["polkadot-cli/pyroscope"]
-jemalloc-allocator = ["polkadot-node-core-pvf/jemalloc-allocator", "polkadot-overseer/jemalloc-allocator"]
+jemalloc-allocator = ["polkadot-node-core-pvf-worker/jemalloc-allocator", "polkadot-overseer/jemalloc-allocator"]
# Configuration for building a .deb package - for use with `cargo-deb`
[package.metadata.deb]
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index 01247bbc996f..4d08ee18ed1b 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -22,12 +22,13 @@ pyro = { package = "pyroscope", version = "0.3.1", optional = true }
service = { package = "polkadot-service", path = "../node/service", default-features = false, optional = true }
polkadot-client = { path = "../node/client", optional = true }
-polkadot-node-core-pvf = { path = "../node/core/pvf", optional = true }
+polkadot-node-core-pvf-worker = { path = "../node/core/pvf/worker", optional = true }
polkadot-performance-test = { path = "../node/test/performance-test", optional = true }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-io = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" }
frame-benchmarking-cli = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true }
try-runtime-cli = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true }
sc-cli = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true }
@@ -52,7 +53,7 @@ cli = [
"frame-benchmarking-cli",
"try-runtime-cli",
"polkadot-client",
- "polkadot-node-core-pvf",
+ "polkadot-node-core-pvf-worker",
]
runtime-benchmarks = [
"service/runtime-benchmarks",
diff --git a/cli/src/command.rs b/cli/src/command.rs
index 2f0bc9e2f856..c0e96de2a54b 100644
--- a/cli/src/command.rs
+++ b/cli/src/command.rs
@@ -494,7 +494,7 @@ pub fn run() -> Result<()> {
#[cfg(not(target_os = "android"))]
{
- polkadot_node_core_pvf::prepare_worker_entrypoint(
+ polkadot_node_core_pvf_worker::prepare_worker_entrypoint(
&cmd.socket_path,
Some(&cmd.node_impl_version),
);
@@ -516,7 +516,7 @@ pub fn run() -> Result<()> {
#[cfg(not(target_os = "android"))]
{
- polkadot_node_core_pvf::execute_worker_entrypoint(
+ polkadot_node_core_pvf_worker::execute_worker_entrypoint(
&cmd.socket_path,
Some(&cmd.node_impl_version),
);
diff --git a/cli/src/host_perf_check.rs b/cli/src/host_perf_check.rs
index 1225c4708a3a..adfdebce6779 100644
--- a/cli/src/host_perf_check.rs
+++ b/cli/src/host_perf_check.rs
@@ -15,7 +15,6 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use log::info;
-use polkadot_node_core_pvf::sp_maybe_compressed_blob;
use polkadot_performance_test::{
measure_erasure_coding, measure_pvf_prepare, PerfCheckError, ERASURE_CODING_N_VALIDATORS,
ERASURE_CODING_TIME_LIMIT, PVF_PREPARE_TIME_LIMIT, VALIDATION_CODE_BOMB_LIMIT,
diff --git a/node/core/pvf/Cargo.toml b/node/core/pvf/Cargo.toml
index 20dcd5afdfaf..026930758b86 100644
--- a/node/core/pvf/Cargo.toml
+++ b/node/core/pvf/Cargo.toml
@@ -4,24 +4,15 @@ version.workspace = true
authors.workspace = true
edition.workspace = true
-[[bin]]
-name = "puppet_worker"
-path = "bin/puppet_worker.rs"
-
[dependencies]
always-assert = "0.1"
-assert_matches = "1.4.0"
-cpu-time = "1.0.0"
futures = "0.3.21"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../../gum" }
libc = "0.2.139"
pin-project = "1.0.9"
rand = "0.8.5"
-rayon = "1.5.1"
slotmap = "1.0"
-tempfile = "3.3.0"
-tikv-jemalloc-ctl = { version = "0.5.0", optional = true }
tokio = { version = "1.24.2", features = ["fs", "process"] }
parity-scale-codec = { version = "3.4.0", default-features = false, features = ["derive"] }
@@ -30,13 +21,8 @@ polkadot-parachain = { path = "../../../parachain" }
polkadot-core-primitives = { path = "../../../core-primitives" }
polkadot-node-metrics = { path = "../../metrics" }
polkadot-node-primitives = { path = "../../primitives" }
-
polkadot-primitives = { path = "../../../primitives" }
-sc-executor = { git = "https://github.com/paritytech/substrate", branch = "master" }
-sc-executor-wasmtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
-sc-executor-common = { git = "https://github.com/paritytech/substrate", branch = "master" }
-sp-externalities = { git = "https://github.com/paritytech/substrate", branch = "master" }
-sp-io = { git = "https://github.com/paritytech/substrate", branch = "master" }
+
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-wasm-interface = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" }
@@ -45,14 +31,7 @@ sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master
[build-dependencies]
substrate-build-script-utils = { git = "https://github.com/paritytech/substrate", branch = "master" }
-[target.'cfg(target_os = "linux")'.dependencies]
-tikv-jemalloc-ctl = "0.5.0"
-
[dev-dependencies]
-adder = { package = "test-parachain-adder", path = "../../../parachain/test-parachains/adder" }
-halt = { package = "test-parachain-halt", path = "../../../parachain/test-parachains/halt" }
-hex-literal = "0.4.1"
+assert_matches = "1.4.0"
+hex-literal = "0.3.4"
tempfile = "3.3.0"
-
-[features]
-jemalloc-allocator = ["dep:tikv-jemalloc-ctl"]
diff --git a/node/core/pvf/src/artifacts.rs b/node/core/pvf/src/artifacts.rs
index f0f8a0f8af28..d5a660cc3aa5 100644
--- a/node/core/pvf/src/artifacts.rs
+++ b/node/core/pvf/src/artifacts.rs
@@ -65,9 +65,11 @@ use std::{
time::{Duration, SystemTime},
};
+/// Contains the bytes for a successfully compiled artifact.
pub struct CompiledArtifact(Vec);
impl CompiledArtifact {
+ /// Creates a `CompiledArtifact`.
pub fn new(code: Vec) -> Self {
Self(code)
}
diff --git a/node/core/pvf/src/error.rs b/node/core/pvf/src/error.rs
index 662fcc22cd31..21f23d515fdd 100644
--- a/node/core/pvf/src/error.rs
+++ b/node/core/pvf/src/error.rs
@@ -16,7 +16,7 @@
use crate::prepare::PrepareStats;
use parity_scale_codec::{Decode, Encode};
-use std::{any::Any, fmt};
+use std::fmt;
/// Result of PVF preparation performed by the validation host. Contains stats about the preparation if
/// successful
@@ -126,17 +126,3 @@ impl From for ValidationError {
}
}
}
-
-/// Attempt to convert an opaque panic payload to a string.
-///
-/// This is a best effort, and is not guaranteed to provide the most accurate value.
-pub(crate) fn stringify_panic_payload(payload: Box) -> String {
- match payload.downcast::<&'static str>() {
- Ok(msg) => msg.to_string(),
- Err(payload) => match payload.downcast::() {
- Ok(msg) => *msg,
- // At least we tried...
- Err(_) => "unknown panic payload".to_string(),
- },
- }
-}
diff --git a/node/core/pvf/src/execute/mod.rs b/node/core/pvf/src/execute/mod.rs
index b0e8cc482561..8e3b17d71569 100644
--- a/node/core/pvf/src/execute/mod.rs
+++ b/node/core/pvf/src/execute/mod.rs
@@ -18,10 +18,10 @@
//!
//! The validation host [runs the queue][`start`] communicating with it by sending [`ToQueue`]
//! messages. The queue will spawn workers in new processes. Those processes should jump to
-//! [`worker_entrypoint`].
+//! `polkadot_node_core_pvf_worker::execute_worker_entrypoint`.
mod queue;
-mod worker;
+mod worker_intf;
pub use queue::{start, PendingExecutionRequest, ToQueue};
-pub use worker::{worker_entrypoint, Response as ExecuteResponse};
+pub use worker_intf::{Handshake as ExecuteHandshake, Response as ExecuteResponse};
diff --git a/node/core/pvf/src/execute/queue.rs b/node/core/pvf/src/execute/queue.rs
index e1e02205256b..5b3e21cee079 100644
--- a/node/core/pvf/src/execute/queue.rs
+++ b/node/core/pvf/src/execute/queue.rs
@@ -16,7 +16,7 @@
//! A queue that handles requests for PVF execution.
-use super::worker::Outcome;
+use super::worker_intf::Outcome;
use crate::{
artifacts::{ArtifactId, ArtifactPathId},
host::ResultSender,
@@ -416,7 +416,8 @@ async fn spawn_worker_task(
use futures_timer::Delay;
loop {
- match super::worker::spawn(&program_path, job.executor_params.clone(), spawn_timeout).await
+ match super::worker_intf::spawn(&program_path, job.executor_params.clone(), spawn_timeout)
+ .await
{
Ok((idle, handle)) => break QueueEvent::Spawn(idle, handle, job),
Err(err) => {
@@ -460,9 +461,13 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
queue.mux.push(
async move {
let _timer = execution_timer;
- let outcome =
- super::worker::start_work(idle, job.artifact.clone(), job.exec_timeout, job.params)
- .await;
+ let outcome = super::worker_intf::start_work(
+ idle,
+ job.artifact.clone(),
+ job.exec_timeout,
+ job.params,
+ )
+ .await;
QueueEvent::StartWork(worker, outcome, job.artifact.id, job.result_tx)
}
.boxed(),
diff --git a/node/core/pvf/src/execute/worker.rs b/node/core/pvf/src/execute/worker_intf.rs
similarity index 55%
rename from node/core/pvf/src/execute/worker.rs
rename to node/core/pvf/src/execute/worker_intf.rs
index f20874083be3..bc467cf90de6 100644
--- a/node/core/pvf/src/execute/worker.rs
+++ b/node/core/pvf/src/execute/worker_intf.rs
@@ -14,28 +14,23 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
+//! Host interface to the execute worker.
+
use crate::{
artifacts::ArtifactPathId,
- executor_intf::Executor,
worker_common::{
- bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes,
- spawn_with_program_path, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
- JOB_TIMEOUT_WALL_CLOCK_FACTOR,
+ framed_recv, framed_send, path_to_bytes, spawn_with_program_path, IdleWorker, SpawnErr,
+ WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
},
LOG_TARGET,
};
-use cpu_time::ProcessTime;
-use futures::{pin_mut, select_biased, FutureExt};
+use futures::FutureExt;
use futures_timer::Delay;
use parity_scale_codec::{Decode, Encode};
use polkadot_parachain::primitives::ValidationResult;
use polkadot_primitives::ExecutorParams;
-use std::{
- path::{Path, PathBuf},
- sync::{mpsc::channel, Arc},
- time::Duration,
-};
+use std::{path::Path, time::Duration};
use tokio::{io, net::UnixStream};
/// Spawns a new worker with the given program path that acts as the worker and the spawn timeout.
@@ -185,17 +180,6 @@ async fn send_handshake(stream: &mut UnixStream, handshake: Handshake) -> io::Re
framed_send(stream, &handshake.encode()).await
}
-async fn recv_handshake(stream: &mut UnixStream) -> io::Result {
- let handshake_enc = framed_recv(stream).await?;
- let handshake = Handshake::decode(&mut &handshake_enc[..]).map_err(|_| {
- io::Error::new(
- io::ErrorKind::Other,
- "execute pvf recv_handshake: failed to decode Handshake".to_owned(),
- )
- })?;
- Ok(handshake)
-}
-
async fn send_request(
stream: &mut UnixStream,
artifact_path: &Path,
@@ -207,29 +191,6 @@ async fn send_request(
framed_send(stream, &execution_timeout.encode()).await
}
-async fn recv_request(stream: &mut UnixStream) -> io::Result<(PathBuf, Vec, Duration)> {
- let artifact_path = framed_recv(stream).await?;
- let artifact_path = bytes_to_path(&artifact_path).ok_or_else(|| {
- io::Error::new(
- io::ErrorKind::Other,
- "execute pvf recv_request: non utf-8 artifact path".to_string(),
- )
- })?;
- let params = framed_recv(stream).await?;
- let execution_timeout = framed_recv(stream).await?;
- let execution_timeout = Duration::decode(&mut &execution_timeout[..]).map_err(|_| {
- io::Error::new(
- io::ErrorKind::Other,
- "execute pvf recv_request: failed to decode duration".to_string(),
- )
- })?;
- Ok((artifact_path, params, execution_timeout))
-}
-
-async fn send_response(stream: &mut UnixStream, response: Response) -> io::Result<()> {
- framed_send(stream, &response.encode()).await
-}
-
async fn recv_response(stream: &mut UnixStream) -> io::Result {
let response_bytes = framed_recv(stream).await?;
Response::decode(&mut &response_bytes[..]).map_err(|e| {
@@ -240,28 +201,43 @@ async fn recv_response(stream: &mut UnixStream) -> io::Result {
})
}
+/// The payload of the one-time handshake that is done when a worker process is created. Carries
+/// data from the host to the worker.
#[derive(Encode, Decode)]
-struct Handshake {
- executor_params: ExecutorParams,
+pub struct Handshake {
+ /// The executor parameters.
+ pub executor_params: ExecutorParams,
}
+/// The response from an execution job on the worker.
#[derive(Encode, Decode)]
pub enum Response {
- Ok { result_descriptor: ValidationResult, duration: Duration },
+ /// The job completed successfully.
+ Ok {
+ /// The result of parachain validation.
+ result_descriptor: ValidationResult,
+ /// The amount of CPU time taken by the job.
+ duration: Duration,
+ },
+ /// The candidate is invalid.
InvalidCandidate(String),
+ /// The job timed out.
TimedOut,
+ /// Some internal error occurred. Should only be used for errors independent of the candidate.
InternalError(String),
}
impl Response {
- fn format_invalid(ctx: &'static str, msg: &str) -> Self {
+ /// Creates an `InvalidCandidate` response from context `ctx` and message `msg` (may be empty).
+ pub fn format_invalid(ctx: &'static str, msg: &str) -> Self {
if msg.is_empty() {
Self::InvalidCandidate(ctx.to_string())
} else {
Self::InvalidCandidate(format!("{}: {}", ctx, msg))
}
}
- fn format_internal(ctx: &'static str, msg: &str) -> Self {
+ /// Creates an `InternalError` response from context `ctx` and message `msg` (may be empty).
+ pub fn format_internal(ctx: &'static str, msg: &str) -> Self {
if msg.is_empty() {
Self::InternalError(ctx.to_string())
} else {
@@ -269,110 +245,3 @@ impl Response {
}
}
}
-
-/// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies
-/// the path to the socket used to communicate with the host. The `node_version`, if `Some`,
-/// is checked against the worker version. A mismatch results in immediate worker termination.
-/// `None` is used for tests and in other situations when version check is not necessary.
-pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
- worker_event_loop("execute", socket_path, node_version, |rt_handle, mut stream| async move {
- let worker_pid = std::process::id();
-
- let handshake = recv_handshake(&mut stream).await?;
- let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| {
- io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
- })?);
-
- loop {
- let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?;
- gum::debug!(
- target: LOG_TARGET,
- %worker_pid,
- "worker: validating artifact {}",
- artifact_path.display(),
- );
-
- // Used to signal to the cpu time monitor thread that it can finish.
- let (finished_tx, finished_rx) = channel::<()>();
- let cpu_time_start = ProcessTime::now();
-
- // Spawn a new thread that runs the CPU time monitor.
- let cpu_time_monitor_fut = rt_handle
- .spawn_blocking(move || {
- cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx)
- })
- .fuse();
- let executor_2 = executor.clone();
- let execute_fut = rt_handle
- .spawn_blocking(move || {
- validate_using_artifact(&artifact_path, ¶ms, executor_2, cpu_time_start)
- })
- .fuse();
-
- pin_mut!(cpu_time_monitor_fut);
- pin_mut!(execute_fut);
-
- let response = select_biased! {
- // If this future is not selected, the join handle is dropped and the thread will
- // finish in the background.
- cpu_time_monitor_res = cpu_time_monitor_fut => {
- match cpu_time_monitor_res {
- Ok(Some(cpu_time_elapsed)) => {
- // Log if we exceed the timeout and the other thread hasn't finished.
- gum::warn!(
- target: LOG_TARGET,
- %worker_pid,
- "execute job took {}ms cpu time, exceeded execute timeout {}ms",
- cpu_time_elapsed.as_millis(),
- execution_timeout.as_millis(),
- );
- Response::TimedOut
- },
- Ok(None) => Response::InternalError("error communicating over finished channel".into()),
- Err(e) => Response::format_internal("cpu time monitor thread error", &e.to_string()),
- }
- },
- execute_res = execute_fut => {
- let _ = finished_tx.send(());
- execute_res.unwrap_or_else(|e| Response::format_internal("execute thread error", &e.to_string()))
- },
- };
-
- send_response(&mut stream, response).await?;
- }
- });
-}
-
-fn validate_using_artifact(
- artifact_path: &Path,
- params: &[u8],
- executor: Arc,
- cpu_time_start: ProcessTime,
-) -> Response {
- // Check here if the file exists, because the error from Substrate is not match-able.
- // TODO: Re-evaluate after .
- let file_metadata = std::fs::metadata(artifact_path);
- if let Err(err) = file_metadata {
- return Response::format_internal("execute: could not find or open file", &err.to_string())
- }
-
- let descriptor_bytes = match unsafe {
- // SAFETY: this should be safe since the compiled artifact passed here comes from the
- // file created by the prepare workers. These files are obtained by calling
- // [`executor_intf::prepare`].
- executor.execute(artifact_path.as_ref(), params)
- } {
- Err(err) => return Response::format_invalid("execute", &err),
- Ok(d) => d,
- };
-
- let duration = cpu_time_start.elapsed();
-
- let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) {
- Err(err) =>
- return Response::format_invalid("validation result decoding failed", &err.to_string()),
- Ok(r) => r,
- };
-
- Response::Ok { result_descriptor, duration }
-}
diff --git a/node/core/pvf/src/lib.rs b/node/core/pvf/src/lib.rs
index e8f174b89cd2..cdaee3341402 100644
--- a/node/core/pvf/src/lib.rs
+++ b/node/core/pvf/src/lib.rs
@@ -29,11 +29,11 @@
//!
//! Then using the handle the client can send three types of requests:
//!
-//! (a) PVF pre-checking. This takes the PVF [code][`Pvf`] and tries to prepare it (verify and
+//! (a) PVF pre-checking. This takes the `Pvf` code and tries to prepare it (verify and
//! compile) in order to pre-check its validity.
//!
//! (b) PVF execution. This accepts the PVF [`params`][`polkadot_parachain::primitives::ValidationParams`]
-//! and the PVF [code][`Pvf`], prepares (verifies and compiles) the code, and then executes PVF
+//! and the `Pvf` code, prepares (verifies and compiles) the code, and then executes PVF
//! with the `params`.
//!
//! (c) Heads up. This request allows to signal that the given PVF may be needed soon and that it
@@ -91,7 +91,6 @@
mod artifacts;
mod error;
mod execute;
-mod executor_intf;
mod host;
mod metrics;
mod prepare;
@@ -99,27 +98,22 @@ mod priority;
mod pvf;
mod worker_common;
-#[doc(hidden)]
-pub mod testing;
-
-#[doc(hidden)]
-pub use sp_tracing;
-
+pub use artifacts::CompiledArtifact;
pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError};
-pub use prepare::PrepareStats;
+pub use execute::{ExecuteHandshake, ExecuteResponse};
+#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
+pub use prepare::MemoryAllocationStats;
+pub use prepare::{MemoryStats, PrepareStats};
pub use priority::Priority;
pub use pvf::PvfPrepData;
pub use host::{start, Config, ValidationHost};
pub use metrics::Metrics;
-pub use worker_common::JOB_TIMEOUT_WALL_CLOCK_FACTOR;
-
-pub use execute::worker_entrypoint as execute_worker_entrypoint;
-pub use prepare::worker_entrypoint as prepare_worker_entrypoint;
-
-pub use executor_intf::{prepare, prevalidate};
-
-pub use sc_executor_common;
-pub use sp_maybe_compressed_blob;
+pub use worker_common::{framed_recv, framed_send, JOB_TIMEOUT_WALL_CLOCK_FACTOR};
const LOG_TARGET: &str = "parachain::pvf";
+
+#[doc(hidden)]
+pub mod testing {
+ pub use crate::worker_common::{spawn_with_program_path, SpawnErr};
+}
diff --git a/node/core/pvf/src/prepare/mod.rs b/node/core/pvf/src/prepare/mod.rs
index d8d036a82238..de40c48464c4 100644
--- a/node/core/pvf/src/prepare/mod.rs
+++ b/node/core/pvf/src/prepare/mod.rs
@@ -20,23 +20,44 @@
//! (by running [`start_pool`]).
//!
//! The pool will spawn workers in new processes and those should execute pass control to
-//! [`worker_entrypoint`].
+//! `polkadot_node_core_pvf_worker::prepare_worker_entrypoint`.
-mod memory_stats;
mod pool;
mod queue;
-mod worker;
+mod worker_intf;
-pub use memory_stats::MemoryStats;
pub use pool::start as start_pool;
pub use queue::{start as start_queue, FromQueue, ToQueue};
-pub use worker::worker_entrypoint;
use parity_scale_codec::{Decode, Encode};
/// Preparation statistics, including the CPU time and memory taken.
#[derive(Debug, Clone, Default, Encode, Decode)]
pub struct PrepareStats {
- cpu_time_elapsed: std::time::Duration,
- memory_stats: MemoryStats,
+ /// The CPU time that elapsed for the preparation job.
+ pub cpu_time_elapsed: std::time::Duration,
+ /// The observed memory statistics for the preparation job.
+ pub memory_stats: MemoryStats,
+}
+
+/// Helper struct to contain all the memory stats, including `MemoryAllocationStats` and, if
+/// supported by the OS, `ru_maxrss`.
+#[derive(Clone, Debug, Default, Encode, Decode)]
+pub struct MemoryStats {
+ /// Memory stats from `tikv_jemalloc_ctl`.
+ #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
+ pub memory_tracker_stats: Option,
+ /// `ru_maxrss` from `getrusage`. `None` if an error occurred.
+ #[cfg(target_os = "linux")]
+ pub max_rss: Option,
+}
+
+/// Statistics of collected memory metrics.
+#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
+#[derive(Clone, Debug, Default, Encode, Decode)]
+pub struct MemoryAllocationStats {
+ /// Total resident memory, in bytes.
+ pub resident: u64,
+ /// Total allocated memory, in bytes.
+ pub allocated: u64,
}
diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs
index f8435a40348d..d151f097805e 100644
--- a/node/core/pvf/src/prepare/pool.rs
+++ b/node/core/pvf/src/prepare/pool.rs
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
-use super::worker::{self, Outcome};
+use super::worker_intf::{self, Outcome};
use crate::{
error::{PrepareError, PrepareResult},
metrics::Metrics,
@@ -250,7 +250,7 @@ async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> Po
use futures_timer::Delay;
loop {
- match worker::spawn(&program_path, spawn_timeout).await {
+ match worker_intf::spawn(&program_path, spawn_timeout).await {
Ok((idle, handle)) => break PoolEvent::Spawn(idle, handle),
Err(err) => {
gum::warn!(target: LOG_TARGET, "failed to spawn a prepare worker: {:?}", err);
@@ -271,7 +271,7 @@ async fn start_work_task(
artifact_path: PathBuf,
_preparation_timer: Option,
) -> PoolEvent {
- let outcome = worker::start_work(&metrics, idle, pvf, &cache_path, artifact_path).await;
+ let outcome = worker_intf::start_work(&metrics, idle, pvf, &cache_path, artifact_path).await;
PoolEvent::StartWork(worker, outcome)
}
diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs
index 20ee95a435b2..f84d5ab0e56e 100644
--- a/node/core/pvf/src/prepare/queue.rs
+++ b/node/core/pvf/src/prepare/queue.rs
@@ -226,7 +226,7 @@ async fn handle_enqueue(
target: LOG_TARGET,
validation_code_hash = ?pvf.code_hash(),
?priority,
- preparation_timeout = ?pvf.prep_timeout,
+ preparation_timeout = ?pvf.prep_timeout(),
"PVF is enqueued for preparation.",
);
queue.metrics.prepare_enqueued();
diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker_intf.rs
similarity index 55%
rename from node/core/pvf/src/prepare/worker.rs
rename to node/core/pvf/src/prepare/worker_intf.rs
index 3b2ae211e6ca..daf94aadc672 100644
--- a/node/core/pvf/src/prepare/worker.rs
+++ b/node/core/pvf/src/prepare/worker_intf.rs
@@ -14,33 +14,24 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
-#[cfg(target_os = "linux")]
-use super::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread};
-#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
-use super::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop};
-use super::memory_stats::MemoryStats;
+//! Host interface to the prepare worker.
+
use crate::{
- artifacts::CompiledArtifact,
error::{PrepareError, PrepareResult},
metrics::Metrics,
prepare::PrepareStats,
pvf::PvfPrepData,
worker_common::{
- bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes,
- spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
- JOB_TIMEOUT_WALL_CLOCK_FACTOR,
+ framed_recv, framed_send, path_to_bytes, spawn_with_program_path, tmpfile_in, IdleWorker,
+ SpawnErr, WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
},
LOG_TARGET,
};
-use cpu_time::ProcessTime;
-use futures::{pin_mut, select_biased, FutureExt};
use parity_scale_codec::{Decode, Encode};
use sp_core::hexdisplay::HexDisplay;
use std::{
- panic,
path::{Path, PathBuf},
- sync::mpsc::channel,
time::Duration,
};
use tokio::{io, net::UnixStream};
@@ -104,7 +95,7 @@ pub async fn start_work(
);
with_tmp_file(stream, pid, cache_path, |tmp_file, mut stream| async move {
- let preparation_timeout = pvf.prep_timeout;
+ let preparation_timeout = pvf.prep_timeout();
if let Err(err) = send_request(&mut stream, pvf, &tmp_file).await {
gum::warn!(
target: LOG_TARGET,
@@ -285,28 +276,6 @@ async fn send_request(
Ok(())
}
-async fn recv_request(stream: &mut UnixStream) -> io::Result<(PvfPrepData, PathBuf)> {
- let pvf = framed_recv(stream).await?;
- let pvf = PvfPrepData::decode(&mut &pvf[..]).map_err(|e| {
- io::Error::new(
- io::ErrorKind::Other,
- format!("prepare pvf recv_request: failed to decode PvfPrepData: {}", e),
- )
- })?;
- let tmp_file = framed_recv(stream).await?;
- let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| {
- io::Error::new(
- io::ErrorKind::Other,
- "prepare pvf recv_request: non utf-8 artifact path".to_string(),
- )
- })?;
- Ok((pvf, tmp_file))
-}
-
-async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> {
- framed_send(stream, &result.encode()).await
-}
-
async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result {
let result = framed_recv(stream).await?;
let result = PrepareResult::decode(&mut &result[..]).map_err(|e| {
@@ -325,158 +294,3 @@ async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result) {
- worker_event_loop("prepare", socket_path, node_version, |rt_handle, mut stream| async move {
- let worker_pid = std::process::id();
-
- loop {
- let (pvf, dest) = recv_request(&mut stream).await?;
- gum::debug!(
- target: LOG_TARGET,
- %worker_pid,
- "worker: preparing artifact",
- );
-
- let cpu_time_start = ProcessTime::now();
- let preparation_timeout = pvf.prep_timeout;
-
- // Run the memory tracker.
- #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
- let (memory_tracker_tx, memory_tracker_rx) = channel::<()>();
- #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
- let memory_tracker_fut = rt_handle.spawn_blocking(move || memory_tracker_loop(memory_tracker_rx));
-
- // Spawn a new thread that runs the CPU time monitor.
- let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
- let cpu_time_monitor_fut = rt_handle
- .spawn_blocking(move || {
- cpu_time_monitor_loop(cpu_time_start, preparation_timeout, cpu_time_monitor_rx)
- })
- .fuse();
- // Spawn another thread for preparation.
- let prepare_fut = rt_handle
- .spawn_blocking(move || {
- let result = prepare_artifact(pvf);
-
- // Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
- #[cfg(target_os = "linux")]
- let result = result.map(|artifact| (artifact, get_max_rss_thread()));
-
- result
- })
- .fuse();
-
- pin_mut!(cpu_time_monitor_fut);
- pin_mut!(prepare_fut);
-
- let result = select_biased! {
- // If this future is not selected, the join handle is dropped and the thread will
- // finish in the background.
- join_res = cpu_time_monitor_fut => {
- match join_res {
- Ok(Some(cpu_time_elapsed)) => {
- // Log if we exceed the timeout and the other thread hasn't finished.
- gum::warn!(
- target: LOG_TARGET,
- %worker_pid,
- "prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
- cpu_time_elapsed.as_millis(),
- preparation_timeout.as_millis(),
- );
- Err(PrepareError::TimedOut)
- },
- Ok(None) => Err(PrepareError::IoErr("error communicating over finished channel".into())),
- Err(err) => Err(PrepareError::IoErr(err.to_string())),
- }
- },
- prepare_res = prepare_fut => {
- let cpu_time_elapsed = cpu_time_start.elapsed();
- let _ = cpu_time_monitor_tx.send(());
-
- match prepare_res.unwrap_or_else(|err| Err(PrepareError::IoErr(err.to_string()))) {
- Err(err) => {
- // Serialized error will be written into the socket.
- Err(err)
- },
- Ok(ok) => {
- // Stop the memory stats worker and get its observed memory stats.
- #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
- let memory_tracker_stats =
- get_memory_tracker_loop_stats(memory_tracker_fut, memory_tracker_tx, worker_pid).await;
- #[cfg(target_os = "linux")]
- let (ok, max_rss) = ok;
- let memory_stats = MemoryStats {
- #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
- memory_tracker_stats,
- #[cfg(target_os = "linux")]
- max_rss: extract_max_rss_stat(max_rss, worker_pid),
- };
-
- // Write the serialized artifact into a temp file.
- //
- // PVF host only keeps artifacts statuses in its memory, successfully
- // compiled code gets stored on the disk (and consequently deserialized
- // by execute-workers). The prepare worker is only required to send `Ok`
- // to the pool to indicate the success.
-
- gum::debug!(
- target: LOG_TARGET,
- %worker_pid,
- "worker: writing artifact to {}",
- dest.display(),
- );
- tokio::fs::write(&dest, &ok).await?;
-
- Ok(PrepareStats{cpu_time_elapsed, memory_stats})
- },
- }
- },
- };
-
- send_response(&mut stream, result).await?;
- }
- });
-}
-
-fn prepare_artifact(pvf: PvfPrepData) -> Result {
- panic::catch_unwind(|| {
- let blob = match crate::executor_intf::prevalidate(&pvf.code()) {
- Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))),
- Ok(b) => b,
- };
-
- match crate::executor_intf::prepare(blob, &pvf.executor_params()) {
- Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)),
- Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))),
- }
- })
- .map_err(|panic_payload| {
- PrepareError::Panic(crate::error::stringify_panic_payload(panic_payload))
- })
- .and_then(|inner_result| inner_result)
-}
diff --git a/node/core/pvf/src/pvf.rs b/node/core/pvf/src/pvf.rs
index ad2dc5fcd918..c134cacb4acf 100644
--- a/node/core/pvf/src/pvf.rs
+++ b/node/core/pvf/src/pvf.rs
@@ -36,13 +36,13 @@ use crate::host::tests::TEST_PREPARATION_TIMEOUT;
#[derive(Clone, Encode, Decode)]
pub struct PvfPrepData {
/// Wasm code (uncompressed)
- pub(crate) code: Arc>,
+ code: Arc>,
/// Wasm code hash
- pub(crate) code_hash: ValidationCodeHash,
+ code_hash: ValidationCodeHash,
/// Executor environment parameters for the session for which artifact is prepared
- pub(crate) executor_params: Arc,
+ executor_params: Arc,
/// Preparation timeout
- pub(crate) prep_timeout: Duration,
+ prep_timeout: Duration,
}
impl PvfPrepData {
@@ -69,15 +69,20 @@ impl PvfPrepData {
}
/// Returns PVF code
- pub(crate) fn code(&self) -> Arc> {
+ pub fn code(&self) -> Arc> {
self.code.clone()
}
/// Returns executor params
- pub(crate) fn executor_params(&self) -> Arc {
+ pub fn executor_params(&self) -> Arc {
self.executor_params.clone()
}
+ /// Returns preparation timeout.
+ pub fn prep_timeout(&self) -> Duration {
+ self.prep_timeout
+ }
+
/// Creates a structure for tests
#[cfg(test)]
pub(crate) fn from_discriminator_and_timeout(num: u32, timeout: Duration) -> Self {
diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_common.rs
index 3caee34a5d0f..33144616601d 100644
--- a/node/core/pvf/src/worker_common.rs
+++ b/node/core/pvf/src/worker_common.rs
@@ -17,8 +17,7 @@
//! Common logic for implementation of worker processes.
use crate::LOG_TARGET;
-use cpu_time::ProcessTime;
-use futures::{never::Never, FutureExt as _};
+use futures::FutureExt as _;
use futures_timer::Delay;
use pin_project::pin_project;
use rand::Rng;
@@ -26,7 +25,6 @@ use std::{
fmt, mem,
path::{Path, PathBuf},
pin::Pin,
- sync::mpsc::{Receiver, RecvTimeoutError},
task::{Context, Poll},
time::Duration,
};
@@ -34,17 +32,12 @@ use tokio::{
io::{self, AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, ReadBuf},
net::{UnixListener, UnixStream},
process,
- runtime::{Handle, Runtime},
};
/// A multiple of the job timeout (in CPU time) for which we are willing to wait on the host (in
/// wall clock time). This is lenient because CPU time may go slower than wall clock time.
pub const JOB_TIMEOUT_WALL_CLOCK_FACTOR: u32 = 4;
-/// Some allowed overhead that we account for in the "CPU time monitor" thread's sleeps, on the
-/// child process.
-pub const JOB_TIMEOUT_OVERHEAD: Duration = Duration::from_millis(50);
-
/// This is publicly exposed only for integration tests.
#[doc(hidden)]
pub async fn spawn_with_program_path(
@@ -171,92 +164,6 @@ pub async fn tmpfile(prefix: &str) -> io::Result {
tmpfile_in(prefix, &temp_dir).await
}
-pub fn worker_event_loop(
- debug_id: &'static str,
- socket_path: &str,
- node_version: Option<&str>,
- mut event_loop: F,
-) where
- F: FnMut(Handle, UnixStream) -> Fut,
- Fut: futures::Future