Skip to content

Commit

Permalink
fix: Error instead of panic when converting pg timestamps to arrow (#…
Browse files Browse the repository at this point in the history
…2430)

```
> select * from pg.public.comments;
Error: External error: External error: Overflow converting '0001-01-01 00:00:00 UTC' to Timestamp(Nanosecond, Some("UTC"))
```

Fixes: #2412

Signed-off-by: Vaibhav <vrongmeal@gmail.com>
  • Loading branch information
vrongmeal authored Jan 16, 2024
1 parent 2012022 commit 5d57665
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 3 deletions.
3 changes: 3 additions & 0 deletions crates/datasources/src/postgres/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ pub enum PostgresError {
#[error("Unsupported tunnel '{0}' for Postgres")]
UnsupportedTunnel(String),

#[error("Overflow converting '{0}' to {1}")]
DataOverflow(String, datafusion::arrow::datatypes::DataType),

#[error(transparent)]
Arrow(#[from] datafusion::arrow::error::ArrowError),

Expand Down
18 changes: 15 additions & 3 deletions crates/datasources/src/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1133,11 +1133,17 @@ fn binary_rows_to_record_batch<E: Into<PostgresError>>(
}
Arc::new(arr.finish())
}
DataType::Timestamp(TimeUnit::Nanosecond, None) => {
dt @ DataType::Timestamp(TimeUnit::Nanosecond, None) => {
let mut arr = TimestampNanosecondBuilder::with_capacity(rows.len());
for row in rows.iter() {
let val: Option<NaiveDateTime> = row.try_get(col_idx)?;
let val = val.map(|v| v.timestamp_nanos_opt().unwrap());
let val = val
.map(|v| {
v.timestamp_nanos_opt().ok_or_else(|| {
PostgresError::DataOverflow(v.to_string(), dt.clone())
})
})
.transpose()?;
arr.append_option(val);
}
Arc::new(arr.finish())
Expand All @@ -1147,7 +1153,13 @@ fn binary_rows_to_record_batch<E: Into<PostgresError>>(
.with_data_type(dt.clone());
for row in rows.iter() {
let val: Option<DateTime<Utc>> = row.try_get(col_idx)?;
let val = val.map(|v| v.timestamp_nanos_opt().unwrap());
let val = val
.map(|v| {
v.timestamp_nanos_opt().ok_or_else(|| {
PostgresError::DataOverflow(v.to_string(), dt.clone())
})
})
.transpose()?;
arr.append_option(val);
}
Arc::new(arr.finish())
Expand Down

0 comments on commit 5d57665

Please sign in to comment.