From 6e24ce84310b32ea3f98389a5b540fbf90336c3f Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 14 Sep 2023 12:26:54 +0100 Subject: [PATCH 1/4] Adaptive Row Block Size (#4812) --- arrow-row/src/lib.rs | 13 +++- arrow-row/src/variable.rs | 136 ++++++++++++++++++++++---------------- 2 files changed, 89 insertions(+), 60 deletions(-) diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs index b59d84061a8a..23bb7c6365ce 100644 --- a/arrow-row/src/lib.rs +++ b/arrow-row/src/lib.rs @@ -232,10 +232,11 @@ mod variable; /// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array /// encoded using a block based scheme described below. /// -/// The byte array is broken up into 32-byte blocks, each block is written in turn +/// The byte array is broken up into fixed-width blocks, each block is written in turn /// to the output, followed by `0xFF_u8`. The final block is padded to 32-bytes /// with `0_u8` and written to the output, followed by the un-padded length in bytes -/// of this final block as a `u8`. +/// of this final block as a `u8`. The first 4 blocks have a length of 8, with subsequent +/// blocks using a length of 32. 
/// /// Note the following example encodings use a block size of 4 bytes, /// as opposed to 32 bytes for brevity: @@ -1698,12 +1699,18 @@ mod tests { None, Some(vec![0_u8; 0]), Some(vec![0_u8; 6]), + Some(vec![0_u8; variable::MINI_BLOCK_SIZE]), + Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]), Some(vec![0_u8; variable::BLOCK_SIZE]), Some(vec![0_u8; variable::BLOCK_SIZE + 1]), Some(vec![1_u8; 6]), + Some(vec![1_u8; variable::MINI_BLOCK_SIZE]), + Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]), Some(vec![1_u8; variable::BLOCK_SIZE]), Some(vec![1_u8; variable::BLOCK_SIZE + 1]), Some(vec![0xFF_u8; 6]), + Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]), + Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]), Some(vec![0xFF_u8; variable::BLOCK_SIZE]), Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]), ])) as ArrayRef; @@ -2221,7 +2228,7 @@ mod tests { } for r in r2.iter() { - assert_eq!(r.data.len(), 34); + assert_eq!(r.data.len(), 10); } } diff --git a/arrow-row/src/variable.rs b/arrow-row/src/variable.rs index e9f6160bf43c..1143c345cb08 100644 --- a/arrow-row/src/variable.rs +++ b/arrow-row/src/variable.rs @@ -26,6 +26,12 @@ use arrow_schema::{DataType, SortOptions}; /// The block size of the variable length encoding pub const BLOCK_SIZE: usize = 32; +/// The first block is split into `MINI_BLOCK_COUNT` mini-blocks +pub const MINI_BLOCK_COUNT: usize = 4; + +/// The mini block size +pub const MINI_BLOCK_SIZE: usize = BLOCK_SIZE / MINI_BLOCK_COUNT; + /// The continuation token pub const BLOCK_CONTINUATION: u8 = 0xFF; @@ -45,7 +51,10 @@ pub fn encoded_len(a: Option<&[u8]>) -> usize { #[inline] pub fn padded_length(a: Option<usize>) -> usize { match a { - Some(a) => 1 + ceil(a, BLOCK_SIZE) * (BLOCK_SIZE + 1), + Some(a) if a <= BLOCK_SIZE => { + 1 + ceil(a, MINI_BLOCK_SIZE) * (MINI_BLOCK_SIZE + 1) + } + Some(a) => MINI_BLOCK_COUNT + ceil(a, BLOCK_SIZE) * (BLOCK_SIZE + 1), None => 1, } } @@ -82,44 +91,23 @@ pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usiz
1 } Some(val) => { - let block_count = ceil(val.len(), BLOCK_SIZE); - let end_offset = 1 + block_count * (BLOCK_SIZE + 1); - let to_write = &mut out[..end_offset]; - // Write `2_u8` to demarcate as non-empty, non-null string - to_write[0] = NON_EMPTY_SENTINEL; - - let chunks = val.chunks_exact(BLOCK_SIZE); - let remainder = chunks.remainder(); - for (input, output) in chunks - .clone() - .zip(to_write[1..].chunks_exact_mut(BLOCK_SIZE + 1)) - { - let input: &[u8; BLOCK_SIZE] = input.try_into().unwrap(); - let out_block: &mut [u8; BLOCK_SIZE] = - (&mut output[..BLOCK_SIZE]).try_into().unwrap(); - - *out_block = *input; - - // Indicate that there are further blocks to follow - output[BLOCK_SIZE] = BLOCK_CONTINUATION; - } + out[0] = NON_EMPTY_SENTINEL; - if !remainder.is_empty() { - let start_offset = 1 + (block_count - 1) * (BLOCK_SIZE + 1); - to_write[start_offset..start_offset + remainder.len()] - .copy_from_slice(remainder); - *to_write.last_mut().unwrap() = remainder.len() as u8; + let len = if val.len() <= BLOCK_SIZE { + 1 + encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], val) } else { - // We must overwrite the continuation marker written by the loop above - *to_write.last_mut().unwrap() = BLOCK_SIZE as u8; - } + let (initial, rem) = val.split_at(BLOCK_SIZE); + let offset = encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], initial); + out[offset] = BLOCK_CONTINUATION; + 1 + offset + encode_blocks::<BLOCK_SIZE>(&mut out[1 + offset..], rem) + }; if opts.descending { // Invert bits - to_write.iter_mut().for_each(|v| *v = !*v) + out[..len].iter_mut().for_each(|v| *v = !*v) } - end_offset + len } None => { out[0] = null_sentinel(opts); @@ -128,8 +116,36 @@ pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usiz } } -/// Returns the number of bytes of encoded data -fn decoded_len(row: &[u8], options: SortOptions) -> usize { +#[inline] +fn encode_blocks<const SIZE: usize>(out: &mut [u8], val: &[u8]) -> usize { + let block_count = ceil(val.len(), SIZE); + let end_offset = block_count * (SIZE + 1); + let
to_write = &mut out[..end_offset]; + + let chunks = val.chunks_exact(SIZE); + let remainder = chunks.remainder(); + for (input, output) in chunks.clone().zip(to_write.chunks_exact_mut(SIZE + 1)) { + let input: &[u8; SIZE] = input.try_into().unwrap(); + let out_block: &mut [u8; SIZE] = (&mut output[..SIZE]).try_into().unwrap(); + + *out_block = *input; + + // Indicate that there are further blocks to follow + output[SIZE] = BLOCK_CONTINUATION; + } + + if !remainder.is_empty() { + let start_offset = (block_count - 1) * (SIZE + 1); + to_write[start_offset..start_offset + remainder.len()].copy_from_slice(remainder); + *to_write.last_mut().unwrap() = remainder.len() as u8; + } else { + // We must overwrite the continuation marker written by the loop above + *to_write.last_mut().unwrap() = SIZE as u8; + } + end_offset +} + +fn decode_blocks(row: &[u8], options: SortOptions, mut f: impl FnMut(&[u8])) -> usize { let (non_empty_sentinel, continuation) = match options.descending { true => (!NON_EMPTY_SENTINEL, !BLOCK_CONTINUATION), false => (NON_EMPTY_SENTINEL, BLOCK_CONTINUATION), @@ -137,26 +153,46 @@ fn decoded_len(row: &[u8], options: SortOptions) -> usize { if row[0] != non_empty_sentinel { // Empty or null string - return 0; + return 1; } - let mut str_len = 0; + // Extracts the block length from the sentinel + let block_len = |sentinel: u8| match options.descending { + true => !sentinel as usize, + false => sentinel as usize, + }; + let mut idx = 1; + for _ in 0..MINI_BLOCK_COUNT { + let sentinel = row[idx + MINI_BLOCK_SIZE]; + if sentinel == continuation { + f(&row[idx..idx + MINI_BLOCK_SIZE]); + idx += MINI_BLOCK_SIZE + 1; + continue; + } + f(&row[idx..idx + block_len(sentinel)]); + return idx + MINI_BLOCK_SIZE + 1; + } + loop { let sentinel = row[idx + BLOCK_SIZE]; if sentinel == continuation { + f(&row[idx..idx + BLOCK_SIZE]); idx += BLOCK_SIZE + 1; - str_len += BLOCK_SIZE; continue; } - let block_len = match options.descending { - true => !sentinel, - false => 
sentinel, - }; - return str_len + block_len as usize; + f(&row[idx..idx + block_len(sentinel)]); + return idx + BLOCK_SIZE + 1; } } +/// Returns the number of bytes of encoded data +fn decoded_len(row: &[u8], options: SortOptions) -> usize { + let mut len = 0; + decode_blocks(row, options, |block| len += block.len()); + len +} + /// Decodes a binary array from `rows` with the provided `options` pub fn decode_binary( rows: &mut [&[u8]], @@ -176,22 +212,8 @@ pub fn decode_binary( let mut values = MutableBuffer::new(values_capacity); for row in rows { - let str_length = decoded_len(row, options); - let mut to_read = str_length; - let mut offset = 1; - while to_read >= BLOCK_SIZE { - to_read -= BLOCK_SIZE; - - values.extend_from_slice(&row[offset..offset + BLOCK_SIZE]); - offset += BLOCK_SIZE + 1; - } - - if to_read != 0 { - values.extend_from_slice(&row[offset..offset + to_read]); - offset += BLOCK_SIZE + 1; - } + let offset = decode_blocks(row, options, |b| values.extend_from_slice(b)); *row = &row[offset..]; - offsets.append(I::from_usize(values.len()).expect("offset overflow")) } From 8e3b88457534350eca4361d348050104d3449ce5 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 14 Sep 2023 15:48:20 +0100 Subject: [PATCH 2/4] Perf improvements --- arrow-array/src/types.rs | 2 ++ arrow-row/src/lib.rs | 3 +-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/arrow-array/src/types.rs b/arrow-array/src/types.rs index d79b32a991ed..7988fe9f6690 100644 --- a/arrow-array/src/types.rs +++ b/arrow-array/src/types.rs @@ -1368,12 +1368,14 @@ pub(crate) mod bytes { } impl ByteArrayNativeType for [u8] { + #[inline] unsafe fn from_bytes_unchecked(b: &[u8]) -> &Self { b } } impl ByteArrayNativeType for str { + #[inline] unsafe fn from_bytes_unchecked(b: &[u8]) -> &Self { std::str::from_utf8_unchecked(b) } diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs index 23bb7c6365ce..a11a1950bc07 100644 --- a/arrow-row/src/lib.rs +++ b/arrow-row/src/lib.rs @@ 
-238,8 +238,7 @@ mod variable; /// of this final block as a `u8`. The first 4 blocks have a length of 8, with subsequent /// blocks using a length of 32. /// -/// Note the following example encodings use a block size of 4 bytes, -/// as opposed to 32 bytes for brevity: +/// Note the following example encodings use a block size of 4 bytes for brevity: /// /// ```text /// ┌───┬───┬───┬───┬───┬───┐ From fe93e7ab97cd627839f21374493ffe16aa509074 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 14 Sep 2023 18:19:46 +0100 Subject: [PATCH 3/4] Further tweaks --- arrow-row/src/variable.rs | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/arrow-row/src/variable.rs b/arrow-row/src/variable.rs index 1143c345cb08..c8cd7ab1bdaf 100644 --- a/arrow-row/src/variable.rs +++ b/arrow-row/src/variable.rs @@ -165,24 +165,22 @@ fn decode_blocks(row: &[u8], options: SortOptions, mut f: impl FnMut(&[u8])) -> let mut idx = 1; for _ in 0..MINI_BLOCK_COUNT { let sentinel = row[idx + MINI_BLOCK_SIZE]; - if sentinel == continuation { - f(&row[idx..idx + MINI_BLOCK_SIZE]); - idx += MINI_BLOCK_SIZE + 1; - continue; + if sentinel != continuation { + f(&row[idx..idx + block_len(sentinel)]); + return idx + MINI_BLOCK_SIZE + 1; } - f(&row[idx..idx + block_len(sentinel)]); - return idx + MINI_BLOCK_SIZE + 1; + f(&row[idx..idx + MINI_BLOCK_SIZE]); + idx += MINI_BLOCK_SIZE + 1; } loop { let sentinel = row[idx + BLOCK_SIZE]; - if sentinel == continuation { - f(&row[idx..idx + BLOCK_SIZE]); - idx += BLOCK_SIZE + 1; - continue; + if sentinel != continuation { + f(&row[idx..idx + block_len(sentinel)]); + return idx + BLOCK_SIZE + 1; } - f(&row[idx..idx + block_len(sentinel)]); - return idx + BLOCK_SIZE + 1; + f(&row[idx..idx + BLOCK_SIZE]); + idx += BLOCK_SIZE + 1; } } From 8640ed929642e67b938b122c8536351981ef555e Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sun, 17 Sep 2023 11:00:45 +0100 Subject: [PATCH 4/4] Review feedback --- 
arrow-row/src/lib.rs | 2 +- arrow-row/src/variable.rs | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs index a11a1950bc07..bd1dd7256240 100644 --- a/arrow-row/src/lib.rs +++ b/arrow-row/src/lib.rs @@ -236,7 +236,7 @@ mod variable; /// to the output, followed by `0xFF_u8`. The final block is padded to 32-bytes /// with `0_u8` and written to the output, followed by the un-padded length in bytes /// of this final block as a `u8`. The first 4 blocks have a length of 8, with subsequent -/// blocks using a length of 32. +/// blocks using a length of 32, this is to reduce space amplification for small strings. /// /// Note the following example encodings use a block size of 4 bytes for brevity: /// diff --git a/arrow-row/src/variable.rs b/arrow-row/src/variable.rs index c8cd7ab1bdaf..6c9c4c43bca3 100644 --- a/arrow-row/src/variable.rs +++ b/arrow-row/src/variable.rs @@ -27,6 +27,8 @@ use arrow_schema::{DataType, SortOptions}; pub const BLOCK_SIZE: usize = 32; /// The first block is split into `MINI_BLOCK_COUNT` mini-blocks +/// +/// This helps to reduce the space amplification for small strings pub const MINI_BLOCK_COUNT: usize = 4; /// The mini block size @@ -54,6 +56,8 @@ pub fn padded_length(a: Option<usize>) -> usize { Some(a) if a <= BLOCK_SIZE => { 1 + ceil(a, MINI_BLOCK_SIZE) * (MINI_BLOCK_SIZE + 1) } + // Each miniblock ends with a 1 byte continuation, therefore add + // `(MINI_BLOCK_COUNT - 1)` additional bytes over non-miniblock size Some(a) => MINI_BLOCK_COUNT + ceil(a, BLOCK_SIZE) * (BLOCK_SIZE + 1), None => 1, } @@ -116,6 +120,7 @@ pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usiz } } +/// Writes `val` in `SIZE` blocks with the appropriate continuation tokens #[inline] fn encode_blocks<const SIZE: usize>(out: &mut [u8], val: &[u8]) -> usize { let block_count = ceil(val.len(), SIZE);