diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index 03735db2ed3..e2910109e5c 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -182,6 +182,9 @@ impl super::CommandRunner for CommandRunner { operations_client .get() .get_operation(&operation_request) + .or_else(move |err| { + rpcerror_recover_cancelled(operation_request.take_name(), err) + }) .map_err(rpcerror_to_string), ).map(move |operation| { future::Loop::Continue((operation, iter_num + 1)) @@ -631,6 +634,27 @@ fn format_error(error: &bazel_protos::status::Status) -> String { format!("{}: {}", error_code, error.get_message()) } +/// +/// If the given operation represents a cancelled request, recover it into +/// ExecutionError::NotFinished. +/// +fn rpcerror_recover_cancelled( + operation_name: String, + err: grpcio::Error, +) -> Result { + // If the error represented cancellation, return an Operation for the given Operation name. + match &err { + &grpcio::Error::RpcFailure(ref rs) if rs.status == grpcio::RpcStatusCode::Cancelled => { + let mut next_operation = bazel_protos::operations::Operation::new(); + next_operation.set_name(operation_name); + return Ok(next_operation); + } + _ => {} + } + // Did not represent cancellation. + Err(err) +} + fn rpcerror_to_string(error: grpcio::Error) -> String { match error { grpcio::Error::RpcFailure(status) => format!( @@ -671,6 +695,7 @@ mod tests { use super::super::CommandRunner as CommandRunnerTrait; use super::{CommandRunner, ExecuteProcessRequest, ExecutionError, FallibleExecuteProcessResult}; + use mock::execution_server::MockOperation; use std::collections::{BTreeMap, BTreeSet}; use std::iter::{self, FromIterator}; use std::ops::Sub; @@ -834,7 +859,8 @@ mod tests { StdoutType::Digest(testdata.digest()), StderrType::Raw(testdata_empty.string()), 0, - ).0 + ).op + .unwrap() ), Ok(FallibleExecuteProcessResult { stdout: testdata.bytes(), @@ -857,7 +883,8 @@ mod tests { StdoutType::Raw(testdata_empty.string()), StderrType::Digest(testdata.digest()), 0, - ).0 + ).op + .unwrap() ), Ok(FallibleExecuteProcessResult { stdout: testdata_empty.bytes(), @@ -1007,6 +1034,42 @@ mod tests { assert_contains(&error_msg, "echo-a-foo"); } + #[test] + fn retry_for_canceled_channel() { + let execute_request = echo_foo_request(); + + let mock_server = { + let op_name = "gimme-foo".to_string(); + + mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( + op_name.clone(), + super::make_execute_request(&execute_request).unwrap().2, + vec![ + make_incomplete_operation(&op_name), + make_canceled_operation(Some(Duration::from_millis(100))), + make_successful_operation( + &op_name, + StdoutType::Raw("foo".to_owned()), + StderrType::Raw("".to_owned()), + 0, + ), + ], + )) + }; + + let result = run_command_remote(mock_server.address(), execute_request).unwrap(); + + assert_eq!( + result, + FallibleExecuteProcessResult { + stdout: as_bytes("foo"), + stderr: as_bytes(""), + exit_code: 0, + output_directory: fs::EMPTY_DIGEST, + } + ); + } + #[test] fn bad_result_bytes() { let execute_request = echo_foo_request(); @@ -1017,23 +1080,26 @@ mod tests { mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( op_name.clone(), super::make_execute_request(&execute_request).unwrap().2, - vec![make_incomplete_operation(&op_name), { - let mut op = bazel_protos::operations::Operation::new(); - op.set_name(op_name.clone()); - op.set_done(true); - op.set_response({ - let mut response_wrapper = protobuf::well_known_types::Any::new(); - response_wrapper.set_type_url(format!( - "type.googleapis.com/{}", - bazel_protos::remote_execution::ExecuteResponse::new() - .descriptor() - .full_name() - )); - response_wrapper.set_value(vec![0x00, 0x00, 0x00]); - response_wrapper - }); - (op, None) - }], + vec![ + make_incomplete_operation(&op_name), + MockOperation::new({ + let mut op = bazel_protos::operations::Operation::new(); + op.set_name(op_name.clone()); + op.set_done(true); + op.set_response({ + let mut response_wrapper = protobuf::well_known_types::Any::new(); + response_wrapper.set_type_url(format!( + "type.googleapis.com/{}", + bazel_protos::remote_execution::ExecuteResponse::new() + .descriptor() + .full_name() + )); + response_wrapper.set_value(vec![0x00, 0x00, 0x00]); + response_wrapper + }); + op + }), + ], )) }; @@ -1050,7 +1116,7 @@ mod tests { mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( op_name.clone(), super::make_execute_request(&execute_request).unwrap().2, - vec![{ + vec![MockOperation::new({ let mut op = bazel_protos::operations::Operation::new(); op.set_name(op_name.to_string()); op.set_done(true); @@ -1060,8 +1126,8 @@ mod tests { error.set_message("Something went wrong".to_string()); error }); - (op, None) - }], + op + })], )) }; @@ -1080,18 +1146,21 @@ mod tests { mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( op_name.clone(), super::make_execute_request(&execute_request).unwrap().2, - vec![make_incomplete_operation(&op_name), { - let mut op = bazel_protos::operations::Operation::new(); - op.set_name(op_name.to_string()); - op.set_done(true); - op.set_error({ - let mut error = bazel_protos::status::Status::new(); - error.set_code(bazel_protos::code::Code::INTERNAL.value()); - error.set_message("Something went wrong".to_string()); - error - }); - (op, None) - }], + vec![ + make_incomplete_operation(&op_name), + MockOperation::new({ + let mut op = bazel_protos::operations::Operation::new(); + op.set_name(op_name.to_string()); + op.set_done(true); + op.set_error({ + let mut error = bazel_protos::status::Status::new(); + error.set_code(bazel_protos::code::Code::INTERNAL.value()); + error.set_message("Something went wrong".to_string()); + error + }); + op + }), + ], )) }; @@ -1110,12 +1179,12 @@ mod tests { mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( op_name.clone(), super::make_execute_request(&execute_request).unwrap().2, - vec![{ + vec![MockOperation::new({ let mut op = bazel_protos::operations::Operation::new(); op.set_name(op_name.to_string()); op.set_done(true); - (op, None) - }], + op + })], )) }; @@ -1134,12 +1203,15 @@ mod tests { mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( op_name.clone(), super::make_execute_request(&execute_request).unwrap().2, - vec![make_incomplete_operation(&op_name), { - let mut op = bazel_protos::operations::Operation::new(); - op.set_name(op_name.to_string()); - op.set_done(true); - (op, None) - }], + vec![ + make_incomplete_operation(&op_name), + MockOperation::new({ + let mut op = bazel_protos::operations::Operation::new(); + op.set_name(op_name.to_string()); + op.set_done(true); + op + }), + ], )) }; @@ -1329,7 +1401,7 @@ mod tests { .map(missing_preconditionfailure_violation) .collect(); - let (operation, _duration) = make_precondition_failure_operation(missing); + let operation = make_precondition_failure_operation(missing).op.unwrap(); assert_eq!( extract_execute_response(operation), @@ -1349,7 +1421,7 @@ mod tests { }, ]; - let (operation, _duration) = make_precondition_failure_operation(missing); + let operation = make_precondition_failure_operation(missing).op.unwrap(); match extract_execute_response(operation) { Err(ExecutionError::Fatal(err)) => assert_contains(&err, "monkeys"), @@ -1365,7 +1437,7 @@ mod tests { violation }]; - let (operation, _duration) = make_precondition_failure_operation(missing); + let operation = make_precondition_failure_operation(missing).op.unwrap(); match extract_execute_response(operation) { Err(ExecutionError::Fatal(err)) => assert_contains(&err, "OUT_OF_CAPACITY"), @@ -1377,7 +1449,7 @@ mod tests { fn extract_execute_response_missing_without_list() { let missing = vec![]; - let (operation, _duration) = make_precondition_failure_operation(missing); + let operation = make_precondition_failure_operation(missing).op.unwrap(); match extract_execute_response(operation) { Err(ExecutionError::Fatal(err)) => assert_contains(&err.to_lowercase(), "precondition"), @@ -1665,28 +1737,25 @@ mod tests { } } - // NB: The following helper functions return tuples of Operation and an optional Duration in - // order to make setting up the operations for a test execution server easier to read. - // The test execution server uses the duration to introduce a delay so that we can test - // timeouts. + fn make_canceled_operation(duration: Option) -> MockOperation { + MockOperation { op: None, duration } + } - fn make_incomplete_operation( - operation_name: &str, - ) -> (bazel_protos::operations::Operation, Option) { + fn make_incomplete_operation(operation_name: &str) -> MockOperation { let mut op = bazel_protos::operations::Operation::new(); op.set_name(operation_name.to_string()); op.set_done(false); - (op, None) + MockOperation::new(op) } - fn make_delayed_incomplete_operation( - operation_name: &str, - delay: Duration, - ) -> (bazel_protos::operations::Operation, Option) { + fn make_delayed_incomplete_operation(operation_name: &str, delay: Duration) -> MockOperation { let mut op = bazel_protos::operations::Operation::new(); op.set_name(operation_name.to_string()); op.set_done(false); - (op, Some(delay)) + MockOperation { + op: Some(op), + duration: Some(delay), + } } fn make_successful_operation( @@ -1694,7 +1763,7 @@ mod tests { stdout: StdoutType, stderr: StderrType, exit_code: i32, - ) -> (bazel_protos::operations::Operation, Option) { + ) -> MockOperation { let mut op = bazel_protos::operations::Operation::new(); op.set_name(operation_name.to_string()); op.set_done(true); @@ -1731,12 +1800,12 @@ mod tests { response_wrapper.set_value(response_proto_bytes); response_wrapper }); - (op, None) + MockOperation::new(op) } fn make_precondition_failure_operation( violations: Vec, - ) -> (bazel_protos::operations::Operation, Option) { + ) -> MockOperation { let mut operation = bazel_protos::operations::Operation::new(); operation.set_name("cat".to_owned()); operation.set_done(true); @@ -1756,7 +1825,7 @@ mod tests { }); response })); - (operation, None) + MockOperation::new(operation) } fn run_command_remote( diff --git a/src/rust/engine/testutil/mock/src/execution_server.rs b/src/rust/engine/testutil/mock/src/execution_server.rs index f7b1bedda0d..f695783e0a1 100644 --- a/src/rust/engine/testutil/mock/src/execution_server.rs +++ b/src/rust/engine/testutil/mock/src/execution_server.rs @@ -12,12 +12,33 @@ use futures::{Future, Sink}; use grpcio; use protobuf; +/// +/// A MockOperation to be used with MockExecution. +/// +/// If the op is None, the MockExecution will drop the channel, triggering cancelation on the +/// client. If the duration is not None, it represents a delay before either responding or +/// canceling for the operation. +/// +#[derive(Clone, Debug)] +pub struct MockOperation { + pub op: Option, + pub duration: Option, +} + +impl MockOperation { + pub fn new(op: bazel_protos::operations::Operation) -> MockOperation { + MockOperation { + op: Some(op), + duration: None, + } + } +} + #[derive(Clone, Debug)] pub struct MockExecution { name: String, execute_request: bazel_protos::remote_execution::ExecuteRequest, - operation_responses: - Arc)>>>, + operation_responses: Arc>>, } impl MockExecution { @@ -32,7 +53,7 @@ impl MockExecution { pub fn new( name: String, execute_request: bazel_protos::remote_execution::ExecuteRequest, - operation_responses: Vec<(bazel_protos::operations::Operation, Option)>, + operation_responses: Vec, ) -> MockExecution { MockExecution { name: name, @@ -169,11 +190,17 @@ impl MockResponder { .unwrap() .pop_front() { - Some((op, duration)) => { + Some(MockOperation { op, duration }) => { if let Some(d) = duration { sleep(d); } - sink.success(op.clone()); + if let Some(op) = op { + // Complete the channel with the op. + sink.success(op.clone()); + } else { + // Cancel the request by dropping the sink. + drop(sink); + } } None => { sink.fail(grpcio::RpcStatus::new( @@ -196,17 +223,22 @@ impl MockResponder { .unwrap() .pop_front() { - Some((op, duration)) => { + Some(MockOperation { op, duration }) => { if let Some(d) = duration { sleep(d); } - ctx.spawn( - sink - .send((op.clone(), grpcio::WriteFlags::default())) - .map(|mut stream| stream.close()) - .map(|_| ()) - .map_err(|_| ()), - ) + if let Some(op) = op { + ctx.spawn( + sink + .send((op.clone(), grpcio::WriteFlags::default())) + .map(|mut stream| stream.close()) + .map(|_| ()) + .map_err(|_| ()), + ) + } else { + // Cancel the request by dropping the sink. + drop(sink) + } } None => ctx.spawn( sink