Skip to content

Commit

Permalink
feat: conditionalize generic flattening and type casting for streams …
Browse files Browse the repository at this point in the history
…v1+ (parseablehq#1057)

Co-authored-by: Nikhil Sinha <nikhil.sinha@parseable.com>
  • Loading branch information
2 people authored and parmesant committed Jan 13, 2025
1 parent 661ce51 commit 1a9ef45
Show file tree
Hide file tree
Showing 12 changed files with 548 additions and 437 deletions.
82 changes: 42 additions & 40 deletions src/event/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ use std::{collections::HashMap, sync::Arc};
use tracing::error;

use super::{EventFormat, Metadata, Tags};
use crate::utils::{arrow::get_field, json::flatten_json_body};
use crate::{
metadata::SchemaVersion,
utils::{arrow::get_field, json::flatten_json_body},
};

pub struct Event {
pub data: Value,
Expand All @@ -48,8 +51,9 @@ impl EventFormat for Event {
schema: &HashMap<String, Arc<Field>>,
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
let data = flatten_json_body(&self.data, None, None, None, false)?;
let data = flatten_json_body(self.data, None, None, None, schema_version, false)?;
let stream_schema = schema;

// incoming event may be a single json or a json array
Expand All @@ -68,43 +72,38 @@ impl EventFormat for Event {
let mut is_first = false;
let schema = match derive_arrow_schema(stream_schema, fields) {
Ok(schema) => schema,
Err(_) => match infer_json_schema_from_iterator(value_arr.iter().map(Ok)) {
Ok(mut infer_schema) => {
let new_infer_schema = super::super::format::update_field_type_in_schema(
Arc::new(infer_schema),
Some(stream_schema),
time_partition,
Some(&value_arr),
);
infer_schema = Schema::new(new_infer_schema.fields().clone());
if let Err(err) = Schema::try_merge(vec![
Schema::new(stream_schema.values().cloned().collect::<Fields>()),
infer_schema.clone(),
]) {
return Err(anyhow!("Could not merge schema of this event with that of the existing stream. {:?}", err));
}
is_first = true;
infer_schema
.fields
.iter()
.filter(|field| !field.data_type().is_null())
.cloned()
.sorted_by(|a, b| a.name().cmp(b.name()))
.collect()
}
Err(err) => {
return Err(anyhow!(
"Could not infer schema for this event due to err {:?}",
err
))
}
},
Err(_) => {
let mut infer_schema = infer_json_schema_from_iterator(value_arr.iter().map(Ok))
.map_err(|err| {
anyhow!("Could not infer schema for this event due to err {:?}", err)
})?;
let new_infer_schema = super::update_field_type_in_schema(
Arc::new(infer_schema),
Some(stream_schema),
time_partition,
Some(&value_arr),
schema_version,
);
infer_schema = Schema::new(new_infer_schema.fields().clone());
Schema::try_merge(vec![
Schema::new(stream_schema.values().cloned().collect::<Fields>()),
infer_schema.clone(),
]).map_err(|err| anyhow!("Could not merge schema of this event with that of the existing stream. {:?}", err))?;
is_first = true;
infer_schema
.fields
.iter()
.filter(|field| !field.data_type().is_null())
.cloned()
.sorted_by(|a, b| a.name().cmp(b.name()))
.collect()
}
};

if static_schema_flag.is_none()
&& value_arr
.iter()
.any(|value| fields_mismatch(&schema, value))
.any(|value| fields_mismatch(&schema, value, schema_version))
{
return Err(anyhow!(
"Could not process this event due to mismatch in datatype"
Expand Down Expand Up @@ -165,27 +164,30 @@ fn collect_keys<'a>(values: impl Iterator<Item = &'a Value>) -> Result<Vec<&'a s
Ok(keys)
}

fn fields_mismatch(schema: &[Arc<Field>], body: &Value) -> bool {
fn fields_mismatch(schema: &[Arc<Field>], body: &Value, schema_version: SchemaVersion) -> bool {
for (name, val) in body.as_object().expect("body is of object variant") {
if val.is_null() {
continue;
}
let Some(field) = get_field(schema, name) else {
return true;
};
if !valid_type(field.data_type(), val) {
if !valid_type(field.data_type(), val, schema_version) {
return true;
}
}
false
}

fn valid_type(data_type: &DataType, value: &Value) -> bool {
fn valid_type(data_type: &DataType, value: &Value, schema_version: SchemaVersion) -> bool {
match data_type {
DataType::Boolean => value.is_boolean(),
DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => value.is_i64(),
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => value.is_u64(),
DataType::Float16 | DataType::Float32 | DataType::Float64 => value.is_f64(),
DataType::Float16 | DataType::Float32 => value.is_f64(),
// All numbers can be cast as Float64 from schema version v1
DataType::Float64 if schema_version == SchemaVersion::V1 => value.is_number(),
DataType::Float64 if schema_version != SchemaVersion::V1 => value.is_f64(),
DataType::Utf8 => value.is_string(),
DataType::List(field) => {
let data_type = field.data_type();
Expand All @@ -194,7 +196,7 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool {
if elem.is_null() {
continue;
}
if !valid_type(data_type, elem) {
if !valid_type(data_type, elem, schema_version) {
return false;
}
}
Expand All @@ -212,7 +214,7 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool {
if value.is_null() {
continue;
}
if !valid_type(field.data_type(), value) {
if !valid_type(field.data_type(), value, schema_version) {
return false;
}
} else {
Expand Down
Loading

0 comments on commit 1a9ef45

Please sign in to comment.