
Commit faf8ab9

More conversions.

jorgecarleitao committed May 30, 2021
1 parent a04b328 commit faf8ab9
Showing 27 changed files with 318 additions and 199 deletions.
65 changes: 32 additions & 33 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -26,7 +26,7 @@ use std::{
     unimplemented,
 };
 
-use arrow::datatypes::{DataType, Field, Schema};
+use datafusion::arrow2::datatypes::{DataType, Field, Schema};
 use datafusion::logical_plan::{
     abs, acos, asin, atan, ceil, cos, exp, floor, ln, log10, log2, round, signum, sin,
     sqrt, tan, trunc, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Operator,
@@ -299,9 +299,9 @@ impl TryInto<datafusion::logical_plan::DFSchemaRef> for protobuf::Schema {
     }
 }
 
-impl TryInto<arrow::datatypes::DataType> for &protobuf::scalar_type::Datatype {
+impl TryInto<DataType> for &protobuf::scalar_type::Datatype {
     type Error = BallistaError;
-    fn try_into(self) -> Result<arrow::datatypes::DataType, Self::Error> {
+    fn try_into(self) -> Result<DataType, Self::Error> {
         use protobuf::scalar_type::Datatype;
         Ok(match self {
             Datatype::Scalar(scalar_type) => {
@@ -332,17 +332,18 @@ impl TryInto<arrow::datatypes::DataType> for &protobuf::scalar_type::Datatype {
                     ))
                 })?;
                 //Because length is checked above it is safe to unwrap .last()
-                let mut scalar_type =
-                    arrow::datatypes::DataType::List(Box::new(Field::new(
-                        field_names.last().unwrap().as_str(),
-                        pb_scalar_type.into(),
-                        true,
-                    )));
+                let mut scalar_type = DataType::List(Box::new(Field::new(
+                    field_names.last().unwrap().as_str(),
+                    pb_scalar_type.into(),
+                    true,
+                )));
                 //Iterate over field names in reverse order except for the last item in the vector
                 for name in field_names.iter().rev().skip(1) {
-                    let new_datatype = arrow::datatypes::DataType::List(Box::new(
-                        Field::new(name.as_str(), scalar_type, true),
-                    ));
+                    let new_datatype = DataType::List(Box::new(Field::new(
+                        name.as_str(),
+                        scalar_type,
+                        true,
+                    )));
                     scalar_type = new_datatype;
                 }
                 scalar_type
@@ -351,11 +352,11 @@ impl TryInto<arrow::datatypes::DataType> for &protobuf::scalar_type::Datatype {
     }
 }
 
-impl TryInto<arrow::datatypes::DataType> for &protobuf::arrow_type::ArrowTypeEnum {
+impl TryInto<DataType> for &protobuf::arrow_type::ArrowTypeEnum {
     type Error = BallistaError;
-    fn try_into(self) -> Result<arrow::datatypes::DataType, Self::Error> {
-        use arrow::datatypes::DataType;
+    fn try_into(self) -> Result<DataType, Self::Error> {
         use protobuf::arrow_type;
+        use DataType;
         Ok(match self {
             arrow_type::ArrowTypeEnum::None(_) => DataType::Null,
             arrow_type::ArrowTypeEnum::Bool(_) => DataType::Boolean,
@@ -467,9 +468,9 @@ impl TryInto<arrow::datatypes::DataType> for &protobuf::arrow_type::ArrowTypeEnum {
 }
 
 #[allow(clippy::from_over_into)]
-impl Into<arrow::datatypes::DataType> for protobuf::PrimitiveScalarType {
-    fn into(self) -> arrow::datatypes::DataType {
-        use arrow::datatypes::DataType;
+impl Into<DataType> for protobuf::PrimitiveScalarType {
+    fn into(self) -> DataType {
+        use DataType;
         match self {
             protobuf::PrimitiveScalarType::Bool => DataType::Boolean,
             protobuf::PrimitiveScalarType::Uint8 => DataType::UInt8,
@@ -486,10 +487,10 @@ impl Into<arrow::datatypes::DataType> for protobuf::PrimitiveScalarType {
             protobuf::PrimitiveScalarType::LargeUtf8 => DataType::LargeUtf8,
             protobuf::PrimitiveScalarType::Date32 => DataType::Date32,
             protobuf::PrimitiveScalarType::TimeMicrosecond => {
-                DataType::Time64(arrow::datatypes::TimeUnit::Microsecond)
+                DataType::Time64(TimeUnit::Microsecond)
             }
             protobuf::PrimitiveScalarType::TimeNanosecond => {
-                DataType::Time64(arrow::datatypes::TimeUnit::Nanosecond)
+                DataType::Time64(TimeUnit::Nanosecond)
             }
             protobuf::PrimitiveScalarType::Null => DataType::Null,
         }
@@ -746,9 +747,9 @@ impl TryInto<datafusion::scalar::ScalarValue> for &protobuf::ScalarListValue {
     }
 }
 
-impl TryInto<arrow::datatypes::DataType> for &protobuf::ScalarListType {
+impl TryInto<DataType> for &protobuf::ScalarListType {
     type Error = BallistaError;
-    fn try_into(self) -> Result<arrow::datatypes::DataType, Self::Error> {
+    fn try_into(self) -> Result<DataType, Self::Error> {
         use protobuf::PrimitiveScalarType;
         let protobuf::ScalarListType {
             deepest_type,
@@ -762,7 +763,7 @@ impl TryInto<arrow::datatypes::DataType> for &protobuf::ScalarListType {
             ));
         }
 
-        let mut curr_type = arrow::datatypes::DataType::List(Box::new(Field::new(
+        let mut curr_type = DataType::List(Box::new(Field::new(
             //Since checked vector is not empty above this is safe to unwrap
             field_names.last().unwrap(),
             PrimitiveScalarType::from_i32(*deepest_type)
@@ -774,9 +775,8 @@ impl TryInto<arrow::datatypes::DataType> for &protobuf::ScalarListType {
         )));
         //Iterates over field names in reverse order except for the last item in the vector
         for name in field_names.iter().rev().skip(1) {
-            let temp_curr_type = arrow::datatypes::DataType::List(Box::new(Field::new(
-                name, curr_type, true,
-            )));
+            let temp_curr_type =
+                DataType::List(Box::new(Field::new(name, curr_type, true)));
             curr_type = temp_curr_type;
         }
         Ok(curr_type)
@@ -876,8 +876,7 @@ impl TryInto<datafusion::scalar::ScalarValue> for &protobuf::ScalarValue {
                     .iter()
                     .map(|val| val.try_into())
                     .collect::<Result<Vec<_>, _>>()?;
-                let scalar_type: arrow::datatypes::DataType =
-                    pb_scalar_type.try_into()?;
+                let scalar_type: DataType = pb_scalar_type.try_into()?;
                 ScalarValue::List(Some(typechecked_values), scalar_type)
             }
             protobuf::scalar_value::Value::NullListValue(v) => {
@@ -1169,9 +1168,9 @@ fn from_proto_binary_op(op: &str) -> Result<Operator, BallistaError> {
     }
 }
 
-impl TryInto<arrow::datatypes::DataType> for &protobuf::ScalarType {
+impl TryInto<DataType> for &protobuf::ScalarType {
     type Error = BallistaError;
-    fn try_into(self) -> Result<arrow::datatypes::DataType, Self::Error> {
+    fn try_into(self) -> Result<DataType, Self::Error> {
         let pb_scalartype = self.datatype.as_ref().ok_or_else(|| {
             proto_error("ScalarType message missing required field 'datatype'")
         })?;
@@ -1202,16 +1201,16 @@ impl TryInto<Schema> for &protobuf::Schema {
     }
 }
 
-impl TryInto<arrow::datatypes::Field> for &protobuf::Field {
+impl TryInto<Field> for &protobuf::Field {
     type Error = BallistaError;
-    fn try_into(self) -> Result<arrow::datatypes::Field, Self::Error> {
+    fn try_into(self) -> Result<Field, Self::Error> {
         let pb_datatype = self.arrow_type.as_ref().ok_or_else(|| {
             proto_error(
                 "Protobuf deserialization error: Field message missing required field 'arrow_type'",
             )
         })?;
 
-        Ok(arrow::datatypes::Field::new(
+        Ok(Field::new(
             self.name.as_str(),
             pb_datatype.as_ref().try_into()?,
             self.nullable,
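
Both list conversions in this file build a nested DataType the same way: construct the innermost DataType::List from the last field name, then wrap it once per remaining name while iterating in reverse. A minimal standalone sketch of that pattern, assuming arrow2's DataType and Field as imported above (the helper name is illustrative, not part of the commit):

use datafusion::arrow2::datatypes::{DataType, Field};

// Builds List<List<...List<leaf>...>> from names ordered outermost-first,
// mirroring the reverse-iteration pattern in from_proto.rs.
fn nested_list_type(field_names: &[String], leaf: DataType) -> Option<DataType> {
    // Innermost list first: the last name wraps the leaf type.
    let mut ty = DataType::List(Box::new(Field::new(
        field_names.last()?.as_str(),
        leaf,
        true,
    )));
    // Wrap outward, skipping the name already consumed above.
    for name in field_names.iter().rev().skip(1) {
        ty = DataType::List(Box::new(Field::new(name.as_str(), ty, true)));
    }
    Some(ty)
}
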
2 changes: 1 addition & 1 deletion ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -24,8 +24,8 @@ mod roundtrip_tests {
 
     use super::super::{super::error::Result, protobuf};
     use crate::error::BallistaError;
-    use arrow::datatypes::{DataType, Field, Schema};
     use core::panic;
+    use datafusion::arrow2::datatypes::{DataType, Field, Schema, TimeUnit};
     use datafusion::physical_plan::functions::BuiltinScalarFunction::Sqrt;
     use datafusion::{
         logical_plan::{Expr, LogicalPlan, LogicalPlanBuilder},
2 changes: 1 addition & 1 deletion ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -26,7 +26,7 @@ use std::{
 
 use crate::datasource::DfTableAdapter;
 use crate::serde::{protobuf, BallistaError};
-use arrow::datatypes::{DataType, Schema};
+use datafusion::arrow2::datatypes::{DataType, Schema};
 use datafusion::datasource::CsvFile;
 use datafusion::logical_plan::{Expr, JoinType, LogicalPlan};
 use datafusion::physical_plan::aggregates::AggregateFunction;
3 changes: 1 addition & 2 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -206,8 +206,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                     })?
                     .clone();
 
-                let physical_schema: SchemaRef =
-                    SchemaRef::new((&input_schema).try_into()?);
+                let physical_schema = Arc::new(input_schema);
 
                 let catalog_list =
                     Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
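
The replaced lines converted a protobuf schema into an arrow schema and then wrapped it; the new line suggests input_schema now arrives as an arrow2 Schema, so wrapping it in an Arc is all that is needed. A sketch of the idea under that assumption (names illustrative):

use std::sync::Arc;
use datafusion::arrow2::datatypes::Schema;

// A shared schema reference is just Arc<Schema>, so an owned schema can be
// promoted without any serde round-trip.
fn to_schema_ref(input_schema: Schema) -> Arc<Schema> {
    Arc::new(input_schema)
}
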
8 changes: 2 additions & 6 deletions ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -23,7 +23,7 @@ mod roundtrip_tests {
     use datafusion::physical_plan::hash_utils::JoinType;
     use std::{convert::TryInto, sync::Arc};
 
-    use arrow::datatypes::{DataType, Schema};
+    use datafusion::arrow2::datatypes::{DataType, Field, Schema};
     use datafusion::physical_plan::ColumnarValue;
     use datafusion::physical_plan::{
         empty::EmptyExec,
@@ -75,7 +75,6 @@ mod roundtrip_tests {
 
     #[test]
     fn roundtrip_hash_join() -> Result<()> {
-        use arrow::datatypes::{DataType, Field, Schema};
         let field_a = Field::new("col", DataType::Int64, false);
         let schema_left = Schema::new(vec![field_a.clone()]);
         let schema_right = Schema::new(vec![field_a]);
@@ -95,7 +94,6 @@ mod roundtrip_tests {
 
     #[test]
     fn rountrip_hash_aggregate() -> Result<()> {
-        use arrow::datatypes::{DataType, Field, Schema};
         let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
             vec![(col("a"), "unused".to_string())];
 
@@ -120,7 +118,6 @@ mod roundtrip_tests {
 
     #[test]
     fn roundtrip_filter_with_not_and_in_list() -> Result<()> {
-        use arrow::datatypes::{DataType, Field, Schema};
         use datafusion::logical_plan::Operator;
         use datafusion::physical_plan::{
             expressions::{binary, lit, InListExpr, NotExpr},
@@ -149,8 +146,7 @@ mod roundtrip_tests {
 
     #[test]
     fn roundtrip_sort() -> Result<()> {
-        use arrow::compute::kernels::sort::SortOptions;
-        use arrow::datatypes::{DataType, Field, Schema};
+        use datafusion::arrow2::compute::sort::SortOptions;
         let field_a = Field::new("a", DataType::Boolean, false);
         let field_b = Field::new("b", DataType::Int64, false);
         let schema = Arc::new(Schema::new(vec![field_a, field_b]));
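
The only semantic change in roundtrip_sort is the import path: SortOptions moves from arrow's compute::kernels::sort to arrow2's compute::sort. A hedged sketch of its construction, assuming the struct keeps the same two fields as arrow-rs at this rev:

use datafusion::arrow2::compute::sort::SortOptions;

fn main() {
    // Same two knobs as arrow-rs: sort direction and null placement.
    let options = SortOptions {
        descending: false,
        nulls_first: true,
    };
    assert!(!options.descending);
}
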
6 changes: 2 additions & 4 deletions ballista/rust/core/src/serde/scheduler/mod.rs
@@ -17,10 +17,8 @@
 
 use std::{collections::HashMap, sync::Arc};
 
-use arrow::array::{
-    ArrayBuilder, ArrayRef, StructArray, StructBuilder, UInt64Array, UInt64Builder,
-};
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::arrow2::array::*;
+use datafusion::arrow2::datatypes::{DataType, Field, Schema};
 use datafusion::logical_plan::LogicalPlan;
 use datafusion::physical_plan::ExecutionPlan;
 use serde::Serialize;
6 changes: 3 additions & 3 deletions benchmarks/src/bin/nyctaxi.rs
@@ -22,8 +22,8 @@ use std::path::PathBuf;
 use std::process;
 use std::time::Instant;
 
-use datafusion::arrow::datatypes::{DataType, Field, Schema};
-use datafusion::arrow::util::pretty;
+use datafusion::arrow2::datatypes::{DataType, Field, Schema};
+use datafusion::arrow2::io::print;
 
 use datafusion::error::Result;
 use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
@@ -124,7 +124,7 @@ async fn execute_sql(ctx: &mut ExecutionContext, sql: &str, debug: bool) -> Result<()> {
     let physical_plan = ctx.create_physical_plan(&plan)?;
     let result = collect(physical_plan).await?;
     if debug {
-        pretty::print_batches(&result)?;
+        print::print(&result)?;
     }
     Ok(())
 }
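
arrow::util::pretty::print_batches is replaced by arrow2's io::print::print, which takes the same slice of record batches. A minimal sketch of the new call site, reusing the plan-execution shape of execute_sql above (assuming collect is datafusion::physical_plan::collect, as in the benchmark):

use datafusion::arrow2::io::print;
use datafusion::error::Result;
use datafusion::execution::context::ExecutionContext;
use datafusion::physical_plan::collect;

// Runs a query and pretty-prints the resulting batches via arrow2.
async fn run_and_print(ctx: &mut ExecutionContext, sql: &str) -> Result<()> {
    let plan = ctx.create_logical_plan(sql)?;
    let plan = ctx.optimize(&plan)?;
    let physical = ctx.create_physical_plan(&plan)?;
    let batches = collect(physical).await?;
    print::print(&batches)?; // replaces pretty::print_batches(&batches)?
    Ok(())
}
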
6 changes: 3 additions & 3 deletions benchmarks/src/bin/tpch.rs
@@ -718,7 +718,7 @@ mod tests {
     }
 
     /// Specialised String representation
-    fn col_str(column: &ArrayRef, row_index: usize) -> String {
+    fn col_str(column: &dyn Array, row_index: usize) -> String {
         if column.is_null(row_index) {
             return "NULL".to_string();
         }
@@ -733,7 +733,7 @@ mod tests {
 
             let mut r = Vec::with_capacity(*n as usize);
             for i in 0..*n {
-                r.push(col_str(&array, i as usize));
+                r.push(col_str(array.as_ref(), i as usize));
             }
             return format!("[{}]", r.join(","));
         }
@@ -912,7 +912,7 @@ mod tests {
 
     // convert the schema to the same but with all columns set to nullable=true.
     // this allows direct schema comparison ignoring nullable.
-    fn nullable_schema(schema: Arc<Schema>) -> Schema {
+    fn nullable_schema(schema: &Schema) -> Schema {
         Schema::new(
             schema
                 .fields()
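
col_str now borrows a bare trait object instead of an &ArrayRef, so list cells pass array.as_ref(); the helper no longer cares whether the array sits behind an Arc. A small sketch of the calling convention, assuming Int64Array exposes the same from_slice constructor the tests use for Int32Array (contents and helper name are illustrative):

use std::sync::Arc;
use datafusion::arrow2::array::{Array, Int64Array};

// Accepting &dyn Array works for owned arrays and Arc'd ones alike.
fn is_first_null(column: &dyn Array) -> bool {
    column.is_null(0)
}

fn main() {
    let owned = Int64Array::from_slice(&[1, 2, 3]);
    let shared: Arc<dyn Array> = Arc::new(Int64Array::from_slice(&[4, 5, 6]));
    assert!(!is_first_null(&owned));
    assert!(!is_first_null(shared.as_ref()));
}
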
12 changes: 6 additions & 6 deletions datafusion-cli/src/print_format.rs
@@ -154,9 +154,9 @@ mod tests {
         let batch = RecordBatch::try_new(
             schema,
             vec![
-                Arc::new(Int32Array::from(vec![1, 2, 3])),
-                Arc::new(Int32Array::from(vec![4, 5, 6])),
-                Arc::new(Int32Array::from(vec![7, 8, 9])),
+                Arc::new(Int32Array::from_slice(&[1, 2, 3])),
+                Arc::new(Int32Array::from_slice(&[4, 5, 6])),
+                Arc::new(Int32Array::from_slice(&[7, 8, 9])),
             ],
         )
         .unwrap();
@@ -181,9 +181,9 @@ mod tests {
         let batch = RecordBatch::try_new(
             schema,
             vec![
-                Arc::new(Int32Array::from(vec![1, 2, 3])),
-                Arc::new(Int32Array::from(vec![4, 5, 6])),
-                Arc::new(Int32Array::from(vec![7, 8, 9])),
+                Arc::new(Int32Array::from_slice(&[1, 2, 3])),
+                Arc::new(Int32Array::from_slice(&[4, 5, 6])),
+                Arc::new(Int32Array::from_slice(&[7, 8, 9])),
             ],
         )
         .unwrap();
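
The test constructors change because arrow2 builds primitive arrays directly from slices rather than from Vecs. A sketch of the two styles, with the arrow-rs form shown as a comment (assuming datafusion::arrow2 re-exports the array module as in the hunk):

use datafusion::arrow2::array::{Array, Int32Array};

fn main() {
    // arrow-rs: Int32Array::from(vec![1, 2, 3])
    // arrow2 borrows the literal slice, so no Vec allocation is needed:
    let a = Int32Array::from_slice(&[1, 2, 3]);
    assert_eq!(a.len(), 3);
}
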
4 changes: 2 additions & 2 deletions datafusion-examples/examples/dataframe.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion::arrow::util::pretty;
+use datafusion::arrow2::io::print;
 
 use datafusion::error::Result;
 use datafusion::prelude::*;
@@ -27,7 +27,7 @@ async fn main() -> Result<()> {
 
     // create local execution context
     let mut ctx = ExecutionContext::new();
-    let testdata = datafusion::crate::test::parquet_test_data();
+    let testdata = datafusion::test::parquet_test_data();
 
     let filename = &format!("{}/alltypes_plain.parquet", testdata);
 
5 changes: 3 additions & 2 deletions datafusion-examples/examples/flight_server.rs
@@ -66,7 +66,8 @@ impl FlightService for FlightServiceImpl {
 
         let table = ParquetTable::try_new(&request.path[0], num_cpus::get()).unwrap();
 
-        let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
+        let options =
+            datafusion::arrow2::io::ipc::write::common::IpcWriteOptions::default();
         let schema_result = arrow_flight::utils::flight_schema_from_arrow_schema(
             table.schema().as_ref(),
             &options,
@@ -87,7 +88,7 @@ impl FlightService for FlightServiceImpl {
     // create local execution context
     let mut ctx = ExecutionContext::new();
 
-    let testdata = datafusion::crate::test::parquet_test_data();
+    let testdata = datafusion::test::parquet_test_data();
 
     // register parquet file with the execution context
     ctx.register_parquet(
2 changes: 1 addition & 1 deletion datafusion/Cargo.toml
@@ -46,7 +46,7 @@ simd = ["arrow2/simd"]
 [dependencies]
 ahash = "0.7"
 hashbrown = "0.11"
-arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "acc3082708977d50220b0577925e769c104a0480" }
+arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "d2be5b4dd0176672ab34460d8147562590d33567" }
 sqlparser = "0.9.0"
 paste = "^1.0"
 num_cpus = "1.13.0"