Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolved Bytes vtable #1057

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion nativelink-service/src/bytestream_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,20 @@ impl ByteStreamServer {
Ok(bytes) => {
if bytes.is_empty() {
// EOF.
return Some((Ok(response), None));
event!(Level::WARN, "EOF");
// Instead of returning immediately, wait for the `get_part_fut` to complete.
let get_part_result = if let Some(result) = state.maybe_get_part_result.take() {
result
} else {
// Wrapping in an async block to use await
async { state.get_part_fut.await }.await
};
event!(Level::WARN, "COMPLETE");

match get_part_result {
Ok(_) => return Some((Ok(response), None)),
Err(e) => return Some((Err(e.into()), None)),
}
}
if bytes.len() > state.max_bytes_per_stream {
let err = make_err!(Code::Internal, "Returned store size was larger than read size");
Expand Down
12 changes: 6 additions & 6 deletions nativelink-store/src/dedup_store.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 The NativeLink Authors. All rights reserved.
// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -23,7 +23,7 @@ use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt};
use nativelink_error::{make_err, Code, Error, ResultExt};
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::common::DigestInfo;
use nativelink_util::fastcdc::FastCDC;
use nativelink_util::ultracdc::UltraCDC;
use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo};
use serde::{Deserialize, Serialize};
Expand All @@ -46,7 +46,7 @@ pub struct DedupIndex {
pub struct DedupStore {
index_store: Store,
content_store: Store,
fast_cdc_decoder: FastCDC,
ultra_cdc_decoder: UltraCDC,
max_concurrent_fetch_per_get: usize,
bincode_options: WithOtherIntEncoding<DefaultOptions, FixintEncoding>,
}
Expand Down Expand Up @@ -80,7 +80,7 @@ impl DedupStore {
Arc::new(Self {
index_store,
content_store,
fast_cdc_decoder: FastCDC::new(min_size, normal_size, max_size),
ultra_cdc_decoder: UltraCDC::new(min_size, normal_size, max_size),
max_concurrent_fetch_per_get,
bincode_options: DefaultOptions::new().with_fixint_encoding(),
})
Expand Down Expand Up @@ -172,9 +172,9 @@ impl StoreDriver for DedupStore {
_size_info: UploadSizeInfo,
) -> Result<(), Error> {
let mut bytes_reader = StreamReader::new(reader);
let frame_reader = FramedRead::new(&mut bytes_reader, self.fast_cdc_decoder.clone());
let frame_reader = FramedRead::new(&mut bytes_reader, self.ultra_cdc_decoder.clone());
let index_entries = frame_reader
.map(|r| r.err_tip(|| "Failed to decode frame from fast_cdc"))
.map(|r| r.err_tip(|| "Failed to decode frame from ultra_cdc"))
.map_ok(|frame| async move {
let hash = blake3::hash(&frame[..]).into();
let index_entry = DigestInfo::new(hash, frame.len() as i64);
Expand Down
20 changes: 20 additions & 0 deletions nativelink-store/src/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,28 @@ use crate::cas_utils::is_zero_digest;
#[derive(Clone)]
pub struct BytesWrapper(Bytes);

impl BytesWrapper {
pub fn from_string(s: &str) -> Self {
BytesWrapper(Bytes::copy_from_slice(s.as_bytes()))
}
}

impl Debug for BytesWrapper {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("BytesWrapper { -- Binary data -- }")
}
}

impl Drop for BytesWrapper {
fn drop(&mut self) {
print!(
"BytesWrapper dropped {}\n",
std::str::from_utf8(&self.0).unwrap()
);
std::mem::drop(&self.0);
}
}

impl LenEntry for BytesWrapper {
#[inline]
fn len(&self) -> usize {
Expand Down Expand Up @@ -154,6 +170,10 @@ impl MemoryStore {
self.evicting_map.remove(&key.into_owned()).await
}

pub async fn insert(&self, key: StoreKey<'static>, data: BytesWrapper) -> Option<BytesWrapper> {
return self.evicting_map.insert(key, data).await;
}

/// Tells the store that a subscription has been dropped and gives an opportunity to clean up.
fn remove_dropped_subscription(&self, key: StoreKey<'static>) {
let mut subscriptions = self.subscriptions.write();
Expand Down
61 changes: 57 additions & 4 deletions nativelink-store/tests/dedup_store_test.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 The NativeLink Authors. All rights reserved.
// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -12,15 +12,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::io::Cursor;
use aws_smithy_runtime::client::http::test_util::dvr::Event;
use futures::stream::StreamExt;
use nativelink_error::{Code, Error, ResultExt};
use nativelink_macro::nativelink_test;
use nativelink_store::dedup_store::DedupStore;
use nativelink_store::memory_store::MemoryStore;
use nativelink_util::common::DigestInfo;
use nativelink_util::store_trait::{Store, StoreLike};
use nativelink_util::ultracdc::UltraCDC;
use pretty_assertions::assert_eq;
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};
use tokio_util::codec::FramedRead;
use tracing::{event, Level};

use tokio::io::AsyncRead;
use tokio_util::codec::Decoder;
use bytes::Bytes;
use sha2::{Digest, Sha256};

fn make_default_config() -> nativelink_config::stores::DedupStore {
nativelink_config::stores::DedupStore {
Expand Down Expand Up @@ -77,6 +89,16 @@ async fn simple_round_trip_test() -> Result<(), Error> {
Ok(())
}

async fn get_frames<T: AsyncRead + Unpin, D: Decoder>(
frame_reader: &mut FramedRead<T, D>,
) -> Result<Vec<D::Item>, D::Error> {
let mut frames = vec![];
while let Some(frame) = frame_reader.next().await {
frames.push(frame?);
}
Ok(frames)
}

#[nativelink_test]
async fn check_missing_last_chunk_test() -> Result<(), Error> {
let content_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default());
Expand All @@ -91,15 +113,37 @@ async fn check_missing_last_chunk_test() -> Result<(), Error> {
let original_data = make_random_data(MEGABYTE_SZ);
let digest = DigestInfo::try_new(VALID_HASH1, MEGABYTE_SZ).unwrap();

let mut cursor = Cursor::new(&original_data);

// Print the hash and lenght for chunks data
// {
// let mut frame_reader = FramedRead::new(&mut cursor, UltraCDC::new(8 * 1024, 32 * 1024, 128 * 1024));
// let frames: Vec<Bytes> = get_frames(&mut frame_reader).await?;

// let mut frames_map = HashMap::new();
// let mut pos = 0;
// for frame in frames {
// let frame_len = frame.len();
// let key = blake3::hash(&frame[..]);
// event!(Level::WARN, "key: {}, len: {}", key, frame_len);
// pos += frame_len;
// }
// frames_map
// };

store
.update_oneshot(digest, original_data.into())
.await
.err_tip(|| "Failed to write data to dedup store")?;

// This is the hash & size of the last chunk item in the content_store.
const LAST_CHUNK_HASH: &str =
"7c8608f5b079bef66c45bd67f7d8ede15d2e1830ea38fd8ad4c6de08b6f21a0c";
const LAST_CHUNK_SIZE: usize = 25779;
"3706c7d79d112ed209a391ae6854b6824cb4ab4f3976e5b46c1f65a4dff0487a";
const LAST_CHUNK_SIZE: usize = 5460;

let data = content_store.has(digest).await;

event!(Level::WARN, ?data, "Data");

let did_delete = content_store
.remove_entry(
Expand Down Expand Up @@ -341,9 +385,18 @@ async fn has_checks_content_store() -> Result<(), Error> {
let size_info = store.has(digest2).await.err_tip(|| "Failed to run .has")?;
assert_eq!(size_info, Some(DATA2.len()), "Expected sizes to match");
}

const DATA3: &str = "1234";
let digest3 = DigestInfo::try_new(VALID_HASH2, DATA3.len()).unwrap();
store
.update_oneshot(digest3, DATA3.into())
.await
.err_tip(|| "Failed to write data to dedup store")?;

{
// Check our first added entry is now invalid (because part of it was evicted).
let size_info = store.has(digest1).await.err_tip(|| "Failed to run .has")?;
event!(Level::WARN, "Length of Origin Data {}", original_data.len());
assert_eq!(
size_info, None,
"Expected .has() to return None (not found)"
Expand Down
25 changes: 24 additions & 1 deletion nativelink-store/tests/memory_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ use std::pin::Pin;
use bytes::{BufMut, Bytes, BytesMut};
use futures::poll;
use memory_stats::memory_stats;
use nativelink_config::stores::EvictionPolicy;
use nativelink_error::{Code, Error, ResultExt};
use nativelink_macro::nativelink_test;
use nativelink_store::memory_store::MemoryStore;
use nativelink_store::memory_store::{BytesWrapper, MemoryStore};
use nativelink_util::buf_channel::make_buf_channel_pair;
use nativelink_util::common::DigestInfo;
use nativelink_util::spawn;
Expand All @@ -36,6 +37,28 @@ const TOO_LONG_HASH: &str = "0123456789abcdef00000000000000000001000000000000012
const TOO_SHORT_HASH: &str = "100000000000000000000000000000000000000000000000000000000000001";
const INVALID_HASH: &str = "g111111111111111111111111111111111111111111111111111111111111111";

#[nativelink_test]
async fn insert_drop_test() -> Result<(), Error> {
const VALUE1: &str = "13";
const VALUE2: &str = "2345";
let eviction_policy = EvictionPolicy {
max_bytes: 5,
evict_bytes: 0,
max_seconds: 0,
max_count: 2,
};
let store = MemoryStore::new(&nativelink_config::stores::MemoryStore {
eviction_policy: Some(eviction_policy),
});
store
.insert(StoreKey::new_str(VALUE1), BytesWrapper::from_string(VALUE1))
.await;
store
.insert(StoreKey::new_str(VALUE2), BytesWrapper::from_string(VALUE2))
.await;
Ok(())
}

#[nativelink_test]
async fn insert_one_item_then_update() -> Result<(), Error> {
const VALUE1: &str = "13";
Expand Down
7 changes: 7 additions & 0 deletions nativelink-util/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ rust_library(
srcs = [
"src/action_messages.rs",
"src/buf_channel.rs",
"src/chunked_stream.rs",
"src/common.rs",
"src/connection_manager.rs",
"src/default_store_key_subscribe.rs",
Expand All @@ -21,6 +22,7 @@ rust_library(
"src/health_utils.rs",
"src/lib.rs",
"src/metrics_utils.rs",
"src/operation_state_manager.rs",
"src/origin_context.rs",
"src/platform_properties.rs",
"src/proto_stream_utils.rs",
Expand All @@ -29,6 +31,7 @@ rust_library(
"src/store_trait.rs",
"src/task.rs",
"src/tls_utils.rs",
"src/ultracdc.rs",
"src/write_counter.rs",
],
proc_macro_deps = [
Expand All @@ -40,14 +43,17 @@ rust_library(
"//nativelink-error",
"//nativelink-proto",
"@crates//:async-lock",
"@crates//:bitflags",
"@crates//:blake3",
"@crates//:bytes",
"@crates//:console-subscriber",
"@crates//:futures",
"@crates//:hex",
"@crates//:hyper",
"@crates//:hyper-util",
"@crates//:lru",
"@crates//:parking_lot",
"@crates//:pin-project",
"@crates//:pin-project-lite",
"@crates//:prometheus-client",
"@crates//:prost",
Expand Down Expand Up @@ -77,6 +83,7 @@ rust_test_suite(
"tests/proto_stream_utils_test.rs",
"tests/resource_info_test.rs",
"tests/retry_test.rs",
"tests/ultracdc_test.rs",
],
compile_data = [
"tests/data/SekienAkashita.jpg",
Expand Down
21 changes: 21 additions & 0 deletions nativelink-util/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use nativelink_error::{make_input_err, Error, ResultExt};
use nativelink_proto::build::bazel::remote::execution::v2::Digest;
use prost::Message;
use serde::{Deserialize, Serialize};
use std::process::Command;

pub use crate::fs;

Expand Down Expand Up @@ -212,3 +213,23 @@ pub fn encode_stream_proto<T: Message>(proto: &T) -> Result<Bytes, Box<dyn std::

Ok(buf.freeze())
}

// Utility to obtain the version of NativeLink.
pub fn get_version() -> String {
let output = Command::new("git")
.args(&["describe", "--tags", "--abbrev=0"])
.output()
.expect("Failed to execute git command");

String::from_utf8(output.stdout).expect("Invalid UTF-8 output").trim().to_string()
}

// Utility to obtain the current hash of NativeLink.
pub fn get_hash() -> String {
let output = Command::new("git")
.args(&["rev-parse", "HEAD"])
.output()
.expect("Failed to execute git command");

String::from_utf8(output.stdout).expect("Invalid UTF-8 output").trim().to_string()
}
21 changes: 20 additions & 1 deletion nativelink-util/src/evicting_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,25 @@ struct EvictionItem<T: LenEntry + Debug> {
}

#[async_trait]
pub trait LenEntry: 'static {
impl<T: LenEntry + Debug> LenEntry for EvictionItem<T> {
#[inline]
fn len(&self) -> usize {
return self.data.len();
}

#[inline]
fn is_empty(&self) -> bool {
return self.data.is_empty();
}

#[inline]
async fn unref(&self) {
std::mem::drop(&self.data);
}
}

#[async_trait]
pub trait LenEntry: 'static + Send + Sync {
/// Length of referenced data.
fn len(&self) -> usize;

Expand Down Expand Up @@ -303,6 +321,7 @@ where
.lru
.pop_lru()
.expect("Tried to peek() then pop() but failed");
print!("Evicting item -- ");
event!(Level::INFO, ?key, "Evicting",);
state.remove(&key, &eviction_item, false).await;

Expand Down
Loading