Skip to content

Commit

Permalink
arrow::compute::kernels::temporal should support nanoseconds (#2996)
Browse files Browse the repository at this point in the history
* temporal nano seconds support

* renamed functions

* returned some public methods for downstream projects

* moved tests parts
  • Loading branch information
comphead authored Nov 3, 2022
1 parent b1050b7 commit a2c6647
Showing 1 changed file with 92 additions and 58 deletions.
150 changes: 92 additions & 58 deletions arrow/src/compute/kernels/temporal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,50 +621,7 @@ where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
{
minute_generic::<T, _>(array)
}

/// Extracts the minutes of a given temporal array as an array of integers
pub fn minute_generic<T, A: ArrayAccessor<Item = T::Native>>(
array: A,
) -> Result<Int32Array>
where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
{
match array.data_type().clone() {
DataType::Dictionary(_, value_type) => {
minute_internal::<T, A>(array, value_type.as_ref())
}
dt => minute_internal::<T, A>(array, &dt),
}
}

/// Extracts the minutes of a given temporal array as an array of integers
fn minute_internal<T, A: ArrayAccessor<Item = T::Native>>(
array: A,
dt: &DataType,
) -> Result<Int32Array>
where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
{
let b = Int32Builder::with_capacity(array.len());
match dt {
DataType::Date64 | DataType::Timestamp(_, None) => {
let iter = ArrayIter::new(array);
Ok(as_datetime_with_op::<A, T, _>(iter, b, |t| {
t.minute() as i32
}))
}
DataType::Timestamp(_, Some(tz)) => {
let iter = ArrayIter::new(array);
extract_component_from_datetime_array::<A, T, _>(iter, b, tz, |t| {
t.minute() as i32
})
}
_ => return_compute_error_with!("minute does not support", array.data_type()),
}
time_fraction_generic::<T, _, _>(array, "minute", |t| t.minute() as i32)
}

/// Extracts the week of a given temporal primitive array as an array of integers
Expand Down Expand Up @@ -717,52 +674,88 @@ where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
{
second_generic::<T, _>(array)
time_fraction_generic::<T, _, _>(array, "second", |t| t.second() as i32)
}

/// Extracts the seconds of a given temporal array as an array of integers
pub fn second_generic<T, A: ArrayAccessor<Item = T::Native>>(
/// Extracts the nanoseconds of a given temporal primitive array as an array of integers
pub fn nanosecond<T>(array: &PrimitiveArray<T>) -> Result<Int32Array>
where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
{
time_fraction_generic::<T, _, _>(array, "nanosecond", |t| t.nanosecond() as i32)
}

/// Extracts the time fraction of a given temporal array as an array of integers
fn time_fraction_generic<T, A: ArrayAccessor<Item = T::Native>, F>(
array: A,
name: &str,
op: F,
) -> Result<Int32Array>
where
F: Fn(NaiveDateTime) -> i32,
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
{
match array.data_type().clone() {
DataType::Dictionary(_, value_type) => {
second_internal::<T, A>(array, value_type.as_ref())
time_fraction_internal::<T, A, _>(array, value_type.as_ref(), name, op)
}
dt => second_internal::<T, A>(array, &dt),
dt => time_fraction_internal::<T, A, _>(array, &dt, name, op),
}
}

/// Extracts the seconds of a given temporal array as an array of integers
fn second_internal<T, A: ArrayAccessor<Item = T::Native>>(
/// Extracts the time fraction of a given temporal array as an array of integers
fn time_fraction_internal<T, A: ArrayAccessor<Item = T::Native>, F>(
array: A,
dt: &DataType,
name: &str,
op: F,
) -> Result<Int32Array>
where
F: Fn(NaiveDateTime) -> i32,
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
{
let b = Int32Builder::with_capacity(array.len());
match dt {
DataType::Date64 | DataType::Timestamp(_, None) => {
let iter = ArrayIter::new(array);
Ok(as_datetime_with_op::<A, T, _>(iter, b, |t| {
t.second() as i32
}))
Ok(as_datetime_with_op::<A, T, _>(iter, b, op))
}
DataType::Timestamp(_, Some(tz)) => {
let iter = ArrayIter::new(array);
extract_component_from_datetime_array::<A, T, _>(iter, b, tz, |t| {
t.second() as i32
op(t.naive_local())
})
}
_ => return_compute_error_with!("second does not support", array.data_type()),
_ => return_compute_error_with!(
format!("{} does not support", name),
array.data_type()
),
}
}

pub fn minute_generic<T, A: ArrayAccessor<Item = T::Native>>(
array: A,
) -> Result<Int32Array>
where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
{
time_fraction_generic::<T, _, _>(array, "minute", |t| t.minute() as i32)
}

pub fn second_generic<T, A: ArrayAccessor<Item = T::Native>>(
array: A,
) -> Result<Int32Array>
where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
{
time_fraction_generic::<T, _, _>(array, "second", |t| t.second() as i32)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -1212,21 +1205,47 @@ mod tests {
let expected = Int32Array::from(vec![11, 11, 21, 7, 21]);
assert_eq!(expected, b);

let b = minute_generic::<TimestampSecondType, _>(
let b = time_fraction_generic::<TimestampSecondType, _, _>(
dict.downcast_dict::<TimestampSecondArray>().unwrap(),
"minute",
|t| t.minute() as i32,
)
.unwrap();

let b_old = minute_generic::<TimestampSecondType, _>(
dict.downcast_dict::<TimestampSecondArray>().unwrap(),
)
.unwrap();

let expected = Int32Array::from(vec![1, 1, 2, 3, 2]);
assert_eq!(expected, b);
assert_eq!(expected, b_old);

let b = second_generic::<TimestampSecondType, _>(
let b = time_fraction_generic::<TimestampSecondType, _, _>(
dict.downcast_dict::<TimestampSecondArray>().unwrap(),
"second",
|t| t.second() as i32,
)
.unwrap();

let b_old = second_generic::<TimestampSecondType, _>(
dict.downcast_dict::<TimestampSecondArray>().unwrap(),
)
.unwrap();

let expected = Int32Array::from(vec![1, 1, 2, 3, 2]);
assert_eq!(expected, b);
assert_eq!(expected, b_old);

let b = time_fraction_generic::<TimestampSecondType, _, _>(
dict.downcast_dict::<TimestampSecondArray>().unwrap(),
"nanosecond",
|t| t.nanosecond() as i32,
)
.unwrap();

let expected = Int32Array::from(vec![0, 0, 0, 0, 0]);
assert_eq!(expected, b);
}

#[test]
Expand Down Expand Up @@ -1313,4 +1332,19 @@ mod tests {
let expected = Int32Array::from(vec![Some(1), Some(8), Some(8), Some(1), None]);
assert_eq!(expected, b);
}

#[test]
fn test_temporal_array_date64_nanosecond() {
// new Date(1667328721453)
// Tue Nov 01 2022 11:52:01 GMT-0700 (Pacific Daylight Time)
//
// new Date(1667328721453).getMilliseconds()
// 453

let a: PrimitiveArray<Date64Type> = vec![None, Some(1667328721453)].into();

let b = nanosecond(&a).unwrap();
assert!(!b.is_valid(0));
assert_eq!(453_000_000, b.value(1));
}
}

0 comments on commit a2c6647

Please sign in to comment.