Commit a4659e4

WIP.

jorgecarleitao committed May 16, 2021
1 parent 1702d6c commit a4659e4
Showing 89 changed files with 1,082 additions and 1,823 deletions.
9 changes: 0 additions & 9 deletions Cargo.toml
@@ -18,13 +18,4 @@
[workspace]
members = [
"datafusion",
"datafusion-cli",
"datafusion-examples",
"benchmarks",
"ballista/rust/client",
"ballista/rust/core",
"ballista/rust/executor",
"ballista/rust/scheduler",
]

exclude = ["python"]
8 changes: 4 additions & 4 deletions README.md
@@ -69,8 +69,8 @@ Run a SQL query against data stored in a CSV:

```rust
use datafusion::prelude::*;
use arrow::util::pretty::print_batches;
use arrow::record_batch::RecordBatch;
use arrow2::util::pretty::print_batches;
use arrow2::record_batch::RecordBatch;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
```

@@ -92,8 +92,8 @@ Use the DataFrame API to process data stored in a CSV:

```rust
use datafusion::prelude::*;
use arrow::util::pretty::print_batches;
use arrow::record_batch::RecordBatch;
use arrow2::util::pretty::print_batches;
use arrow2::record_batch::RecordBatch;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
```
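The diff elides the body of both examples; for context, the full README snippet would read roughly as below after the rename. This is a sketch: it assumes arrow2's `print_batches` mirrors arrow's signature (as the import swap above implies), and the table name, CSV path, and query are the README's usual placeholders.

```rust
use datafusion::prelude::*;
use arrow2::util::pretty::print_batches;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // register a CSV file as a table, then query it with SQL
    let mut ctx = ExecutionContext::new();
    ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())?;
    let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")?;

    // collect() yields arrow2 RecordBatches; print_batches renders them as a table
    let results = df.collect().await?;
    print_batches(&results)?;
    Ok(())
}
```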
4 changes: 1 addition & 3 deletions datafusion/Cargo.toml
@@ -38,16 +38,14 @@ path = "src/lib.rs"

[features]
default = ["crypto_expressions", "regex_expressions", "unicode_expressions"]
simd = ["arrow/simd"]
crypto_expressions = ["md-5", "sha2"]
regex_expressions = ["regex", "lazy_static"]
unicode_expressions = ["unicode-segmentation"]

[dependencies]
ahash = "0.7"
hashbrown = "0.11"
arrow = { git = "https://github.com/apache/arrow-rs", rev = "4449ee96fe3fd4a0b275da8dd25ce2792699bc98", features = ["prettyprint"] }
parquet = { git = "https://github.com/apache/arrow-rs", rev = "4449ee96fe3fd4a0b275da8dd25ce2792699bc98", features = ["arrow"] }
arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "d22c2899dbff3a72410ca72b7e05ffa0fe3dafca" }
sqlparser = "0.9.0"
paste = "^1.0"
num_cpus = "1.13.0"
2 changes: 1 addition & 1 deletion datafusion/benches/aggregate_query_sql.rs
@@ -26,7 +26,7 @@ use tokio::runtime::Runtime;
extern crate arrow;
extern crate datafusion;

use arrow::{
use arrow2::{
array::Float32Array,
array::Float64Array,
array::StringArray,
2 changes: 1 addition & 1 deletion datafusion/benches/filter_query_sql.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use arrow::{
use arrow2::{
array::{Float32Array, Float64Array},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
2 changes: 1 addition & 1 deletion datafusion/benches/math_query_sql.rs
@@ -26,7 +26,7 @@ use tokio::runtime::Runtime;
extern crate arrow;
extern crate datafusion;

use arrow::{
use arrow2::{
array::{Float32Array, Float64Array},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
4 changes: 2 additions & 2 deletions datafusion/benches/sort_limit_query_sql.rs
@@ -24,7 +24,7 @@ use std::sync::{Arc, Mutex};
extern crate arrow;
extern crate datafusion;

use arrow::datatypes::{DataType, Field, Schema};
use arrow2::datatypes::{DataType, Field, Schema};

use datafusion::datasource::{CsvFile, CsvReadOptions, MemTable};
use datafusion::execution::context::ExecutionContext;
@@ -57,7 +57,7 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {
Field::new("c13", DataType::Utf8, false),
]));

let testdata = arrow::util::test_util::arrow_test_data();
let testdata = arrow2::util::test_util::arrow_test_data();

// create CSV data source
let csv = CsvFile::try_new(
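The benchmark changes above are purely mechanical path renames. The one helper worth explaining is `arrow_test_data()`, which resolves the Arrow test-data checkout from the `ARROW_TEST_DATA` environment variable; if the arrow2 fork did not carry the helper over, a local stand-in would be trivial (hypothetical, shown only to document the contract):

```rust
use std::env;

/// Hypothetical stand-in for `arrow2::util::test_util::arrow_test_data()`:
/// the arrow helper resolves the test-data directory from the
/// ARROW_TEST_DATA environment variable.
fn arrow_test_data() -> String {
    env::var("ARROW_TEST_DATA")
        .expect("ARROW_TEST_DATA must point at a checkout of the Arrow test data")
}
```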
144 changes: 68 additions & 76 deletions datafusion/src/catalog/information_schema.rs
@@ -21,8 +21,8 @@
use std::{any, sync::Arc};

use arrow::{
array::{StringBuilder, UInt64Builder},
use arrow2::{
array::*,
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};
@@ -189,23 +189,23 @@ impl SchemaProvider for InformationSchemaProvider {
///
/// Columns are based on https://www.postgresql.org/docs/current/infoschema-columns.html
struct InformationSchemaTablesBuilder {
catalog_names: StringBuilder,
schema_names: StringBuilder,
table_names: StringBuilder,
table_types: StringBuilder,
catalog_names: Utf8Primitive<i32>,
schema_names: Utf8Primitive<i32>,
table_names: Utf8Primitive<i32>,
table_types: Utf8Primitive<i32>,
}

impl InformationSchemaTablesBuilder {
fn new() -> Self {
// StringBuilder requires providing an initial capacity, so
// Utf8Primitive<i32> requires providing an initial capacity, so
// pick 10 here arbitrarily as this is not performance
// critical code and the number of tables is unavailable here.
let default_capacity = 10;
Self {
catalog_names: StringBuilder::new(default_capacity),
schema_names: StringBuilder::new(default_capacity),
table_names: StringBuilder::new(default_capacity),
table_types: StringBuilder::new(default_capacity),
catalog_names: Utf8Primitive::with_capacity(default_capacity),
schema_names: Utf8Primitive::with_capacity(default_capacity),
table_names: Utf8Primitive::with_capacity(default_capacity),
table_types: Utf8Primitive::with_capacity(default_capacity),
}
}

@@ -215,22 +215,28 @@ impl InformationSchemaTablesBuilder {
schema_name: impl AsRef<str>,
table_name: impl AsRef<str>,
table_type: TableType,
) {
// Note: append_value is actually infallible.
self.catalog_names.push(Some(&catalog_name.as_ref()));
self.schema_names.push(Some(&schema_name.as_ref()));
self.table_names.push(Some(&table_name.as_ref()));
self.table_types.push(Some(&match table_type {
TableType::Base => "BASE TABLE",
TableType::View => "VIEW",
TableType::Temporary => "LOCAL TEMPORARY",
}));
}

fn add_system_table(
&mut self,
catalog_name: impl AsRef<str>,
schema_name: impl AsRef<str>,
table_name: impl AsRef<str>,
) {
// Note: append_value is actually infallible.
self.catalog_names
.append_value(catalog_name.as_ref())
.unwrap();
self.schema_names
.append_value(schema_name.as_ref())
.unwrap();
self.table_names.append_value(table_name.as_ref()).unwrap();
self.table_types
.append_value(match table_type {
TableType::Base => "BASE TABLE",
TableType::View => "VIEW",
TableType::Temporary => "LOCAL TEMPORARY",
})
.unwrap();
}
}

@@ -270,45 +276,45 @@ impl From<InformationSchemaTablesBuilder> for MemTable {
///
/// Columns are based on https://www.postgresql.org/docs/current/infoschema-columns.html
struct InformationSchemaColumnsBuilder {
catalog_names: StringBuilder,
schema_names: StringBuilder,
table_names: StringBuilder,
column_names: StringBuilder,
ordinal_positions: UInt64Builder,
column_defaults: StringBuilder,
is_nullables: StringBuilder,
data_types: StringBuilder,
character_maximum_lengths: UInt64Builder,
character_octet_lengths: UInt64Builder,
numeric_precisions: UInt64Builder,
numeric_precision_radixes: UInt64Builder,
numeric_scales: UInt64Builder,
datetime_precisions: UInt64Builder,
interval_types: StringBuilder,
catalog_names: Utf8Primitive<i32>,
schema_names: Utf8Primitive<i32>,
table_names: Utf8Primitive<i32>,
column_names: Utf8Primitive<i32>,
ordinal_positions: Primitive<u64>,
column_defaults: Utf8Primitive<i32>,
is_nullables: Utf8Primitive<i32>,
data_types: Utf8Primitive<i32>,
character_maximum_lengths: Primitive<u64>,
character_octet_lengths: Primitive<u64>,
numeric_precisions: Primitive<u64>,
numeric_precision_radixes: Primitive<u64>,
numeric_scales: Primitive<u64>,
datetime_precisions: Primitive<u64>,
interval_types: Utf8Primitive<i32>,
}

impl InformationSchemaColumnsBuilder {
fn new() -> Self {
// StringBuilder requires providing an initial capacity, so
// Utf8Primitive<i32> requires providing an initial capacity, so
// pick 10 here arbitrarily as this is not performance
// critical code and the number of tables is unavailable here.
let default_capacity = 10;
Self {
catalog_names: StringBuilder::new(default_capacity),
schema_names: StringBuilder::new(default_capacity),
table_names: StringBuilder::new(default_capacity),
column_names: StringBuilder::new(default_capacity),
ordinal_positions: UInt64Builder::new(default_capacity),
column_defaults: StringBuilder::new(default_capacity),
is_nullables: StringBuilder::new(default_capacity),
data_types: StringBuilder::new(default_capacity),
character_maximum_lengths: UInt64Builder::new(default_capacity),
character_octet_lengths: UInt64Builder::new(default_capacity),
numeric_precisions: UInt64Builder::new(default_capacity),
numeric_precision_radixes: UInt64Builder::new(default_capacity),
numeric_scales: UInt64Builder::new(default_capacity),
datetime_precisions: UInt64Builder::new(default_capacity),
interval_types: StringBuilder::new(default_capacity),
catalog_names: Utf8Primitive::<i32>::with_capacity(default_capacity),
schema_names: Utf8Primitive::<i32>::with_capacity(default_capacity),
table_names: Utf8Primitive::<i32>::with_capacity(default_capacity),
column_names: Utf8Primitive::<i32>::with_capacity(default_capacity),
ordinal_positions: Primitive::<u64>::with_capacity(default_capacity),
column_defaults: Utf8Primitive::<i32>::with_capacity(default_capacity),
is_nullables: Utf8Primitive::<i32>::with_capacity(default_capacity),
data_types: Utf8Primitive::<i32>::with_capacity(default_capacity),
character_maximum_lengths: Primitive::<u64>::with_capacity(default_capacity),
character_octet_lengths: Primitive::<u64>::with_capacity(default_capacity),
numeric_precisions: Primitive::<u64>::with_capacity(default_capacity),
numeric_precision_radixes: Primitive::<u64>::with_capacity(default_capacity),
numeric_scales: Primitive::<u64>::with_capacity(default_capacity),
datetime_precisions: Primitive::<u64>::with_capacity(default_capacity),
interval_types: Utf8Primitive::<i32>::with_capacity(default_capacity),
}
}

@@ -326,43 +332,31 @@ impl InformationSchemaColumnsBuilder {
use DataType::*;

// Note: append_value is actually infallible.
self.catalog_names
.append_value(catalog_name.as_ref())
.unwrap();
self.schema_names
.append_value(schema_name.as_ref())
.unwrap();
self.table_names.append_value(table_name.as_ref()).unwrap();
self.catalog_names.push(Some(catalog_name.as_ref()));
self.schema_names.push(Some(schema_name.as_ref()));
self.table_names.push(Some(table_name.as_ref()));

self.column_names
.append_value(column_name.as_ref())
.unwrap();
self.column_names.push(Some(column_name.as_ref()));

self.ordinal_positions
.append_value(column_position as u64)
.unwrap();
self.ordinal_positions.push(Some(column_position as u64));

// DataFusion does not support column default values, so null
self.column_defaults.append_null().unwrap();

// "YES if the column is possibly nullable, NO if it is known not nullable. "
let nullable_str = if is_nullable { "YES" } else { "NO" };
self.is_nullables.append_value(nullable_str).unwrap();
self.is_nullables.push(Some(nullable_str));

// "System supplied type" --> Use debug format of the datatype
self.data_types
.append_value(format!("{:?}", data_type))
.unwrap();
self.data_types.push(Some(format!("{:?}", data_type)));

// "If data_type identifies a character or bit string type, the
// declared maximum length; null for all other data types or
// if no maximum length was declared."
//
// Arrow has no equivalent of VARCHAR(20), so we leave this as Null
let max_chars = None;
self.character_maximum_lengths
.append_option(max_chars)
.unwrap();
self.character_maximum_lengths.push(max_chars);

// "Maximum length, in bytes, for binary data, character data,
// or text and image data."
Expand All @@ -371,9 +365,7 @@ impl InformationSchemaColumnsBuilder {
LargeBinary | LargeUtf8 => Some(i64::MAX as u64),
_ => None,
};
self.character_octet_lengths
.append_option(char_len)
.unwrap();
self.character_octet_lengths.push(char_len);

// numeric_precision: "If data_type identifies a numeric type, this column
// contains the (declared or implicit) precision of the type
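Stepping back, every change in this file is one pattern applied over and over: arrow's builders (`StringBuilder`, `UInt64Builder`) are constructed with `new(capacity)` and return a `Result` from every append, while arrow2's mutable arrays (`Utf8Primitive<i32>`, `Primitive<u64>`) are constructed with `with_capacity` and push `Option` values infallibly. A minimal side-by-side sketch, assuming the arrow2 revision pinned in Cargo.toml; converting the filled arrays into a RecordBatch is elided here, as it is in the diff:

```rust
use arrow2::array::{Primitive, Utf8Primitive};

fn sketch() {
    // arrow (old): fallible appends that must be unwrapped
    //   let mut names = StringBuilder::new(10);
    //   names.append_value("public").unwrap();

    // arrow2 (new): infallible pushes of Option values
    let mut names = Utf8Primitive::<i32>::with_capacity(10);
    names.push(Some("public"));

    let mut ordinals = Primitive::<u64>::with_capacity(10);
    ordinals.push(Some(1u64));

    // nulls are just None — no separate append_null/append_option calls
    let maybe_len: Option<u64> = None;
    ordinals.push(maybe_len);
}
```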
2 changes: 1 addition & 1 deletion datafusion/src/dataframe.rs
@@ -17,7 +17,7 @@

//! DataFrame API for building and executing query plans.
use crate::arrow::record_batch::RecordBatch;
use crate::arrow2::record_batch::RecordBatch;
use crate::error::Result;
use crate::logical_plan::{
DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, Partitioning,
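`crate::arrow2` resolves because datafusion re-exports its Arrow dependency at the crate root, so downstream code shares the exact pinned revision rather than declaring its own. The corresponding lib.rs change is not shown in this diff, but it is presumably the one-line rename of the re-export:

```rust
// datafusion/src/lib.rs (sketch; this hunk is not part of the visible diff)
pub use arrow2;
```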
13 changes: 9 additions & 4 deletions datafusion/src/datasource/csv.rs
@@ -25,20 +25,22 @@
//! use datafusion::datasource::TableProvider;
//! use datafusion::datasource::csv::{CsvFile, CsvReadOptions};
//!
//! let testdata = arrow::util::test_util::arrow_test_data();
//! let testdata = arrow2::util::test_util::arrow_test_data();
//! let csvdata = CsvFile::try_new(
//! &format!("{}/csv/aggregate_test_100.csv", testdata),
//! CsvReadOptions::new().delimiter(b'|'),
//! ).unwrap();
//! let schema = csvdata.schema();
//! ```
use arrow::datatypes::SchemaRef;
use std::any::Any;
use std::io::{Read, Seek};
use std::string::String;
use std::sync::{Arc, Mutex};

use arrow2::datatypes::Schema;
use arrow2::io::csv::read::infer_schema;

use crate::datasource::datasource::Statistics;
use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
@@ -47,6 +49,7 @@ use crate::physical_plan::csv::CsvExec;
pub use crate::physical_plan::csv::CsvReadOptions;
use crate::physical_plan::{common, ExecutionPlan};

type SchemaRef = Arc<Schema>;
enum Source {
/// Path to a single CSV file or a directory containing one or more CSV files
Path(String),
@@ -125,7 +128,7 @@ impl CsvFile {
let schema = Arc::new(match options.schema {
Some(s) => s.clone(),
None => {
let (schema, _) = arrow::csv::reader::infer_file_schema(
let (schema, _) = infer_schema(
&mut reader,
options.delimiter,
Some(options.schema_infer_max_records),
@@ -228,6 +231,8 @@ mod tests {
use super::*;
use crate::prelude::*;

use arrow2::array::*;

#[tokio::test]
async fn csv_file_from_reader() -> Result<()> {
let testdata = arrow::util::test_util::arrow_test_data();
@@ -249,7 +254,7 @@ batches[0]
batches[0]
.column(0)
.as_any()
.downcast_ref::<arrow::array::Int64Array>()
.downcast_ref::<Int64Array>()
.unwrap()
.value(0),
5
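The `type SchemaRef = Arc<Schema>;` alias added above covers for this arrow2 revision apparently not exporting arrow's `SchemaRef` alias. A self-contained sketch of the pattern, with a hypothetical schema mirroring the `Field::new(name, type, nullable)` calls visible in the benchmark diff earlier:

```rust
use std::sync::Arc;
use arrow2::datatypes::{DataType, Field, Schema};

// arrow2 (at this rev) has no SchemaRef alias, so the crate defines its own
type SchemaRef = Arc<Schema>;

// hypothetical example schema for illustration only
fn example_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Utf8, false),
        Field::new("c13", DataType::Utf8, false),
    ]))
}
```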