Update to arrow/parquet 11.0 (#2048)
* Update to arrow/parquet 11

* Adapt to API changes

* macro fmt

Co-authored-by: Yijie Shen <henry.yijieshen@gmail.com>
alamb and yjshen authored Mar 22, 2022
1 parent f5c0cea commit 2e6833c
Showing 13 changed files with 96 additions and 119 deletions.
2 changes: 1 addition & 1 deletion ballista/rust/core/Cargo.toml
@@ -32,7 +32,7 @@ simd = ["datafusion/simd"]
[dependencies]
ahash = { version = "0.7", default-features = false }

arrow-flight = { version = "10.0" }
arrow-flight = { version = "11" }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
4 changes: 2 additions & 2 deletions ballista/rust/executor/Cargo.toml
@@ -33,8 +33,8 @@ snmalloc = ["snmalloc-rs"]

[dependencies]
anyhow = "1"
arrow = { version = "10.0" }
arrow-flight = { version = "10.0" }
arrow = { version = "11" }
arrow-flight = { version = "11" }
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.6.0" }
chrono = { version = "0.4", default-features = false }
3 changes: 2 additions & 1 deletion ballista/rust/executor/src/flight_service.rs
@@ -96,7 +96,8 @@ impl FlightService for BallistaFlightService {
))
})
.map_err(|e| from_ballista_err(&e))?;
let reader = FileReader::try_new(file).map_err(|e| from_arrow_err(&e))?;
let reader =
FileReader::try_new(file, None).map_err(|e| from_arrow_err(&e))?;

let (tx, rx): (FlightDataSender, FlightDataReceiver) = channel(2);

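Note on the change above: in arrow 11, `ipc::reader::FileReader::try_new` takes a second argument, an optional projection of column indices; passing `None` keeps the previous behavior of reading every column. A minimal sketch of the new signature, using a hypothetical file path:

use std::fs::File;
use arrow::ipc::reader::FileReader;

fn read_ipc_file(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open(path)?;
    // arrow 11: the second argument is an optional projection (column indices);
    // `None` reads all columns, matching the old single-argument behavior.
    let reader = FileReader::try_new(file, None)?;
    for batch in reader {
        println!("read a batch with {} rows", batch?.num_rows());
    }
    Ok(())
}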
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
@@ -28,7 +28,7 @@ repository = "https://github.com/apache/arrow-datafusion"
rust-version = "1.59"

[dependencies]
arrow = { version = "10.0" }
arrow = { version = "11" }
ballista = { path = "../ballista/rust/client", version = "0.6.0", optional = true }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { path = "../datafusion", version = "7.0.0" }
4 changes: 2 additions & 2 deletions datafusion-common/Cargo.toml
@@ -38,10 +38,10 @@ jit = ["cranelift-module"]
pyarrow = ["pyo3"]

[dependencies]
arrow = { version = "10.0", features = ["prettyprint"] }
arrow = { version = "11", features = ["prettyprint"] }
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
cranelift-module = { version = "0.82.0", optional = true }
ordered-float = "2.10"
parquet = { version = "10.0", features = ["arrow"], optional = true }
parquet = { version = "11", features = ["arrow"], optional = true }
pyo3 = { version = "0.16", optional = true }
sqlparser = "0.15"
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
@@ -34,7 +34,7 @@ path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]

[dev-dependencies]
arrow-flight = { version = "10.0" }
arrow-flight = { version = "11" }
async-trait = "0.1.41"
datafusion = { path = "../datafusion" }
futures = "0.3"
2 changes: 1 addition & 1 deletion datafusion-expr/Cargo.toml
@@ -36,6 +36,6 @@ path = "src/lib.rs"

[dependencies]
ahash = { version = "0.7", default-features = false }
arrow = { version = "10.0", features = ["prettyprint"] }
arrow = { version = "11", features = ["prettyprint"] }
datafusion-common = { path = "../datafusion-common", version = "7.0.0" }
sqlparser = "0.15"
2 changes: 1 addition & 1 deletion datafusion-physical-expr/Cargo.toml
@@ -40,7 +40,7 @@ unicode_expressions = ["unicode-segmentation"]

[dependencies]
ahash = { version = "0.7", default-features = false }
arrow = { version = "10.0", features = ["prettyprint"] }
arrow = { version = "11", features = ["prettyprint"] }
blake2 = { version = "^0.10.2", optional = true }
blake3 = { version = "1.0", optional = true }
chrono = { version = "0.4", default-features = false }
4 changes: 2 additions & 2 deletions datafusion/Cargo.toml
@@ -55,7 +55,7 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions"]

[dependencies]
ahash = { version = "0.7", default-features = false }
arrow = { version = "10.0", features = ["prettyprint"] }
arrow = { version = "11", features = ["prettyprint"] }
async-trait = "0.1.41"
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
chrono = { version = "0.4", default-features = false }
@@ -71,7 +71,7 @@ num-traits = { version = "0.2", optional = true }
num_cpus = "1.13.0"
ordered-float = "2.10"
parking_lot = "0.12"
parquet = { version = "10.0", features = ["arrow"] }
parquet = { version = "11", features = ["arrow"] }
paste = "^1.0"
pin-project-lite= "^0.2.7"
pyo3 = { version = "0.16", optional = true }
2 changes: 1 addition & 1 deletion datafusion/fuzz-utils/Cargo.toml
@@ -23,6 +23,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { version = "10.0", features = ["prettyprint"] }
arrow = { version = "11", features = ["prettyprint"] }
env_logger = "0.9.0"
rand = "0.8"
174 changes: 75 additions & 99 deletions datafusion/src/physical_plan/file_format/parquet.rs
@@ -53,9 +53,8 @@ use arrow::{
use log::{debug, warn};
use parquet::arrow::ArrowWriter;
use parquet::file::{
metadata::RowGroupMetaData,
reader::{FileReader, SerializedFileReader},
statistics::Statistics as ParquetStatistics,
metadata::RowGroupMetaData, reader::SerializedFileReader,
serialized_reader::ReadOptionsBuilder, statistics::Statistics as ParquetStatistics,
};

use fmt::Debug;
@@ -309,7 +308,7 @@ fn send_result(
/// Wraps parquet statistics in a way
/// that implements [`PruningStatistics`]
struct RowGroupPruningStatistics<'a> {
row_group_metadata: &'a [RowGroupMetaData],
row_group_metadata: &'a RowGroupMetaData,
parquet_schema: &'a Schema,
}

@@ -342,33 +341,26 @@ macro_rules! get_statistic {
// Extract the min or max value calling `func` or `bytes_func` on the ParquetStatistics as appropriate
macro_rules! get_min_max_values {
($self:expr, $column:expr, $func:ident, $bytes_func:ident) => {{
let (column_index, field) = if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) {
(v, f)
} else {
// Named column was not present
return None
};
let (column_index, field) =
if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) {
(v, f)
} else {
// Named column was not present
return None;
};

let data_type = field.data_type();
// The result may be None, because DataFusion doesn't have support for ScalarValues of the column type
let null_scalar: ScalarValue = data_type.try_into().ok()?;

let scalar_values : Vec<ScalarValue> = $self.row_group_metadata
.iter()
.flat_map(|meta| {
meta.column(column_index).statistics()
})
.map(|stats| {
get_statistic!(stats, $func, $bytes_func)
})
.map(|maybe_scalar| {
// column either did't have statistics at all or didn't have min/max values
maybe_scalar.unwrap_or_else(|| null_scalar.clone())
})
.collect();

// ignore errors converting to arrays (e.g. different types)
ScalarValue::iter_to_array(scalar_values).ok()
$self.row_group_metadata
.column(column_index)
.statistics()
.map(|stats| get_statistic!(stats, $func, $bytes_func))
.flatten()
// column either didn't have statistics at all or didn't have min/max values
.or_else(|| Some(null_scalar.clone()))
.map(|s| s.to_array())
}}
}
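For reference, the statistics lookup above now operates on a single `RowGroupMetaData` rather than a slice, and each min/max becomes a one-element array via `ScalarValue::to_array`. A rough, hypothetical helper in the same spirit (assuming an Int32 column and DataFusion's `ScalarValue`):

use datafusion::scalar::ScalarValue;
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::statistics::Statistics;

// Extract the minimum value of one Int32 column from a single row group,
// falling back to a NULL scalar when statistics or min/max are missing.
fn int32_min(meta: &RowGroupMetaData, column_index: usize) -> ScalarValue {
    match meta.column(column_index).statistics() {
        Some(Statistics::Int32(s)) if s.has_min_max_set() => {
            ScalarValue::Int32(Some(*s.min()))
        }
        _ => ScalarValue::Int32(None),
    }
}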

@@ -383,17 +375,14 @@ macro_rules! get_null_count_values {
return None;
};

let scalar_values: Vec<ScalarValue> = $self
.row_group_metadata
.iter()
.flat_map(|meta| meta.column(column_index).statistics())
.map(|stats| {
ScalarValue::UInt64(Some(stats.null_count().try_into().unwrap()))
})
.collect();

// ignore errors converting to arrays (e.g. different types)
ScalarValue::iter_to_array(scalar_values).ok()
let value = ScalarValue::UInt64(
$self
.row_group_metadata
.column(column_index)
.statistics()
.map(|s| s.null_count()),
);
Some(value.to_array())
}};
}

@@ -407,7 +396,7 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
}

fn num_containers(&self) -> usize {
self.row_group_metadata.len()
1
}

fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
@@ -418,31 +407,33 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
fn build_row_group_predicate(
pruning_predicate: &PruningPredicate,
metrics: ParquetFileMetrics,
row_group_metadata: &[RowGroupMetaData],
) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
let parquet_schema = pruning_predicate.schema().as_ref();

let pruning_stats = RowGroupPruningStatistics {
row_group_metadata,
parquet_schema,
};
let predicate_values = pruning_predicate.prune(&pruning_stats);

match predicate_values {
Ok(values) => {
// NB: false means don't scan row group
let num_pruned = values.iter().filter(|&v| !*v).count();
metrics.row_groups_pruned.add(num_pruned);
Box::new(move |_, i| values[i])
}
// stats filter array could not be built
// return a closure which will not filter out any row groups
Err(e) => {
debug!("Error evaluating row group predicate values {}", e);
metrics.predicate_evaluation_errors.add(1);
Box::new(|_r, _i| true)
}
}
) -> Box<dyn FnMut(&RowGroupMetaData, usize) -> bool> {
let pruning_predicate = pruning_predicate.clone();
Box::new(
move |row_group_metadata: &RowGroupMetaData, _i: usize| -> bool {
let parquet_schema = pruning_predicate.schema().as_ref();
let pruning_stats = RowGroupPruningStatistics {
row_group_metadata,
parquet_schema,
};
let predicate_values = pruning_predicate.prune(&pruning_stats);
match predicate_values {
Ok(values) => {
// NB: false means don't scan row group
let num_pruned = values.iter().filter(|&v| !*v).count();
metrics.row_groups_pruned.add(num_pruned);
values[0]
}
// stats filter array could not be built
// return a closure which will not filter out any row groups
Err(e) => {
debug!("Error evaluating row group predicate values {}", e);
metrics.predicate_evaluation_errors.add(1);
true
}
}
},
)
}

#[allow(clippy::too_many_arguments)]
@@ -470,17 +461,20 @@ fn read_partition(
);
let object_reader =
object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
let mut file_reader =
SerializedFileReader::new(ChunkObjectReader(object_reader))?;

let mut opt = ReadOptionsBuilder::new();
if let Some(pruning_predicate) = pruning_predicate {
let row_group_predicate = build_row_group_predicate(
opt = opt.with_predicate(build_row_group_predicate(
pruning_predicate,
file_metrics,
file_reader.metadata().row_groups(),
);
file_reader.filter_row_groups(&row_group_predicate);
));
}

let file_reader = SerializedFileReader::new_with_options(
ChunkObjectReader(object_reader),
opt.build(),
)?;

let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
let adapted_projections =
schema_adapter.map_projections(&arrow_reader.get_schema()?, projection)?;
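The hunk above reflects the parquet 11 API: row groups are no longer filtered by mutating the reader (`filter_row_groups`) but by supplying a predicate through `ReadOptionsBuilder` when constructing the `SerializedFileReader`. A minimal sketch against a hypothetical local file, keeping only even-numbered row groups:

use std::fs::File;
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::file::serialized_reader::ReadOptionsBuilder;

fn open_filtered(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open(path)?;
    let options = ReadOptionsBuilder::new()
        // Row groups for which the predicate returns false are skipped entirely.
        .with_predicate(Box::new(|_meta: &RowGroupMetaData, i: usize| i % 2 == 0))
        .build();
    let reader = SerializedFileReader::new_with_options(file, options)?;
    println!("row groups kept: {}", reader.metadata().num_row_groups());
    Ok(())
}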
@@ -1054,11 +1048,8 @@ mod tests {
vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
);
let row_group_metadata = vec![rgm1, rgm2];
let row_group_predicate = build_row_group_predicate(
&pruning_predicate,
parquet_file_metrics(),
&row_group_metadata,
);
let mut row_group_predicate =
build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
let row_group_filter = row_group_metadata
.iter()
.enumerate()
@@ -1087,11 +1078,8 @@ mod tests {
vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
);
let row_group_metadata = vec![rgm1, rgm2];
let row_group_predicate = build_row_group_predicate(
&pruning_predicate,
parquet_file_metrics(),
&row_group_metadata,
);
let mut row_group_predicate =
build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
let row_group_filter = row_group_metadata
.iter()
.enumerate()
@@ -1135,11 +1123,8 @@ mod tests {
],
);
let row_group_metadata = vec![rgm1, rgm2];
let row_group_predicate = build_row_group_predicate(
&pruning_predicate,
parquet_file_metrics(),
&row_group_metadata,
);
let mut row_group_predicate =
build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
let row_group_filter = row_group_metadata
.iter()
.enumerate()
@@ -1153,11 +1138,8 @@ mod tests {
// this bypasses the entire predicate expression and no row groups are filtered out
let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2)));
let pruning_predicate = PruningPredicate::try_new(expr, schema)?;
let row_group_predicate = build_row_group_predicate(
&pruning_predicate,
parquet_file_metrics(),
&row_group_metadata,
);
let mut row_group_predicate =
build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
let row_group_filter = row_group_metadata
.iter()
.enumerate()
@@ -1202,11 +1184,8 @@ mod tests {
let pruning_predicate = PruningPredicate::try_new(expr, schema)?;
let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate();

let row_group_predicate = build_row_group_predicate(
&pruning_predicate,
parquet_file_metrics(),
&row_group_metadata,
);
let mut row_group_predicate =
build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
let row_group_filter = row_group_metadata
.iter()
.enumerate()
@@ -1234,11 +1213,8 @@ mod tests {
let pruning_predicate = PruningPredicate::try_new(expr, schema)?;
let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate();

let row_group_predicate = build_row_group_predicate(
&pruning_predicate,
parquet_file_metrics(),
&row_group_metadata,
);
let mut row_group_predicate =
build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
let row_group_filter = row_group_metadata
.iter()
.enumerate()
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/sorts/sort.rs
@@ -362,7 +362,7 @@ fn write_sorted(

fn read_spill(sender: Sender<ArrowResult<RecordBatch>>, path: &Path) -> Result<()> {
let file = BufReader::new(File::open(&path)?);
let reader = FileReader::try_new(file)?;
let reader = FileReader::try_new(file, None)?;
for batch in reader {
sender
.blocking_send(batch)
