Skip to content

Commit

Permalink
fix: external schema for mongodb (#3244)
Browse files Browse the repository at this point in the history
Co-authored-by: Tal Gluck <talagluck@gmail.com>
  • Loading branch information
tychoish and talagluck authored Oct 21, 2024
1 parent f8fc10a commit 4a3f7b4
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 40 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/datasources/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ async-stream = "0.3.6"
bitvec = "1"
bson = "2.13.0"
chrono-tz = "0.9"
half = "2.3.1"
gcp-bigquery-client = "0.23.0"
glob = "0.3.1"
mongodb = "3.1.0"
Expand Down
156 changes: 153 additions & 3 deletions crates/datasources/src/bson/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@ use datafusion::arrow::array::{
Date32Builder,
Date64Builder,
Decimal128Builder,
Float16Builder,
Float32Builder,
Float64Builder,
Int16Builder,
Int32Builder,
Int64Builder,
Int8Builder,
LargeBinaryBuilder,
LargeStringBuilder,
StringBuilder,
Expand All @@ -26,7 +30,8 @@ use datafusion::arrow::array::{
TimestampNanosecondBuilder,
TimestampSecondBuilder,
};
use datafusion::arrow::datatypes::{DataType, Field, Fields, TimeUnit};
use datafusion::arrow::datatypes::{DataType, Field, Fields, TimeUnit, ToByteSlice};
use mysql_common::bigdecimal::{FromPrimitive, ToPrimitive};

use crate::bson::errors::{BsonError, Result};
use crate::common::util::try_parse_datetime;
Expand Down Expand Up @@ -246,54 +251,152 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) ->

// Boolean
(RawBsonRef::Boolean(v), DataType::Boolean) => append_scalar!(BooleanBuilder, col, v),
(RawBsonRef::Boolean(v), DataType::Int8) => append_scalar!(Int8Builder, col, v.into()),
(RawBsonRef::Boolean(v), DataType::Int16) => append_scalar!(Int16Builder, col, v.into()),
(RawBsonRef::Boolean(v), DataType::Int32) => append_scalar!(Int32Builder, col, v.into()),
(RawBsonRef::Boolean(v), DataType::Int64) => append_scalar!(Int64Builder, col, v.into()),
(RawBsonRef::Boolean(v), DataType::Float16) => {
append_scalar!(Float16Builder, col, half::f16::from_f32(v.into()))
}
(RawBsonRef::Boolean(v), DataType::Float32) => {
append_scalar!(Float32Builder, col, v.into())
}
(RawBsonRef::Boolean(v), DataType::Float64) => {
append_scalar!(Float64Builder, col, v.into())
}
(RawBsonRef::Boolean(v), DataType::Utf8) => {
append_scalar!(StringBuilder, col, v.to_string())
}
(RawBsonRef::Boolean(v), DataType::LargeUtf8) => {
append_scalar!(LargeStringBuilder, col, v.to_string())
}
(RawBsonRef::Boolean(v), DataType::Binary) => {
append_scalar!(BinaryBuilder, col, [i32::from(v) as u8])
}
(RawBsonRef::Boolean(v), DataType::LargeBinary) => {
append_scalar!(BinaryBuilder, col, [i32::from(v) as u8])
}

// Double
(RawBsonRef::Double(v), DataType::Int8) => append_scalar!(
Int8Builder,
col,
v.to_i8()
.ok_or_else(|| BsonError::UnexpectedDataTypeForBuilder(typ.to_owned()))?
),
(RawBsonRef::Double(v), DataType::Int16) => {
append_scalar!(
Int16Builder,
col,
v.to_i16()
.ok_or_else(|| BsonError::UnexpectedDataTypeForBuilder(typ.to_owned()))?
)
}
(RawBsonRef::Double(v), DataType::Int32) => append_scalar!(Int32Builder, col, v as i32),
(RawBsonRef::Double(v), DataType::Int64) => append_scalar!(Int64Builder, col, v as i64),
(RawBsonRef::Double(v), DataType::Float16) => {
append_scalar!(Float16Builder, col, half::f16::from_f64(v))
}
(RawBsonRef::Double(v), DataType::Float32) => {
append_scalar!(Float32Builder, col, v as f32)
}
(RawBsonRef::Double(v), DataType::Float64) => append_scalar!(Float64Builder, col, v),
(RawBsonRef::Double(v), DataType::Utf8) => {
append_scalar!(StringBuilder, col, v.to_string())
}
(RawBsonRef::Double(v), DataType::LargeUtf8) => {
append_scalar!(LargeStringBuilder, col, v.to_string())
}
(RawBsonRef::Double(v), DataType::Binary) => {
append_scalar!(BinaryBuilder, col, v.to_byte_slice())
}
(RawBsonRef::Double(v), DataType::LargeBinary) => {
append_scalar!(LargeBinaryBuilder, col, v.to_byte_slice())
}

// Int32
(RawBsonRef::Int32(v), DataType::Int8) => append_scalar!(Int8Builder, col, v as i8),
(RawBsonRef::Int32(v), DataType::Int16) => append_scalar!(Int16Builder, col, v as i16),
(RawBsonRef::Int32(v), DataType::Int32) => append_scalar!(Int32Builder, col, v),
(RawBsonRef::Int32(v), DataType::Int64) => append_scalar!(Int64Builder, col, v as i64),
(RawBsonRef::Int32(v), DataType::Float16) => {
append_scalar!(
Float16Builder,
col,
half::f16::from_i32(v)
.ok_or_else(|| BsonError::UnexpectedDataTypeForBuilder(typ.to_owned()))?
)
}
(RawBsonRef::Int32(v), DataType::Float32) => append_scalar!(Float32Builder, col, v as f32),
(RawBsonRef::Int32(v), DataType::Float64) => append_scalar!(Float64Builder, col, v as f64),
(RawBsonRef::Int32(v), DataType::Utf8) => {
append_scalar!(StringBuilder, col, v.to_string())
}
(RawBsonRef::Int32(v), DataType::LargeUtf8) => {
append_scalar!(LargeStringBuilder, col, v.to_string())
}
(RawBsonRef::Int32(v), DataType::Binary) => {
append_scalar!(BinaryBuilder, col, v.to_byte_slice())
}
(RawBsonRef::Int32(v), DataType::LargeBinary) => {
append_scalar!(LargeBinaryBuilder, col, v.to_byte_slice())
}

// Int64
(RawBsonRef::Int64(v), DataType::Int8) => append_scalar!(Int8Builder, col, v as i8),
(RawBsonRef::Int64(v), DataType::Int16) => append_scalar!(Int16Builder, col, v as i16),
(RawBsonRef::Int64(v), DataType::Int32) => append_scalar!(Int32Builder, col, v as i32),
(RawBsonRef::Int64(v), DataType::Int64) => append_scalar!(Int64Builder, col, v),
(RawBsonRef::Int64(v), DataType::Float16) => {
append_scalar!(
Float16Builder,
col,
half::f16::from_i64(v)
.ok_or_else(|| BsonError::UnexpectedDataTypeForBuilder(typ.to_owned()))?
)
}
(RawBsonRef::Int64(v), DataType::Float32) => append_scalar!(Float32Builder, col, v as f32),
(RawBsonRef::Int64(v), DataType::Float64) => append_scalar!(Float64Builder, col, v as f64),
(RawBsonRef::Int64(v), DataType::Utf8) => {
append_scalar!(StringBuilder, col, v.to_string())
}
(RawBsonRef::Int64(v), DataType::LargeUtf8) => {
append_scalar!(LargeStringBuilder, col, v.to_string())
}
(RawBsonRef::Int64(v), DataType::Binary) => {
append_scalar!(BinaryBuilder, col, v.to_byte_slice())
}
(RawBsonRef::Int64(v), DataType::LargeBinary) => {
append_scalar!(LargeBinaryBuilder, col, v.to_byte_slice())
}

// String
(RawBsonRef::String(v), DataType::Utf8) => append_scalar!(StringBuilder, col, v),
(RawBsonRef::String(v), DataType::LargeUtf8) => append_scalar!(LargeStringBuilder, col, v),
(RawBsonRef::String(v), DataType::Boolean) => {
append_scalar!(BooleanBuilder, col, v.parse().unwrap_or_default())
}
(RawBsonRef::String(v), DataType::Int64) => {
append_scalar!(Int64Builder, col, v.parse().unwrap_or_default())
}
(RawBsonRef::String(v), DataType::Int32) => {
append_scalar!(Int32Builder, col, v.parse().unwrap_or_default())
}
(RawBsonRef::String(v), DataType::Int64) => {
append_scalar!(Int64Builder, col, v.parse().unwrap_or_default())
(RawBsonRef::String(v), DataType::Int16) => {
append_scalar!(Int32Builder, col, v.parse().unwrap_or_default())
}
(RawBsonRef::String(v), DataType::Int8) => {
append_scalar!(Int32Builder, col, v.parse().unwrap_or_default())
}
(RawBsonRef::String(v), DataType::Float64) => {
append_scalar!(Float64Builder, col, v.parse().unwrap_or_default())
}
(RawBsonRef::String(v), DataType::Float32) => {
append_scalar!(Float32Builder, col, v.parse().unwrap_or_default())
}
(RawBsonRef::String(v), DataType::Float16) => {
append_scalar!(Float16Builder, col, v.parse().unwrap_or_default())
}
(RawBsonRef::String(v), DataType::Date64) => {
append_scalar!(
Date64Builder,
Expand Down Expand Up @@ -337,6 +440,12 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) ->
(RawBsonRef::ObjectId(v), DataType::Utf8) => {
append_scalar!(StringBuilder, col, v.to_string())
}
(RawBsonRef::ObjectId(v), DataType::LargeBinary) => {
append_scalar!(LargeBinaryBuilder, col, v.bytes())
}
(RawBsonRef::ObjectId(v), DataType::LargeUtf8) => {
append_scalar!(LargeStringBuilder, col, v.to_string())
}

// Timestamp (internal mongodb type; second specified)
(RawBsonRef::Timestamp(v), DataType::Timestamp(TimeUnit::Second, _)) => {
Expand Down Expand Up @@ -372,6 +481,15 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) ->
.to_rfc3339()
)
}
(RawBsonRef::Timestamp(v), DataType::LargeUtf8) => {
append_scalar!(
LargeStringBuilder,
col,
chrono::DateTime::from_timestamp_millis(v.time as i64 * 1000)
.ok_or_else(|| BsonError::InvalidValue(v.to_string()))?
.to_rfc3339()
)
}

// Datetime (actual timestamps that you'd actually use in an application)
(RawBsonRef::DateTime(v), DataType::Timestamp(TimeUnit::Second, _)) => {
Expand Down Expand Up @@ -409,6 +527,18 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) ->
.to_rfc3339()
)
}
(RawBsonRef::DateTime(v), DataType::LargeUtf8) => {
append_scalar!(
LargeStringBuilder,
col,
chrono::DateTime::from_timestamp_millis(v.timestamp_millis())
.ok_or_else(|| BsonError::InvalidValue(v.to_string()))?
.to_rfc3339()
)
}
(RawBsonRef::DateTime(v), DataType::Int64) => {
append_scalar!(Int64Builder, col, v.timestamp_millis())
}

// Array
(RawBsonRef::Document(doc), DataType::Struct(_)) => {
Expand Down Expand Up @@ -521,6 +651,16 @@ fn append_null(typ: &DataType, col: &mut dyn ArrayBuilder) -> Result<()> {
.downcast_mut::<BooleanBuilder>()
.unwrap()
.append_null(),
&DataType::Int8 => col
.as_any_mut()
.downcast_mut::<Int8Builder>()
.unwrap()
.append_null(),
&DataType::Int16 => col
.as_any_mut()
.downcast_mut::<Int16Builder>()
.unwrap()
.append_null(),
&DataType::Int32 => col
.as_any_mut()
.downcast_mut::<Int32Builder>()
Expand All @@ -536,6 +676,16 @@ fn append_null(typ: &DataType, col: &mut dyn ArrayBuilder) -> Result<()> {
.downcast_mut::<Float64Builder>()
.unwrap()
.append_null(),
&DataType::Float32 => col
.as_any_mut()
.downcast_mut::<Float32Builder>()
.unwrap()
.append_null(),
&DataType::Float16 => col
.as_any_mut()
.downcast_mut::<Float16Builder>()
.unwrap()
.append_null(),
&DataType::Timestamp(TimeUnit::Nanosecond, _) => col
.as_any_mut()
.downcast_mut::<TimestampNanosecondBuilder>()
Expand Down
9 changes: 2 additions & 7 deletions crates/datasources/src/mongodb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,7 @@ use std::str::FromStr;
use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use datafusion::arrow::datatypes::{
FieldRef,
Fields,
Schema as ArrowSchema,
SchemaRef as ArrowSchemaRef,
};
use datafusion::arrow::datatypes::{Fields, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result as DatafusionResult};
use datafusion::execution::context::SessionState;
Expand Down Expand Up @@ -217,7 +212,7 @@ impl VirtualLister for MongoDbAccessor {
pub struct MongoDbTableAccessInfo {
pub database: String, // "Schema"
pub collection: String,
pub fields: Option<Vec<FieldRef>>, // filter
pub fields: Option<Fields>, // filter
}

#[derive(Debug, Clone)]
Expand Down
2 changes: 1 addition & 1 deletion crates/sqlexec/src/dispatch/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ impl<'a> ExternalDispatcher<'a> {
let table_info = MongoDbTableAccessInfo {
database: database.to_string(),
collection: collection.to_string(),
fields: None,
fields: schema.map(|s| s.fields),
};
let accessor = MongoDbAccessor::connect(connection_string).await?;
let table_accessor = accessor.into_table_accessor(table_info);
Expand Down
58 changes: 29 additions & 29 deletions testdata/sqllogictests/external_schema.slt
Original file line number Diff line number Diff line change
@@ -1,55 +1,55 @@


statement ok
create table src (x int, y text, z int);
CREATE TABLE src (x int, y text, z int);

statement ok
insert into src values (1, 'hello', 0), (2, 'other', 10), (3, 'world', 100);
INSERT INTO src VALUES (1, 'hello', 0), (2, 'other', 10), (3, 'world', 100);

statement ok
copy (select * from src) to '${TMP}/external_schema.bson'
COPY (SELECT * FROM src) TO '${TMP}/external_schema.bson';

statement ok
create external table dst from bson
options (
location '${TMP}/external_schema.bson',
file_type 'bson'
)
columns (
x int,
z int
CREATE EXTERNAL TABLE dst
FROM bson
OPTIONS (
location => '${TMP}/external_schema.bson',
file_type => 'bson'
)
COLUMNS (
x int,
z int
);

query ITI
select * from src;
SELECT * FROM src;
----
1 hello 0
1 hello 0
2 other 10
3 world 100

query II
select * from dst;
SELECT * FROM dst;
----
1 0
1 0
2 10
3 100


statement ok
create external table dstextra from bson
options (
location '${TMP}/external_schema.bson',
file_type 'bson'
)
columns (
alpha int,
x int,
z int
CREATE EXTERNAL TABLE dstextra
FROM bson
OPTIONS (
location => '${TMP}/external_schema.bson',
file_type => 'bson'
)
COLUMNS (
alpha int,
x int,
z int
);

query II
select * from dstextra;
SELECT * FROM dstextra;
----
NULL 1 0
NULL 1 0
NULL 2 10
NULL 3 100
NULL 3 100
Loading

0 comments on commit 4a3f7b4

Please sign in to comment.