Skip to content

Commit

Permalink
Merge branch 'main' into distinct-on-impl
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed Nov 2, 2023
2 parents 9b41907 + 5634cce commit 8251e56
Show file tree
Hide file tree
Showing 29 changed files with 366 additions and 170 deletions.
32 changes: 32 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,41 @@ arrow-array = { version = "48.0.0", default-features = false, features = ["chron
arrow-buffer = { version = "48.0.0", default-features = false }
arrow-flight = { version = "48.0.0", features = ["flight-sql-experimental"] }
arrow-schema = { version = "48.0.0", default-features = false }
async-trait = "0.1.73"
bigdecimal = "0.4.1"
bytes = "1.4"
ctor = "0.2.0"
datafusion = { path = "datafusion/core" }
datafusion-common = { path = "datafusion/common" }
datafusion-expr = { path = "datafusion/expr" }
datafusion-sql = { path = "datafusion/sql" }
datafusion-optimizer = { path = "datafusion/optimizer" }
datafusion-physical-expr = { path = "datafusion/physical-expr" }
datafusion-physical-plan = { path = "datafusion/physical-plan" }
datafusion-execution = { path = "datafusion/execution" }
datafusion-proto = { path = "datafusion/proto" }
datafusion-sqllogictest = { path = "datafusion/sqllogictest" }
datafusion-substrait = { path = "datafusion/substrait" }
dashmap = "5.4.0"
doc-comment = "0.3"
env_logger = "0.10"
futures = "0.3"
half = "2.2.1"
indexmap = "2.0.0"
itertools = "0.11"
log = "^0.4"
num_cpus = "1.13.0"
object_store = "0.7.0"
parking_lot = "0.12"
parquet = { version = "48.0.0", features = ["arrow", "async", "object_store"] }
rand = "0.8"
rstest = "0.18.0"
serde_json = "1"
sqlparser = { version = "0.39.0", features = ["visitor"] }
tempfile = "3"
thiserror = "1.0.44"
chrono = { version = "0.4.31", default-features = false }
url = "2.2"

[profile.release]
codegen-units = 1
Expand Down
10 changes: 5 additions & 5 deletions benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ snmalloc = ["snmalloc-rs"]
arrow = { workspace = true }
datafusion = { path = "../datafusion/core", version = "32.0.0" }
datafusion-common = { path = "../datafusion/common", version = "32.0.0" }
env_logger = "0.10"
futures = "0.3"
log = "^0.4"
env_logger = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
mimalloc = { version = "0.1", optional = true, default-features = false }
num_cpus = "1.13.0"
num_cpus = { workspace = true }
parquet = { workspace = true }
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.78"
serde_json = { workspace = true }
snmalloc-rs = { version = "0.3", optional = true }
structopt = { version = "0.3", default-features = false }
test-utils = { path = "../test-utils/", version = "0.1.0" }
Expand Down
20 changes: 10 additions & 10 deletions datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,26 @@ rust-version = { workspace = true }
arrow = { workspace = true }
arrow-flight = { workspace = true }
arrow-schema = { workspace = true }
async-trait = "0.1.41"
bytes = "1.4"
dashmap = "5.4"
async-trait = { workspace = true }
bytes = { workspace = true }
dashmap = { workspace = true }
datafusion = { path = "../datafusion/core", features = ["avro"] }
datafusion-common = { path = "../datafusion/common" }
datafusion-expr = { path = "../datafusion/expr" }
datafusion-optimizer = { path = "../datafusion/optimizer" }
datafusion-sql = { path = "../datafusion/sql" }
env_logger = "0.10"
futures = "0.3"
log = "0.4"
env_logger = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
mimalloc = { version = "0.1", default-features = false }
num_cpus = "1.13.0"
num_cpus = { workspace = true }
object_store = { version = "0.7.0", features = ["aws", "http"] }
prost = { version = "0.12", default-features = false }
prost-derive = { version = "0.11", default-features = false }
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.82"
tempfile = "3"
serde_json = { workspace = true }
tempfile = { workspace = true }
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
tonic = "0.10"
url = "2.2"
url = { workspace = true }
uuid = "1.2"
2 changes: 1 addition & 1 deletion datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ arrow-buffer = { workspace = true }
arrow-schema = { workspace = true }
chrono = { workspace = true }
half = { version = "2.1", default-features = false }
num_cpus = "1.13.0"
num_cpus = { workspace = true }
object_store = { version = "0.7.0", default-features = false, optional = true }
parquet = { workspace = true, optional = true }
pyo3 = { version = "0.20.0", optional = true }
Expand Down
20 changes: 10 additions & 10 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::cast::{
};
use crate::error::{DataFusionError, Result, _internal_err, _not_impl_err};
use crate::hash_utils::create_hashes;
use crate::utils::wrap_into_list_array;
use crate::utils::array_into_list_array;
use arrow::buffer::{NullBuffer, OffsetBuffer};
use arrow::compute::kernels::numeric::*;
use arrow::datatypes::{i256, FieldRef, Fields, SchemaBuilder};
Expand Down Expand Up @@ -1667,7 +1667,7 @@ impl ScalarValue {
} else {
Self::iter_to_array(values.iter().cloned()).unwrap()
};
Arc::new(wrap_into_list_array(values))
Arc::new(array_into_list_array(values))
}

/// Converts a scalar value into an array of `size` rows.
Expand Down Expand Up @@ -2058,7 +2058,7 @@ impl ScalarValue {
let list_array = as_list_array(array);
let nested_array = list_array.value(index);
// Produces a single element `ListArray` with the value at `index`.
let arr = Arc::new(wrap_into_list_array(nested_array));
let arr = Arc::new(array_into_list_array(nested_array));

ScalarValue::List(arr)
}
Expand All @@ -2067,7 +2067,7 @@ impl ScalarValue {
let list_array = as_fixed_size_list_array(array)?;
let nested_array = list_array.value(index);
// Produces a single element `ListArray` with the value at `index`.
let arr = Arc::new(wrap_into_list_array(nested_array));
let arr = Arc::new(array_into_list_array(nested_array));

ScalarValue::List(arr)
}
Expand Down Expand Up @@ -3052,7 +3052,7 @@ mod tests {

let array = ScalarValue::new_list(scalars.as_slice(), &DataType::Utf8);

let expected = wrap_into_list_array(Arc::new(StringArray::from(vec![
let expected = array_into_list_array(Arc::new(StringArray::from(vec![
"rust",
"arrow",
"data-fusion",
Expand Down Expand Up @@ -3091,9 +3091,9 @@ mod tests {
#[test]
fn iter_to_array_string_test() {
let arr1 =
wrap_into_list_array(Arc::new(StringArray::from(vec!["foo", "bar", "baz"])));
array_into_list_array(Arc::new(StringArray::from(vec!["foo", "bar", "baz"])));
let arr2 =
wrap_into_list_array(Arc::new(StringArray::from(vec!["rust", "world"])));
array_into_list_array(Arc::new(StringArray::from(vec!["rust", "world"])));

let scalars = vec![
ScalarValue::List(Arc::new(arr1)),
Expand Down Expand Up @@ -4335,13 +4335,13 @@ mod tests {
// Define list-of-structs scalars

let nl0_array = ScalarValue::iter_to_array(vec![s0.clone(), s1.clone()]).unwrap();
let nl0 = ScalarValue::List(Arc::new(wrap_into_list_array(nl0_array)));
let nl0 = ScalarValue::List(Arc::new(array_into_list_array(nl0_array)));

let nl1_array = ScalarValue::iter_to_array(vec![s2.clone()]).unwrap();
let nl1 = ScalarValue::List(Arc::new(wrap_into_list_array(nl1_array)));
let nl1 = ScalarValue::List(Arc::new(array_into_list_array(nl1_array)));

let nl2_array = ScalarValue::iter_to_array(vec![s1.clone()]).unwrap();
let nl2 = ScalarValue::List(Arc::new(wrap_into_list_array(nl2_array)));
let nl2 = ScalarValue::List(Arc::new(array_into_list_array(nl2_array)));

// iter_to_array for list-of-struct
let array = ScalarValue::iter_to_array(vec![nl0, nl1, nl2]).unwrap();
Expand Down
46 changes: 44 additions & 2 deletions datafusion/common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

//! This module provides the bisect function, which implements binary search.

use crate::error::_internal_err;
use crate::{DataFusionError, Result, ScalarValue};
use arrow::array::{ArrayRef, PrimitiveArray};
use arrow::buffer::OffsetBuffer;
use arrow::compute;
use arrow::compute::{partition, SortColumn, SortOptions};
use arrow::datatypes::{Field, SchemaRef, UInt32Type};
use arrow::record_batch::RecordBatch;
use arrow_array::ListArray;
use arrow_array::{Array, ListArray};
use sqlparser::ast::Ident;
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;
Expand Down Expand Up @@ -338,7 +339,7 @@ pub fn longest_consecutive_prefix<T: Borrow<usize>>(

/// Wrap an array into a single element `ListArray`.
/// For example `[1, 2, 3]` would be converted into `[[1, 2, 3]]`
pub fn wrap_into_list_array(arr: ArrayRef) -> ListArray {
pub fn array_into_list_array(arr: ArrayRef) -> ListArray {
let offsets = OffsetBuffer::from_lengths([arr.len()]);
ListArray::new(
Arc::new(Field::new("item", arr.data_type().to_owned(), true)),
Expand All @@ -348,6 +349,47 @@ pub fn wrap_into_list_array(arr: ArrayRef) -> ListArray {
)
}

/// Wrap each input array as one entry of a `ListArray`.
///
/// The resulting `ListArray` has one list entry per input array, in iteration
/// order. For example `[1, 2, 3]` and `[4, 5, 6]` would be converted into
/// `[[1, 2, 3], [4, 5, 6]]`.
///
/// # Errors
///
/// Returns an internal error if `arr` yields no arrays, and propagates any
/// error returned by `arrow::compute::concat` while combining the values.
///
/// # Example
/// ```
/// use arrow::array::{Int32Array, ListArray, ArrayRef};
/// use arrow::datatypes::{Int32Type, Field};
/// use std::sync::Arc;
///
/// let arr1 = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
/// let arr2 = Arc::new(Int32Array::from(vec![4, 5, 6])) as ArrayRef;
///
/// let list_arr = datafusion_common::utils::arrays_into_list_array([arr1, arr2]).unwrap();
///
/// let expected = ListArray::from_iter_primitive::<Int32Type, _, _>(
///     vec![
///         Some(vec![Some(1), Some(2), Some(3)]),
///         Some(vec![Some(4), Some(5), Some(6)]),
///     ]
/// );
///
/// assert_eq!(list_arr, expected);
/// ```
pub fn arrays_into_list_array(
    arr: impl IntoIterator<Item = ArrayRef>,
) -> Result<ListArray> {
    let arr = arr.into_iter().collect::<Vec<_>>();
    if arr.is_empty() {
        return _internal_err!("Cannot wrap empty array into list array");
    }

    // The list's item type is taken from the first array; the inputs are
    // assumed to share a single data type.
    let data_type = arr[0].data_type().to_owned();
    // Each input array becomes one list entry, so its length is that entry's
    // length. `from_lengths` accepts an iterator, so no temporary Vec is needed.
    let offsets = OffsetBuffer::from_lengths(arr.iter().map(|x| x.len()));
    let values = arr.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
    Ok(ListArray::new(
        Arc::new(Field::new("item", data_type, true)),
        offsets,
        arrow::compute::concat(values.as_slice())?,
        None,
    ))
}

/// An extension trait for smart pointers. Provides an interface to get a
/// raw pointer to the data (with metadata stripped away).
///
Expand Down
52 changes: 26 additions & 26 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,61 +57,61 @@ arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
async-compression = { version = "0.4.0", features = ["bzip2", "gzip", "xz", "zstd", "futures-io", "tokio"], optional = true }
async-trait = "0.1.73"
bytes = "1.4"
async-trait = { workspace = true }
bytes = { workspace = true }
bzip2 = { version = "0.4.3", optional = true }
chrono = { workspace = true }
dashmap = "5.4.0"
dashmap = { workspace = true }
datafusion-common = { path = "../common", version = "32.0.0", features = ["object_store"], default-features = false }
datafusion-execution = { path = "../execution", version = "32.0.0" }
datafusion-expr = { path = "../expr", version = "32.0.0" }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-optimizer = { path = "../optimizer", version = "32.0.0", default-features = false }
datafusion-physical-expr = { path = "../physical-expr", version = "32.0.0", default-features = false }
datafusion-physical-plan = { path = "../physical-plan", version = "32.0.0", default-features = false }
datafusion-sql = { path = "../sql", version = "32.0.0" }
datafusion-physical-plan = { workspace = true }
datafusion-sql = { workspace = true }
flate2 = { version = "1.0.24", optional = true }
futures = "0.3"
futures = { workspace = true }
glob = "0.3.0"
half = { version = "2.1", default-features = false }
hashbrown = { version = "0.14", features = ["raw"] }
indexmap = "2.0.0"
itertools = "0.11"
log = "^0.4"
indexmap = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
num-traits = { version = "0.2", optional = true }
num_cpus = "1.13.0"
object_store = "0.7.0"
parking_lot = "0.12"
num_cpus = { workspace = true }
object_store = { workspace = true }
parking_lot = { workspace = true }
parquet = { workspace = true, optional = true }
pin-project-lite = "^0.2.7"
rand = "0.8"
rand = { workspace = true }
sqlparser = { workspace = true }
tempfile = "3"
tempfile = { workspace = true }
tokio = { version = "1.28", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
tokio-util = { version = "0.7.4", features = ["io"] }
url = "2.2"
url = { workspace = true }
uuid = { version = "1.0", features = ["v4"] }
xz2 = { version = "0.1", optional = true }
zstd = { version = "0.13", optional = true, default-features = false }

[dev-dependencies]
async-trait = "0.1.53"
bigdecimal = "0.4.1"
async-trait = { workspace = true }
bigdecimal = { workspace = true }
criterion = { version = "0.5", features = ["async_tokio"] }
csv = "1.1.6"
ctor = "0.2.0"
doc-comment = "0.3"
env_logger = "0.10"
half = "2.2.1"
ctor = { workspace = true }
doc-comment = { workspace = true }
env_logger = { workspace = true }
half = { workspace = true }
postgres-protocol = "0.6.4"
postgres-types = { version = "0.2.4", features = ["derive", "with-chrono-0_4"] }
rand = { version = "0.8", features = ["small_rng"] }
rand_distr = "0.4.3"
regex = "1.5.4"
rstest = "0.18.0"
rstest = { workspace = true }
rust_decimal = { version = "1.27.0", features = ["tokio-pg"] }
serde_json = "1"
serde_json = { workspace = true }
test-utils = { path = "../../test-utils" }
thiserror = "1.0.37"
thiserror = { workspace = true }
tokio-postgres = "0.7.7"
[target.'cfg(not(target_os = "windows"))'.dev-dependencies]
nix = { version = "0.27.1", features = ["fs"] }
Expand Down
20 changes: 10 additions & 10 deletions datafusion/execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ path = "src/lib.rs"
[dependencies]
arrow = { workspace = true }
chrono = { version = "0.4", default-features = false }
dashmap = "5.4.0"
datafusion-common = { path = "../common", version = "32.0.0" }
datafusion-expr = { path = "../expr", version = "32.0.0" }
futures = "0.3"
dashmap = { workspace = true }
datafusion-common = { workspace = true }
datafusion-expr = { workspace = true }
futures = { workspace = true }
hashbrown = { version = "0.14", features = ["raw"] }
log = "^0.4"
object_store = "0.7.0"
parking_lot = "0.12"
rand = "0.8"
tempfile = "3"
url = "2.2"
log = { workspace = true }
object_store = { workspace = true }
parking_lot = { workspace = true }
rand = { workspace = true }
tempfile = { workspace = true }
url = { workspace = true }
6 changes: 3 additions & 3 deletions datafusion/expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ path = "src/lib.rs"
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
arrow = { workspace = true }
arrow-array = { workspace = true }
datafusion-common = { path = "../common", version = "32.0.0", default-features = false }
datafusion-common = { workspace = true }
sqlparser = { workspace = true }
strum = { version = "0.25.0", features = ["derive"] }
strum_macros = "0.25.0"

[dev-dependencies]
ctor = "0.2.0"
env_logger = "0.10"
ctor = { workspace = true }
env_logger = { workspace = true }
Loading

0 comments on commit 8251e56

Please sign in to comment.