Skip to content

Commit

Permalink
Add arbitrary precision and rust_decimal support (#7)
Browse files Browse the repository at this point in the history
* docs: get docs.rs configured correctly again (delta-io#1693)

# Description

The docs build was changed in delta-io#1658 to compile on docs.rs with all
features, but our crate cannot compile with all-features due to the TLS
features, which are mutually exclusive.

# Related Issue(s)

For example:

- closes delta-io#1692

This has been tested locally with the following command:

```
cargo doc --features azure,datafusion,datafusion,gcs,glue,json,python,s3,unity-experimental
```

* Make this a patch release to fix docs.rs

* Remove the hdfs feature from the docsrs build

* rollback resolve bucket region change

* short-term hack for decimal compatibility

* reflect new feature

* rollback some changes

* use rust_decimal for decimal types

* cargo fmt

* checkpoints use byte reader instead of serializer

* always use arbitrary_precision

* arbitrary_precision in deltalake_core

* minor formatting tweak

---------

Co-authored-by: Will Jones <willjones127@gmail.com>
Co-authored-by: R. Tyler Croy <rtyler@brokenco.de>
  • Loading branch information
3 people authored Nov 5, 2023
1 parent 9b9f8ed commit 602f340
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 37 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ datafusion-proto = { version = "32" }
datafusion-sql = { version = "32" }
datafusion-physical-expr = { version = "32" }

rust_decimal = { version = "1" }

# serde
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_json = { version = "1", features = ["arbitrary_precision"] }

# "stdlib"
bytes = { version = "1" }
Expand Down
4 changes: 3 additions & 1 deletion crates/deltalake-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ datafusion-proto = { workspace = true, optional = true }
datafusion-sql = { workspace = true, optional = true }
datafusion-physical-expr = { workspace = true, optional = true }

rust_decimal = { workspace = true }

# serde
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
serde_json = { workspace = true, features = ["arbitrary_precision"] }

# "stdlib"
bytes = { workspace = true }
Expand Down
36 changes: 27 additions & 9 deletions crates/deltalake-core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::collections::HashMap;
use std::convert::TryFrom;
use std::io::BufReader;
use std::iter::Iterator;

use arrow::datatypes::Schema as ArrowSchema;
Expand Down Expand Up @@ -261,12 +262,24 @@ fn parquet_bytes_from_state(

Action::remove(r)
}))
.map(|a| serde_json::to_value(a).map_err(ProtocolError::from))
.map(|a| serde_json::to_vec(&a).map_err(ProtocolError::from))
// adds
.chain(state.files().iter().map(|f| {
checkpoint_add_from_state(f, partition_col_data_types.as_slice(), &stats_conversions)
.and_then(|v| serde_json::to_vec(&v).map_err(ProtocolError::from))
}));

let mut joined_jsons = Vec::<u8>::new();
for json_result in jsons {
match json_result {
Err(error) => return Err(error),
Ok(json) => {
joined_jsons.extend(json.into_iter());
joined_jsons.push(b'\n');
}
}
}

// Create the arrow schema that represents the Checkpoint parquet file.
let arrow_schema = delta_log_schema_for_table(
<ArrowSchema as TryFrom<&Schema>>::try_from(&current_metadata.schema)?,
Expand All @@ -278,20 +291,25 @@ fn parquet_bytes_from_state(
// Write the Checkpoint parquet file.
let mut bytes = vec![];
let mut writer = ArrowWriter::try_new(&mut bytes, arrow_schema.clone(), None)?;
let mut decoder = ReaderBuilder::new(arrow_schema)
let mut reader = ReaderBuilder::new(arrow_schema)
.with_batch_size(CHECKPOINT_RECORD_BATCH_SIZE)
.build_decoder()?;
let jsons = jsons.collect::<Result<Vec<serde_json::Value>, _>>()?;
decoder.serialize(&jsons)?;

while let Some(batch) = decoder.flush()? {
writer.write(&batch)?;
.build(BufReader::new(joined_jsons.as_slice()))?;

let mut cp_size: usize = 0;
while let Some(batch_result) = reader.next() {
match batch_result {
Err(error) => return Err(ProtocolError::from(error)),
Ok(batch) => {
cp_size += batch.num_rows();
writer.write(&batch)?
}
}
}

let _ = writer.close()?;
debug!("Finished writing checkpoint parquet buffer.");

let checkpoint = CheckPointBuilder::new(state.version(), jsons.len() as i64)
let checkpoint = CheckPointBuilder::new(state.version(), cp_size as i64)
.with_size_in_bytes(bytes.len() as i64)
.build();
Ok((checkpoint, bytes::Bytes::from(bytes)))
Expand Down
10 changes: 6 additions & 4 deletions crates/deltalake-core/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,11 @@ pub enum ProtocolError {
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(untagged)]
pub enum ColumnValueStat {
/// Composite HashMap representation of statistics.
Column(HashMap<String, ColumnValueStat>),
// Order matters here! Since the serde enum strategy is "untagged" it will attempt to match in order.
/// Json representation of statistics.
Value(Value),
/// Composite HashMap representation of statistics.
Column(HashMap<String, ColumnValueStat>),
}

impl ColumnValueStat {
Expand All @@ -139,10 +140,11 @@ impl ColumnValueStat {
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(untagged)]
pub enum ColumnCountStat {
/// Composite HashMap representation of statistics.
Column(HashMap<String, ColumnCountStat>),
// Order matters here! Since the serde enum strategy is "untagged" it will attempt to match in order.
/// Json representation of statistics.
Value(i64),
/// Composite HashMap representation of statistics.
Column(HashMap<String, ColumnCountStat>),
}

impl ColumnCountStat {
Expand Down
30 changes: 22 additions & 8 deletions crates/deltalake-core/src/protocol/parquet_read/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::{collections::HashMap, str::FromStr};

use chrono::{SecondsFormat, TimeZone, Utc};
use num_bigint::BigInt;
use num_traits::cast::ToPrimitive;
use parquet::record::{Field, ListAccessor, MapAccessor, RowAccessor};
use rust_decimal::Decimal;
use serde_json::json;

use crate::protocol::{
Expand Down Expand Up @@ -315,12 +314,27 @@ fn primitive_parquet_field_to_json_value(field: &Field) -> Result<serde_json::Va
Field::Float(value) => Ok(json!(value)),
Field::Double(value) => Ok(json!(value)),
Field::Str(value) => Ok(json!(value)),
Field::Decimal(decimal) => match BigInt::from_signed_bytes_be(decimal.data()).to_f64() {
Some(int) => Ok(json!(
int / (10_i64.pow((decimal.scale()).try_into().unwrap()) as f64)
)),
_ => Err("Invalid type for min/max values."),
},
Field::Decimal(decimal) => {
let val = decimal.data();
let val = if val.len() <= 4 {
let mut bytes = [0; 4];
bytes[..val.len()].copy_from_slice(val);
i32::from_be_bytes(bytes) as i128
} else if val.len() <= 8 {
let mut bytes = [0; 8];
bytes[..val.len()].copy_from_slice(val);
i64::from_be_bytes(bytes) as i128
} else if val.len() <= 16 {
let mut bytes = [0; 16];
bytes[..val.len()].copy_from_slice(val);
i128::from_be_bytes(bytes)
} else {
return Err("Invalid type for min/max values.");
};

let val = Decimal::from_i128_with_scale(val, decimal.scale() as u32);
Ok(serde_json::from_str(&val.to_string()).unwrap())
}
Field::TimestampMicros(timestamp) => Ok(serde_json::Value::String(
convert_timestamp_micros_to_string(*timestamp)?,
)),
Expand Down
18 changes: 15 additions & 3 deletions crates/deltalake-core/src/table/state_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@

use std::borrow::Cow;
use std::collections::{HashMap, HashSet, VecDeque};
use std::str::FromStr;
use std::sync::Arc;

use arrow::compute::cast;
use arrow::compute::kernels::cast_utils::Parser;
use arrow_array::types::{Date32Type, TimestampMicrosecondType};
use arrow_array::{
Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Float64Array, Int64Array, NullArray,
StringArray, StructArray, TimestampMicrosecondArray, TimestampMillisecondArray,
Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float64Array,
Int64Array, NullArray, StringArray, StructArray, TimestampMicrosecondArray,
TimestampMillisecondArray,
};
use arrow_schema::{DataType, Field, Fields, TimeUnit};
use itertools::Itertools;
Expand Down Expand Up @@ -691,14 +693,24 @@ fn json_value_to_array_general<'a>(
));
Ok(arrow::compute::cast(&i64_arr, datatype)?)
}
DataType::Float32 | DataType::Float64 | DataType::Decimal128(_, _) => {
DataType::Float32 | DataType::Float64 => {
let f64_arr: ArrayRef = Arc::new(Float64Array::from(
values
.map(|value| value.and_then(serde_json::Value::as_f64))
.collect_vec(),
));
Ok(arrow::compute::cast(&f64_arr, datatype)?)
}
DataType::Decimal128(_, _) => {
let dec_array: ArrayRef = Arc::new(Decimal128Array::from(
values
.map(|value| {
value.and_then(|value| Some(i128::from_str(&value.to_string()).unwrap()))
})
.collect_vec(),
));
Ok(arrow::compute::cast(&dec_array, datatype)?)
}
DataType::Utf8 => Ok(Arc::new(StringArray::from(
values
.map(|value| value.and_then(serde_json::Value::as_str))
Expand Down
20 changes: 9 additions & 11 deletions crates/deltalake-core/src/writer/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use parquet::{
file::{metadata::RowGroupMetaData, statistics::Statistics},
format::TimeUnit,
};
use rust_decimal::Decimal;

use super::*;
use crate::protocol::{Add, ColumnValueStat, Stats};
Expand Down Expand Up @@ -120,8 +121,7 @@ enum StatsScalar {
Float64(f64),
Date(chrono::NaiveDate),
Timestamp(chrono::NaiveDateTime),
// We are serializing to f64 later and the ordering should be the same
Decimal(f64),
Decimal(Decimal),
String(String),
Bytes(Vec<u8>),
Uuid(uuid::Uuid),
Expand Down Expand Up @@ -152,8 +152,7 @@ impl StatsScalar {
Ok(Self::Date(date))
}
(Statistics::Int32(v), Some(LogicalType::Decimal { scale, .. })) => {
let val = get_stat!(v) as f64 / 10.0_f64.powi(*scale);
// Spark serializes these as numbers
let val = Decimal::new(get_stat!(v) as i64, *scale as u32);
Ok(Self::Decimal(val))
}
(Statistics::Int32(v), _) => Ok(Self::Int32(get_stat!(v))),
Expand All @@ -179,8 +178,7 @@ impl StatsScalar {
Ok(Self::Timestamp(timestamp))
}
(Statistics::Int64(v), Some(LogicalType::Decimal { scale, .. })) => {
let val = get_stat!(v) as f64 / 10.0_f64.powi(*scale);
// Spark serializes these as numbers
let val = Decimal::new(get_stat!(v), *scale as u32);
Ok(Self::Decimal(val))
}
(Statistics::Int64(v), _) => Ok(Self::Int64(get_stat!(v))),
Expand Down Expand Up @@ -218,15 +216,15 @@ impl StatsScalar {
let val = if val.len() <= 4 {
let mut bytes = [0; 4];
bytes[..val.len()].copy_from_slice(val);
i32::from_be_bytes(bytes) as f64
i32::from_be_bytes(bytes) as i128
} else if val.len() <= 8 {
let mut bytes = [0; 8];
bytes[..val.len()].copy_from_slice(val);
i64::from_be_bytes(bytes) as f64
i64::from_be_bytes(bytes) as i128
} else if val.len() <= 16 {
let mut bytes = [0; 16];
bytes[..val.len()].copy_from_slice(val);
i128::from_be_bytes(bytes) as f64
i128::from_be_bytes(bytes)
} else {
return Err(DeltaWriterError::StatsParsingFailed {
debug_value: format!("{val:?}"),
Expand All @@ -237,7 +235,7 @@ impl StatsScalar {
});
};

let val = val / 10.0_f64.powi(*scale);
let val = Decimal::from_i128_with_scale(val, *scale as u32);
Ok(Self::Decimal(val))
}
(Statistics::FixedLenByteArray(v), Some(LogicalType::Uuid)) => {
Expand Down Expand Up @@ -280,7 +278,7 @@ impl From<StatsScalar> for serde_json::Value {
StatsScalar::Timestamp(v) => {
serde_json::Value::from(v.format("%Y-%m-%dT%H:%M:%S%.fZ").to_string())
}
StatsScalar::Decimal(v) => serde_json::Value::from(v),
StatsScalar::Decimal(v) => serde_json::from_str(&v.to_string()).unwrap(),
StatsScalar::String(v) => serde_json::Value::from(v),
StatsScalar::Bytes(v) => {
let escaped_bytes = v
Expand Down

0 comments on commit 602f340

Please sign in to comment.