From 76197574559ec8f8711cb768ad2c5b0f00064387 Mon Sep 17 00:00:00 2001
From: Vipul Vaibhaw
Date: Sat, 27 Apr 2024 10:52:58 +0530
Subject: [PATCH] casting str to timestamp

---
 .../execution/datafusion/expressions/cast.rs  | 253 +++++++++++++++++-
 1 file changed, 251 insertions(+), 2 deletions(-)

diff --git a/core/src/execution/datafusion/expressions/cast.rs b/core/src/execution/datafusion/expressions/cast.rs
index 10079855d..8f5603fa6 100644
--- a/core/src/execution/datafusion/expressions/cast.rs
+++ b/core/src/execution/datafusion/expressions/cast.rs
@@ -16,23 +16,27 @@
 // under the License.
 
 use std::{
     any::Any,
     fmt::{Display, Formatter},
     hash::{Hash, Hasher},
-    sync::Arc,
+    sync::{Arc, OnceLock},
 };
 
 use crate::errors::{CometError, CometResult};
 use arrow::{
     compute::{cast_with_options, CastOptions},
+    datatypes::TimestampMillisecondType,
     record_batch::RecordBatch,
     util::display::FormatOptions,
 };
-use arrow_array::{Array, ArrayRef, BooleanArray, GenericStringArray, OffsetSizeTrait};
+use arrow_array::{
+    Array, ArrayRef, BooleanArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray,
+};
 use arrow_schema::{DataType, Schema};
 use datafusion::logical_expr::ColumnarValue;
 use datafusion_common::{internal_err, Result as DataFusionResult, ScalarValue};
 use datafusion_physical_expr::PhysicalExpr;
+use regex::Regex;
 
 use crate::execution::datafusion::expressions::utils::{
     array_with_timezone, down_cast_any_ref, spark_cast,
@@ -64,6 +68,35 @@ pub struct Cast {
     pub timezone: String,
 }
 
+/// Builds an Arrow timestamp array from a string array by applying `$cast_method` to every
+/// non-null value.
+///
+/// * `$array` - string array to read; it must provide `len`, `is_null` and `value`.
+/// * `$eval_mode` - evaluation mode, forwarded unchanged to `$cast_method`.
+/// * `$array_type` - Arrow primitive type of the array that is built.
+/// * `$cast_method` - function called as `$cast_method(&str, eval_mode)`; its result must
+///   support `?` and yield an `Option` holding the native value of `$array_type`.
+///
+/// Null inputs, and values for which `$cast_method` yields `None`, become nulls. Because the
+/// expansion uses `?`, the macro can only be invoked inside a function that returns a
+/// compatible `Result`; the first error aborts the whole conversion.
+macro_rules! cast_utf8_to_timestamp {
+    ($array:expr, $eval_mode:expr, $array_type:ty, $cast_method:ident) => {{
+        let len = $array.len();
+        let mut cast_array = PrimitiveArray::<$array_type>::builder(len);
+        for i in 0..len {
+            if $array.is_null(i) {
+                cast_array.append_null();
+            } else if let Some(cast_value) = $cast_method($array.value(i), $eval_mode)? {
+                cast_array.append_value(cast_value);
+            } else {
+                cast_array.append_null();
+            }
+        }
+        Arc::new(cast_array.finish()) as ArrayRef
+    }};
+}
+
 impl Cast {
     pub fn new(
         child: Arc<dyn PhysicalExpr>,
@@ -103,12 +136,54 @@ impl Cast {
             (DataType::LargeUtf8, DataType::Boolean) => {
                 Self::spark_cast_utf8_to_boolean::<i64>(&array, self.eval_mode)?
             }
+            (DataType::Utf8, DataType::Timestamp(_, _)) => {
+                Self::cast_string_to_timestamp(&array, to_type, self.eval_mode)?
+            }
             _ => cast_with_options(&array, to_type, &CAST_OPTIONS)?,
         };
         let result = spark_cast(cast_result, from_type, to_type);
         Ok(result)
     }
 
+    /// Casts a `Utf8` array to a timestamp array by parsing each value with
+    /// `parse_timestamp`.
+    ///
+    /// Null inputs stay null, and empty or whitespace-only strings become null in every mode.
+    /// Any other value that cannot be parsed becomes null, or makes the whole cast fail with
+    /// `CometError::CastInvalidValue` when `eval_mode` is `EvalMode::Ansi`.
+    ///
+    /// NOTE(review): the result is always built as `TimestampMillisecondType` from UTC wall-clock
+    /// fields, whatever unit and timezone `to_type` carries. Confirm that this is reconciled with
+    /// the requested type further down (for example in `spark_cast`).
+    ///
+    /// # Panics
+    ///
+    /// Panics if `array` is not a `GenericStringArray<i32>` or `to_type` is not a
+    /// `DataType::Timestamp`; the call site in this patch matches on both before calling.
+    fn cast_string_to_timestamp(
+        array: &ArrayRef,
+        to_type: &DataType,
+        eval_mode: EvalMode,
+    ) -> CometResult<ArrayRef> {
+        let string_array = array
+            .as_any()
+            .downcast_ref::<GenericStringArray<i32>>()
+            .expect("Expected a string array");
+
+        let cast_array: ArrayRef = match to_type {
+            DataType::Timestamp(_, _) => {
+                cast_utf8_to_timestamp!(
+                    string_array,
+                    eval_mode,
+                    TimestampMillisecondType,
+                    parse_timestamp
+                )
+            }
+            _ => unreachable!("Invalid data type {:?} in cast from string", to_type),
+        };
+        Ok(cast_array)
+    }
+
     fn spark_cast_utf8_to_boolean<OffsetSize>(
         from: &dyn Array,
         eval_mode: EvalMode,
@@ -222,3 +297,177 @@ impl PhysicalExpr for Cast {
         self.hash(&mut s);
     }
 }
+
+/// Signature shared by the per-shape parsers used by `parse_timestamp`. A parser returns
+/// milliseconds since the Unix epoch, or `None` when the fields do not form a valid timestamp.
+type TimestampParser = fn(&str) -> Option<i64>;
+
+/// Returns the table that pairs every accepted textual shape with the parser that handles it.
+///
+/// The regexes are compiled on first use and reused afterwards, so casting an array does not
+/// recompile them for every row.
+fn timestamp_patterns() -> &'static [(Regex, TimestampParser)] {
+    static PATTERNS: OnceLock<Vec<(Regex, TimestampParser)>> = OnceLock::new();
+    PATTERNS.get_or_init(|| {
+        let shapes: [(&str, TimestampParser); 8] = [
+            (r"^\d{4}$", parse_date_time_to_millis),
+            (r"^\d{4}-\d{2}$", parse_date_time_to_millis),
+            (r"^\d{4}-\d{2}-\d{2}$", parse_date_time_to_millis),
+            (r"^\d{4}-\d{2}-\d{2}T\d{1,2}$", parse_date_time_to_millis),
+            (r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}$", parse_date_time_to_millis),
+            (r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$", parse_date_time_to_millis),
+            (r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{1,6}$", parse_date_time_to_millis),
+            (r"^T\d{1,2}$", parse_time_only_to_millis),
+        ];
+        shapes
+            .iter()
+            .map(|(pattern, parser)| {
+                let regex = Regex::new(pattern).expect("hard-coded timestamp pattern is valid");
+                (regex, *parser)
+            })
+            .collect()
+    })
+}
+
+/// Parses a timestamp string into milliseconds since the Unix epoch.
+///
+/// `value` is trimmed and then has to match one of the shapes listed in `timestamp_patterns`:
+/// `yyyy`, `yyyy-mm`, `yyyy-mm-dd`, `yyyy-mm-ddTh[h]`, `yyyy-mm-ddThh:mm`, `yyyy-mm-ddThh:mm:ss`,
+/// `yyyy-mm-ddThh:mm:ss.f` (one to six fraction digits) or `Th[h]`.
+///
+/// # Errors
+///
+/// Returns `CometError::CastInvalidValue` when `eval_mode` is `EvalMode::Ansi` and a non-empty
+/// value matches no shape or holds out-of-range fields (for example month 13). In every other
+/// mode such a value yields `Ok(None)`. An empty or whitespace-only value yields `Ok(None)` in
+/// all modes, including ANSI.
+fn parse_timestamp(value: &str, eval_mode: EvalMode) -> CometResult<Option<i64>> {
+    let value = value.trim();
+    if value.is_empty() {
+        return Ok(None);
+    }
+
+    let timestamp = timestamp_patterns()
+        .iter()
+        .find(|(pattern, _)| pattern.is_match(value))
+        .and_then(|(_, parser)| parser(value));
+
+    // A value that cannot be cast is an error only in ANSI mode; otherwise it becomes NULL.
+    if timestamp.is_none() && eval_mode == EvalMode::Ansi {
+        return Err(CometError::CastInvalidValue {
+            value: value.to_string(),
+            from_type: "STRING".to_string(),
+            to_type: "TIMESTAMP".to_string(),
+        });
+    }
+
+    Ok(timestamp)
+}
+
+/// Reads the next field from `fields` as a number, or returns `default` when no field is left.
+///
+/// Returns `None` when a field is present but is not a valid `u32`.
+fn next_field_or<'a>(fields: &mut impl Iterator<Item = &'a str>, default: u32) -> Option<u32> {
+    match fields.next() {
+        Some(field) => field.parse().ok(),
+        None => Some(default),
+    }
+}
+
+/// Converts the digits after the decimal point of the seconds field into nanoseconds, so that
+/// `"5"` means 500 milliseconds and `"123456"` means 123456 microseconds.
+///
+/// Returns `None` unless `fraction` consists of one to nine ASCII digits.
+fn fraction_to_nanos(fraction: &str) -> Option<u32> {
+    let len = fraction.len();
+    if !(1..=9).contains(&len) || !fraction.bytes().all(|b| b.is_ascii_digit()) {
+        return None;
+    }
+    let digits = fraction.parse::<u32>().ok()?;
+    // `len` is at most 9, so the exponent fits in `u32` and the product stays below 10^9.
+    Some(digits * 10u32.pow((9 - len) as u32))
+}
+
+/// Converts a date, optionally followed by `T` and a time of day, into milliseconds since the
+/// Unix epoch. The fields are read as UTC.
+///
+/// Omitted trailing fields default to the start of the period: month and day to 1, hour, minute,
+/// second and fraction to 0. Precision finer than a millisecond is discarded.
+///
+/// Returns `None` when a field is not a number or the fields do not form a valid date and time
+/// (for example `2020-02-30` or hour 25).
+fn parse_date_time_to_millis(value: &str) -> Option<i64> {
+    let mut fields = value.split(|c: char| matches!(c, 'T' | '-' | ':' | '.'));
+    let year = fields.next()?.parse::<i32>().ok()?;
+    let month = next_field_or(&mut fields, 1)?;
+    let day = next_field_or(&mut fields, 1)?;
+    let hour = next_field_or(&mut fields, 0)?;
+    let minute = next_field_or(&mut fields, 0)?;
+    let second = next_field_or(&mut fields, 0)?;
+    let nanos = match fields.next() {
+        Some(fraction) => fraction_to_nanos(fraction)?,
+        None => 0,
+    };
+
+    let date_time = chrono::NaiveDate::from_ymd_opt(year, month, day)?
+        .and_hms_nano_opt(hour, minute, second, nanos)?;
+    Some(date_time.and_utc().timestamp_millis())
+}
+
+/// Converts `T<hour>` into milliseconds since the Unix epoch for that hour on the current UTC
+/// date.
+///
+/// Returns `None` when the `T` prefix is missing or the hour is not a number from 0 to 23.
+fn parse_time_only_to_millis(value: &str) -> Option<i64> {
+    let hour = value.strip_prefix('T')?.parse::<u32>().ok()?;
+    let today = chrono::Local::now().to_utc().date_naive();
+    let date_time = today.and_hms_milli_opt(hour, 0, 0, 0)?;
+    Some(date_time.and_utc().timestamp_millis())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn legacy(value: &str) -> Option<i64> {
+        parse_timestamp(value, EvalMode::Legacy).unwrap()
+    }
+
+    #[test]
+    fn parse_timestamp_test() {
+        assert_eq!(legacy("2020"), Some(1577836800000));
+        assert_eq!(legacy("2020-01"), Some(1577836800000));
+        assert_eq!(legacy("2020-01-01"), Some(1577836800000));
+        assert_eq!(legacy("2020-01-01T12"), Some(1577880000000));
+        assert_eq!(legacy("2020-01-01T12:34"), Some(1577882040000));
+        assert_eq!(legacy("2020-01-01T12:34:56"), Some(1577882096000));
+        assert_eq!(legacy("2020-01-01T12:34:56.123456"), Some(1577882096123));
+        assert_eq!(legacy("2020-01-01T12:34:56.5"), Some(1577882096500));
+        assert_eq!(legacy("  2020-01-01  "), Some(1577836800000));
+    }
+
+    #[test]
+    fn parse_timestamp_time_only_uses_current_utc_date() {
+        // Sample the date on both sides of the call so the test holds across midnight.
+        let before = chrono::Local::now().to_utc().date_naive();
+        let parsed = legacy("T2");
+        let after = chrono::Local::now().to_utc().date_naive();
+
+        let two_o_clock = |date: chrono::NaiveDate| {
+            date.and_hms_milli_opt(2, 0, 0, 0)
+                .map(|date_time| date_time.and_utc().timestamp_millis())
+        };
+        assert!(parsed == two_o_clock(before) || parsed == two_o_clock(after));
+    }
+
+    #[test]
+    fn parse_timestamp_invalid_input() {
+        let invalid = ["not a timestamp", "2020-13-01", "2020-02-30", "2020-01-01T25", "T24"];
+        for value in invalid {
+            assert_eq!(legacy(value), None);
+            assert!(parse_timestamp(value, EvalMode::Ansi).is_err());
+        }
+        // Blank input is NULL rather than an error, even in ANSI mode.
+        assert_eq!(parse_timestamp("   ", EvalMode::Ansi).unwrap(), None);
+    }
+}