Skip to content

Commit

Permalink
refactor: add docs and do some code sanding
Browse files Browse the repository at this point in the history
  • Loading branch information
aatifsyed committed Jul 18, 2023
1 parent 62459ff commit 213c80c
Showing 1 changed file with 36 additions and 22 deletions.
58 changes: 36 additions & 22 deletions src/car_backed_blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -725,9 +725,7 @@ fn zstd_compress_fold_varint_frame(
let mut header = unsigned_varint::encode::usize_buffer();
encoder
.write_all(unsigned_varint::encode::usize(body.len(), &mut header))
.expect("BytesMut has infallible IO");
encoder
.write_all(&body)
.and_then(|_| encoder.write_all(&body))
.expect("BytesMut has infallible IO");
encoder
}
Expand Down Expand Up @@ -766,7 +764,7 @@ pub async fn write_car(
match format {
CarFormat::V1Plain => {
stream::once(future::ready(Ok(uncompressed_v1_header(roots))))
.chain(blocks.map_ok(|(cid, ipld)| encode_concat_cid_and_ipld(cid, ipld)))
.chain(blocks.map_ok(|(cid, ipld)| concat_cid_and_block_data(cid, ipld)))
.forward(FramedWrite06::new(writer, VarintFrameCodec::default()))
.await
}
Expand All @@ -776,7 +774,7 @@ pub async fn write_car(
} => {
try_collate(
stream::once(future::ready(Ok(uncompressed_v1_header(roots))))
.chain(blocks.map_ok(|(cid, ipld)| encode_concat_cid_and_ipld(cid, ipld))),
.chain(blocks.map_ok(|(cid, ipld)| concat_cid_and_block_data(cid, ipld))),
varint_to_zstd_frame_collator(zstd_frame_size_tripwire, zstd_compression_level),
zstd_compress_finish,
)
Expand Down Expand Up @@ -819,7 +817,7 @@ async fn write_manyframe_and_create_index(
let mut zstd_frame_offset = u64::try_from(header.len()).unwrap();
let mut index = AIndexMap::default();
let zstd_frames = try_collate(
blocks.map_ok(|(cid, ipld)| encode_concat_cid_and_ipld(cid, ipld)),
blocks.map_ok(|(cid, ipld)| concat_cid_and_block_data(cid, ipld)),
varint_to_zstd_frame_collator(zstd_frame_size_tripwire, zstd_compression_level),
zstd_compress_finish,
)
Expand Down Expand Up @@ -851,6 +849,13 @@ async fn write_manyframe_and_create_index(
Ok(index)
}

/// Suitable for placing into a varint frame
///
/// ```text
/// β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
/// β”‚car headerβ”‚
/// β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
/// ```
fn uncompressed_v1_header(roots: Vec<Cid>) -> BytesMut {
let mut buffer = BytesMut::new();
let header = CarHeader { roots, version: 1 };
Expand All @@ -860,7 +865,29 @@ fn uncompressed_v1_header(roots: Vec<Cid>) -> BytesMut {
buffer
}

/// Store the header in its own varint frame
/// Suitable for placing into a varint frame
///
/// ```text
/// β”Œβ”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
/// β”‚cidβ”‚block dataβ”‚
/// β””β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
/// ```
fn concat_cid_and_block_data(cid: Cid, ipld: Vec<u8>) -> BytesMut {
let mut buffer = BytesMut::new();
cid.write_bytes((&mut buffer).writer())
.expect("BytesMut has infallible IO");
buffer.extend(ipld);
buffer
}

/// Store the header in its own varint frame, and compress it in a zstd frame
/// ```text
/// β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
/// β”‚ zstd frame β”‚
/// β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
/// β”‚body lengthβ”‚car headerβ”‚
/// β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
/// ```
fn compressed_v1_header_varint(roots: Vec<Cid>, zstd_compression_level: u16) -> BytesMut {
let mut compressor =
zstd::Encoder::new(BytesMut::new().writer(), i32::from(zstd_compression_level))
Expand All @@ -874,25 +901,12 @@ fn compressed_v1_header_varint(roots: Vec<Cid>, zstd_compression_level: u16) ->
let len = unsigned_varint::encode::usize(header.len(), &mut len_buffer);
compressor
.write_all(len)
.expect("BytesMut has infallible IO");
compressor
.write_all(&header)
.expect("BytesMut has infallible IO");
compressor
.finish()
.and_then(|_| compressor.write_all(&header))
.and_then(|_| compressor.finish())
.expect("BytesMut has infallible IO")
.into_inner()
}

// TODO(aatifsyed): don't actually need to take Vec<u8>..
fn encode_concat_cid_and_ipld(cid: Cid, ipld: Vec<u8>) -> BytesMut {
let mut buffer = BytesMut::new();
cid.write_bytes((&mut buffer).writer())
.expect("BytesMut has infallible IO");
buffer.extend(ipld);
buffer
}

#[cfg(test)]
mod tests {

Expand Down

0 comments on commit 213c80c

Please sign in to comment.