From 22c46598fc42fab1e5cedf3da4f1bca820f354fa Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 21 Dec 2023 20:28:53 +0800 Subject: [PATCH 1/7] with anit-join Signed-off-by: Ruihang Xia --- src/promql/src/planner.rs | 147 +++++++++++++++++++++++++++++++++++--- 1 file changed, 138 insertions(+), 9 deletions(-) diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 137035755bd5..8aa7cdd95666 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::{BTreeSet, HashSet, VecDeque}; +use std::fmt::Display; use std::str::FromStr; use std::sync::Arc; use std::time::UNIX_EPOCH; @@ -24,8 +25,8 @@ use datafusion::datasource::DefaultTableSource; use datafusion::logical_expr::expr::{AggregateFunction, Alias, ScalarFunction, ScalarUDF}; use datafusion::logical_expr::expr_rewriter::normalize_cols; use datafusion::logical_expr::{ - AggregateFunction as AggregateFunctionEnum, BinaryExpr, BuiltinScalarFunction, Cast, Extension, - LogicalPlan, LogicalPlanBuilder, Operator, ScalarUDF as ScalarUdfDef, + col, AggregateFunction as AggregateFunctionEnum, BinaryExpr, BuiltinScalarFunction, Cast, + Extension, LogicalPlan, LogicalPlanBuilder, Operator, ScalarUDF as ScalarUdfDef, }; use datafusion::optimizer::utils; use datafusion::prelude as df_prelude; @@ -1489,6 +1490,7 @@ impl PromPlanner { .context(DataFusionPlanningSnafu) } + /// Build a set operator (AND/OR/UNLESS) fn set_op_on_non_field_columns( &self, left: LogicalPlan, @@ -1501,6 +1503,10 @@ impl PromPlanner { let mut left_tag_col_set = left_tag_cols.into_iter().collect::>(); let mut right_tag_col_set = right_tag_cols.into_iter().collect::>(); + if matches!(op.id(), token::T_LOR) { + return self.or_operator(left, right, left_tag_col_set, right_tag_col_set, modifier); + } + // apply modifier if let Some(modifier) = modifier { // one-to-many and many-to-one are not supported @@ -1545,7 +1551,8 @@ impl PromPlanner { ) }; let join_keys = left_tag_col_set - .into_iter() + .iter() + .cloned() .chain([self.ctx.time_index_column.clone().unwrap()]) .collect::>(); @@ -1579,17 +1586,139 @@ impl PromPlanner { .build() .context(DataFusionPlanningSnafu), token::T_LOR => { - // `OR` can not be expressed by `UNION` precisely. 
- // it will generate unexpceted result when schemas don't match - UnsupportedExprSnafu { - name: "set operation `OR`", - } - .fail() + self.or_operator(left, right, left_tag_col_set, right_tag_col_set, modifier) } _ => UnexpectedTokenSnafu { token: op }.fail(), } } + // TODO(ruihang): change function name + fn or_operator( + &self, + left: LogicalPlan, + right: LogicalPlan, + left_tag_cols_set: HashSet, + right_tag_cols_set: HashSet, + modifier: &Option, + ) -> Result { + // prepare hash sets + let all_tags = left_tag_cols_set + .union(&right_tag_cols_set) + .cloned() + .collect::>(); + let tags_not_in_left = all_tags + .difference(&left_tag_cols_set) + .cloned() + .collect::>(); + let tags_not_in_right = all_tags + .difference(&right_tag_cols_set) + .cloned() + .collect::>(); + let left_qualifier = left.schema().field(0).qualifier().cloned(); + let right_qualifier = right.schema().field(0).qualifier().cloned(); + let left_qualifier_string = left_qualifier + .as_ref() + .map(|l| l.to_string()) + .unwrap_or_default(); + let right_qualifier_string = right_qualifier + .as_ref() + .map(|r| r.to_string()) + .unwrap_or_default(); + + common_telemetry::info!("[DEBUG] tags not in left {tags_not_in_left:?}"); + common_telemetry::info!("[DEBUG] all tags {all_tags:?}"); + + // step 1: align schema using project, fill non-exist columns with null + let left_proj_exprs = + left.schema() + .field_names() + .into_iter() + .map(col) + .chain(tags_not_in_left.iter().map(|tag_name| { + DfExpr::Literal(ScalarValue::Utf8(None)) + // .alias(format!("{left_qualifier_string}.{tag_name}")) + .alias(format!("{tag_name}")) + })); + let left_projected = LogicalPlanBuilder::from(left) + .project(left_proj_exprs) + .context(DataFusionPlanningSnafu)? + .alias(left_qualifier_string.clone()) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu)?; + let right_proj_exprs = + right + .schema() + .field_names() + .into_iter() + .map(col) + .chain(tags_not_in_right.iter().map(|tag_name| { + DfExpr::Literal(ScalarValue::Utf8(None)) + // .alias(format!("{right_qualifier_string}.{tag_name}")) + .alias(format!("{tag_name}")) + })); + let right_projected = LogicalPlanBuilder::from(right) + .project(right_proj_exprs) + .context(DataFusionPlanningSnafu)? + .alias(right_qualifier_string.clone()) + .context(DataFusionPlanningSnafu)? 
+ .build() + .context(DataFusionPlanningSnafu)?; + + // step 2: compute match columns + let mut match_columns = if let Some(modifier) = modifier + && let Some(matching) = &modifier.matching + { + match matching { + // keeps columns mentioned in `on` + LabelModifier::Include(on) => on.labels.clone(), + // removes columns memtioned in `ignoring` + LabelModifier::Exclude(ignoring) => { + let ignoring = ignoring.labels.iter().cloned().collect::>(); + all_tags.difference(&ignoring).cloned().collect() + } + } + } else { + all_tags.iter().cloned().collect() + }; + // sort to ensure the generated plan is not volatile + match_columns.sort_unstable(); + match_columns.push(self.ctx.time_index_column.clone().unwrap()); + + // step 3: right anti join + let filter = match_columns + .iter() + .cloned() + .map(|col| { + let left_col = DfExpr::Column(Column::new(left_qualifier.clone(), col)); + left_col.is_not_null() + }) + .reduce(|left, right| left.and(right)) + .unwrap(); // Safety: match columns contains at least the time index col + let right_anti_join = LogicalPlanBuilder::from(left_projected.clone()) + .join_detailed( + right_projected, + JoinType::RightAnti, + (match_columns.clone(), match_columns), + Some(filter), + true, + ) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu)?; + + common_telemetry::info!("[DEBUG] left projected {left_projected:?}"); + common_telemetry::info!("[DEBUG] right anti join {right_anti_join:?}"); + + // step 4: union left and right + let result = LogicalPlanBuilder::from(left_projected) + .union(right_anti_join) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu)?; + Ok(result) + } + /// Build a projection that project and perform operation expr for every value columns. /// Non-value columns (tag and timestamp) will be preserved in the projection. 
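
Stepping back to the `or_operator` built above: step 2 reduces the PromQL `on`/`ignoring` modifier to a sorted list of matching labels before the time index column is appended. A minimal, dependency-free sketch of that derivation (`LabelModifier` here is a simplified stand-in for the parser's modifier type, used only for illustration):

use std::collections::HashSet;

/// Illustrative stand-in for the `on(...)` / `ignoring(...)` vector-matching modifier.
enum LabelModifier {
    Include(Vec<String>),
    Exclude(Vec<String>),
}

/// Derive the labels that `or` matches on, mirroring "step 2" above.
fn match_labels(all_tags: &HashSet<String>, modifier: Option<&LabelModifier>) -> Vec<String> {
    let mut labels: Vec<String> = match modifier {
        // `on(l1, l2, ...)` keeps only the listed labels
        Some(LabelModifier::Include(on)) => on.clone(),
        // `ignoring(l1, l2, ...)` keeps every tag except the listed ones
        Some(LabelModifier::Exclude(ignoring)) => {
            let ignoring: HashSet<_> = ignoring.iter().cloned().collect();
            all_tags.difference(&ignoring).cloned().collect()
        }
        // no modifier: match on all tags from both sides
        None => all_tags.iter().cloned().collect(),
    };
    // sort so the generated plan is deterministic
    labels.sort_unstable();
    labels
}
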
/// From cb36625ddeea5e1528a8706645eb11931f889a9b Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 27 Dec 2023 20:35:21 +0800 Subject: [PATCH 2/7] impl UnionDistinctOn Signed-off-by: Ruihang Xia --- Cargo.lock | 1 + src/promql/Cargo.toml | 1 + src/promql/src/extension_plan.rs | 1 + .../src/extension_plan/union_distinct_on.rs | 467 ++++++++++++++++++ src/promql/src/lib.rs | 1 + 5 files changed, 471 insertions(+) create mode 100644 src/promql/src/extension_plan/union_distinct_on.rs diff --git a/Cargo.lock b/Cargo.lock index ac08a49f6baa..853697f83790 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6539,6 +6539,7 @@ dependencies = [ name = "promql" version = "0.5.0" dependencies = [ + "ahash 0.8.6", "async-recursion", "async-trait", "bytemuck", diff --git a/src/promql/Cargo.toml b/src/promql/Cargo.toml index a10973d4ebc1..6be12de4e343 100644 --- a/src/promql/Cargo.toml +++ b/src/promql/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true license.workspace = true [dependencies] +ahash.workspace = true async-recursion = "1.0" async-trait.workspace = true bytemuck.workspace = true diff --git a/src/promql/src/extension_plan.rs b/src/promql/src/extension_plan.rs index 49a9199bf0cc..def0ccee919e 100644 --- a/src/promql/src/extension_plan.rs +++ b/src/promql/src/extension_plan.rs @@ -19,6 +19,7 @@ mod normalize; mod planner; mod range_manipulate; mod series_divide; +mod union_distinct_on; use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType}; pub use empty_metric::{build_special_time_expr, EmptyMetric, EmptyMetricExec, EmptyMetricStream}; diff --git a/src/promql/src/extension_plan/union_distinct_on.rs b/src/promql/src/extension_plan/union_distinct_on.rs new file mode 100644 index 000000000000..2c74d67673ae --- /dev/null +++ b/src/promql/src/extension_plan/union_distinct_on.rs @@ -0,0 +1,467 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use ahash::{HashMap, RandomState}; +use datafusion::arrow::array::UInt64Array; +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::common::DFSchemaRef; +use datafusion::error::{DataFusionError, Result as DataFusionResult}; +use datafusion::execution::context::TaskContext; +use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore}; +use datafusion::physical_expr::PhysicalSortExpr; +use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use datafusion::physical_plan::{ + hash_utils, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, + SendableRecordBatchStream, Statistics, +}; +use datatypes::arrow::compute; +use futures::future::BoxFuture; +use futures::{ready, Stream, StreamExt, TryStreamExt}; + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct UnionDistinctOn { + left: LogicalPlan, + right: LogicalPlan, + /// The columns to compare for equality. 
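+    /// These are the matching labels derived from the PromQL `on`/`ignoring` modifier.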
+ /// TIME INDEX is not included. + compare_keys: Vec, + ts_col: String, + output_schema: DFSchemaRef, +} + +impl UnionDistinctOn { + pub fn name() -> &'static str { + "UnionDistinctOn" + } + + pub fn new( + left: LogicalPlan, + right: LogicalPlan, + compare_keys: Vec, + ts_col: String, + output_schema: DFSchemaRef, + ) -> Self { + Self { + left, + right, + compare_keys, + ts_col, + output_schema, + } + } + + pub fn to_execution_plan( + &self, + left_exec: Arc, + right_exec: Arc, + ) -> Arc { + Arc::new(UnionDistinctOnExec { + left: left_exec, + right: right_exec, + compare_keys: self.compare_keys.clone(), + ts_col: self.ts_col.clone(), + output_schema: Arc::new(self.output_schema.as_ref().into()), + metric: ExecutionPlanMetricsSet::new(), + random_state: RandomState::new(), + }) + } +} + +impl UserDefinedLogicalNodeCore for UnionDistinctOn { + fn name(&self) -> &str { + Self::name() + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![&self.left, &self.right] + } + + fn schema(&self) -> &DFSchemaRef { + &self.output_schema + } + + fn expressions(&self) -> Vec { + vec![] + } + + fn fmt_for_explain(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "UnionDistinctOn: on col=[{:?}], ts_col=[{}]", + self.compare_keys, self.ts_col + ) + } + + fn from_template(&self, _exprs: &[Expr], inputs: &[LogicalPlan]) -> Self { + assert_eq!(inputs.len(), 2); + + let left = inputs[0].clone(); + let right = inputs[1].clone(); + Self { + left, + right, + compare_keys: self.compare_keys.clone(), + ts_col: self.ts_col.clone(), + output_schema: self.output_schema.clone(), + } + } +} + +#[derive(Debug)] +pub struct UnionDistinctOnExec { + left: Arc, + right: Arc, + compare_keys: Vec, + ts_col: String, + output_schema: SchemaRef, + metric: ExecutionPlanMetricsSet, + + /// Shared the `RandomState` for the hashing algorithm + random_state: RandomState, +} + +impl ExecutionPlan for UnionDistinctOnExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.output_schema.clone() + } + + // fn required_input_distribution(&self) -> Vec { + // vec![ + // Distribution::HashPartitioned(todo!()), + // Distribution::HashPartitioned(todo!()), + // ] + // } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(1) + } + + /// [UnionDistinctOnExec] will output left first, then right. + /// So the order of the output is not maintained. + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn children(&self) -> Vec> { + vec![self.left.clone(), self.right.clone()] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> DataFusionResult> { + assert_eq!(children.len(), 2); + + let left = children[0].clone(); + let right = children[1].clone(); + Ok(Arc::new(UnionDistinctOnExec { + left, + right, + compare_keys: self.compare_keys.clone(), + ts_col: self.ts_col.clone(), + output_schema: self.output_schema.clone(), + metric: self.metric.clone(), + random_state: self.random_state.clone(), + })) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> DataFusionResult { + let left_stream = self.left.execute(partition, context.clone())?; + let right_stream = self.right.execute(partition, context.clone())?; + + // Convert column name to column index. Add one for the time column. 
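+        // Both children are expected to have been projected to this same output
+        // schema by the planner, so resolving names against `output_schema` works
+        // for either input. The time index is appended as the last key, so rows
+        // only collide when their timestamps match as well.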
+ let mut key_indices = Vec::with_capacity(self.compare_keys.len() + 1); + for key in &self.compare_keys { + let index = self + .output_schema + .column_with_name(key) + .map(|(i, _)| i) + .ok_or_else(|| DataFusionError::Internal(format!("Column {} not found", key)))?; + key_indices.push(index); + } + let ts_index = self + .output_schema + .column_with_name(&self.ts_col) + .map(|(i, _)| i) + .ok_or_else(|| { + DataFusionError::Internal(format!("Column {} not found", self.ts_col)) + })?; + key_indices.push(ts_index); + + // Build right hash table future. + let hashed_data_future = HashedDataFut::Pending(Box::pin(HashedData::new( + right_stream, + self.random_state.clone(), + key_indices.clone(), + ))); + + let baseline_metric = BaselineMetrics::new(&self.metric, partition); + Ok(Box::pin(UnionDistinctOnStream { + left: left_stream, + right: hashed_data_future, + compare_keys: key_indices, + output_schema: self.output_schema.clone(), + metric: baseline_metric, + })) + } + + fn metrics(&self) -> Option { + Some(self.metric.clone_inner()) + } + + fn statistics(&self) -> Statistics { + Statistics::default() + } +} + +impl DisplayAs for UnionDistinctOnExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "UnionDistinctOnExec: on col=[{:?}], ts_col=[{}]", + self.compare_keys, self.ts_col + ) + } + } + } +} + +pub struct UnionDistinctOnStream { + left: SendableRecordBatchStream, + right: HashedDataFut, + /// Include time index + compare_keys: Vec, + output_schema: SchemaRef, + metric: BaselineMetrics, +} + +impl UnionDistinctOnStream { + fn poll_impl(&mut self, cx: &mut Context<'_>) -> Poll::Item>> { + // resolve the right stream + let right = match self.right { + HashedDataFut::Pending(ref mut fut) => { + let right = ready!(fut.as_mut().poll(cx))?; + self.right = HashedDataFut::Ready(right); + let HashedDataFut::Ready(right_ref) = &mut self.right else { + unreachable!() + }; + right_ref + } + HashedDataFut::Ready(ref mut right) => right, + HashedDataFut::Empty => return Poll::Ready(None), + }; + + let next_left = ready!(self.left.poll_next_unpin(cx)); + match next_left { + Some(Ok(left)) => { + // observe left batch and return it + right.update_map(&left)?; + Poll::Ready(Some(Ok(left))) + } + Some(Err(e)) => Poll::Ready(Some(Err(e))), + None => { + // left stream is exhausted, so we can send the right part + let right = std::mem::replace(&mut self.right, HashedDataFut::Empty); + let HashedDataFut::Ready(data) = right else { + unreachable!() + }; + Poll::Ready(Some(data.finish())) + } + } + } +} + +impl RecordBatchStream for UnionDistinctOnStream { + fn schema(&self) -> SchemaRef { + self.output_schema.clone() + } +} + +impl Stream for UnionDistinctOnStream { + type Item = DataFusionResult; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_impl(cx) + } +} + +/// Simple future state for [HashedData] +enum HashedDataFut { + /// The result is not ready + Pending(BoxFuture<'static, DataFusionResult>), + /// The result is ready + Ready(HashedData), + /// The result is taken + Empty, +} + +/// ALL input batches and its hash table +struct HashedData { + // TODO(ruihang): use `JoinHashMap` instead after upgrading to DF 34.0 + /// Hash table for all input batches. The key is hash value, and the value + /// is the index of `bathc`. + hash_map: HashMap, + /// Output batch. 
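+    /// Holds at most one row per distinct hash key from the right input
+    /// (the first observed occurrence wins).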
+ batch: RecordBatch, + /// The indices of the columns to be hashed. + hash_key_indices: Vec, + random_state: RandomState, +} + +impl HashedData { + pub async fn new( + input: SendableRecordBatchStream, + random_state: RandomState, + hash_key_indices: Vec, + ) -> DataFusionResult { + // Collect all batches from the input stream + let initial = (Vec::new(), 0); + let (batches, num_rows) = input + .try_fold(initial, |mut acc, batch| async { + // Update rowcount + acc.1 += batch.num_rows(); + // Push batch to output + acc.0.push(batch); + Ok(acc) + }) + .await?; + + // Create hash for each batch + let mut hash_map = HashMap::default(); + let mut hashes_buffer = Vec::new(); + let mut interleave_indices = Vec::new(); + for (batch_number, batch) in batches.iter().enumerate() { + hashes_buffer.resize(batch.num_rows(), 0); + // get columns for hashing + let arrays = hash_key_indices + .iter() + .map(|i| batch.column(*i).clone()) + .collect::>(); + + // compute hash + let hash_values = + hash_utils::create_hashes(&arrays, &random_state, &mut hashes_buffer)?; + for (row_number, hash_value) in hash_values.iter().enumerate() { + // Only keeps the first observed row for each hash value + if hash_map + .try_insert(*hash_value, interleave_indices.len()) + .is_ok() + { + interleave_indices.push((batch_number, row_number)); + } + } + } + + // Finilize the hash map + let batch = interleave_batches(batches, interleave_indices)?; + + Ok(Self { + hash_map, + batch, + hash_key_indices, + random_state, + }) + } + + /// Remove rows that hash value present in the input + /// record batch from the hash map. + pub fn update_map(&mut self, input: &RecordBatch) -> DataFusionResult<()> { + // get columns for hashing + let mut hashes_buffer = Vec::new(); + let arrays = self + .hash_key_indices + .iter() + .map(|i| input.column(*i).clone()) + .collect::>(); + + // compute hash + let hash_values = + hash_utils::create_hashes(&arrays, &self.random_state, &mut hashes_buffer)?; + + // remove those hashes + for hash in hash_values { + self.hash_map.remove(hash); + } + + Ok(()) + } + + pub fn finish(self) -> DataFusionResult { + let valid_indices = self.hash_map.values().copied().collect::>(); + take_batch(&self.batch, &valid_indices) + } +} + +/// Utility function to interleave batches. Based on [interleave](datafusion::arrow::compute::interleave) +fn interleave_batches( + batches: Vec, + indices: Vec<(usize, usize)>, +) -> DataFusionResult { + let schema = batches[0].schema(); + + // transform batches into arrays + let mut arrays = vec![vec![]; schema.fields().len()]; + for batch in &batches { + for (i, array) in batch.columns().iter().enumerate() { + arrays[i].push(array.as_ref()); + } + } + + // interleave arrays + let mut interleaved_arrays = Vec::with_capacity(arrays.len()); + for array in arrays { + interleaved_arrays.push(compute::interleave(&array, &indices)?); + } + + // assemble new record batch + RecordBatch::try_new(schema.clone(), interleaved_arrays) + .map_err(|e| DataFusionError::ArrowError(e)) +} + +/// Utility function to take rows from a record batch. 
Based on [take](datafusion::arrow::compute::take) +fn take_batch(batch: &RecordBatch, indices: &[usize]) -> DataFusionResult { + // fast path + if batch.num_rows() == indices.len() { + return Ok(batch.clone()); + } + + let schema = batch.schema(); + + let indices_array = UInt64Array::from_iter(indices.iter().map(|i| *i as u64)); + let arrays = batch + .columns() + .iter() + .map(|array| compute::take(array, &indices_array, None)) + .collect::, _>>() + .map_err(|e| DataFusionError::ArrowError(e))?; + + let result = RecordBatch::try_new(schema, arrays).map_err(DataFusionError::ArrowError)?; + Ok(result) +} diff --git a/src/promql/src/lib.rs b/src/promql/src/lib.rs index 9514a015380b..127bf45d5f1a 100644 --- a/src/promql/src/lib.rs +++ b/src/promql/src/lib.rs @@ -14,6 +14,7 @@ #![feature(option_get_or_insert_default)] #![feature(let_chains)] +#![feature(map_try_insert)] pub mod error; pub mod extension_plan; From f4e8b056cd6d1242fc41e561a045fe7a52ec6323 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 27 Dec 2023 22:17:23 +0800 Subject: [PATCH 3/7] unify schema Signed-off-by: Ruihang Xia --- src/promql/src/extension_plan.rs | 1 + src/promql/src/extension_plan/planner.rs | 7 +- .../src/extension_plan/union_distinct_on.rs | 43 +++-- src/promql/src/planner.rs | 150 +++++++++++------- 4 files changed, 138 insertions(+), 63 deletions(-) diff --git a/src/promql/src/extension_plan.rs b/src/promql/src/extension_plan.rs index def0ccee919e..0c16c861005a 100644 --- a/src/promql/src/extension_plan.rs +++ b/src/promql/src/extension_plan.rs @@ -29,5 +29,6 @@ pub use normalize::{SeriesNormalize, SeriesNormalizeExec, SeriesNormalizeStream} pub use planner::PromExtensionPlanner; pub use range_manipulate::{RangeManipulate, RangeManipulateExec, RangeManipulateStream}; pub use series_divide::{SeriesDivide, SeriesDivideExec, SeriesDivideStream}; +pub use union_distinct_on::{UnionDistinctOn, UnionDistinctOnExec, UnionDistinctOnStream}; pub(crate) type Millisecond = ::Native; diff --git a/src/promql/src/extension_plan/planner.rs b/src/promql/src/extension_plan/planner.rs index 7798c9b32193..80cd565bd20a 100644 --- a/src/promql/src/extension_plan/planner.rs +++ b/src/promql/src/extension_plan/planner.rs @@ -21,7 +21,7 @@ use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode}; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner}; -use super::HistogramFold; +use super::{HistogramFold, UnionDistinctOn}; use crate::extension_plan::{ EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize, }; @@ -50,6 +50,11 @@ impl ExtensionPlanner for PromExtensionPlanner { Ok(Some(node.to_execution_plan(session_state, planner)?)) } else if let Some(node) = node.as_any().downcast_ref::() { Ok(Some(node.to_execution_plan(physical_inputs[0].clone()))) + } else if let Some(node) = node.as_any().downcast_ref::() { + Ok(Some(node.to_execution_plan( + physical_inputs[0].clone(), + physical_inputs[1].clone(), + ))) } else { Ok(None) } diff --git a/src/promql/src/extension_plan/union_distinct_on.rs b/src/promql/src/extension_plan/union_distinct_on.rs index 2c74d67673ae..9f289b834731 100644 --- a/src/promql/src/extension_plan/union_distinct_on.rs +++ b/src/promql/src/extension_plan/union_distinct_on.rs @@ -28,8 +28,8 @@ use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion::physical_expr::PhysicalSortExpr; use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; 
use datafusion::physical_plan::{ - hash_utils, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, - SendableRecordBatchStream, Statistics, + hash_utils, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + RecordBatchStream, SendableRecordBatchStream, Statistics, }; use datatypes::arrow::compute; use futures::future::BoxFuture; @@ -146,12 +146,9 @@ impl ExecutionPlan for UnionDistinctOnExec { self.output_schema.clone() } - // fn required_input_distribution(&self) -> Vec { - // vec![ - // Distribution::HashPartitioned(todo!()), - // Distribution::HashPartitioned(todo!()), - // ] - // } + fn required_input_distribution(&self) -> Vec { + vec![Distribution::SinglePartition, Distribution::SinglePartition] + } fn output_partitioning(&self) -> Partitioning { Partitioning::UnknownPartitioning(1) @@ -278,6 +275,7 @@ impl UnionDistinctOnStream { HashedDataFut::Empty => return Poll::Ready(None), }; + // poll left and probe with right let next_left = ready!(self.left.poll_next_unpin(cx)); match next_left { Some(Ok(left)) => { @@ -287,6 +285,7 @@ impl UnionDistinctOnStream { } Some(Err(e)) => Poll::Ready(Some(Err(e))), None => { + common_telemetry::info!("[DEBUG] left stream is end"); // left stream is exhausted, so we can send the right part let right = std::mem::replace(&mut self.right, HashedDataFut::Empty); let HashedDataFut::Ready(data) = right else { @@ -382,6 +381,15 @@ impl HashedData { // Finilize the hash map let batch = interleave_batches(batches, interleave_indices)?; + common_telemetry::info!( + "[DEBUG] right batch: {}", + datatypes::arrow::util::pretty::pretty_format_batches(&[batch.clone()]) + .unwrap() + .to_string() + ); + + common_telemetry::info!("[DEBUG] initial hash map: {hash_map:?}"); + Ok(Self { hash_map, batch, @@ -393,6 +401,13 @@ impl HashedData { /// Remove rows that hash value present in the input /// record batch from the hash map. pub fn update_map(&mut self, input: &RecordBatch) -> DataFusionResult<()> { + common_telemetry::info!( + "[DEBUG] incoming left batch: {}", + datatypes::arrow::util::pretty::pretty_format_batches(&[input.clone()]) + .unwrap() + .to_string() + ); + // get columns for hashing let mut hashes_buffer = Vec::new(); let arrays = self @@ -402,9 +417,12 @@ impl HashedData { .collect::>(); // compute hash + hashes_buffer.resize(input.num_rows(), 0); let hash_values = hash_utils::create_hashes(&arrays, &self.random_state, &mut hashes_buffer)?; + common_telemetry::info!("[DEBUG] hashes to remove: {hash_values:?}"); + // remove those hashes for hash in hash_values { self.hash_map.remove(hash); @@ -415,7 +433,14 @@ impl HashedData { pub fn finish(self) -> DataFusionResult { let valid_indices = self.hash_map.values().copied().collect::>(); - take_batch(&self.batch, &valid_indices) + let result = take_batch(&self.batch, &valid_indices)?; + common_telemetry::info!( + "[DEBUG] right batch: {}", + datatypes::arrow::util::pretty::pretty_format_batches(&[result.clone()]) + .unwrap() + .to_string() + ); + Ok(result) } } diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 8aa7cdd95666..8c296982a5b5 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -13,7 +13,6 @@ // limitations under the License. 
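
A side note on the `update_map` fix above: `create_hashes` fills the caller-provided buffer in place, so the buffer has to be sized to the batch's row count before the call (the first version of `update_map` skipped the `resize`). A minimal usage sketch, assuming the same `ahash` and `datafusion::physical_plan::hash_utils` imports as `union_distinct_on.rs`; the function name is illustrative only:

use std::sync::Arc;

use ahash::RandomState;
use datafusion::arrow::array::{ArrayRef, StringArray};
use datafusion::error::Result as DataFusionResult;
use datafusion::physical_plan::hash_utils;

fn hash_one_column() -> DataFusionResult<Vec<u64>> {
    let column: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "a"]));
    let random_state = RandomState::new();

    // `create_hashes` writes into `hashes_buffer` in place; it must already
    // have one slot per row, hence the resize (here via `vec![0; n]`).
    let mut hashes_buffer = vec![0; column.len()];
    hash_utils::create_hashes(&[column], &random_state, &mut hashes_buffer)?;
    Ok(hashes_buffer)
}
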
use std::collections::{BTreeSet, HashSet, VecDeque}; -use std::fmt::Display; use std::str::FromStr; use std::sync::Arc; use std::time::UNIX_EPOCH; @@ -52,7 +51,7 @@ use crate::error::{ }; use crate::extension_plan::{ build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, - RangeManipulate, SeriesDivide, SeriesNormalize, + RangeManipulate, SeriesDivide, SeriesNormalize, UnionDistinctOn, }; use crate::functions::{ AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta, @@ -1626,19 +1625,61 @@ impl PromPlanner { .unwrap_or_default(); common_telemetry::info!("[DEBUG] tags not in left {tags_not_in_left:?}"); + common_telemetry::info!("[DEBUG] tags not in right {tags_not_in_right:?}"); common_telemetry::info!("[DEBUG] all tags {all_tags:?}"); + // step 0: fild all columns in output schema + let all_columns_set = left + .schema() + .fields() + .iter() + .chain(right.schema().fields().iter()) + .map(|field| field.name().clone()) + .collect::>(); + let mut all_columns = all_columns_set.into_iter().collect::>(); + // sort to ensure the generated schema is not volatile + all_columns.sort_unstable(); + // step 1: align schema using project, fill non-exist columns with null - let left_proj_exprs = - left.schema() - .field_names() - .into_iter() - .map(col) - .chain(tags_not_in_left.iter().map(|tag_name| { - DfExpr::Literal(ScalarValue::Utf8(None)) - // .alias(format!("{left_qualifier_string}.{tag_name}")) - .alias(format!("{tag_name}")) - })); + let left_proj_exprs = all_columns.iter().map(|col| { + if tags_not_in_left.contains(col) { + DfExpr::Literal(ScalarValue::Utf8(None)).alias(format!("{col}")) + } else { + DfExpr::Column(Column::new(left_qualifier.clone(), col)) + } + }); + let right_proj_exprs = all_columns.iter().map(|col| { + if tags_not_in_right.contains(col) { + DfExpr::Literal(ScalarValue::Utf8(None)).alias(format!("{col}")) + } else { + DfExpr::Column(Column::new(right_qualifier.clone(), col)) + } + }); + // let left_proj_exprs = + // left.schema() + // .field_names() + // .into_iter() + // .map(col) + // .chain(tags_not_in_left.iter().map(|tag_name| { + // DfExpr::Literal(ScalarValue::Utf8(None)) + // // .alias(format!("{left_qualifier_string}.{tag_name}")) + // .alias(format!("{tag_name}")) + // })); + // let right_proj_exprs = + // right + // .schema() + // .field_names() + // .into_iter() + // .map(col) + // .chain(tags_not_in_right.iter().map(|tag_name| { + // DfExpr::Literal(ScalarValue::Utf8(None)) + // // .alias(format!("{right_qualifier_string}.{tag_name}")) + // .alias(format!("{tag_name}")) + // })); + + common_telemetry::info!("[DEBUG] left project exprs {left_proj_exprs:?}"); + common_telemetry::info!("[DEBUG] right project exprs {right_proj_exprs:?}"); + let left_projected = LogicalPlanBuilder::from(left) .project(left_proj_exprs) .context(DataFusionPlanningSnafu)? @@ -1646,17 +1687,6 @@ impl PromPlanner { .context(DataFusionPlanningSnafu)? .build() .context(DataFusionPlanningSnafu)?; - let right_proj_exprs = - right - .schema() - .field_names() - .into_iter() - .map(col) - .chain(tags_not_in_right.iter().map(|tag_name| { - DfExpr::Literal(ScalarValue::Utf8(None)) - // .alias(format!("{right_qualifier_string}.{tag_name}")) - .alias(format!("{tag_name}")) - })); let right_projected = LogicalPlanBuilder::from(right) .project(right_proj_exprs) .context(DataFusionPlanningSnafu)? 
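
To summarize steps 0 and 1 above: every column name appearing on either side is collected, sorted for a stable schema, and each side projects a column through when it has it, or emits a NULL literal aliased to that name when it does not. A dependency-free sketch of that per-side decision (plain strings stand in for DataFusion `Expr`s; purely illustrative):

use std::collections::HashSet;

/// What the aligned projection emits for one output column.
#[derive(Debug, PartialEq)]
enum AlignedColumn {
    /// The side already has this column: project it through.
    PassThrough(String),
    /// The side lacks this column: project `NULL` aliased to the name.
    NullFill(String),
}

/// Mirror of "step 0/1": union both schemas, sort, and null-fill per side.
fn align_side(own_columns: &HashSet<String>, other_columns: &HashSet<String>) -> Vec<AlignedColumn> {
    let mut all_columns: Vec<String> = own_columns.union(other_columns).cloned().collect();
    // sort to keep the generated schema stable across runs
    all_columns.sort_unstable();

    all_columns
        .into_iter()
        .map(|name| {
            if own_columns.contains(&name) {
                AlignedColumn::PassThrough(name)
            } else {
                AlignedColumn::NullFill(name)
            }
        })
        .collect()
}
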
@@ -1683,39 +1713,53 @@ impl PromPlanner { }; // sort to ensure the generated plan is not volatile match_columns.sort_unstable(); - match_columns.push(self.ctx.time_index_column.clone().unwrap()); - - // step 3: right anti join - let filter = match_columns - .iter() - .cloned() - .map(|col| { - let left_col = DfExpr::Column(Column::new(left_qualifier.clone(), col)); - left_col.is_not_null() - }) - .reduce(|left, right| left.and(right)) - .unwrap(); // Safety: match columns contains at least the time index col - let right_anti_join = LogicalPlanBuilder::from(left_projected.clone()) - .join_detailed( - right_projected, - JoinType::RightAnti, - (match_columns.clone(), match_columns), - Some(filter), - true, - ) - .context(DataFusionPlanningSnafu)? - .build() - .context(DataFusionPlanningSnafu)?; + // match_columns.push(self.ctx.time_index_column.clone().unwrap()); + + // // step 3: right anti join + // let filter = match_columns + // .iter() + // .cloned() + // .map(|col| { + // let left_col = DfExpr::Column(Column::new(left_qualifier.clone(), col)); + // left_col.is_not_null() + // }) + // .reduce(|left, right| left.and(right)) + // .unwrap(); // Safety: match columns contains at least the time index col + // let right_anti_join = LogicalPlanBuilder::from(left_projected.clone()) + // .join_detailed( + // right_projected, + // JoinType::RightAnti, + // (match_columns.clone(), match_columns), + // Some(filter), + // true, + // ) + // .context(DataFusionPlanningSnafu)? + // .build() + // .context(DataFusionPlanningSnafu)?; common_telemetry::info!("[DEBUG] left projected {left_projected:?}"); - common_telemetry::info!("[DEBUG] right anti join {right_anti_join:?}"); + common_telemetry::info!("[DEBUG] right anti join {right_projected:?}"); + + // // step 4: union left and right + // let result = LogicalPlanBuilder::from(left_projected) + // .union(right_anti_join) + // .context(DataFusionPlanningSnafu)? + // .build() + // .context(DataFusionPlanningSnafu)?; + + // step 3: build `UnionDistinctOn` plan + let schema = left_projected.schema().clone(); + let union_distinct_on = UnionDistinctOn::new( + left_projected, + right_projected, + match_columns, + self.ctx.time_index_column.clone().unwrap(), + schema, + ); + let result = LogicalPlan::Extension(Extension { + node: Arc::new(union_distinct_on), + }); - // step 4: union left and right - let result = LogicalPlanBuilder::from(left_projected) - .union(right_anti_join) - .context(DataFusionPlanningSnafu)? 
- .build() - .context(DataFusionPlanningSnafu)?; Ok(result) } From c0f2abe7b4ea149c771e9f2823527c4f2b65f787 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 27 Dec 2023 23:13:24 +0800 Subject: [PATCH 4/7] fix clippy Signed-off-by: Ruihang Xia --- .../src/extension_plan/union_distinct_on.rs | 33 ++------- src/promql/src/planner.rs | 70 ++----------------- 2 files changed, 8 insertions(+), 95 deletions(-) diff --git a/src/promql/src/extension_plan/union_distinct_on.rs b/src/promql/src/extension_plan/union_distinct_on.rs index 9f289b834731..99da262f1c40 100644 --- a/src/promql/src/extension_plan/union_distinct_on.rs +++ b/src/promql/src/extension_plan/union_distinct_on.rs @@ -250,6 +250,7 @@ impl DisplayAs for UnionDistinctOnExec { } } +#[allow(dead_code)] pub struct UnionDistinctOnStream { left: SendableRecordBatchStream, right: HashedDataFut, @@ -285,7 +286,6 @@ impl UnionDistinctOnStream { } Some(Err(e)) => Poll::Ready(Some(Err(e))), None => { - common_telemetry::info!("[DEBUG] left stream is end"); // left stream is exhausted, so we can send the right part let right = std::mem::replace(&mut self.right, HashedDataFut::Empty); let HashedDataFut::Ready(data) = right else { @@ -342,7 +342,7 @@ impl HashedData { ) -> DataFusionResult { // Collect all batches from the input stream let initial = (Vec::new(), 0); - let (batches, num_rows) = input + let (batches, _num_rows) = input .try_fold(initial, |mut acc, batch| async { // Update rowcount acc.1 += batch.num_rows(); @@ -381,15 +381,6 @@ impl HashedData { // Finilize the hash map let batch = interleave_batches(batches, interleave_indices)?; - common_telemetry::info!( - "[DEBUG] right batch: {}", - datatypes::arrow::util::pretty::pretty_format_batches(&[batch.clone()]) - .unwrap() - .to_string() - ); - - common_telemetry::info!("[DEBUG] initial hash map: {hash_map:?}"); - Ok(Self { hash_map, batch, @@ -401,13 +392,6 @@ impl HashedData { /// Remove rows that hash value present in the input /// record batch from the hash map. pub fn update_map(&mut self, input: &RecordBatch) -> DataFusionResult<()> { - common_telemetry::info!( - "[DEBUG] incoming left batch: {}", - datatypes::arrow::util::pretty::pretty_format_batches(&[input.clone()]) - .unwrap() - .to_string() - ); - // get columns for hashing let mut hashes_buffer = Vec::new(); let arrays = self @@ -421,8 +405,6 @@ impl HashedData { let hash_values = hash_utils::create_hashes(&arrays, &self.random_state, &mut hashes_buffer)?; - common_telemetry::info!("[DEBUG] hashes to remove: {hash_values:?}"); - // remove those hashes for hash in hash_values { self.hash_map.remove(hash); @@ -434,12 +416,6 @@ impl HashedData { pub fn finish(self) -> DataFusionResult { let valid_indices = self.hash_map.values().copied().collect::>(); let result = take_batch(&self.batch, &valid_indices)?; - common_telemetry::info!( - "[DEBUG] right batch: {}", - datatypes::arrow::util::pretty::pretty_format_batches(&[result.clone()]) - .unwrap() - .to_string() - ); Ok(result) } } @@ -466,8 +442,7 @@ fn interleave_batches( } // assemble new record batch - RecordBatch::try_new(schema.clone(), interleaved_arrays) - .map_err(|e| DataFusionError::ArrowError(e)) + RecordBatch::try_new(schema.clone(), interleaved_arrays).map_err(DataFusionError::ArrowError) } /// Utility function to take rows from a record batch. 
Based on [take](datafusion::arrow::compute::take) @@ -485,7 +460,7 @@ fn take_batch(batch: &RecordBatch, indices: &[usize]) -> DataFusionResult, _>>() - .map_err(|e| DataFusionError::ArrowError(e))?; + .map_err(DataFusionError::ArrowError)?; let result = RecordBatch::try_new(schema, arrays).map_err(DataFusionError::ArrowError)?; Ok(result) diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 8c296982a5b5..2b7f6d83e8f4 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -24,8 +24,8 @@ use datafusion::datasource::DefaultTableSource; use datafusion::logical_expr::expr::{AggregateFunction, Alias, ScalarFunction, ScalarUDF}; use datafusion::logical_expr::expr_rewriter::normalize_cols; use datafusion::logical_expr::{ - col, AggregateFunction as AggregateFunctionEnum, BinaryExpr, BuiltinScalarFunction, Cast, - Extension, LogicalPlan, LogicalPlanBuilder, Operator, ScalarUDF as ScalarUdfDef, + AggregateFunction as AggregateFunctionEnum, BinaryExpr, BuiltinScalarFunction, Cast, Extension, + LogicalPlan, LogicalPlanBuilder, Operator, ScalarUDF as ScalarUdfDef, }; use datafusion::optimizer::utils; use datafusion::prelude as df_prelude; @@ -1624,10 +1624,6 @@ impl PromPlanner { .map(|r| r.to_string()) .unwrap_or_default(); - common_telemetry::info!("[DEBUG] tags not in left {tags_not_in_left:?}"); - common_telemetry::info!("[DEBUG] tags not in right {tags_not_in_right:?}"); - common_telemetry::info!("[DEBUG] all tags {all_tags:?}"); - // step 0: fild all columns in output schema let all_columns_set = left .schema() @@ -1643,42 +1639,18 @@ impl PromPlanner { // step 1: align schema using project, fill non-exist columns with null let left_proj_exprs = all_columns.iter().map(|col| { if tags_not_in_left.contains(col) { - DfExpr::Literal(ScalarValue::Utf8(None)).alias(format!("{col}")) + DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string()) } else { DfExpr::Column(Column::new(left_qualifier.clone(), col)) } }); let right_proj_exprs = all_columns.iter().map(|col| { if tags_not_in_right.contains(col) { - DfExpr::Literal(ScalarValue::Utf8(None)).alias(format!("{col}")) + DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string()) } else { DfExpr::Column(Column::new(right_qualifier.clone(), col)) } }); - // let left_proj_exprs = - // left.schema() - // .field_names() - // .into_iter() - // .map(col) - // .chain(tags_not_in_left.iter().map(|tag_name| { - // DfExpr::Literal(ScalarValue::Utf8(None)) - // // .alias(format!("{left_qualifier_string}.{tag_name}")) - // .alias(format!("{tag_name}")) - // })); - // let right_proj_exprs = - // right - // .schema() - // .field_names() - // .into_iter() - // .map(col) - // .chain(tags_not_in_right.iter().map(|tag_name| { - // DfExpr::Literal(ScalarValue::Utf8(None)) - // // .alias(format!("{right_qualifier_string}.{tag_name}")) - // .alias(format!("{tag_name}")) - // })); - - common_telemetry::info!("[DEBUG] left project exprs {left_proj_exprs:?}"); - common_telemetry::info!("[DEBUG] right project exprs {right_proj_exprs:?}"); let left_projected = LogicalPlanBuilder::from(left) .project(left_proj_exprs) @@ -1713,40 +1685,6 @@ impl PromPlanner { }; // sort to ensure the generated plan is not volatile match_columns.sort_unstable(); - // match_columns.push(self.ctx.time_index_column.clone().unwrap()); - - // // step 3: right anti join - // let filter = match_columns - // .iter() - // .cloned() - // .map(|col| { - // let left_col = DfExpr::Column(Column::new(left_qualifier.clone(), col)); - // left_col.is_not_null() - // 
}) - // .reduce(|left, right| left.and(right)) - // .unwrap(); // Safety: match columns contains at least the time index col - // let right_anti_join = LogicalPlanBuilder::from(left_projected.clone()) - // .join_detailed( - // right_projected, - // JoinType::RightAnti, - // (match_columns.clone(), match_columns), - // Some(filter), - // true, - // ) - // .context(DataFusionPlanningSnafu)? - // .build() - // .context(DataFusionPlanningSnafu)?; - - common_telemetry::info!("[DEBUG] left projected {left_projected:?}"); - common_telemetry::info!("[DEBUG] right anti join {right_projected:?}"); - - // // step 4: union left and right - // let result = LogicalPlanBuilder::from(left_projected) - // .union(right_anti_join) - // .context(DataFusionPlanningSnafu)? - // .build() - // .context(DataFusionPlanningSnafu)?; - // step 3: build `UnionDistinctOn` plan let schema = left_projected.schema().clone(); let union_distinct_on = UnionDistinctOn::new( From 3a6e82b534272e2b224b415bae9a1e221218f7f1 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 27 Dec 2023 23:24:13 +0800 Subject: [PATCH 5/7] add sqlness case Signed-off-by: Ruihang Xia --- .../common/promql/set_operation.result | 148 +++++++++++++++++- .../common/promql/set_operation.sql | 36 ++++- 2 files changed, 176 insertions(+), 8 deletions(-) diff --git a/tests/cases/standalone/common/promql/set_operation.result b/tests/cases/standalone/common/promql/set_operation.result index d14b6fe88bc1..15a7a865a317 100644 --- a/tests/cases/standalone/common/promql/set_operation.result +++ b/tests/cases/standalone/common/promql/set_operation.result @@ -130,10 +130,21 @@ tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) and ignoring(g, job) -- http_requests{group="production", instance="0", job="app-server"} 500 -- http_requests{group="production", instance="1", job="api-server"} 200 -- http_requests{group="production", instance="1", job="app-server"} 600 --- NOT SUPPORTED: `or` +-- SQLNESS SORT_RESULT 3 1 tql eval (3000, 3000, '1s') http_requests{g="canary"} or http_requests{g="production"}; -Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR` ++------------+----------+-----+---------------------+-------+ +| g | instance | job | ts | val | ++------------+----------+-----+---------------------+-------+ +| canary | 0 | api | 1970-01-01T00:50:00 | 300.0 | +| canary | 0 | app | 1970-01-01T00:50:00 | 700.0 | +| canary | 1 | api | 1970-01-01T00:50:00 | 400.0 | +| canary | 1 | app | 1970-01-01T00:50:00 | 800.0 | +| production | 0 | api | 1970-01-01T00:50:00 | 100.0 | +| production | 0 | app | 1970-01-01T00:50:00 | 500.0 | +| production | 1 | api | 1970-01-01T00:50:00 | 200.0 | +| production | 1 | app | 1970-01-01T00:50:00 | 600.0 | ++------------+----------+-----+---------------------+-------+ -- # On overlap the rhs samples must be dropped. 
-- eval instant at 50m (http_requests{group="canary"} + 1) or http_requests{instance="1"} @@ -143,10 +154,10 @@ Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR` -- {group="canary", instance="1", job="app-server"} 801 -- http_requests{group="production", instance="1", job="api-server"} 200 -- http_requests{group="production", instance="1", job="app-server"} 600 --- NOT SUPPORTED: `or` +-- SQLNESS SORT_RESULT 3 1 tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or http_requests{instance="1"}; -Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR` +Error: 1004(InvalidArguments), Internal error during building DataFusion plan: No field named http_requests.val. Valid fields are http_requests.job, http_requests.instance, http_requests.g, http_requests.ts, "val + Float64(1)". -- # Matching only on instance excludes everything that has instance=0/1 but includes -- # entries without the instance label. @@ -161,7 +172,7 @@ Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR` -- NOT SUPPORTED: `or` tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or on(instance) (http_requests or cpu_count or vector_matching_a); -Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR` +Error: 1004(InvalidArguments), Internal error during building DataFusion plan: No field named cpu_count.val. Valid fields are cpu_count.ts. -- eval instant at 50m (http_requests{group="canary"} + 1) or ignoring(l, group, job) (http_requests or cpu_count or vector_matching_a) -- {group="canary", instance="0", job="api-server"} 301 @@ -174,7 +185,7 @@ Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR` -- NOT SUPPORTED: `or` tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or ignoring(l, g, job) (http_requests or cpu_count or vector_matching_a); -Error: 1004(InvalidArguments), Unsupported expr type: set operation `OR` +Error: 1004(InvalidArguments), Internal error during building DataFusion plan: No field named cpu_count.val. Valid fields are cpu_count.ts. -- eval instant at 50m http_requests{group="canary"} unless http_requests{instance="0"} -- http_requests{group="canary", instance="1", job="api-server"} 400 @@ -268,3 +279,128 @@ drop table vector_matching_a; Affected Rows: 0 +-- the following cases are not from Prometheus. 
+create table t1 (ts timestamp time index, job string primary key, val double); + +Affected Rows: 0 + +insert into t1 values (0, "a", 1.0), (500000, "b", 2.0), (1000000, "a", 3.0), (1500000, "c", 4.0); + +Affected Rows: 4 + +create table t2 (ts timestamp time index, val double); + +Affected Rows: 0 + +insert into t2 values (0, 0), (300000, 0), (600000, 0), (900000, 0), (1200000, 0), (1500000, 0), (1800000, 0); + +Affected Rows: 7 + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t1 or t2; + ++-----+---------------------+-----+ +| job | ts | val | ++-----+---------------------+-----+ +| | 1970-01-01T00:00:00 | 0.0 | +| | 1970-01-01T00:06:40 | 0.0 | +| | 1970-01-01T00:13:20 | 0.0 | +| | 1970-01-01T00:20:00 | 0.0 | +| | 1970-01-01T00:26:40 | 0.0 | +| | 1970-01-01T00:33:20 | 0.0 | +| a | 1970-01-01T00:00:00 | 1.0 | +| a | 1970-01-01T00:20:00 | 3.0 | +| b | 1970-01-01T00:13:20 | 2.0 | +| c | 1970-01-01T00:26:40 | 4.0 | ++-----+---------------------+-----+ + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t1 or on () t2; + ++-----+---------------------+-----+ +| job | ts | val | ++-----+---------------------+-----+ +| | 1970-01-01T00:06:40 | 0.0 | +| | 1970-01-01T00:33:20 | 0.0 | +| a | 1970-01-01T00:00:00 | 1.0 | +| a | 1970-01-01T00:20:00 | 3.0 | +| b | 1970-01-01T00:13:20 | 2.0 | +| c | 1970-01-01T00:26:40 | 4.0 | ++-----+---------------------+-----+ + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t1 or on (job) t2; + ++-----+---------------------+-----+ +| job | ts | val | ++-----+---------------------+-----+ +| | 1970-01-01T00:00:00 | 0.0 | +| | 1970-01-01T00:06:40 | 0.0 | +| | 1970-01-01T00:13:20 | 0.0 | +| | 1970-01-01T00:20:00 | 0.0 | +| | 1970-01-01T00:26:40 | 0.0 | +| | 1970-01-01T00:33:20 | 0.0 | +| a | 1970-01-01T00:00:00 | 1.0 | +| a | 1970-01-01T00:20:00 | 3.0 | +| b | 1970-01-01T00:13:20 | 2.0 | +| c | 1970-01-01T00:26:40 | 4.0 | ++-----+---------------------+-----+ + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t2 or t1; + ++-----+---------------------+-----+ +| job | ts | val | ++-----+---------------------+-----+ +| | 1970-01-01T00:00:00 | 0.0 | +| | 1970-01-01T00:06:40 | 0.0 | +| | 1970-01-01T00:13:20 | 0.0 | +| | 1970-01-01T00:20:00 | 0.0 | +| | 1970-01-01T00:26:40 | 0.0 | +| | 1970-01-01T00:33:20 | 0.0 | +| a | 1970-01-01T00:00:00 | 1.0 | +| a | 1970-01-01T00:20:00 | 3.0 | +| b | 1970-01-01T00:13:20 | 2.0 | +| c | 1970-01-01T00:26:40 | 4.0 | ++-----+---------------------+-----+ + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t2 or on () t1; + ++-----+---------------------+-----+ +| job | ts | val | ++-----+---------------------+-----+ +| | 1970-01-01T00:00:00 | 0.0 | +| | 1970-01-01T00:06:40 | 0.0 | +| | 1970-01-01T00:13:20 | 0.0 | +| | 1970-01-01T00:20:00 | 0.0 | +| | 1970-01-01T00:26:40 | 0.0 | +| | 1970-01-01T00:33:20 | 0.0 | ++-----+---------------------+-----+ + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t2 or on(job) t1; + ++-----+---------------------+-----+ +| job | ts | val | ++-----+---------------------+-----+ +| | 1970-01-01T00:00:00 | 0.0 | +| | 1970-01-01T00:06:40 | 0.0 | +| | 1970-01-01T00:13:20 | 0.0 | +| | 1970-01-01T00:20:00 | 0.0 | +| | 1970-01-01T00:26:40 | 0.0 | +| | 1970-01-01T00:33:20 | 0.0 | +| a | 1970-01-01T00:00:00 | 1.0 | +| a | 1970-01-01T00:20:00 | 3.0 | +| b | 1970-01-01T00:13:20 | 2.0 | +| c | 1970-01-01T00:26:40 | 4.0 | ++-----+---------------------+-----+ + +drop table t1; + +Affected Rows: 0 + +drop table t2; + +Affected Rows: 0 + diff --git 
a/tests/cases/standalone/common/promql/set_operation.sql b/tests/cases/standalone/common/promql/set_operation.sql index e91460df3478..6a71711bd896 100644 --- a/tests/cases/standalone/common/promql/set_operation.sql +++ b/tests/cases/standalone/common/promql/set_operation.sql @@ -79,7 +79,7 @@ tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) and ignoring(g, job) -- http_requests{group="production", instance="0", job="app-server"} 500 -- http_requests{group="production", instance="1", job="api-server"} 200 -- http_requests{group="production", instance="1", job="app-server"} 600 --- NOT SUPPORTED: `or` +-- SQLNESS SORT_RESULT 3 1 tql eval (3000, 3000, '1s') http_requests{g="canary"} or http_requests{g="production"}; -- # On overlap the rhs samples must be dropped. @@ -90,7 +90,7 @@ tql eval (3000, 3000, '1s') http_requests{g="canary"} or http_requests{g="produc -- {group="canary", instance="1", job="app-server"} 801 -- http_requests{group="production", instance="1", job="api-server"} 200 -- http_requests{group="production", instance="1", job="app-server"} 600 --- NOT SUPPORTED: `or` +-- SQLNESS SORT_RESULT 3 1 tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or http_requests{instance="1"}; @@ -173,3 +173,35 @@ drop table http_requests; drop table cpu_count; drop table vector_matching_a; + +-- the following cases are not from Prometheus. + +create table t1 (ts timestamp time index, job string primary key, val double); + +insert into t1 values (0, "a", 1.0), (500000, "b", 2.0), (1000000, "a", 3.0), (1500000, "c", 4.0); + +create table t2 (ts timestamp time index, val double); + +insert into t2 values (0, 0), (300000, 0), (600000, 0), (900000, 0), (1200000, 0), (1500000, 0), (1800000, 0); + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t1 or t2; + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t1 or on () t2; + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t1 or on (job) t2; + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t2 or t1; + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t2 or on () t1; + +-- SQLNESS SORT_RESULT 3 1 +tql eval (0, 2000, '400') t2 or on(job) t1; + +drop table t1; + +drop table t2; From 288e5918953f1fce9f24081d83cbc1c1b553922e Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 27 Dec 2023 23:43:04 +0800 Subject: [PATCH 6/7] add UTs Signed-off-by: Ruihang Xia --- src/promql/src/extension_plan.rs | 2 + .../src/extension_plan/instant_manipulate.rs | 50 +------- src/promql/src/extension_plan/test_util.rs | 64 ++++++++++ .../src/extension_plan/union_distinct_on.rs | 111 +++++++++++++++++- 4 files changed, 179 insertions(+), 48 deletions(-) create mode 100644 src/promql/src/extension_plan/test_util.rs diff --git a/src/promql/src/extension_plan.rs b/src/promql/src/extension_plan.rs index 0c16c861005a..ff2195e532ee 100644 --- a/src/promql/src/extension_plan.rs +++ b/src/promql/src/extension_plan.rs @@ -19,6 +19,8 @@ mod normalize; mod planner; mod range_manipulate; mod series_divide; +#[cfg(test)] +mod test_util; mod union_distinct_on; use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType}; diff --git a/src/promql/src/extension_plan/instant_manipulate.rs b/src/promql/src/extension_plan/instant_manipulate.rs index ba155627d2c5..e65592bb374e 100644 --- a/src/promql/src/extension_plan/instant_manipulate.rs +++ b/src/promql/src/extension_plan/instant_manipulate.rs @@ -445,40 +445,12 @@ impl InstantManipulateStream { #[cfg(test)] mod test { - use datafusion::arrow::array::Float64Array; - use 
datafusion::arrow::datatypes::{ - ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType, - }; - use datafusion::physical_plan::memory::MemoryExec; use datafusion::prelude::SessionContext; - use datatypes::arrow::array::TimestampMillisecondArray; - use datatypes::arrow_array::StringArray; use super::*; - - const TIME_INDEX_COLUMN: &str = "timestamp"; - - fn prepare_test_data() -> MemoryExec { - let schema = Arc::new(Schema::new(vec![ - Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), - Field::new("value", DataType::Float64, true), - Field::new("path", DataType::Utf8, true), - ])); - let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![ - 0, 30_000, 60_000, 90_000, 120_000, // every 30s - 180_000, 240_000, // every 60s - 241_000, 271_000, 291_000, // others - ])) as _; - let field_column = Arc::new(Float64Array::from(vec![1.0; 10])) as _; - let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _; - let data = RecordBatch::try_new( - schema.clone(), - vec![timestamp_column, field_column, path_column], - ) - .unwrap(); - - MemoryExec::try_new(&[vec![data]], schema, None).unwrap() - } + use crate::extension_plan::test_util::{ + prepare_test_data, prepare_test_data_with_nan, TIME_INDEX_COLUMN, + }; async fn do_normalize_test( start: Millisecond, @@ -749,22 +721,6 @@ mod test { do_normalize_test(190_000, 300_000, 30_000, 10_000, expected, false).await; } - fn prepare_test_data_with_nan() -> MemoryExec { - let schema = Arc::new(Schema::new(vec![ - Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), - Field::new("value", DataType::Float64, true), - ])); - let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![ - 0, 30_000, 60_000, 90_000, 120_000, // every 30s - ])) as _; - let field_column = - Arc::new(Float64Array::from(vec![0.0, f64::NAN, 6.0, f64::NAN, 12.0])) as _; - let data = - RecordBatch::try_new(schema.clone(), vec![timestamp_column, field_column]).unwrap(); - - MemoryExec::try_new(&[vec![data]], schema, None).unwrap() - } - #[tokio::test] async fn lookback_10s_interval_10s_with_nan() { let expected = String::from( diff --git a/src/promql/src/extension_plan/test_util.rs b/src/promql/src/extension_plan/test_util.rs new file mode 100644 index 000000000000..f751cb9fa84b --- /dev/null +++ b/src/promql/src/extension_plan/test_util.rs @@ -0,0 +1,64 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! 
Utils for testing extension plan + +use std::sync::Arc; + +use common_recordbatch::DfRecordBatch as RecordBatch; +use datafusion::arrow::array::Float64Array; +use datafusion::arrow::datatypes::{ + ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType, +}; +use datafusion::physical_plan::memory::MemoryExec; +use datatypes::arrow::array::TimestampMillisecondArray; +use datatypes::arrow_array::StringArray; + +pub(crate) const TIME_INDEX_COLUMN: &str = "timestamp"; + +pub(crate) fn prepare_test_data() -> MemoryExec { + let schema = Arc::new(Schema::new(vec![ + Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), + Field::new("value", DataType::Float64, true), + Field::new("path", DataType::Utf8, true), + ])); + let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![ + 0, 30_000, 60_000, 90_000, 120_000, // every 30s + 180_000, 240_000, // every 60s + 241_000, 271_000, 291_000, // others + ])) as _; + let field_column = Arc::new(Float64Array::from(vec![1.0; 10])) as _; + let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _; + let data = RecordBatch::try_new( + schema.clone(), + vec![timestamp_column, field_column, path_column], + ) + .unwrap(); + + MemoryExec::try_new(&[vec![data]], schema, None).unwrap() +} + +pub(crate) fn prepare_test_data_with_nan() -> MemoryExec { + let schema = Arc::new(Schema::new(vec![ + Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), + Field::new("value", DataType::Float64, true), + ])); + let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![ + 0, 30_000, 60_000, 90_000, 120_000, // every 30s + ])) as _; + let field_column = Arc::new(Float64Array::from(vec![0.0, f64::NAN, 6.0, f64::NAN, 12.0])) as _; + let data = RecordBatch::try_new(schema.clone(), vec![timestamp_column, field_column]).unwrap(); + + MemoryExec::try_new(&[vec![data]], schema, None).unwrap() +} diff --git a/src/promql/src/extension_plan/union_distinct_on.rs b/src/promql/src/extension_plan/union_distinct_on.rs index 99da262f1c40..22551b73f810 100644 --- a/src/promql/src/extension_plan/union_distinct_on.rs +++ b/src/promql/src/extension_plan/union_distinct_on.rs @@ -35,12 +35,31 @@ use datatypes::arrow::compute; use futures::future::BoxFuture; use futures::{ready, Stream, StreamExt, TryStreamExt}; +/// A special kind of `UNION`(`OR` in PromQL) operator, for PromQL specific use case. +/// +/// This operator is similar to `UNION` from SQL, but it only accepts two inputs. The +/// most different part is that it treat left child and right child differently: +/// - All columns from left child will be outputted. +/// - Only check collisions (when not distinct) on the columns specified by `compare_keys`. +/// - When there is a collision: +/// - If the collision is from right child itself, only the first observed row will be +/// preserved. All others are discarded. +/// - If the collision is from left child, the row in right child will be discarded. +/// - The output order is not maintained. This plan will output left child first, then right child. +/// - The output schema contains all columns from left or right child plans. +/// +/// From the implementation perspective, this operator is similar to `HashJoin`, but the +/// probe side is the right child, and the build side is the left child. Another difference +/// is that the probe is opting-out. 
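+///
+/// A rough, dependency-free sketch of the intended row-level behavior (the hash
+/// keys are reduced to plain strings here; this is illustrative, not the actual
+/// Arrow-based implementation):
+///
+/// ```ignore
+/// use std::collections::HashMap;
+///
+/// fn union_distinct_on(left: Vec<(String, f64)>, right: Vec<(String, f64)>) -> Vec<(String, f64)> {
+///     // Build side: keep only the first right-side row observed for each key.
+///     let mut right_map: HashMap<String, (String, f64)> = HashMap::new();
+///     for row in right {
+///         right_map.entry(row.0.clone()).or_insert(row);
+///     }
+///     // Probe side opts out: every key seen on the left masks the right entry.
+///     let mut output = Vec::new();
+///     for row in left {
+///         right_map.remove(&row.0);
+///         output.push(row);
+///     }
+///     // Left rows come first, then the surviving right rows.
+///     output.extend(right_map.into_values());
+///     output
+/// }
+/// ```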
+/// +/// This plan will exhaust the right child first to build probe hash table, then streaming +/// on left side, and use the left side to "mask" the hash table. #[derive(Debug, PartialEq, Eq, Hash)] pub struct UnionDistinctOn { left: LogicalPlan, right: LogicalPlan, /// The columns to compare for equality. - /// TIME INDEX is not included. + /// TIME INDEX is included. compare_keys: Vec, ts_col: String, output_schema: DFSchemaRef, @@ -250,6 +269,7 @@ impl DisplayAs for UnionDistinctOnExec { } } +// TODO(ruihang): some unused fields are for metrics, which will be implemented later. #[allow(dead_code)] pub struct UnionDistinctOnStream { left: SendableRecordBatchStream, @@ -465,3 +485,92 @@ fn take_batch(batch: &RecordBatch, indices: &[usize]) -> DataFusionResult Date: Thu, 28 Dec 2023 11:26:34 +0800 Subject: [PATCH 7/7] Update src/promql/src/planner.rs Co-authored-by: dennis zhuang --- src/promql/src/planner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 2b7f6d83e8f4..7c8176d7b95e 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -1624,7 +1624,7 @@ impl PromPlanner { .map(|r| r.to_string()) .unwrap_or_default(); - // step 0: fild all columns in output schema + // step 0: fill all columns in output schema let all_columns_set = left .schema() .fields()