Skip to content

Commit

Permalink
Recover from cancelled remote execution RPCs (pantsbuild#6188)
Browse files Browse the repository at this point in the history
### Problem

It's possible for an individual RPC to be cancelled while an entire Operation continues, but this is not currently handled. See the problem description on pantsbuild#6100.

### Solution

Recover from `RpcStatus::Cancelled` by retrying the same operation.

### Result

Without the Fixes pantsbuild#6100.
  • Loading branch information
Stu Hood authored and Borja Lorente committed Aug 15, 2018
1 parent a8e3d15 commit 8f8d108
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 77 deletions.
197 changes: 133 additions & 64 deletions src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ impl super::CommandRunner for CommandRunner {
operations_client
.get()
.get_operation(&operation_request)
.or_else(move |err| {
rpcerror_recover_cancelled(operation_request.take_name(), err)
})
.map_err(rpcerror_to_string),
).map(move |operation| {
future::Loop::Continue((operation, iter_num + 1))
Expand Down Expand Up @@ -633,6 +636,27 @@ fn format_error(error: &bazel_protos::status::Status) -> String {
format!("{}: {}", error_code, error.get_message())
}

///
/// If the given operation represents a cancelled request, recover it into
/// ExecutionError::NotFinished.
///
fn rpcerror_recover_cancelled(
operation_name: String,
err: grpcio::Error,
) -> Result<bazel_protos::operations::Operation, grpcio::Error> {
// If the error represented cancellation, return an Operation for the given Operation name.
match &err {
&grpcio::Error::RpcFailure(ref rs) if rs.status == grpcio::RpcStatusCode::Cancelled => {
let mut next_operation = bazel_protos::operations::Operation::new();
next_operation.set_name(operation_name);
return Ok(next_operation);
}
_ => {}
}
// Did not represent cancellation.
Err(err)
}

fn rpcerror_to_string(error: grpcio::Error) -> String {
match error {
grpcio::Error::RpcFailure(status) => format!(
Expand Down Expand Up @@ -673,6 +697,7 @@ mod tests {

use super::super::CommandRunner as CommandRunnerTrait;
use super::{CommandRunner, ExecuteProcessRequest, ExecutionError, FallibleExecuteProcessResult};
use mock::execution_server::MockOperation;
use std::collections::{BTreeMap, BTreeSet};
use std::iter::{self, FromIterator};
use std::ops::Sub;
Expand Down Expand Up @@ -838,7 +863,8 @@ mod tests {
StdoutType::Digest(testdata.digest()),
StderrType::Raw(testdata_empty.string()),
0,
).0
).op
.unwrap()
),
Ok(FallibleExecuteProcessResult {
stdout: testdata.bytes(),
Expand All @@ -861,7 +887,8 @@ mod tests {
StdoutType::Raw(testdata_empty.string()),
StderrType::Digest(testdata.digest()),
0,
).0
).op
.unwrap()
),
Ok(FallibleExecuteProcessResult {
stdout: testdata_empty.bytes(),
Expand Down Expand Up @@ -1012,6 +1039,42 @@ mod tests {
assert_contains(&error_msg, "echo-a-foo");
}

#[test]
fn retry_for_canceled_channel() {
let execute_request = echo_foo_request();

let mock_server = {
let op_name = "gimme-foo".to_string();

mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new(
op_name.clone(),
super::make_execute_request(&execute_request).unwrap().2,
vec![
make_incomplete_operation(&op_name),
make_canceled_operation(Some(Duration::from_millis(100))),
make_successful_operation(
&op_name,
StdoutType::Raw("foo".to_owned()),
StderrType::Raw("".to_owned()),
0,
),
],
))
};

let result = run_command_remote(mock_server.address(), execute_request).unwrap();

assert_eq!(
result,
FallibleExecuteProcessResult {
stdout: as_bytes("foo"),
stderr: as_bytes(""),
exit_code: 0,
output_directory: fs::EMPTY_DIGEST,
}
);
}

#[test]
fn bad_result_bytes() {
let execute_request = echo_foo_request();
Expand All @@ -1022,23 +1085,26 @@ mod tests {
mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new(
op_name.clone(),
super::make_execute_request(&execute_request).unwrap().2,
vec![make_incomplete_operation(&op_name), {
let mut op = bazel_protos::operations::Operation::new();
op.set_name(op_name.clone());
op.set_done(true);
op.set_response({
let mut response_wrapper = protobuf::well_known_types::Any::new();
response_wrapper.set_type_url(format!(
"type.googleapis.com/{}",
bazel_protos::remote_execution::ExecuteResponse::new()
.descriptor()
.full_name()
));
response_wrapper.set_value(vec![0x00, 0x00, 0x00]);
response_wrapper
});
(op, None)
}],
vec![
make_incomplete_operation(&op_name),
MockOperation::new({
let mut op = bazel_protos::operations::Operation::new();
op.set_name(op_name.clone());
op.set_done(true);
op.set_response({
let mut response_wrapper = protobuf::well_known_types::Any::new();
response_wrapper.set_type_url(format!(
"type.googleapis.com/{}",
bazel_protos::remote_execution::ExecuteResponse::new()
.descriptor()
.full_name()
));
response_wrapper.set_value(vec![0x00, 0x00, 0x00]);
response_wrapper
});
op
}),
],
))
};

Expand All @@ -1055,7 +1121,7 @@ mod tests {
mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new(
op_name.clone(),
super::make_execute_request(&execute_request).unwrap().2,
vec![{
vec![MockOperation::new({
let mut op = bazel_protos::operations::Operation::new();
op.set_name(op_name.to_string());
op.set_done(true);
Expand All @@ -1065,8 +1131,8 @@ mod tests {
error.set_message("Something went wrong".to_string());
error
});
(op, None)
}],
op
})],
))
};

Expand All @@ -1085,18 +1151,21 @@ mod tests {
mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new(
op_name.clone(),
super::make_execute_request(&execute_request).unwrap().2,
vec![make_incomplete_operation(&op_name), {
let mut op = bazel_protos::operations::Operation::new();
op.set_name(op_name.to_string());
op.set_done(true);
op.set_error({
let mut error = bazel_protos::status::Status::new();
error.set_code(bazel_protos::code::Code::INTERNAL.value());
error.set_message("Something went wrong".to_string());
error
});
(op, None)
}],
vec![
make_incomplete_operation(&op_name),
MockOperation::new({
let mut op = bazel_protos::operations::Operation::new();
op.set_name(op_name.to_string());
op.set_done(true);
op.set_error({
let mut error = bazel_protos::status::Status::new();
error.set_code(bazel_protos::code::Code::INTERNAL.value());
error.set_message("Something went wrong".to_string());
error
});
op
}),
],
))
};

Expand All @@ -1115,12 +1184,12 @@ mod tests {
mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new(
op_name.clone(),
super::make_execute_request(&execute_request).unwrap().2,
vec![{
vec![MockOperation::new({
let mut op = bazel_protos::operations::Operation::new();
op.set_name(op_name.to_string());
op.set_done(true);
(op, None)
}],
op
})],
))
};

Expand All @@ -1139,12 +1208,15 @@ mod tests {
mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new(
op_name.clone(),
super::make_execute_request(&execute_request).unwrap().2,
vec![make_incomplete_operation(&op_name), {
let mut op = bazel_protos::operations::Operation::new();
op.set_name(op_name.to_string());
op.set_done(true);
(op, None)
}],
vec![
make_incomplete_operation(&op_name),
MockOperation::new({
let mut op = bazel_protos::operations::Operation::new();
op.set_name(op_name.to_string());
op.set_done(true);
op
}),
],
))
};

Expand Down Expand Up @@ -1334,7 +1406,7 @@ mod tests {
.map(missing_preconditionfailure_violation)
.collect();

let (operation, _duration) = make_precondition_failure_operation(missing);
let operation = make_precondition_failure_operation(missing).op.unwrap();

assert_eq!(
extract_execute_response(operation),
Expand All @@ -1354,7 +1426,7 @@ mod tests {
},
];

let (operation, _duration) = make_precondition_failure_operation(missing);
let operation = make_precondition_failure_operation(missing).op.unwrap();

match extract_execute_response(operation) {
Err(ExecutionError::Fatal(err)) => assert_contains(&err, "monkeys"),
Expand All @@ -1370,7 +1442,7 @@ mod tests {
violation
}];

let (operation, _duration) = make_precondition_failure_operation(missing);
let operation = make_precondition_failure_operation(missing).op.unwrap();

match extract_execute_response(operation) {
Err(ExecutionError::Fatal(err)) => assert_contains(&err, "OUT_OF_CAPACITY"),
Expand All @@ -1382,7 +1454,7 @@ mod tests {
fn extract_execute_response_missing_without_list() {
let missing = vec![];

let (operation, _duration) = make_precondition_failure_operation(missing);
let operation = make_precondition_failure_operation(missing).op.unwrap();

match extract_execute_response(operation) {
Err(ExecutionError::Fatal(err)) => assert_contains(&err.to_lowercase(), "precondition"),
Expand Down Expand Up @@ -1671,36 +1743,33 @@ mod tests {
}
}

// NB: The following helper functions return tuples of Operation and an optional Duration in
// order to make setting up the operations for a test execution server easier to read.
// The test execution server uses the duration to introduce a delay so that we can test
// timeouts.
fn make_canceled_operation(duration: Option<Duration>) -> MockOperation {
MockOperation { op: None, duration }
}

fn make_incomplete_operation(
operation_name: &str,
) -> (bazel_protos::operations::Operation, Option<Duration>) {
fn make_incomplete_operation(operation_name: &str) -> MockOperation {
let mut op = bazel_protos::operations::Operation::new();
op.set_name(operation_name.to_string());
op.set_done(false);
(op, None)
MockOperation::new(op)
}

fn make_delayed_incomplete_operation(
operation_name: &str,
delay: Duration,
) -> (bazel_protos::operations::Operation, Option<Duration>) {
fn make_delayed_incomplete_operation(operation_name: &str, delay: Duration) -> MockOperation {
let mut op = bazel_protos::operations::Operation::new();
op.set_name(operation_name.to_string());
op.set_done(false);
(op, Some(delay))
MockOperation {
op: Some(op),
duration: Some(delay),
}
}

fn make_successful_operation(
operation_name: &str,
stdout: StdoutType,
stderr: StderrType,
exit_code: i32,
) -> (bazel_protos::operations::Operation, Option<Duration>) {
) -> MockOperation {
let mut op = bazel_protos::operations::Operation::new();
op.set_name(operation_name.to_string());
op.set_done(true);
Expand Down Expand Up @@ -1737,12 +1806,12 @@ mod tests {
response_wrapper.set_value(response_proto_bytes);
response_wrapper
});
(op, None)
MockOperation::new(op)
}

fn make_precondition_failure_operation(
violations: Vec<bazel_protos::error_details::PreconditionFailure_Violation>,
) -> (bazel_protos::operations::Operation, Option<Duration>) {
) -> MockOperation {
let mut operation = bazel_protos::operations::Operation::new();
operation.set_name("cat".to_owned());
operation.set_done(true);
Expand All @@ -1762,7 +1831,7 @@ mod tests {
});
response
}));
(operation, None)
MockOperation::new(operation)
}

fn run_command_remote(
Expand Down
Loading

0 comments on commit 8f8d108

Please sign in to comment.