From 96ba1319252549722e7d6fd04d120e8934321efb Mon Sep 17 00:00:00 2001 From: seanses Date: Tue, 26 Nov 2024 16:23:40 -0800 Subject: [PATCH 01/12] parallelize xorb upload --- Cargo.lock | 13 +++ data/Cargo.toml | 2 + data/src/clean.rs | 25 ++-- data/src/constants.rs | 9 +- data/src/data_processing.rs | 125 +++++--------------- data/src/lib.rs | 1 + data/src/parallel_xorb_uploader.rs | 179 +++++++++++++++++++++++++++++ data/src/remote_shard_interface.rs | 4 +- hf_xet/Cargo.lock | 13 +++ 9 files changed, 255 insertions(+), 116 deletions(-) create mode 100644 data/src/parallel_xorb_uploader.rs diff --git a/Cargo.lock b/Cargo.lock index 948d8935..60e577bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -874,6 +874,7 @@ dependencies = [ "dirs", "error_printer", "file_utils", + "futures", "gearhash", "glob", "hashers", @@ -895,6 +896,7 @@ dependencies = [ "static_assertions", "tempfile", "tokio", + "tokio-stream", "toml 0.5.11", "tracing", "utils", @@ -3425,6 +3427,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.12" diff --git a/data/Cargo.toml b/data/Cargo.toml index 28737827..b4136f35 100644 --- a/data/Cargo.toml +++ b/data/Cargo.toml @@ -23,6 +23,8 @@ file_utils = { path = "../file_utils" } error_printer = { path = "../error_printer" } xet_error = { path = "../xet_error" } tokio = { version = "1.36", features = ["full"] } +tokio-stream = "0.1.16" +futures = "0.3.31" anyhow = "1" tracing = "0.1.*" async-trait = "0.1.53" diff --git a/data/src/clean.rs b/data/src/clean.rs index b9bdc9e0..bb5339cf 100644 --- a/data/src/clean.rs +++ b/data/src/clean.rs @@ -24,14 +24,14 @@ use tokio::task::{JoinHandle, JoinSet}; use tracing::{debug, error, info, warn}; use utils::ThreadPool; -use crate::cas_interface::Client; use crate::chunking::{chunk_target_default, ChunkYieldType}; use crate::configurations::FileQueryPolicy; use crate::constants::MIN_SPACING_BETWEEN_GLOBAL_DEDUP_QUERIES; -use crate::data_processing::{register_new_cas_block, CASDataAggregator}; +use crate::data_processing::CASDataAggregator; use crate::errors::DataProcessingError::*; use crate::errors::Result; use crate::metrics::FILTER_BYTES_CLEANED; +use crate::parallel_xorb_uploader::ParallelXorbUploader; use crate::remote_shard_interface::RemoteShardInterface; use crate::repo_salt::RepoSalt; use crate::small_file_determination::{is_file_passthrough, is_possible_start_to_text_file}; @@ -114,7 +114,7 @@ pub struct Cleaner { // Utils shard_manager: Arc, remote_shards: Arc, - cas: Arc, + xorb_uploader: Arc, // External Data global_cas_data: Arc>, @@ -141,14 +141,14 @@ pub struct Cleaner { impl Cleaner { #[allow(clippy::too_many_arguments)] - pub async fn new( + pub(crate) async fn new( small_file_threshold: usize, enable_global_dedup_queries: bool, cas_prefix: String, repo_salt: Option, shard_manager: Arc, remote_shards: Arc, - cas: Arc, + xorb_uploader: Arc, cas_data: Arc>, buffer_size: usize, file_name: Option<&Path>, @@ -167,7 +167,7 @@ impl Cleaner { repo_salt, shard_manager, remote_shards, - cas, + xorb_uploader, global_cas_data: cas_data, chunk_data_queue: data_p, chunking_worker: Mutex::new(Some(chunker)), @@ -536,13 +536,8 @@ impl Cleaner { self.metrics.new_bytes_after_dedup.fetch_add(n_bytes as u64, Ordering::Relaxed); if tracking_info.cas_data.data.len() > TARGET_CAS_BLOCK_SIZE { - let cas_hash = register_new_cas_block( - &mut tracking_info.cas_data, - &self.shard_manager, - &self.cas, - &self.cas_prefix, - ) - .await?; + let cas_data = take(&mut tracking_info.cas_data); + let cas_hash = self.xorb_uploader.register_new_cas_block(cas_data).await?; for i in take(&mut tracking_info.current_cas_file_info_indices) { tracking_info.file_info[i].cas_hash = cas_hash; @@ -661,9 +656,9 @@ impl Cleaner { .push((new_file_info, tracking_info.current_cas_file_info_indices.clone())); if cas_data_accumulator.data.len() >= TARGET_CAS_BLOCK_SIZE { - let mut new_cas_data = take(cas_data_accumulator.deref_mut()); + let new_cas_data = take(cas_data_accumulator.deref_mut()); drop(cas_data_accumulator); // Release the lock. - register_new_cas_block(&mut new_cas_data, &self.shard_manager, &self.cas, &self.cas_prefix).await?; + self.xorb_uploader.register_new_cas_block(new_cas_data).await?; } else { drop(cas_data_accumulator); } diff --git a/data/src/constants.rs b/data/src/constants.rs index 08a35272..7b2f87a5 100644 --- a/data/src/constants.rs +++ b/data/src/constants.rs @@ -1,13 +1,12 @@ use lazy_static::lazy_static; lazy_static! { - // The xet library version. + /// The xet library version. pub static ref XET_VERSION: String = std::env::var("XET_VERSION").unwrap_or_else(|_| CURRENT_VERSION.to_string()); - /// The maximum number of simultaneous download streams - pub static ref MAX_CONCURRENT_DOWNLOADS: usize = std::env::var("XET_CONCURRENT_DOWNLOADS").ok().and_then(|s| s.parse().ok()).unwrap_or(8); - /// The maximum number of simultaneous upload streams - pub static ref MAX_CONCURRENT_UPLOADS: usize = std::env::var("XET_CONCURRENT_UPLOADS").ok().and_then(|s| s.parse().ok()).unwrap_or(8); + /// The maximum number of simultaneous xorb upload streams. + /// The default value is 8 and can be overwritten by environment variable "XET_CONCURRENT_XORB_UPLOADS". + pub static ref MAX_CONCURRENT_XORB_UPLOADS: usize = std::env::var("XET_CONCURRENT_XORB_UPLOADS").ok().and_then(|s| s.parse().ok()).unwrap_or(8); } /// The maximum git filter protocol packet size diff --git a/data/src/data_processing.rs b/data/src/data_processing.rs index 8b3e4362..f2340125 100644 --- a/data/src/data_processing.rs +++ b/data/src/data_processing.rs @@ -6,10 +6,8 @@ use std::sync::Arc; use cas_client::Client; use cas_types::FileRange; -use mdb_shard::cas_structs::{CASChunkSequenceEntry, CASChunkSequenceHeader, MDBCASInfo}; use mdb_shard::file_structs::MDBFileInfo; use mdb_shard::ShardFileManager; -use merkledb::aggregate_hashes::cas_node_hash; use merklehash::MerkleHash; use tokio::sync::Mutex; use utils::ThreadPool; @@ -17,13 +15,15 @@ use utils::ThreadPool; use crate::cas_interface::create_cas_client; use crate::clean::Cleaner; use crate::configurations::*; +use crate::constants::MAX_CONCURRENT_XORB_UPLOADS; use crate::errors::*; +use crate::parallel_xorb_uploader::ParallelXorbUploader; use crate::remote_shard_interface::RemoteShardInterface; use crate::shard_interface::create_shard_manager; use crate::PointerFile; #[derive(Default, Debug)] -pub struct CASDataAggregator { +pub(crate) struct CASDataAggregator { /// Bytes of all chunks accumulated in one CAS block concatenated together. pub data: Vec, /// Metadata of all chunks accumulated in one CAS block. Each entry is @@ -59,6 +59,7 @@ pub struct PointerFileTranslator { shard_manager: Arc, remote_shards: Arc, cas: Arc, + xorb_uploader: Mutex>>, /* ----- Deduped data shared across files ----- */ global_cas_data: Arc>, @@ -105,6 +106,7 @@ impl PointerFileTranslator { shard_manager, remote_shards, cas: cas_client, + xorb_uploader: Mutex::new(None), global_cas_data: Default::default(), threadpool, }) @@ -124,6 +126,22 @@ impl PointerFileTranslator { return Err(DataProcessingError::DedupConfigError("empty dedup config".to_owned())); }; + let mut xorb_uploader = self.xorb_uploader.lock().await; + let uploader = match xorb_uploader.take() { + Some(uploader) => uploader, + None => { + ParallelXorbUploader::new( + &self.config.cas_storage_config.prefix, + self.shard_manager.clone(), + self.cas.clone(), + self.threadpool.clone(), + *MAX_CONCURRENT_XORB_UPLOADS * 2, // set buffer size to double the concurrent uploads should be enough + ) + .await + }, + }; + *xorb_uploader = Some(uploader.clone()); + Cleaner::new( dedup.small_file_threshold, matches!(dedup.global_dedup_policy, GlobalDedupPolicy::Always), @@ -131,7 +149,7 @@ impl PointerFileTranslator { dedup.repo_salt, self.shard_manager.clone(), self.remote_shards.clone(), - self.cas.clone(), + uploader, self.global_cas_data.clone(), buffer_size, file_name, @@ -143,46 +161,31 @@ impl PointerFileTranslator { pub async fn finalize_cleaning(&self) -> Result<()> { // flush accumulated CAS data. let mut cas_data_accumulator = self.global_cas_data.lock().await; - let mut new_cas_data = take(cas_data_accumulator.deref_mut()); + let new_cas_data = take(cas_data_accumulator.deref_mut()); drop(cas_data_accumulator); // Release the lock. + let Some(ref xorb_uploader) = *self.xorb_uploader.lock().await else { + return Err(DataProcessingError::InternalError("no active xorb upload task".to_owned())); + }; + if !new_cas_data.is_empty() { - register_new_cas_block( - &mut new_cas_data, - &self.shard_manager, - &self.cas, - &self.config.cas_storage_config.prefix, - ) - .await?; + xorb_uploader.register_new_cas_block(new_cas_data).await?; } - debug_assert!(new_cas_data.is_empty()); + xorb_uploader.flush().await?; // flush accumulated memory shard. self.shard_manager.flush().await?; - self.upload().await?; + self.upload_shards().await?; Ok(()) } - async fn upload(&self) -> Result<()> { + async fn upload_shards(&self) -> Result<()> { // First, get all the shards prepared and load them. let merged_shards_jh = self.remote_shards.merge_shards()?; - // Make sure that all the uploads and everything are in a good state before proceeding with - // anything changing the remote repository. - // - // Waiting until the CAS uploads finish avoids the following scenario: - // 1. user 1 commit file A and push, but network drops after - // sync_notes_to_remote before uploading cas finishes. - // 2. user 2 tries to git add the same file A, which on filter pulls in - // the new notes, and file A is 100% deduped so no CAS blocks will be created, - // and push. - // - // This results in a bad repo state. - self.upload_cas().await?; - // Get a list of all the merged shards in order to upload them. let merged_shards = merged_shards_jh.await??; @@ -195,72 +198,6 @@ impl PointerFileTranslator { Ok(()) } - - async fn upload_cas(&self) -> Result<()> { - // We don't have staging client support yet. - Ok(()) - } -} - -/// Clean operation helpers -pub(crate) async fn register_new_cas_block( - cas_data: &mut CASDataAggregator, - shard_manager: &Arc, - cas: &Arc, - cas_prefix: &str, -) -> Result { - let cas_hash = cas_node_hash(&cas_data.chunks[..]); - - let raw_bytes_len = cas_data.data.len(); - - let metadata = CASChunkSequenceHeader::new(cas_hash, cas_data.chunks.len(), raw_bytes_len); - - let mut pos = 0; - let chunks: Vec<_> = cas_data - .chunks - .iter() - .map(|(h, len)| { - let result = CASChunkSequenceEntry::new(*h, *len, pos); - pos += *len; - result - }) - .collect(); - let cas_info = MDBCASInfo { metadata, chunks }; - - pos = 0; - let chunk_boundaries = cas_data - .chunks - .iter() - .map(|(hash, len)| { - pos += *len; - (*hash, pos as u32) - }) - .collect(); - - if !cas_info.chunks.is_empty() { - cas.put(cas_prefix, &cas_hash, take(&mut cas_data.data), chunk_boundaries) - .await?; - - shard_manager.add_cas_block(cas_info).await?; - } else { - debug_assert_eq!(cas_hash, MerkleHash::default()); - } - - // Now register any new files as needed. - for (mut fi, chunk_hash_indices) in take(&mut cas_data.pending_file_info) { - for i in chunk_hash_indices { - debug_assert_eq!(fi.segments[i].cas_hash, MerkleHash::default()); - fi.segments[i].cas_hash = cas_hash; - } - - shard_manager.add_file_reconstruction_info(fi).await?; - } - - cas_data.data.clear(); - cas_data.chunks.clear(); - cas_data.pending_file_info.clear(); - - Ok(cas_hash) } /// Smudge operations diff --git a/data/src/lib.rs b/data/src/lib.rs index 53bb754e..400c6f27 100644 --- a/data/src/lib.rs +++ b/data/src/lib.rs @@ -9,6 +9,7 @@ pub mod data_client; mod data_processing; pub mod errors; mod metrics; +mod parallel_xorb_uploader; mod pointer_file; mod remote_shard_interface; mod repo_salt; diff --git a/data/src/parallel_xorb_uploader.rs b/data/src/parallel_xorb_uploader.rs new file mode 100644 index 00000000..3051499b --- /dev/null +++ b/data/src/parallel_xorb_uploader.rs @@ -0,0 +1,179 @@ +use std::sync::Arc; + +use cas_client::Client; +use futures::StreamExt; +use mdb_shard::{ + cas_structs::{CASChunkSequenceEntry, CASChunkSequenceHeader, MDBCASInfo}, + ShardFileManager, +}; +use merkledb::aggregate_hashes::cas_node_hash; +use merklehash::MerkleHash; +use tokio::sync::{ + mpsc::{self, Receiver, Sender}, + oneshot, Mutex, +}; +use tokio::task::JoinHandle; +use utils::ThreadPool; + +use crate::constants::MAX_CONCURRENT_XORB_UPLOADS; +use crate::data_processing::CASDataAggregator; +use crate::errors::{DataProcessingError::*, *}; + +pub enum QueueItem { + Value(T), + Flush(S), +} +type XorbUploadValueType = (MerkleHash, Vec, Vec<(MerkleHash, usize)>); +type XorbUploadSignalType = oneshot::Sender<()>; +type XorbUploadInputType = QueueItem; +type XorbUploadOutputType = QueueItem<(), XorbUploadSignalType>; + +pub(crate) struct ParallelXorbUploader { + // Configurations + cas_prefix: String, + + // Utils + shard_manager: Arc, + cas: Arc, + + // Internal worker + xorb_data_queue: Mutex>, + upload_worker: Mutex>>, + + // Theadpool + threadpool: Arc, +} + +impl ParallelXorbUploader { + pub async fn new( + cas_prefix: &str, + shard_manager: Arc, + cas: Arc, + threadpool: Arc, + buffer_size: usize, + ) -> Arc { + let (xorb_data_p, xorb_data_c) = mpsc::channel::(buffer_size); + + let uploader = Arc::new(ParallelXorbUploader { + cas_prefix: cas_prefix.to_owned(), + shard_manager, + cas, + xorb_data_queue: Mutex::new(xorb_data_p), + upload_worker: Mutex::new(None), + threadpool, + }); + + Self::run(uploader.clone(), xorb_data_c).await; + + uploader + } + + async fn run(uploader: Arc, xorbs: Receiver) { + let shard_manager = uploader.shard_manager.clone(); + let cas = uploader.cas.clone(); + let cas_prefix = uploader.cas_prefix.clone(); + let upload_task = uploader.threadpool.spawn(async move { + let stream_of_xorbs = tokio_stream::wrappers::ReceiverStream::new(xorbs); + let mut buffered_upload = stream_of_xorbs + .map(|item| process_xorb_data_queue_item(item, &shard_manager, &cas, &cas_prefix)) + .buffer_unordered(*MAX_CONCURRENT_XORB_UPLOADS); + + while let Some(ret) = buffered_upload.next().await { + match ret { + // All xorbs added to the queue before this signal were all uploaded successfully, + // send out this signal. + Ok(QueueItem::Flush(signal)) => signal.send(()).expect("Upload flush signal error"), + Err(e) => { + panic!("Uploading and registering Xorb failed with {e}"); + }, + // Uploaded a xorb successfully. + _ => (), + } + } + }); + let mut worker = uploader.upload_worker.lock().await; + *worker = Some(upload_task); + } + + pub async fn register_new_cas_block(&self, cas_data: CASDataAggregator) -> Result { + let cas_hash = cas_node_hash(&cas_data.chunks[..]); + + let sender = self.xorb_data_queue.lock().await; + + sender + .send(QueueItem::Value((cas_hash, cas_data.data, cas_data.chunks))) + .await + .map_err(|e| InternalError(format!("{e}")))?; + + // Now register any new files as needed. + for (mut fi, chunk_hash_indices) in cas_data.pending_file_info { + for i in chunk_hash_indices { + debug_assert_eq!(fi.segments[i].cas_hash, MerkleHash::default()); + fi.segments[i].cas_hash = cas_hash; + } + + self.shard_manager.add_file_reconstruction_info(fi).await?; + } + + Ok(cas_hash) + } + + pub async fn flush(&self) -> Result<()> { + let sender = self.xorb_data_queue.lock().await; + let (signal_tx, signal_rx) = oneshot::channel(); + sender + .send(QueueItem::Flush(signal_tx)) + .await + .map_err(|e| InternalError(format!("{e}")))?; + + signal_rx.await.map_err(|e| InternalError(format!("{e}")))?; + + Ok(()) + } +} + +async fn process_xorb_data_queue_item( + item: XorbUploadInputType, + shard_manager: &Arc, + cas: &Arc, + cas_prefix: &str, +) -> Result { + let (cas_hash, data, chunks) = match item { + QueueItem::Value(tuple) => tuple, + QueueItem::Flush(signal) => return Ok(QueueItem::Flush(signal)), + }; + + let raw_bytes_len = data.len(); + // upload xorb + { + let mut pos = 0; + let chunk_and_boundaries = chunks + .iter() + .map(|(hash, len)| { + pos += *len; + (*hash, pos as u32) + }) + .collect(); + cas.put(cas_prefix, &cas_hash, data, chunk_and_boundaries).await?; + } + + // register for dedup + { + let metadata = CASChunkSequenceHeader::new(cas_hash, chunks.len(), raw_bytes_len); + + let mut pos = 0; + let chunks: Vec<_> = chunks + .iter() + .map(|(h, len)| { + let result = CASChunkSequenceEntry::new(*h, *len, pos); + pos += *len; + result + }) + .collect(); + let cas_info = MDBCASInfo { metadata, chunks }; + + shard_manager.add_cas_block(cas_info).await?; + } + + Ok(QueueItem::Value(())) +} diff --git a/data/src/remote_shard_interface.rs b/data/src/remote_shard_interface.rs index 6c4af44d..e16105fa 100644 --- a/data/src/remote_shard_interface.rs +++ b/data/src/remote_shard_interface.rs @@ -21,7 +21,7 @@ use super::configurations::{FileQueryPolicy, StorageConfig}; use super::errors::{DataProcessingError, Result}; use super::shard_interface::{create_shard_client, create_shard_manager}; use crate::cas_interface::Client; -use crate::constants::{FILE_RECONSTRUCTION_CACHE_SIZE, MAX_CONCURRENT_UPLOADS}; +use crate::constants::{FILE_RECONSTRUCTION_CACHE_SIZE, MAX_CONCURRENT_XORB_UPLOADS}; use crate::repo_salt::RepoSalt; pub struct RemoteShardInterface { @@ -279,7 +279,7 @@ impl RemoteShardInterface { let shard_prefix = self.shard_prefix.clone(); let shard_prefix_ref = &shard_prefix; - tokio_par_for_each(shards, *MAX_CONCURRENT_UPLOADS, |si, _| async move { + tokio_par_for_each(shards, *MAX_CONCURRENT_XORB_UPLOADS, |si, _| async move { // For each shard: // 1. Upload directly to CAS. // 2. Sync to server. diff --git a/hf_xet/Cargo.lock b/hf_xet/Cargo.lock index a4457c08..5286bcd5 100644 --- a/hf_xet/Cargo.lock +++ b/hf_xet/Cargo.lock @@ -436,6 +436,7 @@ dependencies = [ "dirs", "error_printer", "file_utils", + "futures", "gearhash", "glob", "hashers", @@ -457,6 +458,7 @@ dependencies = [ "static_assertions", "tempfile", "tokio", + "tokio-stream", "toml", "tracing", "utils", @@ -2742,6 +2744,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.12" From d443d5b3c9634d53ca93f42e7ab6903b7d6baef5 Mon Sep 17 00:00:00 2001 From: seanses Date: Wed, 27 Nov 2024 12:30:02 -0800 Subject: [PATCH 02/12] error checking and comments --- data/src/errors.rs | 3 +++ data/src/parallel_xorb_uploader.rs | 24 +++++++++++++++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/data/src/errors.rs b/data/src/errors.rs index b55d1a82..074a7340 100644 --- a/data/src/errors.rs +++ b/data/src/errors.rs @@ -27,6 +27,9 @@ pub enum DataProcessingError { #[error("Clean task error: {0}")] CleanTaskError(String), + #[error("Upload task error: {0}")] + UploadTaskError(String), + #[error("Internal error : {0}")] InternalError(String), diff --git a/data/src/parallel_xorb_uploader.rs b/data/src/parallel_xorb_uploader.rs index 3051499b..ead1b61d 100644 --- a/data/src/parallel_xorb_uploader.rs +++ b/data/src/parallel_xorb_uploader.rs @@ -28,6 +28,9 @@ type XorbUploadSignalType = oneshot::Sender<()>; type XorbUploadInputType = QueueItem; type XorbUploadOutputType = QueueItem<(), XorbUploadSignalType>; +/// Helper to parallelize xorb upload and registration. +/// Calls to registering xorbs return immediately after computing a xorb hash so callers +/// can continue with other work, and xorb data is queued internally to be uploaded and registered. pub(crate) struct ParallelXorbUploader { // Configurations cas_prefix: String, @@ -76,7 +79,7 @@ impl ParallelXorbUploader { let stream_of_xorbs = tokio_stream::wrappers::ReceiverStream::new(xorbs); let mut buffered_upload = stream_of_xorbs .map(|item| process_xorb_data_queue_item(item, &shard_manager, &cas, &cas_prefix)) - .buffer_unordered(*MAX_CONCURRENT_XORB_UPLOADS); + .buffered(*MAX_CONCURRENT_XORB_UPLOADS); while let Some(ret) = buffered_upload.next().await { match ret { @@ -96,6 +99,8 @@ impl ParallelXorbUploader { } pub async fn register_new_cas_block(&self, cas_data: CASDataAggregator) -> Result { + self.task_is_running().await?; + let cas_hash = cas_node_hash(&cas_data.chunks[..]); let sender = self.xorb_data_queue.lock().await; @@ -119,6 +124,11 @@ impl ParallelXorbUploader { } pub async fn flush(&self) -> Result<()> { + // no need to flush if task already finished + if self.task_is_running().await.is_err() { + return Ok(()); + } + let sender = self.xorb_data_queue.lock().await; let (signal_tx, signal_rx) = oneshot::channel(); sender @@ -130,6 +140,16 @@ impl ParallelXorbUploader { Ok(()) } + + async fn task_is_running(&self) -> Result<()> { + let uploader_worker = self.upload_worker.lock().await; + + if uploader_worker.is_none() { + return Err(UploadTaskError("no active upload task".to_owned())); + } + + Ok(()) + } } async fn process_xorb_data_queue_item( @@ -158,6 +178,8 @@ async fn process_xorb_data_queue_item( } // register for dedup + // This should happen after uploading xorb above succeeded so not to + // leave invalid information in the local shard to dedup other xorbs. { let metadata = CASChunkSequenceHeader::new(cas_hash, chunks.len(), raw_bytes_len); From 72bd3d70573560354cf06015ddb27feba5cf119b Mon Sep 17 00:00:00 2001 From: seanses Date: Fri, 6 Dec 2024 02:31:35 +0800 Subject: [PATCH 03/12] impl Drop --- data/src/parallel_xorb_uploader.rs | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/data/src/parallel_xorb_uploader.rs b/data/src/parallel_xorb_uploader.rs index ead1b61d..b283d155 100644 --- a/data/src/parallel_xorb_uploader.rs +++ b/data/src/parallel_xorb_uploader.rs @@ -8,11 +8,14 @@ use mdb_shard::{ }; use merkledb::aggregate_hashes::cas_node_hash; use merklehash::MerkleHash; -use tokio::sync::{ - mpsc::{self, Receiver, Sender}, - oneshot, Mutex, -}; use tokio::task::JoinHandle; +use tokio::{ + runtime::Handle, + sync::{ + mpsc::{self, Receiver, Sender}, + oneshot, Mutex, + }, +}; use utils::ThreadPool; use crate::constants::MAX_CONCURRENT_XORB_UPLOADS; @@ -31,6 +34,10 @@ type XorbUploadOutputType = QueueItem<(), XorbUploadSignalType>; /// Helper to parallelize xorb upload and registration. /// Calls to registering xorbs return immediately after computing a xorb hash so callers /// can continue with other work, and xorb data is queued internally to be uploaded and registered. +/// +/// It is critical to call [`flush`] before `ParallelXorbUploader` is dropped. Though +/// dropping will attempt to wait for all transfers to finish, any errors +/// that happen in the process of dropping will be ignored. pub(crate) struct ParallelXorbUploader { // Configurations cas_prefix: String, @@ -123,6 +130,9 @@ impl ParallelXorbUploader { Ok(cas_hash) } + /// Flush makes sure all xorbs added to queue before this call are sent successfully + /// to remote. This function can be called multiple times and should be called at + /// least once before `ParallelXorbUploader` is dropped. pub async fn flush(&self) -> Result<()> { // no need to flush if task already finished if self.task_is_running().await.is_err() { @@ -152,6 +162,12 @@ impl ParallelXorbUploader { } } +impl Drop for ParallelXorbUploader { + fn drop(&mut self) { + let _ = tokio::task::block_in_place(|| Handle::current().block_on(async move { self.flush().await })); + } +} + async fn process_xorb_data_queue_item( item: XorbUploadInputType, shard_manager: &Arc, From eb34cf9b8f97076e3f0c747489327ff7b38fd6b2 Mon Sep 17 00:00:00 2001 From: seanses Date: Fri, 6 Dec 2024 04:25:02 +0800 Subject: [PATCH 04/12] rust fmt --- data/src/data_processing.rs | 3 ++- data/src/parallel_xorb_uploader.rs | 19 +++++++------------ 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/data/src/data_processing.rs b/data/src/data_processing.rs index f2340125..24f04fc1 100644 --- a/data/src/data_processing.rs +++ b/data/src/data_processing.rs @@ -135,7 +135,8 @@ impl PointerFileTranslator { self.shard_manager.clone(), self.cas.clone(), self.threadpool.clone(), - *MAX_CONCURRENT_XORB_UPLOADS * 2, // set buffer size to double the concurrent uploads should be enough + *MAX_CONCURRENT_XORB_UPLOADS * 2, /* set buffer size to double the concurrent uploads should be + * enough */ ) .await }, diff --git a/data/src/parallel_xorb_uploader.rs b/data/src/parallel_xorb_uploader.rs index b283d155..07c61dcf 100644 --- a/data/src/parallel_xorb_uploader.rs +++ b/data/src/parallel_xorb_uploader.rs @@ -2,25 +2,20 @@ use std::sync::Arc; use cas_client::Client; use futures::StreamExt; -use mdb_shard::{ - cas_structs::{CASChunkSequenceEntry, CASChunkSequenceHeader, MDBCASInfo}, - ShardFileManager, -}; +use mdb_shard::cas_structs::{CASChunkSequenceEntry, CASChunkSequenceHeader, MDBCASInfo}; +use mdb_shard::ShardFileManager; use merkledb::aggregate_hashes::cas_node_hash; use merklehash::MerkleHash; +use tokio::runtime::Handle; +use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio::sync::{oneshot, Mutex}; use tokio::task::JoinHandle; -use tokio::{ - runtime::Handle, - sync::{ - mpsc::{self, Receiver, Sender}, - oneshot, Mutex, - }, -}; use utils::ThreadPool; use crate::constants::MAX_CONCURRENT_XORB_UPLOADS; use crate::data_processing::CASDataAggregator; -use crate::errors::{DataProcessingError::*, *}; +use crate::errors::DataProcessingError::*; +use crate::errors::*; pub enum QueueItem { Value(T), From 6baa3bfa42e83812502c0d5b1c4ff1bb6e1cc29b Mon Sep 17 00:00:00 2001 From: seanses Date: Tue, 10 Dec 2024 05:19:27 +0800 Subject: [PATCH 05/12] add trait --- data/src/clean.rs | 6 ++--- data/src/data_processing.rs | 4 ++-- data/src/parallel_xorb_uploader.rs | 36 ++++++++++++++++++++---------- 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/data/src/clean.rs b/data/src/clean.rs index bb5339cf..8c458a96 100644 --- a/data/src/clean.rs +++ b/data/src/clean.rs @@ -31,7 +31,7 @@ use crate::data_processing::CASDataAggregator; use crate::errors::DataProcessingError::*; use crate::errors::Result; use crate::metrics::FILTER_BYTES_CLEANED; -use crate::parallel_xorb_uploader::ParallelXorbUploader; +use crate::parallel_xorb_uploader::XorbUpload; use crate::remote_shard_interface::RemoteShardInterface; use crate::repo_salt::RepoSalt; use crate::small_file_determination::{is_file_passthrough, is_possible_start_to_text_file}; @@ -114,7 +114,7 @@ pub struct Cleaner { // Utils shard_manager: Arc, remote_shards: Arc, - xorb_uploader: Arc, + xorb_uploader: Arc, // External Data global_cas_data: Arc>, @@ -148,7 +148,7 @@ impl Cleaner { repo_salt: Option, shard_manager: Arc, remote_shards: Arc, - xorb_uploader: Arc, + xorb_uploader: Arc, cas_data: Arc>, buffer_size: usize, file_name: Option<&Path>, diff --git a/data/src/data_processing.rs b/data/src/data_processing.rs index 24f04fc1..a6c5391f 100644 --- a/data/src/data_processing.rs +++ b/data/src/data_processing.rs @@ -17,7 +17,7 @@ use crate::clean::Cleaner; use crate::configurations::*; use crate::constants::MAX_CONCURRENT_XORB_UPLOADS; use crate::errors::*; -use crate::parallel_xorb_uploader::ParallelXorbUploader; +use crate::parallel_xorb_uploader::{ParallelXorbUploader, XorbUpload}; use crate::remote_shard_interface::RemoteShardInterface; use crate::shard_interface::create_shard_manager; use crate::PointerFile; @@ -59,7 +59,7 @@ pub struct PointerFileTranslator { shard_manager: Arc, remote_shards: Arc, cas: Arc, - xorb_uploader: Mutex>>, + xorb_uploader: Mutex>>, /* ----- Deduped data shared across files ----- */ global_cas_data: Arc>, diff --git a/data/src/parallel_xorb_uploader.rs b/data/src/parallel_xorb_uploader.rs index 07c61dcf..b7c5b6d8 100644 --- a/data/src/parallel_xorb_uploader.rs +++ b/data/src/parallel_xorb_uploader.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use async_trait::async_trait; use cas_client::Client; use futures::StreamExt; use mdb_shard::cas_structs::{CASChunkSequenceEntry, CASChunkSequenceHeader, MDBCASInfo}; @@ -17,6 +18,14 @@ use crate::data_processing::CASDataAggregator; use crate::errors::DataProcessingError::*; use crate::errors::*; +#[async_trait] +pub(crate) trait XorbUpload { + /// Register a block of data ready for upload and dedup, return the hash of the produced xorb. + async fn register_new_cas_block(&self, cas_data: CASDataAggregator) -> Result; + /// Flush all xorbs that are pending to be sent to remote. + async fn flush(&self) -> Result<()>; +} + pub enum QueueItem { Value(T), Flush(S), @@ -100,7 +109,20 @@ impl ParallelXorbUploader { *worker = Some(upload_task); } - pub async fn register_new_cas_block(&self, cas_data: CASDataAggregator) -> Result { + async fn task_is_running(&self) -> Result<()> { + let uploader_worker = self.upload_worker.lock().await; + + if uploader_worker.is_none() { + return Err(UploadTaskError("no active upload task".to_owned())); + } + + Ok(()) + } +} + +#[async_trait] +impl XorbUpload for ParallelXorbUploader { + async fn register_new_cas_block(&self, cas_data: CASDataAggregator) -> Result { self.task_is_running().await?; let cas_hash = cas_node_hash(&cas_data.chunks[..]); @@ -128,7 +150,7 @@ impl ParallelXorbUploader { /// Flush makes sure all xorbs added to queue before this call are sent successfully /// to remote. This function can be called multiple times and should be called at /// least once before `ParallelXorbUploader` is dropped. - pub async fn flush(&self) -> Result<()> { + async fn flush(&self) -> Result<()> { // no need to flush if task already finished if self.task_is_running().await.is_err() { return Ok(()); @@ -145,16 +167,6 @@ impl ParallelXorbUploader { Ok(()) } - - async fn task_is_running(&self) -> Result<()> { - let uploader_worker = self.upload_worker.lock().await; - - if uploader_worker.is_none() { - return Err(UploadTaskError("no active upload task".to_owned())); - } - - Ok(()) - } } impl Drop for ParallelXorbUploader { From e176f1ac43dd1810a08605d88a2f080f1f567cdd Mon Sep 17 00:00:00 2001 From: seanses Date: Thu, 12 Dec 2024 03:19:21 +0800 Subject: [PATCH 06/12] move block_in_place into ThreadPool struct --- data/src/parallel_xorb_uploader.rs | 3 +-- utils/src/threadpool.rs | 4 ++++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/data/src/parallel_xorb_uploader.rs b/data/src/parallel_xorb_uploader.rs index b7c5b6d8..c6d3baf0 100644 --- a/data/src/parallel_xorb_uploader.rs +++ b/data/src/parallel_xorb_uploader.rs @@ -7,7 +7,6 @@ use mdb_shard::cas_structs::{CASChunkSequenceEntry, CASChunkSequenceHeader, MDBC use mdb_shard::ShardFileManager; use merkledb::aggregate_hashes::cas_node_hash; use merklehash::MerkleHash; -use tokio::runtime::Handle; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::{oneshot, Mutex}; use tokio::task::JoinHandle; @@ -171,7 +170,7 @@ impl XorbUpload for ParallelXorbUploader { impl Drop for ParallelXorbUploader { fn drop(&mut self) { - let _ = tokio::task::block_in_place(|| Handle::current().block_on(async move { self.flush().await })); + let _ = self.threadpool.block_in_place(async { self.flush().await }); } } diff --git a/utils/src/threadpool.rs b/utils/src/threadpool.rs index b1ce8af8..f5c4e725 100644 --- a/utils/src/threadpool.rs +++ b/utils/src/threadpool.rs @@ -94,6 +94,10 @@ impl ThreadPool { pub fn get_handle(&self) -> tokio::runtime::Handle { self.inner.handle().clone() } + + pub fn block_in_place(&self, future: F) -> F::Output { + tokio::task::block_in_place(|| self.inner.handle().block_on(future)) + } } impl Display for ThreadPool { From d8c4a35a0edca2b004cf99fe9a08300ecc1c16f2 Mon Sep 17 00:00:00 2001 From: seanses Date: Fri, 13 Dec 2024 02:50:15 +0800 Subject: [PATCH 07/12] use JoinSet instead of explicit work queue --- data/src/data_client.rs | 4 +- data/src/data_processing.rs | 3 +- data/src/parallel_xorb_uploader.rs | 137 +++++++++++------------------ 3 files changed, 53 insertions(+), 91 deletions(-) diff --git a/data/src/data_client.rs b/data/src/data_client.rs index d8b71f4d..c5621488 100644 --- a/data/src/data_client.rs +++ b/data/src/data_client.rs @@ -14,8 +14,8 @@ use crate::errors::DataProcessingError; use crate::{errors, PointerFile, PointerFileTranslator}; /// The maximum git filter protocol packet size -pub const MAX_CONCURRENT_UPLOADS: usize = 8; // TODO -pub const MAX_CONCURRENT_DOWNLOADS: usize = 8; // TODO +const MAX_CONCURRENT_UPLOADS: usize = 8; // TODO +const MAX_CONCURRENT_DOWNLOADS: usize = 8; // TODO const DEFAULT_CAS_ENDPOINT: &str = "http://localhost:8080"; const READ_BLOCK_SIZE: usize = 1024 * 1024; diff --git a/data/src/data_processing.rs b/data/src/data_processing.rs index a6c5391f..aacf711b 100644 --- a/data/src/data_processing.rs +++ b/data/src/data_processing.rs @@ -134,9 +134,8 @@ impl PointerFileTranslator { &self.config.cas_storage_config.prefix, self.shard_manager.clone(), self.cas.clone(), + *MAX_CONCURRENT_XORB_UPLOADS, self.threadpool.clone(), - *MAX_CONCURRENT_XORB_UPLOADS * 2, /* set buffer size to double the concurrent uploads should be - * enough */ ) .await }, diff --git a/data/src/parallel_xorb_uploader.rs b/data/src/parallel_xorb_uploader.rs index c6d3baf0..92a54ecc 100644 --- a/data/src/parallel_xorb_uploader.rs +++ b/data/src/parallel_xorb_uploader.rs @@ -2,17 +2,14 @@ use std::sync::Arc; use async_trait::async_trait; use cas_client::Client; -use futures::StreamExt; use mdb_shard::cas_structs::{CASChunkSequenceEntry, CASChunkSequenceHeader, MDBCASInfo}; use mdb_shard::ShardFileManager; use merkledb::aggregate_hashes::cas_node_hash; use merklehash::MerkleHash; -use tokio::sync::mpsc::{self, Receiver, Sender}; -use tokio::sync::{oneshot, Mutex}; -use tokio::task::JoinHandle; +use tokio::sync::{Mutex, Semaphore}; +use tokio::task::JoinSet; use utils::ThreadPool; -use crate::constants::MAX_CONCURRENT_XORB_UPLOADS; use crate::data_processing::CASDataAggregator; use crate::errors::DataProcessingError::*; use crate::errors::*; @@ -25,14 +22,7 @@ pub(crate) trait XorbUpload { async fn flush(&self) -> Result<()>; } -pub enum QueueItem { - Value(T), - Flush(S), -} type XorbUploadValueType = (MerkleHash, Vec, Vec<(MerkleHash, usize)>); -type XorbUploadSignalType = oneshot::Sender<()>; -type XorbUploadInputType = QueueItem; -type XorbUploadOutputType = QueueItem<(), XorbUploadSignalType>; /// Helper to parallelize xorb upload and registration. /// Calls to registering xorbs return immediately after computing a xorb hash so callers @@ -50,8 +40,10 @@ pub(crate) struct ParallelXorbUploader { cas: Arc, // Internal worker - xorb_data_queue: Mutex>, - upload_worker: Mutex>>, + upload_tasks: Mutex>>, + + // Rate limiter + concurrency: Arc, // Theadpool threadpool: Arc, @@ -62,57 +54,23 @@ impl ParallelXorbUploader { cas_prefix: &str, shard_manager: Arc, cas: Arc, + n_concurrent_uploads: usize, threadpool: Arc, - buffer_size: usize, ) -> Arc { - let (xorb_data_p, xorb_data_c) = mpsc::channel::(buffer_size); - - let uploader = Arc::new(ParallelXorbUploader { + Arc::new(ParallelXorbUploader { cas_prefix: cas_prefix.to_owned(), shard_manager, cas, - xorb_data_queue: Mutex::new(xorb_data_p), - upload_worker: Mutex::new(None), + upload_tasks: Mutex::new(JoinSet::new()), + concurrency: Arc::new(Semaphore::new(n_concurrent_uploads)), threadpool, - }); - - Self::run(uploader.clone(), xorb_data_c).await; - - uploader + }) } - async fn run(uploader: Arc, xorbs: Receiver) { - let shard_manager = uploader.shard_manager.clone(); - let cas = uploader.cas.clone(); - let cas_prefix = uploader.cas_prefix.clone(); - let upload_task = uploader.threadpool.spawn(async move { - let stream_of_xorbs = tokio_stream::wrappers::ReceiverStream::new(xorbs); - let mut buffered_upload = stream_of_xorbs - .map(|item| process_xorb_data_queue_item(item, &shard_manager, &cas, &cas_prefix)) - .buffered(*MAX_CONCURRENT_XORB_UPLOADS); - - while let Some(ret) = buffered_upload.next().await { - match ret { - // All xorbs added to the queue before this signal were all uploaded successfully, - // send out this signal. - Ok(QueueItem::Flush(signal)) => signal.send(()).expect("Upload flush signal error"), - Err(e) => { - panic!("Uploading and registering Xorb failed with {e}"); - }, - // Uploaded a xorb successfully. - _ => (), - } - } - }); - let mut worker = uploader.upload_worker.lock().await; - *worker = Some(upload_task); - } - - async fn task_is_running(&self) -> Result<()> { - let uploader_worker = self.upload_worker.lock().await; - - if uploader_worker.is_none() { - return Err(UploadTaskError("no active upload task".to_owned())); + async fn status_is_ok(&self) -> Result<()> { + let mut upload_tasks = self.upload_tasks.lock().await; + while let Some(result) = upload_tasks.try_join_next() { + result??; } Ok(()) @@ -122,16 +80,32 @@ impl ParallelXorbUploader { #[async_trait] impl XorbUpload for ParallelXorbUploader { async fn register_new_cas_block(&self, cas_data: CASDataAggregator) -> Result { - self.task_is_running().await?; + self.status_is_ok().await?; let cas_hash = cas_node_hash(&cas_data.chunks[..]); - let sender = self.xorb_data_queue.lock().await; - - sender - .send(QueueItem::Value((cas_hash, cas_data.data, cas_data.chunks))) + // Rate limiting, the acquired permit is dropped after the task completes. + let permit = self + .concurrency + .clone() + .acquire_owned() .await - .map_err(|e| InternalError(format!("{e}")))?; + .map_err(|e| UploadTaskError(e.to_string()))?; + + let item = (cas_hash, cas_data.data, cas_data.chunks); + let shard_manager = self.shard_manager.clone(); + let cas = self.cas.clone(); + let cas_prefix = self.cas_prefix.clone(); + + let mut upload_tasks = self.upload_tasks.lock().await; + upload_tasks.spawn_on( + async move { + let ret = upload_and_register_xorb(item, shard_manager, cas, cas_prefix).await; + drop(permit); + ret + }, + &self.threadpool.get_handle(), + ); // Now register any new files as needed. for (mut fi, chunk_hash_indices) in cas_data.pending_file_info { @@ -150,19 +124,11 @@ impl XorbUpload for ParallelXorbUploader { /// to remote. This function can be called multiple times and should be called at /// least once before `ParallelXorbUploader` is dropped. async fn flush(&self) -> Result<()> { - // no need to flush if task already finished - if self.task_is_running().await.is_err() { - return Ok(()); - } - - let sender = self.xorb_data_queue.lock().await; - let (signal_tx, signal_rx) = oneshot::channel(); - sender - .send(QueueItem::Flush(signal_tx)) - .await - .map_err(|e| InternalError(format!("{e}")))?; + let mut upload_tasks = self.upload_tasks.lock().await; - signal_rx.await.map_err(|e| InternalError(format!("{e}")))?; + while let Some(result) = upload_tasks.join_next().await { + result??; + } Ok(()) } @@ -174,16 +140,13 @@ impl Drop for ParallelXorbUploader { } } -async fn process_xorb_data_queue_item( - item: XorbUploadInputType, - shard_manager: &Arc, - cas: &Arc, - cas_prefix: &str, -) -> Result { - let (cas_hash, data, chunks) = match item { - QueueItem::Value(tuple) => tuple, - QueueItem::Flush(signal) => return Ok(QueueItem::Flush(signal)), - }; +async fn upload_and_register_xorb( + item: XorbUploadValueType, + shard_manager: Arc, + cas: Arc, + cas_prefix: String, +) -> Result<()> { + let (cas_hash, data, chunks) = item; let raw_bytes_len = data.len(); // upload xorb @@ -196,7 +159,7 @@ async fn process_xorb_data_queue_item( (*hash, pos as u32) }) .collect(); - cas.put(cas_prefix, &cas_hash, data, chunk_and_boundaries).await?; + cas.put(&cas_prefix, &cas_hash, data, chunk_and_boundaries).await?; } // register for dedup @@ -219,5 +182,5 @@ async fn process_xorb_data_queue_item( shard_manager.add_cas_block(cas_info).await?; } - Ok(QueueItem::Value(())) + Ok(()) } From 1232f7b76a78c8982276ffa808cc3fcf4dbdd4dc Mon Sep 17 00:00:00 2001 From: seanses Date: Fri, 13 Dec 2024 03:00:05 +0800 Subject: [PATCH 08/12] drop unneeded deps --- Cargo.lock | 13 ------------- data/Cargo.toml | 2 -- hf_xet/Cargo.lock | 13 ------------- 3 files changed, 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 60e577bc..948d8935 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -874,7 +874,6 @@ dependencies = [ "dirs", "error_printer", "file_utils", - "futures", "gearhash", "glob", "hashers", @@ -896,7 +895,6 @@ dependencies = [ "static_assertions", "tempfile", "tokio", - "tokio-stream", "toml 0.5.11", "tracing", "utils", @@ -3427,17 +3425,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-stream" -version = "0.1.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" -dependencies = [ - "futures-core", - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-util" version = "0.7.12" diff --git a/data/Cargo.toml b/data/Cargo.toml index b4136f35..28737827 100644 --- a/data/Cargo.toml +++ b/data/Cargo.toml @@ -23,8 +23,6 @@ file_utils = { path = "../file_utils" } error_printer = { path = "../error_printer" } xet_error = { path = "../xet_error" } tokio = { version = "1.36", features = ["full"] } -tokio-stream = "0.1.16" -futures = "0.3.31" anyhow = "1" tracing = "0.1.*" async-trait = "0.1.53" diff --git a/hf_xet/Cargo.lock b/hf_xet/Cargo.lock index 5286bcd5..a4457c08 100644 --- a/hf_xet/Cargo.lock +++ b/hf_xet/Cargo.lock @@ -436,7 +436,6 @@ dependencies = [ "dirs", "error_printer", "file_utils", - "futures", "gearhash", "glob", "hashers", @@ -458,7 +457,6 @@ dependencies = [ "static_assertions", "tempfile", "tokio", - "tokio-stream", "toml", "tracing", "utils", @@ -2744,17 +2742,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-stream" -version = "0.1.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" -dependencies = [ - "futures-core", - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-util" version = "0.7.12" From 8a18cf68845f2bfe1aeb1553f0dba6252007d193 Mon Sep 17 00:00:00 2001 From: seanses Date: Fri, 13 Dec 2024 04:21:50 +0800 Subject: [PATCH 09/12] use global rate limiter --- data/src/data_processing.rs | 9 +++++++-- data/src/parallel_xorb_uploader.rs | 8 ++++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/data/src/data_processing.rs b/data/src/data_processing.rs index 7d1b7281..b3523d29 100644 --- a/data/src/data_processing.rs +++ b/data/src/data_processing.rs @@ -6,10 +6,11 @@ use std::sync::Arc; use cas_client::Client; use cas_types::FileRange; +use lazy_static::lazy_static; use mdb_shard::file_structs::MDBFileInfo; use mdb_shard::ShardFileManager; use merklehash::MerkleHash; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, Semaphore}; use utils::progress::ProgressUpdater; use utils::ThreadPool; @@ -23,6 +24,10 @@ use crate::remote_shard_interface::RemoteShardInterface; use crate::shard_interface::create_shard_manager; use crate::PointerFile; +lazy_static! { + pub static ref XORB_UPLOAD_RATE_LIMITER: Arc = Arc::new(Semaphore::new(*MAX_CONCURRENT_XORB_UPLOADS)); +} + #[derive(Default, Debug)] pub(crate) struct CASDataAggregator { /// Bytes of all chunks accumulated in one CAS block concatenated together. @@ -135,7 +140,7 @@ impl PointerFileTranslator { &self.config.cas_storage_config.prefix, self.shard_manager.clone(), self.cas.clone(), - *MAX_CONCURRENT_XORB_UPLOADS, + XORB_UPLOAD_RATE_LIMITER.clone(), self.threadpool.clone(), ) .await diff --git a/data/src/parallel_xorb_uploader.rs b/data/src/parallel_xorb_uploader.rs index 92a54ecc..fdd0d3ec 100644 --- a/data/src/parallel_xorb_uploader.rs +++ b/data/src/parallel_xorb_uploader.rs @@ -43,7 +43,7 @@ pub(crate) struct ParallelXorbUploader { upload_tasks: Mutex>>, // Rate limiter - concurrency: Arc, + rate_limiter: Arc, // Theadpool threadpool: Arc, @@ -54,7 +54,7 @@ impl ParallelXorbUploader { cas_prefix: &str, shard_manager: Arc, cas: Arc, - n_concurrent_uploads: usize, + rate_limiter: Arc, threadpool: Arc, ) -> Arc { Arc::new(ParallelXorbUploader { @@ -62,7 +62,7 @@ impl ParallelXorbUploader { shard_manager, cas, upload_tasks: Mutex::new(JoinSet::new()), - concurrency: Arc::new(Semaphore::new(n_concurrent_uploads)), + rate_limiter, threadpool, }) } @@ -86,7 +86,7 @@ impl XorbUpload for ParallelXorbUploader { // Rate limiting, the acquired permit is dropped after the task completes. let permit = self - .concurrency + .rate_limiter .clone() .acquire_owned() .await From 772085fe12709b7134be8fbbfd9a2088ebccdb1c Mon Sep 17 00:00:00 2001 From: seanses Date: Fri, 13 Dec 2024 04:25:35 +0800 Subject: [PATCH 10/12] add comment --- data/src/parallel_xorb_uploader.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/data/src/parallel_xorb_uploader.rs b/data/src/parallel_xorb_uploader.rs index fdd0d3ec..28bf7587 100644 --- a/data/src/parallel_xorb_uploader.rs +++ b/data/src/parallel_xorb_uploader.rs @@ -85,6 +85,7 @@ impl XorbUpload for ParallelXorbUploader { let cas_hash = cas_node_hash(&cas_data.chunks[..]); // Rate limiting, the acquired permit is dropped after the task completes. + // The chosen Semaphore is fair, meaning xorbs added first will be scheduled to upload first. let permit = self .rate_limiter .clone() From 8732fe3f49e5787bbd99c73c46d2af54f1fa2128 Mon Sep 17 00:00:00 2001 From: seanses Date: Fri, 13 Dec 2024 04:37:51 +0800 Subject: [PATCH 11/12] creation of xorb_uploader moved into PFT constructor --- data/src/data_processing.rs | 39 +++++++++++++------------------------ 1 file changed, 14 insertions(+), 25 deletions(-) diff --git a/data/src/data_processing.rs b/data/src/data_processing.rs index b3523d29..f910175a 100644 --- a/data/src/data_processing.rs +++ b/data/src/data_processing.rs @@ -65,7 +65,7 @@ pub struct PointerFileTranslator { shard_manager: Arc, remote_shards: Arc, cas: Arc, - xorb_uploader: Mutex>>, + xorb_uploader: Arc, /* ----- Deduped data shared across files ----- */ global_cas_data: Arc>, @@ -107,12 +107,21 @@ impl PointerFileTranslator { } }; + let xorb_uploader = ParallelXorbUploader::new( + &config.cas_storage_config.prefix, + shard_manager.clone(), + cas_client.clone(), + XORB_UPLOAD_RATE_LIMITER.clone(), + threadpool.clone(), + ) + .await; + Ok(Self { config, shard_manager, remote_shards, cas: cas_client, - xorb_uploader: Mutex::new(None), + xorb_uploader, global_cas_data: Default::default(), threadpool, }) @@ -132,22 +141,6 @@ impl PointerFileTranslator { return Err(DataProcessingError::DedupConfigError("empty dedup config".to_owned())); }; - let mut xorb_uploader = self.xorb_uploader.lock().await; - let uploader = match xorb_uploader.take() { - Some(uploader) => uploader, - None => { - ParallelXorbUploader::new( - &self.config.cas_storage_config.prefix, - self.shard_manager.clone(), - self.cas.clone(), - XORB_UPLOAD_RATE_LIMITER.clone(), - self.threadpool.clone(), - ) - .await - }, - }; - *xorb_uploader = Some(uploader.clone()); - Cleaner::new( dedup.small_file_threshold, matches!(dedup.global_dedup_policy, GlobalDedupPolicy::Always), @@ -155,7 +148,7 @@ impl PointerFileTranslator { dedup.repo_salt, self.shard_manager.clone(), self.remote_shards.clone(), - uploader, + self.xorb_uploader.clone(), self.global_cas_data.clone(), buffer_size, file_name, @@ -170,15 +163,11 @@ impl PointerFileTranslator { let new_cas_data = take(cas_data_accumulator.deref_mut()); drop(cas_data_accumulator); // Release the lock. - let Some(ref xorb_uploader) = *self.xorb_uploader.lock().await else { - return Err(DataProcessingError::InternalError("no active xorb upload task".to_owned())); - }; - if !new_cas_data.is_empty() { - xorb_uploader.register_new_cas_block(new_cas_data).await?; + self.xorb_uploader.register_new_cas_block(new_cas_data).await?; } - xorb_uploader.flush().await?; + self.xorb_uploader.flush().await?; // flush accumulated memory shard. self.shard_manager.flush().await?; From 61470a3e055d41654549bed0eaedbbf20a566d18 Mon Sep 17 00:00:00 2001 From: seanses Date: Fri, 13 Dec 2024 05:34:26 +0800 Subject: [PATCH 12/12] remove impl Drop --- data/src/parallel_xorb_uploader.rs | 11 ++--------- utils/src/threadpool.rs | 4 ---- 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/data/src/parallel_xorb_uploader.rs b/data/src/parallel_xorb_uploader.rs index 28bf7587..0e6435a2 100644 --- a/data/src/parallel_xorb_uploader.rs +++ b/data/src/parallel_xorb_uploader.rs @@ -28,9 +28,8 @@ type XorbUploadValueType = (MerkleHash, Vec, Vec<(MerkleHash, usize)>); /// Calls to registering xorbs return immediately after computing a xorb hash so callers /// can continue with other work, and xorb data is queued internally to be uploaded and registered. /// -/// It is critical to call [`flush`] before `ParallelXorbUploader` is dropped. Though -/// dropping will attempt to wait for all transfers to finish, any errors -/// that happen in the process of dropping will be ignored. +/// It is critical to call [`flush`] before `ParallelXorbUploader` is dropped. Dropping will +/// cancel all ongoing transfers automatically. pub(crate) struct ParallelXorbUploader { // Configurations cas_prefix: String, @@ -135,12 +134,6 @@ impl XorbUpload for ParallelXorbUploader { } } -impl Drop for ParallelXorbUploader { - fn drop(&mut self) { - let _ = self.threadpool.block_in_place(async { self.flush().await }); - } -} - async fn upload_and_register_xorb( item: XorbUploadValueType, shard_manager: Arc, diff --git a/utils/src/threadpool.rs b/utils/src/threadpool.rs index dcd50166..38f05056 100644 --- a/utils/src/threadpool.rs +++ b/utils/src/threadpool.rs @@ -94,10 +94,6 @@ impl ThreadPool { pub fn get_handle(&self) -> tokio::runtime::Handle { self.inner.handle().clone() } - - pub fn block_in_place(&self, future: F) -> F::Output { - tokio::task::block_in_place(|| self.inner.handle().block_on(future)) - } } impl Display for ThreadPool {