Parallelize xorb upload #98

Merged: 13 commits, Dec 12, 2024
25 changes: 10 additions & 15 deletions data/src/clean.rs
@@ -23,13 +23,13 @@ use tokio::task::{JoinHandle, JoinSet};
use tracing::{debug, error, info, warn};
use utils::ThreadPool;

use crate::cas_interface::Client;
use crate::chunking::{chunk_target_default, ChunkYieldType};
use crate::constants::MIN_SPACING_BETWEEN_GLOBAL_DEDUP_QUERIES;
use crate::data_processing::{register_new_cas_block, CASDataAggregator};
use crate::data_processing::CASDataAggregator;
use crate::errors::DataProcessingError::*;
use crate::errors::Result;
use crate::metrics::FILTER_BYTES_CLEANED;
use crate::parallel_xorb_uploader::XorbUpload;
use crate::remote_shard_interface::RemoteShardInterface;
use crate::repo_salt::RepoSalt;
use crate::small_file_determination::{is_file_passthrough, is_possible_start_to_text_file};
@@ -112,7 +112,7 @@ pub struct Cleaner {
// Utils
shard_manager: Arc<ShardFileManager>,
remote_shards: Arc<RemoteShardInterface>,
cas: Arc<dyn Client + Send + Sync>,
xorb_uploader: Arc<dyn XorbUpload + Send + Sync>,

// External Data
global_cas_data: Arc<Mutex<CASDataAggregator>>,
@@ -139,14 +139,14 @@ pub struct Cleaner {

impl Cleaner {
#[allow(clippy::too_many_arguments)]
pub async fn new(
pub(crate) async fn new(
small_file_threshold: usize,
enable_global_dedup_queries: bool,
cas_prefix: String,
repo_salt: Option<RepoSalt>,
shard_manager: Arc<ShardFileManager>,
remote_shards: Arc<RemoteShardInterface>,
cas: Arc<dyn Client + Send + Sync>,
xorb_uploader: Arc<dyn XorbUpload + Send + Sync>,
cas_data: Arc<Mutex<CASDataAggregator>>,
buffer_size: usize,
file_name: Option<&Path>,
@@ -165,7 +165,7 @@ impl Cleaner {
repo_salt,
shard_manager,
remote_shards,
cas,
xorb_uploader,
global_cas_data: cas_data,
chunk_data_queue: data_p,
chunking_worker: Mutex::new(Some(chunker)),
@@ -534,13 +534,8 @@ impl Cleaner {
self.metrics.new_bytes_after_dedup.fetch_add(n_bytes as u64, Ordering::Relaxed);

if tracking_info.cas_data.data.len() > TARGET_CAS_BLOCK_SIZE {
let cas_hash = register_new_cas_block(
&mut tracking_info.cas_data,
&self.shard_manager,
&self.cas,
&self.cas_prefix,
)
.await?;
let cas_data = take(&mut tracking_info.cas_data);
let cas_hash = self.xorb_uploader.register_new_cas_block(cas_data).await?;

for i in take(&mut tracking_info.current_cas_file_info_indices) {
tracking_info.file_info[i].cas_hash = cas_hash;
@@ -655,9 +650,9 @@ impl Cleaner {
.push((new_file_info, tracking_info.current_cas_file_info_indices.clone()));

if cas_data_accumulator.data.len() >= TARGET_CAS_BLOCK_SIZE {
let mut new_cas_data = take(cas_data_accumulator.deref_mut());
let new_cas_data = take(cas_data_accumulator.deref_mut());
drop(cas_data_accumulator); // Release the lock.
register_new_cas_block(&mut new_cas_data, &self.shard_manager, &self.cas, &self.cas_prefix).await?;
self.xorb_uploader.register_new_cas_block(new_cas_data).await?;
} else {
drop(cas_data_accumulator);
}
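The `XorbUpload` trait that `Cleaner` now depends on lives in the new `parallel_xorb_uploader` module, which is not shown in this diff. Based purely on the call sites above, a minimal sketch of the interface might look like the following; the `async_trait` attribute and the exact signatures are assumptions, not the crate's actual code:

```rust
use async_trait::async_trait;
use merklehash::MerkleHash;

use crate::data_processing::CASDataAggregator;
use crate::errors::Result;

/// Sketch of the interface inferred from Cleaner's call sites: hand over a
/// filled CAS block aggregate for upload and get back its hash, and flush
/// to wait for all in-flight uploads.
#[async_trait]
pub(crate) trait XorbUpload {
    /// Queue `cas_data` as a new xorb and return the CAS block hash used by
    /// the file reconstruction entries.
    async fn register_new_cas_block(&self, cas_data: CASDataAggregator) -> Result<MerkleHash>;

    /// Wait until every queued xorb upload has completed.
    async fn flush(&self) -> Result<()>;
}
```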
9 changes: 4 additions & 5 deletions data/src/constants.rs
@@ -1,13 +1,12 @@
use lazy_static::lazy_static;

lazy_static! {
// The xet library version.
/// The xet library version.
pub static ref XET_VERSION: String =
std::env::var("XET_VERSION").unwrap_or_else(|_| CURRENT_VERSION.to_string());
/// The maximum number of simultaneous download streams
pub static ref MAX_CONCURRENT_DOWNLOADS: usize = std::env::var("XET_CONCURRENT_DOWNLOADS").ok().and_then(|s| s.parse().ok()).unwrap_or(8);
/// The maximum number of simultaneous upload streams
pub static ref MAX_CONCURRENT_UPLOADS: usize = std::env::var("XET_CONCURRENT_UPLOADS").ok().and_then(|s| s.parse().ok()).unwrap_or(8);
/// The maximum number of simultaneous xorb upload streams.
/// The default is 8 and can be overridden with the environment variable "XET_CONCURRENT_XORB_UPLOADS".
pub static ref MAX_CONCURRENT_XORB_UPLOADS: usize = std::env::var("XET_CONCURRENT_XORB_UPLOADS").ok().and_then(|s| s.parse().ok()).unwrap_or(8);
}

/// The maximum git filter protocol packet size
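As a usage note (a sketch, not part of this diff), the env-override-with-default pattern used for `MAX_CONCURRENT_XORB_UPLOADS` above can be exercised like this; the helper name is hypothetical:

```rust
use std::env;

// Hypothetical helper mirroring the MAX_CONCURRENT_XORB_UPLOADS lookup:
// parse the env var if present and valid, otherwise fall back to 8.
fn concurrent_xorb_uploads() -> usize {
    env::var("XET_CONCURRENT_XORB_UPLOADS").ok().and_then(|s| s.parse().ok()).unwrap_or(8)
}

fn main() {
    // No override set: the default applies.
    assert_eq!(concurrent_xorb_uploads(), 8);

    // Overriding raises the limit. In the real crate the lazy_static value is
    // resolved only once, so the variable must be set before it is first read.
    env::set_var("XET_CONCURRENT_XORB_UPLOADS", "16");
    assert_eq!(concurrent_xorb_uploads(), 16);
}
```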
4 changes: 2 additions & 2 deletions data/src/data_client.rs
@@ -18,8 +18,8 @@ use crate::errors::DataProcessingError;
use crate::{errors, PointerFile, PointerFileTranslator};

// Concurrency in number of files
pub const MAX_CONCURRENT_UPLOADS: usize = 8; // TODO
pub const MAX_CONCURRENT_DOWNLOADS: usize = 8; // TODO
const MAX_CONCURRENT_UPLOADS: usize = 8; // TODO
const MAX_CONCURRENT_DOWNLOADS: usize = 8; // TODO

// We now process every file delegated from the Python library.
const SMALL_FILE_THRESHOLD: usize = 1;
121 changes: 26 additions & 95 deletions data/src/data_processing.rs
@@ -6,25 +6,30 @@ use std::sync::Arc;

use cas_client::Client;
use cas_types::FileRange;
use mdb_shard::cas_structs::{CASChunkSequenceEntry, CASChunkSequenceHeader, MDBCASInfo};
use lazy_static::lazy_static;
use mdb_shard::file_structs::MDBFileInfo;
use mdb_shard::ShardFileManager;
use merkledb::aggregate_hashes::cas_node_hash;
use merklehash::MerkleHash;
use tokio::sync::Mutex;
use tokio::sync::{Mutex, Semaphore};
use utils::progress::ProgressUpdater;
use utils::ThreadPool;

use crate::cas_interface::create_cas_client;
use crate::clean::Cleaner;
use crate::configurations::*;
use crate::constants::MAX_CONCURRENT_XORB_UPLOADS;
use crate::errors::*;
use crate::parallel_xorb_uploader::{ParallelXorbUploader, XorbUpload};
use crate::remote_shard_interface::RemoteShardInterface;
use crate::shard_interface::create_shard_manager;
use crate::PointerFile;

lazy_static! {
pub static ref XORB_UPLOAD_RATE_LIMITER: Arc<Semaphore> = Arc::new(Semaphore::new(*MAX_CONCURRENT_XORB_UPLOADS));
}
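`XORB_UPLOAD_RATE_LIMITER` is a process-wide `tokio::sync::Semaphore` sized by `MAX_CONCURRENT_XORB_UPLOADS`. The uploader that consumes it is not shown in this diff, but the usual shape of semaphore-based throttling is sketched below; the function and variable names are illustrative only:

```rust
use std::sync::Arc;

use tokio::sync::Semaphore;
use tokio::task::JoinSet;

// Illustrative sketch: a shared semaphore caps how many upload tasks run at
// once, no matter how many xorbs are queued.
async fn upload_all(rate_limiter: Arc<Semaphore>, xorbs: Vec<Vec<u8>>) {
    let mut tasks = JoinSet::new();
    for xorb in xorbs {
        // Waits (asynchronously) once all permits are taken, i.e. once
        // MAX_CONCURRENT_XORB_UPLOADS uploads are already in flight.
        let permit = rate_limiter.clone().acquire_owned().await.expect("semaphore closed");
        tasks.spawn(async move {
            let _permit = permit; // released when this task finishes
            // ... perform the actual xorb upload here ...
            drop(xorb);
        });
    }
    // The equivalent of flush(): wait for every in-flight upload to complete.
    while tasks.join_next().await.is_some() {}
}
```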

#[derive(Default, Debug)]
pub struct CASDataAggregator {
pub(crate) struct CASDataAggregator {
/// Bytes of all chunks accumulated in one CAS block concatenated together.
pub data: Vec<u8>,
/// Metadata of all chunks accumulated in one CAS block. Each entry is
@@ -60,6 +65,7 @@ pub struct PointerFileTranslator {
shard_manager: Arc<ShardFileManager>,
remote_shards: Arc<RemoteShardInterface>,
cas: Arc<dyn Client + Send + Sync>,
xorb_uploader: Arc<dyn XorbUpload + Send + Sync>,

/* ----- Deduped data shared across files ----- */
global_cas_data: Arc<Mutex<CASDataAggregator>>,
@@ -101,11 +107,21 @@ impl PointerFileTranslator {
}
};

let xorb_uploader = ParallelXorbUploader::new(
&config.cas_storage_config.prefix,
shard_manager.clone(),
cas_client.clone(),
XORB_UPLOAD_RATE_LIMITER.clone(),
threadpool.clone(),
)
.await;

Ok(Self {
config,
shard_manager,
remote_shards,
cas: cas_client,
xorb_uploader,
global_cas_data: Default::default(),
threadpool,
})
@@ -132,7 +148,7 @@ impl PointerFileTranslator {
dedup.repo_salt,
self.shard_manager.clone(),
self.remote_shards.clone(),
self.cas.clone(),
self.xorb_uploader.clone(),
self.global_cas_data.clone(),
buffer_size,
file_name,
@@ -144,46 +160,27 @@ impl PointerFileTranslator {
pub async fn finalize_cleaning(&self) -> Result<()> {
// flush accumulated CAS data.
let mut cas_data_accumulator = self.global_cas_data.lock().await;
let mut new_cas_data = take(cas_data_accumulator.deref_mut());
let new_cas_data = take(cas_data_accumulator.deref_mut());
drop(cas_data_accumulator); // Release the lock.

if !new_cas_data.is_empty() {
register_new_cas_block(
&mut new_cas_data,
&self.shard_manager,
&self.cas,
&self.config.cas_storage_config.prefix,
)
.await?;
self.xorb_uploader.register_new_cas_block(new_cas_data).await?;
}

debug_assert!(new_cas_data.is_empty());
self.xorb_uploader.flush().await?;

// flush accumulated memory shard.
self.shard_manager.flush().await?;

self.upload().await?;
self.upload_shards().await?;

Ok(())
}

async fn upload(&self) -> Result<()> {
async fn upload_shards(&self) -> Result<()> {
// First, get all the shards prepared and load them.
let merged_shards_jh = self.remote_shards.merge_shards()?;

// Make sure that all the uploads and everything are in a good state before proceeding with
// anything changing the remote repository.
//
// Waiting until the CAS uploads finish avoids the following scenario:
// 1. User 1 commits file A and pushes, but the network drops after
// sync_notes_to_remote and before the CAS upload finishes.
// 2. User 2 tries to git add the same file A; on filter this pulls in
// the new notes, file A is 100% deduped so no CAS blocks are created,
// and user 2 pushes.
//
// This results in a bad repo state.
self.upload_cas().await?;

// Get a list of all the merged shards in order to upload them.
let merged_shards = merged_shards_jh.await??;

@@ -196,72 +193,6 @@ impl PointerFileTranslator {

Ok(())
}

async fn upload_cas(&self) -> Result<()> {
// We don't have staging client support yet.
Ok(())
}
}

/// Clean operation helpers
pub(crate) async fn register_new_cas_block(
cas_data: &mut CASDataAggregator,
shard_manager: &Arc<ShardFileManager>,
cas: &Arc<dyn Client + Send + Sync>,
cas_prefix: &str,
) -> Result<MerkleHash> {
let cas_hash = cas_node_hash(&cas_data.chunks[..]);

let raw_bytes_len = cas_data.data.len();

let metadata = CASChunkSequenceHeader::new(cas_hash, cas_data.chunks.len(), raw_bytes_len);

let mut pos = 0;
let chunks: Vec<_> = cas_data
.chunks
.iter()
.map(|(h, len)| {
let result = CASChunkSequenceEntry::new(*h, *len, pos);
pos += *len;
result
})
.collect();
let cas_info = MDBCASInfo { metadata, chunks };

pos = 0;
let chunk_boundaries = cas_data
.chunks
.iter()
.map(|(hash, len)| {
pos += *len;
(*hash, pos as u32)
})
.collect();

if !cas_info.chunks.is_empty() {
cas.put(cas_prefix, &cas_hash, take(&mut cas_data.data), chunk_boundaries)
.await?;

shard_manager.add_cas_block(cas_info).await?;
} else {
debug_assert_eq!(cas_hash, MerkleHash::default());
}

// Now register any new files as needed.
for (mut fi, chunk_hash_indices) in take(&mut cas_data.pending_file_info) {
for i in chunk_hash_indices {
debug_assert_eq!(fi.segments[i].cas_hash, MerkleHash::default());
fi.segments[i].cas_hash = cas_hash;
}

shard_manager.add_file_reconstruction_info(fi).await?;
}

cas_data.data.clear();
cas_data.chunks.clear();
cas_data.pending_file_info.clear();

Ok(cas_hash)
}
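One subtle detail in the removed function worth calling out: the `CASChunkSequenceEntry` list records each chunk's start offset within the block, while `chunk_boundaries` records cumulative end offsets. A small self-contained illustration of the two conventions (not part of the crate):

```rust
// For three chunks of lengths 4, 6 and 5 within one CAS block:
//   sequence-entry positions (start offsets):  0, 4, 10
//   chunk_boundaries (cumulative end offsets): 4, 10, 15
fn offsets(chunk_lens: &[usize]) -> (Vec<usize>, Vec<u32>) {
    let mut pos = 0;
    let starts = chunk_lens
        .iter()
        .map(|len| {
            let start = pos;
            pos += len;
            start
        })
        .collect();

    let mut end = 0u32;
    let ends = chunk_lens
        .iter()
        .map(|len| {
            end += *len as u32;
            end
        })
        .collect();

    (starts, ends)
}

fn main() {
    assert_eq!(offsets(&[4, 6, 5]), (vec![0, 4, 10], vec![4, 10, 15]));
}
```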

/// Smudge operations
3 changes: 3 additions & 0 deletions data/src/errors.rs
@@ -27,6 +27,9 @@ pub enum DataProcessingError {
#[error("Clean task error: {0}")]
CleanTaskError(String),

#[error("Upload task error: {0}")]
UploadTaskError(String),

#[error("Internal error : {0}")]
InternalError(String),
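Hypothetical illustration of where the new variant fits: a failed background upload task can be surfaced as `UploadTaskError`. The actual conversion lives in the uploader module, which this diff does not show, so the mapping below is an assumption:

```rust
// Hypothetical: convert a JoinError from a spawned upload task into the new
// variant; the real code in parallel_xorb_uploader.rs may differ.
fn to_upload_error(e: tokio::task::JoinError) -> DataProcessingError {
    DataProcessingError::UploadTaskError(e.to_string())
}
```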

1 change: 1 addition & 0 deletions data/src/lib.rs
@@ -9,6 +9,7 @@ pub mod data_client;
mod data_processing;
pub mod errors;
mod metrics;
mod parallel_xorb_uploader;
mod pointer_file;
mod remote_shard_interface;
mod repo_salt;