
Day 1 #1

Merged 2 commits on Sep 14, 2021
Changes from all commits
27 changes: 18 additions & 9 deletions datafusion/src/datasource/parquet.rs
@@ -17,7 +17,7 @@

//! Parquet data source

-use std::any::Any;
+use std::any::{Any, type_name};
use std::fs::File;
use std::sync::Arc;

@@ -35,7 +35,7 @@ use crate::datasource::{
create_max_min_accs, get_col_stats, get_statistics_with_limit, FileAndSchema,
PartitionedFile, TableDescriptor, TableDescriptorBuilder, TableProvider,
};
-use crate::error::Result;
+use crate::error::{DataFusionError, Result};
use crate::logical_plan::{combine_filters, Expr};
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::parquet::ParquetExec;
@@ -221,7 +221,12 @@ impl ParquetTableDescriptor {
if let DataType::$DT = fields[i].data_type() {
let stats = stats
.as_any()
-.downcast_ref::<ParquetPrimitiveStatistics<$PRIMITIVE_TYPE>>()?;
+.downcast_ref::<ParquetPrimitiveStatistics<$PRIMITIVE_TYPE>>().ok_or_else(|| {
+DataFusionError::Internal(format!(
+"Failed to cast stats to {} stats",
+type_name::<$PRIMITIVE_TYPE>()
+))
+})?;
if let Some(max_value) = &mut max_values[i] {
if let Some(v) = stats.max_value {
match max_value.update(&[ScalarValue::$DT(Some(v))]) {
@@ -250,7 +255,9 @@ impl ParquetTableDescriptor {
PhysicalType::Boolean => {
if let DataType::Boolean = fields[i].data_type() {
let stats =
-stats.as_any().downcast_ref::<ParquetBooleanStatistics>()?;
+stats.as_any().downcast_ref::<ParquetBooleanStatistics>().ok_or_else(|| {
+DataFusionError::Internal("Failed to cast stats to boolean stats".to_owned())
+})?;
if let Some(max_value) = &mut max_values[i] {
if let Some(v) = stats.max_value {
match max_value.update(&[ScalarValue::Boolean(Some(v))]) {
@@ -290,11 +297,13 @@ impl ParquetTableDescriptor {
PhysicalType::ByteArray => {
if let DataType::Utf8 = fields[i].data_type() {
let stats =
-stats.as_any().downcast_ref::<ParquetBinaryStatistics>()?;
+stats.as_any().downcast_ref::<ParquetBinaryStatistics>().ok_or_else(|| {
+DataFusionError::Internal("Failed to cast stats to binary stats".to_owned())
+})?;
if let Some(max_value) = &mut max_values[i] {
if let Some(v) = stats.max_value {
match max_value.update(&[ScalarValue::Utf8(
-std::str::from_utf8(v).map(|s| s.to_string()).ok(),
+std::str::from_utf8(&*v).map(|s| s.to_string()).ok(),
)]) {
Ok(_) => {}
Err(_) => {
@@ -306,7 +315,7 @@ if let Some(min_value) = &mut min_values[i] {
if let Some(min_value) = &mut min_values[i] {
if let Some(v) = stats.min_value {
match min_value.update(&[ScalarValue::Utf8(
std::str::from_utf8(v).map(|s| s.to_string()).ok(),
std::str::from_utf8(&*v).map(|s| s.to_string()).ok(),
)]) {
Ok(_) => {}
Err(_) => {
@@ -341,7 +350,7 @@ impl TableDescriptorBuilder for ParquetTableDescriptor {

let (mut max_values, mut min_values) = create_max_min_accs(&schema);

-for row_group_meta in meta_data.row_groups() {
+for row_group_meta in meta_data.row_groups {
num_rows += row_group_meta.num_rows();
total_byte_size += row_group_meta.total_byte_size();

@@ -386,7 +395,7 @@ impl TableDescriptorBuilder for ParquetTableDescriptor {
};

Ok(FileAndSchema {
-file: PartitionedFile { path, statistics },
+file: PartitionedFile { path: path.to_owned(), statistics },
schema,
})
}
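The pattern repeated throughout this file converts the `Option` returned by `downcast_ref` into a `Result`, so a failed downcast surfaces as a descriptive `DataFusionError::Internal` rather than a bare `None`. A minimal sketch of the pattern outside the macro, with the error type simplified to `String` so it stands alone:

```rust
use std::any::{type_name, Any};

/// Downcast a `&dyn Any` to a concrete type, turning a failed cast
/// into a descriptive error instead of a bare `None`.
fn downcast_stats<T: 'static>(stats: &dyn Any) -> Result<&T, String> {
    stats
        .downcast_ref::<T>()
        .ok_or_else(|| format!("Failed to cast stats to {} stats", type_name::<T>()))
}

fn main() {
    let stats: Box<dyn Any> = Box::new(42_i64);
    // Succeeds: the boxed value really is an i64.
    assert!(downcast_stats::<i64>(stats.as_ref()).is_ok());
    // Fails with a message naming the requested type.
    assert!(downcast_stats::<bool>(stats.as_ref()).is_err());
}
```
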
12 changes: 12 additions & 0 deletions datafusion/src/error.rs
@@ -23,6 +23,7 @@ use std::io;
use std::result;

use arrow::error::ArrowError;
+use parquet::error::ParquetError;
use sqlparser::parser::ParserError;

/// Result type for operations that could result in an [DataFusionError]
@@ -34,6 +35,8 @@ pub type Result<T> = result::Result<T, DataFusionError>;
pub enum DataFusionError {
/// Error returned by arrow.
ArrowError(ArrowError),
+/// Wraps an error from the Parquet crate
+ParquetError(ParquetError),
/// Error associated to I/O operations and associated traits.
IoError(io::Error),
/// Error returned when SQL is syntactically incorrect.
@@ -74,6 +77,12 @@ impl From<ArrowError> for DataFusionError {
}
}

+impl From<ParquetError> for DataFusionError {
+fn from(e: ParquetError) -> Self {
+DataFusionError::ParquetError(e)
+}
+}

impl From<ParserError> for DataFusionError {
fn from(e: ParserError) -> Self {
DataFusionError::SQL(e)
@@ -84,6 +93,9 @@ impl Display for DataFusionError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match *self {
DataFusionError::ArrowError(ref desc) => write!(f, "Arrow error: {}", desc),
+DataFusionError::ParquetError(ref desc) => {
+write!(f, "Parquet error: {}", desc)
+}
DataFusionError::IoError(ref desc) => write!(f, "IO error: {}", desc),
DataFusionError::SQL(ref desc) => {
write!(f, "SQL error: {:?}", desc)
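The new `From<ParquetError>` impl is what lets parquet calls use `?` directly inside functions that return `Result<T, DataFusionError>`. A self-contained sketch of the mechanism, with stand-in types (the real ones live in the `parquet` crate and in this module):

```rust
use std::fmt;

#[derive(Debug)]
struct ParquetError(String); // stand-in for parquet::error::ParquetError

#[derive(Debug)]
enum DataFusionError {
    ParquetError(ParquetError),
}

impl From<ParquetError> for DataFusionError {
    fn from(e: ParquetError) -> Self {
        DataFusionError::ParquetError(e)
    }
}

impl fmt::Display for DataFusionError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            DataFusionError::ParquetError(ref desc) => write!(f, "Parquet error: {}", desc.0),
        }
    }
}

fn read_footer() -> Result<(), ParquetError> {
    Err(ParquetError("corrupt footer".to_owned()))
}

fn scan() -> Result<(), DataFusionError> {
    read_footer()?; // `?` converts the error via the `From` impl above
    Ok(())
}

fn main() {
    println!("{}", scan().unwrap_err()); // Parquet error: corrupt footer
}
```
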
6 changes: 3 additions & 3 deletions datafusion/src/execution/dataframe_impl.rs
@@ -19,7 +19,7 @@

use std::sync::{Arc, Mutex};

-use arrow::io::print
+use arrow::io::print;
use arrow::record_batch::RecordBatch;
use crate::error::Result;
use crate::execution::context::{ExecutionContext, ExecutionContextState};
@@ -160,13 +160,13 @@ impl DataFrame for DataFrameImpl {
/// Print results.
async fn show(&self) -> Result<()> {
let results = self.collect().await?;
-Ok(print::print(&results)?)
+Ok(print::print(&results))
}

/// Print results and limit rows.
async fn show_limit(&self, num: usize) -> Result<()> {
let results = self.limit(num)?.collect().await?;
-Ok(print::print(&results)?)
+Ok(print::print(&results))
}

/// Convert the logical plan represented by this DataFrame into a physical plan and
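In this tree `print::print` prints the batches itself and returns `()` rather than a `Result`, so applying `?` to it no longer compiles; the unit value is wrapped in `Ok(...)` directly. A small illustration with a stand-in function (not arrow's actual module):

```rust
// Stand-in for an infallible printer like `arrow::io::print::print`.
fn print(batches: &[String]) {
    for batch in batches {
        println!("{}", batch);
    }
}

fn show(results: Vec<String>) -> Result<(), String> {
    // `Ok(print(&results)?)` would fail to compile: `?` needs a
    // `Result` (or `Option`), but `print` returns `()`.
    Ok(print(&results))
}

fn main() {
    show(vec!["batch 1".to_owned()]).unwrap();
}
```
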
12 changes: 0 additions & 12 deletions datafusion/src/logical_plan/plan.rs
@@ -31,18 +31,6 @@ use std::{
sync::Arc,
};

-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

> Collaborator Author: duplicates

-
-use crate::datasource::TableProvider;
-use crate::sql::parser::FileType;
-
-use super::extension::UserDefinedLogicalNode;
-use super::{
-display::{GraphvizVisitor, IndentVisitor},
-Column,
-};
-use crate::logical_plan::dfschema::DFSchemaRef;
-
/// Join type
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinType {
2 changes: 0 additions & 2 deletions datafusion/src/optimizer/constant_folding.rs
@@ -21,7 +21,6 @@
use std::sync::Arc;

use arrow::compute::cast;
-use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
use arrow::datatypes::DataType;
use arrow::temporal_conversions::utf8_to_timestamp_ns_scalar;

@@ -32,7 +31,6 @@ use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
use crate::physical_plan::functions::BuiltinScalarFunction;
use crate::scalar::ScalarValue;
-use arrow::compute::{kernels, DEFAULT_CAST_OPTIONS};

> Collaborator Author: invalid `use`s


/// Optimizer that simplifies comparison expressions involving boolean literals.
///
29 changes: 14 additions & 15 deletions datafusion/src/physical_plan/analyze.rs
@@ -25,11 +25,12 @@ use crate::{
physical_plan::{display::DisplayableExecutionPlan, Partitioning},
physical_plan::{DisplayFormatType, ExecutionPlan},
};
-use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
+use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use futures::StreamExt;

use super::{stream::RecordBatchReceiverStream, Distribution, SendableRecordBatchStream};
use async_trait::async_trait;
+use arrow::array::MutableUtf8Array;

/// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input,
/// discards the results, and then prints out an annotated plan with metrics
@@ -149,43 +150,41 @@ impl ExecutionPlan for AnalyzeExec {
}
let end = Instant::now();

-let mut type_builder = StringBuilder::new(1);
-let mut plan_builder = StringBuilder::new(1);
+let mut type_builder: MutableUtf8Array<i32> = MutableUtf8Array::new();
+let mut plan_builder: MutableUtf8Array<i32> = MutableUtf8Array::new();

// TODO use some sort of enum rather than strings?
type_builder.append_value("Plan with Metrics").unwrap();
type_builder.push(Some("Plan with Metrics"));

let annotated_plan =
DisplayableExecutionPlan::with_metrics(captured_input.as_ref())
.indent()
.to_string();
-plan_builder.append_value(annotated_plan).unwrap();
+plan_builder.push(Some(annotated_plan));

// Verbose output
// TODO make this more sophisticated
if verbose {
type_builder.append_value("Plan with Full Metrics").unwrap();
type_builder.push(Some("Plan with Full Metrics"));

let annotated_plan =
DisplayableExecutionPlan::with_full_metrics(captured_input.as_ref())
.indent()
.to_string();
-plan_builder.append_value(annotated_plan).unwrap();
+plan_builder.push(Some(annotated_plan));

type_builder.append_value("Output Rows").unwrap();
plan_builder.append_value(total_rows.to_string()).unwrap();
type_builder.push(Some("Output Rows"));
plan_builder.push(Some(total_rows.to_string()));

type_builder.append_value("Duration").unwrap();
plan_builder
.append_value(format!("{:?}", end - start))
.unwrap();
type_builder.push(Some("Duration"));
plan_builder.push(Some(format!("{:?}", end - start)));
}

let maybe_batch = RecordBatch::try_new(
captured_schema,
vec![
-Arc::new(type_builder.finish()),
-Arc::new(plan_builder.finish()),
+type_builder.into_arc(),
+plan_builder.into_arc(),
],
);
// again ignore error
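arrow2's `MutableUtf8Array` replaces `StringBuilder`: `push` takes an `Option` (where `None` means null) and is infallible, so the `.unwrap()` calls disappear, and `into_arc` finalizes the builder straight into an `Arc<dyn Array>` for a `RecordBatch` column. A minimal usage sketch, assuming the arrow2-style API used in this diff:

```rust
use arrow::array::MutableUtf8Array; // arrow2, re-exported as `arrow` in this tree

fn main() {
    let mut builder: MutableUtf8Array<i32> = MutableUtf8Array::new();
    builder.push(Some("Plan with Metrics")); // infallible: no `.unwrap()` needed
    builder.push(Some(format!("{} rows", 42))); // owned strings work too
    builder.push::<&str>(None); // a null entry

    // Finalize into an `Arc<dyn Array>`, ready for `RecordBatch::try_new`.
    let column = builder.into_arc();
    assert_eq!(column.len(), 3);
}
```
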
3 changes: 1 addition & 2 deletions datafusion/src/physical_plan/datetime_expressions.rs
@@ -27,8 +27,7 @@ use arrow::{
array::*,
compute::cast,
datatypes::{
-ArrowPrimitiveType, DataType, TimeUnit, TimestampMicrosecondType,
-TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,

> Collaborator Author: these types no longer exist

+DataType, TimeUnit,
},
temporal_conversions::utf8_to_timestamp_ns_scalar,
types::NativeType,
19 changes: 0 additions & 19 deletions datafusion/src/physical_plan/expressions/binary.rs
@@ -776,25 +776,6 @@ mod tests {
Ok(())
}

-#[test]

> Collaborator Author: duplicates

-fn modulus_op() -> Result<()> {
-let schema = Arc::new(Schema::new(vec![
-Field::new("a", DataType::Int32, false),
-Field::new("b", DataType::Int32, false),
-]));
-let a = Arc::new(Int32Array::from(vec![8, 32, 128, 512, 2048]));
-let b = Arc::new(Int32Array::from(vec![2, 4, 7, 14, 32]));
-
-apply_arithmetic::<i32>(
-schema,
-vec![a, b],
-Operator::Modulo,
-Int32Array::from(vec![0, 0, 2, 8, 0]),
-)?;
-
-Ok(())
-}
-
fn apply_arithmetic<T: NativeType>(
schema: Arc<Schema>,
data: Vec<Arc<dyn Array>>,
19 changes: 15 additions & 4 deletions datafusion/src/physical_plan/expressions/in_list.rs
@@ -41,6 +41,17 @@ macro_rules! compare_op_scalar {
}};
}

+// TODO: primitive arrays currently don't have `values_iter()`; it may
+// be worth adding one there, and this specialized case could be removed.
+macro_rules! compare_primitive_op_scalar {

> Collaborator Author: check the TODO above

+($left: expr, $right:expr, $op:expr) => {{
+let validity = $left.validity();
+let values =
+Bitmap::from_trusted_len_iter($left.values().iter().map(|x| $op(x, $right)));
+Ok(BooleanArray::from_data(DataType::Boolean, values, validity))
+}};
+}

/// InList
#[derive(Debug)]
pub struct InListExpr {
@@ -162,18 +173,18 @@ macro_rules! make_contains_primitive {
// whether each value on the left (can be null) is contained in the non-null list
fn in_list_primitive<T: NativeType>(
array: &PrimitiveArray<T>,
-values: &[<T as NativeType>],
+values: &[T],
) -> Result<BooleanArray> {
-compare_op_scalar!(array, values, |x, v: &[<T as NativeType>]| v
+compare_primitive_op_scalar!(array, values, |x, v| v
.contains(&x))
}

// whether each value on the left (can be null) is contained in the non-null list
fn not_in_list_primitive<T: NativeType>(
array: &PrimitiveArray<T>,
-values: &[<T as NativeType>],
+values: &[T],
) -> Result<BooleanArray> {
-compare_op_scalar!(array, values, |x, v: &[<T as NativeType>]| !v
+compare_primitive_op_scalar!(array, values, |x, v| !v
.contains(&x))
}

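Two fixes land in this file. First, `&[<T as NativeType>]` is not valid Rust: a qualified path has to name an associated item (e.g. `<T as Trait>::Output`), and to refer to the type parameter itself, plain `&[T]` is correct. Second, primitive arrays expose their values as a slice rather than a `values_iter()`, hence the specialized macro above. The membership test the macro performs, sketched in plain std Rust with the array and validity machinery reduced to `Option` (names hypothetical):

```rust
/// For each (possibly null) value on the left, test membership in the
/// non-null `values` list -- the core of `in_list_primitive`.
fn in_list_scalar<T: Copy + PartialEq>(left: &[Option<T>], values: &[T]) -> Vec<Option<bool>> {
    left.iter()
        .map(|x| x.map(|v| values.contains(&v))) // a null input stays null
        .collect()
}

fn main() {
    let column = [Some(1), None, Some(3)];
    let list = [1, 2];
    assert_eq!(in_list_scalar(&column, &list), vec![Some(true), None, Some(false)]);
}
```
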
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/hash_utils.rs
@@ -116,7 +116,7 @@ fn combine_hashes(l: u64, r: u64) -> u64 {
}

macro_rules! hash_array {
-($array_type:ident, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => {
+($array_type:ty, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => {
let array = $column.as_any().downcast_ref::<$array_type>().unwrap();

> yjshen (Sep 13, 2021): Utf8Array::<i32> is not an ident

> Collaborator Author: `ty` or `path` both match
if array.null_count() == 0 {
if $multi_col {
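The review notes capture the root cause: `Utf8Array::<i32>` is a generic path, not a bare identifier, so it can never match an `$x:ident` macro fragment, while a `ty` (or `path`) fragment accepts it. A toy macro demonstrating the difference (not the real `hash_array!`):

```rust
#[allow(dead_code)]
struct Utf8Array<O>(std::marker::PhantomData<O>);

// `$t:ty` accepts any type, including generic paths like `Utf8Array<i32>`.
// With `$t:ident`, the first call in `main` would fail to match, because
// `Utf8Array<i32>` is not a single identifier.
macro_rules! name_of {
    ($t:ty) => {
        std::any::type_name::<$t>()
    };
}

fn main() {
    println!("{}", name_of!(Utf8Array<i32>));
    println!("{}", name_of!(u64));
}
```
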
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/mod.rs
@@ -22,7 +22,7 @@ use self::metrics::MetricsSet;
use self::{
coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
};
-use crate::expressions::{PhysicalSortExpr, SortColumn};
+use crate::physical_plan::expressions::{PhysicalSortExpr, SortColumn};
use crate::{
error::{DataFusionError, Result},
scalar::ScalarValue,
4 changes: 2 additions & 2 deletions datafusion/src/physical_plan/parquet.rs
@@ -41,7 +41,7 @@ use arrow::{
record_batch::RecordBatch,
};
use log::debug;
-use parquet::file::reader::{FileReader, SerializedFileReader};

use parquet::statistics::{
BinaryStatistics as ParquetBinaryStatistics,
BooleanStatistics as ParquetBooleanStatistics,
@@ -579,7 +579,7 @@ fn read_partition(
ParquetFileMetrics::new(partition_index, &*partitioned_file.path, &metrics);
let mut file = File::open(partitioned_file.path.as_str())?;
let reader = read::RecordReader::try_new(
-std::io::BufReader::new(file)
+std::io::BufReader::new(file),
Some(projection.to_vec()),
limit,
None,