Skip to content

Commit

Permalink
casting str to timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
vaibhawvipul committed Apr 27, 2024
1 parent 49bf503 commit 7619757
Showing 1 changed file with 185 additions and 8 deletions.
193 changes: 185 additions & 8 deletions core/src/execution/datafusion/expressions/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,19 @@
// under the License.

use std::{
any::Any,
fmt::{Display, Formatter},
hash::{Hash, Hasher},
sync::Arc,
any::Any, fmt::{Display, Formatter}, hash::{Hash, Hasher}, sync::Arc
};

use crate::errors::{CometError, CometResult};
use arrow::{
compute::{cast_with_options, CastOptions},
record_batch::RecordBatch,
util::display::FormatOptions,
compute::{cast_with_options, CastOptions}, datatypes::TimestampMillisecondType, record_batch::RecordBatch, util::display::FormatOptions
};
use arrow_array::{Array, ArrayRef, BooleanArray, GenericStringArray, OffsetSizeTrait};
use arrow_array::{Array, ArrayRef, BooleanArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray};
use arrow_schema::{DataType, Schema};
use datafusion::logical_expr::ColumnarValue;
use datafusion_common::{internal_err, Result as DataFusionResult, ScalarValue};
use datafusion_physical_expr::PhysicalExpr;
use regex::Regex;

use crate::execution::datafusion::expressions::utils::{
array_with_timezone, down_cast_any_ref, spark_cast,
Expand Down Expand Up @@ -64,6 +60,26 @@ pub struct Cast {
pub timezone: String,
}

// It will be useful if we want to extend support to various timestamp formats
// right now it is millisecond timestamp
macro_rules! cast_utf8_to_timestamp {
($array:expr, $eval_mode:expr, $array_type:ty, $cast_method:ident) => {{
let len = $array.len();
let mut cast_array = PrimitiveArray::<$array_type>::builder(len);
for i in 0..len {
if $array.is_null(i) {
cast_array.append_null()
} else if let Some(cast_value) = $cast_method($array.value(i).trim(), $eval_mode)? {
cast_array.append_value(cast_value);
} else {
cast_array.append_null()
}
}
let result: CometResult<ArrayRef> = Ok(Arc::new(cast_array.finish()) as ArrayRef);
result.unwrap()
}};
}

impl Cast {
pub fn new(
child: Arc<dyn PhysicalExpr>,
Expand Down Expand Up @@ -103,12 +119,37 @@ impl Cast {
(DataType::LargeUtf8, DataType::Boolean) => {
Self::spark_cast_utf8_to_boolean::<i64>(&array, self.eval_mode)?
}
(DataType::UInt8, DataType::Timestamp(_, _)) => {
Self::cast_string_to_timestamp(&array, to_type, self.eval_mode)?
}
_ => cast_with_options(&array, to_type, &CAST_OPTIONS)?,
};
let result = spark_cast(cast_result, from_type, to_type);
Ok(result)
}

fn cast_string_to_timestamp(
array: &ArrayRef,
to_type: &DataType,
eval_mode: EvalMode,
) -> CometResult<ArrayRef> {
let string_array = array
.as_any()
.downcast_ref::<GenericStringArray<i32>>()
.expect("Expected a string array");

let cast_array: ArrayRef = match to_type {
DataType::Timestamp(_, _) => {
cast_utf8_to_timestamp!(string_array, eval_mode, TimestampMillisecondType, parse_timestamp)
}
_ => unreachable!(
"Invalid data type {:?} in cast from string",
to_type
),
};
Ok(cast_array)
}

fn spark_cast_utf8_to_boolean<OffsetSize>(
from: &dyn Array,
eval_mode: EvalMode,
Expand Down Expand Up @@ -222,3 +263,139 @@ impl PhysicalExpr for Cast {
self.hash(&mut s);
}
}

fn parse_timestamp(value: &str, eval_mode: EvalMode) -> CometResult<Option<i64>> {
let value = value.trim();
if value.is_empty() {
return Ok(None);
}

// Define regex patterns and corresponding parsing functions
let patterns = &[
(Regex::new(r"^\d{4}$").unwrap(), parse_str_to_year_timestamp as fn(&str) -> CometResult<Option<i64>>),
(Regex::new(r"^\d{4}-\d{2}$").unwrap(), parse_str_to_month_timestamp),
(Regex::new(r"^\d{4}-\d{2}-\d{2}$").unwrap(), parse_str_to_day_timestamp),
(Regex::new(r"^\d{4}-\d{2}-\d{2}T\d{1,2}$").unwrap(), parse_str_to_hour_timestamp),
(Regex::new(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}$").unwrap(), parse_str_to_minute_timestamp),
(Regex::new(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$").unwrap(), parse_str_to_second_timestamp),
(Regex::new(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{1,6}$").unwrap(), parse_str_to_nanosecond_timestamp),
(Regex::new(r"^T\d{1,2}$").unwrap(), parse_str_to_time_only_timestamp),
];

let mut timestamp = None;

// Iterate through patterns and try matching
for (pattern, parse_func) in patterns {
if pattern.is_match(value) {
timestamp = parse_func(value)?;
break;
}
}

if eval_mode == EvalMode::Ansi && timestamp.is_none() {
return Err(CometError::CastInvalidValue {
value: value.to_string(),
from_type: "STRING".to_string(),
to_type: "TIMESTAMP".to_string(),
});
}

Ok(Some(timestamp.unwrap()))
}

fn parse_ymd_timestamp(year: i32, month: u32, day: u32) -> CometResult<Option<i64>> {
let datetime = chrono::NaiveDate::from_ymd_opt(year, month, day);
let timestamp = datetime.unwrap().and_hms_milli_opt(0, 0, 0, 0);
Ok(Some(timestamp.unwrap().and_utc().timestamp_millis()))
}

fn parse_hms_timestamp(year: i32, month: u32, day: u32, hour: u32, minute: u32, second: u32, millisecond: u32) -> CometResult<Option<i64>> {
let datetime = chrono::NaiveDate::from_ymd_opt(year, month, day);
let timestamp = datetime.unwrap().and_hms_nano_opt(hour, minute, second, millisecond);
Ok(Some(timestamp.unwrap().and_utc().timestamp_millis()))
}

fn get_timestamp_values(value: &str, timestamp_type: &str) -> CometResult<Option<i64>> {
let values: Vec<_> = value.split(|c| c == 'T' || c == '-' || c == ':' || c == '.' ).collect();
let year = values[0].parse::<i32>().unwrap_or_default();
let month = values.get(1).map_or(1, |m| m.parse::<u32>().unwrap_or(1));
let day = values.get(2).map_or(1, |d| d.parse::<u32>().unwrap_or(1));
let hour = values.get(3).map_or(0, |h| h.parse::<u32>().unwrap_or(0));
let minute = values.get(4).map_or(0, |m| m.parse::<u32>().unwrap_or(0));
let second = values.get(5).map_or(0, |s| s.parse::<u32>().unwrap_or(0));
let millisecond = values.get(6).map_or(0, |ms| ms.parse::<u32>().unwrap_or(0));

match timestamp_type {
"year" => parse_ymd_timestamp(year, 1, 1),
"month" => parse_ymd_timestamp(year, month, 1),
"day" => parse_ymd_timestamp(year, month, day),
"hour" => parse_hms_timestamp(year, month, day, hour, 0, 0, 0),
"minute" => parse_hms_timestamp(year, month, day, hour, minute, 0, 0),
"second" => parse_hms_timestamp(year, month, day, hour, minute, second, 0),
"millisecond" => parse_hms_timestamp(year, month, day, hour, minute, second, millisecond),
_ => Err(CometError::CastInvalidValue {
value: value.to_string(),
from_type: "STRING".to_string(),
to_type: "TIMESTAMP".to_string(),
}),
}
}

fn parse_str_to_year_timestamp(value: &str) -> CometResult<Option<i64>> {
get_timestamp_values(value, "year")
}

fn parse_str_to_month_timestamp(value: &str) -> CometResult<Option<i64>> {
get_timestamp_values(value, "month")
}

fn parse_str_to_day_timestamp(value: &str) -> CometResult<Option<i64>> {
get_timestamp_values(value, "day")
}

fn parse_str_to_hour_timestamp(value: &str) -> CometResult<Option<i64>> {
get_timestamp_values(value, "hour")
}

fn parse_str_to_minute_timestamp(value: &str) -> CometResult<Option<i64>> {
get_timestamp_values(value, "minute")
}

fn parse_str_to_second_timestamp(value: &str) -> CometResult<Option<i64>> {
get_timestamp_values(value, "second")
}

fn parse_str_to_nanosecond_timestamp(value: &str) -> CometResult<Option<i64>> {
get_timestamp_values(value, "millisecond")
}

fn parse_str_to_time_only_timestamp(value: &str) -> CometResult<Option<i64>> {
let values: Vec<_> = value.split("T").collect();
let time_values: Vec<_> = values[1].split(":").collect();
let hour = time_values[0].parse::<u32>().unwrap();
let minute = time_values.get(1).map_or(0, |m| m.parse::<u32>().unwrap_or(0));
let second = time_values.get(2).map_or(0, |s| s.parse::<u32>().unwrap_or(0));
let millisecond = time_values.get(3).map_or(0, |ms| ms.parse::<u32>().unwrap_or(0));
let datetime = chrono::Local::now().to_utc().date_naive();
let timestamp = datetime.and_hms_milli_opt(hour, minute, second, millisecond);
Ok(Some(timestamp.unwrap().and_utc().timestamp_millis()))
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn parse_timestamp_test() {

// write for all formats
assert_eq!(parse_timestamp("2020", EvalMode::Legacy).unwrap(), Some(1577836800000));
assert_eq!(parse_timestamp("2020-01", EvalMode::Legacy).unwrap(), Some(1577836800000));
assert_eq!(parse_timestamp("2020-01-01", EvalMode::Legacy).unwrap(), Some(1577836800000));
assert_eq!(parse_timestamp("2020-01-01T12", EvalMode::Legacy).unwrap(), Some(1577880000000));
assert_eq!(parse_timestamp("2020-01-01T12:34", EvalMode::Legacy).unwrap(), Some(1577882040000));
assert_eq!(parse_timestamp("2020-01-01T12:34:56", EvalMode::Legacy).unwrap(), Some(1577882096000));
assert_eq!(parse_timestamp("2020-01-01T12:34:56.123456", EvalMode::Legacy).unwrap(), Some(1577882096000));
assert_eq!(parse_timestamp("T2", EvalMode::Legacy).unwrap(), Some(1714183200000));
}
}

0 comments on commit 7619757

Please sign in to comment.