Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recover from cancelled remote execution RPCs #6188

Merged
merged 4 commits into from
Aug 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 133 additions & 64 deletions src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ impl super::CommandRunner for CommandRunner {
operations_client
.get()
.get_operation(&operation_request)
.or_else(move |err| {
rpcerror_recover_cancelled(operation_request.take_name(), err)
})
.map_err(rpcerror_to_string),
).map(move |operation| {
future::Loop::Continue((operation, iter_num + 1))
Expand Down Expand Up @@ -631,6 +634,27 @@ fn format_error(error: &bazel_protos::status::Status) -> String {
format!("{}: {}", error_code, error.get_message())
}

///
/// If the given operation represents a cancelled request, recover it into
/// ExecutionError::NotFinished.
///
fn rpcerror_recover_cancelled(
operation_name: String,
err: grpcio::Error,
) -> Result<bazel_protos::operations::Operation, grpcio::Error> {
// If the error represented cancellation, return an Operation for the given Operation name.
match &err {
&grpcio::Error::RpcFailure(ref rs) if rs.status == grpcio::RpcStatusCode::Cancelled => {
let mut next_operation = bazel_protos::operations::Operation::new();
next_operation.set_name(operation_name);
return Ok(next_operation);
}
_ => {}
}
// Did not represent cancellation.
Err(err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could just be the RHS of the _ match clause?

Copy link
Member Author

@stuhood stuhood Aug 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would complicate things, because I'd have to take the err by value, which means I could not have a guard clause on that branch of the match.

}

fn rpcerror_to_string(error: grpcio::Error) -> String {
match error {
grpcio::Error::RpcFailure(status) => format!(
Expand Down Expand Up @@ -671,6 +695,7 @@ mod tests {

use super::super::CommandRunner as CommandRunnerTrait;
use super::{CommandRunner, ExecuteProcessRequest, ExecutionError, FallibleExecuteProcessResult};
use mock::execution_server::MockOperation;
use std::collections::{BTreeMap, BTreeSet};
use std::iter::{self, FromIterator};
use std::ops::Sub;
Expand Down Expand Up @@ -834,7 +859,8 @@ mod tests {
StdoutType::Digest(testdata.digest()),
StderrType::Raw(testdata_empty.string()),
0,
).0
).op
.unwrap()
),
Ok(FallibleExecuteProcessResult {
stdout: testdata.bytes(),
Expand All @@ -857,7 +883,8 @@ mod tests {
StdoutType::Raw(testdata_empty.string()),
StderrType::Digest(testdata.digest()),
0,
).0
).op
.unwrap()
),
Ok(FallibleExecuteProcessResult {
stdout: testdata_empty.bytes(),
Expand Down Expand Up @@ -1007,6 +1034,42 @@ mod tests {
assert_contains(&error_msg, "echo-a-foo");
}

#[test]
fn retry_for_canceled_channel() {
let execute_request = echo_foo_request();

let mock_server = {
let op_name = "gimme-foo".to_string();

mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new(
op_name.clone(),
super::make_execute_request(&execute_request).unwrap().2,
vec![
make_incomplete_operation(&op_name),
make_canceled_operation(Some(Duration::from_millis(100))),
make_successful_operation(
&op_name,
StdoutType::Raw("foo".to_owned()),
StderrType::Raw("".to_owned()),
0,
),
],
))
};

let result = run_command_remote(mock_server.address(), execute_request).unwrap();

assert_eq!(
result,
FallibleExecuteProcessResult {
stdout: as_bytes("foo"),
stderr: as_bytes(""),
exit_code: 0,
output_directory: fs::EMPTY_DIGEST,
}
);
}

#[test]
fn bad_result_bytes() {
let execute_request = echo_foo_request();
Expand All @@ -1017,23 +1080,26 @@ mod tests {
mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new(
op_name.clone(),
super::make_execute_request(&execute_request).unwrap().2,
vec![make_incomplete_operation(&op_name), {
let mut op = bazel_protos::operations::Operation::new();
op.set_name(op_name.clone());
op.set_done(true);
op.set_response({
let mut response_wrapper = protobuf::well_known_types::Any::new();
response_wrapper.set_type_url(format!(
"type.googleapis.com/{}",
bazel_protos::remote_execution::ExecuteResponse::new()
.descriptor()
.full_name()
));
response_wrapper.set_value(vec![0x00, 0x00, 0x00]);
response_wrapper
});
(op, None)
}],
vec![
make_incomplete_operation(&op_name),
MockOperation::new({
let mut op = bazel_protos::operations::Operation::new();
op.set_name(op_name.clone());
op.set_done(true);
op.set_response({
let mut response_wrapper = protobuf::well_known_types::Any::new();
response_wrapper.set_type_url(format!(
"type.googleapis.com/{}",
bazel_protos::remote_execution::ExecuteResponse::new()
.descriptor()
.full_name()
));
response_wrapper.set_value(vec![0x00, 0x00, 0x00]);
response_wrapper
});
op
}),
],
))
};

Expand All @@ -1050,7 +1116,7 @@ mod tests {
mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new(
op_name.clone(),
super::make_execute_request(&execute_request).unwrap().2,
vec![{
vec![MockOperation::new({
let mut op = bazel_protos::operations::Operation::new();
op.set_name(op_name.to_string());
op.set_done(true);
Expand All @@ -1060,8 +1126,8 @@ mod tests {
error.set_message("Something went wrong".to_string());
error
});
(op, None)
}],
op
})],
))
};

Expand All @@ -1080,18 +1146,21 @@ mod tests {
mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new(
op_name.clone(),
super::make_execute_request(&execute_request).unwrap().2,
vec![make_incomplete_operation(&op_name), {
let mut op = bazel_protos::operations::Operation::new();
op.set_name(op_name.to_string());
op.set_done(true);
op.set_error({
let mut error = bazel_protos::status::Status::new();
error.set_code(bazel_protos::code::Code::INTERNAL.value());
error.set_message("Something went wrong".to_string());
error
});
(op, None)
}],
vec![
make_incomplete_operation(&op_name),
MockOperation::new({
let mut op = bazel_protos::operations::Operation::new();
op.set_name(op_name.to_string());
op.set_done(true);
op.set_error({
let mut error = bazel_protos::status::Status::new();
error.set_code(bazel_protos::code::Code::INTERNAL.value());
error.set_message("Something went wrong".to_string());
error
});
op
}),
],
))
};

Expand All @@ -1110,12 +1179,12 @@ mod tests {
mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new(
op_name.clone(),
super::make_execute_request(&execute_request).unwrap().2,
vec![{
vec![MockOperation::new({
let mut op = bazel_protos::operations::Operation::new();
op.set_name(op_name.to_string());
op.set_done(true);
(op, None)
}],
op
})],
))
};

Expand All @@ -1134,12 +1203,15 @@ mod tests {
mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new(
op_name.clone(),
super::make_execute_request(&execute_request).unwrap().2,
vec![make_incomplete_operation(&op_name), {
let mut op = bazel_protos::operations::Operation::new();
op.set_name(op_name.to_string());
op.set_done(true);
(op, None)
}],
vec![
make_incomplete_operation(&op_name),
MockOperation::new({
let mut op = bazel_protos::operations::Operation::new();
op.set_name(op_name.to_string());
op.set_done(true);
op
}),
],
))
};

Expand Down Expand Up @@ -1329,7 +1401,7 @@ mod tests {
.map(missing_preconditionfailure_violation)
.collect();

let (operation, _duration) = make_precondition_failure_operation(missing);
let operation = make_precondition_failure_operation(missing).op.unwrap();

assert_eq!(
extract_execute_response(operation),
Expand All @@ -1349,7 +1421,7 @@ mod tests {
},
];

let (operation, _duration) = make_precondition_failure_operation(missing);
let operation = make_precondition_failure_operation(missing).op.unwrap();

match extract_execute_response(operation) {
Err(ExecutionError::Fatal(err)) => assert_contains(&err, "monkeys"),
Expand All @@ -1365,7 +1437,7 @@ mod tests {
violation
}];

let (operation, _duration) = make_precondition_failure_operation(missing);
let operation = make_precondition_failure_operation(missing).op.unwrap();

match extract_execute_response(operation) {
Err(ExecutionError::Fatal(err)) => assert_contains(&err, "OUT_OF_CAPACITY"),
Expand All @@ -1377,7 +1449,7 @@ mod tests {
fn extract_execute_response_missing_without_list() {
let missing = vec![];

let (operation, _duration) = make_precondition_failure_operation(missing);
let operation = make_precondition_failure_operation(missing).op.unwrap();

match extract_execute_response(operation) {
Err(ExecutionError::Fatal(err)) => assert_contains(&err.to_lowercase(), "precondition"),
Expand Down Expand Up @@ -1665,36 +1737,33 @@ mod tests {
}
}

// NB: The following helper functions return tuples of Operation and an optional Duration in
// order to make setting up the operations for a test execution server easier to read.
// The test execution server uses the duration to introduce a delay so that we can test
// timeouts.
fn make_canceled_operation(duration: Option<Duration>) -> MockOperation {
MockOperation { op: None, duration }
}

fn make_incomplete_operation(
operation_name: &str,
) -> (bazel_protos::operations::Operation, Option<Duration>) {
fn make_incomplete_operation(operation_name: &str) -> MockOperation {
let mut op = bazel_protos::operations::Operation::new();
op.set_name(operation_name.to_string());
op.set_done(false);
(op, None)
MockOperation::new(op)
}

fn make_delayed_incomplete_operation(
operation_name: &str,
delay: Duration,
) -> (bazel_protos::operations::Operation, Option<Duration>) {
fn make_delayed_incomplete_operation(operation_name: &str, delay: Duration) -> MockOperation {
let mut op = bazel_protos::operations::Operation::new();
op.set_name(operation_name.to_string());
op.set_done(false);
(op, Some(delay))
MockOperation {
op: Some(op),
duration: Some(delay),
}
}

fn make_successful_operation(
operation_name: &str,
stdout: StdoutType,
stderr: StderrType,
exit_code: i32,
) -> (bazel_protos::operations::Operation, Option<Duration>) {
) -> MockOperation {
let mut op = bazel_protos::operations::Operation::new();
op.set_name(operation_name.to_string());
op.set_done(true);
Expand Down Expand Up @@ -1731,12 +1800,12 @@ mod tests {
response_wrapper.set_value(response_proto_bytes);
response_wrapper
});
(op, None)
MockOperation::new(op)
}

fn make_precondition_failure_operation(
violations: Vec<bazel_protos::error_details::PreconditionFailure_Violation>,
) -> (bazel_protos::operations::Operation, Option<Duration>) {
) -> MockOperation {
let mut operation = bazel_protos::operations::Operation::new();
operation.set_name("cat".to_owned());
operation.set_done(true);
Expand All @@ -1756,7 +1825,7 @@ mod tests {
});
response
}));
(operation, None)
MockOperation::new(operation)
}

fn run_command_remote(
Expand Down
Loading