diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index 34950fd4d28a..e0a30e1183d7 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -1281,7 +1281,9 @@ dependencies = [ "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "futures-timer 0.1.1 (git+https://github.com/pantsbuild/futures-timer?rev=0b747e565309a58537807ab43c674d8951f9e5a0)", "grpcio 0.3.0 (git+https://github.com/pantsbuild/grpc-rs.git?rev=4dfafe9355dc996d7d0702e7386a6fedcd9734c0)", + "h2 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", "hashing 0.0.1", + "http 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "mock 0.0.1", "protobuf 2.0.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1290,8 +1292,14 @@ dependencies = [ "tempfile 3.0.5 (registry+https://github.com/rust-lang/crates.io-index)", "testutil 0.0.1", "time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-connect 0.1.0 (git+https://github.com/pantsbuild/tokio-connect?rev=f7ad1ca437973d6e24037ac6f7d5ef1013833c0b)", "tokio-process 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-grpc 0.1.0 (git+https://github.com/pantsbuild/tower-grpc.git?rev=ef19f2e1715f415ecb699e8f17f5845ad2b45daf)", + "tower-h2 0.1.0 (git+https://github.com/pantsbuild/tower-h2?rev=44b0efb4983b769283efd5b2a3bc3decbf7c33de)", + "tower-http 0.1.0 (git+https://github.com/pantsbuild/tower-http?rev=56049ee7f31d4f6c549f5d1d5fbbfd7937df3d00)", + "tower-util 0.1.0 (git+https://github.com/pantsbuild/tower?rev=7b61c1fc1992c1df684fd3f179644ef0ca9bfa4c)", ] [[package]] @@ -1306,6 +1314,7 @@ dependencies = [ "hashing 0.0.1", "process_execution 0.0.1", "resettable 0.0.1", + "tokio 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/src/rust/engine/process_execution/Cargo.toml b/src/rust/engine/process_execution/Cargo.toml index 2bbef270ffb4..9d407b963d22 100644 --- a/src/rust/engine/process_execution/Cargo.toml +++ b/src/rust/engine/process_execution/Cargo.toml @@ -14,6 +14,7 @@ digest = "0.8" fs = { path = "../fs" } futures = "^0.1.16" grpcio = { git = "https://github.com/pantsbuild/grpc-rs.git", rev = "4dfafe9355dc996d7d0702e7386a6fedcd9734c0", default_features = false, features = ["protobuf-codec"] } +h2 = "0.1.13" hashing = { path = "../hashing" } log = "0.4" protobuf = { version = "2.0.4", features = ["with-bytes"] } @@ -23,8 +24,15 @@ tempfile = "3" # TODO: Switch to a release once https://github.com/alexcrichton/futures-timer/pull/11 and https://github.com/alexcrichton/futures-timer/pull/12 merge futures-timer = { git = "https://github.com/pantsbuild/futures-timer", rev = "0b747e565309a58537807ab43c674d8951f9e5a0" } time = "0.1.40" +http = "0.1" +tokio = "0.1.14" tokio-codec = "0.1" +tokio-connect = { git = "https://github.com/pantsbuild/tokio-connect.git", rev = "f7ad1ca437973d6e24037ac6f7d5ef1013833c0b" } tokio-process = "0.2.1" +tower-h2 = { git = "https://github.com/pantsbuild/tower-h2.git", rev = "44b0efb4983b769283efd5b2a3bc3decbf7c33de" } +tower-http = { git = "https://github.com/pantsbuild/tower-http.git", rev = "56049ee7f31d4f6c549f5d1d5fbbfd7937df3d00" } +tower-grpc = { git = "https://github.com/pantsbuild/tower-grpc.git", rev = "ef19f2e1715f415ecb699e8f17f5845ad2b45daf" } +tower-util = { git = "https://github.com/pantsbuild/tower.git", rev = "7b61c1fc1992c1df684fd3f179644ef0ca9bfa4c" } [dev-dependencies] mock = { path = "../testutil/mock" } diff --git a/src/rust/engine/process_execution/bazel_protos/src/conversions.rs b/src/rust/engine/process_execution/bazel_protos/src/conversions.rs index f017612f3211..4ed75bc2ef43 100644 --- a/src/rust/engine/process_execution/bazel_protos/src/conversions.rs +++ b/src/rust/engine/process_execution/bazel_protos/src/conversions.rs @@ -18,19 +18,31 @@ impl<'a> From<&'a hashing::Digest> for crate::build::bazel::remote::execution::v } } -impl<'a> From<&'a super::remote_execution::Digest> for Result { - fn from(d: &super::remote_execution::Digest) -> Self { +impl<'a> From<&'a crate::remote_execution::Digest> for Result { + fn from(d: &crate::remote_execution::Digest) -> Self { hashing::Fingerprint::from_hex_string(d.get_hash()) .map_err(|err| format!("Bad fingerprint in Digest {:?}: {:?}", d.get_hash(), err)) .map(|fingerprint| hashing::Digest(fingerprint, d.get_size_bytes() as usize)) } } +impl<'a> From<&'a crate::build::bazel::remote::execution::v2::Digest> + for Result +{ + fn from(d: &crate::build::bazel::remote::execution::v2::Digest) -> Self { + hashing::Fingerprint::from_hex_string(&d.hash) + .map_err(|err| format!("Bad fingerprint in Digest {:?}: {:?}", d.hash, err)) + .map(|fingerprint| hashing::Digest(fingerprint, d.size_bytes as usize)) + } +} + impl From for crate::operations::Operation { fn from(op: crate::google::longrunning::Operation) -> Self { let mut dst = Self::new(); dst.set_name(op.name); - dst.set_metadata(prost_any_to_gcprio_any(op.metadata.unwrap())); + if let Some(metadata) = op.metadata { + dst.set_metadata(prost_any_to_gcprio_any(metadata)); + } dst.set_done(op.done); match op.result { Some(crate::google::longrunning::operation::Result::Response(response)) => { @@ -45,6 +57,46 @@ impl From for crate::operations::Operatio } } +// This should only be used in test contexts. It should be deleted when the mock systems use tower. +impl From + for crate::build::bazel::remote::execution::v2::ExecuteRequest +{ + fn from(req: crate::remote_execution::ExecuteRequest) -> Self { + if req.has_execution_policy() || req.has_results_cache_policy() { + panic!("Can't convert ExecuteRequest protos with execution policy or results cache policy"); + } + let digest: Result = req.get_action_digest().into(); + Self { + action_digest: Some((&digest.expect("Bad digest converting ExecuteRequest proto")).into()), + instance_name: req.instance_name, + execution_policy: None, + results_cache_policy: None, + skip_cache_lookup: req.skip_cache_lookup, + } + } +} + +// This should only be used in test contexts. It should be deleted when the mock systems use tower. +impl From + for crate::remote_execution::ExecuteRequest +{ + fn from(req: crate::build::bazel::remote::execution::v2::ExecuteRequest) -> Self { + if req.execution_policy.is_some() || req.results_cache_policy.is_some() { + panic!("Can't convert ExecuteRequest protos with execution policy or results cache policy"); + } + let digest: Result = (&req + .action_digest + .expect("Missing digest converting ExecuteRequest proto")) + .into(); + + let mut ret = Self::new(); + ret.set_action_digest((&digest.expect("Bad digest converting ExecuteRequest proto")).into()); + ret.set_instance_name(req.instance_name); + ret.set_skip_cache_lookup(req.skip_cache_lookup); + ret + } +} + pub fn prost_any_to_gcprio_any(any: prost_types::Any) -> protobuf::well_known_types::Any { let prost_types::Any { type_url, value } = any; let mut dst = protobuf::well_known_types::Any::new(); diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index aad42bf3b2a6..a264a0e25337 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; -use std::mem::drop; use std::path::PathBuf; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use bazel_protos; @@ -22,6 +21,13 @@ use super::{ExecuteProcessRequest, ExecutionStats, FallibleExecuteProcessResult} use std; use std::cmp::min; +use std::net::SocketAddr; +use std::net::ToSocketAddrs; +use tokio::net::tcp::{ConnectFuture, TcpStream}; +use tower_grpc::Request; +use tower_h2::client; +use tower_util::MakeService; + // Environment variable which is exclusively used for cache key invalidation. // This may be not specified in an ExecuteProcessRequest, and may be populated only by the // CommandRunner. @@ -34,13 +40,31 @@ enum OperationOrStatus { } #[derive(Clone)] +#[allow(clippy::type_complexity)] pub struct CommandRunner { cache_key_gen_version: Option, instance_name: Option, authorization_header: Option, channel: grpcio::Channel, env: Arc, - execution_client: Arc, + execution_client: futures::future::Shared< + BoxFuture< + Arc< + Mutex< + bazel_protos::build::bazel::remote::execution::v2::client::Execution< + tower_http::add_origin::AddOrigin< + tower_h2::client::Connection< + tokio::net::tcp::TcpStream, + tokio::runtime::TaskExecutor, + tower_grpc::BoxBody, + >, + >, + >, + >, + >, + String, + >, + >, operations_client: Arc, store: Store, futures_timer_thread: resettable::Resettable, @@ -72,35 +96,35 @@ impl CommandRunner { // behavior. fn oneshot_execute( &self, - execute_request: &Arc, - ) -> BoxFuture { - let stream = try_future!(self + execute_request: bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest, + ) -> impl Future { + self .execution_client - .execute_opt(&execute_request, self.call_option()) - .map_err(rpcerror_to_string)); - stream - .take(1) - .into_future() - // If there was a response, drop the _stream to disconnect so that the server doesn't keep - // the connection alive and continue sending on it. - .map(|(maybe_operation, stream)| { - drop(stream); - maybe_operation - }) - // If there was an error, drop the _stream to disconnect so that the server doesn't keep the - // connection alive and continue sending on it. - .map_err(|(error, stream)| { - drop(stream); - error - }) - .then(|maybe_operation_result| match maybe_operation_result { - Ok(Some(operation)) => Ok(OperationOrStatus::Operation(operation)), - Ok(None) => { - Err("Didn't get proper stream response from server during remote execution".to_owned()) - } - Err(err) => rpcerror_to_status_or_string(err).map(OperationOrStatus::Status), + .clone() + .map_err(|err| format!("Error getting execution_client: {}", err)) + .and_then(|execution_client| { + let mut execution_client = execution_client.lock().unwrap(); + execution_client + .execute(Request::new(execute_request)) + .map_err(towergrpcerror_to_string) + .and_then(|response_stream| { + response_stream + .into_inner() + .take(1) + .into_future() + .map(|(resp, stream)| { + std::mem::drop(stream); + resp.unwrap() + }) + .map_err(|err| { + format!( + "Error getting response from remote process execution {:?}", + err + ) + }) + .map(|operation| OperationOrStatus::Operation(operation.into())) + }) }) - .to_boxed() } } @@ -145,7 +169,6 @@ impl super::CommandRunner for CommandRunner { let command_runner = self.clone(); let command_runner2 = self.clone(); let command_runner3 = self.clone(); - let execute_request = Arc::new(execute_request); let execute_request2 = execute_request.clone(); let futures_timer_thread = self.futures_timer_thread.clone(); @@ -166,7 +189,7 @@ impl super::CommandRunner for CommandRunner { command ); command_runner - .oneshot_execute(&execute_request) + .oneshot_execute(execute_request) .join(future::ok(history)) }) .and_then(move |(operation, history)| { @@ -212,7 +235,7 @@ impl super::CommandRunner for CommandRunner { let mut history = history; history.current_attempt += summary; command_runner2 - .oneshot_execute(&execute_request) + .oneshot_execute(execute_request) .join(future::ok(history)) }) // Reset `iter_num` on `MissingDigests` @@ -306,6 +329,7 @@ impl CommandRunner { thread_count: usize, store: Store, futures_timer_thread: resettable::Resettable, + executor: tokio::runtime::TaskExecutor, ) -> CommandRunner { let env = Arc::new(grpcio::Environment::new(thread_count)); let channel = { @@ -322,21 +346,49 @@ impl CommandRunner { builder.connect(address) } }; - let execution_client = Arc::new(bazel_protos::remote_execution_grpc::ExecutionClient::new( - channel.clone(), - )); let operations_client = Arc::new(bazel_protos::operations_grpc::OperationsClient::new( channel.clone(), )); + struct Dst(SocketAddr); + + impl tokio_connect::Connect for Dst { + type Connected = TcpStream; + type Error = ::std::io::Error; + type Future = ConnectFuture; + + fn connect(&self) -> Self::Future { + TcpStream::connect(&self.0) + } + } + + // TODO: Less unwrapping. + // TODO: Support https + let uri: http::Uri = format!("http://{}", address).parse().unwrap(); + let socket_addr = address.to_socket_addrs().unwrap().next().unwrap(); + let execution_client = + client::Connect::new(Dst(socket_addr), h2::client::Builder::default(), executor) + .make_service(()) + .map(move |conn| { + let conn = tower_http::add_origin::Builder::new() + .uri(uri) + .build(conn) + .unwrap(); + let conn = + bazel_protos::build::bazel::remote::execution::v2::client::Execution::new(conn); + Arc::new(Mutex::new(conn)) + }) + .map_err(|err| format!("Error connecting to remote execution server: {}", err)) + .to_boxed() + .shared(); CommandRunner { cache_key_gen_version, instance_name, authorization_header: oauth_bearer_token.map(|t| format!("Bearer {}", t)), channel, env, - execution_client, operations_client, + execution_client, store, futures_timer_thread, } @@ -737,7 +789,7 @@ fn make_execute_request( ( bazel_protos::remote_execution::Action, bazel_protos::remote_execution::Command, - bazel_protos::remote_execution::ExecuteRequest, + bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest, ), String, > { @@ -806,11 +858,13 @@ fn make_execute_request( action.set_command_digest((&digest(&command)?).into()); action.set_input_root_digest((&req.input_files).into()); - let mut execute_request = bazel_protos::remote_execution::ExecuteRequest::new(); - if let Some(instance_name) = instance_name { - execute_request.set_instance_name(instance_name.clone()); - } - execute_request.set_action_digest((&digest(&action)?).into()); + let execute_request = bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest { + action_digest: Some((&digest(&action)?).into()), + skip_cache_lookup: false, + instance_name: instance_name.clone().unwrap_or_default(), + execution_policy: None, + results_cache_policy: None, + }; Ok((action, command, execute_request)) } @@ -845,29 +899,6 @@ fn rpcerror_recover_cancelled( Err(err) } -fn rpcerror_to_status_or_string( - error: grpcio::Error, -) -> Result { - match error { - grpcio::Error::RpcFailure(grpcio::RpcStatus { - status_proto_bytes: Some(status_proto_bytes), - .. - }) => { - let mut status_proto = bazel_protos::status::Status::new(); - status_proto.merge_from_bytes(&status_proto_bytes).unwrap(); - Ok(status_proto) - } - grpcio::Error::RpcFailure(grpcio::RpcStatus { - status, details, .. - }) => Err(format!( - "{:?}: {:?}", - status, - details.unwrap_or_else(|| "[no message]".to_string()) - )), - err => Err(format!("{:?}", err)), - } -} - fn rpcerror_to_string(error: grpcio::Error) -> String { match error { grpcio::Error::RpcFailure(status) => format!( @@ -879,6 +910,20 @@ fn rpcerror_to_string(error: grpcio::Error) -> String { } } +fn towergrpcerror_to_string(error: tower_grpc::Error) -> String { + match error { + tower_grpc::Error::Grpc(status) => { + let error_message = if status.error_message() == "" { + "[no message]" + } else { + &status.error_message() + }; + format!("{:?}: {}", status.code(), error_message) + } + tower_grpc::Error::Inner(v) => format!("{:?}", v), + } +} + fn digest(message: &dyn Message) -> Result { let bytes = message.write_to_bytes().map_err(|e| format!("{:?}", e))?; @@ -989,17 +1034,22 @@ mod tests { ); want_action.set_input_root_digest((&input_directory.digest()).into()); - let mut want_execute_request = bazel_protos::remote_execution::ExecuteRequest::new(); - want_execute_request.set_action_digest( - (&Digest( - Fingerprint::from_hex_string( - "844c929423444f3392e0dcc89ebf1febbfdf3a2e2fcab7567cc474705a5385e4", - ) - .unwrap(), - 140, - )) - .into(), - ); + let want_execute_request = bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest { + action_digest: Some( + (&Digest( + Fingerprint::from_hex_string( + "844c929423444f3392e0dcc89ebf1febbfdf3a2e2fcab7567cc474705a5385e4", + ) + .unwrap(), + 140, + )) + .into(), + ), + instance_name: String::new(), + execution_policy: None, + results_cache_policy: None, + skip_cache_lookup: false, + }; assert_eq!( super::make_execute_request(&req, &None, &None), @@ -1075,6 +1125,23 @@ mod tests { .into(), ); + let want_execute_request = bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest { + action_digest: Some( + (&Digest( + Fingerprint::from_hex_string( + "844c929423444f3392e0dcc89ebf1febbfdf3a2e2fcab7567cc474705a5385e4", + ) + .unwrap(), + 140, + )) + .into(), + ), + instance_name: "dark-tower".to_owned(), + execution_policy: None, + results_cache_policy: None, + skip_cache_lookup: false, + }; + assert_eq!( super::make_execute_request(&req, &Some("dark-tower".to_owned()), &None), Ok((want_action, want_command, want_execute_request)) @@ -1142,17 +1209,22 @@ mod tests { ); want_action.set_input_root_digest((&input_directory.digest()).into()); - let mut want_execute_request = bazel_protos::remote_execution::ExecuteRequest::new(); - want_execute_request.set_action_digest( - (&Digest( - Fingerprint::from_hex_string( - "0ee5d4c8ac12513a87c8d949c6883ac533a264d30215126af71a9028c4ab6edf", - ) - .unwrap(), - 140, - )) - .into(), - ); + let want_execute_request = bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest { + action_digest: Some( + (&Digest( + Fingerprint::from_hex_string( + "0ee5d4c8ac12513a87c8d949c6883ac533a264d30215126af71a9028c4ab6edf", + ) + .unwrap(), + 140, + )) + .into(), + ), + instance_name: String::new(), + execution_policy: None, + results_cache_policy: None, + skip_cache_lookup: false, + }; assert_eq!( super::make_execute_request(&req, &None, &Some("meep".to_owned())), @@ -1197,17 +1269,22 @@ mod tests { ); want_action.set_input_root_digest((&input_directory.digest()).into()); - let mut want_execute_request = bazel_protos::remote_execution::ExecuteRequest::new(); - want_execute_request.set_action_digest( - (&Digest( - Fingerprint::from_hex_string( - "b1fb7179ce496995a4e3636544ec000dca1b951f1f6216493f6c7608dc4dd910", - ) - .unwrap(), - 140, - )) - .into(), - ); + let want_execute_request = bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest { + action_digest: Some( + (&Digest( + Fingerprint::from_hex_string( + "b1fb7179ce496995a4e3636544ec000dca1b951f1f6216493f6c7608dc4dd910", + ) + .unwrap(), + 140, + )) + .into(), + ), + instance_name: String::new(), + execution_policy: None, + results_cache_policy: None, + skip_cache_lookup: false, + }; assert_eq!( super::make_execute_request(&req, &None, &None), @@ -1245,7 +1322,7 @@ mod tests { let error = run_command_remote(mock_server.address(), execute_request).expect_err("Want Err"); assert_eq!( error, - "InvalidArgument: \"Did not expect this request\"".to_string() + "InvalidArgument: Did not expect this request".to_string() ); } @@ -1388,6 +1465,8 @@ mod tests { ) .expect("Failed to make store"); + let mut rt = tokio::runtime::Runtime::new().unwrap(); + let cmd_runner = CommandRunner::new( &mock_server.address(), None, @@ -1397,8 +1476,10 @@ mod tests { 1, store, timer_thread, + rt.executor(), ); - let result = cmd_runner.run(echo_roland_request()).wait().unwrap(); + let result = rt.block_on(cmd_runner.run(echo_roland_request())).unwrap(); + rt.shutdown_now().wait().unwrap(); assert_eq!( result.without_execution_attempts(), FallibleExecuteProcessResult { @@ -1764,21 +1845,25 @@ mod tests { .wait() .expect("Saving directory bytes to store"); - let result = CommandRunner::new( - &mock_server.address(), - None, - None, - None, - None, - 1, - store, - timer_thread, - ) - .run(cat_roland_request()) - .wait() - .unwrap(); + let mut rt = tokio::runtime::Runtime::new().unwrap(); + + let result = rt.block_on( + CommandRunner::new( + &mock_server.address(), + None, + None, + None, + None, + 1, + store, + timer_thread, + rt.executor(), + ) + .run(cat_roland_request()), + ); + rt.shutdown_now().wait().unwrap(); assert_eq!( - result.without_execution_attempts(), + result.unwrap().without_execution_attempts(), FallibleExecuteProcessResult { stdout: roland.bytes(), stderr: Bytes::from(""), @@ -1860,18 +1945,21 @@ mod tests { .wait() .expect("Saving file bytes to store"); - let result = CommandRunner::new( - &mock_server.address(), - None, - None, - None, - None, - 1, - store, - timer_thread, - ) - .run(cat_roland_request()) - .wait(); + let mut rt = tokio::runtime::Runtime::new().unwrap(); + let result = rt.block_on( + CommandRunner::new( + &mock_server.address(), + None, + None, + None, + None, + 1, + store, + timer_thread, + rt.executor(), + ) + .run(cat_roland_request()), + ); assert_eq!( result, Ok(FallibleExecuteProcessResult { @@ -1928,19 +2016,23 @@ mod tests { ) .expect("Failed to make store"); - let error = CommandRunner::new( - &mock_server.address(), - None, - None, - None, - None, - 1, - store, - timer_thread, - ) - .run(cat_roland_request()) - .wait() - .expect_err("Want error"); + let mut rt = tokio::runtime::Runtime::new().unwrap(); + let result = rt.block_on( + CommandRunner::new( + &mock_server.address(), + None, + None, + None, + None, + 1, + store, + timer_thread, + rt.executor(), + ) + .run(cat_roland_request()), + ); + rt.shutdown_now().wait().unwrap(); + let error = result.expect_err("Want error"); assert_contains(&error, &format!("{}", missing_digest.0)); } @@ -2512,11 +2604,18 @@ mod tests { .file(&TestData::roland()) .directory(&TestDirectory::containing_roland()) .build(); - let command_runner = create_command_runner(address, &cas); - command_runner.run(request).wait() + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let command_runner = create_command_runner(address, &cas, &runtime); + let result = runtime.block_on(command_runner.run(request)); + runtime.shutdown_now().wait().unwrap(); + result } - fn create_command_runner(address: String, cas: &mock::StubCAS) -> CommandRunner { + fn create_command_runner( + address: String, + cas: &mock::StubCAS, + runtime: &tokio::runtime::Runtime, + ) -> CommandRunner { let store_dir = TempDir::new().unwrap(); let timer_thread = timer_thread(); let store = fs::Store::with_remote( @@ -2535,7 +2634,17 @@ mod tests { ) .expect("Failed to make store"); - CommandRunner::new(&address, None, None, None, None, 1, store, timer_thread) + CommandRunner::new( + &address, + None, + None, + None, + None, + 1, + store, + timer_thread, + runtime.executor(), + ) } fn timer_thread() -> resettable::Resettable { @@ -2549,13 +2658,15 @@ mod tests { .file(&TestData::roland()) .directory(&TestDirectory::containing_roland()) .build(); - let command_runner = create_command_runner("".to_owned(), &cas); - command_runner - .extract_execute_response( - super::OperationOrStatus::Operation(operation), - &mut ExecutionHistory::default(), - ) - .wait() + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let command_runner = create_command_runner("127.0.0.1:0".to_owned(), &cas, &runtime); + let result = runtime.block_on(command_runner.extract_execute_response( + super::OperationOrStatus::Operation(operation), + &mut ExecutionHistory::default(), + )); + + runtime.shutdown_now().wait().unwrap(); + result } fn extract_output_files_from_response( @@ -2565,10 +2676,12 @@ mod tests { .file(&TestData::roland()) .directory(&TestDirectory::containing_roland()) .build(); - let command_runner = create_command_runner("".to_owned(), &cas); - command_runner - .extract_output_files(&execute_response) - .wait() + + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let command_runner = create_command_runner("127.0.0.1:0".to_owned(), &cas, &runtime); + let result = runtime.block_on(command_runner.extract_output_files(&execute_response)); + runtime.shutdown_now().wait().unwrap(); + result } fn make_any_proto(message: &dyn Message) -> protobuf::well_known_types::Any { diff --git a/src/rust/engine/process_executor/Cargo.toml b/src/rust/engine/process_executor/Cargo.toml index d4c45ad05ecf..87453d8b7972 100644 --- a/src/rust/engine/process_executor/Cargo.toml +++ b/src/rust/engine/process_executor/Cargo.toml @@ -15,3 +15,4 @@ hashing = { path = "../hashing" } futures = "^0.1.16" process_execution = { path = "../process_execution" } resettable = { path = "../resettable" } +tokio = "0.1.14" diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs index a1be0b0700c8..5f29161381fa 100644 --- a/src/rust/engine/process_executor/src/main.rs +++ b/src/rust/engine/process_executor/src/main.rs @@ -194,6 +194,9 @@ fn main() { let timer_thread = resettable::Resettable::new(|| futures_timer::HelperThread::new().unwrap()); let server_arg = args.value_of("server"); let remote_instance_arg = args.value_of("remote-instance-name").map(str::to_owned); + + let mut rt = tokio::runtime::Runtime::new().unwrap(); + let store = match (server_arg, args.value_of("cas-server")) { (Some(_server), Some(cas_server)) => { let chunk_size = @@ -269,25 +272,32 @@ fn main() { None }; + let executor = rt.executor().clone(); + let cache_key_gen_version = args.value_of("cache-key-gen-version").map(str::to_owned); + let timer_thread = timer_thread.clone(); Box::new(process_execution::remote::CommandRunner::new( address, - args.value_of("cache-key-gen-version").map(str::to_owned), + cache_key_gen_version, remote_instance_arg, root_ca_certs, oauth_bearer_token, 1, store, timer_thread, - )) + executor, + )) as Box } None => Box::new(process_execution::local::CommandRunner::new( store, pool, work_dir, true, - )), + )) as Box, }; - - let result = runner.run(request).wait().expect("Error executing"); + let result = rt.block_on(runner.run(request)).unwrap(); print!("{}", String::from_utf8(result.stdout.to_vec()).unwrap()); eprint!("{}", String::from_utf8(result.stderr.to_vec()).unwrap()); + + rt.shutdown_now().wait().unwrap(); + std::mem::drop(timer_thread); + exit(result.exit_code); } diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 024f9845c3ef..fe2b7e50fff1 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -101,6 +101,7 @@ impl Core { let fs_pool2 = fs_pool.clone(); let futures_timer_thread = Resettable::new(|| futures_timer::HelperThread::new().unwrap()); let futures_timer_thread2 = futures_timer_thread.clone(); + let runtime2 = runtime.clone(); let store_and_command_runner_and_http_client = Resettable::new(move || { let local_store_dir = local_store_dir.clone(); let store = safe_create_dir_all_ioerror(&local_store_dir) @@ -140,13 +141,14 @@ impl Core { process_execution_parallelism + 2, store.clone(), futures_timer_thread2.clone(), - )), + runtime2.with(|runtime| runtime.executor()), + )) as Box, None => Box::new(process_execution::local::CommandRunner::new( store.clone(), fs_pool2.clone(), work_dir.clone(), process_execution_cleanup_local_dirs, - )), + )) as Box, }; let command_runner = diff --git a/src/rust/engine/testutil/mock/src/execution_server.rs b/src/rust/engine/testutil/mock/src/execution_server.rs index 38a9ccd8783a..07b49af9e472 100644 --- a/src/rust/engine/testutil/mock/src/execution_server.rs +++ b/src/rust/engine/testutil/mock/src/execution_server.rs @@ -53,12 +53,12 @@ impl MockExecution { /// pub fn new( name: String, - execute_request: bazel_protos::remote_execution::ExecuteRequest, + execute_request: bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest, operation_responses: Vec, ) -> MockExecution { MockExecution { name: name, - execute_request: execute_request, + execute_request: execute_request.into(), operation_responses: Arc::new(Mutex::new(VecDeque::from(operation_responses))), } }