Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return error from JSON writer rather than panic #1205

Merged
merged 2 commits into from
Jan 19, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 73 additions & 71 deletions arrow/src/json/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
//! let a = Int32Array::from(vec![1, 2, 3]);
//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
//!
//! let json_rows = json::writer::record_batches_to_json_rows(&[batch]);
//! let json_rows = json::writer::record_batches_to_json_rows(&[batch]).unwrap();
//! assert_eq!(
//! serde_json::Value::Object(json_rows[1].clone()),
//! serde_json::json!({"a": 2}),
Expand Down Expand Up @@ -110,64 +110,61 @@ use serde_json::Value;

use crate::array::*;
use crate::datatypes::*;
use crate::error::Result;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;

fn primitive_array_to_json<T: ArrowPrimitiveType>(array: &ArrayRef) -> Vec<Value> {
as_primitive_array::<T>(array)
fn primitive_array_to_json<T: ArrowPrimitiveType>(
array: &ArrayRef,
) -> Result<Vec<Value>> {
Ok(as_primitive_array::<T>(array)
.iter()
.map(|maybe_value| match maybe_value {
Some(v) => v.into_json_value().unwrap_or(Value::Null),
None => Value::Null,
})
.collect()
.collect())
}

fn struct_array_to_jsonmap_array(
array: &StructArray,
row_count: usize,
) -> Vec<JsonMap<String, Value>> {
) -> Result<Vec<JsonMap<String, Value>>> {
let inner_col_names = array.column_names();

let mut inner_objs = iter::repeat(JsonMap::new())
.take(row_count)
.collect::<Vec<JsonMap<String, Value>>>();

array
.columns()
.iter()
.enumerate()
.for_each(|(j, struct_col)| {
set_column_for_json_rows(
&mut inner_objs,
row_count,
struct_col,
inner_col_names[j],
);
});

inner_objs
for (j, struct_col) in array.columns().iter().enumerate() {
set_column_for_json_rows(
&mut inner_objs,
row_count,
struct_col,
inner_col_names[j],
)?
}
Ok(inner_objs)
}

/// Converts an arrow [`ArrayRef`] into a `Vec` of Serde JSON [`serde_json::Value`]'s
pub fn array_to_json_array(array: &ArrayRef) -> Vec<Value> {
pub fn array_to_json_array(array: &ArrayRef) -> Result<Vec<Value>> {
match array.data_type() {
DataType::Null => iter::repeat(Value::Null).take(array.len()).collect(),
DataType::Boolean => as_boolean_array(array)
DataType::Null => Ok(iter::repeat(Value::Null).take(array.len()).collect()),
DataType::Boolean => Ok(as_boolean_array(array)
.iter()
.map(|maybe_value| match maybe_value {
Some(v) => v.into(),
None => Value::Null,
})
.collect(),
.collect()),

DataType::Utf8 => as_string_array(array)
DataType::Utf8 => Ok(as_string_array(array)
.iter()
.map(|maybe_value| match maybe_value {
Some(v) => v.into(),
None => Value::Null,
})
.collect(),
.collect()),
DataType::Int8 => primitive_array_to_json::<Int8Type>(array),
DataType::Int16 => primitive_array_to_json::<Int16Type>(array),
DataType::Int32 => primitive_array_to_json::<Int32Type>(array),
Expand All @@ -181,28 +178,26 @@ pub fn array_to_json_array(array: &ArrayRef) -> Vec<Value> {
DataType::List(_) => as_list_array(array)
.iter()
.map(|maybe_value| match maybe_value {
Some(v) => Value::Array(array_to_json_array(&v)),
None => Value::Null,
Some(v) => Ok(Value::Array(array_to_json_array(&v)?)),
None => Ok(Value::Null),
})
.collect(),
DataType::LargeList(_) => as_large_list_array(array)
.iter()
.map(|maybe_value| match maybe_value {
Some(v) => Value::Array(array_to_json_array(&v)),
None => Value::Null,
Some(v) => Ok(Value::Array(array_to_json_array(&v)?)),
None => Ok(Value::Null),
})
.collect(),
DataType::Struct(_) => {
let jsonmaps =
struct_array_to_jsonmap_array(as_struct_array(array), array.len());
jsonmaps.into_iter().map(Value::Object).collect()
}
_ => {
panic!(
"Unsupported datatype for array conversion: {:#?}",
array.data_type()
);
struct_array_to_jsonmap_array(as_struct_array(array), array.len())?;
Ok(jsonmaps.into_iter().map(Value::Object).collect())
}
t => Err(ArrowError::JsonError(format!(
"data type {:?} not supported",
t
))),
}
}

Expand Down Expand Up @@ -261,37 +256,37 @@ fn set_column_for_json_rows(
row_count: usize,
array: &ArrayRef,
col_name: &str,
) {
) -> Result<()> {
match array.data_type() {
DataType::Int8 => {
set_column_by_primitive_type::<Int8Type>(rows, row_count, array, col_name)
set_column_by_primitive_type::<Int8Type>(rows, row_count, array, col_name);
}
DataType::Int16 => {
set_column_by_primitive_type::<Int16Type>(rows, row_count, array, col_name)
set_column_by_primitive_type::<Int16Type>(rows, row_count, array, col_name);
}
DataType::Int32 => {
set_column_by_primitive_type::<Int32Type>(rows, row_count, array, col_name)
set_column_by_primitive_type::<Int32Type>(rows, row_count, array, col_name);
}
DataType::Int64 => {
set_column_by_primitive_type::<Int64Type>(rows, row_count, array, col_name)
set_column_by_primitive_type::<Int64Type>(rows, row_count, array, col_name);
}
DataType::UInt8 => {
set_column_by_primitive_type::<UInt8Type>(rows, row_count, array, col_name)
set_column_by_primitive_type::<UInt8Type>(rows, row_count, array, col_name);
}
DataType::UInt16 => {
set_column_by_primitive_type::<UInt16Type>(rows, row_count, array, col_name)
set_column_by_primitive_type::<UInt16Type>(rows, row_count, array, col_name);
}
DataType::UInt32 => {
set_column_by_primitive_type::<UInt32Type>(rows, row_count, array, col_name)
set_column_by_primitive_type::<UInt32Type>(rows, row_count, array, col_name);
}
DataType::UInt64 => {
set_column_by_primitive_type::<UInt64Type>(rows, row_count, array, col_name)
set_column_by_primitive_type::<UInt64Type>(rows, row_count, array, col_name);
}
DataType::Float32 => {
set_column_by_primitive_type::<Float32Type>(rows, row_count, array, col_name)
set_column_by_primitive_type::<Float32Type>(rows, row_count, array, col_name);
}
DataType::Float64 => {
set_column_by_primitive_type::<Float64Type>(rows, row_count, array, col_name)
set_column_by_primitive_type::<Float64Type>(rows, row_count, array, col_name);
}
DataType::Null => {
// when value is null, we simply skip setting the key
Expand Down Expand Up @@ -444,7 +439,7 @@ fn set_column_for_json_rows(
}
DataType::Struct(_) => {
let inner_objs =
struct_array_to_jsonmap_array(as_struct_array(array), row_count);
struct_array_to_jsonmap_array(as_struct_array(array), row_count)?;
rows.iter_mut()
.take(row_count)
.zip(inner_objs.into_iter())
Expand All @@ -457,34 +452,34 @@ fn set_column_for_json_rows(
rows.iter_mut()
.zip(listarr.iter())
.take(row_count)
.for_each(|(row, maybe_value)| {
.try_for_each(|(row, maybe_value)| -> Result<()> {
if let Some(v) = maybe_value {
row.insert(
col_name.to_string(),
Value::Array(array_to_json_array(&v)),
Value::Array(array_to_json_array(&v)?),
);
}
});
Ok(())
})?;
}
DataType::LargeList(_) => {
let listarr = as_large_list_array(array);
rows.iter_mut()
.zip(listarr.iter())
.take(row_count)
.for_each(|(row, maybe_value)| {
.try_for_each(|(row, maybe_value)| -> Result<()> {
if let Some(v) = maybe_value {
row.insert(
col_name.to_string(),
Value::Array(array_to_json_array(&v)),
);
let val = array_to_json_array(&v)?;
row.insert(col_name.to_string(), Value::Array(val));
}
});
Ok(())
})?;
}
DataType::Dictionary(_, value_type) => {
let slice = array.slice(0, row_count);
let hydrated = crate::compute::kernels::cast::cast(&slice, value_type)
.expect("cannot cast dictionary to underlying values");
set_column_for_json_rows(rows, row_count, &hydrated, col_name)
set_column_for_json_rows(rows, row_count, &hydrated, col_name)?;
}
DataType::Map(_, _) => {
let maparr = as_map_array(array);
Expand All @@ -494,11 +489,14 @@ fn set_column_for_json_rows(

// Keys have to be strings to convert to json.
if !matches!(keys.data_type(), DataType::Utf8) {
panic!("Unsupported datatype: {:#?}", array.data_type());
return Err(ArrowError::JsonError(format!(
"data type {:?} not supported in nested map for json writer",
keys.data_type()
)));
}

let keys = as_string_array(&keys);
let values = array_to_json_array(&values);
let values = array_to_json_array(&values)?;

let mut kv = keys.iter().zip(values.into_iter());

Expand All @@ -522,34 +520,38 @@ fn set_column_for_json_rows(
}
}
_ => {
panic!("Unsupported datatype: {:#?}", array.data_type());
return Err(ArrowError::JsonError(format!(
"data type {:?} not supported in nested map for json writer",
array.data_type()
)))
}
}
Ok(())
}

/// Converts an arrow [`RecordBatch`] into a `Vec` of Serde JSON
/// [`JsonMap`]s (objects)
pub fn record_batches_to_json_rows(
batches: &[RecordBatch],
) -> Vec<JsonMap<String, Value>> {
) -> Result<Vec<JsonMap<String, Value>>> {
let mut rows: Vec<JsonMap<String, Value>> = iter::repeat(JsonMap::new())
.take(batches.iter().map(|b| b.num_rows()).sum())
.collect();

if !rows.is_empty() {
let schema = batches[0].schema();
let mut base = 0;
batches.iter().for_each(|batch| {
for batch in batches {
let row_count = batch.num_rows();
batch.columns().iter().enumerate().for_each(|(j, col)| {
for (j, col) in batch.columns().iter().enumerate() {
let col_name = schema.field(j).name();
set_column_for_json_rows(&mut rows[base..], row_count, col, col_name);
});
set_column_for_json_rows(&mut rows[base..], row_count, col, col_name)?
}
base += row_count;
});
}
}

rows
Ok(rows)
}

/// This trait defines how to format a sequence of JSON objects to a
Expand Down Expand Up @@ -683,7 +685,7 @@ where

/// Convert the [`RecordBatch`] into JSON rows, and write them to the output
pub fn write_batches(&mut self, batches: &[RecordBatch]) -> Result<()> {
for row in record_batches_to_json_rows(batches) {
for row in record_batches_to_json_rows(batches)? {
self.write_row(&Value::Object(row))?;
}
Ok(())
Expand Down