From 0d47d35c460902ac609b1a66cf00ccd4739698bf Mon Sep 17 00:00:00 2001 From: aleksandarmladenovic0330 Date: Wed, 26 Jun 2024 09:38:08 -0400 Subject: [PATCH 1/4] Exposed Bytes vtable --- nativelink-store/src/memory_store.rs | 20 +++++++++++++++++ nativelink-store/tests/memory_store_test.rs | 25 ++++++++++++++++++++- nativelink-util/src/evicting_map.rs | 21 ++++++++++++++++- 3 files changed, 64 insertions(+), 2 deletions(-) diff --git a/nativelink-store/src/memory_store.rs b/nativelink-store/src/memory_store.rs index ac7fe3bd3..bb4051f62 100644 --- a/nativelink-store/src/memory_store.rs +++ b/nativelink-store/src/memory_store.rs @@ -39,12 +39,28 @@ use crate::cas_utils::is_zero_digest; #[derive(Clone)] pub struct BytesWrapper(Bytes); +impl BytesWrapper { + pub fn from_string(s: &str) -> Self { + BytesWrapper(Bytes::copy_from_slice(s.as_bytes())) + } +} + impl Debug for BytesWrapper { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_str("BytesWrapper { -- Binary data -- }") } } +impl Drop for BytesWrapper { + fn drop(&mut self) { + print!( + "BytesWrapper dropped {}\n", + std::str::from_utf8(&self.0).unwrap() + ); + std::mem::drop(&self.0); + } +} + impl LenEntry for BytesWrapper { #[inline] fn len(&self) -> usize { @@ -154,6 +170,10 @@ impl MemoryStore { self.evicting_map.remove(&key.into_owned()).await } + pub async fn insert(&self, key: StoreKey<'static>, data: BytesWrapper) -> Option { + return self.evicting_map.insert(key, data).await; + } + /// Tells the store that a subscription has been dropped and gives an opportunity to clean up. 
fn remove_dropped_subscription(&self, key: StoreKey<'static>) { let mut subscriptions = self.subscriptions.write(); diff --git a/nativelink-store/tests/memory_store_test.rs b/nativelink-store/tests/memory_store_test.rs index 9cc6e50d6..a39cffeba 100644 --- a/nativelink-store/tests/memory_store_test.rs +++ b/nativelink-store/tests/memory_store_test.rs @@ -18,9 +18,10 @@ use std::pin::Pin; use bytes::{BufMut, Bytes, BytesMut}; use futures::poll; use memory_stats::memory_stats; +use nativelink_config::stores::EvictionPolicy; use nativelink_error::{Code, Error, ResultExt}; use nativelink_macro::nativelink_test; -use nativelink_store::memory_store::MemoryStore; +use nativelink_store::memory_store::{BytesWrapper, MemoryStore}; use nativelink_util::buf_channel::make_buf_channel_pair; use nativelink_util::common::DigestInfo; use nativelink_util::spawn; @@ -36,6 +37,28 @@ const TOO_LONG_HASH: &str = "0123456789abcdef00000000000000000001000000000000012 const TOO_SHORT_HASH: &str = "100000000000000000000000000000000000000000000000000000000000001"; const INVALID_HASH: &str = "g111111111111111111111111111111111111111111111111111111111111111"; +#[nativelink_test] +async fn insert_drop_test() -> Result<(), Error> { + const VALUE1: &str = "13"; + const VALUE2: &str = "2345"; + let eviction_policy = EvictionPolicy { + max_bytes: 5, + evict_bytes: 0, + max_seconds: 0, + max_count: 2, + }; + let store = MemoryStore::new(&nativelink_config::stores::MemoryStore { + eviction_policy: Some(eviction_policy), + }); + store + .insert(StoreKey::new_str(VALUE1), BytesWrapper::from_string(VALUE1)) + .await; + store + .insert(StoreKey::new_str(VALUE2), BytesWrapper::from_string(VALUE2)) + .await; + Ok(()) +} + #[nativelink_test] async fn insert_one_item_then_update() -> Result<(), Error> { const VALUE1: &str = "13"; diff --git a/nativelink-util/src/evicting_map.rs b/nativelink-util/src/evicting_map.rs index c43883a12..0b0e50fb4 100644 --- a/nativelink-util/src/evicting_map.rs +++ 
b/nativelink-util/src/evicting_map.rs @@ -67,7 +67,25 @@ struct EvictionItem { } #[async_trait] -pub trait LenEntry: 'static { +impl LenEntry for EvictionItem { + #[inline] + fn len(&self) -> usize { + return self.data.len(); + } + + #[inline] + fn is_empty(&self) -> bool { + return self.data.is_empty(); + } + + #[inline] + async fn unref(&self) { + std::mem::drop(&self.data); + } +} + +#[async_trait] +pub trait LenEntry: 'static + Send + Sync { /// Length of referenced data. fn len(&self) -> usize; @@ -303,6 +321,7 @@ where .lru .pop_lru() .expect("Tried to peek() then pop() but failed"); + print!("Evicting item -- "); event!(Level::INFO, ?key, "Evicting",); state.remove(&key, &eviction_item, false).await; From f6546be296e56e0cef9e4ae1a4c1e6c4042101aa Mon Sep 17 00:00:00 2001 From: aleksdmladenovic Date: Mon, 22 Jul 2024 17:49:24 -0400 Subject: [PATCH 2/4] ISSUE:1044 Need feedback? --- nativelink-util/src/common.rs | 21 +++++++++++++++++++++ src/bin/nativelink.rs | 12 ++++++++++++ 2 files changed, 33 insertions(+) diff --git a/nativelink-util/src/common.rs b/nativelink-util/src/common.rs index f10612988..7d89c37b8 100644 --- a/nativelink-util/src/common.rs +++ b/nativelink-util/src/common.rs @@ -23,6 +23,7 @@ use nativelink_error::{make_input_err, Error, ResultExt}; use nativelink_proto::build::bazel::remote::execution::v2::Digest; use prost::Message; use serde::{Deserialize, Serialize}; +use std::process::Command; pub use crate::fs; @@ -212,3 +213,23 @@ pub fn encode_stream_proto(proto: &T) -> Result String { + let output = Command::new("git") + .args(&["describe", "--tags", "--abbrev=0"]) + .output() + .expect("Failed to execute git command"); + + String::from_utf8(output.stdout).expect("Invalid UTF-8 output").trim().to_string() +} + +// Utility to obtain the current hash of NativeLink. 
+pub fn get_hash() -> String { + let output = Command::new("git") + .args(&["rev-parse", "HEAD"]) + .output() + .expect("Failed to execute git command"); + + String::from_utf8(output.stdout).expect("Invalid UTF-8 output").trim().to_string() +} \ No newline at end of file diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index 3be18cdce..bb7990fc8 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -72,6 +72,9 @@ use tonic::transport::Server as TonicServer; use tower::util::ServiceExt; use tracing::{error_span, event, trace_span, Level}; +use nativelink_util::common::get_hash; +use nativelink_util::common::get_version; + #[global_allocator] static GLOBAL: MiMalloc = MiMalloc; @@ -881,6 +884,15 @@ async fn get_config() -> Result> { fn main() -> Result<(), Box> { init_tracing()?; + //Testing example--> + + event!( + Level::WARN, + "Git version and Git hash {}, {}", + get_version(), + get_hash() + ); + let mut cfg = futures::executor::block_on(get_config())?; let (mut metrics_enabled, max_blocking_threads) = { From 9ee73d4a7b3a27ed55b8f9cf1c3ae8109c157bce Mon Sep 17 00:00:00 2001 From: aleksdmladenovic Date: Mon, 22 Jul 2024 23:24:34 -0400 Subject: [PATCH 3/4] ISSUE:490 Hope your feedback. --- nativelink-service/src/bytestream_server.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/nativelink-service/src/bytestream_server.rs b/nativelink-service/src/bytestream_server.rs index 043b0238b..e97967391 100644 --- a/nativelink-service/src/bytestream_server.rs +++ b/nativelink-service/src/bytestream_server.rs @@ -310,7 +310,20 @@ impl ByteStreamServer { Ok(bytes) => { if bytes.is_empty() { // EOF. - return Some((Ok(response), None)); + event!(Level::WARN, "EOF"); + // Instead of returning immediately, wait for the `get_part_fut` to complete. 
+ let get_part_result = if let Some(result) = state.maybe_get_part_result.take() { + result + } else { + // Wrapping in an async block to use await + async { state.get_part_fut.await }.await + }; + event!(Level::WARN, "COMPLETE"); + + match get_part_result { + Ok(_) => return Some((Ok(response), None)), + Err(e) => return Some((Err(e.into()), None)), + } } if bytes.len() > state.max_bytes_per_stream { let err = make_err!(Code::Internal, "Returned store size was larger than read size"); From ee96d3e2e6056ac1130418909f5b7fed987c60a7 Mon Sep 17 00:00:00 2001 From: aleksdmladenovic Date: Mon, 22 Jul 2024 23:36:08 -0400 Subject: [PATCH 4/4] ISSUE:692 Occur errors on dedup_store_test, first please update source code Give me feedback --- nativelink-store/src/dedup_store.rs | 12 +- nativelink-store/tests/dedup_store_test.rs | 61 ++++- nativelink-util/BUILD.bazel | 7 + nativelink-util/src/lib.rs | 57 +++-- nativelink-util/src/ultracdc.rs | 260 +++++++++++++++++++++ nativelink-util/tests/ultracdc_test.rs | 216 +++++++++++++++++ 6 files changed, 587 insertions(+), 26 deletions(-) create mode 100644 nativelink-util/src/ultracdc.rs create mode 100644 nativelink-util/tests/ultracdc_test.rs diff --git a/nativelink-store/src/dedup_store.rs b/nativelink-store/src/dedup_store.rs index 88478408e..f5e51d1c6 100644 --- a/nativelink-store/src/dedup_store.rs +++ b/nativelink-store/src/dedup_store.rs @@ -1,4 +1,4 @@ -// Copyright 2023 The NativeLink Authors. All rights reserved. +// Copyright 2024 The NativeLink Authors. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -23,7 +23,7 @@ use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt}; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; -use nativelink_util::fastcdc::FastCDC; +use nativelink_util::ultracdc::UltraCDC; use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo}; use serde::{Deserialize, Serialize}; @@ -46,7 +46,7 @@ pub struct DedupIndex { pub struct DedupStore { index_store: Store, content_store: Store, - fast_cdc_decoder: FastCDC, + ultra_cdc_decoder: UltraCDC, max_concurrent_fetch_per_get: usize, bincode_options: WithOtherIntEncoding, } @@ -80,7 +80,7 @@ impl DedupStore { Arc::new(Self { index_store, content_store, - fast_cdc_decoder: FastCDC::new(min_size, normal_size, max_size), + ultra_cdc_decoder: UltraCDC::new(min_size, normal_size, max_size), max_concurrent_fetch_per_get, bincode_options: DefaultOptions::new().with_fixint_encoding(), }) @@ -172,9 +172,9 @@ impl StoreDriver for DedupStore { _size_info: UploadSizeInfo, ) -> Result<(), Error> { let mut bytes_reader = StreamReader::new(reader); - let frame_reader = FramedRead::new(&mut bytes_reader, self.fast_cdc_decoder.clone()); + let frame_reader = FramedRead::new(&mut bytes_reader, self.ultra_cdc_decoder.clone()); let index_entries = frame_reader - .map(|r| r.err_tip(|| "Failed to decode frame from fast_cdc")) + .map(|r| r.err_tip(|| "Failed to decode frame from ultra_cdc")) .map_ok(|frame| async move { let hash = blake3::hash(&frame[..]).into(); let index_entry = DigestInfo::new(hash, frame.len() as i64); diff --git a/nativelink-store/tests/dedup_store_test.rs b/nativelink-store/tests/dedup_store_test.rs index 97cb90e68..15750df1f 100644 --- a/nativelink-store/tests/dedup_store_test.rs +++ b/nativelink-store/tests/dedup_store_test.rs @@ -1,4 
+1,4 @@ -// Copyright 2023 The NativeLink Authors. All rights reserved. +// Copyright 2024 The NativeLink Authors. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,15 +12,27 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; +use std::io::Cursor; +use aws_smithy_runtime::client::http::test_util::dvr::Event; +use futures::stream::StreamExt; use nativelink_error::{Code, Error, ResultExt}; use nativelink_macro::nativelink_test; use nativelink_store::dedup_store::DedupStore; use nativelink_store::memory_store::MemoryStore; use nativelink_util::common::DigestInfo; use nativelink_util::store_trait::{Store, StoreLike}; +use nativelink_util::ultracdc::UltraCDC; use pretty_assertions::assert_eq; use rand::rngs::SmallRng; use rand::{Rng, SeedableRng}; +use tokio_util::codec::FramedRead; +use tracing::{event, Level}; + +use tokio::io::AsyncRead; +use tokio_util::codec::Decoder; +use bytes::Bytes; +use sha2::{Digest, Sha256}; fn make_default_config() -> nativelink_config::stores::DedupStore { nativelink_config::stores::DedupStore { @@ -77,6 +89,16 @@ async fn simple_round_trip_test() -> Result<(), Error> { Ok(()) } +async fn get_frames( + frame_reader: &mut FramedRead, +) -> Result, D::Error> { + let mut frames = vec![]; + while let Some(frame) = frame_reader.next().await { + frames.push(frame?); + } + Ok(frames) +} + #[nativelink_test] async fn check_missing_last_chunk_test() -> Result<(), Error> { let content_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default()); @@ -91,15 +113,37 @@ async fn check_missing_last_chunk_test() -> Result<(), Error> { let original_data = make_random_data(MEGABYTE_SZ); let digest = DigestInfo::try_new(VALID_HASH1, MEGABYTE_SZ).unwrap(); + let mut cursor = Cursor::new(&original_data); + + // Print the hash and length for
chunks data + // { + // let mut frame_reader = FramedRead::new(&mut cursor, UltraCDC::new(8 * 1024, 32 * 1024, 128 * 1024)); + // let frames: Vec = get_frames(&mut frame_reader).await?; + + // let mut frames_map = HashMap::new(); + // let mut pos = 0; + // for frame in frames { + // let frame_len = frame.len(); + // let key = blake3::hash(&frame[..]); + // event!(Level::WARN, "key: {}, len: {}", key, frame_len); + // pos += frame_len; + // } + // frames_map + // }; + store .update_oneshot(digest, original_data.into()) .await .err_tip(|| "Failed to write data to dedup store")?; - + // This is the hash & size of the last chunk item in the content_store. const LAST_CHUNK_HASH: &str = - "7c8608f5b079bef66c45bd67f7d8ede15d2e1830ea38fd8ad4c6de08b6f21a0c"; - const LAST_CHUNK_SIZE: usize = 25779; + "3706c7d79d112ed209a391ae6854b6824cb4ab4f3976e5b46c1f65a4dff0487a"; + const LAST_CHUNK_SIZE: usize = 5460; + + let data = content_store.has(digest).await; + + event!(Level::WARN, ?data, "Data"); let did_delete = content_store .remove_entry( @@ -341,9 +385,18 @@ async fn has_checks_content_store() -> Result<(), Error> { let size_info = store.has(digest2).await.err_tip(|| "Failed to run .has")?; assert_eq!(size_info, Some(DATA2.len()), "Expected sizes to match"); } + + const DATA3: &str = "1234"; + let digest3 = DigestInfo::try_new(VALID_HASH2, DATA3.len()).unwrap(); + store + .update_oneshot(digest3, DATA3.into()) + .await + .err_tip(|| "Failed to write data to dedup store")?; + { // Check our first added entry is now invalid (because part of it was evicted). 
let size_info = store.has(digest1).await.err_tip(|| "Failed to run .has")?; + event!(Level::WARN, "Length of Origin Data {}", original_data.len()); assert_eq!( size_info, None, "Expected .has() to return None (not found)" diff --git a/nativelink-util/BUILD.bazel b/nativelink-util/BUILD.bazel index f6bfc494d..c2d72e685 100644 --- a/nativelink-util/BUILD.bazel +++ b/nativelink-util/BUILD.bazel @@ -11,6 +11,7 @@ rust_library( srcs = [ "src/action_messages.rs", "src/buf_channel.rs", + "src/chunked_stream.rs", "src/common.rs", "src/connection_manager.rs", "src/default_store_key_subscribe.rs", @@ -21,6 +22,7 @@ rust_library( "src/health_utils.rs", "src/lib.rs", "src/metrics_utils.rs", + "src/operation_state_manager.rs", "src/origin_context.rs", "src/platform_properties.rs", "src/proto_stream_utils.rs", @@ -29,6 +31,7 @@ rust_library( "src/store_trait.rs", "src/task.rs", "src/tls_utils.rs", + "src/ultracdc.rs", "src/write_counter.rs", ], proc_macro_deps = [ @@ -40,14 +43,17 @@ rust_library( "//nativelink-error", "//nativelink-proto", "@crates//:async-lock", + "@crates//:bitflags", "@crates//:blake3", "@crates//:bytes", "@crates//:console-subscriber", "@crates//:futures", "@crates//:hex", "@crates//:hyper", + "@crates//:hyper-util", "@crates//:lru", "@crates//:parking_lot", + "@crates//:pin-project", "@crates//:pin-project-lite", "@crates//:prometheus-client", "@crates//:prost", @@ -77,6 +83,7 @@ rust_test_suite( "tests/proto_stream_utils_test.rs", "tests/resource_info_test.rs", "tests/retry_test.rs", + "tests/ultracdc_test.rs", ], compile_data = [ "tests/data/SekienAkashita.jpg", diff --git a/nativelink-util/src/lib.rs b/nativelink-util/src/lib.rs index 717985274..ac4cc7ef3 100644 --- a/nativelink-util/src/lib.rs +++ b/nativelink-util/src/lib.rs @@ -1,4 +1,4 @@ -// Copyright 2023 The NativeLink Authors. All rights reserved. +// Copyright 2024 The NativeLink Authors. All rights reserved. 
// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -14,6 +14,7 @@ pub mod action_messages; pub mod buf_channel; +pub mod chunked_stream; pub mod common; pub mod connection_manager; pub mod default_store_key_subscribe; @@ -23,6 +24,7 @@ pub mod fastcdc; pub mod fs; pub mod health_utils; pub mod metrics_utils; +pub mod operation_state_manager; pub mod origin_context; pub mod platform_properties; pub mod proto_stream_utils; @@ -31,6 +33,7 @@ pub mod retry; pub mod store_trait; pub mod task; pub mod tls_utils; +pub mod ultracdc; pub mod write_counter; // Re-export tracing mostly for use in macros. @@ -53,22 +56,44 @@ pub fn init_tracing() -> Result<(), nativelink_error::Error> { .with_default_directive(tracing::metadata::LevelFilter::WARN.into()) .from_env_lossy(); + // Setup tracing logger for multiple format types, compact, json, and pretty as a single layer. + // Configuration for log format comes from environment variable NL_LOG due to subscribers + // being configured before config parsing. + let nl_log_fmt = std::env::var("NL_LOG").unwrap_or_else(|_| "pretty".to_string()); + // Layers vector is used due to how tracing_subscriber::fmt::layer builds its type signature, + // which cannot be unified into a single trait type before being boxed.
For example see + // https://docs.rs/tracing-subscriber/0.3.18/tracing_subscriber/layer/index.html + let mut layers = Vec::new(); + match nl_log_fmt.as_str() { + "compact" => layers.push( + tracing_subscriber::fmt::layer() + .compact() + .with_timer(tracing_subscriber::fmt::time::time()) + .with_filter(env_filter) + .boxed(), + ), + "json" => layers.push( + tracing_subscriber::fmt::layer() + .json() + .with_timer(tracing_subscriber::fmt::time::time()) + .with_filter(env_filter) + .boxed(), + ), + _ => layers.push( + tracing_subscriber::fmt::layer() + .pretty() + .with_timer(tracing_subscriber::fmt::time::time()) + .with_filter(env_filter) + .boxed(), + ), + }; + + // Add a console subscriber if the feature is enabled, see tokio-console for a client console. + // https://crates.io/crates/tokio-console if cfg!(feature = "enable_tokio_console") { - tracing_subscriber::registry() - .with(console_subscriber::spawn()) - .with( - tracing_subscriber::fmt::layer() - .pretty() - .with_timer(tracing_subscriber::fmt::time::time()) - .with_filter(env_filter), - ) - .init(); - } else { - tracing_subscriber::fmt() - .pretty() - .with_timer(tracing_subscriber::fmt::time::time()) - .with_env_filter(env_filter) - .init(); + layers.push(console_subscriber::spawn().boxed()); } + + tracing_subscriber::registry().with(layers).init(); Ok(()) } diff --git a/nativelink-util/src/ultracdc.rs b/nativelink-util/src/ultracdc.rs new file mode 100644 index 000000000..a9d374680 --- /dev/null +++ b/nativelink-util/src/ultracdc.rs @@ -0,0 +1,260 @@ +// Copyright 2023 Nathan Fiedler, Trace Machina, Inc. Some rights reserved. +// Originally licensed under MIT license. 
+// +// This implementation is heavily based on: +// https://github.com/PlakarLabs/go-cdc-chunkers/blob/main/chunkers/ultracdc/ultracdc.go + +use bytes::{BufMut, Bytes, BytesMut}; +use tokio_util::codec::Decoder; + +struct State { + hash: u32, + position: usize, +} + +impl State { + fn reset(&mut self) { + self.hash = 0; + self.position = 0; + } +} + +/// This algorithm will take an input stream and build frames based on the FastCDC algorithm. +/// see: +/// +/// In layman's terms this means we can take an input of unknown size and composition +/// and chunk it into smaller chunks with chunk boundaries that will be very similar +/// even when a small part of the file is changed. This suits use cases where a large file +/// (say 1G) has a few minor changes, whether they are byte inserts, deletes or updates, +/// and when running through this algorithm it will slice the file up so that under +/// normal conditions all the chunk boundaries will be identical except the ones near +/// the mutations. +/// +/// This is not very useful on its own, but is extremely useful because we can pair +/// this together with a hash algorithm (like sha256) to then hash each chunk and +/// then check to see if we already have the sha256 somewhere before attempting to +/// upload the file. If the file does exist, we can skip the chunk and continue then +/// in the index file we can reference the same chunk that already exists. +/// +/// Or put simply, it helps upload only the parts of the files that change, instead +/// of the entire file.
+pub struct UltraCDC { + min_size: usize, + avg_size: usize, + max_size: usize, + + norm_size: usize, + mask_easy: u32, + mask_hard: u32, + + state: State, +} + +impl UltraCDC { + pub fn new(min_size: usize, avg_size: usize, max_size: usize) -> Self { + assert!(min_size < avg_size, "Expected {min_size} < {avg_size}"); + assert!(avg_size < max_size, "Expected {avg_size} < {max_size}"); + let norm_size = { + let mut offset = min_size + ((min_size + 1) / 2); + if offset > avg_size { + offset = avg_size; + } + avg_size - offset + }; + // Calculate the number of bits most closely approximating our average. + let bits = (avg_size as f64).log2().round() as u32; + Self { + min_size, + avg_size, + max_size, + + norm_size, + // Turn our bits into a bitmask we can use later on for more + // efficient bitwise operations. + mask_hard: 2u32.pow(bits + 1) - 1, + mask_easy: 2u32.pow(bits - 1) - 1, + + state: State { + hash: 0, + position: 0, + }, + } + } + + pub fn rolling_hash(&self, data: &BytesMut, window_size: usize) -> u64 { + // Implement a simple rolling hash function + let mut hash_val = 0u64; + for (i, &byte) in data.iter().enumerate().take(window_size) { + hash_val += (byte as u64) << (i & 0x1F); + } + hash_val + } +} + +impl Decoder for UltraCDC { + type Item = Bytes; + type Error = std::io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { + if buf.len() <= self.min_size { + return Ok(None); + } + // Zero means not found. + let mut split_point = 0; + + let window_size = 64; + + // Note: We use this kind of loop because it improved performance of this loop by 20%. + let mut i = 0; + let mut chunk = BytesMut::new(); + + while i < buf.len() { + let byte = buf[i] as usize; + chunk.put_u8(buf[i]); + + self.state.hash = (self.state.hash >> 1) + TABLE[byte]; + + // If we are < norm_size we start using the harder bit mask and if we go + // beyond the norm_size start using the easier bit mask.
+ let mask = if i < self.norm_size { + self.mask_hard as u64 + } else { + self.mask_easy as u64 + }; + + if chunk.len() >= self.min_size { + // Calculate the rolling hash value + let chunk_len = chunk.len(); + + let bytes_mut = BytesMut::from(&chunk[chunk_len - window_size.min(chunk_len)..]); + let rolling_hash_value = self.rolling_hash( + &bytes_mut, + window_size, + ) as u64; + + if (rolling_hash_value & mask) == mask || chunk.len() >= self.max_size { + split_point = i; + break; + } + } + i += 1; + } + + if split_point >= self.min_size { + self.state.reset(); + debug_assert!( + split_point <= self.max_size, + "Expected {} < {}", + split_point, + self.max_size + ); + return Ok(Some(buf.split_to(split_point).freeze())); + } + self.state.position = split_point; + if self.state.position == 0 { + self.state.position = buf.len(); + } + + Ok(None) + + + + } + + + fn decode_eof(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { + match self.decode(buf)? { + Some(frame) => Ok(Some(frame)), + // If we are EOF and have no more bytes in stream return the entire buffer. + None => { + self.state.reset(); + if buf.is_empty() { + // If our buffer is empty we don't have any more data. + return Ok(None); + } + Ok(Some(buf.split().freeze())) + } + } + } +} + +impl Clone for UltraCDC { + /// Clone configuration but with new state. This is useful where you can create + /// a base FastCDC object then clone it when you want to process a new stream. + fn clone(&self) -> Self { + Self { + min_size: self.min_size, + avg_size: self.avg_size, + max_size: self.max_size, + + norm_size: self.norm_size, + mask_easy: self.mask_easy, + mask_hard: self.mask_hard, + + state: State { + hash: 0, + position: 0, + }, + } + } +} + +// +// TABLE contains seemingly "random" numbers which are created by ciphering a +// 1024-byte array of all zeros using a 32-byte key and 16-byte nonce (a.k.a. +// initialization vector) of all zeroes. 
The high bit of each value is cleared +// because 31-bit integers are immune from signed 32-bit integer overflow, which +// the implementation above relies on for hashing. +// +// While this may seem to be effectively noise, it is predictable noise, so the +// results are always the same. That is the most important aspect of the +// content-defined chunking algorithm, consistent results over time. +// +// Note: These values are based on: +// https://github.com/nlfiedler/fastcdc-rs/blob/1e804fe27444e37b2c4f93d540f861d170c8a257/src/lib.rs#L250 +#[rustfmt::skip] +const TABLE: [u32; 256] = [ + 0x5c95_c078, 0x2240_8989, 0x2d48_a214, 0x1284_2087, 0x530f_8afb, 0x4745_36b9, + 0x2963_b4f1, 0x44cb_738b, 0x4ea7_403d, 0x4d60_6b6e, 0x074e_c5d3, 0x3af3_9d18, + 0x7260_03ca, 0x37a6_2a74, 0x51a2_f58e, 0x7506_358e, 0x5d4a_b128, 0x4d4a_e17b, + 0x41e8_5924, 0x470c_36f7, 0x4741_cbe1, 0x01bb_7f30, 0x617c_1de3, 0x2b0c_3a1f, + 0x50c4_8f73, 0x21a8_2d37, 0x6095_ace0, 0x4191_67a0, 0x3caf_49b0, 0x40ce_a62d, + 0x66bc_1c66, 0x545e_1dad, 0x2bfa_77cd, 0x6e85_da24, 0x5fb0_bdc5, 0x652c_fc29, + 0x3a0a_e1ab, 0x2837_e0f3, 0x6387_b70e, 0x1317_6012, 0x4362_c2bb, 0x66d8_f4b1, + 0x37fc_e834, 0x2c9c_d386, 0x2114_4296, 0x6272_68a8, 0x650d_f537, 0x2805_d579, + 0x3b21_ebbd, 0x7357_ed34, 0x3f58_b583, 0x7150_ddca, 0x7362_225e, 0x620a_6070, + 0x2c5e_f529, 0x7b52_2466, 0x768b_78c0, 0x4b54_e51e, 0x75fa_07e5, 0x06a3_5fc6, + 0x30b7_1024, 0x1c86_26e1, 0x296a_d578, 0x28d7_be2e, 0x1490_a05a, 0x7cee_43bd, + 0x698b_56e3, 0x09dc_0126, 0x4ed6_df6e, 0x02c1_bfc7, 0x2a59_ad53, 0x29c0_e434, + 0x7d6c_5278, 0x5079_40a7, 0x5ef6_ba93, 0x68b6_af1e, 0x4653_7276, 0x611b_c766, + 0x155c_587d, 0x301b_a847, 0x2cc9_dda7, 0x0a43_8e2c, 0x0a69_d514, 0x744c_72d3, + 0x4f32_6b9b, 0x7ef3_4286, 0x4a0e_f8a7, 0x6ae0_6ebe, 0x669c_5372, 0x1240_2dcb, + 0x5fea_e99d, 0x76c7_f4a7, 0x6abd_b79c, 0x0dfa_a038, 0x20e2_282c, 0x730e_d48b, + 0x069d_ac2f, 0x168e_cf3e, 0x2610_e61f, 0x2c51_2c8e, 0x15fb_8c06, 0x5e62_bc76, + 0x6955_5135, 0x0adb_864c, 0x4268_f914, 
0x349a_b3aa, 0x20ed_fdb2, 0x5172_7981, + 0x37b4_b3d8, 0x5dd1_7522, 0x6b2c_bfe4, 0x5c47_cf9f, 0x30fa_1ccd, 0x23de_db56, + 0x13d1_f50a, 0x64ed_dee7, 0x0820_b0f7, 0x46e0_7308, 0x1e2d_1dfd, 0x17b0_6c32, + 0x2500_36d8, 0x284d_bf34, 0x6829_2ee0, 0x362e_c87c, 0x087c_b1eb, 0x76b4_6720, + 0x1041_30db, 0x7196_6387, 0x482d_c43f, 0x2388_ef25, 0x5241_44e1, 0x44bd_834e, + 0x448e_7da3, 0x3fa6_eaf9, 0x3cda_215c, 0x3a50_0cf3, 0x395c_b432, 0x5195_129f, + 0x4394_5f87, 0x5186_2ca4, 0x56ea_8ff1, 0x2010_34dc, 0x4d32_8ff5, 0x7d73_a909, + 0x6234_d379, 0x64cf_bf9c, 0x36f6_589a, 0x0a2c_e98a, 0x5fe4_d971, 0x03bc_15c5, + 0x4402_1d33, 0x16c1_932b, 0x3750_3614, 0x1aca_f69d, 0x3f03_b779, 0x49e6_1a03, + 0x1f52_d7ea, 0x1c6d_dd5c, 0x0622_18ce, 0x07e7_a11a, 0x1905_757a, 0x7ce0_0a53, + 0x49f4_4f29, 0x4bcc_70b5, 0x39fe_ea55, 0x5242_cee8, 0x3ce5_6b85, 0x00b8_1672, + 0x46be_eccc, 0x3ca0_ad56, 0x2396_cee8, 0x7854_7f40, 0x6b08_089b, 0x66a5_6751, + 0x781e_7e46, 0x1e2c_f856, 0x3bc1_3591, 0x494a_4202, 0x5204_94d7, 0x2d87_459a, + 0x7575_55b6, 0x4228_4cc1, 0x1f47_8507, 0x75c9_5dff, 0x35ff_8dd7, 0x4e47_57ed, + 0x2e11_f88c, 0x5e1b_5048, 0x420e_6699, 0x226b_0695, 0x4d16_79b4, 0x5a22_646f, + 0x161d_1131, 0x125c_68d9, 0x1313_e32e, 0x4aa8_5724, 0x21dc_7ec1, 0x4ffa_29fe, + 0x7296_8382, 0x1ca8_eef3, 0x3f3b_1c28, 0x39c2_fb6c, 0x6d76_493f, 0x7a22_a62e, + 0x789b_1c2a, 0x16e0_cb53, 0x7dec_eeeb, 0x0dc7_e1c6, 0x5c75_bf3d, 0x5221_8333, + 0x106d_e4d6, 0x7dc6_4422, 0x6559_0ff4, 0x2c02_ec30, 0x64a9_ac67, 0x59ca_b2e9, + 0x4a21_d2f3, 0x0f61_6e57, 0x23b5_4ee8, 0x0273_0aaa, 0x2f3c_634d, 0x7117_fc6c, + 0x01ac_6f05, 0x5a9e_d20c, 0x158c_4e2a, 0x42b6_99f0, 0x0c7c_14b3, 0x02bd_9641, + 0x15ad_56fc, 0x1c72_2f60, 0x7da1_af91, 0x23e0_dbcb, 0x0e93_e12b, 0x64b2_791d, + 0x440d_2476, 0x588e_a8dd, 0x4665_a658, 0x7446_c418, 0x1877_a774, 0x5626_407e, + 0x7f63_bd46, 0x32d2_dbd8, 0x3c79_0f4a, 0x772b_7239, 0x6f8b_2826, 0x677f_f609, + 0x0dc8_2c11, 0x23ff_e354, 0x2eac_53a6, 0x1613_9e09, 0x0afd_0dbc, 0x2a4d_4237, + 0x56a3_68c7, 0x2343_25e4, 0x2dce_9187, 
0x32e8_ea7e +]; diff --git a/nativelink-util/tests/ultracdc_test.rs b/nativelink-util/tests/ultracdc_test.rs new file mode 100644 index 000000000..6f2dab1bc --- /dev/null +++ b/nativelink-util/tests/ultracdc_test.rs @@ -0,0 +1,216 @@ +// Copyright 2024 The NativeLink Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet}; +use std::io::Cursor; + +use bytes::Bytes; +use futures::stream::StreamExt; +use nativelink_macro::nativelink_test; +use nativelink_util::ultracdc::UltraCDC; +use pretty_assertions::assert_eq; +use rand::rngs::SmallRng; +use rand::{Rng, SeedableRng}; +use sha2::{Digest, Sha256}; +use tokio::io::AsyncRead; +use tokio_util::codec::{Decoder, FramedRead}; + +const MEGABYTE_SZ: usize = 1024 * 1024; + +/// Extracts the frames of the reader into a vector. +async fn get_frames( + frame_reader: &mut FramedRead, +) -> Result, D::Error> { + let mut frames = vec![]; + while let Some(frame) = frame_reader.next().await { + frames.push(frame?); + } + Ok(frames) +} + +#[nativelink_test] +async fn test_all_zeros() -> Result<(), std::io::Error> { + // For all zeros, always returns chunks of maximum size. 
+ const ZERO_DATA: [u8; 10240] = [0u8; 10240]; + let mut cursor = Cursor::new(&ZERO_DATA); + let mut frame_reader = FramedRead::new(&mut cursor, UltraCDC::new(64, 256, 1024)); + + let mut total_data = 0; + while let Some(frame) = frame_reader.next().await { + let frame = frame?; + assert_eq!(frame.len(), 1024); + total_data += frame.len(); + } + assert_eq!(ZERO_DATA.len(), total_data); + Ok(()) +} + +#[nativelink_test] +async fn test_sekien_16k_chunks() -> Result<(), Box> { + let contents = include_bytes!("data/SekienAkashita.jpg"); + let mut cursor = Cursor::new(&contents); + let mut frame_reader = FramedRead::new(&mut cursor, UltraCDC::new(8192, 16384, 32768)); + + let mut frames = vec![]; + let mut sum_frame_len = 0; + while let Some(frame) = frame_reader.next().await { + let frame = frame?; + sum_frame_len += frame.len(); + frames.push(frame); + } + + assert_eq!(frames.len(), 9); + assert_eq!(frames[0].len(), 10926); + assert_eq!(frames[1].len(), 17398); + assert_eq!(frames[2].len(), 12026); + assert_eq!(frames[3].len(), 16883); + assert_eq!(frames[4].len(), 12264); + assert_eq!(frames[5].len(), 10269); + assert_eq!(frames[6].len(), 11570); + assert_eq!(frames[7].len(), 11491); + assert_eq!(frames[8].len(), 6639); + assert_eq!(sum_frame_len, contents.len()); + Ok(()) +} + +#[nativelink_test] +async fn test_random_20mb_16k_chunks() -> Result<(), std::io::Error> { + let data = { + let mut data = vec![0u8; 20 * MEGABYTE_SZ]; + let mut rng = SmallRng::seed_from_u64(1); + rng.fill(&mut data[..]); + data + }; + let mut cursor = Cursor::new(&data); + let mut frame_reader = FramedRead::new(&mut cursor, UltraCDC::new(1024, 2048, 4096)); + + let mut lens = vec![]; + for frame in get_frames(&mut frame_reader).await? 
{ + lens.push(frame.len()); + } + lens.sort(); + Ok(()) +} + +#[nativelink_test] +async fn insert_garbage_check_boundarys_recover_test() -> Result<(), std::io::Error> { + let mut rand_data = { + let mut data = vec![0u8; 100_000]; + let mut rng = SmallRng::seed_from_u64(1); + rng.fill(&mut data[..]); + data + }; + + let fast_cdc = UltraCDC::new(1024, 2048, 4096); + let left_frames = { + let mut frame_reader = FramedRead::new(Cursor::new(&rand_data), fast_cdc.clone()); + let frames: Vec = get_frames(&mut frame_reader).await?; + + let mut frames_map = HashMap::new(); + let mut pos = 0; + for frame in frames { + let frame_len = frame.len(); + frames_map.insert(hex::encode(Sha256::digest(&frame[..])), (frame, pos)); + pos += frame_len; + } + frames_map + }; + + { + // Replace 2k of bytes and append one byte to middle. + let mut rng = SmallRng::seed_from_u64(2); + rng.fill(&mut rand_data[0..2000]); + rand_data.insert(rand_data.len() / 2, 0x71); + } + + let mut right_frames = { + let mut frame_reader = FramedRead::new(Cursor::new(&rand_data), fast_cdc.clone()); + let frames: Vec = get_frames(&mut frame_reader).await?; + + let mut frames_map = HashMap::new(); + let mut pos = 0; + for frame in frames { + let frame_len = frame.len(); + frames_map.insert(hex::encode(Sha256::digest(&frame[..])), (frame, pos)); + pos += frame_len; + } + frames_map + }; + + let mut expected_missing_hashes = HashSet::::new(); + { + expected_missing_hashes + .insert("3461d18541f76cbaf93921775c75f1cae6a907883753c1b6ca0d5d06996d9b20".into()); + expected_missing_hashes + .insert("74a991a6dfe28f57a367c9c2ba5920dd922c321ab4be46c066cc2a255c886d63".into()); + expected_missing_hashes + .insert("f1c367e2648ac8b3a77af867a786a51803f96c8ce33ec8ab19fc466d92fade57".into()); + expected_missing_hashes + .insert("c304bd66045d69171ecf1124b6d2d32801fcf74e242c38e9bf9d54c242d60e33".into()); + + for key in left_frames.keys() { + let maybe_right_frame = right_frames.get(key); + if maybe_right_frame.is_none() { + 
assert_eq!( + expected_missing_hashes.contains(key), + true, + "Expected to find: {}", + key + ); + expected_missing_hashes.remove(key); + } else { + right_frames.remove(key); + } + } + } + let mut expected_new_hashes = HashMap::::new(); + { + expected_new_hashes.insert( + "9e9891cc82822f6ed325efa5d749d25f7c65dade324e5fc7fe5b206d1c89d642".into(), + 0, + ); + expected_new_hashes.insert( + "6cc0066e9f85f21cd36fd130666fd95f3ed2398f46725438cf3225c23b773c94".into(), + 1235, + ); + + expected_new_hashes.insert( + "b355bd47dd26b1ce40caa2df54d3b39ee766d9c556dccc44971f8315fe0da66a".into(), + 3087, + ); + expected_new_hashes.insert( + "e1140aa52c81cf9055129574d4749d622c33599768e96b1e4e3cdc8ecb9e5c7d".into(), + 49940, + ); + + + for (key, (_, pos)) in right_frames { + let expected_pos = expected_new_hashes.get(&key); + assert_eq!(Some(&pos), expected_pos, "For hash {}", key); + expected_new_hashes.remove(&key); + } + } + assert_eq!( + expected_missing_hashes.len(), + 0, + "Some old hashes were not found" + ); + assert_eq!( + expected_new_hashes.len(), + 0, + "Some new hashes were not consumed" + ); + + Ok(()) +}