Skip to content

Commit

Permalink
[feat] parse json fixed width schema to arrow2 schema (#6)
Browse files Browse the repository at this point in the history
* [test] add schema spec. for dtypes and dummy test schema

* [feat] serde read json schema and parse to arrow2 schema
  • Loading branch information
wilhelmagren authored Nov 26, 2023
1 parent 3e7c3d5 commit 29cf78d
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 1 deletion.
4 changes: 4 additions & 0 deletions resources/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Evolution resources

Common resources used during development, testing, and for documentation.

30 changes: 30 additions & 0 deletions resources/schema/test_schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"name": "EvolutionTestSchema",
"version": 418901,
"columns": [
{
"name": "id",
"length": 9,
"dtype": "i32",
"is_nullable": false
},
{
"name": "name",
"length": 32,
"dtype": "utf8",
"is_nullable": false
},
{
"name": "city",
"length": 32,
"dtype": "utf8",
"is_nullable": true
},
{
"name": "employed",
"length": 1,
"dtype": "boolean",
"is_nullable": true
}
]
}
30 changes: 30 additions & 0 deletions resources/schema/test_schema_trailing_commas.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"name": "EvolutionTestSchema",
"version": 418901,
"columns": [
{
"name": "id",
"length": 9,
"dtype": "i32",
"is_nullable": false
},
{
"name": "name",
"length": 32,
"dtype": "utf8",
"is_nullable": false
},
{
"name": "city",
"length": 32,
"dtype": "utf8",
"is_nullable": false,
},
{
"name": "employed",
"length": 1,
"dtype": "boolean",
"is_nullable": false,
}
]
}
52 changes: 52 additions & 0 deletions resources/schema/valid_schema_dtypes.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
{
"dtypes": [
{
"evolution": "bool",
"arrow2": "Boolean"
},
{
"evolution": "boolean",
"arrow2": "Boolean"
},
{
"evolution": "i16",
"arrow2": "Int16"
},
{
"evolution": "i32",
"arrow2": "Int32"
},
{
"evolution": "i64",
"arrow2": "Int64"
},
{
"evolution": "f16",
"arrow2": "Float16"
},
{
"evolution": "f32",
"arrow2": "Float32"
},
{
"evolution": "f64",
"arrow2": "Float64"
},
{
"evolution": "utf8",
"arrow2": "Utf8"
},
{
"evolution": "string",
"arrow2": "Utf8"
},
{
"evolution": "l-utf8",
"arrow2": "LargeUtf8"
},
{
"evolution": "l-string",
"arrow2": "LargeUtf8"
}
]
}
119 changes: 118 additions & 1 deletion src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
* SOFTWARE.
*
* File created: 2023-11-25
* Last updated: 2023-11-25
* Last updated: 2023-11-26
*/

use arrow2::datatypes::{DataType, Field, Metadata, Schema};
use arrow2::error::Error;
use serde::{Deserialize, Serialize};
use serde_json::Value;

///
Expand Down Expand Up @@ -75,9 +77,83 @@ pub fn schema_from_str(json: &str, metadata: Metadata) -> Schema {
Schema { fields, metadata }
}

/// A single column entry of a fixed-width JSON schema.
///
/// Deserialized from one element of the schema's `"columns"` array, e.g.
/// `{"name": "id", "length": 9, "dtype": "i32", "is_nullable": false}`.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
pub struct Column {
    /// Name of the column; becomes the arrow field name on conversion.
    name: String,
    /// Width of the column within one row.
    // NOTE(review): unit is presumably characters or bytes of the fixed-width file — confirm.
    length: usize,
    /// Schema-level datatype name such as `"i32"` or `"utf8"`; translated to an
    /// arrow2 `DataType` by `Column::arrow_dtype`.
    dtype: String,
    /// Whether the column is declared nullable.
    is_nullable: bool,
}

/// Conversion from the schema-level column description to arrow2 datatypes.
impl Column {
    /// Translates the column's textual `dtype` into the matching arrow2 [`DataType`].
    ///
    /// Recognized names (several aliases map to the same arrow2 type):
    ///
    /// | schema dtype                                  | arrow2 type |
    /// |-----------------------------------------------|-------------|
    /// | `bool`, `boolean`                             | `Boolean`   |
    /// | `i16` / `i32` / `i64`                         | `Int16` / `Int32` / `Int64` |
    /// | `f16` / `f32` / `f64`                         | `Float16` / `Float32` / `Float64` |
    /// | `utf8`, `string`                              | `Utf8`      |
    /// | `lutf8`, `lstring`, `l-utf8`, `l-string`      | `LargeUtf8` |
    ///
    /// # Errors
    ///
    /// Returns [`Error::ExternalFormat`] when `dtype` is not one of the names above.
    /// Matching is exact (case-sensitive, no trimming).
    pub fn arrow_dtype(&self) -> Result<DataType, Error> {
        match self.dtype.as_str() {
            "bool" | "boolean" => Ok(DataType::Boolean),
            "i16" => Ok(DataType::Int16),
            "i32" => Ok(DataType::Int32),
            "i64" => Ok(DataType::Int64),
            "f16" => Ok(DataType::Float16),
            "f32" => Ok(DataType::Float32),
            "f64" => Ok(DataType::Float64),
            "utf8" | "string" => Ok(DataType::Utf8),
            // The dtype spec in resources/schema/valid_schema_dtypes.json spells the
            // large variants with a hyphen ("l-utf8", "l-string"); accept both the
            // hyphenated and the original compact spellings so code and spec agree.
            "lutf8" | "lstring" | "l-utf8" | "l-string" => Ok(DataType::LargeUtf8),
            _ => Err(Error::ExternalFormat(format!(
                "Could not parse json schema dtype to arrow datatype, dtype: {:?}",
                self.dtype,
            ))),
        }
    }
}

/// A fixed-width schema as described by a JSON document.
///
/// Deserialized with serde from a top-level JSON object carrying the keys
/// `"name"`, `"version"` and `"columns"`.
#[derive(Deserialize, Serialize)]
pub struct FixedSchema {
/// Name of the schema.
name: String,
/// Integer version of the schema.
version: i32,
/// The column definitions, in the order they appear in the JSON document.
columns: Vec<Column>,
}

///
#[allow(dead_code)]
impl FixedSchema {
///
pub fn num_columns(&self) -> usize {
self.columns.len()
}

///
pub fn row_len(&self) -> usize {
self.columns.iter().map(|c| c.length).sum()
}

///
pub fn has_nullable_cols(&self) -> bool {
self.columns.iter().any(|c| c.is_nullable)
}

///
pub fn into_arrow_schema(self) -> Schema {
let fields: Vec<Field> = self
.columns
.iter()
.map(|c| Field::new(c.name.to_owned(), c.arrow_dtype().unwrap(), c.is_nullable))
.collect();

Schema::from(fields)
}
}

#[cfg(test)]
mod tests_schema {
use super::*;
use std::path::PathBuf;
use std::{fs, io};

#[test]
fn test_from_custom_string() {
Expand All @@ -89,4 +165,45 @@ mod tests_schema {
let schema = schema_from_str(data, Metadata::default());
println!("schema: {:?}", schema);
}

/// Parses the bundled test schema and checks that converting it to an
/// arrow2 schema yields one field per declared column.
#[test]
fn test_fixed_to_arrow_schema_ok() {
    // Fixture path is resolved relative to the crate root.
    let schema_file =
        PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("resources/schema/test_schema.json");

    let buffered = io::BufReader::new(fs::File::open(schema_file).unwrap());
    let parsed: FixedSchema = serde_json::from_reader(buffered).unwrap();

    let converted: Schema = parsed.into_arrow_schema();

    assert_eq!(4, converted.fields.len());
}

/// Deserializes the bundled test schema and checks its column count and
/// total row length (9 + 32 + 32 + 1 = 74).
#[test]
fn test_derive_from_file_ok() {
    // Fixture path is resolved relative to the crate root.
    let schema_file =
        PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("resources/schema/test_schema.json");

    let buffered = io::BufReader::new(fs::File::open(schema_file).unwrap());
    let parsed: FixedSchema = serde_json::from_reader(buffered).unwrap();

    assert_eq!(4, parsed.num_columns());
    assert_eq!(74, parsed.row_len());
}

/// A schema file containing trailing commas is not valid JSON and must be
/// rejected by the deserializer.
///
/// The parse result is asserted directly instead of relying on
/// `#[should_panic]`: with `#[should_panic]` the test would also pass if the
/// fixture file were missing, because opening it would panic first.
#[test]
fn test_derive_from_file_trailing_commas() {
    let mut path: PathBuf = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    path.push("resources/schema/test_schema_trailing_commas.json");

    // A missing fixture is a test-setup failure and must fail the test.
    let json = fs::File::open(path).expect("trailing-commas fixture file must exist");
    let reader = io::BufReader::new(json);

    let parsed: Result<FixedSchema, serde_json::Error> = serde_json::from_reader(reader);

    assert!(
        parsed.is_err(),
        "schema with trailing commas must fail to deserialize"
    );
}
}

0 comments on commit 29cf78d

Please sign in to comment.