diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index 74335998e034b..9179260bd0372 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -46,7 +46,7 @@ simd = ["arrow2/simd"]
 [dependencies]
 ahash = "0.7"
 hashbrown = "0.11"
-arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "b75f2b507d185f13ec37f81f9c01077a0069e755" }
+arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "afb233a" }
 sqlparser = "0.9.0"
 paste = "^1.0"
 num_cpus = "1.13.0"
diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs
index 7bea9042458b8..d1d1c2adb991b 100644
--- a/datafusion/src/datasource/parquet.rs
+++ b/datafusion/src/datasource/parquet.rs
@@ -113,7 +113,7 @@ mod tests {
     async fn read_small_batches() -> Result<()> {
         let table = load_table("alltypes_plain.parquet")?;
         let projection = None;
-        let exec = table.scan(&projection, 2, &[], None)?;
+        let exec = table.scan(&projection, 2, &[], Some(2))?;
         let stream = exec.execute(0).await?;
 
         let _ = stream
diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index 59af408b38592..02d8522e275bb 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -241,9 +241,9 @@ mod tests {
             "+----+----------------------+--------------------+---------------------+--------------------+------------+---------------------+",
             "| a | 0.02182578039211991 | 0.9800193410444061 | 0.48754517466109415 | 10.238448667882977 | 21 | 21 |",
             "| b | 0.04893135681998029 | 0.9185813970744787 | 0.41040709263815384 | 7.797734760124923 | 19 | 19 |",
-            "| c | 0.0494924465469434 | 0.991517828651004 | 0.6600456536439784 | 13.860958726523545 | 21 | 21 |",
-            "| d | 0.061029375346466685 | 0.9748360509016578 | 0.48855379387549824 | 8.793968289758968 | 18 | 18 |",
-            "| e | 0.01479305307777301 | 0.9965400387585364 | 0.48600669271341534 | 10.206140546981722 | 21 | 21 |",
+            "| c | 0.0494924465469434 | 0.991517828651004 | 0.6600456536439785 | 13.860958726523547 | 21 | 21 |",
+            "| d | 0.061029375346466685 | 0.9748360509016578 | 0.48855379387549835 | 8.79396828975897 | 18 | 18 |",
+            "| e | 0.01479305307777301 | 0.9965400387585364 | 0.48600669271341557 | 10.206140546981727 | 21 | 21 |",
             "+----+----------------------+--------------------+---------------------+--------------------+------------+---------------------+",
         ],
         &df
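
The test change above exercises the new `limit` argument of the `scan` API. A hedged sketch (not part of the patch) of how a caller uses it; the parameter order (projection, batch size, filters, limit) is taken from the test itself, and `table` is assumed to be the provider returned by `load_table`:

    // read at most 2 rows, in batches of up to 2 rows each
    let projection = None;
    let exec = table.scan(&projection, 2, &[], Some(2))?;
    let stream = exec.execute(0).await?;
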
diff --git a/datafusion/src/physical_plan/array_expressions.rs b/datafusion/src/physical_plan/array_expressions.rs
index c767ebe6bd7da..add079314eae6 100644
--- a/datafusion/src/physical_plan/array_expressions.rs
+++ b/datafusion/src/physical_plan/array_expressions.rs
@@ -20,6 +20,7 @@ use crate::error::{DataFusionError, Result};
 
 use arrow2::array::*;
 use arrow2::compute::concat;
+use arrow2::datatypes::DataType;
 use std::sync::Arc;
 
 use super::ColumnarValue;
@@ -58,3 +59,24 @@ pub fn array(values: &[ColumnarValue]) -> Result<ColumnarValue> {
 
     Ok(ColumnarValue::Array(array_array(&arrays).map(Arc::new)?))
 }
+
+/// Types currently supported by the `array` function.
+/// The order of these types corresponds to the order in which coercion is applied;
+/// it should thus go from least informative to most informative.
+// `array` supports all types, but we do not have a signature to correctly
+// coerce them.
+pub static SUPPORTED_ARRAY_TYPES: &[DataType] = &[
+    DataType::Boolean,
+    DataType::UInt8,
+    DataType::UInt16,
+    DataType::UInt32,
+    DataType::UInt64,
+    DataType::Int8,
+    DataType::Int16,
+    DataType::Int32,
+    DataType::Int64,
+    DataType::Float32,
+    DataType::Float64,
+    DataType::Utf8,
+    DataType::LargeUtf8,
+];
diff --git a/datafusion/src/physical_plan/distinct_expressions.rs b/datafusion/src/physical_plan/distinct_expressions.rs
index 1787c6dd83f7a..da51b7611398c 100644
--- a/datafusion/src/physical_plan/distinct_expressions.rs
+++ b/datafusion/src/physical_plan/distinct_expressions.rs
@@ -25,6 +25,7 @@ use std::sync::Arc;
 
 use ahash::RandomState;
 
+use arrow2::array::new_empty_array;
 use arrow2::{
     array::{Array, ListArray},
     datatypes::{DataType, Field},
@@ -170,34 +171,28 @@ impl Accumulator for DistinctCountAccumulator {
     }
 
     fn state(&self) -> Result<Vec<ScalarValue>> {
-        self.values
-            .iter()
-            .map(|distinct_values| {
-                if distinct_values.is_empty() {
-                    Ok(None)
-                } else {
-                    // create an array with all distinct values
-                    let arrays = distinct_values
-                        .iter()
-                        .map(ScalarValue::from)
-                        .map(|x| x.to_array())
-                        .collect::<Vec<_>>();
-                    let arrays = arrays.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
-                    Ok(arrow2::compute::concat::concatenate(&arrays)
-                        .map(|x| x.into())
-                        .map(Some)?)
-                }
-            })
-            .zip(self.state_data_types.iter())
-            .map(|(x, type_): (Result<Option<Arc<dyn Array>>>, &DataType)| {
-                x.map(|x| {
-                    ScalarValue::List(
-                        x,
-                        ListArray::<i32>::default_datatype(type_.clone()),
-                    )
-                })
+        // create a ListArray for each `state_data_type`, holding that state's distinct values
+        let a = self.state_data_types.iter().enumerate().map(|(i, type_)| {
+            if self.values.is_empty() {
+                return Ok((new_empty_array(type_.clone()), type_));
+            };
+            let arrays = self
+                .values
+                .iter()
+                .map(|distinct_values| ScalarValue::from(&distinct_values[i]).to_array())
+                .collect::<Vec<_>>();
+            let arrays = arrays.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
+            Ok(arrow2::compute::concat::concatenate(&arrays).map(|x| (x, type_))?)
+        });
+        a.map(|values: Result<(Box<dyn Array>, &DataType)>| {
+            values.map(|(values, type_)| {
+                ScalarValue::List(
+                    Some(values.into()),
+                    ListArray::<i32>::default_datatype(type_.clone()),
+                )
             })
-            .collect()
+        })
+        .collect()
     }
 
     fn evaluate(&self) -> Result<ScalarValue> {
@@ -211,9 +206,10 @@ impl Accumulator for DistinctCountAccumulator {
     }
 }
 
-/*
 #[cfg(test)]
 mod tests {
+    type ArrayRef = Arc<dyn Array>;
+
     use std::iter::FromIterator;
 
     use super::*;
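
For reference, a hedged sketch (not part of the patch) of how one of the `ScalarValue::List` states produced by the new `state()` can be unpacked. It uses the same downcast pattern as the rewritten test macros below, and assumes `states` is the `Vec<ScalarValue>` returned by `state()` with an `Int32` state in first position:

    if let ScalarValue::List(Some(values), data_type) = &states[0] {
        // the list's datatype records the child type
        assert_eq!(ListArray::<i32>::get_child_type(data_type), &DataType::Int32);
        // downcast the inner values to a concrete array and copy them out
        let distinct: Vec<Option<i32>> = values
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap()
            .iter()
            .map(|x| x.map(|x| *x))
            .collect();
    }
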
state_to_vec_bool { + ($LIST:expr, $DATA_TYPE:ident, $ARRAY_TY:ty) => {{ match $LIST { - ScalarValue::List(_, data_type) => match data_type { - DataType::$DATA_TYPE => (), - _ => panic!("Unexpected DataType for list"), - }, + ScalarValue::List(_, data_type) => assert_eq!( + ListArray::::get_child_type(data_type), + &DataType::$DATA_TYPE + ), _ => panic!("Expected a ScalarValue::List"), } match $LIST { ScalarValue::List(None, _) => None, - ScalarValue::List(Some(scalar_values), _) => { - let vec = scalar_values + ScalarValue::List(Some(values), _) => { + let vec = values + .as_any() + .downcast_ref::<$ARRAY_TY>() + .unwrap() .iter() - .map(|scalar_value| match scalar_value { - ScalarValue::$DATA_TYPE(value) => *value, - _ => panic!("Unexpected ScalarValue variant"), - }) - .collect::>>(); + .collect::>(); Some(vec) } @@ -319,7 +342,7 @@ mod tests { macro_rules! test_count_distinct_update_batch_numeric { ($ARRAY_TYPE:ident, $DATA_TYPE:ident, $PRIM_TYPE:ty) => {{ - let values: Vec> = vec![ + let values = &[ Some(1), Some(1), None, @@ -336,7 +359,7 @@ mod tests { let (states, result) = run_update_batch(&arrays)?; let mut state_vec = - state_to_vec!(&states[0], $DATA_TYPE, $PRIM_TYPE).unwrap(); + state_to_vec!(&states[0], $DATA_TYPE, $ARRAY_TYPE).unwrap(); state_vec.sort(); assert_eq!(states.len(), 1); @@ -388,7 +411,7 @@ mod tests { let (states, result) = run_update_batch(&arrays)?; let mut state_vec = - state_to_vec!(&states[0], $DATA_TYPE, $PRIM_TYPE).unwrap(); + state_to_vec!(&states[0], $DATA_TYPE, $ARRAY_TYPE).unwrap(); state_vec.sort_by(|a, b| match (a, b) { (Some(lhs), Some(rhs)) => { OrderedFloat::from(*lhs).cmp(&OrderedFloat::from(*rhs)) @@ -472,7 +495,8 @@ mod tests { let get_count = |data: BooleanArray| -> Result<(Vec>, u64)> { let arrays = vec![Arc::new(data) as ArrayRef]; let (states, result) = run_update_batch(&arrays)?; - let mut state_vec = state_to_vec!(&states[0], Boolean, bool).unwrap(); + let mut state_vec = + state_to_vec_bool!(&states[0], Boolean, BooleanArray).unwrap(); state_vec.sort(); let count = match result { ScalarValue::UInt64(c) => c.ok_or_else(|| { @@ -532,7 +556,7 @@ mod tests { let (states, result) = run_update_batch(&arrays)?; assert_eq!(states.len(), 1); - assert_eq!(state_to_vec!(&states[0], Int32, i32), Some(vec![])); + assert_eq!(state_to_vec!(&states[0], Int32, Int32Array), Some(vec![])); assert_eq!(result, ScalarValue::UInt64(Some(0))); Ok(()) @@ -540,13 +564,12 @@ mod tests { #[test] fn count_distinct_update_batch_empty() -> Result<()> { - let arrays = - vec![Arc::new(Int32Array::from(vec![] as Vec>)) as ArrayRef]; + let arrays = vec![Arc::new(Int32Array::new_empty(DataType::Int32)) as ArrayRef]; let (states, result) = run_update_batch(&arrays)?; assert_eq!(states.len(), 1); - assert_eq!(state_to_vec!(&states[0], Int32, i32), Some(vec![])); + assert_eq!(state_to_vec!(&states[0], Int32, Int32Array), Some(vec![])); assert_eq!(result, ScalarValue::UInt64(Some(0))); Ok(()) @@ -560,8 +583,8 @@ mod tests { let (states, result) = run_update_batch(&arrays)?; - let state_vec1 = state_to_vec!(&states[0], Int8, i8).unwrap(); - let state_vec2 = state_to_vec!(&states[1], Int16, i16).unwrap(); + let state_vec1 = state_to_vec!(&states[0], Int8, Int8Array).unwrap(); + let state_vec2 = state_to_vec!(&states[1], Int16, Int16Array).unwrap(); let state_pairs = collect_states::(&state_vec1, &state_vec2); assert_eq!(states.len(), 2); @@ -590,8 +613,8 @@ mod tests { ], )?; - let state_vec1 = state_to_vec!(&states[0], Int32, i32).unwrap(); - let state_vec2 = 
@@ -627,8 +650,8 @@
             ],
         )?;
 
-        let state_vec1 = state_to_vec!(&states[0], Int32, i32).unwrap();
-        let state_vec2 = state_to_vec!(&states[1], UInt64, u64).unwrap();
+        let state_vec1 = state_to_vec!(&states[0], Int32, Int32Array).unwrap();
+        let state_vec2 = state_to_vec!(&states[1], UInt64, UInt64Array).unwrap();
         let state_pairs = collect_states::<i32, u64>(&state_vec1, &state_vec2);
 
         assert_eq!(states.len(), 2);
@@ -644,23 +667,25 @@
 
     #[test]
     fn count_distinct_merge_batch() -> Result<()> {
-        let state_in1 = ListPrimitive::<i32, Primitive<i32>, i32>::from_iter(vec![
-            Some(vec![Some(-1_i32), Some(-1_i32), Some(-2_i32), Some(-2_i32)]),
-            Some(vec![Some(-2_i32), Some(-3_i32)]),
-        ])
-        .to(ListArray::default_datatype(DataType::Int32));
-
-        let state_in2 = ListPrimitive::<i32, Primitive<u64>, u64>::from_iter(vec![
-            Some(vec![Some(5_u64), Some(6_u64), Some(5_u64), Some(7_u64)]),
-            Some(vec![Some(5_u64), Some(7_u64)]),
-        ])
-        .to(ListArray::default_datatype(DataType::UInt64));
+        let state_in1: ListArray<i32> =
+            ListPrimitive::<i32, Primitive<i32>, i32>::from_iter(vec![
+                Some(vec![Some(-1_i32), Some(-1_i32), Some(-2_i32), Some(-2_i32)]),
+                Some(vec![Some(-2_i32), Some(-3_i32)]),
+            ])
+            .into();
+
+        let state_in2: ListArray<i32> =
+            ListPrimitive::<i32, Primitive<u64>, u64>::from_iter(vec![
+                Some(vec![Some(5_u64), Some(6_u64), Some(5_u64), Some(7_u64)]),
+                Some(vec![Some(5_u64), Some(7_u64)]),
+            ])
+            .into();
 
         let (states, result) =
             run_merge_batch(&[Arc::new(state_in1), Arc::new(state_in2)])?;
 
-        let state_out_vec1 = state_to_vec!(&states[0], Int32, i32).unwrap();
-        let state_out_vec2 = state_to_vec!(&states[1], UInt64, u64).unwrap();
+        let state_out_vec1 = state_to_vec!(&states[0], Int32, Int32Array).unwrap();
+        let state_out_vec2 = state_to_vec!(&states[1], UInt64, UInt64Array).unwrap();
         let state_pairs =
             collect_states::<i32, u64>(&state_out_vec1, &state_out_vec2);
 
         assert_eq!(
@@ -679,4 +704,3 @@
 
         Ok(())
     }
 }
-*/
diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs
index 4d87f102fa45c..fd06bdddbc3cd 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -33,7 +33,6 @@ use super::{
     type_coercion::{coerce, data_types},
     ColumnarValue, PhysicalExpr,
 };
-use crate::execution::context::ExecutionContextState;
 use crate::physical_plan::array_expressions;
 use crate::physical_plan::datetime_expressions;
 use crate::physical_plan::expressions::{nullif_func, SUPPORTED_NULLIF_TYPES};
@@ -43,6 +42,10 @@ use crate::{
     error::{DataFusionError, Result},
     scalar::ScalarValue,
 };
+use crate::{
+    execution::context::ExecutionContextState,
+    physical_plan::array_expressions::SUPPORTED_ARRAY_TYPES,
+};
 use arrow2::{
     array::{Array, NullArray},
     compute::length::length,
@@ -965,7 +968,9 @@ fn signature(fun: &BuiltinScalarFunction) -> Signature {
     // for now, the list is small, as we do not have many built-in functions.
     match fun {
-        BuiltinScalarFunction::Array => Signature::VariadicEqual,
+        BuiltinScalarFunction::Array => {
+            Signature::Variadic(SUPPORTED_ARRAY_TYPES.to_vec())
+        }
         BuiltinScalarFunction::Concat | BuiltinScalarFunction::ConcatWithSeparator => {
             Signature::Variadic(vec![DataType::Utf8])
         }
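
On the signature change above: `Signature::VariadicEqual` only requires that all arguments share one type, so the coercion machinery (`data_types`/`coerce`) has no concrete target types to cast mixed arguments to, while `Signature::Variadic` enumerates the admissible types, allowing a call such as `array(1, 2.5)` to be coerced to a single supported type. A hedged illustration (not part of the patch) of the two variants:

    // old: any number of arguments, all of the same (but unspecified) type
    let old = Signature::VariadicEqual;
    // new: any number of arguments, each coercible to one of the listed types
    let new = Signature::Variadic(SUPPORTED_ARRAY_TYPES.to_vec());
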
diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs
index 1f6ae5dfe79f8..9ff04e7d00bf2 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -31,7 +31,7 @@ use crate::{
 };
 
 use arrow2::{
-    array::*, datatypes::*, error::Result as ArrowResult, io::parquet::read,
+    datatypes::*, error::Result as ArrowResult, io::parquet::read,
     record_batch::RecordBatch,
 };
 
@@ -134,6 +134,12 @@ impl ParquetExec {
             let schema = read::get_schema(&file_metadata)?;
             let schema = Arc::new(schema);
 
+            let total_byte_size: i64 = (&file_metadata.row_groups)
+                .iter()
+                .map(|group| group.total_byte_size())
+                .sum();
+            let total_byte_size = total_byte_size as usize;
+
             let row_count: i64 = (&file_metadata.row_groups)
                 .iter()
                 .map(|group| group.num_rows())
@@ -147,14 +153,14 @@ impl ParquetExec {
 
             let statistics = Statistics {
                 num_rows: Some(row_count),
-                total_byte_size: None,
+                total_byte_size: Some(total_byte_size),
                 column_statistics: None,
             };
-            // remove files that are not needed in case of limit
             partitions.push(ParquetPartition {
                 filename: filename.to_string(),
                 statistics,
             });
+            // remove files that are not needed in case of limit
             if num_rows > limit {
                 break;
             }
@@ -290,10 +296,11 @@ fn producer_task(
     response_tx: Sender<ArrowResult<RecordBatch>>,
     projection: &[usize],
     schema: SchemaRef,
-    _limit: usize,
+    limit: usize,
 ) -> Result<()> {
     let mut file = File::open(path)?;
     let metadata = read::read_metadata(&mut file)?;
 
+    let mut remaining = limit;
     for row_group in 0..metadata.row_groups.len() {
         let columns_metadata = metadata.row_groups[row_group].columns();
@@ -303,12 +310,18 @@ fn producer_task(
             let pages =
                 read::get_page_iterator(&metadata, row_group, column, &mut file)?;
             let array = read::page_iter_to_array(pages, &columns_metadata[column])?;
+            let array = if array.len() > remaining {
+                array.slice(0, remaining)
+            } else {
+                array
+            };
             columns.push(
                 arrow2::compute::cast::cast(array.as_ref(), field.data_type())
                     .map(|x| x.into())?,
             );
         }
         let batch = RecordBatch::try_new(schema.clone(), columns);
+        remaining -= batch.as_ref().map(|x| x.num_rows() as usize).unwrap_or(0);
         response_tx
             .blocking_send(batch)
             .map_err(|x| DataFusionError::Execution(format!("{}", x)))?;
diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs
index b696c13600eee..f44a657be45b9 100644
--- a/datafusion/src/scalar.rs
+++ b/datafusion/src/scalar.rs
@@ -65,8 +65,9 @@ pub enum ScalarValue {
     /// large binary
     LargeBinary(Option<Vec<u8>>),
     /// list of nested ScalarValue
-    // 1st argument are the inner values
-    // 2st argument is datatype (i.e. it includes `Field`)
+    // 1st argument holds the inner values (e.g. an Int64Array)
+    // 2nd argument is the list's datatype (i.e. it includes `Field`)
+    // to downcast the inner values, use ListArray::<i32>::get_child()
     List(Option<Arc<dyn Array>>, DataType),
     /// Date stored as a signed 32bit int
     Date32(Option<i32>),
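
To make the limit handling added to `producer_task` concrete, here is a hedged sketch (not part of the patch) of the per-row-group arithmetic, extracted as a standalone, hypothetical helper. With `limit = 2` and two 8-row row groups, the first batch is sliced to 2 rows, `remaining` drops to 0, and every later row group contributes an empty batch, which is consistent with the `Some(2)` limit passed in the `read_small_batches` test:

    /// Hypothetical helper, for illustration only: how many rows each row
    /// group contributes once a limit is pushed down.
    fn emitted_batch_sizes(row_group_lens: &[usize], limit: usize) -> Vec<usize> {
        let mut remaining = limit;
        row_group_lens
            .iter()
            .map(|&len| {
                // mirrors `array.slice(0, remaining)` when len > remaining
                let take = len.min(remaining);
                remaining -= take;
                take
            })
            .collect()
    }

    #[test]
    fn limit_is_respected_across_row_groups() {
        assert_eq!(emitted_batch_sizes(&[8, 8], 2), vec![2, 0]);
    }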