
Commit

Merge branch 'master' into ao-fix-zombienet-parachain-upgrade
* master:
  Bump `wasmtime` to 0.38.0 and `zstd` to 0.11.2 (companion for substrate#11720) (#5707)
  pvf: ensure enough stack space (#5712)
  Bump generic-array from 0.12.3 to 0.12.4 in /bridges/fuzz/storage-proof (#5648)
  pvf: unignore `terminates_on_timeout` test (#5722)
ordian committed Jun 25, 2022
2 parents 11545b6 + b072240 commit 06bebff
Showing 10 changed files with 401 additions and 279 deletions.
495 changes: 266 additions & 229 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions bridges/fuzz/storage-proof/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions node/core/pvf/Cargo.toml
@@ -20,6 +20,7 @@ gum = { package = "tracing-gum", path = "../../gum" }
pin-project = "1.0.9"
rand = "0.8.5"
tempfile = "3.3.0"
rayon = "1.5.1"
parity-scale-codec = { version = "3.1.2", default-features = false, features = ["derive"] }
polkadot-parachain = { path = "../../../parachain" }
polkadot-core-primitives = { path = "../../../core-primitives" }
15 changes: 15 additions & 0 deletions node/core/pvf/src/error.rs
@@ -15,6 +15,7 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use parity_scale_codec::{Decode, Encode};
use std::any::Any;

/// Result of PVF preparation performed by the validation host.
pub type PrepareResult = Result<(), PrepareError>;
@@ -108,3 +109,17 @@ impl From<PrepareError> for ValidationError {
}
}
}

/// Attempt to convert an opaque panic payload to a string.
///
/// This is a best effort, and is not guaranteed to provide the most accurate value.
pub(crate) fn stringify_panic_payload(payload: Box<dyn Any + Send + 'static>) -> String {
match payload.downcast::<&'static str>() {
Ok(msg) => msg.to_string(),
Err(payload) => match payload.downcast::<String>() {
Ok(msg) => *msg,
// At least we tried...
Err(_) => "unknown panic payload".to_string(),
},
}
}
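
As an aside, a minimal sketch of how this helper is typically used (mirroring the call site in prepare/worker.rs further down in this diff): the opaque payload returned by std::panic::catch_unwind is turned into a readable string. The panicking closure here is a hypothetical stand-in for the real preparation step; only stringify_panic_payload comes from the code above.

use std::any::Any;
use std::panic;

// Same logic as the helper added in error.rs above.
fn stringify_panic_payload(payload: Box<dyn Any + Send + 'static>) -> String {
    match payload.downcast::<&'static str>() {
        Ok(msg) => msg.to_string(),
        Err(payload) => match payload.downcast::<String>() {
            Ok(msg) => *msg,
            Err(_) => "unknown panic payload".to_string(),
        },
    }
}

fn main() {
    // Catch a panic from a closure and convert its payload into a string.
    // (The default panic hook still prints the panic message to stderr.)
    let result: Result<u32, String> = panic::catch_unwind(|| {
        // Hypothetical fallible work standing in for PVF preparation.
        panic!("preparation blew up");
    })
    .map_err(stringify_panic_payload);

    assert_eq!(result.unwrap_err(), "preparation blew up");
}

The two-step downcast is needed because a panic payload can be either a &'static str or a String, depending on whether the panic message used formatting.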
10 changes: 5 additions & 5 deletions node/core/pvf/src/execute/worker.rs
@@ -16,7 +16,7 @@

use crate::{
artifacts::ArtifactPathId,
executor_intf::TaskExecutor,
executor_intf::Executor,
worker_common::{
bytes_to_path, framed_recv, framed_send, path_to_bytes, spawn_with_program_path,
worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
@@ -184,8 +184,8 @@ impl Response {
/// the path to the socket used to communicate with the host.
pub fn worker_entrypoint(socket_path: &str) {
worker_event_loop("execute", socket_path, |mut stream| async move {
let executor = TaskExecutor::new().map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("cannot create task executor: {}", e))
let executor = Executor::new().map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
})?;
loop {
let (artifact_path, params) = recv_request(&mut stream).await?;
@@ -204,14 +204,14 @@ pub fn worker_entrypoint(socket_path: &str) {
async fn validate_using_artifact(
artifact_path: &Path,
params: &[u8],
spawner: &TaskExecutor,
executor: &Executor,
) -> Response {
let validation_started_at = Instant::now();
let descriptor_bytes = match unsafe {
// SAFETY: this should be safe since the compiled artifact passed here comes from the
// file created by the prepare workers. These files are obtained by calling
// [`executor_intf::prepare`].
crate::executor_intf::execute(artifact_path.as_ref(), params, spawner.clone())
executor.execute(artifact_path.as_ref(), params)
} {
Err(err) => return Response::format_invalid("execute", &err.to_string()),
Ok(d) => d,
118 changes: 100 additions & 18 deletions node/core/pvf/src/executor_intf.rs
@@ -43,6 +43,9 @@ use std::{
const DEFAULT_HEAP_PAGES_ESTIMATE: u64 = 32;
const EXTRA_HEAP_PAGES: u64 = 2048;

/// The number of bytes devoted for the stack during wasm execution of a PVF.
const NATIVE_STACK_MAX: u32 = 256 * 1024 * 1024;

const CONFIG: Config = Config {
allow_missing_func_imports: true,
cache_path: None,
@@ -69,7 +72,7 @@ const CONFIG: Config = Config {
// the stack limit set by the wasmtime.
deterministic_stack_limit: Some(DeterministicStackLimit {
logical_max: 65536,
native_stack_max: 256 * 1024 * 1024,
native_stack_max: NATIVE_STACK_MAX,
}),
canonicalize_nans: true,
// Rationale for turning the multi-threaded compilation off is to make the preparation time
@@ -98,20 +101,99 @@ pub fn prepare(blob: RuntimeBlob) -> Result<Vec<u8>, sc_executor_common::error::
sc_executor_wasmtime::prepare_runtime_artifact(blob, &CONFIG.semantics)
}

/// Executes the given PVF in the form of a compiled artifact and returns the result of execution
/// upon success.
///
/// # Safety
///
/// The caller must ensure that the compiled artifact passed here was:
/// 1) produced by [`prepare`],
/// 2) written to the disk as a file,
/// 3) was not modified,
/// 4) will not be modified while any runtime using this artifact is alive, or is being
/// instantiated.
///
/// Failure to adhere to these requirements might lead to crashes and arbitrary code execution.
pub unsafe fn execute(
pub struct Executor {
thread_pool: rayon::ThreadPool,
spawner: TaskSpawner,
}

impl Executor {
pub fn new() -> Result<Self, String> {
// Wasmtime powers the Substrate Executor. It compiles the wasm bytecode into native code.
// That native code does not create any stacks and just reuses the stack of the thread that
// wasmtime was invoked from.
//
// Also, we configure the executor to provide the deterministic stack and that requires
// supplying the amount of the native stack space that wasm is allowed to use. This is
// realized by supplying the limit into `wasmtime::Config::max_wasm_stack`.
//
// There are quirks to that configuration knob:
//
// 1. It only limits the amount of stack space consumed by wasm but does not ensure nor check
// that the stack space is actually available.
//
// That means, if the calling thread has 1 MiB of stack space left and the wasm code consumes
// more, then the wasmtime limit will **not** trigger. Instead, the wasm code will hit the
// guard page and the Rust stack overflow handler will be triggered. That leads to an
// **abort**.
//
// 2. It cannot and does not limit the stack space consumed by Rust code.
//
// Meaning that if the wasm code leaves no stack space for Rust code, then the Rust code
// will overflow the stack, and that will abort the process as well.
//
// Typically on Linux the main thread gets the stack size specified by the `ulimit` and
// typically it's configured to 8 MiB. Rust's spawned threads are 2 MiB. OTOH, the
// NATIVE_STACK_MAX is set to 256 MiB. Not nearly enough.
//
// Hence we need to increase it.
//
// The simplest way to fix that is to spawn a thread with the desired stack limit. In order
// to avoid costs of creating a thread, we use a thread pool. The execution is
// single-threaded hence the thread pool has only one thread.
//
// The reasoning why we pick this particular size is:
//
// The default Rust thread stack limit 2 MiB + 256 MiB wasm stack.
let thread_stack_size = 2 * 1024 * 1024 + NATIVE_STACK_MAX as usize;
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(1)
.stack_size(thread_stack_size)
.build()
.map_err(|e| format!("Failed to create thread pool: {:?}", e))?;

let spawner =
TaskSpawner::new().map_err(|e| format!("cannot create task spawner: {}", e))?;

Ok(Self { thread_pool, spawner })
}

/// Executes the given PVF in the form of a compiled artifact and returns the result of execution
/// upon success.
///
/// # Safety
///
/// The caller must ensure that the compiled artifact passed here was:
/// 1) produced by [`prepare`],
/// 2) written to the disk as a file,
/// 3) was not modified,
/// 4) will not be modified while any runtime using this artifact is alive, or is being
/// instantiated.
///
/// Failure to adhere to these requirements might lead to crashes and arbitrary code execution.
pub unsafe fn execute(
&self,
compiled_artifact_path: &Path,
params: &[u8],
) -> Result<Vec<u8>, String> {
let spawner = self.spawner.clone();
let mut result = None;
self.thread_pool.scope({
let result = &mut result;
move |s| {
s.spawn(move |_| {
// spawn does not return a value, so we need to use a variable to pass the result.
*result = Some(
do_execute(compiled_artifact_path, params, spawner)
.map_err(|err| format!("execute error: {:?}", err)),
);
});
}
});
result.unwrap_or_else(|| Err("rayon thread pool spawn failed".to_string()))
}
}

unsafe fn do_execute(
compiled_artifact_path: &Path,
params: &[u8],
spawner: impl sp_core::traits::SpawnNamed + 'static,
@@ -291,9 +373,9 @@ impl sp_externalities::ExtensionStore for ValidationExternalities {
///
/// This is a light handle meaning it will only clone the handle not create a new thread pool.
#[derive(Clone)]
pub(crate) struct TaskExecutor(futures::executor::ThreadPool);
pub(crate) struct TaskSpawner(futures::executor::ThreadPool);

impl TaskExecutor {
impl TaskSpawner {
pub(crate) fn new() -> Result<Self, String> {
futures::executor::ThreadPoolBuilder::new()
.pool_size(4)
@@ -304,7 +386,7 @@ impl TaskExecutor {
}
}

impl sp_core::traits::SpawnNamed for TaskExecutor {
impl sp_core::traits::SpawnNamed for TaskSpawner {
fn spawn_blocking(
&self,
_task_name: &'static str,
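As a standalone illustration of the pattern Executor relies on above, here is a minimal sketch (assuming the rayon dependency added in the Cargo.toml hunk earlier): a one-thread rayon pool built with an enlarged stack, with scope/spawn used to run a stack-hungry closure and pass its result back through a captured variable. deep_recursion is a hypothetical stand-in for wasm execution.

const NATIVE_STACK_MAX: usize = 256 * 1024 * 1024;

// Hypothetical stand-in for stack-hungry wasm execution.
fn deep_recursion(n: u64) -> u64 {
    if n == 0 {
        0
    } else {
        1 + deep_recursion(n - 1)
    }
}

fn main() -> Result<(), String> {
    // Default Rust thread stack (2 MiB) plus the wasm stack budget, as in `Executor::new`.
    let thread_stack_size = 2 * 1024 * 1024 + NATIVE_STACK_MAX;
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(1)
        .stack_size(thread_stack_size)
        .build()
        .map_err(|e| format!("failed to create thread pool: {:?}", e))?;

    // `spawn` returns no value, so the result is handed back through a captured
    // variable, the same trick used in `Executor::execute`.
    let mut result = None;
    pool.scope(|s| {
        let result = &mut result;
        s.spawn(move |_| {
            *result = Some(deep_recursion(1_000_000));
        });
    });

    println!("depth reached: {}", result.ok_or("the spawned task did not run")?);
    Ok(())
}

A single pool thread suffices because execution is single-threaded; the pool exists only to avoid paying the thread-creation cost on every request.
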
20 changes: 4 additions & 16 deletions node/core/pvf/src/prepare/worker.rs
@@ -30,7 +30,7 @@ use async_std::{
};
use parity_scale_codec::{Decode, Encode};
use sp_core::hexdisplay::HexDisplay;
use std::{any::Any, panic, sync::Arc, time::Duration};
use std::{panic, sync::Arc, time::Duration};

/// The time period after which the preparation worker is considered unresponsive and will be killed.
// NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric.
@@ -294,20 +294,8 @@ fn prepare_artifact(code: &[u8]) -> Result<CompiledArtifact, PrepareError> {
Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))),
}
})
.map_err(|panic_payload| PrepareError::Panic(stringify_panic_payload(panic_payload)))
.map_err(|panic_payload| {
PrepareError::Panic(crate::error::stringify_panic_payload(panic_payload))
})
.and_then(|inner_result| inner_result)
}

/// Attempt to convert an opaque panic payload to a string.
///
/// This is a best effort, and is not guaranteed to provide the most accurate value.
fn stringify_panic_payload(payload: Box<dyn Any + Send + 'static>) -> String {
match payload.downcast::<&'static str>() {
Ok(msg) => msg.to_string(),
Err(payload) => match payload.downcast::<String>() {
Ok(msg) => *msg,
// At least we tried...
Err(_) => "unkown panic payload".to_string(),
},
}
}
6 changes: 3 additions & 3 deletions node/core/pvf/src/testing.rs
@@ -29,7 +29,7 @@ pub fn validate_candidate(
code: &[u8],
params: &[u8],
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
use crate::executor_intf::{execute, prepare, prevalidate, TaskExecutor};
use crate::executor_intf::{prepare, prevalidate, Executor};

let code = sp_maybe_compressed_blob::decompress(code, 10 * 1024 * 1024)
.expect("Decompressing code failed");
@@ -40,11 +40,11 @@
let artifact_path = tmpdir.path().join("blob");
std::fs::write(&artifact_path, &artifact)?;

let executor = TaskExecutor::new()?;
let executor = Executor::new()?;
let result = unsafe {
// SAFETY: This is trivially safe since the artifact is obtained by calling `prepare`
// and is written into a temporary directory in an unmodified state.
execute(&artifact_path, params, executor)?
executor.execute(&artifact_path, params)?
};

Ok(result)
1 change: 0 additions & 1 deletion node/core/pvf/tests/it/main.rs
@@ -78,7 +78,6 @@ impl TestHost {
}

#[async_std::test]
#[ignore]
async fn terminates_on_timeout() {
let host = TestHost::new();

2 changes: 1 addition & 1 deletion node/primitives/Cargo.toml
@@ -22,7 +22,7 @@ thiserror = "1.0.31"
serde = { version = "1.0.137", features = ["derive"] }

[target.'cfg(not(target_os = "unknown"))'.dependencies]
zstd = { version = "0.10.2", default-features = false }
zstd = { version = "0.11.2", default-features = false }

[dev-dependencies]
polkadot-erasure-coding = { path = "../../erasure-coding" }
