diff --git a/remote_execution/oss/re_grpc/Cargo.toml b/remote_execution/oss/re_grpc/Cargo.toml
index 1e71767600d32..1f26459ebe75b 100644
--- a/remote_execution/oss/re_grpc/Cargo.toml
+++ b/remote_execution/oss/re_grpc/Cargo.toml
@@ -9,6 +9,7 @@ version = "0.1.0"
 [dependencies]
 anyhow = { workspace = true }
 dupe = { workspace = true }
+dashmap = { workspace = true }
 futures = { workspace = true }
 gazebo = { workspace = true }
 http = { workspace = true }
diff --git a/remote_execution/oss/re_grpc/src/client.rs b/remote_execution/oss/re_grpc/src/client.rs
index 3c5b8a3b941cc..0db7eda307418 100644
--- a/remote_execution/oss/re_grpc/src/client.rs
+++ b/remote_execution/oss/re_grpc/src/client.rs
@@ -19,6 +19,7 @@ use std::time::Instant;
 use anyhow::Context;
 use buck2_re_configuration::Buck2OssReConfiguration;
 use buck2_re_configuration::HttpHeader;
+use dashmap::DashMap;
 use dupe::Dupe;
 use futures::future::BoxFuture;
 use futures::future::Future;
@@ -478,6 +479,13 @@ impl FindMissingCache {
     }
 }
 
+#[derive(Clone)]
+enum OngoingUploadStatus {
+    Active(tokio::sync::watch::Receiver<Result<(), ()>>),
+    Done,
+    Error,
+}
+
 pub struct REClient {
     runtime_opts: RERuntimeOpts,
     grpc_clients: GRPCClients,
@@ -485,6 +493,7 @@ pub struct REClient {
     instance_name: InstanceName,
     // buck2 calls find_missing for same blobs
     find_missing_cache: Mutex<FindMissingCache>,
+    prev_uploads: DashMap<TDigest, OngoingUploadStatus>,
 }
 
 impl Drop for REClient {
@@ -559,6 +568,7 @@ impl REClient {
                 ttl: Duration::from_secs(12 * 60 * 60), // 12 hours TODO: Tune this parameter
                 last_check: Instant::now(),
             }),
+            prev_uploads: DashMap::new(),
         }
     }
 
@@ -726,6 +736,7 @@ impl REClient {
             request,
             self.capabilities.max_total_batch_size,
             self.runtime_opts.max_concurrent_uploads_per_action,
+            &self.prev_uploads,
             |re_request| async {
                 let metadata = metadata.clone();
                 let mut cas_client = self.grpc_clients.cas_client.clone();
@@ -1186,6 +1197,7 @@ async fn upload_impl<Byt, Cas>(
     request: UploadRequest,
     max_total_batch_size: usize,
     max_concurrent_uploads: Option<usize>,
+    prev_uploads: &DashMap<TDigest, OngoingUploadStatus>,
     cas_f: impl Fn(BatchUpdateBlobsRequest) -> Cas + Sync + Send + Copy,
     bystream_fut: impl Fn(Vec<WriteRequest>) -> Byt + Sync + Send + Copy,
 ) -> anyhow::Result<UploadResponse>
@@ -1246,10 +1258,9 @@ where
 
     // Create futures for any files that needs uploading.
     for file in request.files_with_digest.unwrap_or_default() {
-        let hash = file.digest.hash.clone();
-        let size = file.digest.size_in_bytes;
+        let digest = file.digest.clone();
         let name = file.name.clone();
-        if size < max_total_batch_size as i64 {
+        if digest.size_in_bytes < max_total_batch_size as i64 {
             batched_blob_updates.push(BatchUploadRequest::File(file));
             continue;
         }
@@ -1258,45 +1269,96 @@ where
             "{}uploads/{}/blobs/{}/{}",
             instance_name.as_resource_prefix(),
             client_uuid,
-            hash.clone(),
-            size
+            file.digest.hash,
+            file.digest.size_in_bytes
         );
+
+        enum UploadStatus {
+            New(tokio::sync::watch::Sender<Result<(), ()>>),
+            Ongoing(OngoingUploadStatus),
+        }
+
+        let upload_status = match prev_uploads.entry(digest.clone()) {
+            dashmap::mapref::entry::Entry::Occupied(o) => UploadStatus::Ongoing(o.get().clone()),
+            dashmap::mapref::entry::Entry::Vacant(v) => {
+                let (tx, rx) = tokio::sync::watch::channel(Err(()));
+                v.insert(OngoingUploadStatus::Active(rx));
+                UploadStatus::New(tx)
+            }
+        };
         let fut = async move {
-            let mut file = tokio::fs::File::open(&name)
-                .await
-                .with_context(|| format!("Opening `{name}` for reading failed"))?;
-            let mut data = vec![0; max_total_batch_size];
-
-            let mut write_offset = 0;
-            let mut upload_segments = Vec::new();
-            loop {
-                let length = file
-                    .read(&mut data)
-                    .await
-                    .with_context(|| format!("Error reading from {name}"))?;
-                if length == 0 {
-                    break;
+            match upload_status {
+                UploadStatus::Ongoing(OngoingUploadStatus::Active(mut rx)) => {
+                    // Another task was already uploading this artifact; wait for it to complete and report the result.
+                    rx.changed().await?;
+                    rx.borrow_and_update().as_ref().map_err(|_e| {
+                        anyhow::anyhow!("Upload queued for previous action failed.")
+                    })?;
                 }
-                upload_segments.push(WriteRequest {
-                    resource_name: resource_name.to_owned(),
-                    write_offset,
-                    finish_write: false,
-                    data: data[..length].to_owned(),
-                });
-                write_offset += length as i64;
-            }
-            upload_segments
-                .last_mut()
-                .with_context(|| format!("Read no segments from `{name} "))?
-                .finish_write = true;
-
+                UploadStatus::Ongoing(OngoingUploadStatus::Done) => {
+                    // Another task has already completed the upload of this artifact, no need to do any work.
+                }
+                UploadStatus::Ongoing(OngoingUploadStatus::Error) => {
+                    // Another task tried to perform the transmission, but failed.
+                    anyhow::bail!("Upload queued for previous action failed.")
+                }
+                UploadStatus::New(tx) => {
+                    let mut file = tokio::fs::File::open(&name)
+                        .await
+                        .with_context(|| format!("Opening `{name}` for reading failed"))?;
+                    let mut data = vec![0; max_total_batch_size];
+
+                    let mut write_offset = 0;
+                    let mut upload_segments = Vec::new();
+                    loop {
+                        let length = file
+                            .read(&mut data)
+                            .await
+                            .with_context(|| format!("Error reading from {name}"))?;
+                        if length == 0 {
+                            break;
+                        }
+                        upload_segments.push(WriteRequest {
+                            resource_name: resource_name.to_owned(),
+                            write_offset,
+                            finish_write: false,
+                            data: data[..length].to_owned(),
+                        });
+                        write_offset += length as i64;
+                    }
+                    upload_segments
+                        .last_mut()
+                        .with_context(|| format!("Read no segments from `{name} "))?
+                        .finish_write = true;
+
+                    let upload_ret = bystream_fut(upload_segments)
+                        .await
+                        .and_then(|resp| {
+                            if resp.committed_size != digest.size_in_bytes {
+                                Err(anyhow::anyhow!(
+                                    "Failed to upload `{name}`: invalid committed_size from WriteResponse"
+                                ))
+                            }
+                            else {
+                                Ok(())
+                            }
+                        });
-            let resp = bystream_fut(upload_segments).await?;
-            if resp.committed_size != size {
-                return Err(anyhow::anyhow!(
-                    "Failed to upload `{name}`: invalid committed_size from WriteResponse"
-                ));
+
+                    // Mark artifact as uploaded and notify other potentially waiting tasks.
+                    if upload_ret.is_ok() {
+                        prev_uploads.alter(&digest, |_, _| OngoingUploadStatus::Done);
+                        let _ = tx.send(upload_ret.as_ref().map_err(|_| ()).cloned());
+                    } else {
+                        prev_uploads.alter(&digest, |_, _| OngoingUploadStatus::Error);
+                        let _ = tx.send(Err(()));
+                    }
+
+                    // Only propagate errors _after_ notifying other waiting tasks that this task is complete.
+                    upload_ret?;
+                }
             }
-            Ok(vec![hash])
+
+            Ok(vec![digest.hash])
         };
         upload_futures.push(Box::pin(fut));
     }
@@ -2071,6 +2133,7 @@ mod tests {
             req,
             10000,
             None,
+            &DashMap::new(),
             |req| {
                 let res = res.clone();
                 let digest1 = digest1.clone();
@@ -2154,6 +2217,7 @@ mod tests {
             req,
             10, // kept small to simulate a large file upload
             None,
+            &DashMap::new(),
             |req| {
                 let res = res.clone();
                 let digest1 = digest1.clone();
@@ -2228,6 +2292,7 @@ mod tests {
             req,
             10, // kept small to simulate a large inlined upload
             None,
+            &DashMap::new(),
             |req| {
                 let res = res.clone();
                 let digest1 = digest1.clone();
@@ -2289,6 +2354,7 @@ mod tests {
             req,
             10,
             None,
+            &DashMap::new(),
             |_req| async move {
                 panic!("This should not be called as there are no blobs to upload in batch");
             },
@@ -2350,6 +2416,7 @@ mod tests {
             req,
             3,
             None,
+            &DashMap::new(),
             |_req| async move {
                 panic!("Not called");
             },
@@ -2391,6 +2458,7 @@ mod tests {
             req,
             0,
             None,
+            &DashMap::new(),
             |_req| async move {
                 panic!("Not called");
             },
@@ -2437,6 +2505,7 @@ mod tests {
             req,
             1,
             None,
+            &DashMap::new(),
             |_req| async move {
                 panic!("Not called");
             },
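
Note on the pattern introduced above (not part of the patch itself): the first task to claim a digest in `prev_uploads` performs the bytestream upload and publishes its outcome on a `tokio::sync::watch` channel, while concurrent tasks for the same digest either wait on the stored receiver or short-circuit on `Done`/`Error`. Below is a minimal standalone sketch of that idea, with hypothetical names (`upload_once`, `Status`, a `String` key instead of `TDigest`) and assuming only the `anyhow`, `dashmap`, and `tokio` crates:

use dashmap::DashMap;

#[derive(Clone)]
enum Status {
    // The first uploader is still running; wait on this receiver for its result.
    Active(tokio::sync::watch::Receiver<Result<(), ()>>),
    Done,
    Error,
}

// Run `upload` at most once per `key`; concurrent callers for the same key
// wait for the first caller's outcome instead of re-uploading.
async fn upload_once<F, Fut>(
    registry: &DashMap<String, Status>,
    key: String,
    upload: F,
) -> anyhow::Result<()>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<()>>,
{
    // Claim the key, or pick up whatever state a previous claimant left behind.
    let claim = match registry.entry(key.clone()) {
        dashmap::mapref::entry::Entry::Occupied(o) => Err(o.get().clone()),
        dashmap::mapref::entry::Entry::Vacant(v) => {
            let (tx, rx) = tokio::sync::watch::channel(Err(()));
            v.insert(Status::Active(rx));
            Ok(tx)
        }
    };

    match claim {
        Err(Status::Done) => Ok(()),
        Err(Status::Error) => Err(anyhow::anyhow!("previous upload of `{key}` failed")),
        Err(Status::Active(mut rx)) => {
            // Another task owns the upload; wait until it publishes a result.
            rx.changed().await?;
            (*rx.borrow_and_update())
                .map_err(|()| anyhow::anyhow!("previous upload of `{key}` failed"))
        }
        Ok(tx) => {
            let result = upload().await;
            // Record the terminal state before waking any waiters.
            registry.alter(&key, |_, _| {
                if result.is_ok() {
                    Status::Done
                } else {
                    Status::Error
                }
            });
            let _ = tx.send(result.as_ref().map(|_| ()).map_err(|_| ()));
            result
        }
    }
}

As in the patch, the terminal state is written back to the map before the channel is notified, so a waiter that still observes an Active entry must have cloned its receiver before the send and is therefore guaranteed to be woken by it.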