Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

PVF: Don't dispute on missing artifact #7011

Merged
merged 7 commits into from
Apr 20, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 33 additions & 17 deletions node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,38 +691,54 @@ trait ValidationBackend {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updating candidate-validation tests would not harm

/// Tries executing a PVF. Will retry once if an error is encountered that may have been
/// transient.
///
/// NOTE: Should retry only on errors that are a result of execution itself, and not of
/// preparation.
async fn validate_candidate_with_retry(
&mut self,
raw_validation_code: Vec<u8>,
exec_timeout: Duration,
params: ValidationParams,
executor_params: ExecutorParams,
) -> Result<WasmValidationResult, ValidationError> {
// Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap.
let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepTimeoutKind::Lenient);
// Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap.
let pvf = PvfPrepData::from_code(raw_validation_code, executor_params, prep_timeout);

let mut validation_result =
self.validate_candidate(pvf.clone(), exec_timeout, params.encode()).await;

// If we get an AmbiguousWorkerDeath error, retry once after a brief delay, on the
// assumption that the conditions that caused this error may have been transient. Note that
// this error is only a result of execution itself and not of preparation.
if let Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)) =
validation_result
{
// Wait a brief delay before retrying.
futures_timer::Delay::new(PVF_EXECUTION_RETRY_DELAY).await;
// Allow limited retries for each kind of error.
let mut num_internal_retries_left = 1;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could make this higher, since this kind of error is probably the most likely to be transient.

let mut num_awd_retries_left = 1;
loop {
match validation_result {
Err(ValidationError::InvalidCandidate(
WasmInvalidCandidate::AmbiguousWorkerDeath,
)) if num_awd_retries_left > 0 => num_awd_retries_left -= 1,
Err(ValidationError::InternalError(_)) if num_internal_retries_left > 0 =>
num_internal_retries_left -= 1,
_ => break,
}

// If we got a possibly transient error, retry once after a brief delay, on the assumption
// that the conditions that caused this error may have resolved on their own.
{
// Wait a brief delay before retrying.
futures_timer::Delay::new(PVF_EXECUTION_RETRY_DELAY).await;

gum::warn!(
target: LOG_TARGET,
?pvf,
"Re-trying failed candidate validation due to AmbiguousWorkerDeath."
);
gum::warn!(
target: LOG_TARGET,
?pvf,
"Re-trying failed candidate validation due to possible transient error: {:?}",
validation_result
);

// Encode the params again when re-trying. We expect the retry case to be relatively
// rare, and we want to avoid unconditionally cloning data.
validation_result = self.validate_candidate(pvf, exec_timeout, params.encode()).await;
// Encode the params again when re-trying. We expect the retry case to be relatively
// rare, and we want to avoid unconditionally cloning data.
validation_result =
self.validate_candidate(pvf.clone(), exec_timeout, params.encode()).await;
}
}

validation_result
Expand Down
2 changes: 1 addition & 1 deletion node/core/pvf/src/execute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@
mod queue;
mod worker;

pub use queue::{start, ToQueue};
pub use queue::{start, PendingExecutionRequest, ToQueue};
pub use worker::{worker_entrypoint, Response as ExecuteResponse};
22 changes: 14 additions & 8 deletions node/core/pvf/src/execute/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,17 @@ slotmap::new_key_type! { struct Worker; }

#[derive(Debug)]
pub enum ToQueue {
Enqueue {
artifact: ArtifactPathId,
exec_timeout: Duration,
params: Vec<u8>,
executor_params: ExecutorParams,
result_tx: ResultSender,
},
Enqueue { artifact: ArtifactPathId, pending_execution_request: PendingExecutionRequest },
}

/// An execution request that should execute the PVF (known in the context) and send the results
/// to the given result sender.
#[derive(Debug)]
pub struct PendingExecutionRequest {
pub exec_timeout: Duration,
pub params: Vec<u8>,
pub executor_params: ExecutorParams,
pub result_tx: ResultSender,
}

struct ExecuteJob {
Expand Down Expand Up @@ -259,7 +263,9 @@ async fn purge_dead(metrics: &Metrics, workers: &mut Workers) {
}

fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) {
let ToQueue::Enqueue { artifact, exec_timeout, params, executor_params, result_tx } = to_queue;
let ToQueue::Enqueue { artifact, pending_execution_request } = to_queue;
let PendingExecutionRequest { exec_timeout, params, executor_params, result_tx } =
pending_execution_request;
gum::debug!(
target: LOG_TARGET,
validation_code_hash = ?artifact.id.code_hash,
Expand Down
12 changes: 12 additions & 0 deletions node/core/pvf/src/execute/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,13 @@ impl Response {
Self::InvalidCandidate(format!("{}: {}", ctx, msg))
}
}
fn format_internal(ctx: &'static str, msg: &str) -> Self {
if msg.is_empty() {
Self::InternalError(ctx.to_string())
} else {
Self::InternalError(format!("{}: {}", ctx, msg))
}
}
}

/// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies
Expand Down Expand Up @@ -353,6 +360,11 @@ fn validate_using_artifact(
executor: Arc<Executor>,
cpu_time_start: ProcessTime,
) -> Response {
let file_metadata = std::fs::metadata(artifact_path);
if let Err(err) = file_metadata {
return Response::format_internal("execute: could not find or open file", &err.to_string())
mrcnski marked this conversation as resolved.
Show resolved Hide resolved
}

let descriptor_bytes = match unsafe {
// SAFETY: this should be safe since the compiled artifact passed here comes from the
// file created by the prepare workers. These files are obtained by calling
Expand Down
Loading