From 746fb44f73d813f54e3a939d19d12d1e296a3eb9 Mon Sep 17 00:00:00 2001 From: Miles Granger Date: Sat, 18 May 2024 08:31:58 +0200 Subject: [PATCH 1/3] Set typesize in alternative constructors for Schunk --- src/lib.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 9938d5a..00bf59b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -563,8 +563,9 @@ pub mod schunk { /// assert_eq!(chunk.info().unwrap().nbytes(), 10); /// ``` pub fn uninit(cparams: CParams, len: usize) -> Result { + let mut cparams = cparams; if mem::size_of::() != cparams.0.typesize as usize { - return Err("typesize mismatch between CParams and T".into()); + cparams.0.typesize = mem::size_of::() as _; } let mut dst = Vec::with_capacity( (len * cparams.0.typesize as usize) + ffi::BLOSC_EXTENDED_HEADER_LENGTH as usize, @@ -597,8 +598,9 @@ pub mod schunk { /// assert_eq!(chunk.decompress::().unwrap(), vec![0.123_f32, 0.123, 0.123, 0.123]); /// ``` pub fn repeatval(cparams: CParams, value: T, len: usize) -> Result { + let mut cparams = cparams; if mem::size_of::() != cparams.0.typesize as usize { - return Err("typesize mismatch between CParams and T".into()); + cparams.0.typesize = mem::size_of::() as _; } let mut dst = Vec::with_capacity( (len * cparams.0.typesize as usize) + ffi::BLOSC_EXTENDED_HEADER_LENGTH as usize, @@ -632,8 +634,9 @@ pub mod schunk { /// assert_eq!(chunk.info().unwrap().nbytes(), 40); // 10 elements * 4 bytes each /// ``` pub fn zeros(cparams: CParams, len: usize) -> Result { + let mut cparams = cparams; if mem::size_of::() != cparams.0.typesize as usize { - return Err("typesize mismatch between CParams and T".into()); + cparams.0.typesize = mem::size_of::() as _; } let mut dst = Vec::with_capacity( (len * cparams.0.typesize as usize) + ffi::BLOSC_EXTENDED_HEADER_LENGTH as usize, From cbb29ce307190facc0b382284aebe2c4e7b6bd9a Mon Sep 17 00:00:00 2001 From: Miles Granger Date: Sat, 18 May 2024 08:32:24 +0200 Subject: [PATCH 2/3] impl Send for SChunk and basic test --- src/lib.rs | 37 +++++++++++++++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 00bf59b..2711b62 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -790,6 +790,8 @@ pub mod schunk { #[derive(Clone)] pub struct SChunk(pub(crate) Arc>); + unsafe impl Send for SChunk {} + // Loosely inspired by blosc2-python implementation impl SChunk { pub fn new(storage: Storage) -> Self { @@ -1232,8 +1234,7 @@ impl CParams { impl Default for CParams { #[inline] fn default() -> Self { - let mut cparams = unsafe { ffi::blosc2_get_blosc2_cparams_defaults() }; - cparams.typesize = 1; + let cparams = unsafe { ffi::blosc2_get_blosc2_cparams_defaults() }; Self(cparams) } } @@ -1898,6 +1899,38 @@ mod tests { Ok(()) } + #[test] + fn test_schunk_thread_shared() -> Result<()> { + let input = b"some data"; + let storage = schunk::Storage::default() + .set_contiguous(true) + .set_cparams(CParams::from(&input[0])) + .set_dparams(DParams::default()); + let mut schunk = schunk::SChunk::new(storage); + + schunk.append_buffer(input)?; + + let mut schunk2 = schunk.clone(); + std::thread::spawn(move || { + assert_eq!(schunk2.n_chunks(), 1); + schunk2.append_buffer(b"more data").unwrap(); + }) + .join() + .unwrap(); + + assert_eq!(schunk.n_chunks(), 2); + assert_eq!( + b"some data", + schunk.decompress_chunk_vec(0).unwrap().as_slice() + ); + assert_eq!( + b"more data", + schunk.decompress_chunk_vec(1).unwrap().as_slice() + ); + + Ok(()) + } + #[cfg(not(target_os = "windows"))] #[test] fn test_schunk_write() -> Result<()> { From 65c19584b4f79b8207a1852c159ce23ed599ff65 Mon Sep 17 00:00:00 2001 From: Miles Granger Date: Sat, 18 May 2024 11:35:37 +0200 Subject: [PATCH 3/3] Port c-blosc2 schunk_simple.c example --- .github/workflows/CI.yml | 9 +++++ examples/schunk_simple.rs | 83 +++++++++++++++++++++++++++++++++++++++ src/lib.rs | 12 +++--- 3 files changed, 99 insertions(+), 5 deletions(-) create mode 100644 examples/schunk_simple.rs diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 78e003d..39ea127 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -125,3 +125,12 @@ jobs: run: | cargo clean # ensure we're starting fresh, no funny business cargo test --no-default-features --features static -vv --release + + - name: Run examples + shell: bash + run: | + for example in $(find examples -name '*.rs'); do + example_name=$(basename $example .rs) + echo "------- Running example: ${example_name} -------" + cargo run --example $example_name + done diff --git a/examples/schunk_simple.rs b/examples/schunk_simple.rs new file mode 100644 index 0000000..951f91e --- /dev/null +++ b/examples/schunk_simple.rs @@ -0,0 +1,83 @@ +const KB: usize = 1024; +const MB: usize = 1024 * KB; + +const CHUNKSIZE: usize = 1_000 * 1_000; +const NCHUNKS: usize = 100; +const NTHREADS: usize = 4; + +fn main() { + blosc2::init(); + + let mut data = vec![0_i32; CHUNKSIZE]; + let mut data_dest = vec![0_i32; CHUNKSIZE]; + + println!( + "Blosc2 version info: {} ({})", + blosc2::get_version_string().unwrap(), + String::from_utf8(blosc2::BLOSC2_VERSION_DATE.to_vec()).unwrap() + ); + + // Create a super-chunk container + let cparams = blosc2::CParams::default() + .set_typesize::() + .set_clevel(blosc2::CLevel::Nine) + .set_nthreads(NTHREADS); + let dparams = blosc2::DParams::default().set_nthreads(NTHREADS); + let storage = blosc2::schunk::Storage::default() + .set_cparams(cparams) + .set_dparams(dparams); + let mut schunk = blosc2::schunk::SChunk::new(storage); + + let ttotal = std::time::Instant::now(); + for nchunk in 0..NCHUNKS { + for i in 0..CHUNKSIZE { + data[i] = (i * nchunk) as i32; + } + let nchunks = schunk.append_buffer(&data).unwrap(); + if nchunks != nchunk + 1 { + panic!( + "Unexpected nchunks! Got {}, but expected {}", + nchunks, + nchunk + 1 + ); + } + } + + println!( + "Compression ratio: {:.1} MB -> {:.1} MB ({:.1}x)", + schunk.nbytes() as f32 / MB as f32, + schunk.cbytes() as f32 / MB as f32, + schunk.compression_ratio() + ); + println!( + "Compression time: {:.3}s, {:.1}MB/s", + ttotal.elapsed().as_secs_f32(), + schunk.nbytes() as f32 / (ttotal.elapsed().as_secs_f32() * MB as f32) + ); + + // Retrieve and decompress the chunks (0-based count) + let ttotal = std::time::Instant::now(); + for nchunk in (0..(NCHUNKS - 1)).rev() { + // The error check in c-blosc2 example is removed b/c will have been raised thru .unwrap call + let dsize = schunk.decompress_chunk(nchunk, &mut data_dest).unwrap(); + assert_eq!(dsize, CHUNKSIZE); + } + println!( + "Decompression time: {:.3}s, {:.1}MB/s", + ttotal.elapsed().as_secs_f32(), + schunk.nbytes() as f32 / (ttotal.elapsed().as_secs_f32() * MB as f32) + ); + + // Check integrity of the second chunk (made of non-zeros) + schunk.decompress_chunk(1, &mut data_dest).unwrap(); + for i in 0..CHUNKSIZE { + if data_dest[i] != i as i32 { + panic!( + "Decompressed data differs from original {}, {}", + i as i32, data_dest[i] + ); + } + } + + blosc2::destroy(); +} diff --git a/src/lib.rs b/src/lib.rs index 2711b62..4ac0826 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,8 @@ use std::ffi::{c_void, CStr, CString}; use std::sync::Arc; use std::{io, mem}; +pub use blosc2_sys::BLOSC2_VERSION_DATE; + /// Result type used in this library pub type Result = std::result::Result; @@ -849,16 +851,16 @@ pub mod schunk { Ok(nchunks as _) } - /// Decompress a chunk, returning number of bytes written to output buffer + /// Decompress a chunk, returning number of elements of `T` written to output buffer #[inline] pub fn decompress_chunk(&mut self, nchunk: usize, dst: &mut [T]) -> Result { let chunk = Chunk::from_schunk(self, nchunk)?; let info = chunk.info()?; - if dst.len() < info.nbytes as usize { + if dst.len() * mem::size_of::() < info.nbytes as usize { let msg = format!( - "Not large enough, need {} but got {}", + "Not large enough, need {} bytes but got buffer w/ {} bytes of storage", info.nbytes, - dst.len() + dst.len() * mem::size_of::() ); return Err(msg.into()); } @@ -879,7 +881,7 @@ pub mod schunk { let msg = format!("Non-initialized error decompressing chunk '{}'", nchunk); return Err(msg.into()); } else { - Ok(size as _) + Ok((size / mem::size_of::() as i32) as _) } }