Adaptive Row Block Size (#4812) #4818

Merged · 5 commits · Sep 17, 2023

Changes from 3 commits
2 changes: 2 additions & 0 deletions arrow-array/src/types.rs
@@ -1368,12 +1368,14 @@ pub(crate) mod bytes {
}

impl ByteArrayNativeType for [u8] {
#[inline]
Contributor:

`#[inline(always)]`? -- at some point I thought that saying 'always' was required to get cross-crate inlining to work

Contributor Author:

That hasn't been my experience. I believe `#[inline]` makes the data available to be inlined should LLVM think it a good idea, with `#[inline(always)]` only necessary when LLVM doesn't think it a good idea (for whatever reason).

unsafe fn from_bytes_unchecked(b: &[u8]) -> &Self {
b
}
}

impl ByteArrayNativeType for str {
#[inline]
unsafe fn from_bytes_unchecked(b: &[u8]) -> &Self {
std::str::from_utf8_unchecked(b)
}
16 changes: 11 additions & 5 deletions arrow-row/src/lib.rs
@@ -232,13 +232,13 @@ mod variable;
/// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array
/// encoded using a block based scheme described below.
///
/// The byte array is broken up into 32-byte blocks, each block is written in turn
/// The byte array is broken up into fixed-width blocks, each block is written in turn
/// to the output, followed by `0xFF_u8`. The final block is padded to 32-bytes
/// with `0_u8` and written to the output, followed by the un-padded length in bytes
/// of this final block as a `u8`.
/// of this final block as a `u8`. The first 4 blocks have a length of 8, with subsequent
/// blocks using a length of 32.
Contributor:

I think it would help to explain the rationale for using smaller blocks up front (to avoid space wastage for smaller strings)
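The saving the comment refers to can be made concrete with a small sketch. This is hypothetical code, not part of the PR: it recomputes the padded encoding length of a value under the old fixed 32-byte scheme and under the adaptive scheme this PR introduces (four 8-byte mini-blocks before switching to 32-byte blocks), mirroring the `padded_length` change further down in the diff.

```rust
const BLOCK_SIZE: usize = 32;
const MINI_BLOCK_COUNT: usize = 4;
const MINI_BLOCK_SIZE: usize = BLOCK_SIZE / MINI_BLOCK_COUNT; // 8

fn ceil(a: usize, b: usize) -> usize {
    (a + b - 1) / b
}

// Old scheme: 1 sentinel byte plus one (block + terminator byte) per 32 bytes.
fn padded_len_fixed(len: usize) -> usize {
    1 + ceil(len, BLOCK_SIZE) * (BLOCK_SIZE + 1)
}

// New scheme, mirroring `padded_length` in this PR: short values use
// 8-byte mini-blocks; longer values pay a small fixed overhead up front.
fn padded_len_adaptive(len: usize) -> usize {
    if len <= BLOCK_SIZE {
        1 + ceil(len, MINI_BLOCK_SIZE) * (MINI_BLOCK_SIZE + 1)
    } else {
        MINI_BLOCK_COUNT + ceil(len, BLOCK_SIZE) * (BLOCK_SIZE + 1)
    }
}

fn main() {
    // A 6-byte string drops from 34 encoded bytes to 10, matching the
    // updated assertion in the tests further down.
    assert_eq!(padded_len_fixed(6), 34);
    assert_eq!(padded_len_adaptive(6), 10);
    // Values longer than one block pay only 3 extra bytes.
    assert_eq!(padded_len_fixed(100), 1 + 4 * 33);
    assert_eq!(padded_len_adaptive(100), 4 + 4 * 33);
}
```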

///
/// Note the following example encodings use a block size of 4 bytes,
/// as opposed to 32 bytes for brevity:
/// Note the following example encodings use a block size of 4 bytes for brevity:
///
/// ```text
/// ┌───┬───┬───┬───┬───┬───┐
@@ -1698,12 +1698,18 @@ mod tests {
None,
Some(vec![0_u8; 0]),
Some(vec![0_u8; 6]),
Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
Some(vec![0_u8; variable::BLOCK_SIZE]),
Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
Some(vec![1_u8; 6]),
Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
Some(vec![1_u8; variable::BLOCK_SIZE]),
Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
Some(vec![0xFF_u8; 6]),
Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
])) as ArrayRef;
@@ -2221,7 +2227,7 @@ mod tests {
}

for r in r2.iter() {
assert_eq!(r.data.len(), 34);
assert_eq!(r.data.len(), 10);
Contributor Author:

This is a nice demonstration of how this will reduce the impact of #4811. Whilst 10 is still worse than the 3 for dictionaries, it is a lot better than 34 😅

}
}

140 changes: 80 additions & 60 deletions arrow-row/src/variable.rs
@@ -26,6 +26,12 @@ use arrow_schema::{DataType, SortOptions};
/// The block size of the variable length encoding
pub const BLOCK_SIZE: usize = 32;

/// The first block is split into `MINI_BLOCK_COUNT` mini-blocks
Contributor:

Perhaps this would be a good place to add comments about the why for miniblocks.

pub const MINI_BLOCK_COUNT: usize = 4;
Contributor:

I wonder if using a MINI_BLOCK_COUNT of 2 might get most of the memory savings but have lower CPU overhead 🤔

Contributor Author:

I didn't find this to yield a meaningful performance delta

Contributor:

Thank you for trying


/// The mini block size
pub const MINI_BLOCK_SIZE: usize = BLOCK_SIZE / MINI_BLOCK_COUNT;

/// The continuation token
pub const BLOCK_CONTINUATION: u8 = 0xFF;

@@ -45,7 +51,10 @@ pub fn encoded_len(a: Option<&[u8]>) -> usize {
#[inline]
pub fn padded_length(a: Option<usize>) -> usize {
match a {
Some(a) => 1 + ceil(a, BLOCK_SIZE) * (BLOCK_SIZE + 1),
Some(a) if a <= BLOCK_SIZE => {
1 + ceil(a, MINI_BLOCK_SIZE) * (MINI_BLOCK_SIZE + 1)
}
Some(a) => MINI_BLOCK_COUNT + ceil(a, BLOCK_SIZE) * (BLOCK_SIZE + 1),
Contributor:

I don't understand why the padded length gets a MINI_BLOCK_COUNT on it. Is it because each miniblock contains a 1-byte continuation?

Maybe a comment would help

None => 1,
}
}
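On the question above about the `MINI_BLOCK_COUNT` term, the arithmetic can be checked with a hypothetical sketch (not part of the PR): for a value longer than `BLOCK_SIZE`, the first 32 bytes are written as 4 mini-blocks, each with its own 1-byte terminator. That is 3 more terminator bytes than a single 32-byte block needs, and together with the 1-byte non-empty sentinel they account for the `MINI_BLOCK_COUNT` (= 4) added by the formula.

```rust
const BLOCK_SIZE: usize = 32;
const MINI_BLOCK_COUNT: usize = 4;
const MINI_BLOCK_SIZE: usize = BLOCK_SIZE / MINI_BLOCK_COUNT;

fn ceil(a: usize, b: usize) -> usize {
    (a + b - 1) / b
}

fn main() {
    let a = 50; // any length greater than BLOCK_SIZE
    let sentinel = 1;
    // First 32 bytes: four 8-byte mini-blocks, each followed by a terminator.
    let first = MINI_BLOCK_COUNT * (MINI_BLOCK_SIZE + 1); // 36 bytes
    // Remaining bytes: full 32-byte blocks, each followed by a terminator.
    let rest = ceil(a - BLOCK_SIZE, BLOCK_SIZE) * (BLOCK_SIZE + 1);
    let by_layout = sentinel + first + rest;
    let by_formula = MINI_BLOCK_COUNT + ceil(a, BLOCK_SIZE) * (BLOCK_SIZE + 1);
    assert_eq!(by_layout, 70);
    assert_eq!(by_layout, by_formula);
}
```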
@@ -82,44 +91,23 @@ pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usize
1
}
Some(val) => {
let block_count = ceil(val.len(), BLOCK_SIZE);
let end_offset = 1 + block_count * (BLOCK_SIZE + 1);
let to_write = &mut out[..end_offset];

// Write `2_u8` to demarcate as non-empty, non-null string
to_write[0] = NON_EMPTY_SENTINEL;

let chunks = val.chunks_exact(BLOCK_SIZE);
let remainder = chunks.remainder();
for (input, output) in chunks
.clone()
.zip(to_write[1..].chunks_exact_mut(BLOCK_SIZE + 1))
{
let input: &[u8; BLOCK_SIZE] = input.try_into().unwrap();
let out_block: &mut [u8; BLOCK_SIZE] =
(&mut output[..BLOCK_SIZE]).try_into().unwrap();

*out_block = *input;

// Indicate that there are further blocks to follow
output[BLOCK_SIZE] = BLOCK_CONTINUATION;
}
out[0] = NON_EMPTY_SENTINEL;

if !remainder.is_empty() {
let start_offset = 1 + (block_count - 1) * (BLOCK_SIZE + 1);
to_write[start_offset..start_offset + remainder.len()]
.copy_from_slice(remainder);
*to_write.last_mut().unwrap() = remainder.len() as u8;
let len = if val.len() <= BLOCK_SIZE {
1 + encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], val)
} else {
// We must overwrite the continuation marker written by the loop above
*to_write.last_mut().unwrap() = BLOCK_SIZE as u8;
}
let (initial, rem) = val.split_at(BLOCK_SIZE);
let offset = encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], initial);
out[offset] = BLOCK_CONTINUATION;
1 + offset + encode_blocks::<BLOCK_SIZE>(&mut out[1 + offset..], rem)
};

if opts.descending {
// Invert bits
to_write.iter_mut().for_each(|v| *v = !*v)
out[..len].iter_mut().for_each(|v| *v = !*v)
}
end_offset
len
}
None => {
out[0] = null_sentinel(opts);
@@ -128,35 +116,81 @@ }
}
}

/// Returns the number of bytes of encoded data
fn decoded_len(row: &[u8], options: SortOptions) -> usize {
#[inline]
fn encode_blocks<const SIZE: usize>(out: &mut [u8], val: &[u8]) -> usize {
Contributor:

perhaps we could add a comment here explaining what this writes (specifically continuation tokens vs BLOCK_CONTINUATION)

let block_count = ceil(val.len(), SIZE);
let end_offset = block_count * (SIZE + 1);
let to_write = &mut out[..end_offset];

let chunks = val.chunks_exact(SIZE);
let remainder = chunks.remainder();
for (input, output) in chunks.clone().zip(to_write.chunks_exact_mut(SIZE + 1)) {
let input: &[u8; SIZE] = input.try_into().unwrap();
let out_block: &mut [u8; SIZE] = (&mut output[..SIZE]).try_into().unwrap();

*out_block = *input;

// Indicate that there are further blocks to follow
output[SIZE] = BLOCK_CONTINUATION;
}

if !remainder.is_empty() {
let start_offset = (block_count - 1) * (SIZE + 1);
to_write[start_offset..start_offset + remainder.len()].copy_from_slice(remainder);
*to_write.last_mut().unwrap() = remainder.len() as u8;
} else {
// We must overwrite the continuation marker written by the loop above
*to_write.last_mut().unwrap() = SIZE as u8;
}
end_offset
}
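A simplified sketch of what `encode_blocks` writes, in the spirit of the comment above (hypothetical code, not the crate's): each full block that has data after it is followed by `BLOCK_CONTINUATION` (`0xFF`), while the final block is zero-padded to the block size and followed by its un-padded length as a `u8`.

```rust
// Hypothetical re-derivation of the `encode_blocks` layout for one block size.
fn encode_blocks_sketch(size: usize, val: &[u8]) -> Vec<u8> {
    let mut out = Vec::new();
    for chunk in val.chunks(size) {
        // Zero-pad the chunk up to the block size.
        let mut block = vec![0u8; size];
        block[..chunk.len()].copy_from_slice(chunk);
        out.extend_from_slice(&block);
        // Full chunks get the continuation token; a short final chunk
        // gets its un-padded length instead.
        out.push(if chunk.len() == size { 0xFF } else { chunk.len() as u8 });
    }
    // An input that is an exact multiple of `size` must end with the block
    // length rather than a continuation token.
    if !val.is_empty() && val.len() % size == 0 {
        *out.last_mut().unwrap() = size as u8;
    }
    out
}

fn main() {
    let enc = encode_blocks_sketch(8, b"hello");
    // One padded 8-byte block, terminated by the un-padded length 5.
    assert_eq!(enc, [b'h', b'e', b'l', b'l', b'o', 0, 0, 0, 5]);
    // An exact multiple of the block size ends with the block size itself.
    assert_eq!(encode_blocks_sketch(8, &[1u8; 8]).last(), Some(&8u8));
}
```

With the row's leading non-empty sentinel added, the `b"hello"` case is 10 bytes, consistent with the updated length assertion in the tests.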

fn decode_blocks(row: &[u8], options: SortOptions, mut f: impl FnMut(&[u8])) -> usize {
let (non_empty_sentinel, continuation) = match options.descending {
true => (!NON_EMPTY_SENTINEL, !BLOCK_CONTINUATION),
false => (NON_EMPTY_SENTINEL, BLOCK_CONTINUATION),
};

if row[0] != non_empty_sentinel {
// Empty or null string
return 0;
return 1;
}

let mut str_len = 0;
// Extracts the block length from the sentinel
let block_len = |sentinel: u8| match options.descending {
true => !sentinel as usize,
false => sentinel as usize,
};

let mut idx = 1;
for _ in 0..MINI_BLOCK_COUNT {
let sentinel = row[idx + MINI_BLOCK_SIZE];
if sentinel != continuation {
f(&row[idx..idx + block_len(sentinel)]);
return idx + MINI_BLOCK_SIZE + 1;
}
f(&row[idx..idx + MINI_BLOCK_SIZE]);
idx += MINI_BLOCK_SIZE + 1;
}

loop {
let sentinel = row[idx + BLOCK_SIZE];
if sentinel == continuation {
idx += BLOCK_SIZE + 1;
str_len += BLOCK_SIZE;
continue;
if sentinel != continuation {
f(&row[idx..idx + block_len(sentinel)]);
return idx + BLOCK_SIZE + 1;
}
let block_len = match options.descending {
true => !sentinel,
false => sentinel,
};
return str_len + block_len as usize;
f(&row[idx..idx + BLOCK_SIZE]);
idx += BLOCK_SIZE + 1;
}
}
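The decoding walk above can be illustrated with a simplified, ascending-only sketch (hypothetical code, not the crate's): consume up to `MINI_BLOCK_COUNT` mini-blocks, then full blocks, invoking the callback with each block's data bytes and returning the total encoded width consumed.

```rust
const BLOCK_SIZE: usize = 32;
const MINI_BLOCK_COUNT: usize = 4;
const MINI_BLOCK_SIZE: usize = 8;
const BLOCK_CONTINUATION: u8 = 0xFF;
const NON_EMPTY_SENTINEL: u8 = 2;

// Simplified, ascending-only version of `decode_blocks`.
fn decode_blocks_sketch(row: &[u8], mut f: impl FnMut(&[u8])) -> usize {
    if row[0] != NON_EMPTY_SENTINEL {
        return 1; // empty or null value occupies a single byte
    }
    let mut idx = 1;
    // First up to MINI_BLOCK_COUNT mini-blocks...
    for _ in 0..MINI_BLOCK_COUNT {
        let sentinel = row[idx + MINI_BLOCK_SIZE];
        if sentinel != BLOCK_CONTINUATION {
            f(&row[idx..idx + sentinel as usize]);
            return idx + MINI_BLOCK_SIZE + 1;
        }
        f(&row[idx..idx + MINI_BLOCK_SIZE]);
        idx += MINI_BLOCK_SIZE + 1;
    }
    // ...then full 32-byte blocks.
    loop {
        let sentinel = row[idx + BLOCK_SIZE];
        if sentinel != BLOCK_CONTINUATION {
            f(&row[idx..idx + sentinel as usize]);
            return idx + BLOCK_SIZE + 1;
        }
        f(&row[idx..idx + BLOCK_SIZE]);
        idx += BLOCK_SIZE + 1;
    }
}

fn main() {
    // Row encoding of b"hello": sentinel, one padded mini-block, length 5.
    let row = [2u8, b'h', b'e', b'l', b'l', b'o', 0, 0, 0, 5];
    let mut decoded = Vec::new();
    let consumed = decode_blocks_sketch(&row, |b| decoded.extend_from_slice(b));
    assert_eq!(decoded, b"hello");
    assert_eq!(consumed, 10);
}
```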

/// Returns the number of bytes of encoded data
fn decoded_len(row: &[u8], options: SortOptions) -> usize {
let mut len = 0;
decode_blocks(row, options, |block| len += block.len());
len
}

/// Decodes a binary array from `rows` with the provided `options`
pub fn decode_binary<I: OffsetSizeTrait>(
rows: &mut [&[u8]],
@@ -176,22 +210,8 @@
let mut values = MutableBuffer::new(values_capacity);

for row in rows {
let str_length = decoded_len(row, options);
let mut to_read = str_length;
let mut offset = 1;
while to_read >= BLOCK_SIZE {
to_read -= BLOCK_SIZE;

values.extend_from_slice(&row[offset..offset + BLOCK_SIZE]);
offset += BLOCK_SIZE + 1;
}

if to_read != 0 {
values.extend_from_slice(&row[offset..offset + to_read]);
offset += BLOCK_SIZE + 1;
}
let offset = decode_blocks(row, options, |b| values.extend_from_slice(b));
*row = &row[offset..];

offsets.append(I::from_usize(values.len()).expect("offset overflow"))
}
