Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unused operation wrapper #7194

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 90 additions & 108 deletions src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,6 @@ use tower_util::MakeService;
// CommandRunner.
const CACHE_KEY_GEN_VERSION_ENV_VAR_NAME: &str = "PANTS_CACHE_KEY_GEN_VERSION";

#[derive(Debug)]
enum OperationOrStatus {
Operation(bazel_protos::google::longrunning::Operation),
Status(bazel_protos::google::rpc::Status),
}

type Connection = tower_http::add_origin::AddOrigin<
tower_h2::client::Connection<tokio::net::tcp::TcpStream, DefaultExecutor, tower_grpc::BoxBody>,
>;
Expand Down Expand Up @@ -88,7 +82,7 @@ impl CommandRunner {
fn oneshot_execute(
&self,
execute_request: bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest,
) -> impl Future<Item = OperationOrStatus, Error = String> {
) -> impl Future<Item = bazel_protos::google::longrunning::Operation, Error = String> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, what is the utility of using impl Future<Item = .., Error = ...> here? Is the idea that the method can return anything type convertible via From/Into into these types (or does it allow us to avoid calling .to_boxed() on the result?)? Mostly asking to see if I should be writing this on all my futures-y code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

impl Future is shorthand for "Exactly one type which implements the Future trait".

So we can't use it in places like:

fn... {
  if x > 1 {
    futures::future::ok("Hello")
  } else {
    futures::future::ok("goodbye").map(str::to_upper)
  }
}

because the function returns two different types that can't be unified.

BoxFuture is used to make both of the things we return have the same type, so that we can have a single return type. It does this by moving the dyn Future to the heap, and making a BoxFuture pointing to it. We introduced BoxFuture before impl Future was a stable thing in the language; we should prefer impl Future to BoxFuture because it avoids moving the Future to the heap. But BoxFuture still currently has its place; mostly it's used where it's much more clear to early return than the use awkward combinators:

fn... {
  if some_condition {
    return futures::future::err("Broken").to_boxed();
  }
  some_complex_future_expression().to_boxed()
}

can be more clear than:

fn... {
  let f = if some_condition { futures::future::err("Broken") } else { futures::future::ok(()) };
  f.and_then(|()| some_complex_future_expression())
}

which is the dance you need to go through to make there be a single return type.

But also, some usages will just be legacy from before impl Future was a thing. So if you're writing new code, aim for impl Future, but if you're reading old code, there may not be a good reason it used BoxFuture :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other reason we chose not to rewrite a bunch of code to use impl Future is that async fn is going to replace almost all usage of impl Future. See #6076 for some more discussion.

let command_runner = self.clone();
self
.clients
Expand All @@ -115,7 +109,6 @@ impl CommandRunner {
std::mem::drop(stream);
resp.ok_or_else(|| "Didn't get response from remote process execution".to_owned())
})
.map(OperationOrStatus::Operation)
})
})
}
Expand Down Expand Up @@ -281,7 +274,6 @@ impl super::CommandRunner for CommandRunner {
})
.map_err(towergrpcerror_to_string)
})
.map(OperationOrStatus::Operation)
.map(move |operation| {
future::Loop::Continue((history, operation, iter_num + 1))
})
Expand Down Expand Up @@ -419,108 +411,99 @@ impl CommandRunner {

fn extract_execute_response(
&self,
operation_or_status: OperationOrStatus,
operation: bazel_protos::google::longrunning::Operation,
attempts: &mut ExecutionHistory,
) -> BoxFuture<FallibleExecuteProcessResult, ExecutionError> {
trace!("Got operation response: {:?}", operation_or_status);
trace!("Got operation response: {:?}", operation);

let status = match operation_or_status {
OperationOrStatus::Operation(operation) => {
if !operation.done {
return future::err(ExecutionError::NotFinished(operation.name)).to_boxed();
}
let execute_response = if let Some(result) = operation.result {
match result {
bazel_protos::google::longrunning::operation::Result::Error(ref status) => {
return future::err(ExecutionError::Fatal(format_error(status))).to_boxed();
}
bazel_protos::google::longrunning::operation::Result::Response(ref any) => try_future!(
bazel_protos::build::bazel::remote::execution::v2::ExecuteResponse::decode(
&any.value
)
.map_err(|e| ExecutionError::Fatal(format!("Invalid ExecuteResponse: {:?}", e)))
),
}
} else {
return future::err(ExecutionError::Fatal(
"Operation finished but no response supplied".to_string(),
))
.to_boxed();
};

trace!("Got (nested) execute response: {:?}", execute_response);

if let Some(ref result) = execute_response.result {
if let Some(ref metadata) = result.execution_metadata {
let enqueued = timespec_from(&metadata.queued_timestamp);
let worker_start = timespec_from(&metadata.worker_start_timestamp);
let input_fetch_start = timespec_from(&metadata.input_fetch_start_timestamp);
let input_fetch_completed = timespec_from(&metadata.input_fetch_completed_timestamp);
let execution_start = timespec_from(&metadata.execution_start_timestamp);
let execution_completed = timespec_from(&metadata.execution_completed_timestamp);
let output_upload_start = timespec_from(&metadata.output_upload_start_timestamp);
let output_upload_completed =
timespec_from(&metadata.output_upload_completed_timestamp);

match (worker_start - enqueued).to_std() {
Ok(duration) => attempts.current_attempt.remote_queue = Some(duration),
Err(err) => warn!("Got negative remote queue time: {}", err),
}
match (input_fetch_completed - input_fetch_start).to_std() {
Ok(duration) => attempts.current_attempt.remote_input_fetch = Some(duration),
Err(err) => warn!("Got negative remote input fetch time: {}", err),
}
match (execution_completed - execution_start).to_std() {
Ok(duration) => attempts.current_attempt.remote_execution = Some(duration),
Err(err) => warn!("Got negative remote execution time: {}", err),
}
match (output_upload_completed - output_upload_start).to_std() {
Ok(duration) => attempts.current_attempt.remote_output_store = Some(duration),
Err(err) => warn!("Got negative remote output store time: {}", err),
}
attempts.current_attempt.was_cache_hit = execute_response.cached_result;
}
if !operation.done {
return future::err(ExecutionError::NotFinished(operation.name)).to_boxed();
}
let execute_response = if let Some(result) = operation.result {
match result {
bazel_protos::google::longrunning::operation::Result::Error(ref status) => {
return future::err(ExecutionError::Fatal(format_error(status))).to_boxed();
}
bazel_protos::google::longrunning::operation::Result::Response(ref any) => try_future!(
bazel_protos::build::bazel::remote::execution::v2::ExecuteResponse::decode(&any.value)
.map_err(|e| ExecutionError::Fatal(format!("Invalid ExecuteResponse: {:?}", e)))
),
}
} else {
return future::err(ExecutionError::Fatal(
"Operation finished but no response supplied".to_string(),
))
.to_boxed();
};

let mut execution_attempts = std::mem::replace(&mut attempts.attempts, vec![]);
execution_attempts.push(attempts.current_attempt);

let maybe_result = execute_response.result;

let status = execute_response
.status
.unwrap_or_else(|| bazel_protos::google::rpc::Status {
code: bazel_protos::google::rpc::Code::Ok.into(),
message: String::new(),
details: vec![],
});
if status.code == bazel_protos::google::rpc::Code::Ok.into() {
if let Some(result) = maybe_result {
return self
.extract_stdout(&result)
.join(self.extract_stderr(&result))
.join(self.extract_output_files(&result))
.and_then(move |((stdout, stderr), output_directory)| {
Ok(FallibleExecuteProcessResult {
stdout: stdout,
stderr: stderr,
exit_code: result.exit_code,
output_directory: output_directory,
execution_attempts: execution_attempts,
})
})
.to_boxed();
} else {
return futures::future::err(ExecutionError::Fatal(
"No result found on ExecuteResponse".to_owned(),
))
.to_boxed();
}
trace!("Got (nested) execute response: {:?}", execute_response);

if let Some(ref result) = execute_response.result {
if let Some(ref metadata) = result.execution_metadata {
let enqueued = timespec_from(&metadata.queued_timestamp);
let worker_start = timespec_from(&metadata.worker_start_timestamp);
let input_fetch_start = timespec_from(&metadata.input_fetch_start_timestamp);
let input_fetch_completed = timespec_from(&metadata.input_fetch_completed_timestamp);
let execution_start = timespec_from(&metadata.execution_start_timestamp);
let execution_completed = timespec_from(&metadata.execution_completed_timestamp);
let output_upload_start = timespec_from(&metadata.output_upload_start_timestamp);
let output_upload_completed = timespec_from(&metadata.output_upload_completed_timestamp);

match (worker_start - enqueued).to_std() {
Ok(duration) => attempts.current_attempt.remote_queue = Some(duration),
Err(err) => warn!("Got negative remote queue time: {}", err),
}
match (input_fetch_completed - input_fetch_start).to_std() {
Ok(duration) => attempts.current_attempt.remote_input_fetch = Some(duration),
Err(err) => warn!("Got negative remote input fetch time: {}", err),
}
status
match (execution_completed - execution_start).to_std() {
Ok(duration) => attempts.current_attempt.remote_execution = Some(duration),
Err(err) => warn!("Got negative remote execution time: {}", err),
}
match (output_upload_completed - output_upload_start).to_std() {
Ok(duration) => attempts.current_attempt.remote_output_store = Some(duration),
Err(err) => warn!("Got negative remote output store time: {}", err),
}
attempts.current_attempt.was_cache_hit = execute_response.cached_result;
}
OperationOrStatus::Status(status) => status,
};
}

let mut execution_attempts = std::mem::replace(&mut attempts.attempts, vec![]);
execution_attempts.push(attempts.current_attempt);

let maybe_result = execute_response.result;

let status = execute_response
.status
.unwrap_or_else(|| bazel_protos::google::rpc::Status {
code: bazel_protos::google::rpc::Code::Ok.into(),
message: String::new(),
details: vec![],
});
if status.code == bazel_protos::google::rpc::Code::Ok.into() {
if let Some(result) = maybe_result {
return self
.extract_stdout(&result)
.join(self.extract_stderr(&result))
.join(self.extract_output_files(&result))
.and_then(move |((stdout, stderr), output_directory)| {
Ok(FallibleExecuteProcessResult {
stdout: stdout,
stderr: stderr,
exit_code: result.exit_code,
output_directory: output_directory,
execution_attempts: execution_attempts,
})
})
.to_boxed();
} else {
return futures::future::err(ExecutionError::Fatal(
"No result found on ExecuteResponse".to_owned(),
))
.to_boxed();
}
}

match bazel_protos::code_from_i32(status.code) {
bazel_protos::google::rpc::Code::Ok => unreachable!(),
Expand Down Expand Up @@ -2625,10 +2608,9 @@ mod tests {
.build();
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let command_runner = create_command_runner("127.0.0.1:0".to_owned(), &cas);
let result = runtime.block_on(command_runner.extract_execute_response(
super::OperationOrStatus::Operation(operation),
&mut ExecutionHistory::default(),
));
let result = runtime.block_on(
command_runner.extract_execute_response(operation, &mut ExecutionHistory::default()),
);

runtime.shutdown_now().wait().unwrap();
result
Expand Down