From 5d57665c9271fbf9c766fd85c544706a59138c29 Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Tue, 16 Jan 2024 18:30:41 +0530 Subject: [PATCH] fix: Error instead of panic when converting pg timestamps to arrow (#2430) ``` > select * from pg.public.comments; Error: External error: External error: Overflow converting '0001-01-01 00:00:00 UTC' to Timestamp(Nanosecond, Some("UTC")) ``` Fixes: #2412 Signed-off-by: Vaibhav --- crates/datasources/src/postgres/errors.rs | 3 +++ crates/datasources/src/postgres/mod.rs | 18 +++++++++++++++--- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/crates/datasources/src/postgres/errors.rs b/crates/datasources/src/postgres/errors.rs index 82033c859..5641289a0 100644 --- a/crates/datasources/src/postgres/errors.rs +++ b/crates/datasources/src/postgres/errors.rs @@ -27,6 +27,9 @@ pub enum PostgresError { #[error("Unsupported tunnel '{0}' for Postgres")] UnsupportedTunnel(String), + #[error("Overflow converting '{0}' to {1}")] + DataOverflow(String, datafusion::arrow::datatypes::DataType), + #[error(transparent)] Arrow(#[from] datafusion::arrow::error::ArrowError), diff --git a/crates/datasources/src/postgres/mod.rs b/crates/datasources/src/postgres/mod.rs index b7a33f743..1e8ae86bf 100644 --- a/crates/datasources/src/postgres/mod.rs +++ b/crates/datasources/src/postgres/mod.rs @@ -1133,11 +1133,17 @@ fn binary_rows_to_record_batch>( } Arc::new(arr.finish()) } - DataType::Timestamp(TimeUnit::Nanosecond, None) => { + dt @ DataType::Timestamp(TimeUnit::Nanosecond, None) => { let mut arr = TimestampNanosecondBuilder::with_capacity(rows.len()); for row in rows.iter() { let val: Option = row.try_get(col_idx)?; - let val = val.map(|v| v.timestamp_nanos_opt().unwrap()); + let val = val + .map(|v| { + v.timestamp_nanos_opt().ok_or_else(|| { + PostgresError::DataOverflow(v.to_string(), dt.clone()) + }) + }) + .transpose()?; arr.append_option(val); } Arc::new(arr.finish()) @@ -1147,7 +1153,13 @@ fn binary_rows_to_record_batch>( .with_data_type(dt.clone()); for row in rows.iter() { let val: Option> = row.try_get(col_idx)?; - let val = val.map(|v| v.timestamp_nanos_opt().unwrap()); + let val = val + .map(|v| { + v.timestamp_nanos_opt().ok_or_else(|| { + PostgresError::DataOverflow(v.to_string(), dt.clone()) + }) + }) + .transpose()?; arr.append_option(val); } Arc::new(arr.finish())