From 8e06504b34e66b4322ef3c4e2da0160605e033da Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Sun, 6 Mar 2022 09:20:35 +0100 Subject: [PATCH] Added support for datetime reads (#888) --- arrow-odbc-integration-testing/src/read.rs | 47 ++++++- src/io/odbc/read/deserialize.rs | 149 ++++++++++++++++++++- src/io/odbc/read/mod.rs | 2 +- 3 files changed, 188 insertions(+), 10 deletions(-) diff --git a/arrow-odbc-integration-testing/src/read.rs b/arrow-odbc-integration-testing/src/read.rs index 145ec852dcf..a41b1388738 100644 --- a/arrow-odbc-integration-testing/src/read.rs +++ b/arrow-odbc-integration-testing/src/read.rs @@ -1,8 +1,8 @@ use stdext::function_name; -use arrow2::array::{Array, BinaryArray, BooleanArray, Int32Array, Utf8Array}; +use arrow2::array::{Array, BinaryArray, BooleanArray, Int32Array, Int64Array, Utf8Array}; use arrow2::chunk::Chunk; -use arrow2::datatypes::Field; +use arrow2::datatypes::{DataType, Field, TimeUnit}; use arrow2::error::Result; use arrow2::io::odbc::api::{Connection, Cursor}; use arrow2::io::odbc::read::{buffer_from_metadata, deserialize, infer_schema}; @@ -47,6 +47,49 @@ fn bool_nullable() -> Result<()> { test(expected, "BIT", "(1),(NULL)", table_name) } +#[test] +fn date_nullable() -> Result<()> { + let table_name = function_name!().rsplit_once(':').unwrap().1; + let expected = + vec![Chunk::new(vec![ + Box::new(Int32Array::from([Some(100), None]).to(DataType::Date32)) as _, + ])]; + + test(expected, "DATE", "('1970-04-11'),(NULL)", table_name) +} + +#[test] +fn timestamp_nullable() -> Result<()> { + let table_name = function_name!().rsplit_once(':').unwrap().1; + let expected = vec![Chunk::new(vec![Box::new( + Int64Array::from([Some(60 * 60 * 1000), None]) + .to(DataType::Timestamp(TimeUnit::Millisecond, None)), + ) as _])]; + + test( + expected, + "DATETIME", + "('1970-01-01 01:00:00'),(NULL)", + table_name, + ) +} + +#[test] +fn timestamp_ms_nullable() -> Result<()> { + let table_name = function_name!().rsplit_once(':').unwrap().1; + let expected = vec![Chunk::new(vec![Box::new( + Int64Array::from([Some(60 * 60 * 1000 + 110), None]) + .to(DataType::Timestamp(TimeUnit::Millisecond, None)), + ) as _])]; + + test( + expected, + "DATETIME", + "('1970-01-01 01:00:00.110'),(NULL)", + table_name, + ) +} + #[test] fn binary() -> Result<()> { let table_name = function_name!().rsplit_once(':').unwrap().1; diff --git a/src/io/odbc/read/deserialize.rs b/src/io/odbc/read/deserialize.rs index e55fc1a2598..635b2b9d158 100644 --- a/src/io/odbc/read/deserialize.rs +++ b/src/io/odbc/read/deserialize.rs @@ -1,10 +1,11 @@ +use chrono::{NaiveDate, NaiveDateTime}; use odbc_api::buffers::{BinColumnIt, TextColumnIt}; use odbc_api::Bit; use crate::array::{Array, BinaryArray, BooleanArray, PrimitiveArray, Utf8Array}; use crate::bitmap::{Bitmap, MutableBitmap}; use crate::buffer::Buffer; -use crate::datatypes::DataType; +use crate::datatypes::{DataType, TimeUnit}; use crate::types::NativeType; use super::super::api::buffers::AnyColumnView; @@ -16,9 +17,9 @@ pub fn deserialize(column: AnyColumnView, data_type: DataType) -> Box AnyColumnView::Text(iter) => Box::new(utf8(data_type, iter)) as _, AnyColumnView::WText(_) => todo!(), AnyColumnView::Binary(iter) => Box::new(binary(data_type, iter)) as _, - AnyColumnView::Date(_) => todo!(), - AnyColumnView::Time(_) => todo!(), - AnyColumnView::Timestamp(_) => todo!(), + AnyColumnView::Date(values) => Box::new(date(data_type, values)) as _, + AnyColumnView::Time(values) => Box::new(time(data_type, values)) as _, + AnyColumnView::Timestamp(values) => Box::new(timestamp(data_type, values)) as _, AnyColumnView::F64(values) => Box::new(primitive(data_type, values)) as _, AnyColumnView::F32(values) => Box::new(primitive(data_type, values)) as _, AnyColumnView::I8(values) => Box::new(primitive(data_type, values)) as _, @@ -27,9 +28,21 @@ pub fn deserialize(column: AnyColumnView, data_type: DataType) -> Box AnyColumnView::I64(values) => Box::new(primitive(data_type, values)) as _, AnyColumnView::U8(values) => Box::new(primitive(data_type, values)) as _, AnyColumnView::Bit(values) => Box::new(bool(data_type, values)) as _, - AnyColumnView::NullableDate(_) => todo!(), - AnyColumnView::NullableTime(_) => todo!(), - AnyColumnView::NullableTimestamp(_) => todo!(), + AnyColumnView::NullableDate(slice) => Box::new(date_optional( + data_type, + slice.raw_values().0, + slice.raw_values().1, + )) as _, + AnyColumnView::NullableTime(slice) => Box::new(time_optional( + data_type, + slice.raw_values().0, + slice.raw_values().1, + )) as _, + AnyColumnView::NullableTimestamp(slice) => Box::new(timestamp_optional( + data_type, + slice.raw_values().0, + slice.raw_values().1, + )) as _, AnyColumnView::NullableF64(slice) => Box::new(primitive_optional( data_type, slice.raw_values().0, @@ -139,3 +152,125 @@ fn utf8(data_type: DataType, iter: TextColumnIt) -> Utf8Array { // this O(N) check is necessary for the utf8 validity Utf8Array::from_data(data_type, offsets, values, validity) } + +fn date(data_type: DataType, values: &[odbc_api::sys::Date]) -> PrimitiveArray { + let values = values.iter().map(days_since_epoch).collect::>(); + PrimitiveArray::from_data(data_type, values.into(), None) +} + +fn date_optional( + data_type: DataType, + values: &[odbc_api::sys::Date], + indicators: &[isize], +) -> PrimitiveArray { + let values = values.iter().map(days_since_epoch).collect::>(); + let validity = bitmap(indicators); + PrimitiveArray::from_data(data_type, values.into(), validity) +} + +fn days_since_epoch(date: &odbc_api::sys::Date) -> i32 { + let unix_epoch = NaiveDate::from_ymd(1970, 1, 1); + let date = NaiveDate::from_ymd_opt(date.year as i32, date.month as u32, date.day as u32) + .unwrap_or(unix_epoch); + let duration = date.signed_duration_since(unix_epoch); + duration.num_days().try_into().unwrap_or(i32::MAX) +} + +fn time(data_type: DataType, values: &[odbc_api::sys::Time]) -> PrimitiveArray { + let values = values.iter().map(time_since_midnight).collect::>(); + PrimitiveArray::from_data(data_type, values.into(), None) +} + +fn time_since_midnight(date: &odbc_api::sys::Time) -> i32 { + (date.hour as i32) * 60 * 60 + (date.minute as i32) * 60 + date.second as i32 +} + +fn time_optional( + data_type: DataType, + values: &[odbc_api::sys::Time], + indicators: &[isize], +) -> PrimitiveArray { + let values = values.iter().map(time_since_midnight).collect::>(); + let validity = bitmap(indicators); + PrimitiveArray::from_data(data_type, values.into(), validity) +} + +fn timestamp(data_type: DataType, values: &[odbc_api::sys::Timestamp]) -> PrimitiveArray { + let unit = if let DataType::Timestamp(unit, _) = &data_type { + unit + } else { + unreachable!() + }; + let values = match unit { + TimeUnit::Second => values.iter().map(timestamp_s).collect::>(), + TimeUnit::Millisecond => values.iter().map(timestamp_ms).collect::>(), + TimeUnit::Microsecond => values.iter().map(timestamp_us).collect::>(), + TimeUnit::Nanosecond => values.iter().map(timestamp_ns).collect::>(), + }; + PrimitiveArray::from_data(data_type, values.into(), None) +} + +fn timestamp_optional( + data_type: DataType, + values: &[odbc_api::sys::Timestamp], + indicators: &[isize], +) -> PrimitiveArray { + let unit = if let DataType::Timestamp(unit, _) = &data_type { + unit + } else { + unreachable!() + }; + let values = match unit { + TimeUnit::Second => values.iter().map(timestamp_s).collect::>(), + TimeUnit::Millisecond => values.iter().map(timestamp_ms).collect::>(), + TimeUnit::Microsecond => values.iter().map(timestamp_us).collect::>(), + TimeUnit::Nanosecond => values.iter().map(timestamp_ns).collect::>(), + }; + let validity = bitmap(indicators); + PrimitiveArray::from_data(data_type, values.into(), validity) +} + +fn timestamp_to_naive(timestamp: &odbc_api::sys::Timestamp) -> Option { + NaiveDate::from_ymd_opt( + timestamp.year as i32, + timestamp.month as u32, + timestamp.day as u32, + ) + .and_then(|x| { + println!("{timestamp:?}"); + x.and_hms_nano_opt( + timestamp.hour as u32, + timestamp.minute as u32, + timestamp.second as u32, + /* + https://docs.microsoft.com/en-us/sql/odbc/reference/appendixes/c-data-types?view=sql-server-ver15 + [b] The value of the fraction field is [...] for a billionth of a second (one nanosecond) is 1. + */ + timestamp.fraction, + ) + }) +} + +fn timestamp_s(timestamp: &odbc_api::sys::Timestamp) -> i64 { + timestamp_to_naive(timestamp) + .map(|x| x.timestamp()) + .unwrap_or(0) +} + +fn timestamp_ms(timestamp: &odbc_api::sys::Timestamp) -> i64 { + timestamp_to_naive(timestamp) + .map(|x| x.timestamp_millis()) + .unwrap_or(0) +} + +fn timestamp_us(timestamp: &odbc_api::sys::Timestamp) -> i64 { + timestamp_to_naive(timestamp) + .map(|x| x.timestamp_nanos() / 1000) + .unwrap_or(0) +} + +fn timestamp_ns(timestamp: &odbc_api::sys::Timestamp) -> i64 { + timestamp_to_naive(timestamp) + .map(|x| x.timestamp_nanos()) + .unwrap_or(0) +} diff --git a/src/io/odbc/read/mod.rs b/src/io/odbc/read/mod.rs index b4077332ff3..e8945759c65 100644 --- a/src/io/odbc/read/mod.rs +++ b/src/io/odbc/read/mod.rs @@ -9,7 +9,7 @@ use super::api; /// Creates a [`api::buffers::ColumnarBuffer`] from the metadata. /// # Errors -/// Iff the driver provides an incorrect [`ResultSetMetadata`] +/// Iff the driver provides an incorrect [`api::ResultSetMetadata`] pub fn buffer_from_metadata( resut_set_metadata: &impl api::ResultSetMetadata, max_batch_size: usize,