From 2e009465557d93e4dfebc350c8718e0d6628e155 Mon Sep 17 00:00:00 2001 From: Jon Mease Date: Wed, 2 Nov 2022 19:36:57 -0400 Subject: [PATCH] Restore support for Week and DayOfYear TimeUnits (#166) * Re-enable week and year-week timeunit tests * Accept week and dayofyear time units * Implement fallback timeunit UDF for cases we don't handle with standard SQL --- .../src/spec/transform/timeunit.rs | 18 +- .../src/expression/compiler/call.rs | 4 + .../src/sql/connection/datafusion_conn.rs | 3 + .../src/transform/timeunit.rs | 373 +++++++++++++++++- .../tests/test_image_comparison.rs | 4 +- 5 files changed, 379 insertions(+), 23 deletions(-) diff --git a/vegafusion-core/src/spec/transform/timeunit.rs b/vegafusion-core/src/spec/transform/timeunit.rs index d87a7934b..5fcac85ee 100644 --- a/vegafusion-core/src/spec/transform/timeunit.rs +++ b/vegafusion-core/src/spec/transform/timeunit.rs @@ -71,24 +71,12 @@ pub enum TimeUnitUnitSpec { impl TransformSpecTrait for TimeUnitTransformSpec { fn supported(&self) -> bool { - if self.units.is_none() + let unsupported = self.units.is_none() || self.step.is_some() || self.extent.is_some() || self.maxbins.is_some() - || self.signal.is_some() - { - false - } else { - if let Some(units) = &self.units { - for unit in units { - if matches!(unit, TimeUnitUnitSpec::Week | TimeUnitUnitSpec::DayOfYear) { - // week and dayofyear units are not yet supported - return false; - } - } - } - true - } + || self.signal.is_some(); + !unsupported } fn output_signals(&self) -> Vec { diff --git a/vegafusion-rt-datafusion/src/expression/compiler/call.rs b/vegafusion-rt-datafusion/src/expression/compiler/call.rs index 9ce637d5b..3a3d4029a 100644 --- a/vegafusion-rt-datafusion/src/expression/compiler/call.rs +++ b/vegafusion-rt-datafusion/src/expression/compiler/call.rs @@ -63,6 +63,7 @@ use crate::expression::compiler::builtin_functions::type_coercion::to_boolean::t use crate::expression::compiler::builtin_functions::type_coercion::to_number::to_number_transform; use crate::expression::compiler::builtin_functions::type_coercion::to_string::to_string_transform; use crate::task_graph::timezone::RuntimeTzConfig; +use crate::transform::timeunit::TIMEUNIT_START_UDF; pub type MacroFn = Arc Result + Send + Sync>; pub type TransformFn = Arc Result + Send + Sync>; @@ -454,6 +455,9 @@ pub fn make_session_context() -> SessionContext { ctx.register_udf((*MAKE_TIMESTAMPTZ).clone()); ctx.register_udf((*TIMESTAMPTZ_TO_EPOCH_MS).clone()); + // timeunit + ctx.register_udf((*TIMEUNIT_START_UDF).clone()); + // timeformat ctx.register_udf((*FORMAT_TIMESTAMP_UDF).clone()); diff --git a/vegafusion-rt-datafusion/src/sql/connection/datafusion_conn.rs b/vegafusion-rt-datafusion/src/sql/connection/datafusion_conn.rs index 98617c9ff..15c71a7c1 100644 --- a/vegafusion-rt-datafusion/src/sql/connection/datafusion_conn.rs +++ b/vegafusion-rt-datafusion/src/sql/connection/datafusion_conn.rs @@ -40,6 +40,9 @@ pub fn make_datafusion_dialect() -> Dialect { functions.insert("make_timestamptz".to_string()); functions.insert("timestamptz_to_epoch_ms".to_string()); + // timeunit + functions.insert("vega_timeunit".to_string()); + // timeformat functions.insert("format_timestamp".to_string()); diff --git a/vegafusion-rt-datafusion/src/transform/timeunit.rs b/vegafusion-rt-datafusion/src/transform/timeunit.rs index ebc61d436..d1e571473 100644 --- a/vegafusion-rt-datafusion/src/transform/timeunit.rs +++ b/vegafusion-rt-datafusion/src/transform/timeunit.rs @@ -9,7 +9,8 @@ use crate::expression::compiler::config::CompilationConfig; use crate::transform::TransformTrait; use async_trait::async_trait; -use datafusion::arrow::datatypes::DataType; +use datafusion::arrow::datatypes::{DataType, TimeUnit as ArrowTimeUnit}; +use datafusion::common::DataFusionError; use datafusion::prelude::col; use std::collections::HashSet; use std::ops::{Add, Div, Mul, Sub}; @@ -26,9 +27,19 @@ use crate::expression::compiler::builtin_functions::date_time::timestamp_to_time use crate::expression::compiler::builtin_functions::date_time::timestamptz_to_timestamp::TIMESTAMPTZ_TO_TIMESTAMP_UDF; use crate::sql::compile::expr::ToSqlExpr; -use datafusion_expr::{floor, lit, BuiltinScalarFunction, Expr}; +use crate::expression::compiler::builtin_functions::date_time::process_input_datetime; +use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, TimeZone, Timelike, Utc, Weekday}; +use datafusion_expr::{ + floor, lit, BuiltinScalarFunction, ColumnarValue, Expr, ReturnTypeFunction, + ScalarFunctionImplementation, ScalarUDF, Signature, TypeSignature, Volatility, +}; use itertools::Itertools; use sqlgen::dialect::DialectDisplay; +use std::str::FromStr; +use vegafusion_core::arrow::array::{ArrayRef, Int64Array, TimestampMillisecondArray}; +use vegafusion_core::arrow::compute::unary; +use vegafusion_core::arrow::temporal_conversions::date64_to_datetime; +use vegafusion_core::data::scalar::ScalarValue; // Implementation of timeunit start using the SQL DATE_TRUNC function fn timeunit_date_trunc( @@ -239,6 +250,98 @@ fn timeunit_weekday(field: &str, tz_str: &Option) -> Result<(Expr, Strin Ok((start_expr, "1 DAY".to_string())) } +// Fallback implementation of timeunit that uses a custom DataFusion UDF +fn timeunit_custom_udf( + field: &str, + units_set: &HashSet, + tz_str: &Option, +) -> Result<(Expr, String)> { + let units_mask = vec![ + units_set.contains(&TimeUnitUnit::Year), // 0 + units_set.contains(&TimeUnitUnit::Quarter), // 1 + units_set.contains(&TimeUnitUnit::Month), // 2 + units_set.contains(&TimeUnitUnit::Date), // 3 + units_set.contains(&TimeUnitUnit::Week), // 4 + units_set.contains(&TimeUnitUnit::Day), // 5 + units_set.contains(&TimeUnitUnit::DayOfYear), // 6 + units_set.contains(&TimeUnitUnit::Hours), // 7 + units_set.contains(&TimeUnitUnit::Minutes), // 8 + units_set.contains(&TimeUnitUnit::Seconds), // 9 + units_set.contains(&TimeUnitUnit::Milliseconds), // 10 + ]; + + let timeunit_start_udf = &TIMEUNIT_START_UDF; + + let tz_str = tz_str + .as_ref() + .map(|tz| tz.to_string()) + .unwrap_or_else(|| "UTC".to_string()); + + let timeunit_start_value = timeunit_start_udf.call(vec![ + col(field), + lit(tz_str), + lit(units_mask[0]), + lit(units_mask[1]), + lit(units_mask[2]), + lit(units_mask[3]), + lit(units_mask[4]), + lit(units_mask[5]), + lit(units_mask[6]), + lit(units_mask[7]), + lit(units_mask[8]), + lit(units_mask[9]), + lit(units_mask[10]), + ]); + + // Initialize interval string, this will be overwritten with the smallest specified unit + let mut interval_str = "".to_string(); + + // Year + if units_set.contains(&TimeUnitUnit::Year) { + interval_str = "1 YEAR".to_string(); + } + + // Quarter + if units_set.contains(&TimeUnitUnit::Quarter) { + interval_str = "3 MONTH".to_string(); + } + + // Month + if units_set.contains(&TimeUnitUnit::Month) { + interval_str = "1 MONTH".to_string(); + } + + // Week + if units_set.contains(&TimeUnitUnit::Week) { + interval_str = "1 WEEK".to_string(); + } + + // Day + if units_set.contains(&TimeUnitUnit::Date) + || units_set.contains(&TimeUnitUnit::DayOfYear) + || units_set.contains(&TimeUnitUnit::Day) + { + interval_str = "1 DAY".to_string(); + } + + // Hour + if units_set.contains(&TimeUnitUnit::Hours) { + interval_str = "1 HOUR".to_string(); + } + + // Minute + if units_set.contains(&TimeUnitUnit::Minutes) { + interval_str = "1 MINUTE".to_string(); + } + + // Second + if units_set.contains(&TimeUnitUnit::Seconds) { + interval_str = "1 SECOND".to_string(); + } + + Ok((timeunit_start_value, interval_str)) +} + #[async_trait] impl TransformTrait for TimeUnit { async fn eval( @@ -314,10 +417,8 @@ impl TransformTrait for TimeUnit { if units_set.is_subset(&date_part_units) { timeunit_date_part(&self.field, &units_set, &tz_str)? } else { - return Err(VegaFusionError::internal(format!( - "Unsupported combination of timeunit units: {:?}", - units_vec - ))); + // Fallback to custom UDF + timeunit_custom_udf(&self.field, &units_set, &tz_str)? } } }; @@ -412,3 +513,263 @@ impl TransformTrait for TimeUnit { Ok((dataframe, Vec::new())) } } + +fn extract_bool(value: &ColumnarValue) -> std::result::Result { + if let ColumnarValue::Scalar(scalar) = value { + if let ScalarValue::Boolean(Some(value)) = scalar { + Ok(*value) + } else { + Err(DataFusionError::Internal( + "expected boolean value".to_string(), + )) + } + } else { + Err(DataFusionError::Internal("unexpected argument".to_string())) + } +} + +fn unpack_timeunit_udf_args( + columns: &[ColumnarValue], +) -> std::result::Result<(ArrayRef, chrono_tz::Tz, Vec), DataFusionError> { + let tz_str = if let ColumnarValue::Scalar(scalar) = &columns[1] { + scalar.to_string() + } else { + return Err(DataFusionError::Internal("unexpected argument".to_string())); + }; + + let tz = chrono_tz::Tz::from_str(&tz_str).map_err(|_err| { + DataFusionError::Internal(format!("Failed to parse {} as a timezone", tz_str)) + })?; + + let timestamp = columns[0].clone().into_array(1); + let timestamp = process_input_datetime(×tamp, &tz); + + Ok(( + timestamp, + tz, + vec![ + extract_bool(&columns[2])?, + extract_bool(&columns[3])?, + extract_bool(&columns[4])?, + extract_bool(&columns[5])?, + extract_bool(&columns[6])?, + extract_bool(&columns[7])?, + extract_bool(&columns[8])?, + extract_bool(&columns[9])?, + extract_bool(&columns[10])?, + extract_bool(&columns[11])?, + extract_bool(&columns[12])?, + ], + )) +} + +/// For timestamp specified in UTC, perform time unit in the provided timezone (either UTC or Local) +fn perform_timeunit_start_from_utc( + value: i64, + units_mask: &[bool], + in_tz: T, +) -> DateTime { + // Load and interpret date time as UTC + let dt_value = date64_to_datetime(value).with_nanosecond(0).unwrap(); + let dt_value = Utc.from_local_datetime(&dt_value).earliest().unwrap(); + let mut dt_value = dt_value.with_timezone(&in_tz); + + // Handle time truncation + if !units_mask[7] { + // Clear hours first to avoid any of the other time truncations from landing on a daylight + // savings boundary + dt_value = dt_value.with_hour(0).unwrap(); + } + + if !units_mask[10] { + // Milliseconds + let new_ns = (((dt_value.nanosecond() as f64) / 1e6).floor() * 1e6) as u32; + dt_value = dt_value.with_nanosecond(new_ns).unwrap(); + } + + if !units_mask[9] { + // Seconds + dt_value = dt_value.with_second(0).unwrap(); + } + + if !units_mask[8] { + // Minutes + dt_value = dt_value.with_minute(0).unwrap(); + } + + // Save off day of the year and weekday here, because these will change if the + // year is changed + let ordinal0 = dt_value.ordinal0(); + let weekday = dt_value.weekday(); + + // Handle year truncation + // (if we're not truncating to week number, this is handled separately below) + if !units_mask[0] && !units_mask[4] { + // Year + dt_value = if let Some(v) = dt_value.with_year(2012) { + v + } else { + // The above can fail if changing to 2012 lands on daylight savings + // e.g. March 11th at 2am in 2015 + let hour = dt_value.hour(); + dt_value + .with_hour(0) + .unwrap() + .with_year(2012) + .unwrap() + .with_hour(hour + 1) + .unwrap() + } + } + + // Handle date (of the year) truncation. + // For simplicity, only one of these is valid at the same time for now + if units_mask[1] { + // Quarter + // Truncate to Quarter + let new_month = ((dt_value.month0() as f64 / 3.0).floor() * 3.0) as u32; + dt_value = dt_value + .with_day0(0) + .unwrap() + .with_month0(new_month) + .unwrap(); + } else if units_mask[2] { + // Month and not Date + // Truncate to first day of the month + if !units_mask[3] { + dt_value = dt_value.with_day0(0).unwrap(); + } + } else if units_mask[3] { + // Date and not Month + // Normalize to January, keeping existing day of the month. + // (January has 31 days, so this is safe) + if !units_mask[2] { + dt_value = dt_value.with_month0(0).unwrap(); + } + } else if units_mask[4] { + // Week + // Step 1: Find the date of the first Sunday in the same calendar year as the date. + // This may occur in isoweek 0, or in the final isoweek of the previous year + + let isoweek0_sunday = NaiveDate::from_isoywd(dt_value.year(), 1, Weekday::Sun); + + let isoweek0_sunday = NaiveDateTime::new(isoweek0_sunday, dt_value.time()); + let isoweek0_sunday = in_tz + .from_local_datetime(&isoweek0_sunday) + .earliest() + .unwrap(); + + // Subtract one week from isoweek0_sunday and check if it's still in the same calendar + // year + let week_duration = chrono::Duration::weeks(1); + let candidate_sunday = isoweek0_sunday.clone() - week_duration; + + let first_sunday_of_year = if candidate_sunday.year() == dt_value.year() { + candidate_sunday + } else { + isoweek0_sunday + }; + + // Step 2: Find the ordinal date of the first sunday of the year + let first_sunday_ordinal0 = first_sunday_of_year.ordinal0(); + + // Step 3: Compare ordinal value of first sunday with that of dt_value + let ordinal_delta = ordinal0 as i32 - first_sunday_ordinal0 as i32; + + // Compute how many whole weeks have passed since the first sunday of the year. + // If date is prior to the first sunday in the calendar year, this will evaluate to -1. + let week_number = (ordinal_delta as f64 / 7.0).floor() as i64; + + // Handle year truncation + if !units_mask[0] { + // Calendar year 2012. use weeks offset from the first Sunday of 2012 + // (which is January 1st) + let first_sunday_of_2012 = in_tz + .from_local_datetime(&NaiveDateTime::new( + NaiveDate::from_ymd(2012, 1, 1), + dt_value.time(), + )) + .earliest() + .unwrap(); + + dt_value = first_sunday_of_2012 + chrono::Duration::weeks(week_number); + } else { + // Don't change calendar year, use weeks offset from first sunday of the year + dt_value = first_sunday_of_year + chrono::Duration::weeks(week_number); + } + } else if units_mask[5] { + // Day + // Keep weekday, but make sure Sunday comes before Monday + let new_date = if weekday == Weekday::Sun { + NaiveDate::from_isoywd(dt_value.year(), 1, weekday) + } else { + NaiveDate::from_isoywd(dt_value.year(), 2, weekday) + }; + let new_datetime = NaiveDateTime::new(new_date, dt_value.time()); + dt_value = in_tz.from_local_datetime(&new_datetime).earliest().unwrap(); + } else if units_mask[6] { + // DayOfYear + // Keep the same day of the year + dt_value = dt_value.with_ordinal0(ordinal0).unwrap(); + } else { + // Clear month and date + dt_value = dt_value.with_ordinal0(0).unwrap(); + } + + dt_value +} + +fn make_timeunit_start_udf() -> ScalarUDF { + let timeunit_start: ScalarFunctionImplementation = Arc::new(|columns: &[ColumnarValue]| { + let (timestamp, tz, units_mask) = unpack_timeunit_udf_args(columns)?; + + let array = timestamp.as_any().downcast_ref::().unwrap(); + let result_array: TimestampMillisecondArray = unary(array, |value| { + perform_timeunit_start_from_utc(value, units_mask.as_slice(), tz).timestamp_millis() + }); + + Ok(ColumnarValue::Array(Arc::new(result_array) as ArrayRef)) + }); + + let return_type: ReturnTypeFunction = Arc::new(move |_datatypes| { + Ok(Arc::new(DataType::Timestamp( + ArrowTimeUnit::Millisecond, + None, + ))) + }); + + let make_sig = |timestamp_dtype: DataType| -> TypeSignature { + TypeSignature::Exact(vec![ + timestamp_dtype, // [0] timestamp + DataType::Utf8, // [1] timezone + DataType::Boolean, // [2] Year + DataType::Boolean, // [3] Quarter + DataType::Boolean, // [4] Month + DataType::Boolean, // [5] Date + DataType::Boolean, // [6] Week + DataType::Boolean, // [7] Day + DataType::Boolean, // [8] DayOfYear + DataType::Boolean, // [9] Hours + DataType::Boolean, // [10] Minutes + DataType::Boolean, // [11] Seconds + DataType::Boolean, // [12] Milliseconds + ]) + }; + + let signature: Signature = Signature::one_of( + vec![ + make_sig(DataType::Int64), + make_sig(DataType::Date64), + make_sig(DataType::Date32), + make_sig(DataType::Timestamp(ArrowTimeUnit::Millisecond, None)), + make_sig(DataType::Timestamp(ArrowTimeUnit::Nanosecond, None)), + ], + Volatility::Immutable, + ); + + ScalarUDF::new("vega_timeunit", &signature, &return_type, &timeunit_start) +} + +lazy_static! { + pub static ref TIMEUNIT_START_UDF: ScalarUDF = make_timeunit_start_udf(); +} diff --git a/vegafusion-rt-datafusion/tests/test_image_comparison.rs b/vegafusion-rt-datafusion/tests/test_image_comparison.rs index a6c0f908b..bf7e028b1 100644 --- a/vegafusion-rt-datafusion/tests/test_image_comparison.rs +++ b/vegafusion-rt-datafusion/tests/test_image_comparison.rs @@ -910,12 +910,12 @@ mod test_image_comparison_timeunit { vec![TimeUnitUnitSpec::Year], vec![TimeUnitUnitSpec::Quarter], vec![TimeUnitUnitSpec::Month], - // vec![TimeUnitUnitSpec::Week], + vec![TimeUnitUnitSpec::Week], vec![TimeUnitUnitSpec::Date], vec![TimeUnitUnitSpec::Day], vec![TimeUnitUnitSpec::Year, TimeUnitUnitSpec::Quarter], vec![TimeUnitUnitSpec::Year, TimeUnitUnitSpec::Month], - // vec![TimeUnitUnitSpec::Year, TimeUnitUnitSpec::Week], + vec![TimeUnitUnitSpec::Year, TimeUnitUnitSpec::Week], )] units: Vec,