Skip to content

Commit

Permalink
Improve.
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed May 28, 2021
1 parent a309433 commit 9f80ac2
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 81 deletions.
2 changes: 1 addition & 1 deletion datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ simd = ["arrow2/simd"]
[dependencies]
ahash = "0.7"
hashbrown = "0.11"
arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "b75f2b507d185f13ec37f81f9c01077a0069e755" }
arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "afb233a" }
sqlparser = "0.9.0"
paste = "^1.0"
num_cpus = "1.13.0"
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/datasource/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ mod tests {
async fn read_small_batches() -> Result<()> {
let table = load_table("alltypes_plain.parquet")?;
let projection = None;
let exec = table.scan(&projection, 2, &[], None)?;
let exec = table.scan(&projection, 2, &[], Some(2))?;
let stream = exec.execute(0).await?;

let _ = stream
Expand Down
6 changes: 3 additions & 3 deletions datafusion/src/execution/dataframe_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,9 @@ mod tests {
"+----+----------------------+--------------------+---------------------+--------------------+------------+---------------------+",
"| a | 0.02182578039211991 | 0.9800193410444061 | 0.48754517466109415 | 10.238448667882977 | 21 | 21 |",
"| b | 0.04893135681998029 | 0.9185813970744787 | 0.41040709263815384 | 7.797734760124923 | 19 | 19 |",
"| c | 0.0494924465469434 | 0.991517828651004 | 0.6600456536439784 | 13.860958726523545 | 21 | 21 |",
"| d | 0.061029375346466685 | 0.9748360509016578 | 0.48855379387549824 | 8.793968289758968 | 18 | 18 |",
"| e | 0.01479305307777301 | 0.9965400387585364 | 0.48600669271341534 | 10.206140546981722 | 21 | 21 |",
"| c | 0.0494924465469434 | 0.991517828651004 | 0.6600456536439785 | 13.860958726523547 | 21 | 21 |",
"| d | 0.061029375346466685 | 0.9748360509016578 | 0.48855379387549835 | 8.79396828975897 | 18 | 18 |",
"| e | 0.01479305307777301 | 0.9965400387585364 | 0.48600669271341557 | 10.206140546981727 | 21 | 21 |",
"+----+----------------------+--------------------+---------------------+--------------------+------------+---------------------+",
],
&df
Expand Down
22 changes: 22 additions & 0 deletions datafusion/src/physical_plan/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use crate::error::{DataFusionError, Result};
use arrow2::array::*;
use arrow2::compute::concat;
use arrow2::datatypes::DataType;
use std::sync::Arc;

use super::ColumnarValue;
Expand Down Expand Up @@ -58,3 +59,24 @@ pub fn array(values: &[ColumnarValue]) -> Result<ColumnarValue> {

Ok(ColumnarValue::Array(array_array(&arrays).map(Arc::new)?))
}

/// Currently supported types by the array function.
/// The order of these types correspond to the order on which coercion applies
/// This should thus be from least informative to most informative
// `array` supports all types, but we do not have a signature to correctly
// coerce them.
pub static SUPPORTED_ARRAY_TYPES: &[DataType] = &[
DataType::Boolean,
DataType::UInt8,
DataType::UInt16,
DataType::UInt32,
DataType::UInt64,
DataType::Int8,
DataType::Int16,
DataType::Int32,
DataType::Int64,
DataType::Float32,
DataType::Float64,
DataType::Utf8,
DataType::LargeUtf8,
];
160 changes: 92 additions & 68 deletions datafusion/src/physical_plan/distinct_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::sync::Arc;

use ahash::RandomState;

use arrow2::array::new_empty_array;
use arrow2::{
array::{Array, ListArray},
datatypes::{DataType, Field},
Expand Down Expand Up @@ -170,34 +171,28 @@ impl Accumulator for DistinctCountAccumulator {
}

fn state(&self) -> Result<Vec<ScalarValue>> {
self.values
.iter()
.map(|distinct_values| {
if distinct_values.is_empty() {
Ok(None)
} else {
// create an array with all distinct values
let arrays = distinct_values
.iter()
.map(ScalarValue::from)
.map(|x| x.to_array())
.collect::<Vec<_>>();
let arrays = arrays.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
Ok(arrow2::compute::concat::concatenate(&arrays)
.map(|x| x.into())
.map(Some)?)
}
})
.zip(self.state_data_types.iter())
.map(|(x, type_): (Result<Option<Arc<dyn Array>>>, &DataType)| {
x.map(|x| {
ScalarValue::List(
x,
ListArray::<i32>::default_datatype(type_.clone()),
)
})
// create a ListArray for each `state_data_type`. The `ListArray`
let a = self.state_data_types.iter().enumerate().map(|(i, type_)| {
if self.values.is_empty() {
return Ok((new_empty_array(type_.clone()), type_));
};
let arrays = self
.values
.iter()
.map(|distinct_values| ScalarValue::from(&distinct_values[i]).to_array())
.collect::<Vec<_>>();
let arrays = arrays.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
Ok(arrow2::compute::concat::concatenate(&arrays).map(|x| (x, type_))?)
});
a.map(|values: Result<(Box<dyn Array>, &DataType)>| {
values.map(|(values, type_)| {
ScalarValue::List(
Some(values.into()),
ListArray::<i32>::default_datatype(type_.clone()),
)
})
.collect()
})
.collect()
}

fn evaluate(&self) -> Result<ScalarValue> {
Expand All @@ -211,9 +206,10 @@ impl Accumulator for DistinctCountAccumulator {
}
}

/*
#[cfg(test)]
mod tests {
type ArrayRef = Arc<dyn Array>;

use std::iter::FromIterator;

use super::*;
Expand All @@ -222,25 +218,52 @@ mod tests {
use arrow2::datatypes::DataType;

macro_rules! state_to_vec {
($LIST:expr, $DATA_TYPE:ident, $PRIM_TY:ty) => {{
($LIST:expr, $DATA_TYPE:ident, $ARRAY_TY:ty) => {{
match $LIST {
ScalarValue::List(_, data_type) => assert_eq!(
ListArray::<i32>::get_child_type(data_type),
&DataType::$DATA_TYPE
),
_ => panic!("Expected a ScalarValue::List"),
}

match $LIST {
ScalarValue::List(None, _) => None,
ScalarValue::List(Some(values), _) => {
let vec = values
.as_any()
.downcast_ref::<$ARRAY_TY>()
.unwrap()
.iter()
.map(|x| x.map(|x| *x))
.collect::<Vec<_>>();

Some(vec)
}
_ => unreachable!(),
}
}};
}

macro_rules! state_to_vec_bool {
($LIST:expr, $DATA_TYPE:ident, $ARRAY_TY:ty) => {{
match $LIST {
ScalarValue::List(_, data_type) => match data_type {
DataType::$DATA_TYPE => (),
_ => panic!("Unexpected DataType for list"),
},
ScalarValue::List(_, data_type) => assert_eq!(
ListArray::<i32>::get_child_type(data_type),
&DataType::$DATA_TYPE
),
_ => panic!("Expected a ScalarValue::List"),
}

match $LIST {
ScalarValue::List(None, _) => None,
ScalarValue::List(Some(scalar_values), _) => {
let vec = scalar_values
ScalarValue::List(Some(values), _) => {
let vec = values
.as_any()
.downcast_ref::<$ARRAY_TY>()
.unwrap()
.iter()
.map(|scalar_value| match scalar_value {
ScalarValue::$DATA_TYPE(value) => *value,
_ => panic!("Unexpected ScalarValue variant"),
})
.collect::<Vec<Option<$PRIM_TY>>>();
.collect::<Vec<_>>();

Some(vec)
}
Expand Down Expand Up @@ -319,7 +342,7 @@ mod tests {

macro_rules! test_count_distinct_update_batch_numeric {
($ARRAY_TYPE:ident, $DATA_TYPE:ident, $PRIM_TYPE:ty) => {{
let values: Vec<Option<$PRIM_TYPE>> = vec![
let values = &[
Some(1),
Some(1),
None,
Expand All @@ -336,7 +359,7 @@ mod tests {
let (states, result) = run_update_batch(&arrays)?;

let mut state_vec =
state_to_vec!(&states[0], $DATA_TYPE, $PRIM_TYPE).unwrap();
state_to_vec!(&states[0], $DATA_TYPE, $ARRAY_TYPE).unwrap();
state_vec.sort();

assert_eq!(states.len(), 1);
Expand Down Expand Up @@ -388,7 +411,7 @@ mod tests {
let (states, result) = run_update_batch(&arrays)?;

let mut state_vec =
state_to_vec!(&states[0], $DATA_TYPE, $PRIM_TYPE).unwrap();
state_to_vec!(&states[0], $DATA_TYPE, $ARRAY_TYPE).unwrap();
state_vec.sort_by(|a, b| match (a, b) {
(Some(lhs), Some(rhs)) => {
OrderedFloat::from(*lhs).cmp(&OrderedFloat::from(*rhs))
Expand Down Expand Up @@ -472,7 +495,8 @@ mod tests {
let get_count = |data: BooleanArray| -> Result<(Vec<Option<bool>>, u64)> {
let arrays = vec![Arc::new(data) as ArrayRef];
let (states, result) = run_update_batch(&arrays)?;
let mut state_vec = state_to_vec!(&states[0], Boolean, bool).unwrap();
let mut state_vec =
state_to_vec_bool!(&states[0], Boolean, BooleanArray).unwrap();
state_vec.sort();
let count = match result {
ScalarValue::UInt64(c) => c.ok_or_else(|| {
Expand Down Expand Up @@ -532,21 +556,20 @@ mod tests {
let (states, result) = run_update_batch(&arrays)?;

assert_eq!(states.len(), 1);
assert_eq!(state_to_vec!(&states[0], Int32, i32), Some(vec![]));
assert_eq!(state_to_vec!(&states[0], Int32, Int32Array), Some(vec![]));
assert_eq!(result, ScalarValue::UInt64(Some(0)));

Ok(())
}

#[test]
fn count_distinct_update_batch_empty() -> Result<()> {
let arrays =
vec![Arc::new(Int32Array::from(vec![] as Vec<Option<i32>>)) as ArrayRef];
let arrays = vec![Arc::new(Int32Array::new_empty(DataType::Int32)) as ArrayRef];

let (states, result) = run_update_batch(&arrays)?;

assert_eq!(states.len(), 1);
assert_eq!(state_to_vec!(&states[0], Int32, i32), Some(vec![]));
assert_eq!(state_to_vec!(&states[0], Int32, Int32Array), Some(vec![]));
assert_eq!(result, ScalarValue::UInt64(Some(0)));

Ok(())
Expand All @@ -560,8 +583,8 @@ mod tests {

let (states, result) = run_update_batch(&arrays)?;

let state_vec1 = state_to_vec!(&states[0], Int8, i8).unwrap();
let state_vec2 = state_to_vec!(&states[1], Int16, i16).unwrap();
let state_vec1 = state_to_vec!(&states[0], Int8, Int8Array).unwrap();
let state_vec2 = state_to_vec!(&states[1], Int16, Int16Array).unwrap();
let state_pairs = collect_states::<i8, i16>(&state_vec1, &state_vec2);

assert_eq!(states.len(), 2);
Expand Down Expand Up @@ -590,8 +613,8 @@ mod tests {
],
)?;

let state_vec1 = state_to_vec!(&states[0], Int32, i32).unwrap();
let state_vec2 = state_to_vec!(&states[1], UInt64, u64).unwrap();
let state_vec1 = state_to_vec!(&states[0], Int32, Int32Array).unwrap();
let state_vec2 = state_to_vec!(&states[1], UInt64, UInt64Array).unwrap();
let state_pairs = collect_states::<i32, u64>(&state_vec1, &state_vec2);

assert_eq!(states.len(), 2);
Expand Down Expand Up @@ -627,8 +650,8 @@ mod tests {
],
)?;

let state_vec1 = state_to_vec!(&states[0], Int32, i32).unwrap();
let state_vec2 = state_to_vec!(&states[1], UInt64, u64).unwrap();
let state_vec1 = state_to_vec!(&states[0], Int32, Int32Array).unwrap();
let state_vec2 = state_to_vec!(&states[1], UInt64, UInt64Array).unwrap();
let state_pairs = collect_states::<i32, u64>(&state_vec1, &state_vec2);

assert_eq!(states.len(), 2);
Expand All @@ -644,23 +667,25 @@ mod tests {

#[test]
fn count_distinct_merge_batch() -> Result<()> {
let state_in1 = ListPrimitive::<i32, Primitive<i32>, i32>::from_iter(vec![
Some(vec![Some(-1_i32), Some(-1_i32), Some(-2_i32), Some(-2_i32)]),
Some(vec![Some(-2_i32), Some(-3_i32)]),
])
.to(ListArray::default_datatype(DataType::Int32));
let state_in2 = ListPrimitive::<i32, Primitive<u64>, u64>::from_iter(vec![
Some(vec![Some(5_u64), Some(6_u64), Some(5_u64), Some(7_u64)]),
Some(vec![Some(5_u64), Some(7_u64)]),
])
.to(ListArray::default_datatype(DataType::UInt64));
let state_in1: ListArray<i32> =
ListPrimitive::<i32, Primitive<i32>, i32>::from_iter(vec![
Some(vec![Some(-1_i32), Some(-1_i32), Some(-2_i32), Some(-2_i32)]),
Some(vec![Some(-2_i32), Some(-3_i32)]),
])
.into();

let state_in2: ListArray<i32> =
ListPrimitive::<i32, Primitive<u64>, u64>::from_iter(vec![
Some(vec![Some(5_u64), Some(6_u64), Some(5_u64), Some(7_u64)]),
Some(vec![Some(5_u64), Some(7_u64)]),
])
.into();

let (states, result) =
run_merge_batch(&[Arc::new(state_in1), Arc::new(state_in2)])?;

let state_out_vec1 = state_to_vec!(&states[0], Int32, i32).unwrap();
let state_out_vec2 = state_to_vec!(&states[1], UInt64, u64).unwrap();
let state_out_vec1 = state_to_vec!(&states[0], Int32, Int32Array).unwrap();
let state_out_vec2 = state_to_vec!(&states[1], UInt64, UInt64Array).unwrap();
let state_pairs = collect_states::<i32, u64>(&state_out_vec1, &state_out_vec2);

assert_eq!(
Expand All @@ -679,4 +704,3 @@ mod tests {
Ok(())
}
}
*/
9 changes: 7 additions & 2 deletions datafusion/src/physical_plan/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use super::{
type_coercion::{coerce, data_types},
ColumnarValue, PhysicalExpr,
};
use crate::execution::context::ExecutionContextState;
use crate::physical_plan::array_expressions;
use crate::physical_plan::datetime_expressions;
use crate::physical_plan::expressions::{nullif_func, SUPPORTED_NULLIF_TYPES};
Expand All @@ -43,6 +42,10 @@ use crate::{
error::{DataFusionError, Result},
scalar::ScalarValue,
};
use crate::{
execution::context::ExecutionContextState,
physical_plan::array_expressions::SUPPORTED_ARRAY_TYPES,
};
use arrow2::{
array::{Array, NullArray},
compute::length::length,
Expand Down Expand Up @@ -965,7 +968,9 @@ fn signature(fun: &BuiltinScalarFunction) -> Signature {

// for now, the list is small, as we do not have many built-in functions.
match fun {
BuiltinScalarFunction::Array => Signature::VariadicEqual,
BuiltinScalarFunction::Array => {
Signature::Variadic(SUPPORTED_ARRAY_TYPES.to_vec())
}
BuiltinScalarFunction::Concat | BuiltinScalarFunction::ConcatWithSeparator => {
Signature::Variadic(vec![DataType::Utf8])
}
Expand Down
Loading

0 comments on commit 9f80ac2

Please sign in to comment.