feat: jaq filter for input to json tables (#3099)
tychoish authored Jul 24, 2024
1 parent adcb253 commit dd1dafb
Showing 18 changed files with 382 additions and 58 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

11 changes: 11 additions & 0 deletions crates/datafusion_ext/src/functions.rs
@@ -262,6 +262,17 @@ impl From<FuncParamValue> for Option<String> {
}
}

impl From<&FuncParamValue> for Option<String> {
    fn from(value: &FuncParamValue) -> Self {
        match value {
            FuncParamValue::Scalar(ScalarValue::Utf8(s))
            | FuncParamValue::Scalar(ScalarValue::LargeUtf8(s)) => s.to_owned(),
            _ => None,
        }
    }
}


impl<T> TryFrom<FuncParamValue> for Vec<T>
where
T: std::convert::TryFrom<FuncParamValue>,
3 changes: 3 additions & 0 deletions crates/datasources/Cargo.toml
@@ -67,9 +67,12 @@ rust_decimal = { version = "1.35.0", features = ["db-tokio-postgres"] }
ssh-key = { version = "0.6.6", features = ["ed25519", "alloc"] }
tiberius = { version = "0.12.2", default-features = false, features = ["tds73","rustls","chrono"] }
tokio-postgres = { version = "0.7.8", features = ["with-uuid-1", "with-serde_json-1","with-chrono-0_4"] }
memoize = { version = "0.4.2", features = ["full"] }
bigquery-storage = { git = "https://github.com/glaredb/bigquery-storage", branch = "deps/2023-10-27-update" }
lance = { git = "https://github.com/GlareDB/lance", branch = "df36" }
json-stream = { git = "https://github.com/tychoish/json-stream", rev = "bd4990fab95f789740a75a8eea98d5dac1f0160a" }
jaq-interpret = "1.5.0"
jaq-parse = "1.0.2"

# SSH tunnels
[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dependencies]
16 changes: 16 additions & 0 deletions crates/datasources/src/json/errors.rs
@@ -15,6 +15,9 @@ pub enum JsonError {
    #[error("sending data already in progress")]
    SendAlreadyInProgress,

    #[error("missing filter expression")]
    MissingFilterExpression,

    #[error(transparent)]
    ObjectStoreSource(#[from] crate::object_store::errors::ObjectStoreSourceError),

@@ -35,6 +38,12 @@ pub enum JsonError {

    #[error(transparent)]
    ChannelRecv(#[from] futures::channel::mpsc::TryRecvError),

    #[error("jaq: {0}")]
    Jaq(#[from] crate::json::jaq::JaqError),

    #[error("jaq: interpreter: {0}")]
    JaqInterpret(String),
}

impl From<JsonError> for ExtensionError {
@@ -49,4 +58,11 @@ impl From<JsonError> for DataFusionError {
}
}


impl From<jaq_interpret::Error> for JsonError {
    fn from(e: jaq_interpret::Error) -> Self {
        JsonError::JaqInterpret(e.to_string())
    }
}

pub type Result<T, E = JsonError> = std::result::Result<T, E>;
22 changes: 22 additions & 0 deletions crates/datasources/src/json/jaq.rs
@@ -0,0 +1,22 @@
use jaq_interpret::{Filter, ParseCtx};
use memoize::memoize;

#[derive(Clone, Debug, thiserror::Error)]
pub enum JaqError {
    #[error("parse: {0}")]
    Parse(String),
}

#[memoize(Capacity: 256)]
pub fn compile_jaq_query(query: String) -> Result<Filter, JaqError> {
    let (f, errs) = jaq_parse::parse(&query, jaq_parse::main());
    if !errs.is_empty() {
        return Err(JaqError::Parse(
            errs.into_iter()
                .map(|e| e.to_string())
                .collect::<Vec<_>>()
                .join("\n"),
        ));
    }
    Ok(ParseCtx::new(Vec::new()).compile(f.unwrap()))
}
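
For orientation, below is a minimal standalone sketch of the parse/compile/run flow that compile_jaq_query and the stream/table changes in this commit wrap. The filter string and input document are invented for illustration, the memoization layer is omitted, and error reporting relies only on the Display impls this commit already uses.

use jaq_interpret::{Ctx, FilterT, ParseCtx, RcIter, Val};
use serde_json::{json, Value};

fn main() {
    // Invented filter: pull each element out of a top-level array.
    let query = ".[]".to_string();

    // Same parse/compile calls as compile_jaq_query above, minus #[memoize].
    let (parsed, errs) = jaq_parse::parse(&query, jaq_parse::main());
    if !errs.is_empty() {
        let msg = errs.iter().map(|e| e.to_string()).collect::<Vec<_>>().join("; ");
        panic!("jaq parse errors: {msg}");
    }
    let filter = ParseCtx::new(Vec::new()).compile(parsed.unwrap());

    // Same run pattern as stream.rs / table.rs: no extra inputs, one value in.
    let input: Value = json!([{"a": 1}, {"a": 2}]);
    let inputs = RcIter::new(core::iter::empty());
    let outputs = match filter
        .run((Ctx::new([], &inputs), Val::from(input)))
        .map(|res| res.map(Value::from))
        .collect::<Result<Vec<_>, _>>()
    {
        Ok(vals) => vals,
        Err(e) => panic!("jaq evaluation failed: {e}"),
    };

    // The table code regroups the filter outputs into one array with
    // Value::from_iter before unwinding them into rows.
    println!("{}", Value::from_iter(outputs));
}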
1 change: 1 addition & 0 deletions crates/datasources/src/json/mod.rs
@@ -1,3 +1,4 @@
pub mod errors;
pub mod jaq;
mod stream;
pub mod table;
93 changes: 66 additions & 27 deletions crates/datasources/src/json/stream.rs
@@ -10,6 +10,7 @@ use datafusion::execution::TaskContext;
use datafusion::physical_plan::streaming::PartitionStream;
use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use futures::{Stream, StreamExt, TryStreamExt};
use jaq_interpret::{Ctx, Filter, FilterT, RcIter, Val};
use json_stream::JsonStream;
use object_store::{ObjectMeta, ObjectStore};
use serde_json::{Map, Value};
@@ -51,11 +52,22 @@ pub(crate) struct ObjectStorePartition {
schema: Arc<Schema>,
store: Arc<dyn ObjectStore>,
obj: ObjectMeta,
filter: Option<Arc<Filter>>,
}

impl ObjectStorePartition {
pub fn new(schema: Arc<Schema>, store: Arc<dyn ObjectStore>, obj: ObjectMeta) -> Self {
Self { schema, store, obj }
pub fn new(
schema: Arc<Schema>,
store: Arc<dyn ObjectStore>,
obj: ObjectMeta,
filter: Option<Arc<Filter>>,
) -> Self {
Self {
schema,
store,
obj,
filter,
}
}
}

@@ -69,6 +81,7 @@ impl PartitionStream for ObjectStorePartition {
self.schema.to_owned(),
self.store.clone(),
self.obj.clone(),
self.filter.clone(),
))
}
}
@@ -112,16 +125,22 @@ impl JsonHandler {
schema: Arc<Schema>,
store: Arc<dyn ObjectStore>,
obj: ObjectMeta,
jaq_filter: Option<Arc<Filter>>,
) -> Self {
let stream_schema = schema.clone();
let jaq_filter = jaq_filter.clone();

let stream = futures::stream::once(async move {
let store = store.clone();
let filter = jaq_filter.clone();

Self::convert_stream(
stream_schema,
JsonStream::<Value, _>::new(match store.get(&obj.location).await {
Ok(stream) => stream.into_stream().map_err(JsonError::from),
Err(e) => return futures::stream::once(async move { Err(e.into()) }).boxed(),
})
.flat_map(Self::unwind_json_value)
.flat_map(move |v| Self::unwind_json_value(v, &filter))
.boxed(),
)
})
@@ -146,34 +165,54 @@ impl JsonHandler {
.boxed()
}


fn unwind_json_value(input: Result<Value>) -> JsonObjectStream {
fn unwind_json_value(input: Result<Value>, filter: &Option<Arc<Filter>>) -> JsonObjectStream {
futures::stream::iter(match input {
Ok(value) => match value {
Value::Array(vals) => {
let mut out = Vec::with_capacity(vals.len());
for v in vals {
match v {
Value::Object(doc) => out.push(Ok(doc)),
Value::Null => out.push(Ok(Map::new())),
_ => {
out.push(Err(JsonError::UnspportedType(
"only objects and arrays of objects are supported",
)));
break;
}
Ok(value) => {
let res = match filter {
Some(jq) => {
let inputs = RcIter::new(core::iter::empty());
match jq
.run((Ctx::new([], &inputs), Val::from(value)))
.map(|res| res.map(Value::from))
.collect::<Result<Vec<_>, _>>()
{
Ok(vals) => Ok(Value::from_iter(vals)),
Err(e) => Err(JsonError::from(e)),
}
}
out
}
Value::Object(doc) => vec![Ok(doc)],
Value::Null => vec![Ok(Map::new())],
_ => {
vec![Err(JsonError::UnspportedType(
"only objects and arrays of objects are supported",
))]
None => Ok(value),
};


match res {
Ok(value) => match value {
Value::Array(vals) => {
let mut out = Vec::with_capacity(vals.len());
for v in vals {
match v {
Value::Object(doc) => out.push(Ok(doc)),
Value::Null => out.push(Ok(Map::new())),
_ => {
out.push(Err(JsonError::UnspportedType(
"only objects and arrays of objects are supported",
)));
break;
}
}
}
out
}
Value::Object(doc) => vec![Ok(doc)],
Value::Null => vec![Ok(Map::new())],
_ => {
vec![Err(JsonError::UnspportedType(
"only objects and arrays of objects are supported",
))]
}
},
Err(e) => vec![Err(e)],
}
},
}
Err(e) => vec![Err(e)],
})
.boxed()
43 changes: 34 additions & 9 deletions crates/datasources/src/json/table.rs
@@ -1,13 +1,15 @@
use std::sync::Arc;
use std::vec::Vec;

use datafusion::arrow::datatypes::{DataType, Field, FieldRef, Schema};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::streaming::StreamingTable;
use datafusion::datasource::TableProvider;
use datafusion::physical_plan::streaming::PartitionStream;
use jaq_interpret::{Ctx, Filter, FilterT, RcIter, Val};
use object_store::{ObjectMeta, ObjectStore};
use serde_json::{Map, Value};

use super::jaq::compile_jaq_query;
use crate::common::url::DatasourceUrl;
use crate::json::errors::JsonError;
use crate::json::stream::{ObjectStorePartition, VectorPartition};
@@ -16,7 +18,8 @@ use crate::object_store::{ObjStoreAccess, ObjStoreAccessor};
pub async fn json_streaming_table(
store_access: Arc<dyn ObjStoreAccess>,
source_url: DatasourceUrl,
fields: Option<Vec<FieldRef>>,
fields: Option<Schema>,
jaq_filter: Option<String>,
) -> Result<Arc<dyn TableProvider>, JsonError> {
let path = source_url.path().into_owned();

@@ -33,26 +36,32 @@

let store = accessor.into_object_store();

json_streaming_table_inner(store, &path, list, fields).await
json_streaming_table_inner(store, &path, list, fields, jaq_filter).await
}

pub async fn json_streaming_table_from_object(
store: Arc<dyn ObjectStore>,
object: ObjectMeta,
) -> Result<Arc<dyn TableProvider>, JsonError> {
json_streaming_table_inner(store, "", vec![object], None).await
json_streaming_table_inner(store, "", vec![object], None, None).await
}

async fn json_streaming_table_inner(
store: Arc<dyn ObjectStore>,
original_path: &str, // Just for error
mut list: Vec<ObjectMeta>,
fields: Option<Vec<FieldRef>>,
schema: Option<Schema>,
jaq_filter: Option<String>,
) -> Result<Arc<dyn TableProvider>, JsonError> {
let filter = match jaq_filter {
Some(query) => Some(Arc::new(compile_jaq_query(query)?)),
None => None,
};

let mut streams = Vec::<Arc<dyn PartitionStream>>::with_capacity(list.len());

let schema = match fields {
Some(fields) => Arc::new(Schema::new(fields)),
let schema = match schema {
Some(fields) => Arc::new(fields),
None => {
let mut data = Vec::new();
{
@@ -70,6 +79,7 @@ async fn json_streaming_table_inner(
push_unwind_json_values(
&mut data,
serde_json::Deserializer::from_slice(&blob).into_iter(),
&filter,
)?;
}

@@ -87,6 +97,7 @@ }
}
}


let schema = Arc::new(Schema::new(
field_set
.into_iter()
@@ -99,12 +110,12 @@ }
}
};


for obj in list {
streams.push(Arc::new(ObjectStorePartition::new(
schema.clone(),
store.clone(),
obj,
filter.clone(),
)));
}

@@ -115,9 +126,23 @@ fn push_unwind_json_values(
fn push_unwind_json_values(
data: &mut Vec<Map<String, Value>>,
vals: impl Iterator<Item = Result<Value, serde_json::Error>>,
filter: &Option<Arc<Filter>>,
) -> Result<(), JsonError> {
for val in vals {
match val? {
let value = match filter {
Some(jq) => {
let inputs = RcIter::new(core::iter::empty());
Value::from_iter(
jq.run((Ctx::new([], &inputs), Val::from(val?)))
.map(|res| res.map(Value::from))
.collect::<Result<Vec<_>, _>>()?,
)
}
None => val?,
};


match value {
Value::Array(vals) => {
for v in vals {
match v {
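
For context, a hedged sketch of how a caller could thread a jaq expression through the new json_streaming_table signature. The crate/module paths and their visibility, and how store_access and source_url get constructed, are assumptions not established by this diff; the filter expression is invented.

use std::sync::Arc;

use datafusion::datasource::TableProvider;
use datasources::common::url::DatasourceUrl;
use datasources::json::errors::JsonError;
use datasources::json::table::json_streaming_table;
use datasources::object_store::ObjStoreAccess;

// A thin wrapper over the new signature: schema inference (fields = None)
// samples the documents after the filter runs, so the inferred columns
// reflect the filtered shape of the data.
async fn filtered_json_table(
    store_access: Arc<dyn ObjStoreAccess>, // constructed by the caller; not covered by this diff
    source_url: DatasourceUrl,             // likewise supplied by the caller
) -> Result<Arc<dyn TableProvider>, JsonError> {
    json_streaming_table(
        store_access,
        source_url,
        None,                           // infer the schema from the filtered documents
        Some(".records[]".to_string()), // hypothetical jaq expression applied to each document
    )
    .await
}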
5 changes: 5 additions & 0 deletions crates/datasources/src/object_store/mod.rs
@@ -423,6 +423,11 @@ pub fn init_session_registry<'a>(
storage_options,
..
})
| TableOptionsV0::Json(TableOptionsObjectStore {
location,
storage_options,
..
})
| TableOptionsV0::Bson(TableOptionsObjectStore {
location,
storage_options,