From faf8ab944725bf66395197147d7951bbe8e84825 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sun, 30 May 2021 09:44:31 +0000 Subject: [PATCH] More conversions. --- .../core/src/serde/logical_plan/from_proto.rs | 65 ++++---- .../rust/core/src/serde/logical_plan/mod.rs | 2 +- .../core/src/serde/logical_plan/to_proto.rs | 2 +- .../src/serde/physical_plan/from_proto.rs | 3 +- .../rust/core/src/serde/physical_plan/mod.rs | 8 +- ballista/rust/core/src/serde/scheduler/mod.rs | 6 +- benchmarks/src/bin/nyctaxi.rs | 6 +- benchmarks/src/bin/tpch.rs | 6 +- datafusion-cli/src/print_format.rs | 12 +- datafusion-examples/examples/dataframe.rs | 4 +- datafusion-examples/examples/flight_server.rs | 5 +- datafusion/Cargo.toml | 2 +- datafusion/benches/aggregate_query_sql.rs | 17 +- datafusion/benches/filter_query_sql.rs | 4 +- datafusion/benches/math_query_sql.rs | 7 +- datafusion/benches/sort_limit_query_sql.rs | 2 +- datafusion/src/catalog/information_schema.rs | 19 +-- .../src/physical_plan/expressions/cast.rs | 10 +- .../src/physical_plan/expressions/mod.rs | 2 + .../src/physical_plan/expressions/try_cast.rs | 4 +- datafusion/src/physical_plan/hash_join.rs | 8 +- .../src/physical_plan/regex_expressions.rs | 145 ++++++++++++++++- datafusion/src/scalar.rs | 4 +- datafusion/tests/custom_sources.rs | 4 +- datafusion/tests/provider_filter_pushdown.rs | 19 ++- datafusion/tests/sql.rs | 147 +++++++++--------- datafusion/tests/user_defined_plan.rs | 4 +- 27 files changed, 318 insertions(+), 199 deletions(-) diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs index 020858fbfc3fe..ecd1c480b67dc 100644 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs @@ -26,7 +26,7 @@ use std::{ unimplemented, }; -use arrow::datatypes::{DataType, Field, Schema}; +use datafusion::arrow2::datatypes::{DataType, Field, Schema}; use datafusion::logical_plan::{ abs, acos, asin, atan, ceil, cos, exp, floor, ln, log10, log2, round, signum, sin, sqrt, tan, trunc, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Operator, @@ -299,9 +299,9 @@ impl TryInto for protobuf::Schema { } } -impl TryInto for &protobuf::scalar_type::Datatype { +impl TryInto for &protobuf::scalar_type::Datatype { type Error = BallistaError; - fn try_into(self) -> Result { + fn try_into(self) -> Result { use protobuf::scalar_type::Datatype; Ok(match self { Datatype::Scalar(scalar_type) => { @@ -332,17 +332,18 @@ impl TryInto for &protobuf::scalar_type::Datatype { )) })?; //Because length is checked above it is safe to unwrap .last() - let mut scalar_type = - arrow::datatypes::DataType::List(Box::new(Field::new( - field_names.last().unwrap().as_str(), - pb_scalar_type.into(), - true, - ))); + let mut scalar_type = DataType::List(Box::new(Field::new( + field_names.last().unwrap().as_str(), + pb_scalar_type.into(), + true, + ))); //Iterate over field names in reverse order except for the last item in the vector for name in field_names.iter().rev().skip(1) { - let new_datatype = arrow::datatypes::DataType::List(Box::new( - Field::new(name.as_str(), scalar_type, true), - )); + let new_datatype = DataType::List(Box::new(Field::new( + name.as_str(), + scalar_type, + true, + ))); scalar_type = new_datatype; } scalar_type @@ -351,11 +352,11 @@ impl TryInto for &protobuf::scalar_type::Datatype { } } -impl TryInto for &protobuf::arrow_type::ArrowTypeEnum { +impl TryInto for &protobuf::arrow_type::ArrowTypeEnum { type Error = 
BallistaError; - fn try_into(self) -> Result { - use arrow::datatypes::DataType; + fn try_into(self) -> Result { use protobuf::arrow_type; + use DataType; Ok(match self { arrow_type::ArrowTypeEnum::None(_) => DataType::Null, arrow_type::ArrowTypeEnum::Bool(_) => DataType::Boolean, @@ -467,9 +468,9 @@ impl TryInto for &protobuf::arrow_type::ArrowTypeEnu } #[allow(clippy::from_over_into)] -impl Into for protobuf::PrimitiveScalarType { - fn into(self) -> arrow::datatypes::DataType { - use arrow::datatypes::DataType; +impl Into for protobuf::PrimitiveScalarType { + fn into(self) -> DataType { + use DataType; match self { protobuf::PrimitiveScalarType::Bool => DataType::Boolean, protobuf::PrimitiveScalarType::Uint8 => DataType::UInt8, @@ -486,10 +487,10 @@ impl Into for protobuf::PrimitiveScalarType { protobuf::PrimitiveScalarType::LargeUtf8 => DataType::LargeUtf8, protobuf::PrimitiveScalarType::Date32 => DataType::Date32, protobuf::PrimitiveScalarType::TimeMicrosecond => { - DataType::Time64(arrow::datatypes::TimeUnit::Microsecond) + DataType::Time64(TimeUnit::Microsecond) } protobuf::PrimitiveScalarType::TimeNanosecond => { - DataType::Time64(arrow::datatypes::TimeUnit::Nanosecond) + DataType::Time64(TimeUnit::Nanosecond) } protobuf::PrimitiveScalarType::Null => DataType::Null, } @@ -746,9 +747,9 @@ impl TryInto for &protobuf::ScalarListValue { } } -impl TryInto for &protobuf::ScalarListType { +impl TryInto for &protobuf::ScalarListType { type Error = BallistaError; - fn try_into(self) -> Result { + fn try_into(self) -> Result { use protobuf::PrimitiveScalarType; let protobuf::ScalarListType { deepest_type, @@ -762,7 +763,7 @@ impl TryInto for &protobuf::ScalarListType { )); } - let mut curr_type = arrow::datatypes::DataType::List(Box::new(Field::new( + let mut curr_type = DataType::List(Box::new(Field::new( //Since checked vector is not empty above this is safe to unwrap field_names.last().unwrap(), PrimitiveScalarType::from_i32(*deepest_type) @@ -774,9 +775,8 @@ impl TryInto for &protobuf::ScalarListType { ))); //Iterates over field names in reverse order except for the last item in the vector for name in field_names.iter().rev().skip(1) { - let temp_curr_type = arrow::datatypes::DataType::List(Box::new(Field::new( - name, curr_type, true, - ))); + let temp_curr_type = + DataType::List(Box::new(Field::new(name, curr_type, true))); curr_type = temp_curr_type; } Ok(curr_type) @@ -876,8 +876,7 @@ impl TryInto for &protobuf::ScalarValue { .iter() .map(|val| val.try_into()) .collect::, _>>()?; - let scalar_type: arrow::datatypes::DataType = - pb_scalar_type.try_into()?; + let scalar_type: DataType = pb_scalar_type.try_into()?; ScalarValue::List(Some(typechecked_values), scalar_type) } protobuf::scalar_value::Value::NullListValue(v) => { @@ -1169,9 +1168,9 @@ fn from_proto_binary_op(op: &str) -> Result { } } -impl TryInto for &protobuf::ScalarType { +impl TryInto for &protobuf::ScalarType { type Error = BallistaError; - fn try_into(self) -> Result { + fn try_into(self) -> Result { let pb_scalartype = self.datatype.as_ref().ok_or_else(|| { proto_error("ScalarType message missing required field 'datatype'") })?; @@ -1202,16 +1201,16 @@ impl TryInto for &protobuf::Schema { } } -impl TryInto for &protobuf::Field { +impl TryInto for &protobuf::Field { type Error = BallistaError; - fn try_into(self) -> Result { + fn try_into(self) -> Result { let pb_datatype = self.arrow_type.as_ref().ok_or_else(|| { proto_error( "Protobuf deserialization error: Field message missing required field 'arrow_type'", ) 
})?; - Ok(arrow::datatypes::Field::new( + Ok(Field::new( self.name.as_str(), pb_datatype.as_ref().try_into()?, self.nullable, diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index 48dd96c4d3f31..7257650b545fb 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -24,8 +24,8 @@ mod roundtrip_tests { use super::super::{super::error::Result, protobuf}; use crate::error::BallistaError; - use arrow::datatypes::{DataType, Field, Schema}; use core::panic; + use datafusion::arrow2::datatypes::{DataType, Field, Schema, TimeUnit}; use datafusion::physical_plan::functions::BuiltinScalarFunction::Sqrt; use datafusion::{ logical_plan::{Expr, LogicalPlan, LogicalPlanBuilder}, diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index 47e27483ff307..884f81b93b241 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -26,7 +26,7 @@ use std::{ use crate::datasource::DfTableAdapter; use crate::serde::{protobuf, BallistaError}; -use arrow::datatypes::{DataType, Schema}; +use datafusion::arrow2::datatypes::{DataType, Schema}; use datafusion::datasource::CsvFile; use datafusion::logical_plan::{Expr, JoinType, LogicalPlan}; use datafusion::physical_plan::aggregates::AggregateFunction; diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index d034f3ca3bfee..0d8cab9e2b08c 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -206,8 +206,7 @@ impl TryInto> for &protobuf::PhysicalPlanNode { })? 
.clone(); - let physical_schema: SchemaRef = - SchemaRef::new((&input_schema).try_into()?); + let physical_schema = Arc::new(input_schema); let catalog_list = Arc::new(MemoryCatalogList::new()) as Arc; diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs index e7985cc84a9a7..a2eb3e5176d58 100644 --- a/ballista/rust/core/src/serde/physical_plan/mod.rs +++ b/ballista/rust/core/src/serde/physical_plan/mod.rs @@ -23,7 +23,7 @@ mod roundtrip_tests { use datafusion::physical_plan::hash_utils::JoinType; use std::{convert::TryInto, sync::Arc}; - use arrow::datatypes::{DataType, Schema}; + use datafusion::arrow2::datatypes::{DataType, Field, Schema}; use datafusion::physical_plan::ColumnarValue; use datafusion::physical_plan::{ empty::EmptyExec, @@ -75,7 +75,6 @@ mod roundtrip_tests { #[test] fn roundtrip_hash_join() -> Result<()> { - use arrow::datatypes::{DataType, Field, Schema}; let field_a = Field::new("col", DataType::Int64, false); let schema_left = Schema::new(vec![field_a.clone()]); let schema_right = Schema::new(vec![field_a]); @@ -95,7 +94,6 @@ mod roundtrip_tests { #[test] fn rountrip_hash_aggregate() -> Result<()> { - use arrow::datatypes::{DataType, Field, Schema}; let groups: Vec<(Arc, String)> = vec![(col("a"), "unused".to_string())]; @@ -120,7 +118,6 @@ mod roundtrip_tests { #[test] fn roundtrip_filter_with_not_and_in_list() -> Result<()> { - use arrow::datatypes::{DataType, Field, Schema}; use datafusion::logical_plan::Operator; use datafusion::physical_plan::{ expressions::{binary, lit, InListExpr, NotExpr}, @@ -149,8 +146,7 @@ mod roundtrip_tests { #[test] fn roundtrip_sort() -> Result<()> { - use arrow::compute::kernels::sort::SortOptions; - use arrow::datatypes::{DataType, Field, Schema}; + use datafusion::arrow2::compute::sort::SortOptions; let field_a = Field::new("a", DataType::Boolean, false); let field_b = Field::new("b", DataType::Int64, false); let schema = Arc::new(Schema::new(vec![field_a, field_b])); diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs index bbbd48b74a1f3..2b9b6b91e56f4 100644 --- a/ballista/rust/core/src/serde/scheduler/mod.rs +++ b/ballista/rust/core/src/serde/scheduler/mod.rs @@ -17,10 +17,8 @@ use std::{collections::HashMap, sync::Arc}; -use arrow::array::{ - ArrayBuilder, ArrayRef, StructArray, StructBuilder, UInt64Array, UInt64Builder, -}; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::arrow2::array::*; +use datafusion::arrow2::datatypes::{DataType, Field, Schema}; use datafusion::logical_plan::LogicalPlan; use datafusion::physical_plan::ExecutionPlan; use serde::Serialize; diff --git a/benchmarks/src/bin/nyctaxi.rs b/benchmarks/src/bin/nyctaxi.rs index b2a62a0d39f9b..88b6d0f2cfd5d 100644 --- a/benchmarks/src/bin/nyctaxi.rs +++ b/benchmarks/src/bin/nyctaxi.rs @@ -22,8 +22,8 @@ use std::path::PathBuf; use std::process; use std::time::Instant; -use datafusion::arrow::datatypes::{DataType, Field, Schema}; -use datafusion::arrow::util::pretty; +use datafusion::arrow2::datatypes::{DataType, Field, Schema}; +use datafusion::arrow2::io::print; use datafusion::error::Result; use datafusion::execution::context::{ExecutionConfig, ExecutionContext}; @@ -124,7 +124,7 @@ async fn execute_sql(ctx: &mut ExecutionContext, sql: &str, debug: bool) -> Resu let physical_plan = ctx.create_physical_plan(&plan)?; let result = collect(physical_plan).await?; if debug { - pretty::print_batches(&result)?; + 
print::print(&result)?; } Ok(()) } diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 52118262cc187..711c980d305bc 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -718,7 +718,7 @@ mod tests { } /// Specialised String representation - fn col_str(column: &ArrayRef, row_index: usize) -> String { + fn col_str(column: &dyn Array, row_index: usize) -> String { if column.is_null(row_index) { return "NULL".to_string(); } @@ -733,7 +733,7 @@ mod tests { let mut r = Vec::with_capacity(*n as usize); for i in 0..*n { - r.push(col_str(&array, i as usize)); + r.push(col_str(array.as_ref(), i as usize)); } return format!("[{}]", r.join(",")); } @@ -912,7 +912,7 @@ mod tests { // convert the schema to the same but with all columns set to nullable=true. // this allows direct schema comparison ignoring nullable. - fn nullable_schema(schema: Arc) -> Schema { + fn nullable_schema(schema: &Schema) -> Schema { Schema::new( schema .fields() diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs index 03adfb3a65bb0..e7a66d84c6cdc 100644 --- a/datafusion-cli/src/print_format.rs +++ b/datafusion-cli/src/print_format.rs @@ -154,9 +154,9 @@ mod tests { let batch = RecordBatch::try_new( schema, vec![ - Arc::new(Int32Array::from(vec![1, 2, 3])), - Arc::new(Int32Array::from(vec![4, 5, 6])), - Arc::new(Int32Array::from(vec![7, 8, 9])), + Arc::new(Int32Array::from_slice(&[1, 2, 3])), + Arc::new(Int32Array::from_slice(&[4, 5, 6])), + Arc::new(Int32Array::from_slice(&[7, 8, 9])), ], ) .unwrap(); @@ -181,9 +181,9 @@ mod tests { let batch = RecordBatch::try_new( schema, vec![ - Arc::new(Int32Array::from(vec![1, 2, 3])), - Arc::new(Int32Array::from(vec![4, 5, 6])), - Arc::new(Int32Array::from(vec![7, 8, 9])), + Arc::new(Int32Array::from_slice(&[1, 2, 3])), + Arc::new(Int32Array::from_slice(&[4, 5, 6])), + Arc::new(Int32Array::from_slice(&[7, 8, 9])), ], ) .unwrap(); diff --git a/datafusion-examples/examples/dataframe.rs b/datafusion-examples/examples/dataframe.rs index 60147748d77a1..0d4cc51b2bb9a 100644 --- a/datafusion-examples/examples/dataframe.rs +++ b/datafusion-examples/examples/dataframe.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use datafusion::arrow::util::pretty; +use datafusion::arrow2::io::print; use datafusion::error::Result; use datafusion::prelude::*; @@ -27,7 +27,7 @@ async fn main() -> Result<()> { // create local execution context let mut ctx = ExecutionContext::new(); - let testdata = datafusion::crate::test::parquet_test_data(); + let testdata = datafusion::test::parquet_test_data(); let filename = &format!("{}/alltypes_plain.parquet", testdata); diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs index 06efb04f76e06..8425d08f1a459 100644 --- a/datafusion-examples/examples/flight_server.rs +++ b/datafusion-examples/examples/flight_server.rs @@ -66,7 +66,8 @@ impl FlightService for FlightServiceImpl { let table = ParquetTable::try_new(&request.path[0], num_cpus::get()).unwrap(); - let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default(); + let options = + datafusion::arrow2::io::ipc::write::common::IpcWriteOptions::default(); let schema_result = arrow_flight::utils::flight_schema_from_arrow_schema( table.schema().as_ref(), &options, @@ -87,7 +88,7 @@ impl FlightService for FlightServiceImpl { // create local execution context let mut ctx = ExecutionContext::new(); - let testdata = datafusion::crate::test::parquet_test_data(); + let testdata = datafusion::test::parquet_test_data(); // register parquet file with the execution context ctx.register_parquet( diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 12ade26f56c34..9b3e73fc2b7a5 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -46,7 +46,7 @@ simd = ["arrow2/simd"] [dependencies] ahash = "0.7" hashbrown = "0.11" -arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "acc3082708977d50220b0577925e769c104a0480" } +arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "d2be5b4dd0176672ab34460d8147562590d33567" } sqlparser = "0.9.0" paste = "^1.0" num_cpus = "1.13.0" diff --git a/datafusion/benches/aggregate_query_sql.rs b/datafusion/benches/aggregate_query_sql.rs index 6f10b03ad4784..b1ee06f526b18 100644 --- a/datafusion/benches/aggregate_query_sql.rs +++ b/datafusion/benches/aggregate_query_sql.rs @@ -23,14 +23,8 @@ use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng}; use std::sync::{Arc, Mutex}; use tokio::runtime::Runtime; -extern crate arrow; -extern crate datafusion; - use arrow2::{ - array::Float32Array, - array::Float64Array, - array::StringArray, - array::UInt64Array, + array::*, datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, }; @@ -133,11 +127,14 @@ fn create_context( RecordBatch::try_new( schema.clone(), vec![ - Arc::new(StringArray::from(keys)), - Arc::new(Float32Array::from(vec![i as f32; batch_size])), + Arc::new(Utf8Array::::from_slice(keys)), + Arc::new(Float32Array::from_slice(&vec![ + i as f32; + batch_size + ])), Arc::new(Float64Array::from(values)), Arc::new(UInt64Array::from(integer_values_wide)), - Arc::new(UInt64Array::from(integer_values_narrow)), + Arc::new(UInt64Array::from_slice(&integer_values_narrow)), ], ) .unwrap() diff --git a/datafusion/benches/filter_query_sql.rs b/datafusion/benches/filter_query_sql.rs index c5637b1441fb2..fd7aff6a87a3f 100644 --- a/datafusion/benches/filter_query_sql.rs +++ b/datafusion/benches/filter_query_sql.rs @@ -50,8 +50,8 @@ fn create_context(array_len: usize, batch_size: usize) -> Result Arc> { Field::new("c13", DataType::Utf8, false), ])); - let testdata = crate::test::arrow_test_data(); + let testdata = datafusion::test::arrow_test_data(); 
// create CSV data source let csv = CsvFile::try_new( diff --git a/datafusion/src/catalog/information_schema.rs b/datafusion/src/catalog/information_schema.rs index 680a812d5d91b..4a58698e0083d 100644 --- a/datafusion/src/catalog/information_schema.rs +++ b/datafusion/src/catalog/information_schema.rs @@ -226,15 +226,6 @@ impl InformationSchemaTablesBuilder { TableType::Temporary => "LOCAL TEMPORARY", })); } - - fn add_system_table( - &mut self, - catalog_name: impl AsRef, - schema_name: impl AsRef, - table_name: impl AsRef, - ) { - self.catalog_names.push(Some(&catalog_name.as_ref())); - } } impl From for MemTable { @@ -335,7 +326,7 @@ impl InformationSchemaColumnsBuilder { self.column_names.push(Some(&column_name.as_ref())); - self.ordinal_positions.push(Some(&(column_position as u64))); + self.ordinal_positions.push(Some(column_position as u64)); // DataFusion does not support column default values, so null self.column_defaults.push(None); @@ -363,7 +354,7 @@ impl InformationSchemaColumnsBuilder { LargeBinary | LargeUtf8 => Some(i64::MAX as u64), _ => None, }; - self.character_octet_lengths.push(char_len.as_ref()); + self.character_octet_lengths.push(char_len); // numeric_precision: "If data_type identifies a numeric type, this column // contains the (declared or implicit) precision of the type @@ -404,9 +395,9 @@ impl InformationSchemaColumnsBuilder { _ => (None, None, None), }; - self.numeric_precisions.push(numeric_precision.as_ref()); - self.numeric_precision_radixes.push(numeric_radix.as_ref()); - self.numeric_scales.push(numeric_scale.as_ref()); + self.numeric_precisions.push(numeric_precision); + self.numeric_precision_radixes.push(numeric_radix); + self.numeric_scales.push(numeric_scale); self.datetime_precisions.push(None); self.interval_types.push(None); diff --git a/datafusion/src/physical_plan/expressions/cast.rs b/datafusion/src/physical_plan/expressions/cast.rs index ced85e882d050..20cea215393b6 100644 --- a/datafusion/src/physical_plan/expressions/cast.rs +++ b/datafusion/src/physical_plan/expressions/cast.rs @@ -142,7 +142,7 @@ mod tests { macro_rules! 
generic_test_cast { ($A_ARRAY:ident, $A_TYPE:expr, $A_VEC:expr, $TYPEARRAY:ident, $TYPE:expr, $VEC:expr) => {{ let schema = Schema::new(vec![Field::new("a", $A_TYPE, false)]); - let a = $A_ARRAY::from_slice(&$A_VEC); + let a = $A_ARRAY::from_slice($A_VEC); let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?; @@ -185,7 +185,7 @@ mod tests { generic_test_cast!( Int32Array, DataType::Int32, - vec![1, 2, 3, 4, 5], + &[1, 2, 3, 4, 5], UInt32Array, DataType::UInt32, vec![ @@ -204,7 +204,7 @@ mod tests { generic_test_cast!( Int32Array, DataType::Int32, - vec![1, 2, 3, 4, 5], + &[1, 2, 3, 4, 5], StringArray, DataType::Utf8, vec![Some("1"), Some("2"), Some("3"), Some("4"), Some("5")] @@ -215,12 +215,12 @@ mod tests { #[allow(clippy::redundant_clone)] #[test] fn test_cast_i64_t64() -> Result<()> { - let original = vec![1, 2, 3, 4, 5]; + let original = &[1, 2, 3, 4, 5]; let expected: Vec> = original.iter().map(|i| Some(*i)).collect(); generic_test_cast!( Int64Array, DataType::Int64, - original.clone(), + original, Int64Array, DataType::Timestamp(TimeUnit::Nanosecond, None), expected diff --git a/datafusion/src/physical_plan/expressions/mod.rs b/datafusion/src/physical_plan/expressions/mod.rs index 665cf248514b0..b5d07397ab446 100644 --- a/datafusion/src/physical_plan/expressions/mod.rs +++ b/datafusion/src/physical_plan/expressions/mod.rs @@ -31,7 +31,9 @@ type ArrayRef = Arc; /// One column to be used in lexicographical sort #[derive(Clone, Debug)] pub struct SortColumn { + /// The array to be sorted pub values: ArrayRef, + /// The options to sort the array pub options: Option, } diff --git a/datafusion/src/physical_plan/expressions/try_cast.rs b/datafusion/src/physical_plan/expressions/try_cast.rs index 60aadfdec7dbe..1d34fde9f70f2 100644 --- a/datafusion/src/physical_plan/expressions/try_cast.rs +++ b/datafusion/src/physical_plan/expressions/try_cast.rs @@ -174,7 +174,7 @@ mod tests { generic_test_cast!( Int32Array, DataType::Int32, - vec![1, 2, 3, 4, 5], + [1, 2, 3, 4, 5], UInt32Array, DataType::UInt32, vec![ @@ -193,7 +193,7 @@ mod tests { generic_test_cast!( Int32Array, DataType::Int32, - vec![1, 2, 3, 4, 5], + [1, 2, 3, 4, 5], StringArray, DataType::Utf8, vec![Some("1"), Some("2"), Some("3"), Some("4"), Some("5")] diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index ba11ec613e323..c88fc81c9b958 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -679,18 +679,18 @@ fn build_join_indexes( &left_join_values, &keys_values, )? 
{ - left_indices.push(Some(i as i64).as_ref()); - right_indices.push(Some(row as i32).as_ref()); + left_indices.push(Some(i as i64)); + right_indices.push(Some(row as i32)); } else { left_indices.push(None); - right_indices.push(Some(row as i32).as_ref()); + right_indices.push(Some(row as i32)); } } } None => { // when no match, add the row with None for the left side left_indices.push(None); - right_indices.push(Some(row as i32).as_ref()); + right_indices.push(Some(row as i32)); } } } diff --git a/datafusion/src/physical_plan/regex_expressions.rs b/datafusion/src/physical_plan/regex_expressions.rs index 2bd9f7e15dd3e..b31eb1f38c2cd 100644 --- a/datafusion/src/physical_plan/regex_expressions.rs +++ b/datafusion/src/physical_plan/regex_expressions.rs @@ -25,8 +25,8 @@ use std::any::type_name; use std::sync::Arc; use crate::error::{DataFusionError, Result}; -use arrow2::array::{Array, Offset, Utf8Array}; -use arrow2::compute; +use arrow2::array::*; +use arrow2::error::ArrowError; use hashbrown::HashMap; use regex::Regex; @@ -49,10 +49,8 @@ macro_rules! downcast_string_arg { /// extract a specific group from a string column, using a regular expression pub fn regexp_match(args: &[ArrayRef]) -> Result { match args.len() { - 2 => compute::regex_match::regex_match(downcast_string_arg!(args[0], "string", T), downcast_string_arg!(args[1], "pattern", T)) - .map_err(DataFusionError::ArrowError).map(|x| Arc::new(x) as Arc), - 3 => compute::regex_match::regex_match(downcast_string_arg!(args[0], "string", T), downcast_string_arg!(args[1], "pattern", T)) - .map_err(DataFusionError::ArrowError).map(|x| Arc::new(x) as Arc), + 2 => regexp_matches(downcast_string_arg!(args[0], "string", T), downcast_string_arg!(args[1], "pattern", T), None).map(|x| Arc::new(x) as Arc), + 3 => regexp_matches(downcast_string_arg!(args[0], "string", T), downcast_string_arg!(args[1], "pattern", T), Some(downcast_string_arg!(args[1], "flags", T))).map(|x| Arc::new(x) as Arc), other => Err(DataFusionError::Internal(format!( "regexp_match was called with {} arguments. It requires at least 2 and at most 3.", other @@ -172,3 +170,138 @@ pub fn regexp_replace(args: &[ArrayRef]) -> Result { ))), } } + +/// Extract all groups matched by a regular expression for a given String array. 
+pub fn regexp_matches( + array: &Utf8Array, + regex_array: &Utf8Array, + flags_array: Option<&Utf8Array>, +) -> Result> { + let mut patterns: HashMap = HashMap::new(); + + let complete_pattern = match flags_array { + Some(flags) => Box::new(regex_array.iter().zip(flags.iter()).map( + |(pattern, flags)| { + pattern.map(|pattern| match flags { + Some(value) => format!("(?{}){}", value, pattern), + None => pattern.to_string(), + }) + }, + )) as Box>>, + None => Box::new( + regex_array + .iter() + .map(|pattern| pattern.map(|pattern| pattern.to_string())), + ), + }; + let iter = array.iter().zip(complete_pattern).map(|(value, pattern)| { + match (value, pattern) { + // Required for Postgres compatibility: + // SELECT regexp_match('foobarbequebaz', ''); = {""} + (Some(_), Some(pattern)) if pattern == *"" => { + Ok(Some(vec![Some("")].into_iter())) + } + (Some(value), Some(pattern)) => { + let existing_pattern = patterns.get(&pattern); + let re = match existing_pattern { + Some(re) => re.clone(), + None => { + let re = Regex::new(pattern.as_str()).map_err(|e| { + ArrowError::InvalidArgumentError(format!( + "Regular expression did not compile: {:?}", + e + )) + })?; + patterns.insert(pattern, re.clone()); + re + } + }; + match re.captures(value) { + Some(caps) => { + let a = caps + .iter() + .skip(1) + .map(|x| x.map(|x| x.as_str())) + .collect::>() + .into_iter(); + Ok(Some(a)) + } + None => Ok(None), + } + } + _ => Ok(None), + } + }); + + Ok( + ListPrimitive::, &str>::try_from_iter(iter) + .map(|x| x.into())?, + ) +} + +#[cfg(test)] +mod tests { + use std::iter::FromIterator; + + use super::*; + + #[test] + fn match_single_group() -> Result<()> { + let array = Utf8Array::::from(&[ + Some("abc-005-def"), + Some("X-7-5"), + Some("X545"), + None, + Some("foobarbequebaz"), + Some("foobarbequebaz"), + ]); + + let patterns = Utf8Array::::from_slice(&[ + r".*-(\d*)-.*", + r".*-(\d*)-.*", + r".*-(\d*)-.*", + r".*-(\d*)-.*", + r"(bar)(bequ1e)", + "", + ]); + + let result = regexp_matches(&array, &patterns, None)?; + + let expected = vec![ + Some(vec![Some("005")]), + Some(vec![Some("7")]), + None, + None, + None, + Some(vec![Some("")]), + ]; + + let expected: ListArray = + ListPrimitive::, &str>::from_iter(expected).into(); + + assert_eq!(expected, result); + Ok(()) + } + + #[test] + fn match_single_group_with_flags() -> Result<()> { + let array = Utf8Array::::from(&[ + Some("abc-005-def"), + Some("X-7-5"), + Some("X545"), + None, + ]); + + let patterns = Utf8Array::::from_slice(&vec![r"x.*-(\d*)-.*"; 4]); + let flags = Utf8Array::::from_slice(vec!["i"; 4]); + + let result = regexp_matches(&array, &patterns, Some(&flags))?; + + let expected = vec![None, Some(vec![Some("7")]), None, None]; + let expected: ListArray = + ListPrimitive::, &str>::from_iter(expected).into(); + + assert_eq!(expected, result); + Ok(()) + } +} diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs index f44a657be45b9..6edd36e786e8a 100644 --- a/datafusion/src/scalar.rs +++ b/datafusion/src/scalar.rs @@ -492,7 +492,7 @@ impl ScalarValue { DataType::Int8 => typed_cast!(array, index, Int8Array, Int8), DataType::Utf8 => typed_cast!(array, index, StringArray, Utf8), DataType::LargeUtf8 => typed_cast!(array, index, LargeStringArray, LargeUtf8), - DataType::List(nested_type) => { + DataType::List(_) => { let list_array = array .as_any() .downcast_ref::>() @@ -507,7 +507,7 @@ impl ScalarValue { } else { None }; - ScalarValue::List(value, nested_type.data_type().clone()) + ScalarValue::List(value, array.data_type().clone()) } 
DataType::Date32 => { typed_cast!(array, index, Int32Array, Date32) diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs index 1f51857df70ae..2f8cd03199b3f 100644 --- a/datafusion/tests/custom_sources.rs +++ b/datafusion/tests/custom_sources.rs @@ -101,7 +101,7 @@ impl ExecutionPlan for CustomExecutionPlan { fn as_any(&self) -> &dyn Any { self } - fn schema(&self) -> SchemaRef { + fn schema(&self) -> Arc { let schema = TEST_CUSTOM_SCHEMA_REF!(); match &self.projection { None => schema, @@ -150,7 +150,7 @@ impl TableProvider for CustomTableProvider { self } - fn schema(&self) -> SchemaRef { + fn schema(&self) -> Arc { TEST_CUSTOM_SCHEMA_REF!() } diff --git a/datafusion/tests/provider_filter_pushdown.rs b/datafusion/tests/provider_filter_pushdown.rs index 921733078530b..8316649511938 100644 --- a/datafusion/tests/provider_filter_pushdown.rs +++ b/datafusion/tests/provider_filter_pushdown.rs @@ -32,10 +32,8 @@ use datafusion::scalar::ScalarValue; use std::sync::Arc; fn create_batch(value: i32, num_rows: usize) -> Result { - let mut builder = Int32Builder::new(num_rows); - for _ in 0..num_rows { - builder.append_value(value)?; - } + let array = + Int32Array::from_trusted_len_values_iter(std::iter::repeat(value).take(num_rows)); Ok(RecordBatch::try_new( Arc::new(Schema::new(vec![Field::new( @@ -43,7 +41,7 @@ fn create_batch(value: i32, num_rows: usize) -> Result { DataType::Int32, false, )])), - vec![Arc::new(builder.finish())], + vec![Arc::new(array)], )?) } @@ -98,7 +96,7 @@ impl TableProvider for CustomProvider { } fn schema(&self) -> Arc { - self.zero_batch.schema() + self.zero_batch.schema().clone() } fn scan( @@ -116,7 +114,7 @@ impl TableProvider for CustomProvider { }; Ok(Arc::new(CustomPlan { - schema: self.zero_batch.schema(), + schema: self.zero_batch.schema().clone(), batches: match int_value { 0 => vec![Arc::new(self.zero_batch.clone())], 1 => vec![Arc::new(self.one_batch.clone())], @@ -125,7 +123,7 @@ impl TableProvider for CustomProvider { })) } _ => Ok(Arc::new(CustomPlan { - schema: self.zero_batch.schema(), + schema: self.zero_batch.schema().clone(), batches: vec![], })), } @@ -153,7 +151,7 @@ async fn assert_provider_row_count(value: i64, expected_count: u64) -> Result<() .aggregate(vec![], vec![count(col("flag"))])?; let results = df.collect().await?; - let result_col: &UInt64Array = as_primitive_array(results[0].column(0)); + let result_col: &UInt64Array = results[0].column(0).as_any().downcast_ref().unwrap(); assert_eq!(result_col.value(0), expected_count); ctx.register_table("data", Arc::new(provider))?; @@ -162,7 +160,8 @@ async fn assert_provider_row_count(value: i64, expected_count: u64) -> Result<() .collect() .await?; - let sql_result_col: &UInt64Array = as_primitive_array(sql_results[0].column(0)); + let sql_result_col: &UInt64Array = + sql_results[0].column(0).as_any().downcast_ref().unwrap(); assert_eq!(sql_result_col.value(0), expected_count); Ok(()) diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 006ef21c9e8ee..2426b10421da8 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -15,14 +15,13 @@ // specific language governing permissions and limitations // under the License. 
-use std::convert::TryFrom; use std::sync::Arc; -use chrono::prelude::*; use chrono::Duration; use arrow2::{array::*, datatypes::*, record_batch::RecordBatch}; +use chrono::TimeZone; use datafusion::logical_plan::LogicalPlan; use datafusion::prelude::*; use datafusion::{ @@ -118,7 +117,7 @@ async fn parquet_query() { #[tokio::test] async fn parquet_single_nan_schema() { let mut ctx = ExecutionContext::new(); - let testdata = crate::test::parquet_test_data(); + let testdata = datafusion::test::parquet_test_data(); ctx.register_parquet("single_nan", &format!("{}/single_nan.parquet", testdata)) .unwrap(); let sql = "SELECT mycol FROM single_nan"; @@ -136,7 +135,7 @@ async fn parquet_single_nan_schema() { #[ignore = "Test ignored, will be enabled as part of the nested Parquet reader"] async fn parquet_list_columns() { let mut ctx = ExecutionContext::new(); - let testdata = crate::test::parquet_test_data(); + let testdata = datafusion::test::parquet_test_data(); ctx.register_parquet( "list_columns", &format!("{}/list_columns.parquet", testdata), @@ -171,17 +170,17 @@ async fn parquet_list_columns() { let batch = &results[0]; assert_eq!(3, batch.num_rows()); assert_eq!(2, batch.num_columns()); - assert_eq!(schema, batch.schema()); + assert_eq!(schema.as_ref(), batch.schema().as_ref()); let int_list_array = batch .column(0) .as_any() - .downcast_ref::() + .downcast_ref::>() .unwrap(); let utf8_list_array = batch .column(1) .as_any() - .downcast_ref::() + .downcast_ref::>() .unwrap(); assert_eq!( @@ -190,7 +189,7 @@ async fn parquet_list_columns() { .as_any() .downcast_ref::>() .unwrap(), - &PrimitiveArray::::from(vec![Some(1), Some(2), Some(3),]) + &PrimitiveArray::::from(vec![Some(1), Some(2), Some(3)]) ); assert_eq!( @@ -199,7 +198,7 @@ async fn parquet_list_columns() { .as_any() .downcast_ref::>() .unwrap(), - &Utf8Array::try_from(vec![Some("abc"), Some("efg"), Some("hij"),]).unwrap() + &Utf8Array::::from(vec![Some("abc"), Some("efg"), Some("hij")]) ); assert_eq!( @@ -1488,10 +1487,13 @@ fn create_join_context( let t1_data = RecordBatch::try_new( t1_schema.clone(), vec![ - Arc::new(UInt32Array::from(vec![11, 22, 33, 44])), - Arc::new( - Utf8Array::from(vec![Some("a"), Some("b"), Some("c"), Some("d")]), - ), + Arc::new(UInt32Array::from_slice(&[11, 22, 33, 44])), + Arc::new(Utf8Array::::from(&[ + Some("a"), + Some("b"), + Some("c"), + Some("d"), + ])), ], )?; let t1_table = MemTable::try_new(t1_schema, vec![vec![t1_data]])?; @@ -1504,10 +1506,13 @@ fn create_join_context( let t2_data = RecordBatch::try_new( t2_schema.clone(), vec![ - Arc::new(UInt32Array::from(vec![11, 22, 44, 55])), - Arc::new( - Utf8Array::from(vec![Some("z"), Some("y"), Some("x"), Some("w")]), - ), + Arc::new(UInt32Array::from_slice(&[11, 22, 44, 55])), + Arc::new(Utf8Array::::from(&[ + Some("z"), + Some("y"), + Some("x"), + Some("w"), + ])), ], )?; let t2_table = MemTable::try_new(t2_schema, vec![vec![t2_data]])?; @@ -1527,9 +1532,9 @@ fn create_join_context_qualified() -> Result { let t1_data = RecordBatch::try_new( t1_schema.clone(), vec![ - Arc::new(UInt32Array::from(vec![1, 2, 3, 4])), - Arc::new(UInt32Array::from(vec![10, 20, 30, 40])), - Arc::new(UInt32Array::from(vec![50, 60, 70, 80])), + Arc::new(UInt32Array::from_slice(&[1, 2, 3, 4])), + Arc::new(UInt32Array::from_slice(&[10, 20, 30, 40])), + Arc::new(UInt32Array::from_slice(&[50, 60, 70, 80])), ], )?; let t1_table = MemTable::try_new(t1_schema, vec![vec![t1_data]])?; @@ -1543,9 +1548,9 @@ fn create_join_context_qualified() -> Result { let t2_data = 
RecordBatch::try_new( t2_schema.clone(), vec![ - Arc::new(UInt32Array::from(vec![1, 2, 9, 4])), - Arc::new(UInt32Array::from(vec![100, 200, 300, 400])), - Arc::new(UInt32Array::from(vec![500, 600, 700, 800])), + Arc::new(UInt32Array::from_slice(&[1, 2, 9, 4])), + Arc::new(UInt32Array::from_slice(&[100, 200, 300, 400])), + Arc::new(UInt32Array::from_slice(&[500, 600, 700, 800])), ], )?; let t2_table = MemTable::try_new(t2_schema, vec![vec![t2_data]])?; @@ -1592,7 +1597,7 @@ async fn csv_explain_verbose() { assert!(actual.contains("#c2 Gt Int64(10)"), "Actual: '{}'", actual); } -fn aggr_test_schema() -> SchemaRef { +fn aggr_test_schema() -> Arc { Arc::new(Schema::new(vec![ Field::new("c1", DataType::Utf8, false), Field::new("c2", DataType::UInt32, false), @@ -1611,7 +1616,7 @@ fn aggr_test_schema() -> SchemaRef { } async fn register_aggregate_csv_by_sql(ctx: &mut ExecutionContext) { - let testdata = crate::test::arrow_test_data(); + let testdata = datafusion::test::arrow_test_data(); // TODO: The following c9 should be migrated to UInt32 and c10 should be UInt64 once // unsigned is supported. @@ -1651,7 +1656,7 @@ async fn register_aggregate_csv_by_sql(ctx: &mut ExecutionContext) { } fn register_aggregate_csv(ctx: &mut ExecutionContext) -> Result<()> { - let testdata = crate::test::arrow_test_data(); + let testdata = datafusion::test::arrow_test_data(); let schema = aggr_test_schema(); ctx.register_csv( "aggregate_test_100", @@ -1678,7 +1683,7 @@ fn register_aggregate_simple_csv(ctx: &mut ExecutionContext) -> Result<()> { } fn register_alltypes_parquet(ctx: &mut ExecutionContext) { - let testdata = crate::test::parquet_test_data(); + let testdata = datafusion::test::parquet_test_data(); ctx.register_parquet( "alltypes_plain", &format!("{}/alltypes_plain.parquet", testdata), @@ -1709,7 +1714,7 @@ async fn execute(ctx: &mut ExecutionContext, sql: &str) -> Vec> { } /// Specialised String representation -fn col_str(column: &ArrayRef, row_index: usize) -> String { +fn col_str(column: &dyn Array, row_index: usize) -> String { if column.is_null(row_index) { return "NULL".to_string(); } @@ -1724,7 +1729,7 @@ fn col_str(column: &ArrayRef, row_index: usize) -> String { let mut r = Vec::with_capacity(*n as usize); for i in 0..*n { - r.push(col_str(&array, i as usize)); + r.push(col_str(array.as_ref(), i as usize)); } return format!("[{}]", r.join(",")); } @@ -1743,7 +1748,7 @@ fn result_vec(results: &[RecordBatch]) -> Vec> { let row_vec = batch .columns() .iter() - .map(|column| col_str(column, row_index)) + .map(|column| col_str(column.as_ref(), row_index)) .collect(); result.push(row_vec); } @@ -1751,14 +1756,14 @@ fn result_vec(results: &[RecordBatch]) -> Vec> { result } -async fn generic_query_length>>( - datatype: DataType, -) -> Result<()> { +async fn generic_query_length(datatype: DataType) -> Result<()> { let schema = Arc::new(Schema::new(vec![Field::new("c1", datatype, false)])); let data = RecordBatch::try_new( schema.clone(), - vec![Arc::new(T::from(vec!["", "a", "aa", "aaa"]))], + vec![Arc::new(Utf8Array::::from_slice(vec![ + "", "a", "aa", "aaa", + ]))], )?; let table = MemTable::try_new(schema, vec![vec![data]])?; @@ -1775,13 +1780,13 @@ async fn generic_query_length>>( #[tokio::test] #[cfg_attr(not(feature = "unicode_expressions"), ignore)] async fn query_length() -> Result<()> { - generic_query_length::>(DataType::Utf8).await + generic_query_length::(DataType::Utf8).await } #[tokio::test] #[cfg_attr(not(feature = "unicode_expressions"), ignore)] async fn query_large_length() -> 
Result<()> { - generic_query_length::>(DataType::LargeUtf8).await + generic_query_length::(DataType::LargeUtf8).await } #[tokio::test] @@ -1818,7 +1823,7 @@ async fn query_concat() -> Result<()> { let data = RecordBatch::try_new( schema.clone(), vec![ - Arc::new(Utf8Array::from(vec!["", "a", "aa", "aaa"])), + Arc::new(Utf8Array::::from_slice(vec!["", "a", "aa", "aaa"])), Arc::new(Int32Array::from(vec![Some(0), Some(1), None, Some(3)])), ], )?; @@ -1849,7 +1854,7 @@ async fn query_array() -> Result<()> { let data = RecordBatch::try_new( schema.clone(), vec![ - Arc::new(Utf8Array::from(vec!["", "a", "aa", "aaa"])), + Arc::new(Utf8Array::::from_slice(vec!["", "a", "aa", "aaa"])), Arc::new(Int32Array::from(vec![Some(0), Some(1), None, Some(3)])), ], )?; @@ -1930,16 +1935,17 @@ fn make_timestamp_nano_table() -> Result> { Field::new("value", DataType::Int32, true), ])); - let mut builder = TimestampNanosecondArray::builder(3); - - builder.append_value(1599572549190855000)?; // 2020-09-08T13:42:29.190855+00:00 - builder.append_value(1599568949190855000)?; // 2020-09-08T12:42:29.190855+00:00 - builder.append_value(1599565349190855000)?; // 2020-09-08T11:42:29.190855+00:00 + let array = Primitive::::from_slice(&[ + 1599572549190855000, // 2020-09-08T13:42:29.190855+00:00 + 1599568949190855000, // 2020-09-08T12:42:29.190855+00:00 + 1599565349190855000, // 2020-09-08T11:42:29.190855+00:00 + ]) + .to(DataType::Timestamp(TimeUnit::Nanosecond, None)); let data = RecordBatch::try_new( schema.clone(), vec![ - Arc::new(builder.finish()), + Arc::new(array), Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])), ], )?; @@ -2054,16 +2060,17 @@ async fn query_on_string_dictionary() -> Result<()> { // Use StringDictionary (32 bit indexes = keys) let field_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); - let schema = Arc::new(Schema::new(vec![Field::new("d1", field_type, true)])); + let schema = Arc::new(Schema::new(vec![Field::new( + "d1", + field_type.clone(), + true, + )])); - let keys_builder = PrimitiveBuilder::::new(10); - let values_builder = StringBuilder::new(10); - let mut builder = StringDictionaryBuilder::new(keys_builder, values_builder); + let data = vec![Some("one"), None, Some("three")]; + let data = data.into_iter().map(arrow2::error::Result::Ok); - builder.append("one")?; - builder.append_null()?; - builder.append("three")?; - let array = Arc::new(builder.finish()); + let array = DictionaryPrimitive::, _>::try_from_iter(data)? 
+ .into_arc(); let data = RecordBatch::try_new(schema.clone(), vec![array])?; @@ -2267,14 +2274,17 @@ async fn csv_group_by_date() -> Result<()> { let data = RecordBatch::try_new( schema.clone(), vec![ - Arc::new(Date32Array::from(vec![ - Some(100), - Some(100), - Some(100), - Some(101), - Some(101), - Some(101), - ])), + Arc::new( + Primitive::::from(vec![ + Some(100), + Some(100), + Some(100), + Some(101), + Some(101), + Some(101), + ]) + .to(DataType::Date32), + ), Arc::new(Int32Array::from(vec![ Some(1), Some(2), @@ -2301,15 +2311,12 @@ async fn csv_group_by_date() -> Result<()> { async fn group_by_timestamp_millis() -> Result<()> { let mut ctx = ExecutionContext::new(); + let data_type = DataType::Timestamp(TimeUnit::Millisecond, None); let schema = Arc::new(Schema::new(vec![ - Field::new( - "timestamp", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - ), + Field::new("timestamp", data_type.clone(), false), Field::new("count", DataType::Int32, false), ])); - let base_dt = Utc.ymd(2018, 7, 1).and_hms(6, 0, 0); // 2018-Jul-01 06:00 + let base_dt = chrono::Utc.ymd(2018, 7, 1).and_hms(6, 0, 0); // 2018-Jul-01 06:00 let hour1 = Duration::hours(1); let timestamps = vec![ base_dt.timestamp_millis(), @@ -2322,8 +2329,8 @@ async fn group_by_timestamp_millis() -> Result<()> { let data = RecordBatch::try_new( schema.clone(), vec![ - Arc::new(TimestampMillisecondArray::from(timestamps)), - Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50, 60])), + Arc::new(Primitive::::from(timestamps).to(data_type)), + Arc::new(Int32Array::from_slice(&[10, 20, 30, 40, 50, 60])), ], )?; let t1_table = MemTable::try_new(schema, vec![vec![data]])?; @@ -2976,7 +2983,7 @@ async fn test_physical_plan_display_indent() { " CsvExec: source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv: [ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true", ]; - let data_path = crate::test::arrow_test_data(); + let data_path = datafusion::test::arrow_test_data(); let actual = format!("{}", displayable(physical_plan.as_ref()).indent()) .trim() .lines() @@ -3025,7 +3032,7 @@ async fn test_physical_plan_display_indent_multi_children() { " CsvExec: source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv: [ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true", ]; - let data_path = arrow::util::test_util::arrow_test_data(); + let data_path = datafusion::test::arrow_test_data(); let actual = format!("{}", displayable(physical_plan.as_ref()).indent()) .trim() .lines() diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs index 031a8bd16ed20..aeaac360db0be 100644 --- a/datafusion/tests/user_defined_plan.rs +++ b/datafusion/tests/user_defined_plan.rs @@ -513,7 +513,7 @@ impl Stream for TopKReader { schema, vec![ Arc::new(Utf8Array::::from_slice(customer)), - Arc::new(Int64Array::from_slice(revenue)), + Arc::new(Int64Array::from_slice(&revenue)), ], ))) } @@ -523,7 +523,7 @@ impl Stream for TopKReader { } impl RecordBatchStream for TopKReader { - fn schema(&self) -> SchemaRef { + fn schema(&self) -> Arc { self.input.schema() } }