Skip to content

Commit

Permalink
Fix panics for out-of-range timestamps (#481)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonmmease authored Apr 11, 2024
1 parent b3b64e9 commit 1bae34f
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ use chrono::{NaiveDateTime, TimeZone};
use std::any::Any;
use std::str::FromStr;
use std::sync::Arc;
use vegafusion_common::arrow::compute::try_unary;
use vegafusion_common::arrow::error::ArrowError;
use vegafusion_common::datafusion_expr::ScalarUDFImpl;
use vegafusion_common::{
arrow::{
array::{ArrayRef, Date32Array, TimestampMillisecondArray},
compute::unary,
datatypes::{DataType, TimeUnit},
},
datafusion_common::{DataFusionError, ScalarValue},
Expand Down Expand Up @@ -73,7 +74,7 @@ impl ScalarUDFImpl for DateToUtcTimestampUDF {
let s_per_day = 60 * 60 * 24_i64;
let date_array = date_array.as_any().downcast_ref::<Date32Array>().unwrap();

let timestamp_array: TimestampMillisecondArray = unary(date_array, |v| {
let timestamp_array: TimestampMillisecondArray = try_unary(date_array, |v| {
// Build naive datetime for time
let seconds = (v as i64) * s_per_day;
let nanoseconds = 0_u32;
Expand All @@ -84,11 +85,11 @@ impl ScalarUDFImpl for DateToUtcTimestampUDF {
let local_datetime = tz
.from_local_datetime(&naive_local_datetime)
.earliest()
.unwrap();
.ok_or(ArrowError::ComputeError("date out of bounds".to_string()))?;

// Get timestamp millis (in UTC)
local_datetime.timestamp_millis()
});
Ok(local_datetime.timestamp_millis())
})?;
let timestamp_array = Arc::new(timestamp_array) as ArrayRef;

// maybe back to scalar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub fn parse_datetime(
dt
} else {
// Handle positive timezone transition by adding 1 hour
let datetime = datetime.with_hour(datetime.hour() + 1).unwrap();
let datetime = datetime.with_hour(datetime.hour() + 1)?;
local_tz.from_local_datetime(&datetime).earliest()?
};
let dt_utc = dt.with_timezone(&chrono::Utc);
Expand Down
113 changes: 79 additions & 34 deletions vegafusion-datafusion-udfs/src/udfs/datetime/timeunit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use std::any::Any;
use std::str::FromStr;
use std::sync::Arc;
use vegafusion_common::arrow::array::{ArrayRef, Int64Array, TimestampMillisecondArray};
use vegafusion_common::arrow::compute::unary;
use vegafusion_common::arrow::compute::try_unary;
use vegafusion_common::arrow::datatypes::{DataType, TimeUnit};
use vegafusion_common::arrow::error::ArrowError;
use vegafusion_common::arrow::temporal_conversions::date64_to_datetime;
use vegafusion_common::datafusion_common::{DataFusionError, ScalarValue};
use vegafusion_common::datafusion_expr::{
Expand Down Expand Up @@ -66,36 +67,52 @@ fn perform_timeunit_start_from_utc<T: TimeZone>(
value: i64,
units_mask: &[bool],
in_tz: T,
) -> DateTime<T> {
) -> Result<DateTime<T>, ArrowError> {
// Load and interpret date time as UTC
let dt_value = date64_to_datetime(value)
.unwrap()
.with_nanosecond(0)
.unwrap();
let dt_value = Utc.from_local_datetime(&dt_value).earliest().unwrap();
.and_then(|d| d.with_nanosecond(0))
.ok_or(ArrowError::ComputeError("date out of bounds".to_string()))?;

let dt_value =
Utc.from_local_datetime(&dt_value)
.earliest()
.ok_or(ArrowError::ComputeError(
"Failed to convert to UTC".to_string(),
))?;

let mut dt_value = dt_value.with_timezone(&in_tz);

// Handle time truncation
if !units_mask[7] {
// Clear hours first to avoid any of the other time truncations from landing on a daylight
// savings boundary
dt_value = dt_value.with_hour(0).unwrap();
dt_value = dt_value
.with_hour(0)
.ok_or(ArrowError::ComputeError("Failed to drop hours".to_string()))?;
}

if !units_mask[10] {
// Milliseconds
let new_ns = (((dt_value.nanosecond() as f64) / 1e6).floor() * 1e6) as u32;
dt_value = dt_value.with_nanosecond(new_ns).unwrap();
dt_value = dt_value
.with_nanosecond(new_ns)
.ok_or(ArrowError::ComputeError(
"Failed to set nanoseconds".to_string(),
))?;
}

if !units_mask[9] {
// Seconds
dt_value = dt_value.with_second(0).unwrap();
dt_value = dt_value.with_second(0).ok_or(ArrowError::ComputeError(
"Failed to set seconds".to_string(),
))?;
}

if !units_mask[8] {
// Minutes
dt_value = dt_value.with_minute(0).unwrap();
dt_value = dt_value.with_minute(0).ok_or(ArrowError::ComputeError(
"Failed to set minutes".to_string(),
))?;
}

// Save off day of the year and weekday here, because these will change if the
Expand All @@ -115,11 +132,11 @@ fn perform_timeunit_start_from_utc<T: TimeZone>(
let hour = dt_value.hour();
dt_value
.with_hour(0)
.unwrap()
.with_year(2012)
.unwrap()
.with_hour(hour + 1)
.unwrap()
.and_then(|dt| dt.with_year(2012))
.and_then(|dt| dt.with_hour(hour + 1))
.ok_or(ArrowError::ComputeError(
"Failed to handle daylight savings boundary".to_string(),
))?
}
}

Expand All @@ -131,35 +148,42 @@ fn perform_timeunit_start_from_utc<T: TimeZone>(
let new_month = ((dt_value.month0() as f64 / 3.0).floor() * 3.0) as u32;
dt_value = dt_value
.with_day0(0)
.unwrap()
.with_month0(new_month)
.unwrap();
.and_then(|dt| dt.with_month0(new_month))
.ok_or(ArrowError::ComputeError(
"Failed to truncate to quarter".to_string(),
))?;
} else if units_mask[2] {
// Month and not Date
// Truncate to first day of the month
if !units_mask[3] {
dt_value = dt_value.with_day0(0).unwrap();
dt_value = dt_value.with_day0(0).ok_or(ArrowError::ComputeError(
"Failed to truncate to first day of the month".to_string(),
))?;
}
} else if units_mask[3] {
// Date and not Month
// Normalize to January, keeping existing day of the month.
// (January has 31 days, so this is safe)
if !units_mask[2] {
dt_value = dt_value.with_month0(0).unwrap();
dt_value = dt_value.with_month0(0).ok_or(ArrowError::ComputeError(
"Failed to truncate to first day of the month".to_string(),
))?;
}
} else if units_mask[4] {
// Week
// Step 1: Find the date of the first Sunday in the same calendar year as the date.
// This may occur in isoweek 0, or in the final isoweek of the previous year

let isoweek0_sunday = NaiveDate::from_isoywd_opt(dt_value.year(), 1, Weekday::Sun)
.expect("invalid or out-of-range datetime");
let isoweek0_sunday = NaiveDate::from_isoywd_opt(dt_value.year(), 1, Weekday::Sun).ok_or(
ArrowError::ComputeError("invalid or out-of-range datetime".to_string()),
)?;

let isoweek0_sunday = NaiveDateTime::new(isoweek0_sunday, dt_value.time());
let isoweek0_sunday = in_tz
.from_local_datetime(&isoweek0_sunday)
.earliest()
.unwrap();
.ok_or(ArrowError::ComputeError(
"invalid or out-of-range datetime".to_string(),
))?;

// Subtract one week from isoweek0_sunday and check if it's still in the same calendar
// year
Expand Down Expand Up @@ -188,11 +212,15 @@ fn perform_timeunit_start_from_utc<T: TimeZone>(
// (which is January 1st)
let first_sunday_of_2012 = in_tz
.from_local_datetime(&NaiveDateTime::new(
NaiveDate::from_ymd_opt(2012, 1, 1).expect("invalid or out-of-range datetime"),
NaiveDate::from_ymd_opt(2012, 1, 1).ok_or(ArrowError::ComputeError(
"invalid or out-of-range datetime".to_string(),
))?,
dt_value.time(),
))
.earliest()
.unwrap();
.ok_or(ArrowError::ComputeError(
"invalid or out-of-range datetime".to_string(),
))?;

dt_value = first_sunday_of_2012 + chrono::Duration::weeks(week_number);
} else {
Expand All @@ -207,20 +235,34 @@ fn perform_timeunit_start_from_utc<T: TimeZone>(
} else {
NaiveDate::from_isoywd_opt(dt_value.year(), 2, weekday)
}
.expect("invalid or out-of-range datetime");
.ok_or(ArrowError::ComputeError(
"invalid or out-of-range datetime".to_string(),
))?;

let new_datetime = NaiveDateTime::new(new_date, dt_value.time());
dt_value = in_tz.from_local_datetime(&new_datetime).earliest().unwrap();
dt_value =
in_tz
.from_local_datetime(&new_datetime)
.earliest()
.ok_or(ArrowError::ComputeError(
"invalid or out-of-range datetime".to_string(),
))?;
} else if units_mask[6] {
// DayOfYear
// Keep the same day of the year
dt_value = dt_value.with_ordinal0(ordinal0).unwrap();
dt_value = dt_value
.with_ordinal0(ordinal0)
.ok_or(ArrowError::ComputeError(
"invalid or out-of-range datetime".to_string(),
))?;
} else {
// Clear month and date
dt_value = dt_value.with_ordinal0(0).unwrap();
dt_value = dt_value.with_ordinal0(0).ok_or(ArrowError::ComputeError(
"invalid or out-of-range datetime".to_string(),
))?;
}

dt_value
Ok(dt_value)
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -296,9 +338,12 @@ impl ScalarUDFImpl for TimeunitStartUDF {
let (timestamp, tz, units_mask) = unpack_timeunit_udf_args(args)?;

let array = timestamp.as_any().downcast_ref::<Int64Array>().unwrap();
let result_array: TimestampMillisecondArray = unary(array, |value| {
perform_timeunit_start_from_utc(value, units_mask.as_slice(), tz).timestamp_millis()
});
let result_array: TimestampMillisecondArray = try_unary(array, |value| {
Ok(
perform_timeunit_start_from_utc(value, units_mask.as_slice(), tz)?
.timestamp_millis(),
)
})?;

Ok(ColumnarValue::Array(Arc::new(result_array) as ArrayRef))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ pub fn to_utc_timestamp(timestamp_array: ArrayRef, tz: Tz) -> Result<ArrayRef, D
} else {
// Try adding 1 hour to handle daylight savings boundaries
let hour = naive_local_datetime.hour();
let new_naive_local_datetime =
naive_local_datetime.with_hour(hour + 1).unwrap();
let new_naive_local_datetime = naive_local_datetime.with_hour(hour + 1)?;
tz.from_local_datetime(&new_naive_local_datetime).earliest()
};

Expand Down

0 comments on commit 1bae34f

Please sign in to comment.