Skip to content

Commit

Permalink
Added support for datetime reads (jorgecarleitao#888)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored and ritchie46 committed Mar 8, 2022
1 parent c999595 commit 8e06504
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 10 deletions.
47 changes: 45 additions & 2 deletions arrow-odbc-integration-testing/src/read.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use stdext::function_name;

use arrow2::array::{Array, BinaryArray, BooleanArray, Int32Array, Utf8Array};
use arrow2::array::{Array, BinaryArray, BooleanArray, Int32Array, Int64Array, Utf8Array};
use arrow2::chunk::Chunk;
use arrow2::datatypes::Field;
use arrow2::datatypes::{DataType, Field, TimeUnit};
use arrow2::error::Result;
use arrow2::io::odbc::api::{Connection, Cursor};
use arrow2::io::odbc::read::{buffer_from_metadata, deserialize, infer_schema};
Expand Down Expand Up @@ -47,6 +47,49 @@ fn bool_nullable() -> Result<()> {
test(expected, "BIT", "(1),(NULL)", table_name)
}

#[test]
fn date_nullable() -> Result<()> {
let table_name = function_name!().rsplit_once(':').unwrap().1;
let expected =
vec![Chunk::new(vec![
Box::new(Int32Array::from([Some(100), None]).to(DataType::Date32)) as _,
])];

test(expected, "DATE", "('1970-04-11'),(NULL)", table_name)
}

#[test]
fn timestamp_nullable() -> Result<()> {
let table_name = function_name!().rsplit_once(':').unwrap().1;
let expected = vec![Chunk::new(vec![Box::new(
Int64Array::from([Some(60 * 60 * 1000), None])
.to(DataType::Timestamp(TimeUnit::Millisecond, None)),
) as _])];

test(
expected,
"DATETIME",
"('1970-01-01 01:00:00'),(NULL)",
table_name,
)
}

#[test]
fn timestamp_ms_nullable() -> Result<()> {
let table_name = function_name!().rsplit_once(':').unwrap().1;
let expected = vec![Chunk::new(vec![Box::new(
Int64Array::from([Some(60 * 60 * 1000 + 110), None])
.to(DataType::Timestamp(TimeUnit::Millisecond, None)),
) as _])];

test(
expected,
"DATETIME",
"('1970-01-01 01:00:00.110'),(NULL)",
table_name,
)
}

#[test]
fn binary() -> Result<()> {
let table_name = function_name!().rsplit_once(':').unwrap().1;
Expand Down
149 changes: 142 additions & 7 deletions src/io/odbc/read/deserialize.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use chrono::{NaiveDate, NaiveDateTime};
use odbc_api::buffers::{BinColumnIt, TextColumnIt};
use odbc_api::Bit;

use crate::array::{Array, BinaryArray, BooleanArray, PrimitiveArray, Utf8Array};
use crate::bitmap::{Bitmap, MutableBitmap};
use crate::buffer::Buffer;
use crate::datatypes::DataType;
use crate::datatypes::{DataType, TimeUnit};
use crate::types::NativeType;

use super::super::api::buffers::AnyColumnView;
Expand All @@ -16,9 +17,9 @@ pub fn deserialize(column: AnyColumnView, data_type: DataType) -> Box<dyn Array>
AnyColumnView::Text(iter) => Box::new(utf8(data_type, iter)) as _,
AnyColumnView::WText(_) => todo!(),
AnyColumnView::Binary(iter) => Box::new(binary(data_type, iter)) as _,
AnyColumnView::Date(_) => todo!(),
AnyColumnView::Time(_) => todo!(),
AnyColumnView::Timestamp(_) => todo!(),
AnyColumnView::Date(values) => Box::new(date(data_type, values)) as _,
AnyColumnView::Time(values) => Box::new(time(data_type, values)) as _,
AnyColumnView::Timestamp(values) => Box::new(timestamp(data_type, values)) as _,
AnyColumnView::F64(values) => Box::new(primitive(data_type, values)) as _,
AnyColumnView::F32(values) => Box::new(primitive(data_type, values)) as _,
AnyColumnView::I8(values) => Box::new(primitive(data_type, values)) as _,
Expand All @@ -27,9 +28,21 @@ pub fn deserialize(column: AnyColumnView, data_type: DataType) -> Box<dyn Array>
AnyColumnView::I64(values) => Box::new(primitive(data_type, values)) as _,
AnyColumnView::U8(values) => Box::new(primitive(data_type, values)) as _,
AnyColumnView::Bit(values) => Box::new(bool(data_type, values)) as _,
AnyColumnView::NullableDate(_) => todo!(),
AnyColumnView::NullableTime(_) => todo!(),
AnyColumnView::NullableTimestamp(_) => todo!(),
AnyColumnView::NullableDate(slice) => Box::new(date_optional(
data_type,
slice.raw_values().0,
slice.raw_values().1,
)) as _,
AnyColumnView::NullableTime(slice) => Box::new(time_optional(
data_type,
slice.raw_values().0,
slice.raw_values().1,
)) as _,
AnyColumnView::NullableTimestamp(slice) => Box::new(timestamp_optional(
data_type,
slice.raw_values().0,
slice.raw_values().1,
)) as _,
AnyColumnView::NullableF64(slice) => Box::new(primitive_optional(
data_type,
slice.raw_values().0,
Expand Down Expand Up @@ -139,3 +152,125 @@ fn utf8(data_type: DataType, iter: TextColumnIt<u8>) -> Utf8Array<i32> {
// this O(N) check is necessary for the utf8 validity
Utf8Array::from_data(data_type, offsets, values, validity)
}

fn date(data_type: DataType, values: &[odbc_api::sys::Date]) -> PrimitiveArray<i32> {
let values = values.iter().map(days_since_epoch).collect::<Vec<_>>();
PrimitiveArray::from_data(data_type, values.into(), None)
}

fn date_optional(
data_type: DataType,
values: &[odbc_api::sys::Date],
indicators: &[isize],
) -> PrimitiveArray<i32> {
let values = values.iter().map(days_since_epoch).collect::<Vec<_>>();
let validity = bitmap(indicators);
PrimitiveArray::from_data(data_type, values.into(), validity)
}

fn days_since_epoch(date: &odbc_api::sys::Date) -> i32 {
let unix_epoch = NaiveDate::from_ymd(1970, 1, 1);
let date = NaiveDate::from_ymd_opt(date.year as i32, date.month as u32, date.day as u32)
.unwrap_or(unix_epoch);
let duration = date.signed_duration_since(unix_epoch);
duration.num_days().try_into().unwrap_or(i32::MAX)
}

fn time(data_type: DataType, values: &[odbc_api::sys::Time]) -> PrimitiveArray<i32> {
let values = values.iter().map(time_since_midnight).collect::<Vec<_>>();
PrimitiveArray::from_data(data_type, values.into(), None)
}

fn time_since_midnight(date: &odbc_api::sys::Time) -> i32 {
(date.hour as i32) * 60 * 60 + (date.minute as i32) * 60 + date.second as i32
}

fn time_optional(
data_type: DataType,
values: &[odbc_api::sys::Time],
indicators: &[isize],
) -> PrimitiveArray<i32> {
let values = values.iter().map(time_since_midnight).collect::<Vec<_>>();
let validity = bitmap(indicators);
PrimitiveArray::from_data(data_type, values.into(), validity)
}

fn timestamp(data_type: DataType, values: &[odbc_api::sys::Timestamp]) -> PrimitiveArray<i64> {
let unit = if let DataType::Timestamp(unit, _) = &data_type {
unit
} else {
unreachable!()
};
let values = match unit {
TimeUnit::Second => values.iter().map(timestamp_s).collect::<Vec<_>>(),
TimeUnit::Millisecond => values.iter().map(timestamp_ms).collect::<Vec<_>>(),
TimeUnit::Microsecond => values.iter().map(timestamp_us).collect::<Vec<_>>(),
TimeUnit::Nanosecond => values.iter().map(timestamp_ns).collect::<Vec<_>>(),
};
PrimitiveArray::from_data(data_type, values.into(), None)
}

fn timestamp_optional(
data_type: DataType,
values: &[odbc_api::sys::Timestamp],
indicators: &[isize],
) -> PrimitiveArray<i64> {
let unit = if let DataType::Timestamp(unit, _) = &data_type {
unit
} else {
unreachable!()
};
let values = match unit {
TimeUnit::Second => values.iter().map(timestamp_s).collect::<Vec<_>>(),
TimeUnit::Millisecond => values.iter().map(timestamp_ms).collect::<Vec<_>>(),
TimeUnit::Microsecond => values.iter().map(timestamp_us).collect::<Vec<_>>(),
TimeUnit::Nanosecond => values.iter().map(timestamp_ns).collect::<Vec<_>>(),
};
let validity = bitmap(indicators);
PrimitiveArray::from_data(data_type, values.into(), validity)
}

fn timestamp_to_naive(timestamp: &odbc_api::sys::Timestamp) -> Option<NaiveDateTime> {
NaiveDate::from_ymd_opt(
timestamp.year as i32,
timestamp.month as u32,
timestamp.day as u32,
)
.and_then(|x| {
println!("{timestamp:?}");
x.and_hms_nano_opt(
timestamp.hour as u32,
timestamp.minute as u32,
timestamp.second as u32,
/*
https://docs.microsoft.com/en-us/sql/odbc/reference/appendixes/c-data-types?view=sql-server-ver15
[b] The value of the fraction field is [...] for a billionth of a second (one nanosecond) is 1.
*/
timestamp.fraction,
)
})
}

fn timestamp_s(timestamp: &odbc_api::sys::Timestamp) -> i64 {
timestamp_to_naive(timestamp)
.map(|x| x.timestamp())
.unwrap_or(0)
}

fn timestamp_ms(timestamp: &odbc_api::sys::Timestamp) -> i64 {
timestamp_to_naive(timestamp)
.map(|x| x.timestamp_millis())
.unwrap_or(0)
}

fn timestamp_us(timestamp: &odbc_api::sys::Timestamp) -> i64 {
timestamp_to_naive(timestamp)
.map(|x| x.timestamp_nanos() / 1000)
.unwrap_or(0)
}

fn timestamp_ns(timestamp: &odbc_api::sys::Timestamp) -> i64 {
timestamp_to_naive(timestamp)
.map(|x| x.timestamp_nanos())
.unwrap_or(0)
}
2 changes: 1 addition & 1 deletion src/io/odbc/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use super::api;

/// Creates a [`api::buffers::ColumnarBuffer`] from the metadata.
/// # Errors
/// Iff the driver provides an incorrect [`ResultSetMetadata`]
/// Iff the driver provides an incorrect [`api::ResultSetMetadata`]
pub fn buffer_from_metadata(
resut_set_metadata: &impl api::ResultSetMetadata,
max_batch_size: usize,
Expand Down

0 comments on commit 8e06504

Please sign in to comment.