diff --git a/ballista/rust/client/src/columnar_batch.rs b/ballista/rust/client/src/columnar_batch.rs index 3431f56128839..1b91b2d96fc8d 100644 --- a/ballista/rust/client/src/columnar_batch.rs +++ b/ballista/rust/client/src/columnar_batch.rs @@ -156,7 +156,8 @@ impl ColumnarValue { pub fn memory_size(&self) -> usize { match self { - ColumnarValue::Columnar(array) => array.get_array_memory_size(), + // ColumnarValue::Columnar(array) => array.get_array_memory_size(), + ColumnarValue::Columnar(_array) => 0, _ => 0, } } diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 45ff6c5984cad..b2b5a837bd7f7 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -979,6 +979,7 @@ enum TimeUnit{ enum IntervalUnit{ YearMonth = 0; DayTime = 1; + MonthDayNano = 2; } message Decimal{ diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs index 31143323cb343..21ec8545bfca9 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs @@ -21,7 +21,7 @@ //! will use the ShuffleReaderExec to read these results. use std::fs::File; -use std::iter::Iterator; +use std::iter::{Iterator, FromIterator}; use std::path::PathBuf; use std::sync::{Arc, Mutex}; use std::time::Instant; @@ -54,6 +54,7 @@ use futures::StreamExt; use hashbrown::HashMap; use log::{debug, info}; use uuid::Uuid; +use std::cell::RefCell; /// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and /// can be executed as one unit with each partition being executed in parallel. The output of each @@ -227,16 +228,16 @@ impl ShuffleWriterExec { for (output_partition, partition_indices) in indices.into_iter().enumerate() { - let indices = partition_indices.into(); - // Produce batches based on indices let columns = input_batch .columns() .iter() .map(|c| { - take(c.as_ref(), &indices, None).map_err(|e| { - DataFusionError::Execution(e.to_string()) - }) + take::take(c.as_ref(), + &PrimitiveArray::::from_slice(&partition_indices)) + .map_err(|e| { + DataFusionError::Execution(e.to_string()) + }).map(ArrayRef::from) }) .collect::>>>()?; @@ -354,7 +355,7 @@ impl ExecutionPlan for ShuffleWriterExec { // build metadata result batch let num_writers = part_loc.len(); let mut partition_builder = UInt32Vec::with_capacity(num_writers); - let mut path_builder = MutableUtf8Array::with_capacity(num_writers); + let mut path_builder = MutableUtf8Array::::with_capacity(num_writers); let mut num_rows_builder = UInt64Vec::with_capacity(num_writers); let mut num_batches_builder = UInt64Vec::with_capacity(num_writers); let mut num_bytes_builder = UInt64Vec::with_capacity(num_writers); @@ -368,21 +369,19 @@ impl ExecutionPlan for ShuffleWriterExec { } // build arrays - let partition_num: ArrayRef = Arc::new(partition_builder.finish()); - let path: ArrayRef = Arc::new(path_builder.finish()); - let field_builders: Vec> = vec![ - Box::new(num_rows_builder), - Box::new(num_batches_builder), - Box::new(num_bytes_builder), + let partition_num: ArrayRef = partition_builder.into_arc(); + let path: ArrayRef = path_builder.into_arc(); + let field_builders: Vec> = vec![ + num_rows_builder.into_arc(), + num_batches_builder.into_arc(), + num_bytes_builder.into_arc(), ]; - let mut stats_builder = StructBuilder::new( - PartitionStats::default().arrow_struct_fields(), + let stats_builder = StructArray::from_data( + DataType::Struct(PartitionStats::default().arrow_struct_fields()), field_builders, + None, ); - for _ in 0..num_writers { - stats_builder.append(true)?; - } - let stats = Arc::new(stats_builder.finish()); + let stats = Arc::new(stats_builder); // build result batch containing metadata let schema = result_schema(); @@ -459,7 +458,9 @@ impl<'a> ShuffleWriter<'a> { let num_bytes: usize = batch .columns() .iter() - .map(|array| array.get_array_memory_size()) + .map(|_array| 0) + // TODO: add arrow2 with array_memory_size capability and enable this. + // .map(|array| array.get_array_memory_size()) .sum(); self.num_bytes += num_bytes as u64; Ok(()) @@ -505,7 +506,7 @@ mod tests { assert_eq!(2, batch.num_rows()); let path = batch.columns()[1] .as_any() - .downcast_ref::() + .downcast_ref::>() .unwrap(); let file0 = path.value(0); @@ -582,7 +583,7 @@ mod tests { schema.clone(), vec![ Arc::new(UInt32Array::from(vec![Some(1), Some(2)])), - Arc::new(StringArray::from(vec![Some("hello"), Some("world")])), + Arc::new(Utf8Array::::from(vec![Some("hello"), Some("world")])), ], )?; let partition = vec![batch.clone(), batch]; diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index dbaac1de7b574..80f06020ddf62 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -411,7 +411,10 @@ mod roundtrip_tests { Field::new("nullable", DataType::Boolean, false), Field::new("name", DataType::Utf8, false), Field::new("datatype", DataType::Binary, false), - ]), + ], + None, + false, + ), DataType::Union(vec![ Field::new("nullable", DataType::Boolean, false), Field::new("name", DataType::Utf8, false), @@ -425,7 +428,10 @@ mod roundtrip_tests { ]), true, ), - ]), + ], + None, + false, + ), DataType::Dictionary( Box::new(DataType::Utf8), Box::new(DataType::Struct(vec![ @@ -556,7 +562,10 @@ mod roundtrip_tests { Field::new("nullable", DataType::Boolean, false), Field::new("name", DataType::Utf8, false), Field::new("datatype", DataType::Binary, false), - ]), + ], + None, + false, + ), DataType::Union(vec![ Field::new("nullable", DataType::Boolean, false), Field::new("name", DataType::Utf8, false), @@ -570,7 +579,10 @@ mod roundtrip_tests { ]), true, ), - ]), + ], + None, + false, + ), DataType::Dictionary( Box::new(DataType::Utf8), Box::new(DataType::Struct(vec![ diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index aa7a973dd3402..899bfa8f3a497 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -51,6 +51,7 @@ impl protobuf::IntervalUnit { match interval_unit { IntervalUnit::YearMonth => protobuf::IntervalUnit::YearMonth, IntervalUnit::DayTime => protobuf::IntervalUnit::DayTime, + IntervalUnit::MonthDayNano => protobuf::IntervalUnit::MonthDayNano, } } @@ -62,6 +63,7 @@ impl protobuf::IntervalUnit { Some(interval_unit) => Ok(match interval_unit { protobuf::IntervalUnit::YearMonth => IntervalUnit::YearMonth, protobuf::IntervalUnit::DayTime => IntervalUnit::DayTime, + protobuf::IntervalUnit::MonthDayNano => IntervalUnit::MonthDayNano, }), None => Err(proto_error( "Error converting i32 to DateUnit: Passed invalid variant", @@ -235,7 +237,7 @@ impl TryInto for &protobuf::ArrowType { .iter() .map(|field| field.try_into()) .collect::, _>>()?; - DataType::Union(union_types) + DataType::Union(union_types, None, false) } protobuf::arrow_type::ArrowTypeEnum::Dictionary(boxed_dict) => { let dict_ref = boxed_dict.as_ref(); @@ -389,7 +391,7 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum { .map(|field| field.into()) .collect::>(), }), - DataType::Union(union_types) => ArrowTypeEnum::Union(protobuf::Union { + DataType::Union(union_types, _, _) => ArrowTypeEnum::Union(protobuf::Union { union_types: union_types .iter() .map(|field| field.into()) @@ -407,6 +409,8 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum { fractional: *fractional as u64, }) } + DataType::Extension(_, _, _) => + panic!("DataType::Extension is not supported") } } } @@ -535,7 +539,7 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype { | DataType::FixedSizeList(_, _) | DataType::LargeList(_) | DataType::Struct(_) - | DataType::Union(_) + | DataType::Union(_, _, _) | DataType::Dictionary(_, _) | DataType::Decimal(_, _) => { return Err(proto_error(format!( @@ -543,6 +547,8 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype { val ))) } + DataType::Extension(_, _, _) => + panic!("DataType::Extension is not supported") }; Ok(scalar_value) } diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs index 1383ba89685cf..6769e40ccde32 100644 --- a/ballista/rust/core/src/serde/mod.rs +++ b/ballista/rust/core/src/serde/mod.rs @@ -1,3 +1,4 @@ + // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information @@ -243,6 +244,8 @@ impl TryInto .iter() .map(|field| field.try_into()) .collect::, _>>()?, + None, + false, ), arrow_type::ArrowTypeEnum::Dictionary(dict) => { let pb_key_datatype = dict diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index 8b95444982645..41f3d9e413ced 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -212,13 +212,17 @@ impl TryInto> for &protobuf::PhysicalPlanNode { PhysicalPlanType::Window(window_agg) => { let input: Arc = convert_box_required!(window_agg.input)?; - let input_schema = window_agg.input_schema.ok_or_else(|| { - BallistaError::General( - "input_schema in WindowAggrNode is missing.".to_owned(), - ) - })?; - - let physical_schema = Arc::new(input_schema); + let input_schema = window_agg + .input_schema + .as_ref() + .ok_or_else(|| { + BallistaError::General( + "input_schema in WindowAggrNode is missing.".to_owned(), + ) + })? + .clone(); + let physical_schema: SchemaRef = + SchemaRef::new((&input_schema).try_into()?); let physical_window_expr: Vec> = window_agg .window_expr diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs index 424bab6f499f0..ca2f2d113fc13 100644 --- a/ballista/rust/core/src/serde/scheduler/mod.rs +++ b/ballista/rust/core/src/serde/scheduler/mod.rs @@ -162,7 +162,7 @@ impl PartitionStats { let values = vec![num_rows, num_batches, num_bytes]; Ok(Arc::new(StructArray::from_data( - self.arrow_struct_fields(), + DataType::Struct(self.arrow_struct_fields()), values, None, ))) diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs index a1d3a63fb9b8b..33f8b55089db6 100644 --- a/ballista/rust/core/src/utils.rs +++ b/ballista/rust/core/src/utils.rs @@ -90,7 +90,8 @@ pub async fn write_stream_to_disk( let batch_size_bytes: usize = batch .columns() .iter() - .map(|array| array.get_array_memory_size()) + // .map(|array| array.get_array_memory_size()) + .map(|_array| 0) .sum(); num_batches += 1; num_rows += batch.num_rows(); diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index ac797b448bbda..cc5734e4d36a8 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -1808,7 +1808,12 @@ mod tests { let results = execute("SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1", 4).await?; - let expected = vec!["++", "||", "++", "++"]; + let expected = vec![ + "+----+--------------+", + "| c1 | AVG(test.c2) |", + "+----+--------------+", + "+----+--------------+", + ]; assert_batches_sorted_eq!(expected, &results); Ok(()) diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs index c48b9e5a13de8..0ddae5975cc73 100644 --- a/datafusion/src/execution/dataframe_impl.rs +++ b/datafusion/src/execution/dataframe_impl.rs @@ -282,9 +282,9 @@ mod tests { "+----+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------+----------------------------------------+", "| a | 0.02182578039211991 | 0.9800193410444061 | 0.48754517466109415 | 10.238448667882977 | 21 | 21 |", "| b | 0.04893135681998029 | 0.9185813970744787 | 0.41040709263815384 | 7.797734760124923 | 19 | 19 |", - "| c | 0.0494924465469434 | 0.991517828651004 | 0.6600456536439784 | 13.860958726523545 | 21 | 21 |", - "| d | 0.061029375346466685 | 0.9748360509016578 | 0.48855379387549824 | 8.793968289758968 | 18 | 18 |", - "| e | 0.01479305307777301 | 0.9965400387585364 | 0.48600669271341534 | 10.206140546981722 | 21 | 21 |", + "| c | 0.0494924465469434 | 0.991517828651004 | 0.6600456536439785 | 13.860958726523547 | 21 | 21 |", + "| d | 0.061029375346466685 | 0.9748360509016578 | 0.48855379387549835 | 8.79396828975897 | 18 | 18 |", + "| e | 0.01479305307777301 | 0.9965400387585364 | 0.48600669271341557 | 10.206140546981727 | 21 | 21 |", "+----+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------+----------------------------------------+", ], &df diff --git a/datafusion/src/physical_optimizer/pruning.rs b/datafusion/src/physical_optimizer/pruning.rs index 91fbe1d195d2a..c3e436a0ffc7c 100644 --- a/datafusion/src/physical_optimizer/pruning.rs +++ b/datafusion/src/physical_optimizer/pruning.rs @@ -1394,7 +1394,7 @@ mod tests { let expr = col("b1").not().eq(lit(true)); let p = PruningPredicate::try_new(&expr, schema).unwrap(); let result = p.prune(&statistics).unwrap(); - assert_eq!(result, vec![true, false, false, true, true]); + assert_eq!(result, vec![true, true, false, true, true]); } /// Creates setup for int32 chunk pruning diff --git a/datafusion/src/physical_plan/expressions/cast.rs b/datafusion/src/physical_plan/expressions/cast.rs index 9034aaf235876..670e24dec7618 100644 --- a/datafusion/src/physical_plan/expressions/cast.rs +++ b/datafusion/src/physical_plan/expressions/cast.rs @@ -154,7 +154,10 @@ mod tests { let expression = cast_with_options(col("a", &schema)?, &schema, $TYPE)?; // verify that its display is correct - assert_eq!(format!("CAST(a AS {:?})", $TYPE), format!("{}", expression)); + assert_eq!( + format!("CAST(a@0 AS {:?})", $TYPE), + format!("{}", expression) + ); // verify that the expression's type is correct assert_eq!(expression.data_type(&schema)?, $TYPE); @@ -235,7 +238,7 @@ mod tests { #[test] fn invalid_cast() { // Ensure a useful error happens at plan time if invalid casts are used - let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let schema = Schema::new(vec![Field::new("a", DataType::Null, false)]); let result = cast(col("a", &schema).unwrap(), &schema, DataType::LargeBinary); result.expect_err("expected Invalid CAST"); diff --git a/datafusion/src/physical_plan/expressions/try_cast.rs b/datafusion/src/physical_plan/expressions/try_cast.rs index 2381657b2d3d2..d76c374806be2 100644 --- a/datafusion/src/physical_plan/expressions/try_cast.rs +++ b/datafusion/src/physical_plan/expressions/try_cast.rs @@ -236,7 +236,7 @@ mod tests { #[test] fn invalid_cast() { // Ensure a useful error happens at plan time if invalid casts are used - let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let schema = Schema::new(vec![Field::new("a", DataType::Null, false)]); let result = try_cast(col("a", &schema).unwrap(), &schema, DataType::LargeBinary); result.expect_err("expected Invalid CAST"); diff --git a/datafusion/src/physical_plan/math_expressions.rs b/datafusion/src/physical_plan/math_expressions.rs index 724c0e9bf401d..8624c83936d03 100644 --- a/datafusion/src/physical_plan/math_expressions.rs +++ b/datafusion/src/physical_plan/math_expressions.rs @@ -20,6 +20,7 @@ use rand::{thread_rng, Rng}; use std::iter; use std::sync::Arc; +use arrow::array::Float32Array; use arrow::array::Float64Array; use arrow::compute::arity::unary; use arrow::datatypes::DataType; @@ -27,24 +28,34 @@ use arrow::datatypes::DataType; use super::{ColumnarValue, ScalarValue}; use crate::error::{DataFusionError, Result}; +macro_rules! downcast_compute_op { + ($ARRAY:expr, $NAME:expr, $FUNC:ident, $TYPE:ident, $DT: path) => {{ + let n = $ARRAY.as_any().downcast_ref::<$TYPE>(); + match n { + Some(array) => { + let res: $TYPE = + unary(array, |x| x.$FUNC(), $DT); + Ok(Arc::new(res)) + } + _ => Err(DataFusionError::Internal(format!( + "Invalid data type for {}", + $NAME + ))), + } + }}; +} + macro_rules! unary_primitive_array_op { ($VALUE:expr, $NAME:expr, $FUNC:ident) => {{ match ($VALUE) { ColumnarValue::Array(array) => match array.data_type() { DataType::Float32 => { - let array = array.as_any().downcast_ref().unwrap(); - let array = unary::( - array, - |x| x.$FUNC() as f64, - DataType::Float32, - ); - Ok(ColumnarValue::Array(Arc::new(array))) + let result = downcast_compute_op!(array, $NAME, $FUNC, Float32Array, DataType::Float32); + Ok(ColumnarValue::Array(result?)) } DataType::Float64 => { - let array = array.as_any().downcast_ref().unwrap(); - let array = - unary::(array, |x| x.$FUNC(), DataType::Float64); - Ok(ColumnarValue::Array(Arc::new(array))) + let result = downcast_compute_op!(array, $NAME, $FUNC, Float64Array, DataType::Float64); + Ok(ColumnarValue::Array(result?)) } other => Err(DataFusionError::Internal(format!( "Unsupported data type {:?} for function {}", diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index aa2221be9f6e9..85e002dcc37ff 100644 --- a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -248,6 +248,11 @@ impl ParquetExec { &self.projection } + /// Batch size + pub fn batch_size(&self) -> usize { + self.batch_size + } + /// Statistics for the data set (sum of statistics for all partitions) pub fn statistics(&self) -> &Statistics { &self.statistics diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs b/datafusion/src/physical_plan/sort_preserving_merge.rs index ef668afcb2cf3..ceaca578f7b35 100644 --- a/datafusion/src/physical_plan/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sort_preserving_merge.rs @@ -511,7 +511,7 @@ impl SortPreservingMergeStream { } // emit current batch of rows for current buffer - array_data.extend(buffer_idx, start_row_idx, end_row_idx); + array_data.extend(buffer_idx, start_row_idx, end_row_idx - start_row_idx); // start new batch of rows buffer_idx = next_buffer_idx; @@ -520,7 +520,7 @@ impl SortPreservingMergeStream { } // emit final batch of rows - array_data.extend(buffer_idx, start_row_idx, end_row_idx); + array_data.extend(buffer_idx, start_row_idx, end_row_idx - start_row_idx); array_data.as_arc() }) .collect(); @@ -965,7 +965,7 @@ mod tests { options: Default::default(), }, PhysicalSortExpr { - expr: col("c7", &schema).unwrap(), + expr: col("c12", &schema).unwrap(), options: SortOptions::default(), }, ]; @@ -1180,7 +1180,7 @@ mod tests { async fn test_async() { let schema = test::aggr_test_schema(); let sort = vec![PhysicalSortExpr { - expr: col("c7", &schema).unwrap(), + expr: col("c12", &schema).unwrap(), options: SortOptions::default(), }];