Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

pvf: ensure enough stack space #5712

Merged
merged 3 commits into from
Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/core/pvf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ gum = { package = "tracing-gum", path = "../../gum" }
pin-project = "1.0.9"
rand = "0.8.5"
tempfile = "3.3.0"
rayon = "1.5.1"
parity-scale-codec = { version = "3.1.2", default-features = false, features = ["derive"] }
polkadot-parachain = { path = "../../../parachain" }
polkadot-core-primitives = { path = "../../../core-primitives" }
Expand Down
15 changes: 15 additions & 0 deletions node/core/pvf/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use parity_scale_codec::{Decode, Encode};
use std::any::Any;

/// Result of PVF preparation performed by the validation host.
pub type PrepareResult = Result<(), PrepareError>;
Expand Down Expand Up @@ -108,3 +109,17 @@ impl From<PrepareError> for ValidationError {
}
}
}

/// Attempt to convert an opaque panic payload to a string.
///
/// This is a best effort, and is not guaranteed to provide the most accurate value.
pub(crate) fn stringify_panic_payload(payload: Box<dyn Any + Send + 'static>) -> String {
match payload.downcast::<&'static str>() {
Ok(msg) => msg.to_string(),
Err(payload) => match payload.downcast::<String>() {
Ok(msg) => *msg,
// At least we tried...
Err(_) => "unknown panic payload".to_string(),
},
}
}
10 changes: 5 additions & 5 deletions node/core/pvf/src/execute/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use crate::{
artifacts::ArtifactPathId,
executor_intf::TaskExecutor,
executor_intf::Executor,
worker_common::{
bytes_to_path, framed_recv, framed_send, path_to_bytes, spawn_with_program_path,
worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
Expand Down Expand Up @@ -184,8 +184,8 @@ impl Response {
/// the path to the socket used to communicate with the host.
pub fn worker_entrypoint(socket_path: &str) {
worker_event_loop("execute", socket_path, |mut stream| async move {
let executor = TaskExecutor::new().map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("cannot create task executor: {}", e))
let executor = Executor::new().map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
})?;
loop {
let (artifact_path, params) = recv_request(&mut stream).await?;
Expand All @@ -204,14 +204,14 @@ pub fn worker_entrypoint(socket_path: &str) {
async fn validate_using_artifact(
artifact_path: &Path,
params: &[u8],
spawner: &TaskExecutor,
executor: &Executor,
) -> Response {
let validation_started_at = Instant::now();
let descriptor_bytes = match unsafe {
// SAFETY: this should be safe since the compiled artifact passed here comes from the
// file created by the prepare workers. These files are obtained by calling
// [`executor_intf::prepare`].
crate::executor_intf::execute(artifact_path.as_ref(), params, spawner.clone())
executor.execute(artifact_path.as_ref(), params)
} {
Err(err) => return Response::format_invalid("execute", &err.to_string()),
Ok(d) => d,
Expand Down
118 changes: 100 additions & 18 deletions node/core/pvf/src/executor_intf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ use std::{
const DEFAULT_HEAP_PAGES_ESTIMATE: u64 = 32;
const EXTRA_HEAP_PAGES: u64 = 2048;

/// The number of bytes devoted for the stack during wasm execution of a PVF.
const NATIVE_STACK_MAX: u32 = 256 * 1024 * 1024;

const CONFIG: Config = Config {
allow_missing_func_imports: true,
cache_path: None,
Expand All @@ -69,7 +72,7 @@ const CONFIG: Config = Config {
// the stack limit set by the wasmtime.
deterministic_stack_limit: Some(DeterministicStackLimit {
logical_max: 65536,
native_stack_max: 256 * 1024 * 1024,
native_stack_max: NATIVE_STACK_MAX,
}),
canonicalize_nans: true,
// Rationale for turning the multi-threaded compilation off is to make the preparation time
Expand Down Expand Up @@ -98,20 +101,99 @@ pub fn prepare(blob: RuntimeBlob) -> Result<Vec<u8>, sc_executor_common::error::
sc_executor_wasmtime::prepare_runtime_artifact(blob, &CONFIG.semantics)
}

/// Executes the given PVF in the form of a compiled artifact and returns the result of execution
/// upon success.
///
/// # Safety
///
/// The caller must ensure that the compiled artifact passed here was:
/// 1) produced by [`prepare`],
/// 2) written to the disk as a file,
/// 3) was not modified,
/// 4) will not be modified while any runtime using this artifact is alive, or is being
/// instantiated.
///
/// Failure to adhere to these requirements might lead to crashes and arbitrary code execution.
pub unsafe fn execute(
pub struct Executor {
thread_pool: rayon::ThreadPool,
spawner: TaskSpawner,
}

impl Executor {
pub fn new() -> Result<Self, String> {
// Wasmtime powers the Substrate Executor. It compiles the wasm bytecode into native code.
// That native code does not create any stacks and just reuses the stack of the thread that
// wasmtime was invoked from.
//
// Also, we configure the executor to provide the deterministic stack and that requires
// supplying the amount of the native stack space that wasm is allowed to use. This is
// realized by supplying the limit into `wasmtime::Config::max_wasm_stack`.
//
// There are quirks to that configuration knob:
//
// 1. It only limits the amount of stack space consumed by wasm but does not ensure nor check
// that the stack space is actually available.
//
// That means, if the calling thread has 1 MiB of stack space left and the wasm code consumes
// more, then the wasmtime limit will **not** trigger. Instead, the wasm code will hit the
// guard page and the Rust stack overflow handler will be triggered. That leads to an
// **abort**.
//
// 2. It cannot and does not limit the stack space consumed by Rust code.
//
// Meaning that if the wasm code leaves no stack space for Rust code, then the Rust code
// and that will abort the process as well.
//
// Typically on Linux the main thread gets the stack size specified by the `ulimit` and
// typically it's configured to 8 MiB. Rust's spawned threads are 2 MiB. OTOH, the
// NATIVE_STACK_MAX is set to 256 MiB. Not nearly enough.
//
// Hence we need to increase it.
//
// The simplest way to fix that is to spawn a thread with the desired stack limit. In order
// to avoid costs of creating a thread, we use a thread pool. The execution is
// single-threaded hence the thread pool has only one thread.
//
// The reasoning why we pick this particular size is:
//
// The default Rust thread stack limit 2 MiB + 256 MiB wasm stack.
let thread_stack_size = 2 * 1024 * 1024 + NATIVE_STACK_MAX as usize;
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(1)
.stack_size(thread_stack_size)
.build()
.map_err(|e| format!("Failed to create thread pool: {:?}", e))?;

let spawner =
TaskSpawner::new().map_err(|e| format!("cannot create task spawner: {}", e))?;

Ok(Self { thread_pool, spawner })
}

/// Executes the given PVF in the form of a compiled artifact and returns the result of execution
/// upon success.
///
/// # Safety
///
/// The caller must ensure that the compiled artifact passed here was:
/// 1) produced by [`prepare`],
/// 2) written to the disk as a file,
/// 3) was not modified,
/// 4) will not be modified while any runtime using this artifact is alive, or is being
/// instantiated.
///
/// Failure to adhere to these requirements might lead to crashes and arbitrary code execution.
pub unsafe fn execute(
&self,
compiled_artifact_path: &Path,
params: &[u8],
) -> Result<Vec<u8>, String> {
let spawner = self.spawner.clone();
let mut result = None;
self.thread_pool.scope({
let result = &mut result;
move |s| {
s.spawn(move |_| {
// spawn does not return a value, so we need to use a variable to pass the result.
*result = Some(
do_execute(compiled_artifact_path, params, spawner)
.map_err(|err| format!("execute error: {:?}", err)),
);
});
}
});
result.unwrap_or_else(|| Err("rayon thread pool spawn failed".to_string()))
}
}

unsafe fn do_execute(
compiled_artifact_path: &Path,
params: &[u8],
spawner: impl sp_core::traits::SpawnNamed + 'static,
Expand Down Expand Up @@ -291,9 +373,9 @@ impl sp_externalities::ExtensionStore for ValidationExternalities {
///
/// This is a light handle meaning it will only clone the handle not create a new thread pool.
#[derive(Clone)]
pub(crate) struct TaskExecutor(futures::executor::ThreadPool);
pub(crate) struct TaskSpawner(futures::executor::ThreadPool);

impl TaskExecutor {
impl TaskSpawner {
pub(crate) fn new() -> Result<Self, String> {
futures::executor::ThreadPoolBuilder::new()
.pool_size(4)
Expand All @@ -304,7 +386,7 @@ impl TaskExecutor {
}
}

impl sp_core::traits::SpawnNamed for TaskExecutor {
impl sp_core::traits::SpawnNamed for TaskSpawner {
fn spawn_blocking(
&self,
_task_name: &'static str,
Expand Down
20 changes: 4 additions & 16 deletions node/core/pvf/src/prepare/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use async_std::{
};
use parity_scale_codec::{Decode, Encode};
use sp_core::hexdisplay::HexDisplay;
use std::{any::Any, panic, sync::Arc, time::Duration};
use std::{panic, sync::Arc, time::Duration};

/// The time period after which the preparation worker is considered unresponsive and will be killed.
// NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric.
Expand Down Expand Up @@ -294,20 +294,8 @@ fn prepare_artifact(code: &[u8]) -> Result<CompiledArtifact, PrepareError> {
Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))),
}
})
.map_err(|panic_payload| PrepareError::Panic(stringify_panic_payload(panic_payload)))
.map_err(|panic_payload| {
PrepareError::Panic(crate::error::stringify_panic_payload(panic_payload))
})
.and_then(|inner_result| inner_result)
}

/// Attempt to convert an opaque panic payload to a string.
///
/// This is a best effort, and is not guaranteed to provide the most accurate value.
fn stringify_panic_payload(payload: Box<dyn Any + Send + 'static>) -> String {
match payload.downcast::<&'static str>() {
Ok(msg) => msg.to_string(),
Err(payload) => match payload.downcast::<String>() {
Ok(msg) => *msg,
// At least we tried...
Err(_) => "unkown panic payload".to_string(),
},
}
}
6 changes: 3 additions & 3 deletions node/core/pvf/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub fn validate_candidate(
code: &[u8],
params: &[u8],
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
use crate::executor_intf::{execute, prepare, prevalidate, TaskExecutor};
use crate::executor_intf::{prepare, prevalidate, Executor};

let code = sp_maybe_compressed_blob::decompress(code, 10 * 1024 * 1024)
.expect("Decompressing code failed");
Expand All @@ -40,11 +40,11 @@ pub fn validate_candidate(
let artifact_path = tmpdir.path().join("blob");
std::fs::write(&artifact_path, &artifact)?;

let executor = TaskExecutor::new()?;
let executor = Executor::new()?;
let result = unsafe {
// SAFETY: This is trivially safe since the artifact is obtained by calling `prepare`
// and is written into a temporary directory in an unmodified state.
execute(&artifact_path, params, executor)?
executor.execute(&artifact_path, params)?
};

Ok(result)
Expand Down