Skip to content

Commit

Permalink
refactor: combined capacity updates and decoding for all physical dec…
Browse files Browse the repository at this point in the history
…oders (#2508)

- Combines `update_capacity` and `decode_into` into a single function
`decode` for all physical (primitive) decoders (`BasicDecoder`,
`BinaryPageDecoder`, `ValuePageDecoder`, `FixedListDecoder`,
`BitmapDecoder`)
- As a result some decoders don't require explicit capacity allocation at a
certain level. Should enable decoders to have more flexibility in their
decode pipeline (required for dictionary encoding)
- Buffers built up from leaves of the encoding tree and combined instead
of being recursively passed down (like earlier)
- Rename PhysicalPageDecoder to PrimitivePageDecoder
  • Loading branch information
raunaks13 authored Jun 24, 2024
1 parent 1ee75d3 commit 95f98a9
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 214 deletions.
52 changes: 23 additions & 29 deletions rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1144,50 +1144,44 @@ impl BatchDecodeStream {
/// the decode task for batch 0 and the decode task for batch 1.
///
/// See [`crate::decoder`] for more information
pub trait PhysicalPageDecoder: Send + Sync {
/// Calculates and updates the capacity required to represent the requested data
pub trait PrimitivePageDecoder: Send + Sync {
/// Decode data into buffers
///
/// This may be a simple zero-copy from a disk buffer or could involve complex decoding
/// such as decompressing from some compressed representation.
///
/// Capacity is stored as a tuple of (num_bytes: u64, is_needed: bool). The `is_needed`
/// portion only needs to be updated if the encoding has some concept of an "optional"
/// buffer.
///
/// The decoder should look at `rows_to_skip` and `num_rows` and then calculate how
/// many bytes of data are needed. It should then update the first part of the tuple.
///
/// Note: Most encodings deal with a single buffer. They may have multiple input buffers
/// but they only have a single output buffer. The current exception to this rule is the
/// `basic` encoding which has an output "validity" buffer and an output "values" buffers.
/// We may find there are other such exceptions.
/// Encodings can have any number of input or output buffers. For example, a dictionary
/// decoding will convert two buffers (indices + dictionary) into a single buffer
///
/// # Arguments
/// Binary decodings have two output buffers (one for values, one for offsets)
///
/// * `rows_to_skip` - how many rows to skip (within the page) before decoding
/// * `num_rows` - how many rows to decode
/// * `buffers` - A mutable slice of "capacities" (as described above), one per buffer
/// * `all_null` - A mutable bool, set to true if a decoder determines all values are null
fn update_capacity(
&self,
rows_to_skip: u32,
num_rows: u32,
buffers: &mut [(u64, bool)],
all_null: &mut bool,
);
/// Decodes the data into the requested buffers.
/// Other decodings could even expand the # of output buffers. For example, we could decode
/// fixed size strings into variable length strings going from one input buffer to multiple output
/// buffers.
///
/// You can assume that the capacity will have already been configured on the `BytesMut`
/// according to the capacity calculated in [`PhysicalPageDecoder::update_capacity`]
/// Each Arrow data type typically has a fixed structure of buffers and the encoding chain will
/// generally end at one of these structures. However, intermediate structures may exist which
/// do not correspond to any Arrow type at all. For example, a bitpacking encoding will deal
/// with buffers that have bits-per-value that is not a multiple of 8.
///
/// The `primitive_array_from_buffers` method has an expected buffer layout for each arrow
/// type (order matters) and encodings that aim to decode into arrow types should respect
/// this layout.
/// # Arguments
///
/// * `rows_to_skip` - how many rows to skip (within the page) before decoding
/// * `num_rows` - how many rows to decode
/// * `dest_buffers` - the output buffers to decode into
fn decode_into(
/// * `all_null` - A mutable bool, set to true if a decoder determines all values are null
fn decode(
&self,
rows_to_skip: u32,
num_rows: u32,
dest_buffers: &mut [BytesMut],
) -> Result<()>;
all_null: &mut bool,
) -> Result<Vec<BytesMut>>;
fn num_buffers(&self) -> u32;
}

Expand Down Expand Up @@ -1217,7 +1211,7 @@ pub trait PageScheduler: Send + Sync + std::fmt::Debug {
ranges: &[Range<u32>],
scheduler: &Arc<dyn EncodingsIo>,
top_level_row: u64,
) -> BoxFuture<'static, Result<Box<dyn PhysicalPageDecoder>>>;
) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
}

/// Contains the context for a scheduler
Expand Down
45 changes: 12 additions & 33 deletions rust/lance-encoding/src/encodings/logical/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use lance_core::{Error, Result};
use crate::{
decoder::{
DecodeArrayTask, FieldScheduler, FilterExpression, LogicalPageDecoder, NextDecodeTask,
PageInfo, PageScheduler, PhysicalPageDecoder, ScheduledScanLine, SchedulerContext,
PageInfo, PageScheduler, PrimitivePageDecoder, ScheduledScanLine, SchedulerContext,
SchedulingJob,
},
encoder::{ArrayEncodingStrategy, EncodeTask, EncodedColumn, EncodedPage, FieldEncoder},
Expand Down Expand Up @@ -211,15 +211,15 @@ impl FieldScheduler for PrimitiveFieldScheduler {

pub struct PrimitiveFieldDecoder {
data_type: DataType,
unloaded_physical_decoder: Option<BoxFuture<'static, Result<Box<dyn PhysicalPageDecoder>>>>,
physical_decoder: Option<Arc<dyn PhysicalPageDecoder>>,
unloaded_physical_decoder: Option<BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>>,
physical_decoder: Option<Arc<dyn PrimitivePageDecoder>>,
num_rows: u32,
rows_drained: u32,
}

impl PrimitiveFieldDecoder {
pub fn new_from_data(
physical_decoder: Arc<dyn PhysicalPageDecoder>,
physical_decoder: Arc<dyn PrimitivePageDecoder>,
data_type: DataType,
num_rows: u32,
) -> Self {
Expand All @@ -246,46 +246,25 @@ impl Debug for PrimitiveFieldDecoder {
struct PrimitiveFieldDecodeTask {
rows_to_skip: u32,
rows_to_take: u32,
physical_decoder: Arc<dyn PhysicalPageDecoder>,
physical_decoder: Arc<dyn PrimitivePageDecoder>,
data_type: DataType,
}

impl DecodeArrayTask for PrimitiveFieldDecodeTask {
fn decode(self: Box<Self>) -> Result<ArrayRef> {
// We start by assuming that no buffers are required. The number of buffers needed is based
// on the data type. Most data types need two buffers but each layer of fixed-size-list, for
// example, adds another validity buffer
let mut capacities = vec![(0, false); self.physical_decoder.num_buffers() as usize];
let mut all_null = false;
self.physical_decoder.update_capacity(
self.rows_to_skip,
self.rows_to_take,
&mut capacities,
&mut all_null,
);

// The number of buffers needed is based on the data type.
// Most data types need two buffers but each layer of fixed-size-list, for
// example, adds another validity buffer.
let bufs =
self.physical_decoder
.decode(self.rows_to_skip, self.rows_to_take, &mut all_null)?;

if all_null {
return Ok(new_null_array(&self.data_type, self.rows_to_take as usize));
}

// At this point we know the size needed for each buffer
let mut bufs = capacities
.into_iter()
.map(|(num_bytes, is_needed)| {
// Only allocate the validity buffer if it is needed, otherwise we
// create an empty BytesMut (does not require allocation)
if is_needed {
BytesMut::with_capacity(num_bytes as usize)
} else {
BytesMut::default()
}
})
.collect::<Vec<_>>();

// Go ahead and fill the validity / values buffers
self.physical_decoder
.decode_into(self.rows_to_skip, self.rows_to_take, &mut bufs)?;

// Convert the two buffers into an Arrow array
Self::primitive_array_from_buffers(&self.data_type, bufs, self.rows_to_take)
}
Expand Down
69 changes: 29 additions & 40 deletions rust/lance-encoding/src/encodings/physical/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ use std::sync::Arc;

use arrow_array::{ArrayRef, BooleanArray};
use arrow_buffer::BooleanBuffer;
use bytes::BytesMut;
use futures::{future::BoxFuture, FutureExt};
use log::trace;

use crate::{
decoder::{PageScheduler, PhysicalPageDecoder},
decoder::{PageScheduler, PrimitivePageDecoder},
encoder::{ArrayEncoder, BufferEncoder, EncodedArray, EncodedArrayBuffer},
format::pb,
EncodingsIo,
Expand All @@ -20,21 +21,21 @@ use lance_core::Result;
use super::buffers::BitmapBufferEncoder;

struct DataDecoders {
validity: Box<dyn PhysicalPageDecoder>,
values: Box<dyn PhysicalPageDecoder>,
validity: Box<dyn PrimitivePageDecoder>,
values: Box<dyn PrimitivePageDecoder>,
}

enum DataNullStatus {
// Neither validity nor values
All,
// Values only
None(Box<dyn PhysicalPageDecoder>),
None(Box<dyn PrimitivePageDecoder>),
// Validity and values
Some(DataDecoders),
}

impl DataNullStatus {
fn values_decoder(&self) -> Option<&dyn PhysicalPageDecoder> {
fn values_decoder(&self) -> Option<&dyn PrimitivePageDecoder> {
match self {
Self::All => None,
Self::Some(decoders) => Some(decoders.values.as_ref()),
Expand Down Expand Up @@ -124,7 +125,7 @@ impl PageScheduler for BasicPageScheduler {
ranges: &[std::ops::Range<u32>],
scheduler: &Arc<dyn EncodingsIo>,
top_level_row: u64,
) -> BoxFuture<'static, Result<Box<dyn PhysicalPageDecoder>>> {
) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
let validity_future = match &self.mode {
SchedulerNullStatus::None(_) | SchedulerNullStatus::All => None,
SchedulerNullStatus::Some(schedulers) => Some(schedulers.validity.schedule_ranges(
Expand Down Expand Up @@ -157,7 +158,7 @@ impl PageScheduler for BasicPageScheduler {
}
_ => unreachable!(),
};
Ok(Box::new(BasicPageDecoder { mode }) as Box<dyn PhysicalPageDecoder>)
Ok(Box::new(BasicPageDecoder { mode }) as Box<dyn PrimitivePageDecoder>)
}
.boxed()
}
Expand All @@ -167,51 +168,39 @@ struct BasicPageDecoder {
mode: DataNullStatus,
}

impl PhysicalPageDecoder for BasicPageDecoder {
fn update_capacity(
impl PrimitivePageDecoder for BasicPageDecoder {
fn decode(
&self,
rows_to_skip: u32,
num_rows: u32,
buffers: &mut [(u64, bool)],
all_null: &mut bool,
) {
// No need to look at the validity decoder to know the dest buffer size since it is boolean
buffers[0].0 = arrow_buffer::bit_util::ceil(num_rows as usize, 8) as u64;
// The validity buffer is only required if we have some nulls
buffers[0].1 = matches!(self.mode, DataNullStatus::Some(_));
if let Some(values) = self.mode.values_decoder() {
values.update_capacity(rows_to_skip, num_rows, &mut buffers[1..], all_null);
} else {
*all_null = true;
}
}

fn decode_into(
&self,
rows_to_skip: u32,
num_rows: u32,
dest_buffers: &mut [bytes::BytesMut],
) -> Result<()> {
match &self.mode {
) -> Result<Vec<BytesMut>> {
let dest_buffers = match &self.mode {
DataNullStatus::Some(decoders) => {
decoders
.validity
.decode_into(rows_to_skip, num_rows, &mut dest_buffers[..1])?;
decoders
.values
.decode_into(rows_to_skip, num_rows, &mut dest_buffers[1..])?;
let mut buffers = decoders.validity.decode(rows_to_skip, num_rows, all_null)?; // buffer 0
let mut values_bytesmut =
decoders.values.decode(rows_to_skip, num_rows, all_null)?; // buffer 1 onwards

buffers.append(&mut values_bytesmut);
buffers
}
// Either dest_buffers[0] is empty, in which case these are no-ops, or one of the
// other pages needed the buffer, in which case we need to fill our section
DataNullStatus::All => {
dest_buffers[0].fill(0);
let buffers = vec![BytesMut::default()];
*all_null = true;
buffers
}
DataNullStatus::None(values) => {
dest_buffers[0].fill(1);
values.decode_into(rows_to_skip, num_rows, &mut dest_buffers[1..])?;
let mut dest_buffers = vec![BytesMut::default()];

let mut values_bytesmut = values.decode(rows_to_skip, num_rows, all_null)?;
dest_buffers.append(&mut values_bytesmut);
dest_buffers
}
}
Ok(())
};

Ok(dest_buffers)
}

fn num_buffers(&self) -> u32 {
Expand Down
Loading

0 comments on commit 95f98a9

Please sign in to comment.