Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Retry failed PVF execution (AmbiguousWorkerDeath) #6235

Merged
merged 6 commits into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/core/candidate-validation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = "2021"
[dependencies]
async-trait = "0.1.57"
futures = "0.3.21"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../../gum" }

sp-maybe-compressed-blob = { package = "sp-maybe-compressed-blob", git = "https://github.com/paritytech/substrate", branch = "master" }
Expand Down
64 changes: 44 additions & 20 deletions node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ mod tests;

const LOG_TARGET: &'static str = "parachain::candidate-validation";

/// The amount of time to wait before retrying after an AmbiguousWorkerDeath validation error.
///
/// This definition is used in non-test builds and waits three seconds.
#[cfg(not(test))]
const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3);
/// Test-build variant of the retry delay: 200 milliseconds instead of three seconds
/// (presumably to keep the tests that exercise the retry path fast).
#[cfg(test)]
const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200);

/// Configuration for the candidate validation subsystem
#[derive(Clone)]
pub struct Config {
Expand Down Expand Up @@ -490,7 +496,7 @@ where
}

async fn validate_candidate_exhaustive(
mut validation_backend: impl ValidationBackend,
mut validation_backend: impl ValidationBackend + Send,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suddenly needed this due to the new default implementation of validate_candidate_with_retry on the trait. I feel like this bound shouldn't be there, but it looks like it gets introduced by the #[async_trait] proc macro.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure it's helpful here, but there's a #[async_trait(?Send)] variant for that attribute.

persisted_validation_data: PersistedValidationData,
validation_code: ValidationCode,
candidate_receipt: CandidateReceipt,
Expand Down Expand Up @@ -551,7 +557,7 @@ async fn validate_candidate_exhaustive(
};

let result = validation_backend
.validate_candidate(raw_validation_code.to_vec(), timeout, params)
.validate_candidate_with_retry(raw_validation_code.to_vec(), timeout, params)
.await;

if let Err(ref error) = result {
Expand Down Expand Up @@ -604,45 +610,63 @@ async fn validate_candidate_exhaustive(
#[async_trait]
trait ValidationBackend {
async fn validate_candidate(
&mut self,
pvf: Pvf,
timeout: Duration,
encoded_params: Vec<u8>,
) -> Result<WasmValidationResult, ValidationError>;

async fn validate_candidate_with_retry(
&mut self,
raw_validation_code: Vec<u8>,
timeout: Duration,
params: ValidationParams,
) -> Result<WasmValidationResult, ValidationError>;
) -> Result<WasmValidationResult, ValidationError> {
// Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap.
let pvf = Pvf::from_code(raw_validation_code);

let validation_result =
self.validate_candidate(pvf.clone(), timeout, params.encode()).await;

// If we get an AmbiguousWorkerDeath error, retry once after a brief delay, on the
// assumption that the conditions that caused this error may have been transient.
if let Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)) =
validation_result
{
// Wait for a brief delay before retrying.
futures_timer::Delay::new(PVF_EXECUTION_RETRY_DELAY).await;
// Encode the params again when retrying. We expect the retry case to be relatively
// rare, and we want to avoid unconditionally cloning data.
self.validate_candidate(pvf, timeout, params.encode()).await
} else {
validation_result
}
}

async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<(), PrepareError>;
}

#[async_trait]
impl ValidationBackend for ValidationHost {
/// Tries executing a PVF a single time (no retries).
async fn validate_candidate(
&mut self,
raw_validation_code: Vec<u8>,
pvf: Pvf,
timeout: Duration,
params: ValidationParams,
encoded_params: Vec<u8>,
) -> Result<WasmValidationResult, ValidationError> {
let priority = polkadot_node_core_pvf::Priority::Normal;

let (tx, rx) = oneshot::channel();
if let Err(err) = self
.execute_pvf(
Pvf::from_code(raw_validation_code),
timeout,
params.encode(),
polkadot_node_core_pvf::Priority::Normal,
tx,
)
.await
{
if let Err(err) = self.execute_pvf(pvf, timeout, encoded_params, priority, tx).await {
return Err(ValidationError::InternalError(format!(
"cannot send pvf to the validation host: {:?}",
err
)))
}

let validation_result = rx
.await
.map_err(|_| ValidationError::InternalError("validation was cancelled".into()))?;

validation_result
rx.await
.map_err(|_| ValidationError::InternalError("validation was cancelled".into()))?
}

async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<(), PrepareError> {
Expand Down
144 changes: 136 additions & 8 deletions node/core/candidate-validation/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,24 +345,36 @@ fn check_does_not_match() {
}

struct MockValidateCandidateBackend {
result: Result<WasmValidationResult, ValidationError>,
result_list: Vec<Result<WasmValidationResult, ValidationError>>,
num_times_called: usize,
}

impl MockValidateCandidateBackend {
fn with_hardcoded_result(result: Result<WasmValidationResult, ValidationError>) -> Self {
Self { result }
Self { result_list: vec![result], num_times_called: 0 }
}

fn with_hardcoded_result_list(
result_list: Vec<Result<WasmValidationResult, ValidationError>>,
) -> Self {
Self { result_list, num_times_called: 0 }
}
}

#[async_trait]
impl ValidationBackend for MockValidateCandidateBackend {
async fn validate_candidate(
&mut self,
_raw_validation_code: Vec<u8>,
_pvf: Pvf,
_timeout: Duration,
_params: ValidationParams,
_encoded_params: Vec<u8>,
) -> Result<WasmValidationResult, ValidationError> {
self.result.clone()
// This is expected to panic if called more times than expected, indicating an error in the
// test.
let result = self.result_list[self.num_times_called].clone();
self.num_times_called += 1;

result
}

async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<(), PrepareError> {
Expand Down Expand Up @@ -468,7 +480,7 @@ fn candidate_validation_bad_return_is_invalid() {

let v = executor::block_on(validate_candidate_exhaustive(
MockValidateCandidateBackend::with_hardcoded_result(Err(
ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath),
ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout),
)),
validation_data,
validation_code,
Expand All @@ -479,6 +491,122 @@ fn candidate_validation_bad_return_is_invalid() {
))
.unwrap();

assert_matches!(v, ValidationResult::Invalid(InvalidCandidate::Timeout));
}

/// Checks that one `AmbiguousWorkerDeath` error followed by a successful execution result
/// makes `validate_candidate_exhaustive` report the candidate as valid.
///
/// The mock backend hands out its hardcoded results in call order, so the first execution
/// attempt fails with `AmbiguousWorkerDeath` and the second one succeeds.
#[test]
fn candidate_validation_one_ambiguous_error_is_valid() {
let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() };

let pov = PoV { block_data: BlockData(vec![1; 32]) };
let head_data = HeadData(vec![1, 1, 1]);
let validation_code = ValidationCode(vec![2; 16]);

// Build a descriptor whose hashes are derived from the data constructed above.
let descriptor = make_valid_candidate_descriptor(
ParaId::from(1_u32),
dummy_hash(),
validation_data.hash(),
pov.hash(),
validation_code.hash(),
head_data.hash(),
dummy_hash(),
Sr25519Keyring::Alice,
);

// Sanity check: the basic checks must accept this descriptor.
let check = perform_basic_checks(
&descriptor,
validation_data.max_pov_size,
&pov,
&validation_code.hash(),
);
assert!(check.is_ok());

// The successful result the mock backend returns on the second call.
let validation_result = WasmValidationResult {
head_data,
new_validation_code: Some(vec![2, 2, 2].into()),
upward_messages: Vec::new(),
horizontal_messages: Vec::new(),
processed_downward_messages: 0,
hrmp_watermark: 0,
};

// Mirror the execution outputs into commitments; their hash goes into the receipt below
// (presumably so that the receipt agrees with what the backend produces).
let commitments = CandidateCommitments {
head_data: validation_result.head_data.clone(),
upward_messages: validation_result.upward_messages.clone(),
horizontal_messages: validation_result.horizontal_messages.clone(),
new_validation_code: validation_result.new_validation_code.clone(),
processed_downward_messages: validation_result.processed_downward_messages,
hrmp_watermark: validation_result.hrmp_watermark,
};

let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() };

// First call yields `AmbiguousWorkerDeath`, second call yields the valid result.
// `validation_data` is cloned because it is compared against the returned data afterwards.
let v = executor::block_on(validate_candidate_exhaustive(
MockValidateCandidateBackend::with_hardcoded_result_list(vec![
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
Ok(validation_result),
]),
validation_data.clone(),
validation_code,
candidate_receipt,
Arc::new(pov),
Duration::from_secs(0),
&Default::default(),
))
.unwrap();

// The outcome must be `Valid` and carry the outputs of the second (successful) attempt.
assert_matches!(v, ValidationResult::Valid(outputs, used_validation_data) => {
assert_eq!(outputs.head_data, HeadData(vec![1, 1, 1]));
assert_eq!(outputs.upward_messages, Vec::<UpwardMessage>::new());
assert_eq!(outputs.horizontal_messages, Vec::new());
assert_eq!(outputs.new_validation_code, Some(vec![2, 2, 2].into()));
assert_eq!(outputs.hrmp_watermark, 0);
assert_eq!(used_validation_data, validation_data);
});
}

/// Checks that two consecutive `AmbiguousWorkerDeath` errors make
/// `validate_candidate_exhaustive` report the candidate as invalid with an execution error.
///
/// The mock backend holds exactly two results and indexes into that list on every call, so a
/// third execution attempt would panic on the out-of-bounds index.
#[test]
fn candidate_validation_multiple_ambiguous_errors_is_invalid() {
let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() };

let pov = PoV { block_data: BlockData(vec![1; 32]) };
let validation_code = ValidationCode(vec![2; 16]);

// Build a descriptor from the data above; `dummy_hash()` fills the remaining hash arguments.
let descriptor = make_valid_candidate_descriptor(
ParaId::from(1_u32),
dummy_hash(),
validation_data.hash(),
pov.hash(),
validation_code.hash(),
dummy_hash(),
dummy_hash(),
Sr25519Keyring::Alice,
);

// Sanity check: the basic checks must accept this descriptor.
let check = perform_basic_checks(
&descriptor,
validation_data.max_pov_size,
&pov,
&validation_code.hash(),
);
assert!(check.is_ok());

// The commitments hash is a zero placeholder; no successful result is produced in this test.
let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() };

// Both the initial attempt and the retry fail with `AmbiguousWorkerDeath`.
let v = executor::block_on(validate_candidate_exhaustive(
MockValidateCandidateBackend::with_hardcoded_result_list(vec![
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
]),
validation_data,
validation_code,
candidate_receipt,
Arc::new(pov),
Duration::from_secs(0),
&Default::default(),
))
.unwrap();

// The repeated error must surface as an invalid candidate with an execution error.
assert_matches!(v, ValidationResult::Invalid(InvalidCandidate::ExecutionError(_)));
}

Expand Down Expand Up @@ -779,9 +907,9 @@ impl MockPreCheckBackend {
impl ValidationBackend for MockPreCheckBackend {
async fn validate_candidate(
&mut self,
_raw_validation_code: Vec<u8>,
_pvf: Pvf,
_timeout: Duration,
_params: ValidationParams,
_encoded_params: Vec<u8>,
) -> Result<WasmValidationResult, ValidationError> {
unreachable!()
}
Expand Down
6 changes: 3 additions & 3 deletions node/core/pvf/src/execute/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ fn handle_job_finish(
"execute worker concluded",
);

// First we send the result. It may fail due the other end of the channel being dropped, that's
// legitimate and we don't treat that as an error.
// First we send the result. It may fail due to the other end of the channel being dropped,
// that's legitimate and we don't treat that as an error.
let _ = result_tx.send(result);

// Then, we should deal with the worker:
Expand Down Expand Up @@ -305,7 +305,7 @@ async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> Qu
Err(err) => {
gum::warn!(target: LOG_TARGET, "failed to spawn an execute worker: {:?}", err);

// Assume that the failure intermittent and retry after a delay.
// Assume that the failure is intermittent and retry after a delay.
Delay::new(Duration::from_secs(3)).await;
},
}
Expand Down