Skip to content

Commit

Permalink
WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Apr 25, 2021
1 parent 9ba214a commit 05383ea
Show file tree
Hide file tree
Showing 90 changed files with 833 additions and 1,048 deletions.
8 changes: 1 addition & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,4 @@
[workspace]
members = [
"datafusion",
"datafusion-examples",
"benchmarks",
"ballista/rust/client",
"ballista/rust/core",
"ballista/rust/executor",
"ballista/rust/scheduler",
]
]
5 changes: 2 additions & 3 deletions datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,15 @@ path = "src/bin/main.rs"
[features]
default = ["cli", "crypto_expressions", "regex_expressions", "unicode_expressions"]
cli = ["rustyline"]
simd = ["arrow/simd"]
simd = []
crypto_expressions = ["md-5", "sha2"]
regex_expressions = ["regex", "lazy_static"]
unicode_expressions = ["unicode-segmentation"]

[dependencies]
ahash = "0.7"
hashbrown = "0.11"
arrow = { git = "https://github.com/apache/arrow-rs", rev = "c3fe3bab9905739fdda75301dab07a18c91731bd", features = ["prettyprint"] }
parquet = { git = "https://github.com/apache/arrow-rs", rev = "c3fe3bab9905739fdda75301dab07a18c91731bd", features = ["arrow"] }
arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "5567d2c6487a9cda7cacf43890b73486d8613989" }
sqlparser = "0.9.0"
clap = "2.33"
rustyline = {version = "7.0", optional = true}
Expand Down
8 changes: 4 additions & 4 deletions datafusion/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ Run a SQL query against data stored in a CSV:

```rust
use datafusion::prelude::*;
use arrow::util::pretty::print_batches;
use arrow::record_batch::RecordBatch;
use arrow2::util::pretty::print_batches;
use arrow2::record_batch::RecordBatch;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
Expand All @@ -87,8 +87,8 @@ Use the DataFrame API to process data stored in a CSV:

```rust
use datafusion::prelude::*;
use arrow::util::pretty::print_batches;
use arrow::record_batch::RecordBatch;
use arrow2::util::pretty::print_batches;
use arrow2::record_batch::RecordBatch;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/benches/aggregate_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tokio::runtime::Runtime;
extern crate arrow;
extern crate datafusion;

use arrow::{
use arrow2::{
array::Float32Array,
array::Float64Array,
array::StringArray,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/benches/filter_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use arrow::{
use arrow2::{
array::{Float32Array, Float64Array},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/benches/math_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tokio::runtime::Runtime;
extern crate arrow;
extern crate datafusion;

use arrow::{
use arrow2::{
array::{Float32Array, Float64Array},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/benches/sort_limit_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::sync::{Arc, Mutex};
extern crate arrow;
extern crate datafusion;

use arrow::datatypes::{DataType, Field, Schema};
use arrow2::datatypes::{DataType, Field, Schema};

use datafusion::datasource::{CsvFile, CsvReadOptions, MemTable};
use datafusion::execution::context::ExecutionContext;
Expand Down Expand Up @@ -57,7 +57,7 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {
Field::new("c13", DataType::Utf8, false),
]));

let testdata = arrow::util::test_util::arrow_test_data();
let testdata = arrow2::util::test_util::arrow_test_data();

// create CSV data source
let csv = CsvFile::try_new(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/bin/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#![allow(bare_trait_objects)]

use arrow::util::pretty;
use arrow2::util::pretty;
use clap::{crate_version, App, Arg};
use datafusion::error::Result;
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
Expand Down
32 changes: 14 additions & 18 deletions datafusion/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
use std::{any, sync::Arc};

use arrow::{
array::{StringBuilder, UInt64Builder},
use arrow2::{
array::*,
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};
Expand Down Expand Up @@ -178,10 +178,10 @@ impl SchemaProvider for InformationSchemaProvider {
///
/// Columns are based on https://www.postgresql.org/docs/current/infoschema-columns.html
struct InformationSchemaTablesBuilder {
catalog_names: StringBuilder,
schema_names: StringBuilder,
table_names: StringBuilder,
table_types: StringBuilder,
catalog_names: Utf8Primitive<i32>,
schema_names: Utf8Primitive<i32>,
table_names: Utf8Primitive<i32>,
table_types: Utf8Primitive<i32>,
}

impl InformationSchemaTablesBuilder {
Expand All @@ -191,10 +191,10 @@ impl InformationSchemaTablesBuilder {
// critical code and the number of tables is unavailable here.
let default_capacity = 10;
Self {
catalog_names: StringBuilder::new(default_capacity),
schema_names: StringBuilder::new(default_capacity),
table_names: StringBuilder::new(default_capacity),
table_types: StringBuilder::new(default_capacity),
catalog_names: Utf8Primitive::with_capacity(default_capacity),
schema_names: Utf8Primitive::with_capacity(default_capacity),
table_names: Utf8Primitive::with_capacity(default_capacity),
table_types: Utf8Primitive::with_capacity(default_capacity),
}
}

Expand All @@ -205,14 +205,10 @@ impl InformationSchemaTablesBuilder {
table_name: impl AsRef<str>,
) {
// Note: append_value is actually infallible.
self.catalog_names
.append_value(catalog_name.as_ref())
.unwrap();
self.schema_names
.append_value(schema_name.as_ref())
.unwrap();
self.table_names.append_value(table_name.as_ref()).unwrap();
self.table_types.append_value("BASE TABLE").unwrap();
self.catalog_names.push(Some(&catalog_name.as_ref()));
self.schema_names.push(Some(&schema_name.as_ref()));
self.table_names.push(Some(&table_name.as_ref()));
self.table_types.push(Some(&"BASE TABLE"));
}

fn add_system_table(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! DataFrame API for building and executing query plans.
use crate::arrow::record_batch::RecordBatch;
use crate::arrow2::record_batch::RecordBatch;
use crate::error::Result;
use crate::logical_plan::{
DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, Partitioning,
Expand Down
5 changes: 3 additions & 2 deletions datafusion/src/datasource/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@
//! use datafusion::datasource::TableProvider;
//! use datafusion::datasource::csv::{CsvFile, CsvReadOptions};
//!
//! let testdata = arrow::util::test_util::arrow_test_data();
//! let testdata = arrow2::util::test_util::arrow_test_data();
//! let csvdata = CsvFile::try_new(
//! &format!("{}/csv/aggregate_test_100.csv", testdata),
//! CsvReadOptions::new().delimiter(b'|'),
//! ).unwrap();
//! let schema = csvdata.schema();
//! ```
use arrow::datatypes::SchemaRef;
use arrow2::datatypes::Schema;
type SchemaRef = Arc<Schema>;
use std::any::Any;
use std::string::String;
use std::sync::Arc;
Expand Down
4 changes: 3 additions & 1 deletion datafusion/src/datasource/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use std::sync::Arc;
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::ExecutionPlan;
use crate::{arrow::datatypes::SchemaRef, scalar::ScalarValue};
use crate::{arrow2::datatypes::Schema, scalar::ScalarValue};

type SchemaRef = Arc<Schema>;

/// This table statistics are estimates.
/// It can not be used directly in the precise compute
Expand Down
4 changes: 3 additions & 1 deletion datafusion/src/datasource/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
use std::any::Any;
use std::sync::Arc;

use arrow::datatypes::*;
use arrow2::datatypes::*;

type SchemaRef = Arc<Schema>;

use crate::datasource::datasource::Statistics;
use crate::datasource::TableProvider;
Expand Down
10 changes: 6 additions & 4 deletions datafusion/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ use log::debug;
use std::any::Any;
use std::sync::Arc;

use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow2::datatypes::{Field, Schema};
use arrow2::record_batch::RecordBatch;

type SchemaRef = Arc<Schema>;

use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
Expand Down Expand Up @@ -221,8 +223,8 @@ impl TableProvider for MemTable {
#[cfg(test)]
mod tests {
use super::*;
use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow2::array::Int32Array;
use arrow2::datatypes::{DataType, Field, Schema};
use futures::StreamExt;
use std::collections::HashMap;

Expand Down
13 changes: 6 additions & 7 deletions datafusion/src/datasource/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::any::Any;
use std::string::String;
use std::sync::Arc;

use arrow::datatypes::*;
use arrow2::datatypes::*;

use crate::datasource::datasource::Statistics;
use crate::datasource::TableProvider;
Expand All @@ -32,6 +32,8 @@ use crate::physical_plan::ExecutionPlan;

use super::datasource::TableProviderFilterPushDown;

type SchemaRef = Arc<Schema>;

/// Table-based representation of a `ParquetFile`.
pub struct ParquetTable {
path: String,
Expand Down Expand Up @@ -106,11 +108,8 @@ impl TableProvider for ParquetTable {
#[cfg(test)]
mod tests {
use super::*;
use arrow::array::{
BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
TimestampNanosecondArray,
};
use arrow::record_batch::RecordBatch;
use arrow2::array::*;
use arrow2::record_batch::RecordBatch;
use futures::StreamExt;

#[tokio::test]
Expand Down Expand Up @@ -328,7 +327,7 @@ mod tests {
}

fn load_table(name: &str) -> Result<Arc<dyn TableProvider>> {
let testdata = arrow::util::test_util::parquet_test_data();
let testdata = arrow2::util::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, name);
let table = ParquetTable::try_new(&filename, 2)?;
Ok(Arc::new(table))
Expand Down
16 changes: 2 additions & 14 deletions datafusion/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ use std::fmt::{Display, Formatter};
use std::io;
use std::result;

use arrow::error::ArrowError;
use parquet::errors::ParquetError;
use arrow2::error::ArrowError;
use sqlparser::parser::ParserError;

/// Result type for operations that could result in an [DataFusionError]
Expand All @@ -35,8 +34,6 @@ pub type Result<T> = result::Result<T, DataFusionError>;
pub enum DataFusionError {
/// Error returned by arrow.
ArrowError(ArrowError),
/// Wraps an error from the Parquet crate
ParquetError(ParquetError),
/// Error associated to I/O operations and associated traits.
IoError(io::Error),
/// Error returned when SQL is syntactically incorrect.
Expand All @@ -59,7 +56,7 @@ pub enum DataFusionError {
}

impl DataFusionError {
/// Wraps this [DataFusionError] as an [arrow::error::ArrowError].
/// Wraps this [DataFusionError] as an [arrow2::error::ArrowError].
pub fn into_arrow_external_error(self) -> ArrowError {
ArrowError::from_external_error(Box::new(self))
}
Expand All @@ -77,12 +74,6 @@ impl From<ArrowError> for DataFusionError {
}
}

impl From<ParquetError> for DataFusionError {
fn from(e: ParquetError) -> Self {
DataFusionError::ParquetError(e)
}
}

impl From<ParserError> for DataFusionError {
fn from(e: ParserError) -> Self {
DataFusionError::SQL(e)
Expand All @@ -93,9 +84,6 @@ impl Display for DataFusionError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match *self {
DataFusionError::ArrowError(ref desc) => write!(f, "Arrow error: {}", desc),
DataFusionError::ParquetError(ref desc) => {
write!(f, "Parquet error: {}", desc)
}
DataFusionError::IoError(ref desc) => write!(f, "IO error: {}", desc),
DataFusionError::SQL(ref desc) => {
write!(f, "SQL error: {:?}", desc)
Expand Down
Loading

0 comments on commit 05383ea

Please sign in to comment.