Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Day 1 #1

Merged
merged 2 commits into from
Sep 14, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/src/execution/dataframe_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use std::sync::{Arc, Mutex};

use arrow::io::print
use arrow::io::print;
use arrow::record_batch::RecordBatch;
use crate::error::Result;
use crate::execution::context::{ExecutionContext, ExecutionContextState};
Expand Down
12 changes: 0 additions & 12 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,6 @@ use std::{
sync::Arc,
};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duplicates


use crate::datasource::TableProvider;
use crate::sql::parser::FileType;

use super::extension::UserDefinedLogicalNode;
use super::{
display::{GraphvizVisitor, IndentVisitor},
Column,
};
use crate::logical_plan::dfschema::DFSchemaRef;

/// Join type
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinType {
Expand Down
2 changes: 0 additions & 2 deletions datafusion/src/optimizer/constant_folding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
use std::sync::Arc;

use arrow::compute::cast;
use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
use arrow::datatypes::DataType;
use arrow::temporal_conversions::utf8_to_timestamp_ns_scalar;

Expand All @@ -32,7 +31,6 @@ use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
use crate::physical_plan::functions::BuiltinScalarFunction;
use crate::scalar::ScalarValue;
use arrow::compute::{kernels, DEFAULT_CAST_OPTIONS};
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These `use` statements are invalid.


/// Optimizer that simplifies comparison expressions involving boolean literals.
///
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
physical_plan::{display::DisplayableExecutionPlan, Partitioning},
physical_plan::{DisplayFormatType, ExecutionPlan},
};
use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use futures::StreamExt;

use super::{stream::RecordBatchReceiverStream, Distribution, SendableRecordBatchStream};
Expand Down
3 changes: 1 addition & 2 deletions datafusion/src/physical_plan/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ use arrow::{
array::*,
compute::cast,
datatypes::{
ArrowPrimitiveType, DataType, TimeUnit, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These types do not exist.

DataType, TimeUnit,
},
temporal_conversions::utf8_to_timestamp_ns_scalar,
types::NativeType,
Expand Down
19 changes: 0 additions & 19 deletions datafusion/src/physical_plan/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,25 +776,6 @@ mod tests {
Ok(())
}

#[test]
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duplicates

fn modulus_op() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]));
let a = Arc::new(Int32Array::from(vec![8, 32, 128, 512, 2048]));
let b = Arc::new(Int32Array::from(vec![2, 4, 7, 14, 32]));

apply_arithmetic::<i32>(
schema,
vec![a, b],
Operator::Modulo,
Int32Array::from(vec![0, 0, 2, 8, 0]),
)?;

Ok(())
}

fn apply_arithmetic<T: NativeType>(
schema: Arc<Schema>,
data: Vec<Arc<dyn Array>>,
Expand Down
19 changes: 15 additions & 4 deletions datafusion/src/physical_plan/expressions/in_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ macro_rules! compare_op_scalar {
}};
}

// TODO: primitive array currently doesn't have `values_iter()`, it may
// worth adding one there, and this specialized case could be removed.
macro_rules! compare_primitive_op_scalar {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check TODO above

($left: expr, $right:expr, $op:expr) => {{
let validity = $left.validity();
let values =
Bitmap::from_trusted_len_iter($left.values().iter().map(|x| $op(x, $right)));
Ok(BooleanArray::from_data(DataType::Boolean, values, validity))
}};
}

/// InList
#[derive(Debug)]
pub struct InListExpr {
Expand Down Expand Up @@ -162,18 +173,18 @@ macro_rules! make_contains_primitive {
// whether each value on the left (can be null) is contained in the non-null list
fn in_list_primitive<T: NativeType>(
array: &PrimitiveArray<T>,
values: &[<T as NativeType>],
values: &[T],
) -> Result<BooleanArray> {
compare_op_scalar!(array, values, |x, v: &[<T as NativeType>]| v
compare_primitive_op_scalar!(array, values, |x, v| v
.contains(&x))
}

// whether each value on the left (can be null) is contained in the non-null list
fn not_in_list_primitive<T: NativeType>(
array: &PrimitiveArray<T>,
values: &[<T as NativeType>],
values: &[T],
) -> Result<BooleanArray> {
compare_op_scalar!(array, values, |x, v: &[<T as NativeType>]| !v
compare_primitive_op_scalar!(array, values, |x, v| !v
.contains(&x))
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/hash_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ fn combine_hashes(l: u64, r: u64) -> u64 {
}

macro_rules! hash_array {
($array_type:ident, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => {
($array_type:ty, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => {
let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
Copy link
Collaborator Author

@yjshen yjshen Sep 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`Utf8Array::<i32>` is not an `ident`.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use `ty` or `path` instead.

if array.null_count() == 0 {
if $multi_col {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use self::metrics::MetricsSet;
use self::{
coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
};
use crate::expressions::{PhysicalSortExpr, SortColumn};
use crate::physical_plan::expressions::{PhysicalSortExpr, SortColumn};
use crate::{
error::{DataFusionError, Result},
scalar::ScalarValue,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use arrow::{
record_batch::RecordBatch,
};
use log::debug;
use parquet::file::reader::{FileReader, SerializedFileReader};

use parquet::statistics::{
BinaryStatistics as ParquetBinaryStatistics,
BooleanStatistics as ParquetBooleanStatistics,
Expand Down Expand Up @@ -579,7 +579,7 @@ fn read_partition(
ParquetFileMetrics::new(partition_index, &*partitioned_file.path, &metrics);
let mut file = File::open(partitioned_file.path.as_str())?;
let reader = read::RecordReader::try_new(
std::io::BufReader::new(file)
std::io::BufReader::new(file),
Some(projection.to_vec()),
limit,
None,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::physical_plan::hash_utils::create_hashes;
use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
use arrow::record_batch::RecordBatch;
use arrow::{
array::{Array, ArrayRef, UInt32Array, UInt64Array, Utf8Array},
array::{Array, UInt64Array},
error::Result as ArrowResult,
};
use arrow::{compute::take, datatypes::SchemaRef};
Expand Down