Skip to content

Commit

Permalink
Merge pull request #1 from alamb/alamb/level_histogram_structs
Browse files Browse the repository at this point in the history
Implement LevelHistograms as a struct
  • Loading branch information
etseidl committed Jul 26, 2024
2 parents 642c7d4 + 978264d commit 9582e0d
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 64 deletions.
63 changes: 19 additions & 44 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
use crate::encodings::levels::LevelEncoder;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
use crate::file::metadata::{ColumnIndexBuilder, LevelHistogram, OffsetIndexBuilder};
use crate::file::properties::EnabledStatistics;
use crate::file::statistics::{Statistics, ValueStatistics};
use crate::file::{
Expand Down Expand Up @@ -183,25 +183,14 @@ pub struct ColumnCloseResult {
pub offset_index: Option<OffsetIndex>,
}

/// Creates a vector to hold level histogram data. Length will be `max_level + 1`.
/// Because histograms are not necessary when `max_level == 0`, this will return
/// `None` in that case.
fn new_histogram(max_level: i16) -> Option<Vec<i64>> {
if max_level > 0 {
Some(vec![0; max_level as usize + 1])
} else {
None
}
}

// Metrics per page
#[derive(Default)]
struct PageMetrics {
num_buffered_values: u32,
num_buffered_rows: u32,
num_page_nulls: u64,
repetition_level_histogram: Option<Vec<i64>>,
definition_level_histogram: Option<Vec<i64>>,
repetition_level_histogram: Option<LevelHistogram>,
definition_level_histogram: Option<LevelHistogram>,
}

impl PageMetrics {
Expand All @@ -211,50 +200,37 @@ impl PageMetrics {

/// Initialize the repetition level histogram
fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
self.repetition_level_histogram = new_histogram(max_level);
self.repetition_level_histogram = LevelHistogram::try_new(max_level);
self
}

/// Initialize the definition level histogram
fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
self.definition_level_histogram = new_histogram(max_level);
self.definition_level_histogram = LevelHistogram::try_new(max_level);
self
}

/// Sets all elements of `histogram` to 0
fn reset_histogram(histogram: &mut Option<Vec<i64>>) {
if let Some(ref mut hist) = histogram {
for v in hist {
*v = 0
}
}
}

/// Resets the state of this `PageMetrics` to the initial state.
/// If histograms have been initialized their contents will be reset to zero.
fn new_page(&mut self) {
self.num_buffered_values = 0;
self.num_buffered_rows = 0;
self.num_page_nulls = 0;
PageMetrics::reset_histogram(&mut self.repetition_level_histogram);
PageMetrics::reset_histogram(&mut self.definition_level_histogram);
self.repetition_level_histogram.as_mut().map(LevelHistogram::reset);
self.definition_level_histogram.as_mut().map(LevelHistogram::reset);
}

/// Updates histogram values using provided repetition levels
fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
if let Some(ref mut rep_hist) = self.repetition_level_histogram {
for &level in levels {
rep_hist[level as usize] += 1;
}
rep_hist.update_from_levels(levels);
}
}

/// Updates histogram values using provided definition levels
fn update_definition_level_histogram(&mut self, levels: &[i16]) {
if let Some(ref mut def_hist) = self.definition_level_histogram {
for &level in levels {
def_hist[level as usize] += 1;
}
def_hist.update_from_levels(levels);
}
}
}
Expand All @@ -274,8 +250,8 @@ struct ColumnMetrics<T: Default> {
num_column_nulls: u64,
column_distinct_count: Option<u64>,
variable_length_bytes: Option<i64>,
repetition_level_histogram: Option<Vec<i64>>,
definition_level_histogram: Option<Vec<i64>>,
repetition_level_histogram: Option<LevelHistogram>,
definition_level_histogram: Option<LevelHistogram>,
}

impl<T: Default> ColumnMetrics<T> {
Expand All @@ -285,24 +261,23 @@ impl<T: Default> ColumnMetrics<T> {

/// Initialize the repetition level histogram
fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
self.repetition_level_histogram = new_histogram(max_level);
self.repetition_level_histogram = LevelHistogram::try_new(max_level);
self
}

/// Initialize the definition level histogram
fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
self.definition_level_histogram = new_histogram(max_level);
self.definition_level_histogram = LevelHistogram::try_new(max_level);
self
}

/// Sum `page_histogram` into `chunk_histogram`
fn update_histogram(chunk_histogram: &mut Option<Vec<i64>>, page_histogram: &Option<Vec<i64>>) {
if page_histogram.is_some() && chunk_histogram.is_some() {
let chunk_hist = chunk_histogram.as_mut().unwrap();
let page_hist = page_histogram.as_ref().unwrap();
for i in 0..page_hist.len() {
chunk_hist[i] += page_hist[i]
}
fn update_histogram(
chunk_histogram: &mut Option<LevelHistogram>,
page_histogram: &Option<LevelHistogram>,
) {
if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
chunk_hist.add(page_hist);
}
}

Expand Down
149 changes: 135 additions & 14 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,8 +557,114 @@ pub struct ColumnChunkMetaData {
column_index_offset: Option<i64>,
column_index_length: Option<i32>,
unencoded_byte_array_data_bytes: Option<i64>,
repetition_level_histogram: Option<Vec<i64>>,
definition_level_histogram: Option<Vec<i64>>,
repetition_level_histogram: Option<LevelHistogram>,
definition_level_histogram: Option<LevelHistogram>,
}

/// Histograms for repetition and definition levels.
///
/// Each histogram is a vector of length `max_level + 1`. The value at index `i` is the number of
/// values at level `i`.
///
/// For example, `vec[0]` is the number of rows with level 0, `vec[1]` is the
/// number of rows with level 1, and so on.
///
#[derive(Debug, Clone, PartialEq)]
pub struct LevelHistogram {
inner: Vec<i64>,
}

impl LevelHistogram {
/// Creates a new level histogram data.
///
/// Length will be `max_level + 1`.
///
/// Returns `None` when `max_level == 0` (because histograms are not necessary in this case)
pub fn try_new(max_level: i16) -> Option<Self> {
if max_level > 0 {
Some(Self {
inner: vec![0; max_level as usize + 1],
})
} else {
None
}
}
/// Returns a reference to the the histogram's values.
pub fn values(&self) -> &[i64] {
&self.inner
}

/// Return the inner vector, consuming self
pub fn into_inner(self) -> Vec<i64> {
self.inner
}

/// Returns the histogram value at the given index.
///
/// The value of `i` is the number of values with level `i`. For example,
/// `get(1)` returns the number of values with level 1.
///
/// Returns `None` if the index is out of bounds.
pub fn get(&self, index: usize) -> Option<i64> {
self.inner.get(index).copied()
}

/// Adds the values from the other histogram to this histogram
///
/// # Panics
/// If the histograms have different lengths
pub fn add(&mut self, other: &Self) {
assert_eq!(self.len(), other.len());
for (dst, src) in self.inner.iter_mut().zip(other.inner.iter()) {
*dst += src;
}
}

/// return the length of the histogram
pub fn len(&self) -> usize {
self.inner.len()
}

/// returns if the histogram is empty
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}

/// Sets the values of all histogram levels to 0.
pub fn reset(&mut self) {
for value in self.inner.iter_mut() {
*value = 0;
}
}

/// Updates histogram values using provided repetition levels
///
/// # Panics
/// if any of the levels is greater than the length of the histogram (
/// the argument supplied to [`Self::try_new`])
pub fn update_from_levels(&mut self, levels: &[i16]) {
for &level in levels {
self.inner[level as usize] += 1;
}
}
}

impl From<Vec<i64>> for LevelHistogram {
fn from(inner: Vec<i64>) -> Self {
Self { inner }
}
}

impl From<LevelHistogram> for Vec<i64> {
fn from(value: LevelHistogram) -> Self {
value.into_inner()
}
}

impl HeapSize for LevelHistogram {
fn heap_size(&self) -> usize {
self.inner.heap_size()
}
}

/// Represents common operations for a column chunk.
Expand Down Expand Up @@ -724,7 +830,7 @@ impl ColumnChunkMetaData {
/// The returned value `vec[i]` is how many values are at repetition level `i`. For example,
/// `vec[0]` indicates how many rows the page contains.
/// This field may not be set by older writers.
pub fn repetition_level_histogram(&self) -> Option<&Vec<i64>> {
pub fn repetition_level_histogram(&self) -> Option<&LevelHistogram> {
self.repetition_level_histogram.as_ref()
}

Expand All @@ -733,7 +839,7 @@ impl ColumnChunkMetaData {
/// The returned value `vec[i]` is how many values are at definition level `i`. For example,
/// `vec[max_definition_level-1]` indicates how many non-null values are present in the page.
/// This field may not be set by older writers.
pub fn definition_level_histogram(&self) -> Option<&Vec<i64>> {
pub fn definition_level_histogram(&self) -> Option<&LevelHistogram> {
self.definition_level_histogram.as_ref()
}

Expand Down Expand Up @@ -788,6 +894,9 @@ impl ColumnChunkMetaData {
(None, None, None)
};

let repetition_level_histogram = repetition_level_histogram.map(LevelHistogram::from);
let definition_level_histogram = definition_level_histogram.map(LevelHistogram::from);

let result = ColumnChunkMetaData {
column_descr,
encodings,
Expand Down Expand Up @@ -838,10 +947,20 @@ impl ColumnChunkMetaData {
|| self.repetition_level_histogram.is_some()
|| self.definition_level_histogram.is_some()
{
let repetition_level_histogram = self
.repetition_level_histogram
.as_ref()
.map(|hist| hist.clone().into_inner());

let definition_level_histogram = self
.definition_level_histogram
.as_ref()
.map(|hist| hist.clone().into_inner());

Some(SizeStatistics {
unencoded_byte_array_data_bytes: self.unencoded_byte_array_data_bytes,
repetition_level_histogram: self.repetition_level_histogram.clone(),
definition_level_histogram: self.definition_level_histogram.clone(),
repetition_level_histogram,
definition_level_histogram,
})
} else {
None
Expand Down Expand Up @@ -1023,13 +1142,13 @@ impl ColumnChunkMetaDataBuilder {
}

/// Sets optional repetition level histogram
pub fn set_repetition_level_histogram(mut self, value: Option<Vec<i64>>) -> Self {
pub fn set_repetition_level_histogram(mut self, value: Option<LevelHistogram>) -> Self {
self.0.repetition_level_histogram = value;
self
}

/// Sets optional repetition level histogram
pub fn set_definition_level_histogram(mut self, value: Option<Vec<i64>>) -> Self {
pub fn set_definition_level_histogram(mut self, value: Option<LevelHistogram>) -> Self {
self.0.definition_level_histogram = value;
self
}
Expand All @@ -1049,7 +1168,9 @@ pub struct ColumnIndexBuilder {
max_values: Vec<Vec<u8>>,
null_counts: Vec<i64>,
boundary_order: BoundaryOrder,
/// contains the concatenation of the histograms of all pages
repetition_level_histograms: Option<Vec<i64>>,
/// contains the concatenation of the histograms of all pages
definition_level_histograms: Option<Vec<i64>>,
/// Is the information in the builder valid?
///
Expand Down Expand Up @@ -1099,21 +1220,21 @@ impl ColumnIndexBuilder {
/// Does nothing if the `ColumnIndexBuilder` is not in the `valid` state.
pub fn append_histograms(
&mut self,
repetition_level_histogram: &Option<Vec<i64>>,
definition_level_histogram: &Option<Vec<i64>>,
repetition_level_histogram: &Option<LevelHistogram>,
definition_level_histogram: &Option<LevelHistogram>,
) {
if !self.valid {
return;
}
if let Some(ref rep_lvl_hist) = repetition_level_histogram {
let hist = self.repetition_level_histograms.get_or_insert(Vec::new());
hist.reserve(rep_lvl_hist.len());
hist.extend(rep_lvl_hist);
hist.extend(rep_lvl_hist.values());
}
if let Some(ref def_lvl_hist) = definition_level_histogram {
let hist = self.definition_level_histograms.get_or_insert(Vec::new());
hist.reserve(def_lvl_hist.len());
hist.extend(def_lvl_hist);
hist.extend(def_lvl_hist.values());
}
}

Expand Down Expand Up @@ -1358,8 +1479,8 @@ mod tests {
.set_column_index_offset(Some(8000))
.set_column_index_length(Some(25))
.set_unencoded_byte_array_data_bytes(Some(2000))
.set_repetition_level_histogram(Some(vec![100, 100]))
.set_definition_level_histogram(Some(vec![0, 200]))
.set_repetition_level_histogram(Some(LevelHistogram::from(vec![100, 100])))
.set_definition_level_histogram(Some(LevelHistogram::from(vec![0, 200])))
.build()
.unwrap();

Expand Down
Loading

0 comments on commit 9582e0d

Please sign in to comment.