Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
pvf: Fix missing execution request when retrying preparation (#6537)
Browse files Browse the repository at this point in the history
* pvf: Add checks for result sender when retrying preparation in tests

* pvf: Fix missing execution request when retrying preparation

* Update comment
  • Loading branch information
mrcnski authored Jan 11, 2023
1 parent 62df676 commit 00a503b
Showing 1 changed file with 89 additions and 13 deletions.
102 changes: 89 additions & 13 deletions node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,10 @@ async fn handle_execute_pvf(
},
)
.await?;

// Add an execution request that will wait to run after this prepare job has
// finished.
awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
} else {
let _ = result_tx.send(Err(ValidationError::from(error.clone())));
}
Expand Down Expand Up @@ -931,6 +935,13 @@ mod tests {
ValidationHost { to_host_tx }
}

/// Awaits a value on `result_rx` while passing `self.run` to `run_until`, and returns the
/// received value.
///
/// # Panics
///
/// Panics if awaiting `result_rx` yields an error (presumably because the sending half was
/// dropped without sending a value).
async fn poll_and_recv_result<T>(&mut self, result_rx: oneshot::Receiver<T>) -> T
where
    T: Send,
{
    // Build the receiving future first so the `run_until` call reads as "drive the host
    // until this future resolves".
    let received = async { result_rx.await.unwrap() };
    run_until(&mut self.run, received.boxed()).await
}

async fn poll_and_recv_to_prepare_queue(&mut self) -> prepare::ToQueue {
let to_prepare_queue_rx = &mut self.to_prepare_queue_rx;
run_until(&mut self.run, async { to_prepare_queue_rx.next().await.unwrap() }.boxed())
Expand Down Expand Up @@ -991,7 +1002,7 @@ mod tests {
futures::select! {
_ = Delay::new(Duration::from_millis(500)).fuse() => (),
msg = to_sweeper_rx.next().fuse() => {
panic!("the sweeper supposed to be empty, but received: {:?}", msg)
panic!("the sweeper is supposed to be empty, but received: {:?}", msg)
}
}
}
Expand Down Expand Up @@ -1311,12 +1322,12 @@ mod tests {
// Test that multiple prechecking requests do not trigger preparation retries if the first one
// failed.
#[tokio::test]
async fn test_precheck_prepare_retry() {
async fn test_precheck_prepare_no_retry() {
let mut test = Builder::default().build();
let mut host = test.host_handle();

// Submit a precheck request that fails.
let (result_tx, _result_rx) = oneshot::channel();
let (result_tx, result_rx) = oneshot::channel();
host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap();

// The queue received the prepare request.
Expand All @@ -1333,22 +1344,34 @@ mod tests {
.await
.unwrap();

// The result should contain the error.
let result = test.poll_and_recv_result(result_rx).await;
assert_matches!(result, Err(PrepareError::TimedOut));

// Submit another precheck request.
let (result_tx_2, _result_rx_2) = oneshot::channel();
let (result_tx_2, result_rx_2) = oneshot::channel();
host.precheck_pvf(Pvf::from_discriminator(1), result_tx_2).await.unwrap();

// Assert the prepare queue is empty.
test.poll_ensure_to_prepare_queue_is_empty().await;

// The result should contain the original error.
let result = test.poll_and_recv_result(result_rx_2).await;
assert_matches!(result, Err(PrepareError::TimedOut));

// Pause for enough time to reset the cooldown for this failed prepare request.
futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;

// Submit another precheck request.
let (result_tx_3, _result_rx_3) = oneshot::channel();
let (result_tx_3, result_rx_3) = oneshot::channel();
host.precheck_pvf(Pvf::from_discriminator(1), result_tx_3).await.unwrap();

// Assert the prepare queue is empty - we do not retry for precheck requests.
test.poll_ensure_to_prepare_queue_is_empty().await;

// The result should still contain the original error.
let result = test.poll_and_recv_result(result_rx_3).await;
assert_matches!(result, Err(PrepareError::TimedOut));
}

// Test that multiple execution requests trigger preparation retries if the first one failed due
Expand All @@ -1359,7 +1382,7 @@ mod tests {
let mut host = test.host_handle();

// Submit an execute request that fails.
let (result_tx, _result_rx) = oneshot::channel();
let (result_tx, result_rx) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
Expand All @@ -1384,8 +1407,12 @@ mod tests {
.await
.unwrap();

// Submit another execute request.
let (result_tx_2, _result_rx_2) = oneshot::channel();
// The result should contain the error.
let result = test.poll_and_recv_result(result_rx).await;
assert_matches!(result, Err(ValidationError::InternalError(_)));

// Submit another execute request. We shouldn't try to prepare again, yet.
let (result_tx_2, result_rx_2) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
Expand All @@ -1399,11 +1426,15 @@ mod tests {
// Assert the prepare queue is empty.
test.poll_ensure_to_prepare_queue_is_empty().await;

// The result should contain the original error.
let result = test.poll_and_recv_result(result_rx_2).await;
assert_matches!(result, Err(ValidationError::InternalError(_)));

// Pause for enough time to reset the cooldown for this failed prepare request.
futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;

// Submit another execute request.
let (result_tx_3, _result_rx_3) = oneshot::channel();
let (result_tx_3, result_rx_3) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
Expand All @@ -1419,6 +1450,30 @@ mod tests {
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);

test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(Duration::default()),
})
.await
.unwrap();

// Preparation should have been retried and succeeded this time.
let result_tx_3 = assert_matches!(
test.poll_and_recv_to_execute_queue().await,
execute::ToQueue::Enqueue { result_tx, .. } => result_tx
);

// Send an error for the execution here, just so we can check the result receiver is still
// alive.
result_tx_3
.send(Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath)))
.unwrap();
assert_matches!(
result_rx_3.now_or_never().unwrap().unwrap(),
Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath))
);
}

// Test that multiple execution requests don't trigger preparation retries if the first one
Expand All @@ -1428,8 +1483,8 @@ mod tests {
let mut test = Builder::default().build();
let mut host = test.host_handle();

// Submit a execute request that fails.
let (result_tx, _result_rx) = oneshot::channel();
// Submit an execute request that fails.
let (result_tx, result_rx) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
Expand All @@ -1454,8 +1509,15 @@ mod tests {
.await
.unwrap();

// The result should contain the error.
let result = test.poll_and_recv_result(result_rx).await;
assert_matches!(
result,
Err(ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(_)))
);

// Submit another execute request.
let (result_tx_2, _result_rx_2) = oneshot::channel();
let (result_tx_2, result_rx_2) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
Expand All @@ -1469,11 +1531,18 @@ mod tests {
// Assert the prepare queue is empty.
test.poll_ensure_to_prepare_queue_is_empty().await;

// The result should contain the original error.
let result = test.poll_and_recv_result(result_rx_2).await;
assert_matches!(
result,
Err(ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(_)))
);

// Pause for enough time to reset the cooldown for this failed prepare request.
futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;

// Submit another execute request.
let (result_tx_3, _result_rx_3) = oneshot::channel();
let (result_tx_3, result_rx_3) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
Expand All @@ -1486,6 +1555,13 @@ mod tests {

// Assert the prepare queue is empty - we do not retry for prevalidation errors.
test.poll_ensure_to_prepare_queue_is_empty().await;

// The result should still contain the original error.
let result = test.poll_and_recv_result(result_rx_3).await;
assert_matches!(
result,
Err(ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(_)))
);
}

// Test that multiple heads-up requests trigger preparation retries if the first one failed.
Expand Down

0 comments on commit 00a503b

Please sign in to comment.