Skip to content

Commit

Permalink
PVF: Add worker check during tests and benches (#1771)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrcnski authored Oct 24, 2023
1 parent 8cba5b9 commit e39253c
Show file tree
Hide file tree
Showing 19 changed files with 285 additions and 113 deletions.
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion polkadot/cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl SubstrateCli for Cli {

fn impl_version() -> String {
let commit_hash = env!("SUBSTRATE_CLI_COMMIT_HASH");
format!("{NODE_VERSION}-{commit_hash}")
format!("{}-{commit_hash}", NODE_VERSION)
}

fn description() -> String {
Expand Down
13 changes: 9 additions & 4 deletions polkadot/node/core/pvf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ cfg-if = "1.0"
futures = "0.3.21"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../../gum" }
is_executable = "1.0.1"
libc = "0.2.139"
pin-project = "1.0.9"
rand = "0.8.5"
Expand All @@ -34,19 +35,23 @@ sp-maybe-compressed-blob = { path = "../../../../substrate/primitives/maybe-comp
polkadot-node-core-pvf-prepare-worker = { path = "prepare-worker", optional = true }
polkadot-node-core-pvf-execute-worker = { path = "execute-worker", optional = true }

[build-dependencies]
substrate-build-script-utils = { path = "../../../../substrate/utils/build-script-utils" }

[dev-dependencies]
assert_matches = "1.4.0"
criterion = { version = "0.4.0", default-features = false, features = ["cargo_bench_support", "async_tokio"] }
hex-literal = "0.4.1"
polkadot-node-core-pvf-common = { path = "common", features = ["test-utils"] }
# For the puppet worker, depend on ourselves with the test-utils feature.
# For benches and integration tests, depend on ourselves with the test-utils
# feature.
polkadot-node-core-pvf = { path = ".", features = ["test-utils"] }
rococo-runtime = { path = "../../../runtime/rococo" }

adder = { package = "test-parachain-adder", path = "../../../parachain/test-parachains/adder" }
halt = { package = "test-parachain-halt", path = "../../../parachain/test-parachains/halt" }

[[bench]]
name = "host_prepare_rococo_runtime"
harness = false

[features]
ci-only-tests = []
jemalloc-allocator = [ "polkadot-node-core-pvf-common/jemalloc-allocator" ]
Expand Down
130 changes: 130 additions & 0 deletions polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Benchmarks for preparation through the host. We use a real PVF to get realistic results.
use criterion::{criterion_group, criterion_main, BatchSize, Criterion, SamplingMode};
use parity_scale_codec::Encode;
use polkadot_node_core_pvf::{
start, testing, Config, Metrics, PrepareError, PrepareJobKind, PrepareStats, PvfPrepData,
ValidationError, ValidationHost,
};
use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams, ValidationResult};
use polkadot_primitives::ExecutorParams;
use rococo_runtime::WASM_BINARY;
use std::time::Duration;
use tokio::{runtime::Handle, sync::Mutex};

const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30);

struct TestHost {
host: Mutex<ValidationHost>,
}

impl TestHost {
fn new_with_config<F>(handle: &Handle, f: F) -> Self
where
F: FnOnce(&mut Config),
{
let (prepare_worker_path, execute_worker_path) = testing::get_and_check_worker_paths();

let cache_dir = tempfile::tempdir().unwrap();
let mut config = Config::new(
cache_dir.path().to_owned(),
None,
prepare_worker_path,
execute_worker_path,
);
f(&mut config);
let (host, task) = start(config, Metrics::default());
let _ = handle.spawn(task);
Self { host: Mutex::new(host) }
}

async fn precheck_pvf(
&self,
code: &[u8],
executor_params: ExecutorParams,
) -> Result<PrepareStats, PrepareError> {
let (result_tx, result_rx) = futures::channel::oneshot::channel();

let code = sp_maybe_compressed_blob::decompress(code, 16 * 1024 * 1024)
.expect("Compression works");

self.host
.lock()
.await
.precheck_pvf(
PvfPrepData::from_code(
code.into(),
executor_params,
TEST_PREPARATION_TIMEOUT,
PrepareJobKind::Prechecking,
),
result_tx,
)
.await
.unwrap();
result_rx.await.unwrap()
}
}

fn host_prepare_rococo_runtime(c: &mut Criterion) {
polkadot_node_core_pvf_common::sp_tracing::try_init_simple();

let rt = tokio::runtime::Runtime::new().unwrap();

let blob = WASM_BINARY.expect("You need to build the WASM binaries to run the tests!");
let pvf = match sp_maybe_compressed_blob::decompress(&blob, 64 * 1024 * 1024) {
Ok(code) => PvfPrepData::from_code(
code.into_owned(),
ExecutorParams::default(),
Duration::from_secs(360),
PrepareJobKind::Compilation,
),
Err(e) => {
panic!("Cannot decompress blob: {:?}", e);
},
};

let mut group = c.benchmark_group("prepare rococo");
group.sampling_mode(SamplingMode::Flat);
group.sample_size(20);
group.measurement_time(Duration::from_secs(240));
group.bench_function("host: prepare Rococo runtime", |b| {
b.to_async(&rt).iter_batched(
|| {
(
TestHost::new_with_config(rt.handle(), |cfg| {
cfg.prepare_workers_hard_max_num = 1;
}),
pvf.clone().code(),
)
},
|(host, pvf_code)| async move {
// `PvfPrepData` is designed to be cheap to clone, so cloning shouldn't affect the
// benchmark accuracy.
let _stats = host.precheck_pvf(&pvf_code, Default::default()).await.unwrap();
},
BatchSize::SmallInput,
)
});
group.finish();
}

criterion_group!(prepare, host_prepare_rococo_runtime);
criterion_main!(prepare);
17 changes: 0 additions & 17 deletions polkadot/node/core/pvf/bin/puppet_worker.rs

This file was deleted.

1 change: 0 additions & 1 deletion polkadot/node/core/pvf/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,5 @@ tempfile = "3.3.0"

[features]
# This feature is used to export test code to other crates without putting it in the production build.
# Also used for building the puppet worker.
test-utils = []
jemalloc-allocator = []
12 changes: 11 additions & 1 deletion polkadot/node/core/pvf/common/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,14 @@ use tokio::{io, runtime::Runtime};
/// spawning the desired worker.
#[macro_export]
macro_rules! decl_worker_main {
($expected_command:expr, $entrypoint:expr, $worker_version:expr $(,)*) => {
($expected_command:expr, $entrypoint:expr, $worker_version:expr, $worker_version_hash:expr $(,)*) => {
fn get_full_version() -> String {
format!("{}-{}", $worker_version, $worker_version_hash)
}

fn print_help(expected_command: &str) {
println!("{} {}", expected_command, $worker_version);
println!("commit: {}", $worker_version_hash);
println!();
println!("PVF worker that is called by polkadot.");
}
Expand Down Expand Up @@ -67,6 +72,11 @@ macro_rules! decl_worker_main {
println!("{}", $worker_version);
return
},
// Useful for debugging. --version is used for version checks.
"--full-version" => {
println!("{}", get_full_version());
return
},

"--check-can-enable-landlock" => {
#[cfg(target_os = "linux")]
Expand Down
1 change: 1 addition & 0 deletions polkadot/node/core/pvf/src/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ impl ArtifactPathId {
}
}

#[derive(Debug)]
pub enum ArtifactState {
/// The artifact is ready to be used by the executor.
///
Expand Down
10 changes: 5 additions & 5 deletions polkadot/node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,8 @@ async fn handle_to_host(
/// This tries to prepare the PVF by compiling the WASM blob within a timeout set in
/// `PvfPrepData`.
///
/// If the prepare job failed previously, we may retry it under certain conditions.
/// We don't retry artifacts that previously failed preparation. We don't expect multiple
/// pre-checking requests.
async fn handle_precheck_pvf(
artifacts: &mut Artifacts,
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
Expand All @@ -464,8 +465,7 @@ async fn handle_precheck_pvf(
ArtifactState::Preparing { waiting_for_response, num_failures: _ } =>
waiting_for_response.push(result_sender),
ArtifactState::FailedToProcess { error, .. } => {
// Do not retry failed preparation if another pre-check request comes in. We do not
// retry pre-checking, anyway.
// Do not retry an artifact that previously failed preparation.
let _ = result_sender.send(PrepareResult::Err(error.clone()));
},
}
Expand Down Expand Up @@ -764,7 +764,7 @@ async fn handle_prepare_done(
let last_time_failed = SystemTime::now();
let num_failures = *num_failures + 1;

gum::warn!(
gum::error!(
target: LOG_TARGET,
?artifact_id,
time_failed = ?last_time_failed,
Expand Down Expand Up @@ -846,7 +846,7 @@ async fn sweeper_task(mut sweeper_rx: mpsc::Receiver<PathBuf>) {
gum::trace!(
target: LOG_TARGET,
?result,
"Sweeping the artifact file {}",
"Sweeped the artifact file {}",
condemned.display(),
);
},
Expand Down
13 changes: 13 additions & 0 deletions polkadot/node/core/pvf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,18 @@ pub use polkadot_node_core_pvf_common::{
SecurityStatus,
};

use std::{path::Path, process::Command};

/// The log target for this crate.
pub const LOG_TARGET: &str = "parachain::pvf";

/// Utility to get the version of a worker, used for version checks.
///
/// The worker's existence at the given path must be checked separately.
pub fn get_worker_version(worker_path: &Path) -> std::io::Result<String> {
let worker_version = Command::new(worker_path).args(["--version"]).output()?.stdout;
Ok(std::str::from_utf8(&worker_version)
.expect("version is printed as a string; qed")
.trim()
.to_string())
}
58 changes: 54 additions & 4 deletions polkadot/node/core/pvf/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,20 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Various things for testing other crates.
//!
//! N.B. This is not guarded with some feature flag. Overexposing items here may affect the final
//! artifact even for production builds.
pub use crate::worker_intf::{spawn_with_program_path, SpawnErr};
pub use crate::{
host::{EXECUTE_BINARY_NAME, PREPARE_BINARY_NAME},
worker_intf::{spawn_with_program_path, SpawnErr},
};

use crate::get_worker_version;
use is_executable::IsExecutable;
use polkadot_node_primitives::NODE_VERSION;
use polkadot_primitives::ExecutorParams;
use std::{
path::PathBuf,
sync::{Mutex, OnceLock},
};

/// A function that emulates the stitches together behaviors of the preparation and the execution
/// worker in a single synchronous function.
Expand All @@ -47,3 +54,46 @@ pub fn validate_candidate(

Ok(result)
}

/// Retrieves the worker paths, checks that they exist and does a version check.
///
/// NOTE: This should only be called in dev code (tests, benchmarks) as it relies on the relative
/// paths of the built workers.
pub fn get_and_check_worker_paths() -> (PathBuf, PathBuf) {
// Only needs to be called once for the current process.
static WORKER_PATHS: OnceLock<Mutex<(PathBuf, PathBuf)>> = OnceLock::new();
let mutex = WORKER_PATHS.get_or_init(|| {
let mut workers_path = std::env::current_exe().unwrap();
workers_path.pop();
workers_path.pop();
let mut prepare_worker_path = workers_path.clone();
prepare_worker_path.push(PREPARE_BINARY_NAME);
let mut execute_worker_path = workers_path.clone();
execute_worker_path.push(EXECUTE_BINARY_NAME);

// Check that the workers are valid.
if !prepare_worker_path.is_executable() || !execute_worker_path.is_executable() {
panic!("ERROR: Workers do not exist or are not executable. Workers directory: {:?}", workers_path);
}

let worker_version =
get_worker_version(&prepare_worker_path).expect("checked for worker existence");
if worker_version != NODE_VERSION {
panic!("ERROR: Prepare worker version {worker_version} does not match node version {NODE_VERSION}; worker path: {prepare_worker_path:?}");
}
let worker_version =
get_worker_version(&execute_worker_path).expect("checked for worker existence");
if worker_version != NODE_VERSION {
panic!("ERROR: Execute worker version {worker_version} does not match node version {NODE_VERSION}; worker path: {execute_worker_path:?}");
}

// We don't want to check against the commit hash because we'd have to always rebuild
// the calling crate on every commit.
eprintln!("WARNING: Workers match the node version, but may have changed in recent commits. Please rebuild them if anything funny happens. Workers path: {workers_path:?}");

Mutex::new((prepare_worker_path, execute_worker_path))
});

let guard = mutex.lock().unwrap();
(guard.0.clone(), guard.1.clone())
}
Loading

0 comments on commit e39253c

Please sign in to comment.