From 6a4132195f6b50e3da9e7348ded10da96a96d0ec Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 2 Apr 2023 16:20:32 +0800 Subject: [PATCH] feat: impl quantile_over_time function (#1287) * fix qualifier alias Signed-off-by: Ruihang Xia * fix in another way Signed-off-by: Ruihang Xia * impl quantile_over_time Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/common/function-macro/src/range_fn.rs | 21 +- src/promql/src/extension_plan/empty_metric.rs | 11 +- src/promql/src/functions.rs | 2 + src/promql/src/functions/quantile.rs | 210 ++++++++++++++++++ src/promql/src/planner.rs | 16 +- 5 files changed, 240 insertions(+), 20 deletions(-) create mode 100644 src/promql/src/functions/quantile.rs diff --git a/src/common/function-macro/src/range_fn.rs b/src/common/function-macro/src/range_fn.rs index c7fcedbdae4f..326c0cb2b5f7 100644 --- a/src/common/function-macro/src/range_fn.rs +++ b/src/common/function-macro/src/range_fn.rs @@ -36,6 +36,8 @@ macro_rules! ok { } pub(crate) fn process_range_fn(args: TokenStream, input: TokenStream) -> TokenStream { + let mut result = TokenStream::new(); + // extract arg map let arg_pairs = parse_macro_input!(args as AttributeArgs); let arg_span = arg_pairs[0].span(); @@ -59,12 +61,17 @@ pub(crate) fn process_range_fn(args: TokenStream, input: TokenStream) -> TokenSt let arg_types = ok!(extract_input_types(inputs)); // build the struct and its impl block - let struct_code = build_struct( - attrs, - vis, - ok!(get_ident(&arg_map, "name", arg_span)), - ok!(get_ident(&arg_map, "display_name", arg_span)), - ); + // only do this when `display_name` is specified + if let Ok(display_name) = get_ident(&arg_map, "display_name", arg_span) { + let struct_code = build_struct( + attrs, + vis, + ok!(get_ident(&arg_map, "name", arg_span)), + display_name, + ); + result.extend(struct_code); + } + let calc_fn_code = build_calc_fn( ok!(get_ident(&arg_map, "name", arg_span)), arg_types, @@ -77,8 +84,6 @@ pub(crate) fn process_range_fn(args: TokenStream, input: TokenStream) -> TokenSt } .into(); - let mut result = TokenStream::new(); - result.extend(struct_code); result.extend(calc_fn_code); result.extend(input_fn_code); result diff --git a/src/promql/src/extension_plan/empty_metric.rs b/src/promql/src/extension_plan/empty_metric.rs index f2df04f2ea3f..0fe4aa236294 100644 --- a/src/promql/src/extension_plan/empty_metric.rs +++ b/src/promql/src/extension_plan/empty_metric.rs @@ -30,7 +30,6 @@ use datafusion::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, }; use datafusion::prelude::Expr; -use datafusion::sql::TableReference; use datatypes::arrow::array::TimestampMillisecondArray; use datatypes::arrow::datatypes::SchemaRef; use datatypes::arrow::record_batch::RecordBatch; @@ -57,17 +56,12 @@ impl EmptyMetric { let schema = Arc::new(DFSchema::new_with_metadata( vec![ DFField::new( - None::, + Some(""), &time_index_column_name, DataType::Timestamp(TimeUnit::Millisecond, None), false, ), - DFField::new( - None::, - &value_column_name, - DataType::Float64, - true, - ), + DFField::new(Some(""), &value_column_name, DataType::Float64, true), ], HashMap::new(), )?); @@ -81,7 +75,6 @@ impl EmptyMetric { } pub fn to_execution_plan(&self) -> Arc { - // let schema = self.schema.to Arc::new(EmptyMetricExec { start: self.start, end: self.end, diff --git a/src/promql/src/functions.rs b/src/promql/src/functions.rs index f83c4e5d071c..fcfe10889a66 100644 --- a/src/promql/src/functions.rs +++ b/src/promql/src/functions.rs @@ -17,6 +17,7 @@ mod changes; mod deriv; mod extrapolate_rate; mod idelta; +mod quantile; mod resets; #[cfg(test)] mod test_util; @@ -30,6 +31,7 @@ use datafusion::error::DataFusionError; use datafusion::physical_plan::ColumnarValue; pub use extrapolate_rate::{Delta, Increase, Rate}; pub use idelta::IDelta; +pub use quantile::QuantileOverTime; pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result { if let ColumnarValue::Array(array) = columnar_value { diff --git a/src/promql/src/functions/quantile.rs b/src/promql/src/functions/quantile.rs new file mode 100644 index 000000000000..2191cb0e61e9 --- /dev/null +++ b/src/promql/src/functions/quantile.rs @@ -0,0 +1,210 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use datafusion::arrow::array::Float64Array; +use datafusion::arrow::datatypes::TimeUnit; +use datafusion::common::DataFusionError; +use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility}; +use datafusion::physical_plan::ColumnarValue; +use datatypes::arrow::array::Array; +use datatypes::arrow::datatypes::DataType; + +use crate::error; +use crate::functions::extract_array; +use crate::range_array::RangeArray; + +pub struct QuantileOverTime { + quantile: f64, +} + +impl QuantileOverTime { + fn new(quantile: f64) -> Self { + Self { quantile } + } + + pub const fn name() -> &'static str { + "prom_quantile_over_time" + } + + pub fn scalar_udf(quantile: f64) -> ScalarUDF { + ScalarUDF { + name: Self::name().to_string(), + signature: Signature::new( + TypeSignature::Exact(Self::input_type()), + Volatility::Immutable, + ), + return_type: Arc::new(|_| Ok(Arc::new(Self::return_type()))), + fun: Arc::new(move |input| Self::new(quantile).calc(input)), + } + } + + // time index column and value column + fn input_type() -> Vec { + vec![ + RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)), + RangeArray::convert_data_type(DataType::Float64), + ] + } + + fn return_type() -> DataType { + DataType::Float64 + } + + fn calc(&self, input: &[ColumnarValue]) -> Result { + // construct matrix from input. + // The third one is quantile param, which is included in fields. + assert_eq!(input.len(), 3); + let ts_array = extract_array(&input[0])?; + let value_array = extract_array(&input[1])?; + + let ts_range: RangeArray = RangeArray::try_new(ts_array.data().clone().into())?; + let value_range: RangeArray = RangeArray::try_new(value_array.data().clone().into())?; + error::ensure( + ts_range.len() == value_range.len(), + DataFusionError::Execution(format!( + "{}: input arrays should have the same length, found {} and {}", + Self::name(), + ts_range.len(), + value_range.len() + )), + )?; + error::ensure( + ts_range.value_type() == DataType::Timestamp(TimeUnit::Millisecond, None), + DataFusionError::Execution(format!( + "{}: expect TimestampMillisecond as time index array's type, found {}", + Self::name(), + ts_range.value_type() + )), + )?; + error::ensure( + value_range.value_type() == DataType::Float64, + DataFusionError::Execution(format!( + "{}: expect Float64 as value array's type, found {}", + Self::name(), + value_range.value_type() + )), + )?; + + // calculation + let mut result_array = Vec::with_capacity(ts_range.len()); + + for index in 0..ts_range.len() { + let timestamps = ts_range.get(index).unwrap(); + let values = value_range.get(index).unwrap(); + let values = values + .as_any() + .downcast_ref::() + .unwrap() + .values(); + error::ensure( + timestamps.len() == values.len(), + DataFusionError::Execution(format!( + "{}: input arrays should have the same length, found {} and {}", + Self::name(), + timestamps.len(), + values.len() + )), + )?; + + let retule = quantile_impl(values, self.quantile); + + result_array.push(retule); + } + + let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array))); + Ok(result) + } +} + +/// Refer to https://github.com/prometheus/prometheus/blob/6e2905a4d4ff9b47b1f6d201333f5bd53633f921/promql/quantile.go#L357-L386 +fn quantile_impl(values: &[f64], quantile: f64) -> Option { + if quantile.is_nan() || values.is_empty() { + return Some(f64::NAN); + } + if quantile < 0.0 { + return Some(f64::NEG_INFINITY); + } + if quantile > 1.0 { + return Some(f64::INFINITY); + } + + let mut values = values.to_vec(); + values.sort_unstable_by(f64::total_cmp); + + let length = values.len(); + let rank = quantile * (length - 1) as f64; + + let lower_index = 0.max(rank.floor() as usize); + let upper_index = (length - 1).min(lower_index + 1); + let weight = rank - rank.floor(); + + let result = values[lower_index] * (1.0 - weight) + values[upper_index] * weight; + Some(result) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_quantile_impl_empty() { + let values = &[]; + let q = 0.5; + assert!(quantile_impl(values, q).unwrap().is_nan()); + } + + #[test] + fn test_quantile_impl_nan() { + let values = &[1.0, 2.0, 3.0]; + let q = f64::NAN; + assert!(quantile_impl(values, q).unwrap().is_nan()); + } + + #[test] + fn test_quantile_impl_negative_quantile() { + let values = &[1.0, 2.0, 3.0]; + let q = -0.5; + assert_eq!(quantile_impl(values, q).unwrap(), f64::NEG_INFINITY); + } + + #[test] + fn test_quantile_impl_greater_than_one_quantile() { + let values = &[1.0, 2.0, 3.0]; + let q = 1.5; + assert_eq!(quantile_impl(values, q).unwrap(), f64::INFINITY); + } + + #[test] + fn test_quantile_impl_single_element() { + let values = &[1.0]; + let q = 0.8; + assert_eq!(quantile_impl(values, q).unwrap(), 1.0); + } + + #[test] + fn test_quantile_impl_even_length() { + let values = &[3.0, 1.0, 5.0, 2.0]; + let q = 0.5; + assert_eq!(quantile_impl(values, q).unwrap(), 2.5); + } + + #[test] + fn test_quantile_impl_odd_length() { + let values = &[4.0, 1.0, 3.0, 2.0, 5.0]; + let q = 0.25; + assert_eq!(quantile_impl(values, q).unwrap(), 2.0); + } +} diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 4813760aae6c..e31696482351 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -44,15 +44,15 @@ use table::table::adapter::DfTableProviderAdapter; use crate::error::{ CatalogSnafu, DataFusionPlanningSnafu, ExpectExprSnafu, ExpectRangeSelectorSnafu, MultipleVectorSnafu, Result, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu, - UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu, ValueNotFoundSnafu, - ZeroRangeSelectorSnafu, + UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu, + ValueNotFoundSnafu, ZeroRangeSelectorSnafu, }; use crate::extension_plan::{ EmptyMetric, InstantManipulate, Millisecond, RangeManipulate, SeriesDivide, SeriesNormalize, }; use crate::functions::{ AbsentOverTime, AvgOverTime, CountOverTime, Delta, IDelta, Increase, LastOverTime, MaxOverTime, - MinOverTime, PresentOverTime, Rate, SumOverTime, + MinOverTime, PresentOverTime, QuantileOverTime, Rate, SumOverTime, }; const LEFT_PLAN_JOIN_ALIAS: &str = "lhs"; @@ -692,6 +692,16 @@ impl PromPlanner { "last_over_time" => ScalarFunc::Udf(LastOverTime::scalar_udf()), "absent_over_time" => ScalarFunc::Udf(AbsentOverTime::scalar_udf()), "present_over_time" => ScalarFunc::Udf(PresentOverTime::scalar_udf()), + "quantile_over_time" => { + let quantile_expr = match other_input_exprs.get(0) { + Some(DfExpr::Literal(ScalarValue::Float64(Some(quantile)))) => *quantile, + other => UnexpectedPlanExprSnafu { + desc: format!("expect f64 literal as quantile, but found {:?}", other), + } + .fail()?, + }; + ScalarFunc::Udf(QuantileOverTime::scalar_udf(quantile_expr)) + } _ => ScalarFunc::DataFusionBuiltin( BuiltinScalarFunction::from_str(func.name).map_err(|_| { UnsupportedExprSnafu {