diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 149c6e2c5bdf..45389959f413 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -322,9 +322,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0122885821398cc923ece939e24d1056a2384ee719432397fa9db87230ff11" +checksum = "62b74f44609f0f91493e3082d3734d98497e094777144380ea4db9f9905dd5b6" dependencies = [ "bzip2", "flate2", @@ -346,7 +346,7 @@ checksum = "a564d521dd56509c4c47480d00b80ee55f7e385ae48db5744c67ad50c92d2ebf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.25", ] [[package]] @@ -1029,17 +1029,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eed5fff0d93c7559121e9c72bf9c242295869396255071ff2cb1617147b608c5" dependencies = [ "quote", - "syn 2.0.23", + "syn 2.0.25", ] [[package]] name = "dashmap" -version = "5.4.0" +version = "5.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" +checksum = "6943ae99c34386c84a470c499d3414f66502a41340aa895406e0d2e4a207b91d" dependencies = [ "cfg-if", - "hashbrown 0.12.3", + "hashbrown 0.14.0", "lock_api", "once_cell", "parking_lot_core", @@ -1067,6 +1067,7 @@ dependencies = [ "datafusion-row", "datafusion-sql", "flate2", + "flume", "futures", "glob", "hashbrown 0.14.0", @@ -1329,9 +1330,9 @@ dependencies = [ [[package]] name = "equivalent" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88bffebc5d80432c9b140ee17875ff173a8ab62faad5b257da912bd2f6c1c0a1" +checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" @@ -1380,7 +1381,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef033ed5e9bad94e55838ca0ca906db0e043f517adda0c8b79c7a8c66c93c1b5" dependencies = [ "cfg-if", - "rustix 0.38.3", + "rustix 0.38.4", "windows-sys", ] @@ -1419,6 +1420,19 @@ dependencies = [ "num-traits", ] +[[package]] +name = "flume" +version = "0.10.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "pin-project", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1490,7 +1504,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.25", ] [[package]] @@ -1546,8 +1560,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -1732,7 +1748,7 @@ dependencies = [ "futures-util", "http", "hyper", - "rustls 0.21.3", + "rustls 0.21.5", "tokio", "tokio-rustls 0.24.1", ] @@ -2065,6 +2081,15 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom", +] + [[package]] name = "nibble_vec" version = "0.1.0" @@ -2094,9 +2119,9 @@ checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" [[package]] name = "num" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43db66d1170d347f9a065114077f7dccb00c1b9478c89384490a3425279a4606" +checksum = "b05180d69e3da0e530ba2a1dae5110317e49e3b7f3d41be227dc5f92e49ee7af" dependencies = [ "num-bigint", "num-complex", @@ -2394,7 +2419,7 @@ checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.25", ] [[package]] @@ -2484,9 +2509,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.63" +version = "1.0.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b368fba921b0dce7e60f5e04ec15e565b3303972b42bcfde1d0713b881959eb" +checksum = "78803b62cbf1f46fde80d7c0e803111524b9877184cfe7c3033659490ac7a7da" dependencies = [ "unicode-ident", ] @@ -2581,9 +2606,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.0" +version = "1.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89089e897c013b3deb627116ae56a6955a72b8bed395c9526af31c9fe528b484" +checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575" dependencies = [ "aho-corasick", "memchr", @@ -2593,9 +2618,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.0" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa250384981ea14565685dea16a9ccc4d1c541a13f82b9c168572264d1df8c56" +checksum = "83d3daa6976cffb758ec878f108ba0e062a45b2d6ca3a2cca965338855476caf" dependencies = [ "aho-corasick", "memchr", @@ -2604,9 +2629,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.7.3" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ab07dc67230e4a4718e70fd5c20055a4334b121f1f9db8fe63ef39ce9b8c846" +checksum = "e5ea92a5b6195c6ef2a0295ea818b312502c6fc94dde986c5553242e18fd4ce2" [[package]] name = "reqwest" @@ -2631,7 +2656,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.21.3", + "rustls 0.21.5", "rustls-pemfile", "serde", "serde_json", @@ -2658,7 +2683,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", + "spin 0.5.2", "untrusted", "web-sys", "winapi", @@ -2721,9 +2746,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.3" +version = "0.38.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac5ffa1efe7548069688cd7028f32591853cd7b5b756d41bcffd2353e4fc75b4" +checksum = "0a962918ea88d644592894bc6dc55acc6c0956488adcebbfb6e273506b7fd6e5" dependencies = [ "bitflags 2.3.3", "errno", @@ -2746,9 +2771,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.3" +version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b19faa85ecb5197342b54f987b142fb3e30d0c90da40f80ef4fa9a726e6676ed" +checksum = "79ea77c539259495ce8ca47f53e66ae0330a8819f67e23ac96ca02f50e7b7d36" dependencies = [ "log", "ring", @@ -2893,29 +2918,29 @@ checksum = "63134939175b3131fe4d2c131b103fd42f25ccca89423d43b5e4f267920ccf03" [[package]] name = "serde" -version = "1.0.167" +version = "1.0.171" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7daf513456463b42aa1d94cff7e0c24d682b429f020b9afa4f5ba5c40a22b237" +checksum = "30e27d1e4fd7659406c492fd6cfaf2066ba8773de45ca75e855590f856dc34a9" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.167" +version = "1.0.171" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b69b106b68bc8054f0e974e70d19984040f8a5cf9215ca82626ea4853f82c4b9" +checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.25", ] [[package]] name = "serde_json" -version = "1.0.100" +version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f1e14e89be7aa4c4b78bdbdc9eb5bf8517829a600ae8eaa39a6e1d960b5185c" +checksum = "b5062a995d481b2308b6064e9af76011f2921c35f97b0468811ed9f6cd91dfed" dependencies = [ "itoa", "ryu", @@ -2968,9 +2993,9 @@ checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" [[package]] name = "snafu" -version = "0.7.4" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb0656e7e3ffb70f6c39b3c2a86332bb74aa3c679da781642590f3c1118c5045" +checksum = "e4de37ad025c587a29e8f3f5605c00f70b98715ef90b9061a815b9e59e9042d6" dependencies = [ "doc-comment", "snafu-derive", @@ -2978,9 +3003,9 @@ dependencies = [ [[package]] name = "snafu-derive" -version = "0.7.4" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "475b3bbe5245c26f2d8a6f62d67c1f30eb9fffeccee721c45d162c3ebbdf81b2" +checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf" dependencies = [ "heck", "proc-macro2", @@ -3010,6 +3035,15 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "sqlparser" version = "0.35.0" @@ -3087,7 +3121,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.23", + "syn 2.0.25", ] [[package]] @@ -3109,9 +3143,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.23" +version = "2.0.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59fb7d6d8281a51045d62b8eb3a7d1ce347b76f312af50cd3dc0af39c87c1737" +checksum = "15e3fc8c0c74267e2df136e5e5fb656a464158aa57624053375eb9c8c6e25ae2" dependencies = [ "proc-macro2", "quote", @@ -3170,7 +3204,7 @@ checksum = "463fe12d7993d3b327787537ce8dd4dfa058de32fc2b195ef3cde03dc4771e8f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.25", ] [[package]] @@ -3186,9 +3220,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.22" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea9e1b3cf1243ae005d9e74085d4d542f3125458f3a81af210d901dcd7411efd" +checksum = "59e399c068f43a5d116fedaf73b203fa4f9c519f17e2b34f63221d3792f81446" dependencies = [ "serde", "time-core", @@ -3203,9 +3237,9 @@ checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" [[package]] name = "time-macros" -version = "0.2.9" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "372950940a5f07bf38dbe211d7283c9e6d7327df53794992d293e534c733d09b" +checksum = "96ba15a897f3c86766b757e5ac7221554c6750054d74d5b28844fce5fb36a6c4" dependencies = [ "time-core", ] @@ -3261,7 +3295,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.25", ] [[package]] @@ -3281,7 +3315,7 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls 0.21.3", + "rustls 0.21.5", "tokio", ] @@ -3359,7 +3393,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.25", ] [[package]] @@ -3531,7 +3565,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.25", "wasm-bindgen-shared", ] @@ -3565,7 +3599,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.23", + "syn 2.0.25", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 5bcb8bc594ff..0e797a0ea46d 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -94,6 +94,7 @@ url = "2.2" uuid = { version = "1.0", features = ["v4"] } xz2 = { version = "0.1", optional = true } zstd = { version = "0.12", optional = true, default-features = false } +flume = "0.10" [dev-dependencies] diff --git a/datafusion/core/src/physical_plan/repartition/flume_channels.rs b/datafusion/core/src/physical_plan/repartition/flume_channels.rs new file mode 100644 index 000000000000..c033f09b6976 --- /dev/null +++ b/datafusion/core/src/physical_plan/repartition/flume_channels.rs @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Channel based on flume + +use flume::r#async::RecvStream; +use flume::{unbounded, Sender}; +use futures::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[derive(Debug)] +pub(super) struct DistributionSender(Sender); + +impl Clone for DistributionSender { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl DistributionSender { + pub fn send(&self, item: T) -> flume::r#async::SendFut<'_, T> { + self.0.send_async(item) + } +} + +pub(super) struct DistributionReceiver(RecvStream<'static, T>); + +impl std::fmt::Debug for DistributionReceiver { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "DistributionReceiver") + } +} + +impl DistributionReceiver { + pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0).poll_next(cx) + } +} + +/// Create `n` empty channels. +pub(super) fn channels( + n: usize, +) -> (Vec>, Vec>) { + (0..n) + .map(|_| { + let (tx, rx) = unbounded(); + ( + DistributionSender(tx), + DistributionReceiver(rx.into_stream()), + ) + }) + .unzip() +} + +pub(super) type PartitionAwareSenders = Vec>>; +pub(super) type PartitionAwareReceivers = Vec>>; + +pub(super) fn partition_aware_channels( + n_in: usize, + n_out: usize, +) -> (PartitionAwareSenders, PartitionAwareReceivers) { + (0..n_in).map(|_| channels(n_out)).unzip() +} diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs index cb4d5c89889b..f8cf4ae9814b 100644 --- a/datafusion/core/src/physical_plan/repartition/mod.rs +++ b/datafusion/core/src/physical_plan/repartition/mod.rs @@ -25,9 +25,9 @@ use std::task::{Context, Poll}; use std::{any::Any, vec}; use crate::physical_plan::hash_utils::create_hashes; -use crate::physical_plan::repartition::distributor_channels::{ - channels, partition_aware_channels, -}; +// use crate::physical_plan::repartition::distributor_channels::{ +// channels, partition_aware_channels, +// }; use crate::physical_plan::{ DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning, Statistics, }; @@ -38,7 +38,7 @@ use datafusion_common::{DataFusionError, Result}; use datafusion_execution::memory_pool::MemoryConsumer; use log::trace; -use self::distributor_channels::{DistributionReceiver, DistributionSender}; +// use self::distributor_channels::{DistributionReceiver, DistributionSender}; use super::common::{AbortOnDropMany, AbortOnDropSingle, SharedMemoryReservation}; use super::expressions::PhysicalSortExpr; @@ -51,12 +51,16 @@ use crate::physical_plan::sorts::streaming_merge; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; use futures::stream::Stream; -use futures::{FutureExt, StreamExt}; +use futures::StreamExt; use hashbrown::HashMap; use parking_lot::Mutex; use tokio::task::JoinHandle; -mod distributor_channels; +// mod distributor_channels; +mod flume_channels; +use self::flume_channels::{ + channels, partition_aware_channels, DistributionReceiver, DistributionSender, +}; type MaybeBatch = Option>; type InputPartitionsToCurrentPartitionSender = Vec>; @@ -720,7 +724,7 @@ impl Stream for RepartitionStream { cx: &mut Context<'_>, ) -> Poll> { loop { - match self.input.recv().poll_unpin(cx) { + match self.input.poll_next(cx) { Poll::Ready(Some(Some(v))) => { if let Ok(batch) = &v { self.reservation @@ -783,7 +787,7 @@ impl Stream for PerPartitionStream { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - match self.receiver.recv().poll_unpin(cx) { + match self.receiver.poll_next(cx) { Poll::Ready(Some(Some(v))) => { if let Ok(batch) = &v { self.reservation