From a03520004a1454b6acfdfca84be9d72767598dd3 Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Mon, 6 Sep 2021 14:09:15 -0700 Subject: [PATCH] Fix build errors Co-authored-by: Yijie Shen --- Cargo.toml | 3 + .../src/execution_plans/shuffle_writer.rs | 43 +- .../src/serde/physical_plan/from_proto.rs | 2 - ballista/rust/core/src/utils.rs | 1 + datafusion/Cargo.toml | 3 +- datafusion/benches/physical_plan.rs | 2 +- datafusion/src/arrow_temporal_util.rs | 302 ++++++++ datafusion/src/datasource/parquet.rs | 202 +++--- datafusion/src/error.rs | 12 + datafusion/src/execution/context.rs | 20 +- datafusion/src/execution/dataframe_impl.rs | 8 +- datafusion/src/lib.rs | 2 + datafusion/src/logical_plan/expr.rs | 3 - datafusion/src/logical_plan/plan.rs | 13 - datafusion/src/optimizer/constant_folding.rs | 5 +- .../src/physical_optimizer/repartition.rs | 2 + datafusion/src/physical_plan/analyze.rs | 30 +- .../src/physical_plan/datetime_expressions.rs | 12 +- .../src/physical_plan/distinct_expressions.rs | 85 +-- datafusion/src/physical_plan/explain.rs | 6 +- .../src/physical_plan/expressions/binary.rs | 21 +- .../src/physical_plan/expressions/in_list.rs | 72 +- .../src/physical_plan/expressions/lead_lag.rs | 26 +- .../src/physical_plan/expressions/min_max.rs | 12 +- .../physical_plan/expressions/nth_value.rs | 25 +- .../src/physical_plan/expressions/rank.rs | 10 +- .../physical_plan/expressions/row_number.rs | 12 +- .../src/physical_plan/hash_aggregate.rs | 50 +- datafusion/src/physical_plan/hash_join.rs | 4 +- datafusion/src/physical_plan/hash_utils.rs | 77 +- datafusion/src/physical_plan/mod.rs | 13 +- datafusion/src/physical_plan/parquet.rs | 170 ++--- datafusion/src/physical_plan/repartition.rs | 5 +- .../physical_plan/sort_preserving_merge.rs | 91 ++- .../src/physical_plan/windows/aggregate.rs | 6 +- .../src/physical_plan/windows/built_in.rs | 4 +- datafusion/src/physical_plan/windows/mod.rs | 6 +- .../physical_plan/windows/window_agg_exec.rs | 4 +- datafusion/src/scalar.rs | 656 +++++++++--------- datafusion/src/test_util.rs | 4 +- 40 files changed, 1162 insertions(+), 862 deletions(-) create mode 100644 datafusion/src/arrow_temporal_util.rs diff --git a/Cargo.toml b/Cargo.toml index d6da8c14cd96..4e57ac6d7018 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,3 +29,6 @@ members = [ ] exclude = ["python"] + +[patch.crates-io] +arrow2 = { path = "/home/houqp/Documents/code/arrow/arrow2" } diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs index 36e445bc4ead..31143323cb34 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs @@ -34,14 +34,11 @@ use crate::utils; use crate::serde::protobuf::ShuffleWritePartition; use crate::serde::scheduler::{PartitionLocation, PartitionStats}; use async_trait::async_trait; -use datafusion::arrow::array::{ - Array, ArrayBuilder, ArrayRef, StringBuilder, StructBuilder, UInt32Builder, - UInt64Builder, -}; +use datafusion::arrow::array::*; use datafusion::arrow::compute::take; use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion::arrow::ipc::reader::FileReader; -use datafusion::arrow::ipc::writer::FileWriter; +use datafusion::arrow::io::ipc::read::FileReader; +use datafusion::arrow::io::ipc::write::FileWriter; use datafusion::arrow::record_batch::RecordBatch; use datafusion::error::{DataFusionError, Result}; use datafusion::physical_plan::hash_utils::create_hashes; @@ -244,7 +241,7 
@@ impl ShuffleWriterExec { .collect::>>>()?; let output_batch = - RecordBatch::try_new(input_batch.schema(), columns)?; + RecordBatch::try_new(input_batch.schema().clone(), columns)?; // write non-empty batch out @@ -356,18 +353,18 @@ impl ExecutionPlan for ShuffleWriterExec { // build metadata result batch let num_writers = part_loc.len(); - let mut partition_builder = UInt32Builder::new(num_writers); - let mut path_builder = StringBuilder::new(num_writers); - let mut num_rows_builder = UInt64Builder::new(num_writers); - let mut num_batches_builder = UInt64Builder::new(num_writers); - let mut num_bytes_builder = UInt64Builder::new(num_writers); + let mut partition_builder = UInt32Vec::with_capacity(num_writers); + let mut path_builder = MutableUtf8Array::with_capacity(num_writers); + let mut num_rows_builder = UInt64Vec::with_capacity(num_writers); + let mut num_batches_builder = UInt64Vec::with_capacity(num_writers); + let mut num_bytes_builder = UInt64Vec::with_capacity(num_writers); for loc in &part_loc { - path_builder.append_value(loc.path.clone())?; - partition_builder.append_value(loc.partition_id as u32)?; - num_rows_builder.append_value(loc.num_rows)?; - num_batches_builder.append_value(loc.num_batches)?; - num_bytes_builder.append_value(loc.num_bytes)?; + path_builder.push(Some(loc.path.clone())); + partition_builder.push(Some(loc.partition_id as u32)); + num_rows_builder.push(Some(loc.num_rows)); + num_batches_builder.push(Some(loc.num_batches)); + num_bytes_builder.push(Some(loc.num_bytes)); } // build arrays @@ -428,17 +425,17 @@ fn result_schema() -> SchemaRef { ])) } -struct ShuffleWriter { +struct ShuffleWriter<'a> { path: String, - writer: FileWriter, + writer: FileWriter<'a, File>, num_batches: u64, num_rows: u64, num_bytes: u64, } -impl ShuffleWriter { +impl<'a> ShuffleWriter<'a> { fn new(path: &str, schema: &Schema) -> Result { - let file = File::create(path) + let mut file = File::create(path) .map_err(|e| { BallistaError::General(format!( "Failed to create partition file at {}: {:?}", @@ -451,7 +448,7 @@ impl ShuffleWriter { num_rows: 0, num_bytes: 0, path: path.to_owned(), - writer: FileWriter::try_new(file, schema)?, + writer: FileWriter::try_new(&mut file, schema)?, }) } @@ -480,7 +477,7 @@ impl ShuffleWriter { #[cfg(test)] mod tests { use super::*; - use datafusion::arrow::array::{StringArray, StructArray, UInt32Array, UInt64Array}; + use datafusion::arrow::array::{Utf8Array, StructArray, UInt32Array, UInt64Array}; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::expressions::Column; use datafusion::physical_plan::limit::GlobalLimitExec; diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index d371fabdf098..8b9544498264 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -61,7 +61,6 @@ use datafusion::physical_plan::{ expressions::{ col, Avg, BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, Literal, NegativeExpr, NotExpr, PhysicalSortExpr, TryCastExpr, - DEFAULT_DATAFUSION_CAST_OPTIONS, }, filter::FilterExec, functions::{self, BuiltinScalarFunction, ScalarFunctionExpr}, @@ -620,7 +619,6 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc { ExprType::Cast(e) => Arc::new(CastExpr::new( convert_box_required!(e.expr)?, convert_required!(e.arrow_type)?, - DEFAULT_DATAFUSION_CAST_OPTIONS, )), ExprType::TryCast(e) => 
Arc::new(TryCastExpr::new(
                convert_box_required!(e.expr)?,
                convert_required!(e.arrow_type)?,
            )),
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index b7e465ccd20a..a1d3a63fb9b8 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -31,6 +31,7 @@ use crate::serde::scheduler::PartitionStats;
 use crate::config::BallistaConfig;
 use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::error::Result as ArrowResult;
 use datafusion::arrow::{
     array::*,
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index a3fdc978bc16..935f3f766741 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -49,7 +49,8 @@ force_hash_collisions = []
 [dependencies]
 ahash = "0.7"
 hashbrown = { version = "0.11", features = ["raw"] }
-arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "43d8cf5c54805aa437a1c7ee48f80e90f07bc553", features = ["io_csv", "io_json", "io_parquet", "io_ipc", "io_print", "ahash", "merge_sort", "compute", "regex"] }
+arrow = { package = "arrow2", version="0.5", features = ["io_csv", "io_json", "io_parquet", "io_ipc", "io_print", "ahash", "merge_sort", "compute", "regex"] }
+parquet = { package = "parquet2", version = "0.4", default_features = false, features = ["stream"] }
 sqlparser = "0.10"
 paste = "^1.0"
 num_cpus = "1.13.0"
diff --git a/datafusion/benches/physical_plan.rs b/datafusion/benches/physical_plan.rs
index 9222ae131b8f..ce1893b37257 100644
--- a/datafusion/benches/physical_plan.rs
+++ b/datafusion/benches/physical_plan.rs
@@ -51,7 +51,7 @@ fn sort_preserving_merge_operator(batches: Vec<RecordBatch>, sort: &[&str]) {
     let exec = MemoryExec::try_new(
         &batches.into_iter().map(|rb| vec![rb]).collect::<Vec<_>>(),
-        schema,
+        schema.clone(),
         None,
     )
     .unwrap();
diff --git a/datafusion/src/arrow_temporal_util.rs b/datafusion/src/arrow_temporal_util.rs
new file mode 100644
index 000000000000..d8ca4f7ec89f
--- /dev/null
+++ b/datafusion/src/arrow_temporal_util.rs
@@ -0,0 +1,302 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::error::{ArrowError, Result};
+use chrono::{prelude::*, LocalResult};
+
+/// Accepts a string in RFC3339 / ISO8601 standard format and some
+/// variants and converts it to a nanosecond precision timestamp.
+///
+/// Implements the `to_timestamp` function to convert a string to a
+/// timestamp, following the model of Spark SQL's `to_timestamp`.
+///
+/// In addition to RFC3339 / ISO8601 standard timestamps, it also
+/// accepts strings that use a space ` ` to separate the date and time
+/// as well as strings that have no explicit timezone offset.
+///
+/// Examples of accepted inputs:
+/// * `1997-01-31T09:26:56.123Z` # RFC3339
+/// * `1997-01-31T09:26:56.123-05:00` # RFC3339
+/// * `1997-01-31 09:26:56.123-05:00` # close to RFC3339 but with a space rather than T
+/// * `1997-01-31T09:26:56.123` # close to RFC3339 but no timezone offset specified
+/// * `1997-01-31 09:26:56.123` # close to RFC3339 but uses a space and no timezone offset
+/// * `1997-01-31 09:26:56` # close to RFC3339, no fractional seconds
+///
+/// Internally, this function uses the `chrono` library for the
+/// datetime parsing.
+///
+/// We hope to extend this function in the future with a second
+/// parameter to specify the format string.
+///
+/// ## Timestamp Precision
+///
+/// This function uses the maximum precision timestamp supported by
+/// Arrow (nanoseconds stored as a 64-bit integer). This means the
+/// range of dates that timestamps can represent is ~1677 AD to
+/// ~2262 AD.
+///
+/// ## Timezone / Offset Handling
+///
+/// Numerical values of timestamps are stored relative to UTC (zero offset).
+///
+/// This function interprets strings without an explicit time zone as
+/// timestamps with the offset of the local time zone of the machine.
+///
+/// For example, `1997-01-31 09:26:56.123Z` is interpreted as UTC, as
+/// it has an explicit timezone specifier ("Z" for Zulu/UTC).
+///
+/// `1997-01-31T09:26:56.123` is interpreted as a local timestamp in
+/// the timezone of the machine. For example, if
+/// the system timezone is set to America/New_York (UTC-5) the
+/// timestamp will be interpreted as though it were
+/// `1997-01-31T09:26:56.123-05:00`.
+///
+/// TODO: remove this hack and redesign DataFusion's time-related API with regard to timezones.
+#[inline]
+pub(crate) fn string_to_timestamp_nanos(s: &str) -> Result<i64> {
+    // Fast path: RFC3339 timestamp (with a T)
+    // Example: 2020-09-08T13:42:29.190855Z
+    if let Ok(ts) = DateTime::parse_from_rfc3339(s) {
+        return Ok(ts.timestamp_nanos());
+    }
+
+    // Implement quasi-RFC3339 support by trying to parse the
+    // timestamp with various other format specifiers to support
+    // separating the date and time with a space ' ' rather than 'T' to be
+    // (more) compatible with Apache Spark SQL
+
+    // timezone offset, using ' ' as a separator
+    // Example: 2020-09-08 13:42:29.190855-05:00
+    if let Ok(ts) = DateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f%:z") {
+        return Ok(ts.timestamp_nanos());
+    }
+
+    // with an explicit Z, using ' ' as a separator
+    // Example: 2020-09-08 13:42:29Z
+    if let Ok(ts) = Utc.datetime_from_str(s, "%Y-%m-%d %H:%M:%S%.fZ") {
+        return Ok(ts.timestamp_nanos());
+    }
+
+    // Support timestamps without an explicit timezone offset, again
+    // to be compatible with what Apache Spark SQL does.
+
+    // without a timezone specifier as a local time, using T as a separator
+    // Example: 2020-09-08T13:42:29.190855
+    if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S.%f") {
+        return naive_datetime_to_timestamp(s, ts);
+    }
+
+    // without a timezone specifier as a local time, using T as a
+    // separator, no fractional seconds
+    // Example: 2020-09-08T13:42:29
+    if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
+        return naive_datetime_to_timestamp(s, ts);
+    }
+
+    // without a timezone specifier as a local time, using ' ' as a separator
+    // Example: 2020-09-08 13:42:29.190855
+    if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S.%f") {
+        return naive_datetime_to_timestamp(s, ts);
+    }
+
+    // without a timezone specifier as a local time, using ' ' as a
+    // separator, no fractional seconds
+    // Example: 2020-09-08 13:42:29
+    if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
+        return naive_datetime_to_timestamp(s, ts);
+    }
+
+    // Note we don't pass along the error message from the underlying
+    // chrono parsing because we tried several different format
+    // strings and we don't know which the user was trying to
+    // match. Thus any of the specific error messages is likely to
+    // be more confusing than helpful
+    Err(ArrowError::Other(format!(
+        "Error parsing '{}' as timestamp",
+        s
+    )))
+}
+
+/// Converts the naive datetime (which has no specific timezone) to a
+/// nanosecond epoch timestamp relative to UTC.
+fn naive_datetime_to_timestamp(s: &str, datetime: NaiveDateTime) -> Result<i64> {
+    let l = Local {};
+
+    match l.from_local_datetime(&datetime) {
+        LocalResult::None => Err(ArrowError::Other(format!(
+            "Error parsing '{}' as timestamp: local time representation is invalid",
+            s
+        ))),
+        LocalResult::Single(local_datetime) => {
+            Ok(local_datetime.with_timezone(&Utc).timestamp_nanos())
+        }
+        // Ambiguous times can happen if the timestamp is exactly when
+        // a daylight savings time transition occurs, for example, and
+        // so the datetime could validly be said to be in two
+        // potential offsets. However, since we are about to convert
+        // to UTC anyways, we can pick one arbitrarily
+        LocalResult::Ambiguous(local_datetime, _) => {
+            Ok(local_datetime.with_timezone(&Utc).timestamp_nanos())
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn string_to_timestamp_timezone() -> Result<()> {
+        // Explicit timezone
+        assert_eq!(
+            1599572549190855000,
+            parse_timestamp("2020-09-08T13:42:29.190855+00:00")?
+        );
+        assert_eq!(
+            1599572549190855000,
+            parse_timestamp("2020-09-08T13:42:29.190855Z")?
+        );
+        assert_eq!(
+            1599572549000000000,
+            parse_timestamp("2020-09-08T13:42:29Z")?
+        ); // no fractional part
+        assert_eq!(
+            1599590549190855000,
+            parse_timestamp("2020-09-08T13:42:29.190855-05:00")?
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn string_to_timestamp_timezone_space() -> Result<()> {
+        // Ensure space rather than T between time and date is accepted
+        assert_eq!(
+            1599572549190855000,
+            parse_timestamp("2020-09-08 13:42:29.190855+00:00")?
+        );
+        assert_eq!(
+            1599572549190855000,
+            parse_timestamp("2020-09-08 13:42:29.190855Z")?
+        );
+        assert_eq!(
+            1599572549000000000,
+            parse_timestamp("2020-09-08 13:42:29Z")?
+        ); // no fractional part
+        assert_eq!(
+            1599590549190855000,
+            parse_timestamp("2020-09-08 13:42:29.190855-05:00")?
+ ); + Ok(()) + } + + /// Interprets a naive_datetime (with no explicit timzone offset) + /// using the local timezone and returns the timestamp in UTC (0 + /// offset) + fn naive_datetime_to_timestamp(naive_datetime: &NaiveDateTime) -> i64 { + // Note: Use chrono APIs that are different than + // naive_datetime_to_timestamp to compute the utc offset to + // try and double check the logic + let utc_offset_secs = match Local.offset_from_local_datetime(&naive_datetime) { + LocalResult::Single(local_offset) => { + local_offset.fix().local_minus_utc() as i64 + } + _ => panic!("Unexpected failure converting to local datetime"), + }; + let utc_offset_nanos = utc_offset_secs * 1_000_000_000; + naive_datetime.timestamp_nanos() - utc_offset_nanos + } + + #[test] + #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function: mktime + fn string_to_timestamp_no_timezone() -> Result<()> { + // This test is designed to succeed in regardless of the local + // timezone the test machine is running. Thus it is still + // somewhat suceptable to bugs in the use of chrono + let naive_datetime = NaiveDateTime::new( + NaiveDate::from_ymd(2020, 9, 8), + NaiveTime::from_hms_nano(13, 42, 29, 190855), + ); + + // Ensure both T and ' ' variants work + assert_eq!( + naive_datetime_to_timestamp(&naive_datetime), + parse_timestamp("2020-09-08T13:42:29.190855")? + ); + + assert_eq!( + naive_datetime_to_timestamp(&naive_datetime), + parse_timestamp("2020-09-08 13:42:29.190855")? + ); + + // Also ensure that parsing timestamps with no fractional + // second part works as well + let naive_datetime_whole_secs = NaiveDateTime::new( + NaiveDate::from_ymd(2020, 9, 8), + NaiveTime::from_hms(13, 42, 29), + ); + + // Ensure both T and ' ' variants work + assert_eq!( + naive_datetime_to_timestamp(&naive_datetime_whole_secs), + parse_timestamp("2020-09-08T13:42:29")? + ); + + assert_eq!( + naive_datetime_to_timestamp(&naive_datetime_whole_secs), + parse_timestamp("2020-09-08 13:42:29")? + ); + + Ok(()) + } + + #[test] + fn string_to_timestamp_invalid() { + // Test parsing invalid formats + + // It would be nice to make these messages better + expect_timestamp_parse_error("", "Error parsing '' as timestamp"); + expect_timestamp_parse_error("SS", "Error parsing 'SS' as timestamp"); + expect_timestamp_parse_error( + "Wed, 18 Feb 2015 23:16:09 GMT", + "Error parsing 'Wed, 18 Feb 2015 23:16:09 GMT' as timestamp", + ); + } + + // Parse a timestamp to timestamp int with a useful human readable error message + fn parse_timestamp(s: &str) -> Result { + let result = string_to_timestamp_nanos(s); + if let Err(e) = &result { + eprintln!("Error parsing timestamp '{}': {:?}", s, e); + } + result + } + + fn expect_timestamp_parse_error(s: &str, expected_err: &str) { + match string_to_timestamp_nanos(s) { + Ok(v) => panic!( + "Expected error '{}' while parsing '{}', but parsed {} instead", + expected_err, s, v + ), + Err(e) => { + assert!(e.to_string().contains(expected_err), + "Can not find expected error '{}' while parsing '{}'. Actual error '{}'", + expected_err, s, e); + } + } + } +} diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs index c5e41ae21ad4..5134ccce3ffd 100644 --- a/datafusion/src/datasource/parquet.rs +++ b/datafusion/src/datasource/parquet.rs @@ -17,14 +17,16 @@ //! 
Parquet data source -use std::any::Any; +use std::any::{type_name, Any}; use std::fs::File; use std::sync::Arc; -use parquet::arrow::ArrowReader; -use parquet::arrow::ParquetFileArrowReader; -use parquet::file::serialized_reader::SerializedFileReader; -use parquet::file::statistics::Statistics as ParquetStatistics; +use arrow::io::parquet::read::{get_schema, read_metadata}; +use parquet::statistics::{ + BinaryStatistics as ParquetBinaryStatistics, + BooleanStatistics as ParquetBooleanStatistics, + PrimitiveStatistics as ParquetPrimitiveStatistics, Statistics as ParquetStatistics, +}; use super::datasource::TableProviderFilterPushDown; use crate::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; @@ -33,7 +35,7 @@ use crate::datasource::{ create_max_min_accs, get_col_stats, get_statistics_with_limit, FileAndSchema, PartitionedFile, TableDescriptor, TableDescriptorBuilder, TableProvider, }; -use crate::error::Result; +use crate::error::{DataFusionError, Result}; use crate::logical_plan::{combine_filters, Expr}; use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; use crate::physical_plan::parquet::ParquetExec; @@ -210,50 +212,35 @@ impl ParquetTableDescriptor { min_values: &mut Vec>, fields: &[Field], i: usize, - stat: &ParquetStatistics, - ) { - match stat { - ParquetStatistics::Boolean(s) => { - if let DataType::Boolean = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match max_value - .update(&[ScalarValue::Boolean(Some(*s.max()))]) - { + stats: Arc, + ) -> Result<()> { + use arrow::io::parquet::read::PhysicalType; + + macro_rules! update_primitive_min_max { + ($DT:ident, $PRIMITIVE_TYPE:ident) => {{ + if let DataType::$DT = fields[i].data_type() { + let stats = stats + .as_any() + .downcast_ref::>() + .ok_or_else(|| { + DataFusionError::Internal(format!( + "Failed to cast stats to {} stats", + type_name::<$PRIMITIVE_TYPE>() + )) + })?; + if let Some(max_value) = &mut max_values[i] { + if let Some(v) = stats.max_value { + match max_value.update(&[ScalarValue::$DT(Some(v))]) { Ok(_) => {} Err(_) => { max_values[i] = None; } } } - if let Some(min_value) = &mut min_values[i] { - match min_value - .update(&[ScalarValue::Boolean(Some(*s.min()))]) - { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } } - } - } - ParquetStatistics::Int32(s) => { - if let DataType::Int32 = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match max_value.update(&[ScalarValue::Int32(Some(*s.max()))]) - { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[ScalarValue::Int32(Some(*s.min()))]) - { + if let Some(min_value) = &mut min_values[i] { + if let Some(v) = stats.min_value { + match min_value.update(&[ScalarValue::$DT(Some(v))]) { Ok(_) => {} Err(_) => { min_values[i] = None; @@ -262,48 +249,33 @@ impl ParquetTableDescriptor { } } } - } - ParquetStatistics::Int64(s) => { - if let DataType::Int64 = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match max_value.update(&[ScalarValue::Int64(Some(*s.max()))]) - { + }}; + } + + match stats.physical_type() { + PhysicalType::Boolean => { + if let DataType::Boolean = fields[i].data_type() { + let stats = stats + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Internal( + "Failed to cast stats to boolean stats".to_owned(), + ) + })?; + if let Some(max_value) = &mut 
max_values[i] { + if let Some(v) = stats.max_value { + match max_value.update(&[ScalarValue::Boolean(Some(v))]) { Ok(_) => {} Err(_) => { max_values[i] = None; } } } - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[ScalarValue::Int64(Some(*s.min()))]) - { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } } - } - } - ParquetStatistics::Float(s) => { - if let DataType::Float32 = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match max_value - .update(&[ScalarValue::Float32(Some(*s.max()))]) - { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - if let Some(min_value) = &mut min_values[i] { - match min_value - .update(&[ScalarValue::Float32(Some(*s.min()))]) - { + if let Some(min_value) = &mut min_values[i] { + if let Some(v) = stats.min_value { + match min_value.update(&[ScalarValue::Boolean(Some(v))]) { Ok(_) => {} Err(_) => { min_values[i] = None; @@ -313,23 +285,47 @@ impl ParquetTableDescriptor { } } } - ParquetStatistics::Double(s) => { - if let DataType::Float64 = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match max_value - .update(&[ScalarValue::Float64(Some(*s.max()))]) - { + PhysicalType::Int32 => { + update_primitive_min_max!(Int32, i32); + } + PhysicalType::Int64 => { + update_primitive_min_max!(Int64, i64); + } + // 96 bit ints not supported + PhysicalType::Int96 => {} + PhysicalType::Float => { + update_primitive_min_max!(Float32, f32); + } + PhysicalType::Double => { + update_primitive_min_max!(Float64, f64); + } + PhysicalType::ByteArray => { + if let DataType::Utf8 = fields[i].data_type() { + let stats = stats + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Internal( + "Failed to cast stats to binary stats".to_owned(), + ) + })?; + if let Some(max_value) = &mut max_values[i] { + if let Some(v) = &stats.max_value { + match max_value.update(&[ScalarValue::Utf8( + std::str::from_utf8(&*v).map(|s| s.to_string()).ok(), + )]) { Ok(_) => {} Err(_) => { max_values[i] = None; } } } - if let Some(min_value) = &mut min_values[i] { - match min_value - .update(&[ScalarValue::Float64(Some(*s.min()))]) - { + } + if let Some(min_value) = &mut min_values[i] { + if let Some(v) = &stats.min_value { + match min_value.update(&[ScalarValue::Utf8( + std::str::from_utf8(&*v).map(|s| s.to_string()).ok(), + )]) { Ok(_) => {} Err(_) => { min_values[i] = None; @@ -339,21 +335,22 @@ impl ParquetTableDescriptor { } } } - _ => {} + PhysicalType::FixedLenByteArray(_) => { + // type not supported yet + } } + + Ok(()) } } impl TableDescriptorBuilder for ParquetTableDescriptor { fn file_meta(path: &str) -> Result { let file = File::open(path)?; - let file_reader = Arc::new(SerializedFileReader::new(file)?); - let mut arrow_reader = ParquetFileArrowReader::new(file_reader); - let path = path.to_string(); - let schema = arrow_reader.get_schema()?; + let meta_data = read_metadata(&mut std::io::BufReader::new(file))?; + let schema = get_schema(&meta_data)?; let num_fields = schema.fields().len(); let fields = schema.fields().to_vec(); - let meta_data = arrow_reader.get_metadata(); let mut num_rows = 0; let mut total_byte_size = 0; @@ -362,17 +359,17 @@ impl TableDescriptorBuilder for ParquetTableDescriptor { let (mut max_values, mut min_values) = create_max_min_accs(&schema); - for row_group_meta in meta_data.row_groups() { + for row_group_meta in meta_data.row_groups { num_rows += row_group_meta.num_rows(); total_byte_size += 
row_group_meta.total_byte_size(); let columns_null_counts = row_group_meta .columns() .iter() - .flat_map(|c| c.statistics().map(|stats| stats.null_count())); + .flat_map(|c| c.statistics().map(|stats| stats.unwrap().null_count())); for (i, cnt) in columns_null_counts.enumerate() { - null_counts[i] += cnt as usize + null_counts[i] += cnt.unwrap_or(0) as usize } for (i, column) in row_group_meta.columns().iter().enumerate() { @@ -383,8 +380,8 @@ impl TableDescriptorBuilder for ParquetTableDescriptor { &mut min_values, &fields, i, - stat, - ) + stat?, + )? } } } @@ -407,7 +404,10 @@ impl TableDescriptorBuilder for ParquetTableDescriptor { }; Ok(FileAndSchema { - file: PartitionedFile { path, statistics }, + file: PartitionedFile { + path: path.to_owned(), + statistics, + }, schema, }) } diff --git a/datafusion/src/error.rs b/datafusion/src/error.rs index a229198e9dae..b5676669df00 100644 --- a/datafusion/src/error.rs +++ b/datafusion/src/error.rs @@ -23,6 +23,7 @@ use std::io; use std::result; use arrow::error::ArrowError; +use parquet::error::ParquetError; use sqlparser::parser::ParserError; /// Result type for operations that could result in an [DataFusionError] @@ -34,6 +35,8 @@ pub type Result = result::Result; pub enum DataFusionError { /// Error returned by arrow. ArrowError(ArrowError), + /// Wraps an error from the Parquet crate + ParquetError(ParquetError), /// Error associated to I/O operations and associated traits. IoError(io::Error), /// Error returned when SQL is syntactically incorrect. @@ -74,6 +77,12 @@ impl From for DataFusionError { } } +impl From for DataFusionError { + fn from(e: ParquetError) -> Self { + DataFusionError::ParquetError(e) + } +} + impl From for DataFusionError { fn from(e: ParserError) -> Self { DataFusionError::SQL(e) @@ -84,6 +93,9 @@ impl Display for DataFusionError { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match *self { DataFusionError::ArrowError(ref desc) => write!(f, "Arrow error: {}", desc), + DataFusionError::ParquetError(ref desc) => { + write!(f, "Parquet error: {}", desc) + } DataFusionError::IoError(ref desc) => write!(f, "IO error: {}", desc), DataFusionError::SQL(ref desc) => { write!(f, "SQL error: {:?}", desc) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index ec14a15aa35f..ac797b448bbd 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -2504,43 +2504,43 @@ mod tests { let type_values = vec![ ( DataType::Int8, - Arc::new(Int8Array::from(vec![1])) as ArrayRef, + Arc::new(Int8Array::from_values(vec![1])) as ArrayRef, ), ( DataType::Int16, - Arc::new(Int16Array::from(vec![1])) as ArrayRef, + Arc::new(Int16Array::from_values(vec![1])) as ArrayRef, ), ( DataType::Int32, - Arc::new(Int32Array::from(vec![1])) as ArrayRef, + Arc::new(Int32Array::from_values(vec![1])) as ArrayRef, ), ( DataType::Int64, - Arc::new(Int64Array::from(vec![1])) as ArrayRef, + Arc::new(Int64Array::from_values(vec![1])) as ArrayRef, ), ( DataType::UInt8, - Arc::new(UInt8Array::from(vec![1])) as ArrayRef, + Arc::new(UInt8Array::from_values(vec![1])) as ArrayRef, ), ( DataType::UInt16, - Arc::new(UInt16Array::from(vec![1])) as ArrayRef, + Arc::new(UInt16Array::from_values(vec![1])) as ArrayRef, ), ( DataType::UInt32, - Arc::new(UInt32Array::from(vec![1])) as ArrayRef, + Arc::new(UInt32Array::from_values(vec![1])) as ArrayRef, ), ( DataType::UInt64, - Arc::new(UInt64Array::from(vec![1])) as ArrayRef, + Arc::new(UInt64Array::from_values(vec![1])) as ArrayRef, ), ( 
DataType::Float32, - Arc::new(Float32Array::from(vec![1.0_f32])) as ArrayRef, + Arc::new(Float32Array::from_values(vec![1.0_f32])) as ArrayRef, ), ( DataType::Float64, - Arc::new(Float64Array::from(vec![1.0_f64])) as ArrayRef, + Arc::new(Float64Array::from_values(vec![1.0_f64])) as ArrayRef, ), ]; diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs index 724a3f8493c5..c48b9e5a13de 100644 --- a/datafusion/src/execution/dataframe_impl.rs +++ b/datafusion/src/execution/dataframe_impl.rs @@ -19,7 +19,6 @@ use std::sync::{Arc, Mutex}; -use crate::arrow::record_batch::RecordBatch; use crate::error::Result; use crate::execution::context::{ExecutionContext, ExecutionContextState}; use crate::logical_plan::{ @@ -30,8 +29,9 @@ use crate::{ dataframe::*, physical_plan::{collect, collect_partitioned}, }; +use arrow::io::print; +use arrow::record_batch::RecordBatch; -use crate::arrow::util::pretty; use crate::physical_plan::{ execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream, }; @@ -160,13 +160,13 @@ impl DataFrame for DataFrameImpl { /// Print results. async fn show(&self) -> Result<()> { let results = self.collect().await?; - Ok(pretty::print_batches(&results)?) + Ok(print::print(&results)) } /// Print results and limit rows. async fn show_limit(&self, num: usize) -> Result<()> { let results = self.limit(num)?.collect().await?; - Ok(pretty::print_batches(&results)?) + Ok(print::print(&results)) } /// Convert the logical plan represented by this DataFrame into a physical plan and diff --git a/datafusion/src/lib.rs b/datafusion/src/lib.rs index 5841a2a144c8..529809729cf6 100644 --- a/datafusion/src/lib.rs +++ b/datafusion/src/lib.rs @@ -229,6 +229,8 @@ pub mod variable; // re-export dependencies from arrow-rs to minimise version maintenance for crate users pub use arrow; +mod arrow_temporal_util; + #[cfg(test)] pub mod test; pub mod test_util; diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs index 9815f34aa279..2d0d8d25b9a8 100644 --- a/datafusion/src/logical_plan/expr.rs +++ b/datafusion/src/logical_plan/expr.rs @@ -20,9 +20,6 @@ pub use super::Operator; -use std::fmt; -use std::sync::Arc; - use arrow::{compute::cast::can_cast_types, datatypes::DataType}; use crate::error::{DataFusionError, Result}; diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index b3b6d5369dab..cb81b8d852fb 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -31,19 +31,6 @@ use std::{ sync::Arc, }; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; - -use crate::datasource::TableProvider; -use crate::sql::parser::FileType; - -use super::expr::Expr; -use super::extension::UserDefinedLogicalNode; -use super::{ - display::{GraphvizVisitor, IndentVisitor}, - Column, -}; -use crate::logical_plan::dfschema::DFSchemaRef; - /// Join type #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum JoinType { diff --git a/datafusion/src/optimizer/constant_folding.rs b/datafusion/src/optimizer/constant_folding.rs index 5b4c28078c72..94404148c00d 100644 --- a/datafusion/src/optimizer/constant_folding.rs +++ b/datafusion/src/optimizer/constant_folding.rs @@ -21,9 +21,9 @@ use std::sync::Arc; use arrow::compute::cast; -use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos; use arrow::datatypes::DataType; +use crate::arrow_temporal_util::string_to_timestamp_nanos; use crate::error::Result; use 
crate::execution::context::ExecutionProps; use crate::logical_plan::{DFSchemaRef, Expr, ExprRewriter, LogicalPlan, Operator}; @@ -31,7 +31,6 @@ use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::utils; use crate::physical_plan::functions::BuiltinScalarFunction; use crate::scalar::ScalarValue; -use arrow::compute::{kernels, DEFAULT_CAST_OPTIONS}; /// Optimizer that simplifies comparison expressions involving boolean literals. /// @@ -228,7 +227,7 @@ impl<'a> ExprRewriter for ConstantRewriter<'a> { if !args.is_empty() { match &args[0] { Expr::Literal(ScalarValue::Utf8(Some(val))) => { - match cast::utf8_to_timestamp_ns_scalar(val) { + match string_to_timestamp_nanos(val) { Ok(timestamp) => Expr::Literal( ScalarValue::TimestampNanosecond(Some(timestamp)), ), diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs index 31d10d627261..fd8650411d71 100644 --- a/datafusion/src/physical_optimizer/repartition.rs +++ b/datafusion/src/physical_optimizer/repartition.rs @@ -133,6 +133,7 @@ mod tests { metrics, None, 2048, + None, )), )?; @@ -173,6 +174,7 @@ mod tests { metrics, None, 2048, + None, )), )?), )?; diff --git a/datafusion/src/physical_plan/analyze.rs b/datafusion/src/physical_plan/analyze.rs index d0125579ace2..541aa34f1207 100644 --- a/datafusion/src/physical_plan/analyze.rs +++ b/datafusion/src/physical_plan/analyze.rs @@ -25,10 +25,11 @@ use crate::{ physical_plan::{display::DisplayableExecutionPlan, Partitioning}, physical_plan::{DisplayFormatType, ExecutionPlan}, }; -use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch}; +use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; use futures::StreamExt; use super::{stream::RecordBatchReceiverStream, Distribution, SendableRecordBatchStream}; +use arrow::array::MutableUtf8Array; use async_trait::async_trait; /// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input, @@ -149,44 +150,39 @@ impl ExecutionPlan for AnalyzeExec { } let end = Instant::now(); - let mut type_builder = StringBuilder::new(1); - let mut plan_builder = StringBuilder::new(1); + let mut type_builder: MutableUtf8Array = MutableUtf8Array::new(); + let mut plan_builder: MutableUtf8Array = MutableUtf8Array::new(); // TODO use some sort of enum rather than strings? 
-        type_builder.append_value("Plan with Metrics").unwrap();
+        type_builder.push(Some("Plan with Metrics"));

         let annotated_plan =
             DisplayableExecutionPlan::with_metrics(captured_input.as_ref())
                 .indent()
                 .to_string();
-        plan_builder.append_value(annotated_plan).unwrap();
+        plan_builder.push(Some(annotated_plan));

         // Verbose output
         // TODO make this more sophisticated
         if verbose {
-            type_builder.append_value("Plan with Full Metrics").unwrap();
+            type_builder.push(Some("Plan with Full Metrics"));

             let annotated_plan =
                 DisplayableExecutionPlan::with_full_metrics(captured_input.as_ref())
                     .indent()
                     .to_string();
-            plan_builder.append_value(annotated_plan).unwrap();
+            plan_builder.push(Some(annotated_plan));

-            type_builder.append_value("Output Rows").unwrap();
-            plan_builder.append_value(total_rows.to_string()).unwrap();
+            type_builder.push(Some("Output Rows"));
+            plan_builder.push(Some(total_rows.to_string()));

-            type_builder.append_value("Duration").unwrap();
-            plan_builder
-                .append_value(format!("{:?}", end - start))
-                .unwrap();
+            type_builder.push(Some("Duration"));
+            plan_builder.push(Some(format!("{:?}", end - start)));
         }

         let maybe_batch = RecordBatch::try_new(
             captured_schema,
-            vec![
-                Arc::new(type_builder.finish()),
-                Arc::new(plan_builder.finish()),
-            ],
+            vec![type_builder.into_arc(), plan_builder.into_arc()],
         );
         // again ignore error
         tx.send(maybe_batch).await.ok();
diff --git a/datafusion/src/physical_plan/datetime_expressions.rs b/datafusion/src/physical_plan/datetime_expressions.rs
index f48dcded9979..638b91f5f8ae 100644
--- a/datafusion/src/physical_plan/datetime_expressions.rs
+++ b/datafusion/src/physical_plan/datetime_expressions.rs
@@ -19,6 +19,7 @@
 use std::sync::Arc;

 use super::ColumnarValue;
+use crate::arrow_temporal_util::string_to_timestamp_nanos;
 use crate::{
     error::{DataFusionError, Result},
     scalar::ScalarValue,
@@ -26,17 +27,14 @@ use crate::{
 use arrow::{
     array::*,
     compute::cast,
-    compute::kernels::cast_utils::string_to_timestamp_nanos,
-    datatypes::{
-        ArrowPrimitiveType, DataType, TimestampMicrosecondType, TimestampMillisecondType,
-        TimestampNanosecondType, TimestampSecondType,
-    },
+    datatypes::{DataType, TimeUnit},
     types::NativeType,
 };
 use arrow::{compute::temporal, temporal_conversions::timestamp_ns_to_datetime};
-use chrono::prelude::{DateTime, Local, NaiveDateTime, Utc};
+use chrono::prelude::{DateTime, Utc};
 use chrono::Datelike;
 use chrono::Duration;
+use chrono::Timelike;

 /// given a function `op` that maps a `&str` to a Result of an arrow native type,
 /// returns a `PrimitiveArray` after the application
@@ -135,7 +133,7 @@ where
     }
 }

-/// Calls string_to_timestamp_nanos and converts the error type
+/// Calls `arrow_temporal_util::string_to_timestamp_nanos` and converts the error type
 fn string_to_timestamp_nanos_shim(s: &str) -> Result<i64> {
     string_to_timestamp_nanos(s).map_err(|e| e.into())
 }
diff --git a/datafusion/src/physical_plan/distinct_expressions.rs b/datafusion/src/physical_plan/distinct_expressions.rs
index dc518465f89b..f09481a94400 100644
--- a/datafusion/src/physical_plan/distinct_expressions.rs
+++ b/datafusion/src/physical_plan/distinct_expressions.rs
@@ -18,7 +18,6 @@
 //! Implementations for DISTINCT expressions, e.g.
`COUNT(DISTINCT c)` use std::any::Any; -use std::convert::TryFrom; use std::fmt::Debug; use std::sync::Arc; @@ -159,8 +158,8 @@ impl Accumulator for DistinctCountAccumulator { (0..col_values[0].len()).try_for_each(|row_index| { let row_values = col_values .iter() - .map(|col| ScalarValue::try_from_array(col, row_index)) - .collect::>>()?; + .map(|col| col[row_index].clone()) + .collect::>(); self.update(&row_values) }) } @@ -213,52 +212,25 @@ mod tests { use arrow::datatypes::DataType; macro_rules! state_to_vec { - ($LIST:expr, $DATA_TYPE:ident, $ARRAY_TY:ty) => {{ + ($LIST:expr, $DATA_TYPE:ident, $PRIM_TY:ty) => {{ match $LIST { - ScalarValue::List(_, data_type) => assert_eq!( - ListArray::::get_child_type(data_type), - &DataType::$DATA_TYPE - ), - _ => panic!("Expected a ScalarValue::List"), - } - - match $LIST { - ScalarValue::List(None, _) => None, - ScalarValue::List(Some(values), _) => { - let vec = values - .as_any() - .downcast_ref::<$ARRAY_TY>() - .unwrap() - .iter() - .map(|x| x.map(|x| *x)) - .collect::>(); - - Some(vec) - } - _ => unreachable!(), - } - }}; - } - - macro_rules! state_to_vec_bool { - ($LIST:expr, $DATA_TYPE:ident, $ARRAY_TY:ty) => {{ - match $LIST { - ScalarValue::List(_, data_type) => assert_eq!( - ListArray::::get_child_type(data_type), - &DataType::$DATA_TYPE - ), + ScalarValue::List(_, data_type) => match data_type.as_ref() { + &DataType::$DATA_TYPE => (), + _ => panic!("Unexpected DataType for list"), + }, _ => panic!("Expected a ScalarValue::List"), } match $LIST { ScalarValue::List(None, _) => None, - ScalarValue::List(Some(values), _) => { - let vec = values - .as_any() - .downcast_ref::<$ARRAY_TY>() - .unwrap() + ScalarValue::List(Some(scalar_values), _) => { + let vec = scalar_values .iter() - .collect::>(); + .map(|scalar_value| match scalar_value { + ScalarValue::$DATA_TYPE(value) => *value, + _ => panic!("Unexpected ScalarValue variant"), + }) + .collect::>>(); Some(vec) } @@ -337,7 +309,7 @@ mod tests { macro_rules! 
test_count_distinct_update_batch_numeric { ($ARRAY_TYPE:ident, $DATA_TYPE:ident, $PRIM_TYPE:ty) => {{ - let values = &[ + let values: Vec> = vec![ Some(1), Some(1), None, @@ -354,7 +326,7 @@ mod tests { let (states, result) = run_update_batch(&arrays)?; let mut state_vec = - state_to_vec!(&states[0], $DATA_TYPE, $ARRAY_TYPE).unwrap(); + state_to_vec!(&states[0], $DATA_TYPE, $PRIM_TYPE).unwrap(); state_vec.sort(); assert_eq!(states.len(), 1); @@ -406,7 +378,7 @@ mod tests { let (states, result) = run_update_batch(&arrays)?; let mut state_vec = - state_to_vec!(&states[0], $DATA_TYPE, $ARRAY_TYPE).unwrap(); + state_to_vec!(&states[0], $DATA_TYPE, $PRIM_TYPE).unwrap(); state_vec.sort_by(|a, b| match (a, b) { (Some(lhs), Some(rhs)) => { OrderedFloat::from(*lhs).cmp(&OrderedFloat::from(*rhs)) @@ -490,8 +462,7 @@ mod tests { let get_count = |data: BooleanArray| -> Result<(Vec>, u64)> { let arrays = vec![Arc::new(data) as ArrayRef]; let (states, result) = run_update_batch(&arrays)?; - let mut state_vec = - state_to_vec_bool!(&states[0], Boolean, BooleanArray).unwrap(); + let mut state_vec = state_to_vec!(&states[0], Boolean, bool).unwrap(); state_vec.sort(); let count = match result { ScalarValue::UInt64(c) => c.ok_or_else(|| { @@ -551,7 +522,7 @@ mod tests { let (states, result) = run_update_batch(&arrays)?; assert_eq!(states.len(), 1); - assert_eq!(state_to_vec!(&states[0], Int32, Int32Array), Some(vec![])); + assert_eq!(state_to_vec!(&states[0], Int32, i32), Some(vec![])); assert_eq!(result, ScalarValue::UInt64(Some(0))); Ok(()) @@ -564,7 +535,7 @@ mod tests { let (states, result) = run_update_batch(&arrays)?; assert_eq!(states.len(), 1); - assert_eq!(state_to_vec!(&states[0], Int32, Int32Array), Some(vec![])); + assert_eq!(state_to_vec!(&states[0], Int32, i32), Some(vec![])); assert_eq!(result, ScalarValue::UInt64(Some(0))); Ok(()) @@ -578,8 +549,8 @@ mod tests { let (states, result) = run_update_batch(&arrays)?; - let state_vec1 = state_to_vec!(&states[0], Int8, Int8Array).unwrap(); - let state_vec2 = state_to_vec!(&states[1], Int16, Int16Array).unwrap(); + let state_vec1 = state_to_vec!(&states[0], Int8, i8).unwrap(); + let state_vec2 = state_to_vec!(&states[1], Int16, i16).unwrap(); let state_pairs = collect_states::(&state_vec1, &state_vec2); assert_eq!(states.len(), 2); @@ -608,8 +579,8 @@ mod tests { ], )?; - let state_vec1 = state_to_vec!(&states[0], Int32, Int32Array).unwrap(); - let state_vec2 = state_to_vec!(&states[1], UInt64, UInt64Array).unwrap(); + let state_vec1 = state_to_vec!(&states[0], Int32, i32).unwrap(); + let state_vec2 = state_to_vec!(&states[1], UInt64, u64).unwrap(); let state_pairs = collect_states::(&state_vec1, &state_vec2); assert_eq!(states.len(), 2); @@ -645,8 +616,8 @@ mod tests { ], )?; - let state_vec1 = state_to_vec!(&states[0], Int32, Int32Array).unwrap(); - let state_vec2 = state_to_vec!(&states[1], UInt64, UInt64Array).unwrap(); + let state_vec1 = state_to_vec!(&states[0], Int32, i32).unwrap(); + let state_vec2 = state_to_vec!(&states[1], UInt64, u64).unwrap(); let state_pairs = collect_states::(&state_vec1, &state_vec2); assert_eq!(states.len(), 2); @@ -681,8 +652,8 @@ mod tests { let (states, result) = run_merge_batch(&[Arc::new(state_in1), Arc::new(state_in2)])?; - let state_out_vec1 = state_to_vec!(&states[0], Int32, Int32Array).unwrap(); - let state_out_vec2 = state_to_vec!(&states[1], UInt64, UInt64Array).unwrap(); + let state_out_vec1 = state_to_vec!(&states[0], Int32, i32).unwrap(); + let state_out_vec2 = state_to_vec!(&states[1], UInt64, 
u64).unwrap();
         let state_pairs = collect_states::<i32, u64>(&state_out_vec1, &state_out_vec2);

         assert_eq!(
diff --git a/datafusion/src/physical_plan/explain.rs b/datafusion/src/physical_plan/explain.rs
index 4fa926eb68a1..8f833c166689 100644
--- a/datafusion/src/physical_plan/explain.rs
+++ b/datafusion/src/physical_plan/explain.rs
@@ -121,13 +121,13 @@ impl ExecutionPlan for ExplainExec {
         let mut prev: Option<&StringifiedPlan> = None;

         for p in plans_to_print {
-            type_builder.append_value(p.plan_type.to_string())?;
+            type_builder.push(Some(p.plan_type.to_string()));
             match prev {
                 Some(prev) if !should_show(prev, p) => {
-                    plan_builder.append_value("SAME TEXT AS ABOVE")?;
+                    plan_builder.push(Some("SAME TEXT AS ABOVE"));
                 }
                 Some(_) | None => {
-                    plan_builder.append_value(&*p.plan)?;
+                    plan_builder.push(Some(p.plan.to_string()));
                 }
             }
             prev = Some(p);
diff --git a/datafusion/src/physical_plan/expressions/binary.rs b/datafusion/src/physical_plan/expressions/binary.rs
index 3192c5dbfbb7..3185d036a837 100644
--- a/datafusion/src/physical_plan/expressions/binary.rs
+++ b/datafusion/src/physical_plan/expressions/binary.rs
@@ -776,25 +776,6 @@ mod tests {
         Ok(())
     }

-    #[test]
-    fn modulus_op() -> Result<()> {
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("a", DataType::Int32, false),
-            Field::new("b", DataType::Int32, false),
-        ]));
-        let a = Arc::new(Int32Array::from(vec![8, 32, 128, 512, 2048]));
-        let b = Arc::new(Int32Array::from(vec![2, 4, 7, 14, 32]));
-
-        apply_arithmetic::<Int32Type>(
-            schema,
-            vec![a, b],
-            Operator::Modulo,
-            Int32Array::from(vec![0, 0, 2, 8, 0]),
-        )?;
-
-        Ok(())
-    }
-
     fn apply_arithmetic<T: NativeType>(
         schema: Arc<Schema>,
         data: Vec<ArrayRef>,
@@ -837,7 +818,7 @@ mod tests {
         apply_arithmetic::<i32>(
             schema,
             vec![a, b],
-            Operator::Modulus,
+            Operator::Modulo,
             Int32Array::from_slice(&[0, 0, 2, 8, 0]),
         )?;
diff --git a/datafusion/src/physical_plan/expressions/in_list.rs b/datafusion/src/physical_plan/expressions/in_list.rs
index 0585c78d7a0a..cc037debdc97 100644
--- a/datafusion/src/physical_plan/expressions/in_list.rs
+++ b/datafusion/src/physical_plan/expressions/in_list.rs
@@ -20,39 +20,43 @@
 use std::any::Any;
 use std::sync::Arc;

-use arrow::array::Utf8Array;
-use arrow::array::*;
-use arrow::datatypes::ArrowPrimitiveType;
 use arrow::{
+    array::*,
+    bitmap::Bitmap,
     datatypes::{DataType, Schema},
     record_batch::RecordBatch,
+    types::NativeType,
 };

 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::{ColumnarValue, PhysicalExpr};
 use crate::scalar::ScalarValue;
-use arrow::array::*;
-use arrow::buffer::{Buffer, MutableBuffer};

 macro_rules! compare_op_scalar {
     ($left: expr, $right:expr, $op:expr) => {{
-        let null_bit_buffer = $left.data().null_buffer().cloned();
-
-        let comparison =
-            (0..$left.len()).map(|i| unsafe { $op($left.value_unchecked(i), $right) });
-        // same as $left.len()
-        let buffer = unsafe { MutableBuffer::from_trusted_len_iter_bool(comparison) };
+        let validity = $left.validity();
+        let values =
+            Bitmap::from_trusted_len_iter($left.values_iter().map(|x| $op(x, $right)));
+        Ok(BooleanArray::from_data(
+            DataType::Boolean,
+            values,
+            validity.clone(),
+        ))
+    }};
+}

-        let data = ArrayData::new(
+// TODO: primitive array currently doesn't have `values_iter()`; it may
+// be worth adding one there, and this specialized case could be removed.
+macro_rules!
compare_primitive_op_scalar { + ($left: expr, $right:expr, $op:expr) => {{ + let validity = $left.validity(); + let values = + Bitmap::from_trusted_len_iter($left.values().iter().map(|x| $op(x, $right))); + Ok(BooleanArray::from_data( DataType::Boolean, - $left.len(), - None, - null_bit_buffer, - 0, - vec![Buffer::from(buffer)], - vec![], - ); - Ok(BooleanArray::from(data)) + values, + validity.clone(), + )) }}; } @@ -175,39 +179,31 @@ macro_rules! make_contains_primitive { } // whether each value on the left (can be null) is contained in the non-null list -fn in_list_primitive( +fn in_list_primitive( array: &PrimitiveArray, - values: &[::Native], + values: &[T], ) -> Result { - compare_op_scalar!( - array, - values, - |x, v: &[::Native]| v.contains(&x) - ) + compare_primitive_op_scalar!(array, values, |x, v: &[T]| v.contains(x)) } // whether each value on the left (can be null) is contained in the non-null list -fn not_in_list_primitive( +fn not_in_list_primitive( array: &PrimitiveArray, - values: &[::Native], + values: &[T], ) -> Result { - compare_op_scalar!( - array, - values, - |x, v: &[::Native]| !v.contains(&x) - ) + compare_primitive_op_scalar!(array, values, |x, v: &[T]| !v.contains(x)) } // whether each value on the left (can be null) is contained in the non-null list -fn in_list_utf8( - array: &GenericStringArray, +fn in_list_utf8( + array: &Utf8Array, values: &[&str], ) -> Result { compare_op_scalar!(array, values, |x, v: &[&str]| v.contains(&x)) } -fn not_in_list_utf8( - array: &GenericStringArray, +fn not_in_list_utf8( + array: &Utf8Array, values: &[&str], ) -> Result { compare_op_scalar!(array, values, |x, v: &[&str]| !v.contains(&x)) diff --git a/datafusion/src/physical_plan/expressions/lead_lag.rs b/datafusion/src/physical_plan/expressions/lead_lag.rs index d1f6c197a186..76ba5692f693 100644 --- a/datafusion/src/physical_plan/expressions/lead_lag.rs +++ b/datafusion/src/physical_plan/expressions/lead_lag.rs @@ -23,10 +23,11 @@ use crate::physical_plan::window_functions::PartitionEvaluator; use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr}; use crate::scalar::ScalarValue; use arrow::array::ArrayRef; -use arrow::compute::cast; +use arrow::compute::cast::cast; use arrow::datatypes::{DataType, Field}; use arrow::record_batch::RecordBatch; use std::any::Any; +use std::borrow::Borrow; use std::ops::Neg; use std::ops::Range; use std::sync::Arc; @@ -127,9 +128,11 @@ fn create_empty_array( let array = value .as_ref() .map(|scalar| scalar.to_array_of_size(size)) - .unwrap_or_else(|| new_null_array(data_type, size)); + .unwrap_or_else(|| ArrayRef::from(new_null_array(data_type.clone(), size))); if array.data_type() != data_type { - cast(&array, data_type).map_err(DataFusionError::ArrowError) + cast(array.borrow(), data_type) + .map_err(DataFusionError::ArrowError) + .map(ArrayRef::from) } else { Ok(array) } @@ -145,7 +148,7 @@ fn shift_with_default_value( let value_len = array.len() as i64; if offset == 0 { - Ok(arrow::array::make_array(array.data_ref().clone())) + Ok(array.clone()) } else if offset == i64::MIN || offset.abs() >= value_len { create_empty_array(value, array.data_type(), array.len()) } else { @@ -158,11 +161,13 @@ fn shift_with_default_value( let default_values = create_empty_array(value, slice.data_type(), nulls)?; // Concatenate both arrays, add nulls after if shift > 0 else before if offset > 0 { - concat(&[default_values.as_ref(), slice.as_ref()]) + concat::concatenate(&[default_values.as_ref(), slice.as_ref()]) 
.map_err(DataFusionError::ArrowError) + .map(ArrayRef::from) } else { - concat(&[slice.as_ref(), default_values.as_ref()]) + concat::concatenate(&[slice.as_ref(), default_values.as_ref()]) .map_err(DataFusionError::ArrowError) + .map(ArrayRef::from) } } } @@ -171,7 +176,11 @@ impl PartitionEvaluator for WindowShiftEvaluator { fn evaluate_partition(&self, partition: Range) -> Result { let value = &self.values[0]; let value = value.slice(partition.start, partition.end - partition.start); - shift_with_default_value(&value, self.shift_offset, &self.default_value) + shift_with_default_value( + ArrayRef::from(value).borrow(), + self.shift_offset, + &self.default_value, + ) } } @@ -184,7 +193,8 @@ mod tests { use arrow::{array::*, datatypes::*}; fn test_i32_result(expr: WindowShift, expected: Int32Array) -> Result<()> { - let arr: ArrayRef = Arc::new(Int32Array::from(vec![1, -2, 3, -4, 5, -6, 7, 8])); + let arr: ArrayRef = + Arc::new(Int32Array::from_slice(&[1, -2, 3, -4, 5, -6, 7, 8])); let values = vec![arr]; let schema = Schema::new(vec![Field::new("arr", DataType::Int32, false)]); let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?; diff --git a/datafusion/src/physical_plan/expressions/min_max.rs b/datafusion/src/physical_plan/expressions/min_max.rs index 81a9985a038a..c37dc09614af 100644 --- a/datafusion/src/physical_plan/expressions/min_max.rs +++ b/datafusion/src/physical_plan/expressions/min_max.rs @@ -687,7 +687,8 @@ mod tests { #[test] fn min_date32() -> Result<()> { - let a: ArrayRef = Arc::new(Date32Array::from(vec![1, 2, 3, 4, 5])); + let a: ArrayRef = + Arc::new(Int32Array::from_slice(&[1, 2, 3, 4, 5]).to(DataType::Date32)); generic_test_op!( a, DataType::Date32, @@ -699,7 +700,8 @@ mod tests { #[test] fn min_date64() -> Result<()> { - let a: ArrayRef = Arc::new(Date64Array::from(vec![1, 2, 3, 4, 5])); + let a: ArrayRef = + Arc::new(Int64Array::from_slice(&[1, 2, 3, 4, 5]).to(DataType::Date64)); generic_test_op!( a, DataType::Date64, @@ -711,7 +713,8 @@ mod tests { #[test] fn max_date32() -> Result<()> { - let a: ArrayRef = Arc::new(Date32Array::from(vec![1, 2, 3, 4, 5])); + let a: ArrayRef = + Arc::new(Int32Array::from_slice(&[1, 2, 3, 4, 5]).to(DataType::Date32)); generic_test_op!( a, DataType::Date32, @@ -723,7 +726,8 @@ mod tests { #[test] fn max_date64() -> Result<()> { - let a: ArrayRef = Arc::new(Date64Array::from(vec![1, 2, 3, 4, 5])); + let a: ArrayRef = + Arc::new(Int64Array::from_slice(&[1, 2, 3, 4, 5]).to(DataType::Date64)); generic_test_op!( a, DataType::Date64, diff --git a/datafusion/src/physical_plan/expressions/nth_value.rs b/datafusion/src/physical_plan/expressions/nth_value.rs index 0139f39b1cef..b363f9c1606c 100644 --- a/datafusion/src/physical_plan/expressions/nth_value.rs +++ b/datafusion/src/physical_plan/expressions/nth_value.rs @@ -23,7 +23,7 @@ use crate::physical_plan::window_functions::PartitionEvaluator; use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr}; use crate::scalar::ScalarValue; use arrow::array::{new_null_array, ArrayRef}; -use arrow::compute::kernels::window::shift; +use arrow::compute::window::shift; use arrow::datatypes::{DataType, Field}; use arrow::record_batch::RecordBatch; use std::any::Any; @@ -174,12 +174,15 @@ impl PartitionEvaluator for NthValueEvaluator { .collect::>>()? 
.into_iter() .flatten(); - ScalarValue::iter_to_array(values) + ScalarValue::iter_to_array(values).map(ArrayRef::from) } NthValueKind::Nth(n) => { let index = (n as usize) - 1; if index >= num_rows { - Ok(new_null_array(arr.data_type(), num_rows)) + Ok(ArrayRef::from(new_null_array( + arr.data_type().clone(), + num_rows, + ))) } else { let value = ScalarValue::try_from_array(arr, partition.start + index)?; @@ -187,7 +190,9 @@ impl PartitionEvaluator for NthValueEvaluator { // because the default window frame is between unbounded preceding and current // row, hence the shift because for values with indices < index they should be // null. This changes when window frames other than default is implemented - shift(arr.as_ref(), index as i64).map_err(DataFusionError::ArrowError) + shift(arr.as_ref(), index as i64) + .map_err(DataFusionError::ArrowError) + .map(ArrayRef::from) } } } @@ -202,7 +207,7 @@ mod tests { use arrow::record_batch::RecordBatch; use arrow::{array::*, datatypes::*}; - fn test_i32_result(expr: NthValue, expected: Vec) -> Result<()> { + fn test_i32_result(expr: NthValue, expected: Int32Array) -> Result<()> { let arr: ArrayRef = Arc::new(Int32Array::from_slice(&[1, -2, 3, -4, 5, -6, 7, 8])); let values = vec![arr]; @@ -212,7 +217,7 @@ mod tests { .create_evaluator(&batch)? .evaluate_with_rank(vec![0..8], vec![0..8])?; assert_eq!(1, result.len()); - let result = result.as_any().downcast_ref::().unwrap(); + let result = result[0].as_any().downcast_ref::().unwrap(); assert_eq!(expected, *result); Ok(()) } @@ -224,7 +229,7 @@ mod tests { Arc::new(Column::new("arr", 0)), DataType::Int32, ); - test_i32_result(first_value, Int32Array::from_iter_values(vec![1; 8]))?; + test_i32_result(first_value, Int32Array::from_values(vec![1; 8]))?; Ok(()) } @@ -235,7 +240,7 @@ mod tests { Arc::new(Column::new("arr", 0)), DataType::Int32, ); - test_i32_result(last_value, Int32Array::from_iter_values(vec![8; 8]))?; + test_i32_result(last_value, Int32Array::from_values(vec![8; 8]))?; Ok(()) } @@ -247,7 +252,7 @@ mod tests { DataType::Int32, 1, )?; - test_i32_result(nth_value, Int32Array::from_iter_values(vec![1; 8]))?; + test_i32_result(nth_value, Int32Array::from_values(vec![1; 8]))?; Ok(()) } @@ -261,7 +266,7 @@ mod tests { )?; test_i32_result( nth_value, - Int32Array::from(vec![ + Int32Array::from(&[ None, Some(-2), Some(-2), diff --git a/datafusion/src/physical_plan/expressions/rank.rs b/datafusion/src/physical_plan/expressions/rank.rs index b88dec378c06..e9f10622f2fd 100644 --- a/datafusion/src/physical_plan/expressions/rank.rs +++ b/datafusion/src/physical_plan/expressions/rank.rs @@ -93,14 +93,14 @@ impl PartitionEvaluator for RankEvaluator { ranks_in_partition: &[Range], ) -> Result { let result = if self.dense { - UInt64Array::from_iter_values(ranks_in_partition.iter().zip(1u64..).flat_map( + UInt64Array::from_values(ranks_in_partition.iter().zip(1u64..).flat_map( |(range, rank)| { let len = range.end - range.start; iter::repeat(rank).take(len) }, )) } else { - UInt64Array::from_iter_values( + UInt64Array::from_values( ranks_in_partition .iter() .scan(1_u64, |acc, range| { @@ -140,7 +140,7 @@ mod tests { ranks: Vec>, expected: Vec, ) -> Result<()> { - let arr: ArrayRef = Arc::new(Int32Array::from(data)); + let arr: ArrayRef = Arc::new(Int32Array::from_values(data)); let values = vec![arr]; let schema = Schema::new(vec![Field::new("arr", DataType::Int32, false)]); let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?; @@ -149,8 +149,8 @@ mod tests { 
.evaluate_with_rank(vec![0..8], ranks)?; assert_eq!(1, result.len()); let result = result[0].as_any().downcast_ref::().unwrap(); - let result = result.values(); - assert_eq!(expected, result); + let expected = UInt64Array::from_values(expected); + assert_eq!(expected, *result); Ok(()) } diff --git a/datafusion/src/physical_plan/expressions/row_number.rs b/datafusion/src/physical_plan/expressions/row_number.rs index 1ce478fadba8..abcb2df3b913 100644 --- a/datafusion/src/physical_plan/expressions/row_number.rs +++ b/datafusion/src/physical_plan/expressions/row_number.rs @@ -21,7 +21,6 @@ use crate::error::Result; use crate::physical_plan::window_functions::PartitionEvaluator; use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr}; use arrow::array::{ArrayRef, UInt64Array}; -use arrow::buffer::Buffer; use arrow::datatypes::{DataType, Field}; use arrow::record_batch::RecordBatch; use std::any::Any; @@ -75,9 +74,7 @@ pub(crate) struct NumRowsEvaluator {} impl PartitionEvaluator for NumRowsEvaluator { fn evaluate_partition(&self, partition: Range) -> Result { let num_rows = partition.end - partition.start; - Ok(Arc::new(UInt64Array::from_iter_values( - 1..(num_rows as u64) + 1, - ))) + Ok(Arc::new(UInt64Array::from_values(1..(num_rows as u64) + 1))) } } @@ -99,7 +96,7 @@ mod tests { let result = row_number.create_evaluator(&batch)?.evaluate(vec![0..8])?; assert_eq!(1, result.len()); let result = result[0].as_any().downcast_ref::().unwrap(); - let result = result.values(); + let result = result.values().as_slice(); assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], result); Ok(()) } @@ -112,8 +109,9 @@ mod tests { let schema = Schema::new(vec![Field::new("arr", DataType::Boolean, false)]); let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?; let row_number = RowNumber::new("row_number".to_owned()); - let result = row_number.evaluate(batch.num_rows(), &[])?; - let result = result.as_any().downcast_ref::().unwrap(); + let result = row_number.create_evaluator(&batch)?.evaluate(vec![0..8])?; + assert_eq!(1, result.len()); + let result = result[0].as_any().downcast_ref::().unwrap(); let result = result.values().as_slice(); assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], result); Ok(()) diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs index 09e6191daadd..db65b1cf6cbf 100644 --- a/datafusion/src/physical_plan/hash_aggregate.rs +++ b/datafusion/src/physical_plan/hash_aggregate.rs @@ -21,12 +21,12 @@ use std::any::Any; use std::sync::Arc; use std::task::{Context, Poll}; +use ahash::RandomState; use futures::{ stream::{Stream, StreamExt}, Future, }; -use crate::error::{DataFusionError, Result}; use crate::physical_plan::hash_utils::create_hashes; use crate::physical_plan::{ Accumulator, AggregateExpr, DisplayFormatType, Distribution, ExecutionPlan, @@ -37,11 +37,12 @@ use crate::{ scalar::ScalarValue, }; -use arrow::error::{ArrowError, Result as ArrowResult}; -use arrow::{array::*, compute}; -use arrow::{buffer::MutableBuffer, datatypes::*}; use arrow::{ + array::*, + buffer::MutableBuffer, + compute, datatypes::{DataType, Field, Schema, SchemaRef}, + error::{ArrowError, Result as ArrowResult}, record_batch::RecordBatch, }; use hashbrown::raw::RawTable; @@ -320,36 +321,6 @@ pin_project! 
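Note: the row_number test above reads primitive values back through `values().as_slice()` because arrow2's `values()` returns a `Buffer<T>`, and the hash_aggregate hunks below accumulate take indices in a `MutableBuffer<u32>` frozen via `from_data`. A combined sketch of both idioms, assuming upstream arrow2 paths:

```rust
use arrow2::array::{Array, Int64Array, UInt32Array};
use arrow2::buffer::MutableBuffer;
use arrow2::compute::take::take;
use arrow2::datatypes::DataType;
use arrow2::error::Result;

fn main() -> Result<()> {
    let values = Int64Array::from_slice(&[10, 20, 30, 40]);

    // Accumulate row indices, then freeze the buffer into a UInt32Array,
    // mirroring how group_aggregate_batch builds its take indices below.
    let mut indices = MutableBuffer::<u32>::new();
    indices.extend_from_slice(&[3, 0, 2]);
    let indices = UInt32Array::from_data(DataType::UInt32, indices.into(), None);

    let taken = take(&values, &indices)?;
    assert_eq!(taken.len(), 3);

    // values() is a Buffer<T>; as_slice() exposes the raw &[T].
    assert_eq!(indices.values().as_slice(), &[3, 0, 2]);
    Ok(())
}
```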
{ } } -fn hash_(group_values: &[ArrayRef]) -> Result> { - // compute the hashes - // todo: we should be able to use `MutableBuffer` to compute the hash and ^ them without - // allocating all the hashes before ^ them - let hashes = group_values - .iter() - .map(|x| { - let a = match x.data_type() { - DataType::Dictionary(_, d) => { - // todo: think about how to perform this more efficiently - // * first hash, then unpack - // * do not unpack at all, and instead figure out a way to leverage dictionary-encoded. - let unpacked = arrow::compute::cast::cast(x.as_ref(), d)?; - arrow::compute::hash::hash(unpacked.as_ref()) - } - _ => arrow::compute::hash::hash(x.as_ref()), - }; - Ok(a?) - }) - .collect::>>()?; - let hash = MutableBuffer::::from(hashes[0].values().as_slice()); - - Ok(hashes.iter().skip(1).fold(hash, |mut acc, x| { - acc.iter_mut() - .zip(x.values().iter()) - .for_each(|(hash, other)| *hash = combine_hashes(*hash, *other)); - acc - })) -} - fn group_aggregate_batch( mode: &AggregateMode, random_state: &RandomState, @@ -438,17 +409,17 @@ fn group_aggregate_batch( } // Collect all indices + offsets based on keys in this vec - let mut batch_indices = MutableBuffer::::new(); + let mut batch_indices = MutableBuffer::::new(); let mut offsets = vec![0]; let mut offset_so_far = 0; for group_idx in groups_with_rows.iter() { let indices = &accumulators.group_states[*group_idx].indices; - batch_indices.append_slice(indices)?; + batch_indices.extend_from_slice(indices); offset_so_far += indices.len(); offsets.push(offset_so_far); } let batch_indices = - Int32Array::from_data(DataType::Int32, batch_indices.into(), None); + UInt32Array::from_data(DataType::UInt32, batch_indices.into(), None); // `Take` all values based on indices into Arrays let values: Vec>> = aggr_input_values @@ -974,7 +945,10 @@ fn create_batch_from_map( let columns = columns .iter() .zip(output_schema.fields().iter()) - .map(|(col, desired_field)| cast(col, desired_field.data_type())) + .map(|(col, desired_field)| { + arrow::compute::cast::cast(col.as_ref(), desired_field.data_type()) + .map(|v| Arc::from(v)) + }) .collect::>>()?; RecordBatch::try_new(Arc::new(output_schema.to_owned()), columns) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 52f666e56e73..8221e676f074 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -757,8 +757,8 @@ fn build_join_indexes( // If no rows matched left, still must keep the right // with all nulls for left if no_match { - left_indices.push(None)?; - right_indices.push(Some(row as u32))?; + left_indices.push(None); + right_indices.push(Some(row as u32)); } } None => { diff --git a/datafusion/src/physical_plan/hash_utils.rs b/datafusion/src/physical_plan/hash_utils.rs index 6a622df4f68d..bc7f4f611601 100644 --- a/datafusion/src/physical_plan/hash_utils.rs +++ b/datafusion/src/physical_plan/hash_utils.rs @@ -18,17 +18,13 @@ //! 
Functionality used both on logical and physical plans use crate::error::{DataFusionError, Result}; -use ahash::{CallHasher, RandomState}; +pub use ahash::{CallHasher, RandomState}; use arrow::array::{ - Array, ArrayRef, BooleanArray, Date32Array, Date64Array, DictionaryArray, - Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, - LargeStringArray, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray, - TimestampNanosecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, -}; -use arrow::datatypes::{ - ArrowDictionaryKeyType, ArrowNativeType, DataType, Field, Int16Type, Int32Type, - Int64Type, Int8Type, Schema, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type, + Array, ArrayRef, BooleanArray, DictionaryArray, DictionaryKey, Float32Array, + Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array, + UInt32Array, UInt64Array, UInt8Array, Utf8Array, }; +use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use std::collections::HashSet; use std::sync::Arc; @@ -120,7 +116,7 @@ fn combine_hashes(l: u64, r: u64) -> u64 { } macro_rules! hash_array { - ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => { + ($array_type:ty, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => { let array = $column.as_any().downcast_ref::<$array_type>().unwrap(); if array.null_count() == 0 { if $multi_col { @@ -250,7 +246,7 @@ macro_rules! hash_array_float { } /// Hash the values in a dictionary array -fn create_hashes_dictionary( +fn create_hashes_dictionary( array: &ArrayRef, random_state: &RandomState, hashes_buffer: &mut Vec, @@ -432,7 +428,7 @@ pub fn create_hashes<'a>( } DataType::Timestamp(TimeUnit::Millisecond, None) => { hash_array_primitive!( - TimestampMillisecondArray, + Int64Array, col, i64, hashes_buffer, @@ -442,7 +438,7 @@ pub fn create_hashes<'a>( } DataType::Timestamp(TimeUnit::Microsecond, None) => { hash_array_primitive!( - TimestampMicrosecondArray, + Int64Array, col, i64, hashes_buffer, @@ -452,7 +448,7 @@ pub fn create_hashes<'a>( } DataType::Timestamp(TimeUnit::Nanosecond, None) => { hash_array_primitive!( - TimestampNanosecondArray, + Int64Array, col, i64, hashes_buffer, @@ -462,7 +458,7 @@ pub fn create_hashes<'a>( } DataType::Date32 => { hash_array_primitive!( - Date32Array, + Int32Array, col, i32, hashes_buffer, @@ -472,7 +468,7 @@ pub fn create_hashes<'a>( } DataType::Date64 => { hash_array_primitive!( - Date64Array, + Int64Array, col, i64, hashes_buffer, @@ -492,7 +488,7 @@ pub fn create_hashes<'a>( } DataType::Utf8 => { hash_array!( - StringArray, + Utf8Array::, col, str, hashes_buffer, @@ -502,7 +498,7 @@ pub fn create_hashes<'a>( } DataType::LargeUtf8 => { hash_array!( - LargeStringArray, + Utf8Array::, col, str, hashes_buffer, @@ -512,7 +508,7 @@ pub fn create_hashes<'a>( } DataType::Dictionary(index_type, _) => match **index_type { DataType::Int8 => { - create_hashes_dictionary::( + create_hashes_dictionary::( col, random_state, hashes_buffer, @@ -520,7 +516,7 @@ pub fn create_hashes<'a>( )?; } DataType::Int16 => { - create_hashes_dictionary::( + create_hashes_dictionary::( col, random_state, hashes_buffer, @@ -528,7 +524,7 @@ pub fn create_hashes<'a>( )?; } DataType::Int32 => { - create_hashes_dictionary::( + create_hashes_dictionary::( col, random_state, hashes_buffer, @@ -536,7 +532,7 @@ pub fn create_hashes<'a>( )?; } DataType::Int64 => { - create_hashes_dictionary::( + create_hashes_dictionary::( col, random_state, 
hashes_buffer, @@ -544,7 +540,7 @@ pub fn create_hashes<'a>( )?; } DataType::UInt8 => { - create_hashes_dictionary::( + create_hashes_dictionary::( col, random_state, hashes_buffer, @@ -552,7 +548,7 @@ pub fn create_hashes<'a>( )?; } DataType::UInt16 => { - create_hashes_dictionary::( + create_hashes_dictionary::( col, random_state, hashes_buffer, @@ -560,7 +556,7 @@ pub fn create_hashes<'a>( )?; } DataType::UInt32 => { - create_hashes_dictionary::( + create_hashes_dictionary::( col, random_state, hashes_buffer, @@ -568,7 +564,7 @@ pub fn create_hashes<'a>( )?; } DataType::UInt64 => { - create_hashes_dictionary::( + create_hashes_dictionary::( col, random_state, hashes_buffer, @@ -598,7 +594,8 @@ pub fn create_hashes<'a>( mod tests { use std::sync::Arc; - use arrow::{array::DictionaryArray, datatypes::Int8Type}; + use arrow::array::TryExtend; + use arrow::array::{DictionaryArray, MutableDictionaryArray, MutableUtf8Array}; use super::*; @@ -663,8 +660,8 @@ mod tests { #[test] fn create_hashes_for_float_arrays() -> Result<()> { - let f32_arr = Arc::new(Float32Array::from(vec![0.12, 0.5, 1f32, 444.7])); - let f64_arr = Arc::new(Float64Array::from(vec![0.12, 0.5, 1f64, 444.7])); + let f32_arr = Arc::new(Float32Array::from_slice(&[0.12, 0.5, 1f32, 444.7])); + let f64_arr = Arc::new(Float64Array::from_slice(&[0.12, 0.5, 1f64, 444.7])); let random_state = RandomState::with_seeds(0, 0, 0, 0); let hashes_buff = &mut vec![0; f32_arr.len()]; @@ -683,13 +680,10 @@ mod tests { fn create_hashes_for_dict_arrays() { let strings = vec![Some("foo"), None, Some("bar"), Some("foo"), None]; - let string_array = Arc::new(strings.iter().cloned().collect::()); - let dict_array = Arc::new( - strings - .iter() - .cloned() - .collect::>(), - ); + let string_array = Arc::new(strings.iter().cloned().collect::>()); + let mut dict_array = MutableDictionaryArray::>::new(); + dict_array.try_extend(strings.iter().cloned()).unwrap(); + let dict_array = dict_array.into_arc(); let random_state = RandomState::with_seeds(0, 0, 0, 0); @@ -728,13 +722,10 @@ mod tests { let strings1 = vec![Some("foo"), None, Some("bar")]; let strings2 = vec![Some("blarg"), Some("blah"), None]; - let string_array = Arc::new(strings1.iter().cloned().collect::()); - let dict_array = Arc::new( - strings2 - .iter() - .cloned() - .collect::>(), - ); + let string_array = Arc::new(strings1.iter().cloned().collect::>()); + let mut dict_array = MutableDictionaryArray::>::new(); + dict_array.try_extend(strings2.iter().cloned()).unwrap(); + let dict_array = dict_array.into_arc(); let random_state = RandomState::with_seeds(0, 0, 0, 0); diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 13726e702752..e571e97beb4f 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -17,18 +17,16 @@ //! Traits for physical query plan, supporting parallel execution for partitioned relations. 
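Note: the dictionary tests above replace arrow-rs's `collect::<DictionaryArray<Int8Type>>()` with arrow2's mutable builder, and dictionaries are now keyed directly on native integer types through the `DictionaryKey` trait rather than marker types like `Int8Type`. A minimal sketch, assuming upstream arrow2 paths:

```rust
use arrow2::array::{
    Array, DictionaryArray, MutableDictionaryArray, MutableUtf8Array, TryExtend,
};
use arrow2::error::Result;

fn main() -> Result<()> {
    // i32 keys and utf8 values; try_extend deduplicates repeated strings.
    let mut builder = MutableDictionaryArray::<i32, MutableUtf8Array<i32>>::new();
    builder.try_extend(vec![Some("foo"), None, Some("bar"), Some("foo")])?;
    let dict: DictionaryArray<i32> = builder.into();
    assert_eq!(dict.len(), 4);
    Ok(())
}
```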
-use self::display::DisplayableExecutionPlan; -use self::expressions::{PhysicalSortExpr, SortColumn}; pub use self::metrics::Metric; use self::metrics::MetricsSet; use self::{ coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan, }; -use crate::error::DataFusionError; -use crate::execution::context::ExecutionContextState; -use crate::logical_plan::LogicalPlan; -use crate::physical_plan::merge::MergeExec; -use crate::{error::Result, scalar::ScalarValue}; +use crate::physical_plan::expressions::{PhysicalSortExpr, SortColumn}; +use crate::{ + error::{DataFusionError, Result}, + scalar::ScalarValue, +}; use arrow::array::ArrayRef; use arrow::compute::merge_sort::SortOptions; use arrow::compute::partition::lexicographical_partition_ranges; @@ -41,7 +39,6 @@ use futures::stream::Stream; use std::fmt; use std::fmt::{Debug, Display}; use std::ops::Range; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; use std::{any::Any, pin::Pin}; diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index a6b3357740ee..aa2221be9f6e 100644 --- a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -17,15 +17,12 @@ //! Execution plan for reading Parquet files -use std::any::Any; +use fmt::Debug; use std::fmt; use std::fs::File; use std::sync::Arc; -use std::task::{Context, Poll}; use std::{any::Any, convert::TryInto}; -use super::{RecordBatchStream, SendableRecordBatchStream}; -use crate::physical_plan::{common, DisplayFormatType, ExecutionPlan, Partitioning}; use crate::{ error::{DataFusionError, Result}, logical_plan::{Column, Expr}, @@ -37,17 +34,20 @@ use crate::{ }; use arrow::{ - datatypes::*, error::Result as ArrowResult, io::parquet::read, + array::ArrayRef, + datatypes::*, + error::Result as ArrowResult, + io::parquet::read::{self, RowGroupMetaData}, record_batch::RecordBatch, }; use log::debug; -use parquet::file::{ - metadata::RowGroupMetaData, - reader::{FileReader, SerializedFileReader}, - statistics::Statistics as ParquetStatistics, + +use parquet::statistics::{ + BinaryStatistics as ParquetBinaryStatistics, + BooleanStatistics as ParquetBooleanStatistics, + PrimitiveStatistics as ParquetPrimitiveStatistics, Statistics as ParquetStatistics, }; -use fmt::Debug; use tokio::{ sync::mpsc::{channel, Receiver, Sender}, task, @@ -67,9 +67,11 @@ pub struct ParquetExec { /// Parquet partitions to read pub partitions: Vec, /// Schema after projection is applied - schema: Arc, + pub schema: Arc, /// Projection for which columns to load projection: Vec, + /// Batch size + batch_size: usize, /// Statistics for the data set (sum of statistics for all partitions) statistics: Statistics, /// Execution metrics @@ -77,7 +79,7 @@ pub struct ParquetExec { /// Optional predicate builder predicate_builder: Option, /// Optional limit of the number of rows - limit: usize, + limit: Option, } /// Represents one partition of a Parquet data set and this currently means one Parquet file. 
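Note: the statistics hunks below swap arrow-rs's `ParquetStatistics` enum for parquet2 trait objects that are downcast by physical type and expose plain `min_value`/`max_value` fields. A minimal sketch of the same lookup (the helper name is hypothetical; paths assume the upstream arrow2/parquet2 crates):

```rust
use arrow2::io::parquet::read::PhysicalType;
use parquet2::statistics::{PrimitiveStatistics, Statistics};

// Hypothetical helper: pull the minimum out of an i64 column's statistics.
fn min_i64(stats: &dyn Statistics) -> Option<i64> {
    match stats.physical_type() {
        PhysicalType::Int64 => stats
            .as_any()
            .downcast_ref::<PrimitiveStatistics<i64>>()?
            .min_value,
        _ => None,
    }
}
```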
@@ -303,7 +305,7 @@ fn producer_task( reader, Some(projection.to_vec()), Some(limit), - Arc::new(|_, _| true), + None, None, )?; @@ -358,8 +360,9 @@ impl ExecutionPlan for ParquetExec { let partition = self.partitions[partition_index].clone(); let metrics = self.metrics.clone(); let projection = self.projection.clone(); + let predicate_builder = self.predicate_builder.clone(); + let batch_size = self.batch_size; let limit = self.limit; - let schema = self.schema.clone(); task::spawn_blocking(move || { if let Err(e) = read_partition( @@ -427,33 +430,59 @@ struct RowGroupPruningStatistics<'a> { /// Extract the min/max statistics from a `ParquetStatistics` object macro_rules! get_statistic { - ($column_statistics:expr, $func:ident, $bytes_func:ident) => {{ - if !$column_statistics.has_min_max_set() { - return None; - } - match $column_statistics { - ParquetStatistics::Boolean(s) => Some(ScalarValue::Boolean(Some(*s.$func()))), - ParquetStatistics::Int32(s) => Some(ScalarValue::Int32(Some(*s.$func()))), - ParquetStatistics::Int64(s) => Some(ScalarValue::Int64(Some(*s.$func()))), + ($column_statistics:expr, $attr:ident) => {{ + use arrow::io::parquet::read::PhysicalType; + + match $column_statistics.physical_type() { + PhysicalType::Boolean => { + let stats = $column_statistics + .as_any() + .downcast_ref::()?; + stats.$attr.map(|v| ScalarValue::Boolean(Some(v))) + } + PhysicalType::Int32 => { + let stats = $column_statistics + .as_any() + .downcast_ref::>()?; + stats.$attr.map(|v| ScalarValue::Int32(Some(v))) + } + PhysicalType::Int64 => { + let stats = $column_statistics + .as_any() + .downcast_ref::>()?; + stats.$attr.map(|v| ScalarValue::Int64(Some(v))) + } // 96 bit ints not supported - ParquetStatistics::Int96(_) => None, - ParquetStatistics::Float(s) => Some(ScalarValue::Float32(Some(*s.$func()))), - ParquetStatistics::Double(s) => Some(ScalarValue::Float64(Some(*s.$func()))), - ParquetStatistics::ByteArray(s) => { - let s = std::str::from_utf8(s.$bytes_func()) - .map(|s| s.to_string()) - .ok(); - Some(ScalarValue::Utf8(s)) + PhysicalType::Int96 => None, + PhysicalType::Float => { + let stats = $column_statistics + .as_any() + .downcast_ref::>()?; + stats.$attr.map(|v| ScalarValue::Float32(Some(v))) + } + PhysicalType::Double => { + let stats = $column_statistics + .as_any() + .downcast_ref::>()?; + stats.$attr.map(|v| ScalarValue::Float64(Some(v))) + } + PhysicalType::ByteArray => { + let stats = $column_statistics + .as_any() + .downcast_ref::()?; + stats.$attr.as_ref().map(|v| { + ScalarValue::Utf8(std::str::from_utf8(v).map(|s| s.to_string()).ok()) + }) } // type not supported yet - ParquetStatistics::FixedLenByteArray(_) => None, + PhysicalType::FixedLenByteArray(_) => None, } }}; } -// Extract the min or max value calling `func` or `bytes_func` on the ParquetStatistics as appropriate +// Extract the min or max value through the `attr` field from ParquetStatistics as appropriate macro_rules! get_min_max_values { - ($self:expr, $column:expr, $func:ident, $bytes_func:ident) => {{ + ($self:expr, $column:expr, $attr:ident) => {{ let (column_index, field) = if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) { (v, f) } else { @@ -472,10 +501,11 @@ macro_rules! 
get_min_max_values { let scalar_values : Vec = $self.row_group_metadata .iter() .flat_map(|meta| { - meta.column(column_index).statistics() + // FIXME: get rid of unwrap + meta.column(column_index).statistics().unwrap() }) .map(|stats| { - get_statistic!(stats, $func, $bytes_func) + get_statistic!(stats, $attr) }) .map(|maybe_scalar| { // column either did't have statistics at all or didn't have min/max values @@ -484,17 +514,17 @@ macro_rules! get_min_max_values { .collect(); // ignore errors converting to arrays (e.g. different types) - ScalarValue::iter_to_array(scalar_values).ok() + ScalarValue::iter_to_array(scalar_values).ok().map(|v| Arc::from(v)) }} } impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { fn min_values(&self, column: &Column) -> Option { - get_min_max_values!(self, column, min, min_bytes) + get_min_max_values!(self, column, min_value) } fn max_values(&self, column: &Column) -> Option { - get_min_max_values!(self, column, max, max_bytes) + get_min_max_values!(self, column, max_value) } fn num_containers(&self) -> usize { @@ -506,7 +536,7 @@ fn build_row_group_predicate( predicate_builder: &PruningPredicate, metrics: ParquetFileMetrics, row_group_metadata: &[RowGroupMetaData], -) -> Box bool> { +) -> Box bool> { let parquet_schema = predicate_builder.schema().as_ref(); let pruning_stats = RowGroupPruningStatistics { @@ -520,14 +550,14 @@ fn build_row_group_predicate( // NB: false means don't scan row group let num_pruned = values.iter().filter(|&v| !*v).count(); metrics.row_groups_pruned.add(num_pruned); - Box::new(move |_, i| values[i]) + Box::new(move |i, _| values[i]) } // stats filter array could not be built // return a closure which will not filter out any row groups Err(e) => { debug!("Error evaluating row group predicate values {}", e); metrics.predicate_evaluation_errors.add(1); - Box::new(|_r, _i| true) + Box::new(|_i, _r| true) } } } @@ -543,56 +573,36 @@ fn read_partition( response_tx: Sender>, limit: Option, ) -> Result<()> { - let mut total_rows = 0; let all_files = partition.file_partition.files; - 'outer: for partitioned_file in all_files { + for partitioned_file in all_files { let file_metrics = ParquetFileMetrics::new(partition_index, &*partitioned_file.path, &metrics); let file = File::open(partitioned_file.path.as_str())?; - let mut file_reader = SerializedFileReader::new(file)?; + let mut reader = read::RecordReader::try_new( + std::io::BufReader::new(file), + Some(projection.to_vec()), + limit, + None, + None, + )?; + if let Some(predicate_builder) = predicate_builder { - let row_group_predicate = build_row_group_predicate( + let file_metadata = reader.metadata(); + reader.set_groups_filter(Arc::new(build_row_group_predicate( predicate_builder, file_metrics, - file_reader.metadata().row_groups(), - ); - file_reader.filter_row_groups(&row_group_predicate); + &reader.metadata().row_groups, + ))); } - let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); - let mut batch_reader = arrow_reader - .get_record_reader_by_columns(projection.to_owned(), batch_size)?; - loop { - match batch_reader.next() { - Some(Ok(batch)) => { - total_rows += batch.num_rows(); - send_result(&response_tx, Ok(batch))?; - if limit.map(|l| total_rows >= l).unwrap_or(false) { - break 'outer; - } - } - None => { - break; - } - Some(Err(e)) => { - let err_msg = format!( - "Error reading batch from {}: {}", - partitioned_file, - e.to_string() - ); - // send error to operator - send_result( - &response_tx, - 
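Note: the row-group predicate above now receives `(row_group_index, metadata)`, flipping the argument order of the old closures, and is installed on the reader through `set_groups_filter`. A sketch of a filter with that shape, assuming upstream arrow2 paths:

```rust
use std::sync::Arc;

use arrow2::io::parquet::read::RowGroupMetaData;

// Returns true when a row group should be scanned; this trivial filter keeps
// every other group, purely to illustrate the (index, metadata) signature.
fn alternating_filter() -> Arc<dyn Fn(usize, &RowGroupMetaData) -> bool> {
    Arc::new(|index, _metadata| index % 2 == 0)
}
```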
Err(ArrowError::ParquetError(err_msg.clone())), - )?; - // terminate thread with error - return Err(DataFusionError::Execution(err_msg)); - } - } + + for batch in reader { + response_tx + .blocking_send(batch) + .map_err(|x| DataFusionError::Execution(format!("{}", x)))?; } } - // finished reading files (dropping response_tx will close - // channel) + // finished reading files (dropping response_tx will close channel) Ok(()) } diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs index 7c6b80e66d1d..bccebf5e467a 100644 --- a/datafusion/src/physical_plan/repartition.rs +++ b/datafusion/src/physical_plan/repartition.rs @@ -27,7 +27,10 @@ use crate::error::{DataFusionError, Result}; use crate::physical_plan::hash_utils::create_hashes; use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning}; use arrow::record_batch::RecordBatch; -use arrow::{array::Array, error::Result as ArrowResult}; +use arrow::{ + array::{Array, ArrayRef, UInt32Array, UInt64Array, Utf8Array}, + error::Result as ArrowResult, +}; use arrow::{compute::take, datatypes::SchemaRef}; use tokio_stream::wrappers::UnboundedReceiverStream; diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs b/datafusion/src/physical_plan/sort_preserving_merge.rs index f3f0803165ab..ef668afcb2cf 100644 --- a/datafusion/src/physical_plan/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sort_preserving_merge.rs @@ -25,8 +25,13 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use arrow::array::ord::DynComparator; use arrow::array::{growable::make_growable, ord::build_compare, ArrayRef}; use arrow::compute::sort::SortOptions; +use arrow::datatypes::SchemaRef; +use arrow::error::ArrowError; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; use async_trait::async_trait; use futures::channel::mpsc; use futures::stream::FusedStream; @@ -289,7 +294,7 @@ impl SortKeyCursor { for (i, ((l, r), sort_options)) in zipped.enumerate() { if i >= cmp.len() { // initialise comparators as potentially needed - cmp.push(arrow::array::build_compare(l.as_ref(), r.as_ref())?); + cmp.push(build_compare(l.as_ref(), r.as_ref())?); } match (l.is_valid(self.cur_row), r.is_valid(other.cur_row)) { @@ -486,7 +491,7 @@ impl SortPreservingMergeStream { make_growable(&arrays, false, self.in_progress.len()); if self.in_progress.is_empty() { - return make_arrow_array(array_data.freeze()); + return array_data.as_arc(); } let first = &self.in_progress[0]; @@ -516,7 +521,7 @@ impl SortPreservingMergeStream { // emit final batch of rows array_data.extend(buffer_idx, start_row_idx, end_row_idx); - make_arrow_array(array_data.freeze()) + array_data.as_arc() }) .collect(); @@ -663,18 +668,25 @@ mod tests { Some("g"), Some("j"), ])); - let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8])); + let c: ArrayRef = Arc::new( + Int64Array::from_slice(&[8, 7, 6, 5, 8]) + .to(DataType::Timestamp(TimeUnit::Nanosecond, None)), + ); let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); - let a: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 70, 90, 30])); - let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ + let a: ArrayRef = Arc::new(Int32Array::from_slice(&[10, 20, 70, 90, 30])); + let b: ArrayRef = Arc::new(Utf8Array::::from_iter(vec![ Some("b"), Some("d"), Some("f"), Some("h"), Some("j"), ])); - let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![4, 6, 2, 2, 6])); + let c: ArrayRef = 
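Note: the sort_preserving_merge tests in this file replace `TimestampNanosecondArray::from(...)` with a physical `Int64Array` re-tagged via `.to(...)`, since arrow2 represents dates and timestamps as integer arrays carrying a logical `DataType`. A minimal sketch, assuming upstream arrow2 paths:

```rust
use arrow2::array::{Array, Int32Array, Int64Array};
use arrow2::datatypes::{DataType, TimeUnit};

fn main() {
    // Physical i64 values, logical nanosecond timestamps.
    let ts = Int64Array::from_slice(&[8, 7, 6, 5])
        .to(DataType::Timestamp(TimeUnit::Nanosecond, None));
    // Physical i32 values, logical Date32.
    let dates = Int32Array::from_slice(&[1, 2, 3]).to(DataType::Date32);
    assert_eq!(ts.data_type(), &DataType::Timestamp(TimeUnit::Nanosecond, None));
    assert_eq!(dates.data_type(), &DataType::Date32);
}
```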
Arc::new( + Int64Array::from_slice(&[4, 6, 2, 2, 6]) + .to(DataType::Timestamp(TimeUnit::Nanosecond, None)), + ); + let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); _test_merge( @@ -701,8 +713,8 @@ mod tests { #[tokio::test] async fn test_merge_some_overlap() { - let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3])); - let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ + let a: ArrayRef = Arc::new(Int32Array::from_slice(&[1, 2, 7, 9, 3])); + let b: ArrayRef = Arc::new(Utf8Array::::from_iter(vec![ Some("a"), Some("b"), Some("c"), @@ -715,7 +727,7 @@ mod tests { ); let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); - let a: ArrayRef = Arc::new(Int32Array::from(vec![70, 90, 30, 100, 110])); + let a: ArrayRef = Arc::new(Int32Array::from_slice(&[70, 90, 30, 100, 110])); let b: ArrayRef = Arc::new(Utf8Array::::from(&[ Some("c"), Some("d"), @@ -723,7 +735,10 @@ mod tests { Some("f"), Some("g"), ])); - let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![4, 6, 2, 2, 6])); + let c: ArrayRef = Arc::new( + Int64Array::from_slice(&[4, 6, 2, 2, 6]) + .to(DataType::Timestamp(TimeUnit::Nanosecond, None)), + ); let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); _test_merge( @@ -750,7 +765,7 @@ mod tests { #[tokio::test] async fn test_merge_no_overlap() { - let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3])); + let a: ArrayRef = Arc::new(Int32Array::from_slice(&[1, 2, 7, 9, 3])); let b: ArrayRef = Arc::new(Utf8Array::::from(&[ Some("a"), Some("b"), @@ -758,11 +773,14 @@ mod tests { Some("d"), Some("e"), ])); - let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8])); + let c: ArrayRef = Arc::new( + Int64Array::from_slice(&[8, 7, 6, 5, 8]) + .to(DataType::Timestamp(TimeUnit::Nanosecond, None)), + ); let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); - let a: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 70, 90, 30])); - let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ + let a: ArrayRef = Arc::new(Int32Array::from_slice(&[10, 20, 70, 90, 30])); + let b: ArrayRef = Arc::new(Utf8Array::::from_iter(vec![ Some("f"), Some("g"), Some("h"), @@ -799,7 +817,7 @@ mod tests { #[tokio::test] async fn test_merge_three_partitions() { - let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3])); + let a: ArrayRef = Arc::new(Int32Array::from_slice(&[1, 2, 7, 9, 3])); let b: ArrayRef = Arc::new(Utf8Array::::from(&[ Some("a"), Some("b"), @@ -807,30 +825,38 @@ mod tests { Some("d"), Some("f"), ])); - let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8])); + let c: ArrayRef = Arc::new( + Int64Array::from_slice(&[8, 7, 6, 5, 8]) + .to(DataType::Timestamp(TimeUnit::Nanosecond, None)), + ); let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); - let a: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 70, 90, 30])); - let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ + let a: ArrayRef = Arc::new(Int32Array::from_slice(&[10, 20, 70, 90, 30])); + let b: ArrayRef = Arc::new(Utf8Array::::from_iter(vec![ Some("e"), Some("g"), Some("h"), Some("i"), Some("j"), ])); - let c: ArrayRef = - Arc::new(TimestampNanosecondArray::from(vec![40, 60, 20, 20, 60])); + let c: ArrayRef = Arc::new( + Int64Array::from_slice(&[40, 60, 20, 20, 60]) + .to(DataType::Timestamp(TimeUnit::Nanosecond, None)), + ); let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); - let 
a: ArrayRef = Arc::new(Int32Array::from(vec![100, 200, 700, 900, 300])); - let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ + let a: ArrayRef = Arc::new(Int32Array::from_slice(&[100, 200, 700, 900, 300])); + let b: ArrayRef = Arc::new(Utf8Array::::from_iter(vec![ Some("f"), Some("g"), Some("h"), Some("i"), Some("j"), ])); - let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![4, 6, 2, 2, 6])); + let c: ArrayRef = Arc::new( + Int64Array::from_slice(&[4, 6, 2, 2, 6]) + .to(DataType::Timestamp(TimeUnit::Nanosecond, None)), + ); let b3 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); _test_merge( @@ -872,7 +898,7 @@ mod tests { options: Default::default(), }, ]; - let exec = MemoryExec::try_new(partitions, schema, None).unwrap(); + let exec = MemoryExec::try_new(partitions, schema.clone(), None).unwrap(); let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec), 1024)); let collected = collect(merge).await.unwrap(); @@ -1208,20 +1234,23 @@ mod tests { #[tokio::test] async fn test_merge_metrics() { - let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2])); - let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("a"), Some("c")])); + let a: ArrayRef = Arc::new(Int32Array::from_slice(&[1, 2])); + let b: ArrayRef = + Arc::new(Utf8Array::::from_iter(vec![Some("a"), Some("c")])); let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap(); - let a: ArrayRef = Arc::new(Int32Array::from(vec![10, 20])); - let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("b"), Some("d")])); + let a: ArrayRef = Arc::new(Int32Array::from_slice(&[10, 20])); + let b: ArrayRef = + Arc::new(Utf8Array::::from_iter(vec![Some("b"), Some("d")])); let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap(); - let schema = b1.schema(); + let schema = b1.schema().clone(); let sort = vec![PhysicalSortExpr { expr: col("b", &schema).unwrap(), options: Default::default(), }]; - let exec = MemoryExec::try_new(&[vec![b1], vec![b2]], schema, None).unwrap(); + let exec = + MemoryExec::try_new(&[vec![b1], vec![b2]], schema.clone(), None).unwrap(); let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec), 1024)); let collected = collect(merge.clone()).await.unwrap(); diff --git a/datafusion/src/physical_plan/windows/aggregate.rs b/datafusion/src/physical_plan/windows/aggregate.rs index f7c29ba6aff7..c709c2061052 100644 --- a/datafusion/src/physical_plan/windows/aggregate.rs +++ b/datafusion/src/physical_plan/windows/aggregate.rs @@ -94,7 +94,9 @@ impl AggregateWindowExpr { .flatten() .collect::>(); let results = results.iter().map(|i| i.as_ref()).collect::>(); - concat(&results).map_err(DataFusionError::ArrowError) + concat::concatenate(&results) + .map(|x| ArrayRef::from(x)) + .map_err(DataFusionError::ArrowError) } fn group_based_evaluate(&self, _batch: &RecordBatch) -> Result { @@ -171,7 +173,7 @@ impl AggregateWindowAccumulator { let len = value_range.end - value_range.start; let values = values .iter() - .map(|v| v.slice(value_range.start, len)) + .map(|v| ArrayRef::from(v.slice(value_range.start, len))) .collect::>(); self.accumulator.update_batch(&values)?; let value = self.accumulator.evaluate()?; diff --git a/datafusion/src/physical_plan/windows/built_in.rs b/datafusion/src/physical_plan/windows/built_in.rs index 82040de6ef5c..0111eaf3cb0e 100644 --- a/datafusion/src/physical_plan/windows/built_in.rs +++ b/datafusion/src/physical_plan/windows/built_in.rs @@ -98,6 +98,8 @@ impl WindowExpr for BuiltInWindowExpr { 
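Note: the aggregate window hunks above wrap `v.slice(...)` in `ArrayRef::from` because arrow2's `Array::slice` hands back an owned `Box<dyn Array>` (still zero-copy over the shared buffers) instead of an `Arc`. A minimal sketch, assuming upstream arrow2 paths:

```rust
use std::sync::Arc;

use arrow2::array::{Array, Int32Array};

type ArrayRef = Arc<dyn Array>;

fn main() {
    let arr: ArrayRef = Arc::new(Int32Array::from_slice(&[1, 2, 3, 4, 5]));
    // slice() shares the underlying buffer but returns a Box, so callers
    // needing an ArrayRef re-wrap it explicitly.
    let window = ArrayRef::from(arr.slice(1, 3));
    assert_eq!(window.len(), 3);
}
```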
evaluator.evaluate(partition_points)? }; let results = results.iter().map(|i| i.as_ref()).collect::>(); - concat(&results).map_err(DataFusionError::ArrowError) + concat::concatenate(&results) + .map(|x| ArrayRef::from(x)) + .map_err(DataFusionError::ArrowError) } } diff --git a/datafusion/src/physical_plan/windows/mod.rs b/datafusion/src/physical_plan/windows/mod.rs index 194aa8de5bb5..ec76600435a3 100644 --- a/datafusion/src/physical_plan/windows/mod.rs +++ b/datafusion/src/physical_plan/windows/mod.rs @@ -241,15 +241,15 @@ mod tests { // c3 is small int - let count: &UInt64Array = as_primitive_array(&columns[0]); + let count = columns[0].as_any().downcast_ref::().unwrap(); assert_eq!(count.value(0), 100); assert_eq!(count.value(99), 100); - let max: &Int8Array = as_primitive_array(&columns[1]); + let max = columns[1].as_any().downcast_ref::().unwrap(); assert_eq!(max.value(0), 125); assert_eq!(max.value(99), 125); - let min: &Int8Array = as_primitive_array(&columns[2]); + let min = columns[2].as_any().downcast_ref::().unwrap(); assert_eq!(min.value(0), -117); assert_eq!(min.value(99), -117); diff --git a/datafusion/src/physical_plan/windows/window_agg_exec.rs b/datafusion/src/physical_plan/windows/window_agg_exec.rs index c7466477ce79..75565debfc99 100644 --- a/datafusion/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/src/physical_plan/windows/window_agg_exec.rs @@ -261,7 +261,9 @@ impl Stream for WindowAggStream { *this.finished = true; // check for error in receiving channel and unwrap actual result let result = match result { - Err(e) => Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving + Err(e) => { + Some(Err(ArrowError::External("".to_string(), Box::new(e)))) + } // error receiving Ok(result) => Some(result), }; Poll::Ready(result) diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs index 5434e82c9843..bdb3d0053a74 100644 --- a/datafusion/src/scalar.rs +++ b/datafusion/src/scalar.rs @@ -19,26 +19,25 @@ use std::{convert::TryFrom, fmt, iter::repeat, sync::Arc}; +use crate::error::{DataFusionError, Result}; use arrow::{ array::*, - bitmap::MutableBitmap, buffer::MutableBuffer, - datatypes::{DataType, IntervalUnit, TimeUnit}, - error::{ArrowError, Result as ArrowResult}, + datatypes::{DataType, Field, IntervalUnit, TimeUnit}, types::days_ms, }; use ordered_float::OrderedFloat; +use std::borrow::Borrow; use std::cmp::Ordering; use std::convert::{Infallible, TryInto}; use std::str::FromStr; -use std::{convert::TryFrom, fmt, iter::repeat, sync::Arc}; -use crate::error::{DataFusionError, Result}; type StringArray = Utf8Array; type LargeStringArray = Utf8Array; type SmallBinaryArray = BinaryArray; type LargeBinaryArray = BinaryArray; - +type MutableStringArray = MutableUtf8Array; +type MutableLargeStringArray = MutableUtf8Array; /// Represents a dynamically typed, nullable single value. /// This is the single-valued counter-part of arrow’s `Array`. @@ -75,11 +74,8 @@ pub enum ScalarValue { /// large binary LargeBinary(Option>), /// list of nested ScalarValue (boxed to reduce size_of(ScalarValue)) - // 1st argument are the inner values (e.g. Int64Array) - // 2st argument is the Lists' datatype (i.e. 
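Note: `ArrowError::ExternalError(Box::new(e))` becomes `ArrowError::External(String, Box<...>)` in arrow2; the extra `String` is a context message, passed as empty in the window_agg_exec hunk above. A minimal sketch, assuming upstream arrow2 paths:

```rust
use arrow2::error::ArrowError;

// Wrap a foreign error, attaching an (optional, here empty) context string.
fn external(e: std::io::Error) -> ArrowError {
    ArrowError::External(String::new(), Box::new(e))
}
```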
it includes `Field`) - // to downcast inner values, use ListArray::::get_child() #[allow(clippy::box_vec)] - List(Option>>, Box), + List(Option>>, Box), /// Date stored as a signed 32bit int Date32(Option), /// Date stored as a signed 64bit int @@ -291,7 +287,7 @@ impl std::hash::Hash for ScalarValue { // as a reference to the dictionary values array. Returns None for the // index if the array is NULL at index #[inline] -fn get_dict_value( +fn get_dict_value( array: &ArrayRef, index: usize, ) -> Result<(&ArrayRef, Option)> { @@ -322,6 +318,86 @@ macro_rules! typed_cast { }}; } +macro_rules! build_list { + ($VALUE_BUILDER_TY:ident, $SCALAR_TY:ident, $VALUES:expr, $SIZE:expr) => {{ + match $VALUES { + // the return on the macro is necessary, to short-circuit and return ArrayRef + None => { + return Arc::from(new_null_array( + DataType::List(Box::new(Field::new( + "item", + DataType::$SCALAR_TY, + true, + ))), + $SIZE, + )); + } + Some(values) => { + build_values_list!($VALUE_BUILDER_TY, $SCALAR_TY, values.as_ref(), $SIZE) + } + } + }}; +} + +macro_rules! build_timestamp_list { + ($TIME_UNIT:expr, $TIME_ZONE:expr, $VALUES:expr, $SIZE:expr) => {{ + match $VALUES { + // the return on the macro is necessary, to short-circuit and return ArrayRef + None => { + let null_array: ArrayRef = new_null_array( + DataType::List(Box::new(Field::new( + "item", + DataType::Timestamp($TIME_UNIT, $TIME_ZONE), + true, + ))), + $SIZE, + ) + .into(); + null_array + } + Some(values) => { + let values = values.as_ref(); + match $TIME_UNIT { + TimeUnit::Second => { + build_values_list!(Int64Vec, TimestampSecond, values, $SIZE) + } + TimeUnit::Microsecond => { + build_values_list!(Int64Vec, TimestampMillisecond, values, $SIZE) + } + TimeUnit::Millisecond => { + build_values_list!(Int64Vec, TimestampMicrosecond, values, $SIZE) + } + TimeUnit::Nanosecond => { + build_values_list!(Int64Vec, TimestampNanosecond, values, $SIZE) + } + } + } + } + }}; +} + +macro_rules! build_values_list { + ($MUTABLE_TY:ty, $SCALAR_TY:ident, $VALUES:expr, $SIZE:expr) => {{ + let mut array = MutableListArray::::new(); + + for _ in 0..$SIZE { + let mut vec = vec![]; + for scalar_value in $VALUES { + match scalar_value { + ScalarValue::$SCALAR_TY(v) => { + vec.push(v.clone()); + } + _ => panic!("Incompatible ScalarValue for list"), + }; + } + array.try_push(Some(vec)).unwrap(); + } + + let array: ListArray = array.into(); + Arc::new(array) + }}; +} + macro_rules! dyn_to_array { ($self:expr, $value:expr, $size:expr, $ty:ty) => {{ Arc::new(PrimitiveArray::<$ty>::from_data( @@ -448,7 +524,7 @@ impl ScalarValue { /// Example /// ``` /// use datafusion::scalar::ScalarValue; - /// use arrow::array::{ArrayRef, BooleanArray}; + /// use arrow::array::BooleanArray; /// /// let scalars = vec![ /// ScalarValue::Boolean(Some(true)), @@ -460,7 +536,7 @@ impl ScalarValue { /// let array = ScalarValue::iter_to_array(scalars.into_iter()) /// .unwrap(); /// - /// let expected: ArrayRef = std::sync::Arc::new( + /// let expected: Box = Box::new( /// BooleanArray::from(vec![ /// Some(true), /// None, @@ -472,7 +548,7 @@ impl ScalarValue { /// ``` pub fn iter_to_array( scalars: impl IntoIterator, - ) -> Result { + ) -> Result> { let mut scalars = scalars.into_iter().peekable(); // figure out the type based on the first element @@ -490,7 +566,7 @@ impl ScalarValue { macro_rules! 
build_array_primitive { ($TY:ty, $SCALAR_TY:ident, $DT:ident) => {{ { - Arc::new(scalars + Box::new(scalars .map(|sv| { if let ScalarValue::$SCALAR_TY(v) = sv { Ok(v) @@ -503,7 +579,7 @@ impl ScalarValue { } }) .collect::>>()?.to($DT) - ) as ArrayRef + ) as Box } }}; } @@ -526,14 +602,52 @@ impl ScalarValue { } }) .collect::>()?; - Arc::new(array) + Box::new(array) } }}; } + macro_rules! build_array_list { + ($MUTABLE_TY:ty, $SCALAR_TY:ident) => {{ + let mut array = MutableListArray::::new(); + for scalar in scalars.into_iter() { + match scalar { + ScalarValue::List(Some(xs), _) => { + let xs = *xs; + let mut vec = vec![]; + for s in xs { + match s { + ScalarValue::$SCALAR_TY(o) => { vec.push(o) } + sv => return Err(DataFusionError::Internal(format!( + "Inconsistent types in ScalarValue::iter_to_array. \ + Expected Utf8, got {:?}", + sv + ))), + } + } + array.try_push(Some(vec))?; + } + ScalarValue::List(None, _) => { + array.push_null(); + } + sv => { + return Err(DataFusionError::Internal(format!( + "Inconsistent types in ScalarValue::iter_to_array. \ + Expected List, got {:?}", + sv + ))) + } + } + } + + let array: ListArray = array.into(); + Box::new(array) + }} + } + use DataType::*; - let array: ArrayRef = match &data_type { - DataType::Boolean => Arc::new( + let array: Box = match &data_type { + DataType::Boolean => Box::new( scalars .map(|sv| { if let ScalarValue::Boolean(v) = sv { @@ -586,44 +700,41 @@ impl ScalarValue { Interval(IntervalUnit::YearMonth) => { build_array_primitive!(i32, IntervalYearMonth, data_type) } - List(_) => { - let iter = scalars - .map(|sv| { - if let ScalarValue::List(v, _) = sv { - Ok(v) - } else { - Err(ArrowError::from_external_error( - DataFusionError::Internal(format!( - "Inconsistent types in ScalarValue::iter_to_array. 
\ - Expected {:?}, got {:?}", - data_type, sv - )), - )) - } - }) - .collect::>>()?; - let mut offsets = MutableBuffer::::with_capacity(1 + iter.len()); - offsets.push(0); - let mut validity = MutableBitmap::with_capacity(iter.len()); - let mut values = Vec::with_capacity(iter.len()); - iter.iter().fold(0i32, |mut length, x| { - if let Some(array) = x { - length += array.len() as i32; - values.push(array.as_ref()); - validity.push(true) - } else { - validity.push(false) - }; - offsets.push(length); - length - }); - let values = arrow::compute::concat::concatenate(&values)?; - Arc::new(ListArray::from_data( - data_type, - offsets.into(), - values.into(), - validity.into(), - )) + DataType::List(fields) if fields.data_type() == &DataType::Int8 => { + build_array_list!(Int8Vec, Int8) + } + DataType::List(fields) if fields.data_type() == &DataType::Int16 => { + build_array_list!(Int16Vec, Int16) + } + DataType::List(fields) if fields.data_type() == &DataType::Int32 => { + build_array_list!(Int32Vec, Int32) + } + DataType::List(fields) if fields.data_type() == &DataType::Int64 => { + build_array_list!(Int64Vec, Int64) + } + DataType::List(fields) if fields.data_type() == &DataType::UInt8 => { + build_array_list!(UInt8Vec, UInt8) + } + DataType::List(fields) if fields.data_type() == &DataType::UInt16 => { + build_array_list!(UInt16Vec, UInt16) + } + DataType::List(fields) if fields.data_type() == &DataType::UInt32 => { + build_array_list!(UInt32Vec, UInt32) + } + DataType::List(fields) if fields.data_type() == &DataType::UInt64 => { + build_array_list!(UInt64Vec, UInt64) + } + DataType::List(fields) if fields.data_type() == &DataType::Float32 => { + build_array_list!(Float32Vec, Float32) + } + DataType::List(fields) if fields.data_type() == &DataType::Float64 => { + build_array_list!(Float64Vec, Float64) + } + DataType::List(fields) if fields.data_type() == &DataType::Utf8 => { + build_array_list!(MutableStringArray, Utf8) + } + DataType::List(fields) if fields.data_type() == &DataType::LargeUtf8 => { + build_array_list!(MutableLargeStringArray, LargeUtf8) } _ => { return Err(DataFusionError::Internal(format!( @@ -642,7 +753,7 @@ impl ScalarValue { match self { ScalarValue::Boolean(e) => { Arc::new(BooleanArray::from(vec![*e; size])) as ArrayRef - }, + } ScalarValue::Float64(e) => match e { Some(value) => dyn_to_array!(self, value, size, f64), None => new_null_array(self.get_datatype(), size).into(), @@ -718,42 +829,37 @@ impl ScalarValue { ), None => new_null_array(self.get_datatype(), size).into(), }, - ScalarValue::List(values, data_type) => { - if let Some(values) = values { - let length = values.len(); - let refs = std::iter::repeat(values.as_ref()) - .take(size) - .collect::>(); - let values = - arrow::compute::concat::concatenate(&refs).unwrap().into(); - let offsets: arrow::buffer::Buffer = - (0..=size).map(|i| (i * length) as i32).collect(); - Arc::new(ListArray::from_data( - data_type.clone(), - offsets, - values, - None, - )) - } else { - new_null_array(self.get_datatype(), size).into() + ScalarValue::List(values, data_type) => match data_type.as_ref() { + DataType::Boolean => { + build_list!(MutableBooleanArray, Boolean, values, size) } - } - ScalarValue::Date32(e) => todo!(), - ScalarValue::Date64(e) => todo!(), - // ScalarValue::Date32(e) => match e { - // Some(value) => Arc::new( - // build_array_from_option!(Date32, Date32Array, e, size) - // ), - // None => new_null_array(self.get_datatype(), size).into(), - // }, - // ScalarValue::Date64(e) => match e { - // Some(value) => 
Arc::new( - // PrimitiveArray::::from_trusted_len_values_iter( - // std::iter::repeat(*value).take(size), - // ) - // ), - // None => new_null_array(self.get_datatype(), size).into(), - // }, + DataType::Int8 => build_list!(Int8Vec, Int8, values, size), + DataType::Int16 => build_list!(Int16Vec, Int16, values, size), + DataType::Int32 => build_list!(Int32Vec, Int32, values, size), + DataType::Int64 => build_list!(Int64Vec, Int64, values, size), + DataType::UInt8 => build_list!(UInt8Vec, UInt8, values, size), + DataType::UInt16 => build_list!(UInt16Vec, UInt16, values, size), + DataType::UInt32 => build_list!(UInt32Vec, UInt32, values, size), + DataType::UInt64 => build_list!(UInt64Vec, UInt64, values, size), + DataType::Float32 => build_list!(Float32Vec, Float32, values, size), + DataType::Float64 => build_list!(Float64Vec, Float64, values, size), + DataType::Timestamp(unit, tz) => { + build_timestamp_list!(unit.clone(), tz.clone(), values, size) + } + DataType::Utf8 => build_list!(MutableStringArray, Utf8, values, size), + DataType::LargeUtf8 => { + build_list!(MutableLargeStringArray, LargeUtf8, values, size) + } + dt => panic!("Unexpected DataType for list {:?}", dt), + }, + ScalarValue::Date32(e) => match e { + Some(value) => dyn_to_array!(self, value, size, i32), + None => new_null_array(self.get_datatype(), size).into(), + }, + ScalarValue::Date64(e) => match e { + Some(value) => dyn_to_array!(self, value, size, i64), + None => new_null_array(self.get_datatype(), size).into(), + }, ScalarValue::IntervalDayTime(e) => match e { Some(value) => { Arc::new(PrimitiveArray::::from_trusted_len_values_iter( @@ -762,14 +868,10 @@ impl ScalarValue { } None => new_null_array(self.get_datatype(), size).into(), }, - ScalarValue::IntervalYearMonth(e) => todo!(), - // ScalarValue::IntervalYearMonth(e) => build_array_from_option!( - // Interval, - // IntervalUnit::YearMonth, - // IntervalYearMonthArray, - // e, - // size - // ), + ScalarValue::IntervalYearMonth(e) => match e { + Some(value) => dyn_to_array!(self, value, size, i32), + None => new_null_array(self.get_datatype(), size).into(), + }, } } @@ -794,7 +896,7 @@ impl ScalarValue { DataType::Int8 => typed_cast!(array, index, Int8Array, Int8), DataType::Utf8 => typed_cast!(array, index, StringArray, Utf8), DataType::LargeUtf8 => typed_cast!(array, index, LargeStringArray, LargeUtf8), - DataType::List(_) => { + DataType::List(nested_type) => { let list_array = array .as_any() .downcast_ref::>() @@ -803,11 +905,15 @@ impl ScalarValue { "Failed to downcast ListArray".to_string(), ) })?; - let is_valid = list_array.is_valid(index); - let value = if is_valid { - Some(list_array.value(index).into()) - } else { - None + let value = match list_array.is_null(index) { + true => None, + false => { + let nested_array = ArrayRef::from(list_array.value(index)); + let scalar_vec = (0..nested_array.len()) + .map(|i| ScalarValue::try_from_array(&nested_array, i)) + .collect::>>()?; + Some(scalar_vec) + } }; let value = value.map(Box::new); let data_type = Box::new(nested_type.data_type().clone()); @@ -864,34 +970,6 @@ impl ScalarValue { }) } -macro_rules! 
impl_scalar { - ($ty:ty, $scalar:tt) => { - impl From<$ty> for ScalarValue { - fn from(value: $ty) -> Self { - ScalarValue::$scalar(Some(value)) - } - } - - impl From> for ScalarValue { - fn from(value: Option<$ty>) -> Self { - ScalarValue::$scalar(value) - } - } - }; -} - -impl_scalar!(f64, Float64); -impl_scalar!(f32, Float32); -impl_scalar!(i8, Int8); -impl_scalar!(i16, Int16); -impl_scalar!(i32, Int32); -impl_scalar!(i64, Int64); -impl_scalar!(bool, Boolean); -impl_scalar!(u8, UInt8); -impl_scalar!(u16, UInt16); -impl_scalar!(u32, UInt32); -impl_scalar!(u64, UInt64); - /// Compares a single row of array @ index for equality with self, /// in an optimized fashion. /// @@ -943,35 +1021,35 @@ impl_scalar!(u64, UInt64); eq_array_primitive!(array, index, LargeStringArray, val) } ScalarValue::Binary(val) => { - eq_array_primitive!(array, index, BinaryArray, val) + eq_array_primitive!(array, index, SmallBinaryArray, val) } ScalarValue::LargeBinary(val) => { eq_array_primitive!(array, index, LargeBinaryArray, val) } ScalarValue::List(_, _) => unimplemented!(), ScalarValue::Date32(val) => { - eq_array_primitive!(array, index, Date32Array, val) + eq_array_primitive!(array, index, Int32Array, val) } ScalarValue::Date64(val) => { - eq_array_primitive!(array, index, Date64Array, val) + eq_array_primitive!(array, index, Int64Array, val) } ScalarValue::TimestampSecond(val) => { - eq_array_primitive!(array, index, TimestampSecondArray, val) + eq_array_primitive!(array, index, Int64Array, val) } ScalarValue::TimestampMillisecond(val) => { - eq_array_primitive!(array, index, TimestampMillisecondArray, val) + eq_array_primitive!(array, index, Int64Array, val) } ScalarValue::TimestampMicrosecond(val) => { - eq_array_primitive!(array, index, TimestampMicrosecondArray, val) + eq_array_primitive!(array, index, Int64Array, val) } ScalarValue::TimestampNanosecond(val) => { - eq_array_primitive!(array, index, TimestampNanosecondArray, val) + eq_array_primitive!(array, index, Int64Array, val) } ScalarValue::IntervalYearMonth(val) => { - eq_array_primitive!(array, index, IntervalYearMonthArray, val) + eq_array_primitive!(array, index, Int32Array, val) } ScalarValue::IntervalDayTime(val) => { - eq_array_primitive!(array, index, IntervalDayTimeArray, val) + eq_array_primitive!(array, index, DaysMsArray, val) } } } @@ -1003,137 +1081,33 @@ impl_scalar!(u64, UInt64); } } -impl From for ScalarValue { - fn from(value: f64) -> Self { - Some(value).into() - } -} - -impl From> for ScalarValue { - fn from(value: Option) -> Self { - ScalarValue::Float64(value) - } -} - -impl From for ScalarValue { - fn from(value: f32) -> Self { - Some(value).into() - } -} - -impl From> for ScalarValue { - fn from(value: Option) -> Self { - ScalarValue::Float32(value) - } -} - -impl From for ScalarValue { - fn from(value: i8) -> Self { - Some(value).into() - } -} - -impl From> for ScalarValue { - fn from(value: Option) -> Self { - ScalarValue::Int8(value) - } -} - -impl From for ScalarValue { - fn from(value: i16) -> Self { - Some(value).into() - } -} - -impl From> for ScalarValue { - fn from(value: Option) -> Self { - ScalarValue::Int16(value) - } -} - -impl From for ScalarValue { - fn from(value: i32) -> Self { - Some(value).into() - } -} - -impl From> for ScalarValue { - fn from(value: Option) -> Self { - ScalarValue::Int32(value) - } -} - -impl From for ScalarValue { - fn from(value: i64) -> Self { - Some(value).into() - } -} - -impl From> for ScalarValue { - fn from(value: Option) -> Self { - ScalarValue::Int64(value) - } -} - -impl 
From for ScalarValue { - fn from(value: bool) -> Self { - Some(value).into() - } -} - -impl From> for ScalarValue { - fn from(value: Option) -> Self { - ScalarValue::Boolean(value) - } -} - -impl From for ScalarValue { - fn from(value: u8) -> Self { - Some(value).into() - } -} - -impl From> for ScalarValue { - fn from(value: Option) -> Self { - ScalarValue::UInt8(value) - } -} - -impl From for ScalarValue { - fn from(value: u16) -> Self { - Some(value).into() - } -} - -impl From> for ScalarValue { - fn from(value: Option) -> Self { - ScalarValue::UInt16(value) - } -} - -impl From for ScalarValue { - fn from(value: u32) -> Self { - Some(value).into() - } -} - -impl From> for ScalarValue { - fn from(value: Option) -> Self { - ScalarValue::UInt32(value) - } -} +macro_rules! impl_scalar { + ($ty:ty, $scalar:tt) => { + impl From<$ty> for ScalarValue { + fn from(value: $ty) -> Self { + ScalarValue::$scalar(Some(value)) + } + } -impl From for ScalarValue { - fn from(value: u64) -> Self { - Some(value).into() - } + impl From> for ScalarValue { + fn from(value: Option<$ty>) -> Self { + ScalarValue::$scalar(value) + } + } + }; } -impl From> for ScalarValue { - fn from(value: Option) -> Self { - ScalarValue::UInt64(value) - } -} +impl_scalar!(f64, Float64); +impl_scalar!(f32, Float32); +impl_scalar!(i8, Int8); +impl_scalar!(i16, Int16); +impl_scalar!(i32, Int32); +impl_scalar!(i64, Int64); +impl_scalar!(bool, Boolean); +impl_scalar!(u8, UInt8); +impl_scalar!(u16, UInt16); +impl_scalar!(u32, UInt32); +impl_scalar!(u64, UInt64); impl From<&str> for ScalarValue { fn from(value: &str) -> Self { @@ -1324,13 +1298,17 @@ impl fmt::Display for ScalarValue { )?, None => write!(f, "NULL")?, }, - ScalarValue::List(e, _) => { - if let Some(e) = e { - write!(f, "{}", e)? - } else { - write!(f, "NULL")? 
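Note: with `ScalarValue::List` now holding a `Vec<ScalarValue>`, the `build_values_list`/`build_array_list` macros above assemble list arrays through arrow2's mutable list builder. A minimal sketch of that builder on its own, assuming arrow2's `MutableListArray`/`TryPush` API:

```rust
use arrow2::array::{Array, Int64Vec, ListArray, MutableListArray, TryPush};
use arrow2::error::Result;

fn main() -> Result<()> {
    // Each pushed item is an Option<Vec<Option<T>>>: an optional list of
    // optional values; push_null appends a null list entry.
    let mut builder = MutableListArray::<i32, Int64Vec>::new();
    builder.try_push(Some(vec![Some(1i64), None, Some(3)]))?;
    builder.push_null();
    let lists: ListArray<i32> = builder.into();
    assert_eq!(lists.len(), 2);
    Ok(())
}
```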
- } - } + ScalarValue::List(e, _) => match e { + Some(l) => write!( + f, + "{}", + l.iter() + .map(|v| format!("{}", v)) + .collect::>() + .join(",") + )?, + None => write!(f, "NULL")?, + }, ScalarValue::Date32(e) => format_option!(f, e)?, ScalarValue::Date64(e) => format_option!(f, e)?, ScalarValue::IntervalDayTime(e) => format_option!(f, e)?, @@ -1387,8 +1365,6 @@ impl fmt::Debug for ScalarValue { #[cfg(test)] mod tests { - use arrow::datatypes::Field; - use super::*; #[test] @@ -1427,7 +1403,10 @@ mod tests { fn scalar_list_null_to_array() { let list_array_ref = ScalarValue::List(None, Box::new(DataType::UInt64)).to_array(); - let list_array = list_array_ref.as_any().downcast_ref::().unwrap(); + let list_array = list_array_ref + .as_any() + .downcast_ref::>() + .unwrap(); assert!(list_array.is_null(0)); assert_eq!(list_array.len(), 1); @@ -1445,14 +1424,23 @@ mod tests { Box::new(DataType::UInt64), ) .to_array(); + let list_array = list_array_ref .as_any() .downcast_ref::>() .unwrap(); - - assert!(list_array.is_null(0)); assert_eq!(list_array.len(), 1); - assert_eq!(list_array.values().len(), 0); + assert_eq!(list_array.values().len(), 3); + + let prim_array_ref = list_array.value(0); + let prim_array = prim_array_ref + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(prim_array.len(), 3); + assert_eq!(prim_array.value(0), 100); + assert!(prim_array.is_null(1)); + assert_eq!(prim_array.value(2), 101); } /// Creates array directly and via ScalarValue and ensures they are the same @@ -1463,7 +1451,7 @@ mod tests { let array = ScalarValue::iter_to_array(scalars.into_iter()).unwrap(); - let expected: ArrayRef = Arc::new($ARRAYTYPE::from($INPUT)); + let expected: Box = Box::new($ARRAYTYPE::from($INPUT)); assert_eq!(&array, &expected); }}; @@ -1480,7 +1468,7 @@ mod tests { let array = ScalarValue::iter_to_array(scalars.into_iter()).unwrap(); - let expected: ArrayRef = Arc::new($ARRAYTYPE::from($INPUT)); + let expected: Box = Box::new($ARRAYTYPE::from($INPUT)); assert_eq!(&array, &expected); }}; @@ -1500,7 +1488,7 @@ mod tests { let expected: $ARRAYTYPE = $INPUT.iter().map(|v| v.map(|v| v.to_vec())).collect(); - let expected: ArrayRef = Arc::new(expected); + let expected: Box = Box::new(expected); assert_eq!(&array, &expected); }}; @@ -1620,13 +1608,14 @@ mod tests { let i16_vals = make_typed_vec!(i8_vals, i16); let i32_vals = make_typed_vec!(i8_vals, i32); let i64_vals = make_typed_vec!(i8_vals, i64); + let days_ms_vals = &[Some(days_ms::new(1, 2)), None, Some(days_ms::new(10, 0))]; let u8_vals = vec![Some(0), None, Some(1)]; let u16_vals = make_typed_vec!(u8_vals, u16); let u32_vals = make_typed_vec!(u8_vals, u32); let u64_vals = make_typed_vec!(u8_vals, u64); - let str_vals = vec![Some("foo"), None, Some("bar")]; + let str_vals = &[Some("foo"), None, Some("bar")]; /// Test each value in `scalar` with the corresponding element /// at `array`. Assumes each element is unique (aka not equal @@ -1646,6 +1635,39 @@ mod tests { }}; } + macro_rules! make_date_test_case { + ($INPUT:expr, $ARRAY_TY:ident, $SCALAR_TY:ident) => {{ + TestCase { + array: Arc::new($ARRAY_TY::from($INPUT).to(DataType::$SCALAR_TY)), + scalars: $INPUT.iter().map(|v| ScalarValue::$SCALAR_TY(*v)).collect(), + } + }}; + } + + macro_rules! 
make_ts_test_case { + ($INPUT:expr, $ARRAY_TY:ident, $ARROW_TU:ident, $SCALAR_TY:ident) => {{ + TestCase { + array: Arc::new( + $ARRAY_TY::from($INPUT) + .to(DataType::Timestamp(TimeUnit::$ARROW_TU, None)), + ), + scalars: $INPUT.iter().map(|v| ScalarValue::$SCALAR_TY(*v)).collect(), + } + }}; + } + + macro_rules! make_temporal_test_case { + ($INPUT:expr, $ARRAY_TY:ident, $ARROW_TU:ident, $SCALAR_TY:ident) => {{ + TestCase { + array: Arc::new( + $ARRAY_TY::from($INPUT) + .to(DataType::Interval(IntervalUnit::$ARROW_TU)), + ), + scalars: $INPUT.iter().map(|v| ScalarValue::$SCALAR_TY(*v)).collect(), + } + }}; + } + macro_rules! make_str_test_case { ($INPUT:expr, $ARRAY_TY:ident, $SCALAR_TY:ident) => {{ TestCase { @@ -1674,14 +1696,17 @@ mod tests { /// create a test case for DictionaryArray<$INDEX_TY> macro_rules! make_str_dict_test_case { - ($INPUT:expr, $INDEX_TY:ident, $SCALAR_TY:ident) => {{ + ($INPUT:expr, $INDEX_TY:ty, $SCALAR_TY:ident) => {{ TestCase { - array: Arc::new( - $INPUT - .iter() - .cloned() - .collect::>(), - ), + array: { + let mut array = MutableDictionaryArray::< + $INDEX_TY, + MutableUtf8Array, + >::new(); + array.try_extend(*($INPUT)).unwrap(); + let array: DictionaryArray<$INDEX_TY> = array.into(); + Arc::new(array) + }, scalars: $INPUT .iter() .map(|v| ScalarValue::$SCALAR_TY(v.map(|v| v.to_string()))) @@ -1704,24 +1729,29 @@ mod tests { make_test_case!(u64_vals, UInt64Array, UInt64), make_str_test_case!(str_vals, StringArray, Utf8), make_str_test_case!(str_vals, LargeStringArray, LargeUtf8), - make_binary_test_case!(str_vals, BinaryArray, Binary), + make_binary_test_case!(str_vals, SmallBinaryArray, Binary), make_binary_test_case!(str_vals, LargeBinaryArray, LargeBinary), - make_test_case!(i32_vals, Date32Array, Date32), - make_test_case!(i64_vals, Date64Array, Date64), - make_test_case!(i64_vals, TimestampSecondArray, TimestampSecond), - make_test_case!(i64_vals, TimestampMillisecondArray, TimestampMillisecond), - make_test_case!(i64_vals, TimestampMicrosecondArray, TimestampMicrosecond), - make_test_case!(i64_vals, TimestampNanosecondArray, TimestampNanosecond), - make_test_case!(i32_vals, IntervalYearMonthArray, IntervalYearMonth), - make_test_case!(i64_vals, IntervalDayTimeArray, IntervalDayTime), - make_str_dict_test_case!(str_vals, Int8Type, Utf8), - make_str_dict_test_case!(str_vals, Int16Type, Utf8), - make_str_dict_test_case!(str_vals, Int32Type, Utf8), - make_str_dict_test_case!(str_vals, Int64Type, Utf8), - make_str_dict_test_case!(str_vals, UInt8Type, Utf8), - make_str_dict_test_case!(str_vals, UInt16Type, Utf8), - make_str_dict_test_case!(str_vals, UInt32Type, Utf8), - make_str_dict_test_case!(str_vals, UInt64Type, Utf8), + make_date_test_case!(&i32_vals, Int32Array, Date32), + make_date_test_case!(&i64_vals, Int64Array, Date64), + make_ts_test_case!(&i64_vals, Int64Array, Second, TimestampSecond), + make_ts_test_case!(&i64_vals, Int64Array, Millisecond, TimestampMillisecond), + make_ts_test_case!(&i64_vals, Int64Array, Microsecond, TimestampMicrosecond), + make_ts_test_case!(&i64_vals, Int64Array, Nanosecond, TimestampNanosecond), + make_temporal_test_case!(&i32_vals, Int32Array, YearMonth, IntervalYearMonth), + make_temporal_test_case!( + &days_ms_vals, + DaysMsArray, + DayTime, + IntervalDayTime + ), + make_str_dict_test_case!(str_vals, i8, Utf8), + make_str_dict_test_case!(str_vals, i16, Utf8), + make_str_dict_test_case!(str_vals, i32, Utf8), + make_str_dict_test_case!(str_vals, i64, Utf8), + make_str_dict_test_case!(str_vals, u8, Utf8), + 
diff --git a/datafusion/src/test_util.rs b/datafusion/src/test_util.rs
index 0c9498acf920..6a226ba6bbec 100644
--- a/datafusion/src/test_util.rs
+++ b/datafusion/src/test_util.rs
@@ -35,7 +35,7 @@ macro_rules! assert_batches_eq {
         let expected_lines: Vec<String> =
             $EXPECTED_LINES.iter().map(|&s| s.into()).collect();
 
-        let formatted = arrow::util::pretty::pretty_format_batches($CHUNKS).unwrap();
+        let formatted = arrow::io::print::write($CHUNKS);
 
         let actual_lines: Vec<&str> = formatted.trim().lines().collect();
 
@@ -69,7 +69,7 @@ macro_rules! assert_batches_sorted_eq {
             expected_lines.as_mut_slice()[2..num_lines - 1].sort_unstable()
         }
 
-        let formatted = arrow::util::pretty::pretty_format_batches($CHUNKS).unwrap();
+        let formatted = arrow::io::print::write($CHUNKS);
 
         // fix for windows: \r\n -->
 
        let mut actual_lines: Vec<&str> = formatted.trim().lines().collect();
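[Reviewer note, not part of the patch: the unwrap() is dropped here because arrow2's printer renders batches to a String directly rather than returning a Result. The signature below is assumed from its usage in these macros, not confirmed against the pinned arrow2 sources:

    // Assumed: fn write(batches: &[RecordBatch]) -> String
    // Given some `batches: &[RecordBatch]`:
    let formatted: String = arrow::io::print::write(&batches);
    let actual_lines: Vec<&str> = formatted.trim().lines().collect();
]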