From 7823d67eac7f00844520681f3d0c41f7fe2690f0 Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Sat, 3 Aug 2024 20:42:47 +0100 Subject: [PATCH 1/6] move to datafusion 40 --- Cargo.toml | 19 +++----- .../src/executor/table_provider/edge.rs | 43 +++++++++++-------- .../src/executor/table_provider/mod.rs | 23 ++++------ .../src/executor/table_provider/node.rs | 33 +++++++------- raphtory-cypher/src/hop/execution.rs | 43 +++++++++++-------- 5 files changed, 78 insertions(+), 83 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5cf60e1008..a5a37184bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -128,19 +128,12 @@ pest = "2.7.8" pest_derive = "2.7.8" sqlparser = "0.43.1" -datafusion = { version = "36" } +datafusion = { version = "40" } futures = "0.3" -arrow = { version = "50" } -arrow-buffer = { version = "50" } -arrow-schema = { version = "50" } -arrow-array = { version = "50" } +arrow = { version = "52" } +arrow-buffer = { version = "52" } +arrow-schema = { version = "52" } +arrow-data = { version = "52" } +arrow-array = { version = "52" } moka = { version = "0.12.7", features = ["sync"] } - -# Make sure that transitive dependencies stick to disk_graph 50 -[patch.crates-io] -arrow = { git = "https://github.com/apache/arrow-rs.git", tag = "50.0.0" } -arrow-buffer = { git = "https://github.com/apache/arrow-rs.git", tag = "50.0.0" } -arrow-schema = { git = "https://github.com/apache/arrow-rs.git", tag = "50.0.0" } -arrow-data = { git = "https://github.com/apache/arrow-rs.git", tag = "50.0.0" } -arrow-array = { git = "https://github.com/apache/arrow-rs.git", tag = "50.0.0" } diff --git a/raphtory-cypher/src/executor/table_provider/edge.rs b/raphtory-cypher/src/executor/table_provider/edge.rs index 767aeeaafc..a08b21c5a8 100644 --- a/raphtory-cypher/src/executor/table_provider/edge.rs +++ b/raphtory-cypher/src/executor/table_provider/edge.rs @@ -22,7 +22,7 @@ use datafusion::{ physical_expr::PhysicalSortExpr, physical_plan::{ metrics::MetricsSet, stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, - ExecutionPlan, + ExecutionPlan, PlanProperties, }, physical_planner::create_physical_sort_expr, }; @@ -32,6 +32,8 @@ use raphtory::disk_graph::DiskGraphStorage; use crate::executor::{arrow2_to_arrow_buf, ExecError}; +use super::plan_properties; + // use super::plan_properties; pub struct EdgeListTableProvider { @@ -146,11 +148,10 @@ impl TableProvider for EdgeListTableProvider { _limit: Option, ) -> Result, DataFusionError> { let schema = projection - .as_ref() - .map(|proj| Arc::new(self.schema().project(proj).expect("failed projection"))) - .unwrap_or_else(|| self.schema().clone()); + .map(|proj| self.schema().project(proj).map(Arc::new)) + .unwrap_or_else(|| Ok( self.schema().clone() ))?; - // let plan_properties = plan_properties(schema.clone(), self.num_partitions); + let plan_properties = plan_properties(schema.clone(), self.num_partitions); Ok(Arc::new(EdgeListExecPlan { layer_id: self.layer_id, layer_name: self.layer_name.clone(), @@ -159,7 +160,7 @@ impl TableProvider for EdgeListTableProvider { num_partitions: self.num_partitions, row_count: self.row_count, sorted_by: self.sorted_by.clone(), - // props: plan_properties, + props: plan_properties, projection: projection.map(|proj| Arc::from(proj.as_slice())), })) } @@ -173,7 +174,7 @@ struct EdgeListExecPlan { num_partitions: usize, row_count: usize, sorted_by: Vec, - // props: PlanProperties, + props: PlanProperties, projection: Option>, } @@ -349,21 +350,25 @@ impl DisplayAs for EdgeListExecPlan { #[async_trait] impl ExecutionPlan for EdgeListExecPlan { + fn name(&self) -> &str { + "EdgeListExecPlan" + } + fn as_any(&self) -> &dyn Any { self } - // fn properties(&self) -> &PlanProperties { - // &self.props - // } - - fn output_partitioning(&self) -> datafusion::physical_expr::Partitioning { - datafusion::physical_expr::Partitioning::UnknownPartitioning(self.num_partitions) - } - fn output_ordering(&self) -> Option<&[datafusion::physical_expr::PhysicalSortExpr]> { - Some(&self.sorted_by) + fn properties(&self) -> &PlanProperties { + &self.props } + // fn output_partitioning(&self) -> datafusion::physical_expr::Partitioning { + // datafusion::physical_expr::Partitioning::UnknownPartitioning(self.num_partitions) + // } + // fn output_ordering(&self) -> Option<&[datafusion::physical_expr::PhysicalSortExpr]> { + // Some(&self.sorted_by) + // } + fn schema(&self) -> SchemaRef { self.schema.clone() } @@ -372,7 +377,7 @@ impl ExecutionPlan for EdgeListExecPlan { vec![true; self.children().len()] } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } @@ -388,7 +393,7 @@ impl ExecutionPlan for EdgeListExecPlan { target_partitions: usize, _config: &ConfigOptions, ) -> Result>, DataFusionError> { - // let plan_properties = plan_properties(self.schema.clone(), target_partitions); + let plan_properties = plan_properties(self.schema.clone(), target_partitions); Ok(Some(Arc::new(EdgeListExecPlan { layer_id: self.layer_id, layer_name: self.layer_name.clone(), @@ -397,7 +402,7 @@ impl ExecutionPlan for EdgeListExecPlan { num_partitions: target_partitions, row_count: self.row_count, sorted_by: self.sorted_by.clone(), - // props: plan_properties, + props: plan_properties, projection: self.projection.clone(), }))) } diff --git a/raphtory-cypher/src/executor/table_provider/mod.rs b/raphtory-cypher/src/executor/table_provider/mod.rs index 8fdd46c878..2b6c01f7ac 100644 --- a/raphtory-cypher/src/executor/table_provider/mod.rs +++ b/raphtory-cypher/src/executor/table_provider/mod.rs @@ -1,9 +1,5 @@ -// use arrow_schema::SchemaRef; -// use datafusion::{ -// physical_expr::EquivalenceProperties, -// // physical_plan::{ExecutionMode, Partitioning, PlanProperties}, -// physical_plan::Partitioning, -// }; +use arrow_schema::SchemaRef; +use datafusion::{physical_expr::EquivalenceProperties, physical_plan::PlanProperties}; pub mod edge; pub mod node; @@ -11,13 +7,10 @@ pub mod node; // called `Result::unwrap()` on an `Err` value: Context("EnforceDistribution", Internal("PhysicalOptimizer rule 'EnforceDistribution' failed, due to generate a different schema, // schema: Schema { fields: [Field { name: \"name\", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, // schema: Schema { fields: [Field { name: \"name\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }")) +pub fn plan_properties(schema: SchemaRef, num_partitions: usize) -> PlanProperties { + let eq_properties = EquivalenceProperties::new(schema); + let partitioning = datafusion::physical_plan::Partitioning::UnknownPartitioning(num_partitions); + let execution_mode = datafusion::physical_plan::ExecutionMode::Bounded; + PlanProperties::new(eq_properties, partitioning, execution_mode) +} -// pub fn plan_properties(schema: SchemaRef, target_partitions: usize) -> PlanProperties { -// let eq_properties = EquivalenceProperties::new(schema.clone()); -// let plan_properties = PlanProperties::new( -// eq_properties, -// Partitioning::UnknownPartitioning(target_partitions), -// ExecutionMode::Bounded, -// ); -// plan_properties -// } diff --git a/raphtory-cypher/src/executor/table_provider/node.rs b/raphtory-cypher/src/executor/table_provider/node.rs index 08d1116714..0328d4ba16 100644 --- a/raphtory-cypher/src/executor/table_provider/node.rs +++ b/raphtory-cypher/src/executor/table_provider/node.rs @@ -17,7 +17,7 @@ use datafusion::{ logical_expr::Expr, physical_plan::{ metrics::MetricsSet, stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, - ExecutionPlan, + ExecutionPlan, PlanProperties, }, }; use futures::Stream; @@ -28,6 +28,8 @@ use raphtory::{ }; use std::{any::Any, fmt::Formatter, sync::Arc}; +use super::plan_properties; + pub struct NodeTableProvider { graph: DiskGraphStorage, schema: SchemaRef, @@ -116,18 +118,17 @@ impl TableProvider for NodeTableProvider { _limit: Option, ) -> Result, DataFusionError> { let schema = projection - .as_ref() - .map(|proj| Arc::new(self.schema().project(proj).expect("failed projection"))) - .unwrap_or_else(|| self.schema().clone()); + .map(|proj| self.schema().project(proj).map(Arc::new)) + .unwrap_or_else(|| Ok(self.schema().clone()))?; - // let plan_properties = plan_properties(self.schema.clone(), self.num_partitions); + let plan_properties = plan_properties(self.schema.clone(), self.num_partitions); Ok(Arc::new(NodeScanExecPlan { graph: self.graph.clone(), schema, num_partitions: self.num_partitions, chunk_size: self.chunk_size, - // props: plan_properties, + props: plan_properties, projection: projection.map(|proj| Arc::from(proj.as_slice())), })) } @@ -190,7 +191,7 @@ struct NodeScanExecPlan { schema: SchemaRef, num_partitions: usize, chunk_size: usize, - // props: PlanProperties, + props: PlanProperties, projection: Option>, } @@ -231,19 +232,17 @@ impl DisplayAs for NodeScanExecPlan { #[async_trait] impl ExecutionPlan for NodeScanExecPlan { + + fn name(&self) -> &str { + "NodeScanExecPlan" + } + fn as_any(&self) -> &dyn Any { self } - // fn properties(&self) -> &PlanProperties { - // &self.props - // } - - fn output_partitioning(&self) -> datafusion::physical_expr::Partitioning { - datafusion::physical_expr::Partitioning::UnknownPartitioning(self.num_partitions) - } - fn output_ordering(&self) -> Option<&[datafusion::physical_expr::PhysicalSortExpr]> { - None + fn properties(&self) -> &PlanProperties { + &self.props } fn schema(&self) -> SchemaRef { @@ -254,7 +253,7 @@ impl ExecutionPlan for NodeScanExecPlan { vec![true; self.children().len()] } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/raphtory-cypher/src/hop/execution.rs b/raphtory-cypher/src/hop/execution.rs index da5140a760..7349d99d47 100644 --- a/raphtory-cypher/src/hop/execution.rs +++ b/raphtory-cypher/src/hop/execution.rs @@ -8,7 +8,10 @@ use std::{ task::{Context, Poll}, }; -use crate::arrow2::{offset::Offset, types::NativeType}; +use crate::{ + arrow2::{offset::Offset, types::NativeType}, + executor::table_provider::plan_properties, +}; // use disk_graph::compute::take_record_batch; use arrow_array::{ builder::{ @@ -26,11 +29,7 @@ use datafusion::{ error::DataFusionError, execution::{RecordBatchStream, TaskContext}, physical_plan::{ - DisplayAs, - DisplayFormatType, - Distribution, - ExecutionPlan, //ExecutionPlanProperties, - // PlanProperties, + DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, SendableRecordBatchStream, }, }; @@ -60,7 +59,7 @@ pub struct HopExec { right_schema: DFSchemaRef, output_schema: SchemaRef, - + props: PlanProperties, right_proj: Option>, } @@ -85,7 +84,11 @@ impl HopExec { let input_col = find_last_input_col(hop, &input); - let out_schema: Schema = hop.out_schema.as_ref().into(); + let out_schema: Arc = Arc::new(hop.out_schema.as_ref().into()); + let props = plan_properties( + out_schema.clone(), + input.properties().output_partitioning().partition_count(), + ); Self { graph, @@ -95,8 +98,8 @@ impl HopExec { right_schema: hop.right_schema.clone(), layers: hop.right_layers.clone(), - output_schema: Arc::new(out_schema), - + output_schema: out_schema, + props, right_proj: hop.right_proj.clone(), } } @@ -110,21 +113,22 @@ impl DisplayAs for HopExec { #[async_trait] impl ExecutionPlan for HopExec { + fn name(&self) -> &str { + "HopExec" + } + /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { self } + fn properties(&self) -> &PlanProperties { + &self.props + } + fn schema(&self) -> SchemaRef { self.output_schema.clone() } - fn output_partitioning(&self) -> Partitioning { - self.input.output_partitioning() - } - fn output_ordering(&self) -> Option<&[datafusion::physical_expr::PhysicalSortExpr]> { - self.input.output_ordering() - } - fn required_input_distribution(&self) -> Vec { vec![Distribution::UnspecifiedDistribution] } @@ -133,8 +137,8 @@ impl ExecutionPlan for HopExec { vec![true] } - fn children(&self) -> Vec> { - vec![self.input.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.input] } fn with_new_children( @@ -149,6 +153,7 @@ impl ExecutionPlan for HopExec { layers: self.layers.clone(), right_schema: self.right_schema.clone(), output_schema: self.output_schema.clone(), + props: self.props.clone(), right_proj: self.right_proj.clone(), })) } From 469c5e9fe23f85ef0417551857bf63065c6b8876 Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Thu, 15 Aug 2024 13:52:55 +0100 Subject: [PATCH 2/6] outstanding issue found with schema in nodes --- Cargo.toml | 2 +- .../src/executor/table_provider/edge.rs | 29 ++++---- .../src/executor/table_provider/node.rs | 21 +++--- raphtory-cypher/src/hop/operator.rs | 15 ++-- raphtory-cypher/src/hop/rule.rs | 24 +++---- raphtory-cypher/src/lib.rs | 21 +++++- raphtory-cypher/src/transpiler/mod.rs | 68 +++++++++++-------- 7 files changed, 109 insertions(+), 71 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a5a37184bc..fb893818bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -126,7 +126,7 @@ prost-build = "0.13.1" lazy_static = "1.4.0" pest = "2.7.8" pest_derive = "2.7.8" -sqlparser = "0.43.1" +sqlparser = "0.47" datafusion = { version = "40" } futures = "0.3" diff --git a/raphtory-cypher/src/executor/table_provider/edge.rs b/raphtory-cypher/src/executor/table_provider/edge.rs index a08b21c5a8..45d7b57998 100644 --- a/raphtory-cypher/src/executor/table_provider/edge.rs +++ b/raphtory-cypher/src/executor/table_provider/edge.rs @@ -55,7 +55,7 @@ impl EdgeListTableProvider { let schema = lift_nested_arrow_schema(&g, layer_id)?; - let num_partitions = std::thread::available_parallelism()?.get(); + let num_partitions = 2;//std::thread::available_parallelism()?.get(); let row_count = graph .as_ref() @@ -304,7 +304,7 @@ fn produce_record_batch( RecordBatch::try_new(schema.clone(), columns) .map_err(|arrow_err| DataFusionError::ArrowError(arrow_err, None)) } - }); + }).inspect(|rb| println!("{rb:?}") ); Box::new(iter) } @@ -393,18 +393,19 @@ impl ExecutionPlan for EdgeListExecPlan { target_partitions: usize, _config: &ConfigOptions, ) -> Result>, DataFusionError> { - let plan_properties = plan_properties(self.schema.clone(), target_partitions); - Ok(Some(Arc::new(EdgeListExecPlan { - layer_id: self.layer_id, - layer_name: self.layer_name.clone(), - graph: self.graph.clone(), - schema: self.schema.clone(), - num_partitions: target_partitions, - row_count: self.row_count, - sorted_by: self.sorted_by.clone(), - props: plan_properties, - projection: self.projection.clone(), - }))) + // let plan_properties = plan_properties(self.schema.clone(), target_partitions); + // Ok(Some(Arc::new(EdgeListExecPlan { + // layer_id: self.layer_id, + // layer_name: self.layer_name.clone(), + // graph: self.graph.clone(), + // schema: self.schema.clone(), + // num_partitions: target_partitions, + // row_count: self.row_count, + // sorted_by: self.sorted_by.clone(), + // props: plan_properties, + // projection: self.projection.clone(), + // }))) + Ok(None) } fn execute( diff --git a/raphtory-cypher/src/executor/table_provider/node.rs b/raphtory-cypher/src/executor/table_provider/node.rs index 0328d4ba16..2c081c7d0d 100644 --- a/raphtory-cypher/src/executor/table_provider/node.rs +++ b/raphtory-cypher/src/executor/table_provider/node.rs @@ -3,9 +3,9 @@ use crate::{ executor::ExecError, }; use arrow::datatypes::UInt64Type; -use arrow_array::{make_array, Array, PrimitiveArray}; +use arrow_array::{make_array, Array, LargeStringArray, PrimitiveArray}; use arrow_buffer::ScalarBuffer; -use arrow_schema::{DataType, Schema}; +use arrow_schema::{DataType, Field, Schema}; use async_trait::async_trait; use datafusion::{ arrow::{array::RecordBatch, datatypes::SchemaRef}, @@ -55,10 +55,15 @@ impl NodeTableProvider { }); let name_dt = graph.global_ordering().data_type(); - let schema = lift_arrow_schema( - name_dt.clone(), - graph.node_properties().const_props.as_ref(), - )?; + // let schema = lift_arrow_schema( + // name_dt.clone(), + // graph.node_properties().const_props.as_ref(), + // )?; + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::UInt64, false), + Field::new("gid", DataType::UInt64, false), + ])); Ok(Self { graph: g, @@ -160,7 +165,8 @@ async fn produce_record_batch( None, )); - let arr_gid = graph.global_ordering().sliced(start, end - start); + let length = ( end - start ).min(graph.global_ordering().len()); + let arr_gid = graph.global_ordering().sliced(start, length); let gid_data = to_data(arr_gid.as_ref()); let gid = make_array(gid_data); @@ -232,7 +238,6 @@ impl DisplayAs for NodeScanExecPlan { #[async_trait] impl ExecutionPlan for NodeScanExecPlan { - fn name(&self) -> &str { "NodeScanExecPlan" } diff --git a/raphtory-cypher/src/hop/operator.rs b/raphtory-cypher/src/hop/operator.rs index 344abe85a9..1cd4611159 100644 --- a/raphtory-cypher/src/hop/operator.rs +++ b/raphtory-cypher/src/hop/operator.rs @@ -85,10 +85,9 @@ impl HopPlan { } } } - impl UserDefinedLogicalNodeCore for HopPlan { fn name(&self) -> &str { - "Hop" + "HopPlan" } fn inputs(&self) -> Vec<&LogicalPlan> { @@ -122,12 +121,16 @@ impl UserDefinedLogicalNodeCore for HopPlan { ) } - fn from_template(&self, exprs: &[Expr], inputs: &[LogicalPlan]) -> Self { + fn with_exprs_and_inputs( + &self, + exprs: Vec, + inputs: Vec, + ) -> datafusion::error::Result { assert_eq!(inputs.len(), 1); assert_eq!(exprs.len(), 0); // (eg JOIN on edge1.src = edge2.dst for -[]->()-[]->) // let expr = exprs.first().unwrap(); // let (left, right) = extract_eq_exprs(expr).unwrap(); - HopPlan { + Ok(HopPlan { graph: self.graph.clone(), dir: self.dir, left_col: self.left_col.clone(), @@ -137,6 +140,6 @@ impl UserDefinedLogicalNodeCore for HopPlan { right_layers: self.right_layers.clone(), expressions: self.expressions.clone(), right_proj: self.right_proj.clone(), - } + }) } -} +} \ No newline at end of file diff --git a/raphtory-cypher/src/hop/rule.rs b/raphtory-cypher/src/hop/rule.rs index 8f9ed148c1..deb6867bd2 100644 --- a/raphtory-cypher/src/hop/rule.rs +++ b/raphtory-cypher/src/hop/rule.rs @@ -4,11 +4,11 @@ use super::execution::HopExec; use crate::hop::operator::HopPlan; use async_trait::async_trait; use datafusion::{ - common::Column, + common::{tree_node::Transformed, Column}, error::DataFusionError, execution::context::{QueryPlanner, SessionState}, logical_expr::{Expr, Extension, Join, LogicalPlan, UserDefinedLogicalNode}, - optimizer::{optimize_children, optimizer::ApplyOrder, OptimizerConfig, OptimizerRule}, + optimizer::{optimizer::ApplyOrder, OptimizerConfig, OptimizerRule}, physical_plan::ExecutionPlan, physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner}, }; @@ -29,12 +29,12 @@ impl OptimizerRule for HopRule { Some(ApplyOrder::BottomUp) } - fn try_optimize( + fn rewrite( &self, - plan: &LogicalPlan, - config: &dyn OptimizerConfig, - ) -> Result, DataFusionError> { - if let LogicalPlan::Join(join) = plan { + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result, DataFusionError> { + if let LogicalPlan::Join(join) = &plan { let Join { right, on, @@ -44,7 +44,7 @@ impl OptimizerRule for HopRule { } = join; if on.len() != 1 { - return Ok(None); //optimize_children(self, plan, config); + return Ok(Transformed::no(plan)); } let (hop_from_col, _hop_to_col, direction) = if let ( @@ -61,11 +61,11 @@ impl OptimizerRule for HopRule { ("dst", "dst") => Direction::IN, ("src", "src") => Direction::OUT, ("src", "dst") => Direction::IN, - _ => return Ok(None), + _ => return Ok(Transformed::no(plan)), }; (hop_from_col, hop_to_col, direction) } else { - return Ok(None); + return Ok(Transformed::no(plan)); }; // simplest form Any -> TableScan @@ -82,11 +82,11 @@ impl OptimizerRule for HopRule { on.clone(), )), }); - return Ok(Some(plan)); + return Ok(Transformed::yes(plan)); } } } - optimize_children(self, plan, config) + Ok(Transformed::no(plan)) } fn name(&self) -> &str { diff --git a/raphtory-cypher/src/lib.rs b/raphtory-cypher/src/lib.rs index 23a1f5fa52..1f1763d644 100644 --- a/raphtory-cypher/src/lib.rs +++ b/raphtory-cypher/src/lib.rs @@ -48,6 +48,7 @@ mod cypher { enable_hop_optim: bool, ) -> Result { let (ctx, plan) = prepare_plan(query, g, enable_hop_optim).await?; + println!("{}", plan.display_indent().to_string()); let df = ctx.execute_logical_plan(plan).await?; Ok(df) } @@ -116,7 +117,7 @@ mod cypher { ctx.refresh_catalogs().await?; let query = transpiler::to_sql(query, g); - // println!("SQL: {:?}", query.to_string()); + println!("SQL: {:?}", query.to_string()); // println!("SQL AST: {:?}", query); let plan = ctx .state() @@ -176,6 +177,7 @@ mod cypher { #[cfg(test)] mod test { use arrow::compute::concat_batches; + use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use std::path::Path; // FIXME: actually assert the tests below @@ -186,7 +188,7 @@ mod cypher { use raphtory::{disk_graph::DiskGraphStorage, prelude::*}; - use crate::run_cypher; + use crate::{run_cypher, run_sql}; lazy_static::lazy_static! { static ref EDGES: Vec<(u64, u64, i64, f64)> = vec![ @@ -614,7 +616,20 @@ mod cypher { let graph_dir = tempdir().unwrap(); let graph = make_graph_with_node_props(graph_dir); - let df = run_cypher("match (a)-[e]->(b) return a.name, e, b.name", &graph, true) + // let df = run_sql("WITH e AS (SELECT * FROM _default), b AS (SELECT * FROM nodes) SELECT e.*, b.gid FROM e JOIN b ON e.dst = b.id", &graph).await.unwrap(); + let df = run_cypher("match ()-[e]->(b) return e,b.gid", &graph, false) + .await + .unwrap(); + let data = df.collect().await.unwrap(); + print_batches(&data).unwrap(); + } + + #[tokio::test] + async fn select_node_names_from_edges_both() { + let graph_dir = tempdir().unwrap(); + let graph = make_graph_with_node_props(graph_dir); + + let df = run_cypher("match (a)-[e]->(b) return a.gid, e, b.gid", &graph, false) .await .unwrap(); let data = df.collect().await.unwrap(); diff --git a/raphtory-cypher/src/transpiler/mod.rs b/raphtory-cypher/src/transpiler/mod.rs index 35773b075c..d800aa5d89 100644 --- a/raphtory-cypher/src/transpiler/mod.rs +++ b/raphtory-cypher/src/transpiler/mod.rs @@ -18,7 +18,8 @@ use raphtory::{ prelude::*, }; use sqlparser::ast::{ - self as sql_ast, GroupByExpr, OrderByExpr, SetExpr, TableAlias, WildcardAdditionalOptions, With, + self as sql_ast, DuplicateTreatment, FunctionArgumentList, GroupByExpr, OrderByExpr, SetExpr, + TableAlias, WildcardAdditionalOptions, With, }; mod exprs; @@ -240,7 +241,7 @@ fn scan_edges_as_sql_cte( }, query: union_query, from: None, - // materialized: None, + materialized: None, } } @@ -390,7 +391,10 @@ fn select_query_with_projection( named_window: vec![], // QUALIFY (Snowflake) qualify: None, - // value_table_mode: None, + // extra + value_table_mode: None, + window_before_qualify: false, + connect_by: None, }))), // ORDER BY order_by: vec![], @@ -473,7 +477,7 @@ fn node_scan_cte(node: &NodePattern) -> sql_ast::Cte { "nodes", ), from: None, - // materialized: None, + materialized: None, } } @@ -515,7 +519,9 @@ fn parse_select_body( named_window: vec![], // QUALIFY (Snowflake) qualify: None, - // value_table_mode: None, + value_table_mode: None, + window_before_qualify: false, + connect_by: None, }))) } @@ -1075,20 +1081,22 @@ fn cypher_to_sql_expr( fn sql_count_all(table: &str, attr: &str) -> sql_ast::Expr { sql_ast::Expr::Function(sql_ast::Function { name: sql_ast::ObjectName(vec![sql_ast::Ident::new("COUNT")]), - args: vec![sql_ast::FunctionArg::Unnamed( - sql_ast::FunctionArgExpr::Expr(sql_ast::Expr::CompoundIdentifier( - vec![table.to_string(), attr.to_string()] - .into_iter() - .map(sql_ast::Ident::new) - .collect(), - )), // this is a hack because datafusion gets confused when there are no columns selected - )], + args: sql_ast::FunctionArguments::List(FunctionArgumentList { + args: vec![sql_ast::FunctionArg::Unnamed( + sql_ast::FunctionArgExpr::Expr(sql_ast::Expr::CompoundIdentifier( + vec![table.to_string(), attr.to_string()] + .into_iter() + .map(sql_ast::Ident::new) + .collect(), + )), // this is a hack because datafusion gets confused when there are no columns selected + )], + duplicate_treatment: None, + clauses: vec![], + }), over: None, - distinct: false, filter: None, null_treatment: None, - special: false, - order_by: vec![], + within_group: vec![], }) } @@ -1099,22 +1107,28 @@ fn sql_function_ast( node_binds: &[String], distinct: &bool, ) -> sql_ast::Expr { + let args = args + .iter() + .map(|arg| { + sql_ast::FunctionArg::Unnamed(sql_ast::FunctionArgExpr::Expr(cypher_to_sql_expr( + arg, rel_binds, node_binds, false, + ))) + }) + .collect(); + + let duplicate_treatment = distinct.then(|| DuplicateTreatment::Distinct); + let args = sql_ast::FunctionArguments::List(FunctionArgumentList { + args, + duplicate_treatment, + clauses: vec![], + }); sql_ast::Expr::Function(sql_ast::Function { name: sql_ast::ObjectName(vec![sql_ast::Ident::new(name)]), - args: args - .iter() - .map(|arg| { - sql_ast::FunctionArg::Unnamed(sql_ast::FunctionArgExpr::Expr(cypher_to_sql_expr( - arg, rel_binds, node_binds, false, - ))) - }) - .collect(), + args, over: None, - distinct: *distinct, filter: None, null_treatment: None, - special: false, - order_by: vec![], + within_group: vec![], }) } From f5d12bfc9ed5e0e3338f3e35df6202935e1a82cc Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Mon, 19 Aug 2024 09:30:41 +0100 Subject: [PATCH 3/6] finally found the schema issue in the nodes.rs table scanner --- .../src/executor/table_provider/edge.rs | 34 +++++++------------ .../src/executor/table_provider/node.rs | 26 +++++++------- 2 files changed, 25 insertions(+), 35 deletions(-) diff --git a/raphtory-cypher/src/executor/table_provider/edge.rs b/raphtory-cypher/src/executor/table_provider/edge.rs index 45d7b57998..b2d1605b1d 100644 --- a/raphtory-cypher/src/executor/table_provider/edge.rs +++ b/raphtory-cypher/src/executor/table_provider/edge.rs @@ -304,7 +304,7 @@ fn produce_record_batch( RecordBatch::try_new(schema.clone(), columns) .map_err(|arrow_err| DataFusionError::ArrowError(arrow_err, None)) } - }).inspect(|rb| println!("{rb:?}") ); + }); Box::new(iter) } @@ -362,13 +362,6 @@ impl ExecutionPlan for EdgeListExecPlan { &self.props } - // fn output_partitioning(&self) -> datafusion::physical_expr::Partitioning { - // datafusion::physical_expr::Partitioning::UnknownPartitioning(self.num_partitions) - // } - // fn output_ordering(&self) -> Option<&[datafusion::physical_expr::PhysicalSortExpr]> { - // Some(&self.sorted_by) - // } - fn schema(&self) -> SchemaRef { self.schema.clone() } @@ -393,19 +386,18 @@ impl ExecutionPlan for EdgeListExecPlan { target_partitions: usize, _config: &ConfigOptions, ) -> Result>, DataFusionError> { - // let plan_properties = plan_properties(self.schema.clone(), target_partitions); - // Ok(Some(Arc::new(EdgeListExecPlan { - // layer_id: self.layer_id, - // layer_name: self.layer_name.clone(), - // graph: self.graph.clone(), - // schema: self.schema.clone(), - // num_partitions: target_partitions, - // row_count: self.row_count, - // sorted_by: self.sorted_by.clone(), - // props: plan_properties, - // projection: self.projection.clone(), - // }))) - Ok(None) + let plan_properties = plan_properties(self.schema.clone(), target_partitions); + Ok(Some(Arc::new(EdgeListExecPlan { + layer_id: self.layer_id, + layer_name: self.layer_name.clone(), + graph: self.graph.clone(), + schema: self.schema.clone(), + num_partitions: target_partitions, + row_count: self.row_count, + sorted_by: self.sorted_by.clone(), + props: plan_properties, + projection: self.projection.clone(), + }))) } fn execute( diff --git a/raphtory-cypher/src/executor/table_provider/node.rs b/raphtory-cypher/src/executor/table_provider/node.rs index 2c081c7d0d..ad4fbedebb 100644 --- a/raphtory-cypher/src/executor/table_provider/node.rs +++ b/raphtory-cypher/src/executor/table_provider/node.rs @@ -3,9 +3,9 @@ use crate::{ executor::ExecError, }; use arrow::datatypes::UInt64Type; -use arrow_array::{make_array, Array, LargeStringArray, PrimitiveArray}; +use arrow_array::{make_array, Array, PrimitiveArray}; use arrow_buffer::ScalarBuffer; -use arrow_schema::{DataType, Field, Schema}; +use arrow_schema::{DataType, Schema}; use async_trait::async_trait; use datafusion::{ arrow::{array::RecordBatch, datatypes::SchemaRef}, @@ -30,6 +30,7 @@ use std::{any::Any, fmt::Formatter, sync::Arc}; use super::plan_properties; +// FIXME: review this file, some of the assuptions and mapping between partitions and chunk sizes are not correct pub struct NodeTableProvider { graph: DiskGraphStorage, schema: SchemaRef, @@ -55,15 +56,10 @@ impl NodeTableProvider { }); let name_dt = graph.global_ordering().data_type(); - // let schema = lift_arrow_schema( - // name_dt.clone(), - // graph.node_properties().const_props.as_ref(), - // )?; - - let schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::UInt64, false), - Field::new("gid", DataType::UInt64, false), - ])); + let schema = lift_arrow_schema( + name_dt.clone(), + graph.node_properties().const_props.as_ref(), + )?; Ok(Self { graph: g, @@ -126,7 +122,7 @@ impl TableProvider for NodeTableProvider { .map(|proj| self.schema().project(proj).map(Arc::new)) .unwrap_or_else(|| Ok(self.schema().clone()))?; - let plan_properties = plan_properties(self.schema.clone(), self.num_partitions); + let plan_properties = plan_properties(schema.clone(), self.num_partitions); Ok(Arc::new(NodeScanExecPlan { graph: self.graph.clone(), @@ -160,12 +156,14 @@ async fn produce_record_batch( let start = chunk_id * chunk_size; let end = (chunk_id + 1) * chunk_size; + let n = chunk.values()[0].len(); + let iter = (start as u64..end as u64).take(n); let id = Arc::new(PrimitiveArray::::new( - ScalarBuffer::from_iter((start as u64..end as u64).take(chunk.values()[0].len())), + ScalarBuffer::from_iter(iter), None, )); - let length = ( end - start ).min(graph.global_ordering().len()); + let length = (end - start).min(graph.global_ordering().len()); let arr_gid = graph.global_ordering().sliced(start, length); let gid_data = to_data(arr_gid.as_ref()); let gid = make_array(gid_data); From e0832226abd8017414dcd1c988b7d3d21898be0d Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Mon, 19 Aug 2024 09:34:19 +0100 Subject: [PATCH 4/6] fmt + deactivate storage --- raphtory-cypher/src/executor/table_provider/edge.rs | 4 ++-- raphtory-cypher/src/executor/table_provider/mod.rs | 5 ----- raphtory-cypher/src/hop/operator.rs | 2 +- raphtory-cypher/src/hop/rule.rs | 2 +- raphtory-cypher/src/lib.rs | 10 ---------- 5 files changed, 4 insertions(+), 19 deletions(-) diff --git a/raphtory-cypher/src/executor/table_provider/edge.rs b/raphtory-cypher/src/executor/table_provider/edge.rs index b2d1605b1d..8df8f0dfe7 100644 --- a/raphtory-cypher/src/executor/table_provider/edge.rs +++ b/raphtory-cypher/src/executor/table_provider/edge.rs @@ -55,7 +55,7 @@ impl EdgeListTableProvider { let schema = lift_nested_arrow_schema(&g, layer_id)?; - let num_partitions = 2;//std::thread::available_parallelism()?.get(); + let num_partitions = 2; //std::thread::available_parallelism()?.get(); let row_count = graph .as_ref() @@ -149,7 +149,7 @@ impl TableProvider for EdgeListTableProvider { ) -> Result, DataFusionError> { let schema = projection .map(|proj| self.schema().project(proj).map(Arc::new)) - .unwrap_or_else(|| Ok( self.schema().clone() ))?; + .unwrap_or_else(|| Ok(self.schema().clone()))?; let plan_properties = plan_properties(schema.clone(), self.num_partitions); Ok(Arc::new(EdgeListExecPlan { diff --git a/raphtory-cypher/src/executor/table_provider/mod.rs b/raphtory-cypher/src/executor/table_provider/mod.rs index 2b6c01f7ac..08254733c1 100644 --- a/raphtory-cypher/src/executor/table_provider/mod.rs +++ b/raphtory-cypher/src/executor/table_provider/mod.rs @@ -3,14 +3,9 @@ use datafusion::{physical_expr::EquivalenceProperties, physical_plan::PlanProper pub mod edge; pub mod node; -// FIXME this error shows up in datafusion 37 raised https://github.com/apache/datafusion/issues/10421 -// called `Result::unwrap()` on an `Err` value: Context("EnforceDistribution", Internal("PhysicalOptimizer rule 'EnforceDistribution' failed, due to generate a different schema, -// schema: Schema { fields: [Field { name: \"name\", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, -// schema: Schema { fields: [Field { name: \"name\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }")) pub fn plan_properties(schema: SchemaRef, num_partitions: usize) -> PlanProperties { let eq_properties = EquivalenceProperties::new(schema); let partitioning = datafusion::physical_plan::Partitioning::UnknownPartitioning(num_partitions); let execution_mode = datafusion::physical_plan::ExecutionMode::Bounded; PlanProperties::new(eq_properties, partitioning, execution_mode) } - diff --git a/raphtory-cypher/src/hop/operator.rs b/raphtory-cypher/src/hop/operator.rs index 1cd4611159..bac57aacc6 100644 --- a/raphtory-cypher/src/hop/operator.rs +++ b/raphtory-cypher/src/hop/operator.rs @@ -142,4 +142,4 @@ impl UserDefinedLogicalNodeCore for HopPlan { right_proj: self.right_proj.clone(), }) } -} \ No newline at end of file +} diff --git a/raphtory-cypher/src/hop/rule.rs b/raphtory-cypher/src/hop/rule.rs index deb6867bd2..88d7019ff7 100644 --- a/raphtory-cypher/src/hop/rule.rs +++ b/raphtory-cypher/src/hop/rule.rs @@ -44,7 +44,7 @@ impl OptimizerRule for HopRule { } = join; if on.len() != 1 { - return Ok(Transformed::no(plan)); + return Ok(Transformed::no(plan)); } let (hop_from_col, _hop_to_col, direction) = if let ( diff --git a/raphtory-cypher/src/lib.rs b/raphtory-cypher/src/lib.rs index 1f1763d644..93dbaa36ac 100644 --- a/raphtory-cypher/src/lib.rs +++ b/raphtory-cypher/src/lib.rs @@ -48,7 +48,6 @@ mod cypher { enable_hop_optim: bool, ) -> Result { let (ctx, plan) = prepare_plan(query, g, enable_hop_optim).await?; - println!("{}", plan.display_indent().to_string()); let df = ctx.execute_logical_plan(plan).await?; Ok(df) } @@ -58,7 +57,6 @@ mod cypher { g: &DiskGraphStorage, enable_hop_optim: bool, ) -> Result<(SessionContext, LogicalPlan), ExecError> { - // println!("Running query: {:?}", query); let query = super::parser::parse_cypher(query)?; let config = SessionConfig::from_env()?.with_information_schema(true); @@ -117,8 +115,6 @@ mod cypher { ctx.refresh_catalogs().await?; let query = transpiler::to_sql(query, g); - println!("SQL: {:?}", query.to_string()); - // println!("SQL AST: {:?}", query); let plan = ctx .state() .statement_to_plan(datafusion::sql::parser::Statement::Statement(Box::new( @@ -129,7 +125,6 @@ mod cypher { opts.verify_plan(&plan)?; let plan = ctx.state().optimize(&plan)?; - // println!("PLAN! {:?}", plan); Ok((ctx, plan)) } @@ -153,11 +148,6 @@ mod cypher { let node_table_provider = NodeTableProvider::new(graph.clone())?; ctx.register_table("nodes", Arc::new(node_table_provider))?; - // let state = ctx.state(); - // let dialect = state.config().options().sql_parser.dialect.as_str(); - // let sql_ast = ctx.state().sql_to_statement(query, dialect)?; - // println!("SQL AST: {:?}", sql_ast); - let df = ctx.sql(query).await?; Ok(df) } From a2707608328469b945befca7443bdeeb44db8035 Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Wed, 21 Aug 2024 16:02:30 +0100 Subject: [PATCH 5/6] fix the test failures for nodes --- raphtory-cypher/src/executor/table_provider/edge.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raphtory-cypher/src/executor/table_provider/edge.rs b/raphtory-cypher/src/executor/table_provider/edge.rs index 8df8f0dfe7..b32fff2308 100644 --- a/raphtory-cypher/src/executor/table_provider/edge.rs +++ b/raphtory-cypher/src/executor/table_provider/edge.rs @@ -55,7 +55,7 @@ impl EdgeListTableProvider { let schema = lift_nested_arrow_schema(&g, layer_id)?; - let num_partitions = 2; //std::thread::available_parallelism()?.get(); + let num_partitions = std::thread::available_parallelism()?.get(); let row_count = graph .as_ref() From be8f9dbf87772c3f05a9bdab5faa67f14c337924 Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Fri, 23 Aug 2024 16:26:39 +0100 Subject: [PATCH 6/6] fix datafusion dependencies --- Cargo.lock | 396 +++++++++++++++++++++++++++++++---------------------- 1 file changed, 229 insertions(+), 167 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 573c22a418..0333b0f6f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -229,28 +229,30 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" -version = "50.0.0" -source = "git+https://github.com/apache/arrow-rs.git?tag=50.0.0#db811083669df66992008c9409b743a2e365adb0" +version = "52.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05048a8932648b63f21c37d88b552ccc8a65afb6dfe9fc9f30ce79174c2e7a85" dependencies = [ "arrow-arith", "arrow-array", "arrow-buffer", - "arrow-cast 50.0.0 (git+https://github.com/apache/arrow-rs.git?tag=50.0.0)", + "arrow-cast", "arrow-csv", "arrow-data", - "arrow-ipc 50.0.0 (git+https://github.com/apache/arrow-rs.git?tag=50.0.0)", + "arrow-ipc", "arrow-json", - "arrow-ord 50.0.0 (git+https://github.com/apache/arrow-rs.git?tag=50.0.0)", + "arrow-ord", "arrow-row", "arrow-schema", - "arrow-select 50.0.0 (git+https://github.com/apache/arrow-rs.git?tag=50.0.0)", - "arrow-string 50.0.0 (git+https://github.com/apache/arrow-rs.git?tag=50.0.0)", + "arrow-select", + "arrow-string", ] [[package]] name = "arrow-arith" -version = "50.0.0" -source = "git+https://github.com/apache/arrow-rs.git?tag=50.0.0#db811083669df66992008c9409b743a2e365adb0" +version = "52.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d8a57966e43bfe9a3277984a14c24ec617ad874e4c0e1d2a1b083a39cfbf22c" dependencies = [ "arrow-array", "arrow-buffer", @@ -263,15 +265,16 @@ dependencies = [ [[package]] name = "arrow-array" -version = "50.0.0" -source = "git+https://github.com/apache/arrow-rs.git?tag=50.0.0#db811083669df66992008c9409b743a2e365adb0" +version = "52.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16f4a9468c882dc66862cef4e1fd8423d47e67972377d85d80e022786427768c" dependencies = [ "ahash", "arrow-buffer", "arrow-data", "arrow-schema", "chrono", - "chrono-tz", + "chrono-tz 0.9.0", "half", "hashbrown 0.14.5", "num", @@ -279,8 +282,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "50.0.0" -source = "git+https://github.com/apache/arrow-rs.git?tag=50.0.0#db811083669df66992008c9409b743a2e365adb0" +version = "52.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c975484888fc95ec4a632cdc98be39c085b1bb518531b0c80c5d462063e5daa1" dependencies = [ "bytes", "half", @@ -289,48 +293,34 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "50.0.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e448e5dd2f4113bf5b74a1f26531708f5edcacc77335b7066f9398f4bcf4cdef" -dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", - "arrow-select 50.0.0 (registry+https://github.com/rust-lang/crates.io-index)", - "base64 0.21.7", - "chrono", - "half", - "lexical-core", - "num", -] - -[[package]] -name = "arrow-cast" -version = "50.0.0" -source = "git+https://github.com/apache/arrow-rs.git?tag=50.0.0#db811083669df66992008c9409b743a2e365adb0" +checksum = "da26719e76b81d8bc3faad1d4dbdc1bcc10d14704e63dc17fc9f3e7e1e567c8e" dependencies = [ "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", - "arrow-select 50.0.0 (git+https://github.com/apache/arrow-rs.git?tag=50.0.0)", - "base64 0.21.7", + "arrow-select", + "atoi", + "base64 0.22.1", "chrono", "comfy-table", "half", "lexical-core", "num", + "ryu", ] [[package]] name = "arrow-csv" -version = "50.0.0" -source = "git+https://github.com/apache/arrow-rs.git?tag=50.0.0#db811083669df66992008c9409b743a2e365adb0" +version = "52.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c13c36dc5ddf8c128df19bab27898eea64bf9da2b555ec1cd17a8ff57fba9ec2" dependencies = [ "arrow-array", "arrow-buffer", - "arrow-cast 50.0.0 (git+https://github.com/apache/arrow-rs.git?tag=50.0.0)", + "arrow-cast", "arrow-data", "arrow-schema", "chrono", @@ -343,8 +333,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "50.0.0" -source = "git+https://github.com/apache/arrow-rs.git?tag=50.0.0#db811083669df66992008c9409b743a2e365adb0" +version = "52.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd9d6f18c65ef7a2573ab498c374d8ae364b4a4edf67105357491c031f716ca5" dependencies = [ "arrow-buffer", "arrow-schema", @@ -354,40 +345,28 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "50.0.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03dea5e79b48de6c2e04f03f62b0afea7105be7b77d134f6c5414868feefb80d" +checksum = "e786e1cdd952205d9a8afc69397b317cfbb6e0095e445c69cda7e8da5c1eeb0f" dependencies = [ "arrow-array", "arrow-buffer", - "arrow-cast 50.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "arrow-cast", "arrow-data", "arrow-schema", "flatbuffers", "lz4_flex", ] -[[package]] -name = "arrow-ipc" -version = "50.0.0" -source = "git+https://github.com/apache/arrow-rs.git?tag=50.0.0#db811083669df66992008c9409b743a2e365adb0" -dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-cast 50.0.0 (git+https://github.com/apache/arrow-rs.git?tag=50.0.0)", - "arrow-data", - "arrow-schema", - "flatbuffers", -] - [[package]] name = "arrow-json" -version = "50.0.0" -source = "git+https://github.com/apache/arrow-rs.git?tag=50.0.0#db811083669df66992008c9409b743a2e365adb0" +version = "52.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb22284c5a2a01d73cebfd88a33511a3234ab45d66086b2ca2d1228c3498e445" dependencies = [ "arrow-array", "arrow-buffer", - "arrow-cast 50.0.0 (git+https://github.com/apache/arrow-rs.git?tag=50.0.0)", + "arrow-cast", "arrow-data", "arrow-schema", "chrono", @@ -401,37 +380,24 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "50.0.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ed9630979034077982d8e74a942b7ac228f33dd93a93b615b4d02ad60c260be" -dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", - "arrow-select 50.0.0 (registry+https://github.com/rust-lang/crates.io-index)", - "half", - "num", -] - -[[package]] -name = "arrow-ord" -version = "50.0.0" -source = "git+https://github.com/apache/arrow-rs.git?tag=50.0.0#db811083669df66992008c9409b743a2e365adb0" +checksum = "42745f86b1ab99ef96d1c0bcf49180848a64fe2c7a7a0d945bc64fa2b21ba9bc" dependencies = [ "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", - "arrow-select 50.0.0 (git+https://github.com/apache/arrow-rs.git?tag=50.0.0)", + "arrow-select", "half", "num", ] [[package]] name = "arrow-row" -version = "50.0.0" -source = "git+https://github.com/apache/arrow-rs.git?tag=50.0.0#db811083669df66992008c9409b743a2e365adb0" +version = "52.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd09a518c602a55bd406bcc291a967b284cfa7a63edfbf8b897ea4748aad23c" dependencies = [ "ahash", "arrow-array", @@ -439,32 +405,19 @@ dependencies = [ "arrow-data", "arrow-schema", "half", - "hashbrown 0.14.5", ] [[package]] name = "arrow-schema" -version = "50.0.0" -source = "git+https://github.com/apache/arrow-rs.git?tag=50.0.0#db811083669df66992008c9409b743a2e365adb0" - -[[package]] -name = "arrow-select" -version = "50.0.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ce20973c1912de6514348e064829e50947e35977bb9d7fb637dc99ea9ffd78c" -dependencies = [ - "ahash", - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", - "num", -] +checksum = "9e972cd1ff4a4ccd22f86d3e53e835c2ed92e0eea6a3e8eadb72b4f1ac802cf8" [[package]] name = "arrow-select" -version = "50.0.0" -source = "git+https://github.com/apache/arrow-rs.git?tag=50.0.0#db811083669df66992008c9409b743a2e365adb0" +version = "52.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "600bae05d43483d216fb3494f8c32fdbefd8aa4e1de237e790dbb3d9f44690a3" dependencies = [ "ahash", "arrow-array", @@ -476,30 +429,16 @@ dependencies = [ [[package]] name = "arrow-string" -version = "50.0.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00f3b37f2aeece31a2636d1b037dabb69ef590e03bdc7eb68519b51ec86932a7" -dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", - "arrow-select 50.0.0 (registry+https://github.com/rust-lang/crates.io-index)", - "num", - "regex", - "regex-syntax 0.8.4", -] - -[[package]] -name = "arrow-string" -version = "50.0.0" -source = "git+https://github.com/apache/arrow-rs.git?tag=50.0.0#db811083669df66992008c9409b743a2e365adb0" +checksum = "f0dc1985b67cb45f6606a248ac2b4a288849f196bab8c657ea5589f47cdd55e6" dependencies = [ "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", - "arrow-select 50.0.0 (git+https://github.com/apache/arrow-rs.git?tag=50.0.0)", + "arrow-select", + "memchr", "num", "regex", "regex-syntax 0.8.4", @@ -689,6 +628,15 @@ dependencies = [ "syn 2.0.75", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "atoi_simd" version = "0.15.6" @@ -850,7 +798,18 @@ checksum = "d640d25bc63c50fb1f0b545ffd80207d2e10a4c965530809b40ba3386825c391" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", - "brotli-decompressor", + "brotli-decompressor 2.5.1", +] + +[[package]] +name = "brotli" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor 4.0.1", ] [[package]] @@ -863,6 +822,16 @@ dependencies = [ "alloc-stdlib", ] +[[package]] +name = "brotli-decompressor" +version = "4.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a45bd2e4095a8b518033b128020dd4a55aab1c0a381ba4404a472630f4bc362" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -982,7 +951,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d59ae0466b83e838b81a54256c39d5d7c20b9d7daa10510a242d9b75abd5936e" dependencies = [ "chrono", - "chrono-tz-build", + "chrono-tz-build 0.2.1", + "phf", +] + +[[package]] +name = "chrono-tz" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93698b29de5e97ad0ae26447b344c482a7284c737d9ddc5f9e52b74a336671bb" +dependencies = [ + "chrono", + "chrono-tz-build 0.3.0", "phf", ] @@ -997,6 +977,17 @@ dependencies = [ "phf_codegen", ] +[[package]] +name = "chrono-tz-build" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c088aee841df9c3041febbb73934cfc39708749bf96dc827e3359cd39ef11b1" +dependencies = [ + "parse-zoneinfo", + "phf", + "phf_codegen", +] + [[package]] name = "ciborium" version = "0.2.2" @@ -1411,14 +1402,14 @@ checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" [[package]] name = "datafusion" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2b360b692bf6c6d6e6b6dbaf41a3be0020daeceac0f406aed54c75331e50dbb" +checksum = "ab9d55a9cd2634818953809f75ebe5248b00dd43c3227efb2a51a2d5feaad54e" dependencies = [ "ahash", "arrow", "arrow-array", - "arrow-ipc 50.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "arrow-ipc", "arrow-schema", "async-compression", "async-trait", @@ -1427,12 +1418,15 @@ dependencies = [ "chrono", "dashmap 5.5.3", "datafusion-common", + "datafusion-common-runtime", "datafusion-execution", "datafusion-expr", "datafusion-functions", + "datafusion-functions-aggregate", "datafusion-functions-array", "datafusion-optimizer", "datafusion-physical-expr", + "datafusion-physical-expr-common", "datafusion-physical-plan", "datafusion-sql", "flate2", @@ -1447,6 +1441,7 @@ dependencies = [ "object_store", "parking_lot", "parquet", + "paste", "pin-project-lite", "rand", "sqlparser", @@ -1461,9 +1456,9 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37f343ccc298f440e25aa38ff82678291a7acc24061c7370ba6c0ff5cc811412" +checksum = "def66b642959e7f96f5d2da22e1f43d3bd35598f821e5ce351a0553e0f1b7367" dependencies = [ "ahash", "arrow", @@ -1472,6 +1467,8 @@ dependencies = [ "arrow-schema", "chrono", "half", + "hashbrown 0.14.5", + "instant", "libc", "num_cpus", "object_store", @@ -1479,11 +1476,20 @@ dependencies = [ "sqlparser", ] +[[package]] +name = "datafusion-common-runtime" +version = "40.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f104bb9cb44c06c9badf8a0d7e0855e5f7fa5e395b887d7f835e8a9457dc1352" +dependencies = [ + "tokio", +] + [[package]] name = "datafusion-execution" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f9c93043081487e335399a21ebf8295626367a647ac5cb87d41d18afad7d0f7" +checksum = "2ac0fd8b5d80bbca3fc3b6f40da4e9f6907354824ec3b18bbd83fee8cf5c3c3e" dependencies = [ "arrow", "chrono", @@ -1502,15 +1508,18 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e204d89909e678846b6a95f156aafc1ee5b36cb6c9e37ec2e1449b078a38c818" +checksum = "2103d2cc16fb11ef1fa993a6cac57ed5cb028601db4b97566c90e5fa77aa1e68" dependencies = [ "ahash", "arrow", "arrow-array", + "arrow-buffer", + "chrono", "datafusion-common", "paste", + "serde_json", "sqlparser", "strum", "strum_macros", @@ -1518,38 +1527,74 @@ dependencies = [ [[package]] name = "datafusion-functions" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98f1c73f7801b2b8ba2297b3ad78ffcf6c1fc6b8171f502987eb9ad5cb244ee7" +checksum = "a369332afd0ef5bd565f6db2139fb9f1dfdd0afa75a7f70f000b74208d76994f" dependencies = [ "arrow", - "base64 0.21.7", + "base64 0.22.1", + "blake2", + "blake3", + "chrono", "datafusion-common", "datafusion-execution", "datafusion-expr", + "hashbrown 0.14.5", "hex", + "itertools 0.12.1", "log", + "md-5", + "rand", + "regex", + "sha2", + "unicode-segmentation", + "uuid", +] + +[[package]] +name = "datafusion-functions-aggregate" +version = "40.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92718db1aff70c47e5abf9fc975768530097059e5db7c7b78cd64b5e9a11fc77" +dependencies = [ + "ahash", + "arrow", + "arrow-schema", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-physical-expr-common", + "log", + "paste", + "sqlparser", ] [[package]] name = "datafusion-functions-array" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42d16a0ddf2c991526f6ffe2f47a72c6da0b7354d6c32411dd20631fe2e38937" +checksum = "30bb80f46ff3dcf4bb4510209c2ba9b8ce1b716ac8b7bf70c6bf7dca6260c831" dependencies = [ "arrow", + "arrow-array", + "arrow-buffer", + "arrow-ord", + "arrow-schema", "datafusion-common", "datafusion-execution", "datafusion-expr", + "datafusion-functions", + "datafusion-functions-aggregate", + "itertools 0.12.1", "log", "paste", ] [[package]] name = "datafusion-optimizer" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ae27e07bf1f04d327be5c2a293470879801ab5535204dc3b16b062fda195496" +checksum = "82f34692011bec4fdd6fc18c264bf8037b8625d801e6dd8f5111af15cb6d71d3" dependencies = [ "arrow", "async-trait", @@ -1558,64 +1603,78 @@ dependencies = [ "datafusion-expr", "datafusion-physical-expr", "hashbrown 0.14.5", + "indexmap", "itertools 0.12.1", "log", + "paste", "regex-syntax 0.8.4", ] [[package]] name = "datafusion-physical-expr" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dde620cd9ef76a3bca9c754fb68854bd2349c49f55baf97e08001f9e967f6d6b" +checksum = "45538630defedb553771434a437f7ca8f04b9b3e834344aafacecb27dc65d5e5" dependencies = [ "ahash", "arrow", "arrow-array", "arrow-buffer", - "arrow-ord 50.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "arrow-ord", "arrow-schema", - "arrow-string 50.0.0 (registry+https://github.com/rust-lang/crates.io-index)", - "base64 0.21.7", - "blake2", - "blake3", + "arrow-string", + "base64 0.22.1", "chrono", "datafusion-common", "datafusion-execution", "datafusion-expr", + "datafusion-physical-expr-common", "half", "hashbrown 0.14.5", "hex", "indexmap", "itertools 0.12.1", "log", - "md-5", "paste", "petgraph", - "rand", "regex", - "sha2", - "unicode-segmentation", - "uuid", +] + +[[package]] +name = "datafusion-physical-expr-common" +version = "40.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d8a72b0ca908e074aaeca52c14ddf5c28d22361e9cb6bc79bb733cd6661b536" +dependencies = [ + "ahash", + "arrow", + "datafusion-common", + "datafusion-expr", + "hashbrown 0.14.5", + "rand", ] [[package]] name = "datafusion-physical-plan" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a4c75fba9ea99d64b2246cbd2fcae2e6fc973e6616b1015237a616036506dd4" +checksum = "b504eae6107a342775e22e323e9103f7f42db593ec6103b28605b7b7b1405c4a" dependencies = [ "ahash", "arrow", "arrow-array", "arrow-buffer", + "arrow-ord", "arrow-schema", "async-trait", "chrono", "datafusion-common", + "datafusion-common-runtime", "datafusion-execution", "datafusion-expr", + "datafusion-functions-aggregate", "datafusion-physical-expr", + "datafusion-physical-expr-common", "futures", "half", "hashbrown 0.14.5", @@ -1627,21 +1686,23 @@ dependencies = [ "pin-project-lite", "rand", "tokio", - "uuid", ] [[package]] name = "datafusion-sql" -version = "36.0.0" +version = "40.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21474a95c3a62d113599d21b439fa15091b538bac06bd20be0bb2e7d22903c09" +checksum = "e5db33f323f41b95ae201318ba654a9bf11113e58a51a1dff977b1a836d3d889" dependencies = [ "arrow", + "arrow-array", "arrow-schema", "datafusion-common", "datafusion-expr", "log", + "regex", "sqlparser", + "strum", ] [[package]] @@ -1947,9 +2008,9 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" [[package]] name = "flatbuffers" -version = "23.5.26" +version = "24.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dac53e22462d78c16d64a1cd22371b54cc3fe94aa15e7886a2fa6e5d1ab8640" +checksum = "8add37afff2d4ffa83bc748a70b4b1370984f6980768554182424ef71447c35f" dependencies = [ "bitflags 1.3.2", "rustc_version", @@ -3073,7 +3134,7 @@ dependencies = [ "backoff", "bytes", "chrono", - "chrono-tz", + "chrono-tz 0.8.6", "deadpool", "delegate", "futures", @@ -3254,16 +3315,16 @@ dependencies = [ [[package]] name = "object_store" -version = "0.9.1" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8718f8b65fdf67a45108d1548347d4af7d71fb81ce727bbf9e3b2535e079db3" +checksum = "e6da452820c715ce78221e8202ccc599b4a52f3e1eb3eedb487b680c81a8e3f3" dependencies = [ "async-trait", "bytes", "chrono", "futures", "humantime", - "itertools 0.12.1", + "itertools 0.13.0", "parking_lot", "percent-encoding", "snafu", @@ -3454,20 +3515,20 @@ dependencies = [ [[package]] name = "parquet" -version = "50.0.0" +version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "547b92ebf0c1177e3892f44c8f79757ee62e678d564a9834189725f2c5b7a750" +checksum = "e977b9066b4d3b03555c22bdc442f3fadebd96a39111249113087d0edb2691cd" dependencies = [ "ahash", "arrow-array", "arrow-buffer", - "arrow-cast 50.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "arrow-cast", "arrow-data", - "arrow-ipc 50.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "arrow-ipc", "arrow-schema", - "arrow-select 50.0.0 (registry+https://github.com/rust-lang/crates.io-index)", - "base64 0.21.7", - "brotli", + "arrow-select", + "base64 0.22.1", + "brotli 6.0.0", "bytes", "chrono", "flate2", @@ -3485,6 +3546,7 @@ dependencies = [ "tokio", "twox-hash", "zstd", + "zstd-sys", ] [[package]] @@ -3821,7 +3883,7 @@ checksum = "b421d2196f786fdfe162db614c8485f8308fe41575d4de634a39bbe460d1eb6a" dependencies = [ "ahash", "base64 0.21.7", - "brotli", + "brotli 3.5.0", "ethnum", "flate2", "lz4", @@ -5168,9 +5230,9 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" [[package]] name = "sqlparser" -version = "0.43.1" +version = "0.47.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f95c4bae5aba7cd30bd506f7140026ade63cff5afd778af8854026f9606bf5d4" +checksum = "295e9930cd7a97e58ca2a070541a3ca502b17f5d1fa7157376d0fabd85324f25" dependencies = [ "log", "sqlparser_derive", @@ -6633,9 +6695,9 @@ dependencies = [ [[package]] name = "zstd-sys" -version = "2.0.13+zstd.1.5.6" +version = "2.0.12+zstd.1.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38ff0f21cfee8f97d94cef41359e0c89aa6113028ab0291aa8ca0038995a95aa" +checksum = "0a4e40c320c3cb459d9a9ff6de98cff88f4751ee9275d140e2be94a2b74e4c13" dependencies = [ "cc", "pkg-config",