Skip to content

Commit

Permalink
add boolean row selection implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
XiangpengHao committed Oct 25, 2024
1 parent b91b316 commit 42938bf
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 68 deletions.
101 changes: 81 additions & 20 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@ edition = { workspace = true }
rust-version = "1.70.0"

[target.'cfg(target_arch = "wasm32")'.dependencies]
ahash = { version = "0.8", default-features = false, features = ["compile-time-rng"] }
ahash = { version = "0.8", default-features = false, features = [
"compile-time-rng",
] }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
ahash = { version = "0.8", default-features = false, features = [
"runtime-rng",
] }

[dependencies]
arrow-array = { workspace = true, optional = true }
Expand All @@ -49,25 +53,53 @@ object_store = { version = "0.11.0", default-features = false, optional = true }
bytes = { version = "1.1", default-features = false, features = ["std"] }
thrift = { version = "0.17", default-features = false }
snap = { version = "1.0", default-features = false, optional = true }
brotli = { version = "7.0", default-features = false, features = ["std"], optional = true }
flate2 = { version = "1.0", default-features = false, features = ["rust_backend"], optional = true }
lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"], optional = true }
brotli = { version = "7.0", default-features = false, features = [
"std",
], optional = true }
flate2 = { version = "1.0", default-features = false, features = [
"rust_backend",
], optional = true }
lz4_flex = { version = "0.11", default-features = false, features = [
"std",
"frame",
], optional = true }
zstd = { version = "0.13", optional = true, default-features = false }
chrono = { workspace = true }
num = { version = "0.4", default-features = false }
num-bigint = { version = "0.4", default-features = false }
base64 = { version = "0.22", default-features = false, features = ["std", ], optional = true }
clap = { version = "4.1", default-features = false, features = ["std", "derive", "env", "help", "error-context", "usage"], optional = true }
serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
serde_json = { version = "1.0", default-features = false, features = ["std"], optional = true }
base64 = { version = "0.22", default-features = false, features = [
"std",
], optional = true }
clap = { version = "4.1", default-features = false, features = [
"std",
"derive",
"env",
"help",
"error-context",
"usage",
], optional = true }
serde = { version = "1.0", default-features = false, features = [
"derive",
], optional = true }
serde_json = { version = "1.0", default-features = false, features = [
"std",
], optional = true }
seq-macro = { version = "0.3", default-features = false }
futures = { version = "0.3", default-features = false, features = ["std"], optional = true }
tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "io-util"] }
futures = { version = "0.3", default-features = false, features = [
"std",
], optional = true }
tokio = { version = "1.0", optional = true, default-features = false, features = [
"macros",
"rt",
"io-util",
] }
hashbrown = { version = "0.14", default-features = false }
twox-hash = { version = "1.6", default-features = false }
paste = { version = "1.0" }
half = { version = "2.1", default-features = false, features = ["num-traits"] }
sysinfo = { version = "0.32.0", optional = true, default-features = false, features = ["system"] }
sysinfo = { version = "0.32.0", optional = true, default-features = false, features = [
"system",
] }
crc32fast = { version = "1.4.2", optional = true, default-features = false }

[dev-dependencies]
Expand All @@ -76,14 +108,34 @@ criterion = { version = "0.5", default-features = false }
snap = { version = "1.0", default-features = false }
tempfile = { version = "3.0", default-features = false }
brotli = { version = "7.0", default-features = false, features = ["std"] }
flate2 = { version = "1.0", default-features = false, features = ["rust_backend"] }
lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"] }
flate2 = { version = "1.0", default-features = false, features = [
"rust_backend",
] }
lz4_flex = { version = "0.11", default-features = false, features = [
"std",
"frame",
] }
zstd = { version = "0.13", default-features = false }
serde_json = { version = "1.0", features = ["std"], default-features = false }
arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", "json"] }
tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "io-util", "fs"] }
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] }
object_store = { version = "0.11.0", default-features = false, features = ["azure"] }
arrow = { workspace = true, features = [
"ipc",
"test_utils",
"prettyprint",
"json",
] }
tokio = { version = "1.0", default-features = false, features = [
"macros",
"rt",
"io-util",
"fs",
] }
rand = { version = "0.8", default-features = false, features = [
"std",
"std_rng",
] }
object_store = { version = "0.11.0", default-features = false, features = [
"azure",
] }

# TODO: temporary to fix parquet wasm build
# upstream issue: https://github.com/gyscos/zstd-rs/issues/269
Expand All @@ -101,7 +153,16 @@ default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
# Enable lz4
lz4 = ["lz4_flex"]
# Enable arrow reader/writer APIs
arrow = ["base64", "arrow-array", "arrow-buffer", "arrow-cast", "arrow-data", "arrow-schema", "arrow-select", "arrow-ipc"]
arrow = [
"base64",
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-data",
"arrow-schema",
"arrow-select",
"arrow-ipc",
]
# Enable CLI tools
cli = ["json", "base64", "clap", "arrow-csv", "serde"]
# Enable JSON APIs
Expand Down Expand Up @@ -225,7 +286,7 @@ harness = false
[[bench]]
name = "row_selector"
harness = false
required-features = ["arrow"]
# The benchmark imports `BooleanRowSelection`, which is only exported when the
# `experimental` feature is enabled.
required-features = ["arrow", "experimental"]

[lib]
bench = false
113 changes: 76 additions & 37 deletions parquet/benches/row_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use arrow_array::BooleanArray;
use criterion::*;
use parquet::arrow::arrow_reader::RowSelection;
use parquet::arrow::arrow_reader::{BooleanRowSelection, RowSelection};
use rand::Rng;

/// Generates a random `BooleanArray` filter mask with a specified selection ratio.
Expand All @@ -40,47 +40,86 @@ fn generate_random_row_selection(total_rows: usize, selection_ratio: f64) -> Boo

/// Registers the row-selection benchmarks with Criterion.
///
/// Benchmarks `intersection`, `union`, `from_filters` and `and_then` for both
/// `RowSelection` (named "slice ...") and `BooleanRowSelection` (named
/// "boolean ..."). Each benchmark is registered once per entry of
/// `selection_ratios`, with the ratio appended to the benchmark name, using
/// inputs generated for `total_rows` rows.
fn criterion_benchmark(c: &mut Criterion) {
let total_rows = 300_000;
let selection_ratio = 1.0 / 3.0;
// Ratios swept by the benchmarks below; every ratio gets its own set of named benchmarks.
let selection_ratios = [0.000_01, 0.001, 0.1, 0.3];

// Generate two random RowSelections with approximately 1/3 of the rows selected.
let row_selection_a =
RowSelection::from_filters(&[generate_random_row_selection(total_rows, selection_ratio)]);
let row_selection_b =
RowSelection::from_filters(&[generate_random_row_selection(total_rows, selection_ratio)]);
for ratio in selection_ratios {
let slice_selection_a =
RowSelection::from_filters(&[generate_random_row_selection(total_rows, ratio)]);
let slice_selection_b =
RowSelection::from_filters(&[generate_random_row_selection(total_rows, ratio)]);

// Benchmark the intersection of the two RowSelections.
c.bench_function("intersection", |b| {
b.iter(|| {
let intersection = row_selection_a.intersection(&row_selection_b);
criterion::black_box(intersection);
})
});
// Build the boolean representation from the same selections so that the
// "slice" and "boolean" benchmarks operate on identical input.
let boolean_selection_a = BooleanRowSelection::from(slice_selection_a.clone());
let boolean_selection_b = BooleanRowSelection::from(slice_selection_b.clone());

c.bench_function("union", |b| {
b.iter(|| {
let union = row_selection_a.union(&row_selection_b);
criterion::black_box(union);
})
});
// Benchmark the intersection of the two RowSelections.
c.bench_function(&format!("slice intersection {}", ratio), |b| {
b.iter(|| {
let intersection = slice_selection_a.intersection(&slice_selection_b);
criterion::black_box(intersection);
})
});

c.bench_function("from_filters", |b| {
let boolean_array = generate_random_row_selection(total_rows, selection_ratio);
b.iter(|| {
let array = boolean_array.clone();
let selection = RowSelection::from_filters(&[array]);
criterion::black_box(selection);
})
});
c.bench_function(&format!("boolean intersection {}", ratio), |b| {
b.iter(|| {
let intersection = boolean_selection_a.intersection(&boolean_selection_b);
criterion::black_box(intersection);
})
});

c.bench_function("and_then", |b| {
let selected = row_selection_a.row_count();
let sub_selection =
RowSelection::from_filters(&[generate_random_row_selection(selected, selection_ratio)]);
b.iter(|| {
let result = row_selection_a.and_then(&sub_selection);
criterion::black_box(result);
})
});
c.bench_function(&format!("slice union {}", ratio), |b| {
b.iter(|| {
let union = slice_selection_a.union(&slice_selection_b);
criterion::black_box(union);
})
});

c.bench_function(&format!("boolean union {}", ratio), |b| {
b.iter(|| {
let union = boolean_selection_a.union(&boolean_selection_b);
criterion::black_box(union);
})
});

c.bench_function(&format!("slice from_filters {}", ratio), |b| {
let boolean_array = generate_random_row_selection(total_rows, ratio);
b.iter(|| {
let array = boolean_array.clone();
let selection = RowSelection::from_filters(&[array]);
criterion::black_box(selection);
})
});

c.bench_function(&format!("boolean from_filters {}", ratio), |b| {
let boolean_array = generate_random_row_selection(total_rows, ratio);
b.iter(|| {
let array = boolean_array.clone();
let selection = BooleanRowSelection::from_filters(&[array]);
criterion::black_box(selection);
})
});

c.bench_function(&format!("slice and_then {}", ratio), |b| {
// The sub-selection is generated with a length equal to the number of
// rows that `slice_selection_a` selects.
let selected = slice_selection_a.row_count();
let sub_selection =
RowSelection::from_filters(&[generate_random_row_selection(selected, ratio)]);
b.iter(|| {
let result = slice_selection_a.and_then(&sub_selection);
criterion::black_box(result);
})
});

c.bench_function(&format!("boolean and_then {}", ratio), |b| {
let selected = boolean_selection_a.row_count();
let sub_selection =
BooleanRowSelection::from_filters(&[generate_random_row_selection(
selected, ratio,
)]);
b.iter(|| {
let result = boolean_selection_a.and_then(&sub_selection);
criterion::black_box(result);
})
});
}
}

criterion_group!(benches, criterion_benchmark);
Expand Down
6 changes: 3 additions & 3 deletions parquet/src/arrow/arrow_reader/boolean_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ impl BooleanRowSelection {
return self.intersection(other);
}

let mut buffer = MutableBuffer::from_len_zeroed(self.len());
buffer.copy_from_slice(self.selector.values());
let mut builder = BooleanBufferBuilder::new_from_buffer(buffer, self.len());
let mut buffer = MutableBuffer::from_len_zeroed(self.selector.inner().len());
buffer.copy_from_slice(self.selector.inner().as_slice());
let mut builder = BooleanBufferBuilder::new_from_buffer(buffer, self.selector.len());

// Create iterators for 'self' and 'other' bits
let mut other_bits = other.selector.iter();
Expand Down
15 changes: 7 additions & 8 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,6 @@
use std::collections::VecDeque;
use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::Array;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
use arrow_select::filter::prep_null_mask_filter;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};

pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{build_array_reader, ArrayReader};
use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
Expand All @@ -37,6 +29,13 @@ use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;
use arrow_array::{cast::AsArray, Array, RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
use arrow_select::filter::prep_null_mask_filter;
#[cfg(feature = "experimental")]
pub use boolean_selection::BooleanRowSelection;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};

#[cfg(feature = "experimental")]
mod boolean_selection;
Expand Down

0 comments on commit 42938bf

Please sign in to comment.