Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PVF: re-preparing artifact on failed runtime construction #3187

Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 28 additions & 18 deletions polkadot/node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,8 @@ async fn validate_candidate_exhaustive(
))),
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err))),
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::RuntimeConstruction(err))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err))),

Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousJobDeath(err))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(format!(
Expand Down Expand Up @@ -780,40 +782,46 @@ trait ValidationBackend {
return validation_result
}

macro_rules! break_if_no_retries_left {
($counter:ident) => {
if $counter > 0 {
$counter -= 1;
} else {
break
}
};
}

// Allow limited retries for each kind of error.
let mut num_death_retries_left = 1;
let mut num_job_error_retries_left = 1;
let mut num_internal_retries_left = 1;
let mut num_runtime_construction_retries_left = 1;
loop {
// Stop retrying if we exceeded the timeout.
if total_time_start.elapsed() + retry_delay > exec_timeout {
break
}

let mut wait_retry_delay = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about retry_immediately? I would appreciate a brief comment on why we need it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, thank you! Fixed d722ccf

match validation_result {
Err(ValidationError::PossiblyInvalid(
PossiblyInvalidError::AmbiguousWorkerDeath |
PossiblyInvalidError::AmbiguousJobDeath(_),
)) =>
if num_death_retries_left > 0 {
num_death_retries_left -= 1;
} else {
break
},
)) => break_if_no_retries_left!(num_death_retries_left),

Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(_))) =>
if num_job_error_retries_left > 0 {
num_job_error_retries_left -= 1;
} else {
break
},
break_if_no_retries_left!(num_job_error_retries_left),

Err(ValidationError::Internal(_)) =>
if num_internal_retries_left > 0 {
num_internal_retries_left -= 1;
} else {
break
},
break_if_no_retries_left!(num_internal_retries_left),

Err(ValidationError::PossiblyInvalid(
PossiblyInvalidError::RuntimeConstruction(_),
)) => {
break_if_no_retries_left!(num_runtime_construction_retries_left);
self.precheck_pvf(pvf.clone()).await?;
wait_retry_delay = false;
},

Ok(_) | Err(ValidationError::Invalid(_) | ValidationError::Preparation(_)) => break,
}
Expand All @@ -822,7 +830,9 @@ trait ValidationBackend {
// assumption that the conditions that caused this error may have resolved on their own.
{
// Wait a brief delay before retrying.
futures_timer::Delay::new(retry_delay).await;
if wait_retry_delay {
futures_timer::Delay::new(retry_delay).await;
}

let new_timeout = exec_timeout.saturating_sub(total_time_start.elapsed());

Expand Down
1 change: 1 addition & 0 deletions polkadot/node/core/pvf/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

use crate::prepare::{PrepareSuccess, PrepareWorkerSuccess};
use parity_scale_codec::{Decode, Encode};
pub use sc_executor_common::error::Error as ExecuteError;

/// Result of PVF preparation from a worker, with checksum of the compiled PVF and stats of the
/// preparation if successful.
Expand Down
15 changes: 15 additions & 0 deletions polkadot/node/core/pvf/common/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ pub enum WorkerResponse {
},
/// The candidate is invalid.
InvalidCandidate(String),
/// Instantiation of the WASM module instance failed during an execution.
/// Possibly related to local issues or dirty node update. May be retried with re-preparation.
RuntimeConstruction(String),
/// The job timed out.
JobTimedOut,
/// The job process has died. We must kill the worker just in case.
Expand Down Expand Up @@ -68,6 +71,9 @@ pub enum JobResponse {
/// The result of parachain validation.
result_descriptor: ValidationResult,
},
/// A possibly transient runtime instantiation error happened during the execution; may be
/// retried with re-preparation
RuntimeConstruction(String),
/// The candidate is invalid.
InvalidCandidate(String),
}
Expand All @@ -81,6 +87,15 @@ impl JobResponse {
Self::InvalidCandidate(format!("{}: {}", ctx, msg))
}
}

/// Creates a may retry response from a context `ctx` and a message `msg` (which can be empty).
pub fn runtime_construction(ctx: &'static str, msg: &str) -> Self {
if msg.is_empty() {
Self::RuntimeConstruction(ctx.to_string())
} else {
Self::RuntimeConstruction(format!("{}: {}", ctx, msg))
}
}
}

/// An unexpected error occurred in the execution job process. Because this comes from the job,
Expand Down
4 changes: 2 additions & 2 deletions polkadot/node/core/pvf/common/src/executor_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

//! Interface to the Substrate Executor

use crate::error::ExecuteError;
use polkadot_primitives::{
executor_params::{DEFAULT_LOGICAL_STACK_MAX, DEFAULT_NATIVE_STACK_MAX},
ExecutorParam, ExecutorParams,
Expand Down Expand Up @@ -109,7 +110,7 @@ pub unsafe fn execute_artifact(
compiled_artifact_blob: &[u8],
executor_params: &ExecutorParams,
params: &[u8],
) -> Result<Vec<u8>, String> {
) -> Result<Vec<u8>, ExecuteError> {
let mut extensions = sp_externalities::Extensions::new();

extensions.register(sp_core::traits::ReadRuntimeVersionExt::new(ReadRuntimeVersion));
Expand All @@ -123,7 +124,6 @@ pub unsafe fn execute_artifact(
Ok(Ok(ok)) => Ok(ok),
Ok(Err(err)) | Err(err) => Err(err),
}
.map_err(|err| format!("execute error: {:?}", err))
}

/// Constructs the runtime for the given PVF, given the artifact bytes.
Expand Down
10 changes: 8 additions & 2 deletions polkadot/node/core/pvf/execute-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

//! Contains the logic for executing PVFs. Used by the polkadot-execute-worker binary.

pub use polkadot_node_core_pvf_common::executor_interface::execute_artifact;
pub use polkadot_node_core_pvf_common::{
error::ExecuteError, executor_interface::execute_artifact,
};

// NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are
// separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-execute-worker=trace`.
Expand Down Expand Up @@ -237,7 +239,9 @@ fn validate_using_artifact(
// [`executor_interface::prepare`].
execute_artifact(compiled_artifact_blob, executor_params, params)
} {
Err(err) => return JobResponse::format_invalid("execute", &err),
Err(ExecuteError::RuntimeConstruction(wasmerr)) =>
return JobResponse::runtime_construction("execute", &wasmerr.to_string()),
Err(err) => return JobResponse::format_invalid("execute", &err.to_string()),
Ok(d) => d,
};

Expand Down Expand Up @@ -550,6 +554,8 @@ fn handle_parent_process(
Ok(WorkerResponse::Ok { result_descriptor, duration: cpu_tv })
},
Ok(JobResponse::InvalidCandidate(err)) => Ok(WorkerResponse::InvalidCandidate(err)),
Ok(JobResponse::RuntimeConstruction(err)) =>
Ok(WorkerResponse::RuntimeConstruction(err)),
Err(job_error) => {
gum::warn!(
target: LOG_TARGET,
Expand Down
8 changes: 8 additions & 0 deletions polkadot/node/core/pvf/src/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,14 @@ impl Artifacts {
.is_none());
}

/// Remove artifact by its id.
pub fn remove(&mut self, artifact_id: ArtifactId) -> Option<(ArtifactId, PathBuf)> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I get it right, we don't currently remove anything from disk, either here or in the unused artifact pruning procedure? Are they always removed from the memory cache table only, and the disk is cleaned up only on node startup? It's totally okay for now, but we shouldn't forget that if we ever decide to re-enable artifact persistence.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, it seems like I indeed missed the moment when artifact names became really random, so right now, it's not a concern at all. Never mind.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the logic of remove is the same as with pruning, i.e. it affects only the cache

self.inner.remove(&artifact_id).and_then(|state| match state {
ArtifactState::Prepared { path, .. } => Some((artifact_id, path)),
_ => None,
})
}

/// Remove artifacts older than the given TTL and return id and path of the removed ones.
pub fn prune(&mut self, artifact_ttl: Duration) -> Vec<(ArtifactId, PathBuf)> {
let now = SystemTime::now();
Expand Down
4 changes: 4 additions & 0 deletions polkadot/node/core/pvf/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ pub enum PossiblyInvalidError {
/// vote invalid.
#[error("possibly invalid: job error: {0}")]
JobError(String),
/// Instantiation of the WASM module instance failed during an execution.
/// Possibly related to local issues or dirty node update. May be retried with re-preparation.
#[error("possibly invalid: runtime construction: {0}")]
RuntimeConstruction(String),
}

impl From<PrepareError> for ValidationError {
Expand Down
2 changes: 1 addition & 1 deletion polkadot/node/core/pvf/src/execute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@
mod queue;
mod worker_interface;

pub use queue::{start, PendingExecutionRequest, ToQueue};
pub use queue::{start, FromQueue, PendingExecutionRequest, ToQueue};
68 changes: 57 additions & 11 deletions polkadot/node/core/pvf/src/execute/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
InvalidCandidate, PossiblyInvalidError, ValidationError, LOG_TARGET,
};
use futures::{
channel::mpsc,
channel::{mpsc, oneshot},
future::BoxFuture,
stream::{FuturesUnordered, StreamExt as _},
Future, FutureExt,
Expand Down Expand Up @@ -54,6 +54,12 @@ pub enum ToQueue {
Enqueue { artifact: ArtifactPathId, pending_execution_request: PendingExecutionRequest },
}

/// A response from queue.
#[derive(Debug)]
pub enum FromQueue {
RemoveArtifact { artifact: ArtifactId, reply_to: oneshot::Sender<()> },
}

/// An execution request that should execute the PVF (known in the context) and send the results
/// to the given result sender.
#[derive(Debug)]
Expand Down Expand Up @@ -137,6 +143,8 @@ struct Queue {

/// The receiver that receives messages to the pool.
to_queue_rx: mpsc::Receiver<ToQueue>,
/// The sender to send messages back to validation host.
from_queue_tx: mpsc::UnboundedSender<FromQueue>,

// Some variables related to the current session.
program_path: PathBuf,
Expand All @@ -161,6 +169,7 @@ impl Queue {
node_version: Option<String>,
security_status: SecurityStatus,
to_queue_rx: mpsc::Receiver<ToQueue>,
from_queue_tx: mpsc::UnboundedSender<FromQueue>,
) -> Self {
Self {
metrics,
Expand All @@ -170,6 +179,7 @@ impl Queue {
node_version,
security_status,
to_queue_rx,
from_queue_tx,
queue: VecDeque::new(),
mux: Mux::new(),
workers: Workers {
Expand Down Expand Up @@ -301,7 +311,7 @@ async fn handle_mux(queue: &mut Queue, event: QueueEvent) {
handle_worker_spawned(queue, idle, handle, job);
},
QueueEvent::StartWork(worker, outcome, artifact_id, result_tx) => {
handle_job_finish(queue, worker, outcome, artifact_id, result_tx);
handle_job_finish(queue, worker, outcome, artifact_id, result_tx).await;
},
}
}
Expand All @@ -327,42 +337,69 @@ fn handle_worker_spawned(

/// If there are pending jobs in the queue, schedules the next of them onto the just freed up
/// worker. Otherwise, puts back into the available workers list.
fn handle_job_finish(
async fn handle_job_finish(
queue: &mut Queue,
worker: Worker,
outcome: Outcome,
artifact_id: ArtifactId,
result_tx: ResultSender,
) {
let (idle_worker, result, duration) = match outcome {
let (idle_worker, result, duration, sync_channel) = match outcome {
Outcome::Ok { result_descriptor, duration, idle_worker } => {
// TODO: propagate the soft timeout

(Some(idle_worker), Ok(result_descriptor), Some(duration))
(Some(idle_worker), Ok(result_descriptor), Some(duration), None)
},
Outcome::InvalidCandidate { err, idle_worker } => (
Some(idle_worker),
Err(ValidationError::Invalid(InvalidCandidate::WorkerReportedInvalid(err))),
None,
None,
),
Outcome::InternalError { err } => (None, Err(ValidationError::Internal(err)), None),
Outcome::RuntimeConstruction { err, idle_worker } => {
// The task for artifact removal is executed concurrently with
// the message to the host on the execution result.
let (result_tx, result_rx) = oneshot::channel();
queue
.from_queue_tx
.unbounded_send(FromQueue::RemoveArtifact {
artifact: artifact_id.clone(),
reply_to: result_tx,
})
.expect("from execute queue receiver is listened by the host; qed");
(
Some(idle_worker),
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::RuntimeConstruction(
err,
))),
None,
Some(result_rx),
)
},
Outcome::InternalError { err } => (None, Err(ValidationError::Internal(err)), None, None),
// Either the worker or the job timed out. Kill the worker in either case. Treated as
// definitely-invalid, because if we timed out, there's no time left for a retry.
Outcome::HardTimeout =>
(None, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)), None),
(None, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)), None, None),
// "Maybe invalid" errors (will retry).
Outcome::WorkerIntfErr => (
None,
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)),
None,
None,
),
Outcome::JobDied { err } => (
None,
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousJobDeath(err))),
None,
None,
),
Outcome::JobError { err } => (
None,
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err))),
None,
None,
),
Outcome::JobError { err } =>
(None, Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err))), None),
};

queue.metrics.execute_finished();
Expand All @@ -386,6 +423,12 @@ fn handle_job_finish(
);
}

if let Some(sync_channel) = sync_channel {
// err means the sender is dropped (the artifact is already removed from the cache)
// so that's legitimate to ignore the result
let _ = sync_channel.await;
}

// First we send the result. It may fail due to the other end of the channel being dropped,
// that's legitimate and we don't treat that as an error.
let _ = result_tx.send(result);
Expand Down Expand Up @@ -521,8 +564,10 @@ pub fn start(
spawn_timeout: Duration,
node_version: Option<String>,
security_status: SecurityStatus,
) -> (mpsc::Sender<ToQueue>, impl Future<Output = ()>) {
) -> (mpsc::Sender<ToQueue>, mpsc::UnboundedReceiver<FromQueue>, impl Future<Output = ()>) {
let (to_queue_tx, to_queue_rx) = mpsc::channel(20);
let (from_queue_tx, from_queue_rx) = mpsc::unbounded();

let run = Queue::new(
metrics,
program_path,
Expand All @@ -532,7 +577,8 @@ pub fn start(
node_version,
security_status,
to_queue_rx,
from_queue_tx,
)
.run();
(to_queue_tx, run)
(to_queue_tx, from_queue_rx, run)
}
Loading
Loading