Skip to content

Commit

Permalink
Merge pull request #6668 from sundy-li/i96-timestmap
Browse files Browse the repository at this point in the history
fix(parquet): support read i96 timestamp from parquet file
  • Loading branch information
mergify[bot] authored Jul 19, 2022
2 parents 3bbf035 + 90bacc1 commit 4956b5f
Show file tree
Hide file tree
Showing 36 changed files with 188 additions and 391 deletions.
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ simd = ["arrow/simd"]
arrow = { package = "arrow2", git = "https://github.com/datafuse-extras/arrow2", default-features = false, features = [
"io_parquet",
"io_parquet_compression",
], rev = "f5f6b7e3" }
], rev = "4cdf6ff2" }

# Crates.io dependencies
arrow-format = { version = "0.7.0", features = ["flight-data", "flight-service", "ipc"] }
Expand Down
3 changes: 2 additions & 1 deletion common/datablocks/src/data_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@ impl TryFrom<DataBlock> for Chunk<ArrayRef> {
let arrays = v
.columns()
.iter()
.map(|c| c.as_arrow_array())
.zip(v.schema.fields().iter())
.map(|(c, f)| c.as_arrow_array(f.data_type().clone()))
.collect::<Vec<_>>();

Ok(Chunk::try_new(arrays)?)
Expand Down
2 changes: 1 addition & 1 deletion common/datablocks/src/kernels/data_block_gather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl DataBlock {
) -> Result<ColumnRef> {
let arrays = columns
.iter()
.map(|c| c.as_arrow_array())
.map(|c| c.as_arrow_array(c.data_type()))
.collect::<Vec<ArrayRef>>();

let arrays: Vec<&dyn Array> = arrays.iter().map(|array| array.as_ref()).collect();
Expand Down
9 changes: 6 additions & 3 deletions common/datablocks/src/kernels/data_block_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ impl DataBlock {
) -> Result<DataBlock> {
let order_columns = sort_columns_descriptions
.iter()
.map(|f| Ok(block.try_column_by_name(&f.column_name)?.as_arrow_array()))
.map(|f| {
let c = block.try_column_by_name(&f.column_name)?;
Ok(c.as_arrow_array(c.data_type()))
})
.collect::<Result<Vec<_>>>()?;

let order_arrays = sort_columns_descriptions
Expand Down Expand Up @@ -84,10 +87,10 @@ impl DataBlock {
.iter()
.map(|f| {
let left = lhs.try_column_by_name(&f.column_name)?.clone();
let left = left.as_arrow_array();
let left = left.as_arrow_array(left.data_type());

let right = rhs.try_column_by_name(&f.column_name)?.clone();
let right = right.as_arrow_array();
let right = right.as_arrow_array(right.data_type());

Ok(vec![left, right])
})
Expand Down
5 changes: 4 additions & 1 deletion common/datablocks/src/kernels/data_block_take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ impl DataBlock {
slices: &[MergeSlice],
limit: Option<usize>,
) -> Result<ColumnRef> {
let arrays: Vec<ArrayRef> = columns.iter().map(|c| c.as_arrow_array()).collect();
let arrays: Vec<ArrayRef> = columns
.iter()
.map(|c| c.as_arrow_array(c.data_type()))
.collect();
let arrays: Vec<&dyn Array> = arrays.iter().map(|c| c.as_ref()).collect();
let taked = Self::take_arrays_by_slices_limit(&arrays, slices, limit);

Expand Down
2 changes: 1 addition & 1 deletion common/datavalues/benches/eq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ fn databend_eq_simd(lhs: &ColumnRef, rhs: &ColumnRef) -> Result<ColumnRef> {
}

fn cast(column: &ColumnRef, data_type: &DataTypeImpl) -> Result<ColumnRef> {
let arrow_array = column.as_arrow_array();
let arrow_array = column.as_arrow_array(column.data_type());
let arrow_options = ArrowOption {
wrapped: true,
partial: false,
Expand Down
23 changes: 14 additions & 9 deletions common/datavalues/src/columns/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,20 @@ impl Column for ArrayColumn {
self.values.memory_size() + self.offsets.len() * std::mem::size_of::<i64>()
}

fn as_arrow_array(&self) -> ArrayRef {
let arrow_type = self.data_type().arrow_type();
let array = self.values.as_arrow_array();
Box::new(LargeListArray::from_data(
arrow_type,
self.offsets.clone(),
array,
None,
))
fn as_arrow_array(&self, data_type: DataTypeImpl) -> ArrayRef {
let arrow_type = data_type.arrow_type();
if let ArrowType::LargeList(ref f) = arrow_type {
let inner_f = from_arrow_field(f.as_ref());
let array = self.values.as_arrow_array(inner_f);
Box::new(LargeListArray::from_data(
arrow_type,
self.offsets.clone(),
array,
None,
))
} else {
unreachable!()
}
}

fn arc(&self) -> ColumnRef {
Expand Down
4 changes: 2 additions & 2 deletions common/datavalues/src/columns/boolean/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ impl Column for BooleanColumn {
self.values.as_slice().0.len()
}

fn as_arrow_array(&self) -> ArrayRef {
let array = BooleanArray::from_data(ArrowType::Boolean, self.values.clone(), None);
fn as_arrow_array(&self, logical_type: DataTypeImpl) -> ArrayRef {
let array = BooleanArray::from_data(logical_type.arrow_type(), self.values.clone(), None);
Box::new(array)
}

Expand Down
2 changes: 1 addition & 1 deletion common/datavalues/src/columns/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub trait Column: Send + Sync {

fn memory_size(&self) -> usize;
fn arc(&self) -> ColumnRef;
fn as_arrow_array(&self) -> ArrayRef;
fn as_arrow_array(&self, logical_type: DataTypeImpl) -> ArrayRef;
fn slice(&self, offset: usize, length: usize) -> ColumnRef;

fn filter(&self, filter: &BooleanColumn) -> ColumnRef;
Expand Down
4 changes: 2 additions & 2 deletions common/datavalues/src/columns/const_/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ impl Column for ConstColumn {
self.column.memory_size()
}

fn as_arrow_array(&self) -> ArrayRef {
fn as_arrow_array(&self, logical_type: DataTypeImpl) -> ArrayRef {
let column = self.column.replicate(&[self.length]);
column.as_arrow_array()
column.as_arrow_array(logical_type)
}

fn arc(&self) -> ColumnRef {
Expand Down
5 changes: 2 additions & 3 deletions common/datavalues/src/columns/null/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::sync::Arc;

use common_arrow::arrow::array::*;
use common_arrow::arrow::bitmap::Bitmap;
use common_arrow::arrow::datatypes::DataType as ArrowType;
use common_arrow::ArrayRef;

use crate::prelude::*;
Expand Down Expand Up @@ -84,8 +83,8 @@ impl Column for NullColumn {
std::mem::size_of::<usize>()
}

fn as_arrow_array(&self) -> ArrayRef {
Box::new(NullArray::new_null(ArrowType::Null, self.length))
fn as_arrow_array(&self, logical_type: DataTypeImpl) -> ArrayRef {
Box::new(NullArray::new_null(logical_type.arrow_type(), self.length))
}

fn arc(&self) -> ColumnRef {
Expand Down
4 changes: 2 additions & 2 deletions common/datavalues/src/columns/nullable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ impl Column for NullableColumn {
self.column.memory_size()
}

fn as_arrow_array(&self) -> ArrayRef {
let result = self.column.as_arrow_array();
fn as_arrow_array(&self, logical_type: DataTypeImpl) -> ArrayRef {
let result = self.column.as_arrow_array(logical_type);
result.with_validity(Some(self.validity.clone()))
}

Expand Down
4 changes: 2 additions & 2 deletions common/datavalues/src/columns/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl<T: ObjectType> Column for ObjectColumn<T> {
self.values.len() * std::mem::size_of::<T>()
}

fn as_arrow_array(&self) -> common_arrow::ArrayRef {
fn as_arrow_array(&self, logical_type: DataTypeImpl) -> common_arrow::ArrayRef {
let mut offsets: Vec<i64> = Vec::with_capacity(self.values.len());
let mut values: Vec<u8> = Vec::with_capacity(self.values.len());

Expand All @@ -131,7 +131,7 @@ impl<T: ObjectType> Column for ObjectColumn<T> {
}

Box::new(LargeBinaryArray::from_data(
self.data_type().arrow_type(),
logical_type.arrow_type(),
Buffer::from(offsets),
Buffer::from(values),
None,
Expand Down
50 changes: 26 additions & 24 deletions common/datavalues/src/columns/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ impl<T: PrimitiveType> From<PrimitiveArray<T>> for PrimitiveColumn<T> {
}
}

fn precision(x: &TimeUnit) -> usize {
fn convert_precision_to_micros(x: &TimeUnit) -> (i64, i64) {
match x {
TimeUnit::Second => 1,
TimeUnit::Millisecond => 1_000,
TimeUnit::Microsecond => 1_000_000,
TimeUnit::Nanosecond => 1_000_000_000,
TimeUnit::Second => (1_000_000, 1),
TimeUnit::Millisecond => (1_000, 1),
TimeUnit::Microsecond => (1, 1),
TimeUnit::Nanosecond => (1, 1_000),
}
}

Expand All @@ -72,52 +72,54 @@ impl<T: PrimitiveType> PrimitiveColumn<T> {

if &expected_arrow != array.data_type() {
match array.data_type() {
// u32
ArrowDataType::Timestamp(x, _) => {
let p = precision(x);
let array = array
.as_any()
.downcast_ref::<PrimitiveArray<i64>>()
.expect("primitive cast should be ok");
let array = unary(array, |x| (x as usize / p) as u32, expected_arrow);

Self::from_arrow_array(&array)
}
ArrowDataType::Date32 => {
let array = cast::cast(array, &ArrowDataType::Int32, cast_options)
.expect("primitive cast should be ok");
let array = cast::cast(array.as_ref(), &expected_arrow, cast_options)
.expect("primitive cast should be ok");
Self::from_arrow_array(array.as_ref())
}
// TODO(veeupup): it seems buggy because we do not support store date as i64

ArrowDataType::Date64 => {
let array = cast::cast(array, &ArrowDataType::Int64, cast_options)
let array = cast::cast(array, &ArrowDataType::Int32, cast_options)
.expect("primitive cast should be ok");
let array = cast::cast(array.as_ref(), &expected_arrow, cast_options)
.expect("primitive cast should be ok");

Self::from_arrow_array(array.as_ref())
}

// for all the timestamp column we will cast to int64 with microsecond precision
ArrowDataType::Timestamp(x, _) => {
let p = convert_precision_to_micros(x);
let array = array
.as_any()
.downcast_ref::<PrimitiveArray<i64>>()
.expect("primitive cast should be ok");
let array = unary(array, |x| x * p.0 / p.1, expected_arrow);

Self::from_arrow_array(&array)
}

ArrowDataType::Time32(x) => {
let p = precision(x);
let p = convert_precision_to_micros(x);
let array = array
.as_any()
.downcast_ref::<PrimitiveArray<i32>>()
.expect("primitive cast should be ok");

let array = unary(array, |x| (x as usize / p) as u32, expected_arrow);
let array = unary(array, |x| x as i64 * p.0 / p.1, expected_arrow);

Self::from_arrow_array(&array)
}
ArrowDataType::Time64(x) => {
let p = precision(x);
let p = convert_precision_to_micros(x);
let array = array
.as_any()
.downcast_ref::<PrimitiveArray<i64>>()
.expect("primitive cast should be ok");

let array = unary(array, |x| (x as usize / p) as u32, expected_arrow);
let array = unary(array, |x| x as i64 * p.0 / p.1, expected_arrow);
Self::from_arrow_array(&array)
}
_ => unreachable!(),
Expand Down Expand Up @@ -173,8 +175,8 @@ impl<T: PrimitiveType> Column for PrimitiveColumn<T> {
self.values.len() * std::mem::size_of::<T>()
}

fn as_arrow_array(&self) -> common_arrow::ArrayRef {
let data_type = self.data_type().arrow_type();
fn as_arrow_array(&self, logical_type: DataTypeImpl) -> common_arrow::ArrayRef {
let data_type = logical_type.arrow_type();
Box::new(PrimitiveArray::<T>::from_data(
data_type,
self.values.clone(),
Expand Down
2 changes: 1 addition & 1 deletion common/datavalues/src/columns/series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl Series {
let is_nullable = columns[0].is_nullable();
let arrays = columns
.iter()
.map(|c| c.as_arrow_array())
.map(|c| c.as_arrow_array(c.data_type()))
.collect::<Vec<_>>();

let arrays = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
Expand Down
4 changes: 2 additions & 2 deletions common/datavalues/src/columns/string/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ impl Column for StringColumn {
self.values.len() + self.offsets.len() * std::mem::size_of::<i64>()
}

fn as_arrow_array(&self) -> ArrayRef {
fn as_arrow_array(&self, logical_type: DataTypeImpl) -> ArrayRef {
Box::new(LargeBinaryArray::from_data(
ArrowType::LargeBinary,
logical_type.arrow_type(),
self.offsets.clone(),
self.values.clone(),
None,
Expand Down
22 changes: 18 additions & 4 deletions common/datavalues/src/columns/struct_/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::Arc;

use common_arrow::arrow::array::*;
use common_arrow::arrow::datatypes::DataType as ArrowType;
use common_arrow::ArrayRef;

use crate::prelude::*;
Expand Down Expand Up @@ -90,10 +91,23 @@ impl Column for StructColumn {
self.values.iter().map(|v| v.memory_size()).sum()
}

fn as_arrow_array(&self) -> ArrayRef {
let arrow_type = self.data_type().arrow_type();
let arrays = self.values.iter().map(|v| v.as_arrow_array()).collect();
Box::new(StructArray::from_data(arrow_type, arrays, None))
fn as_arrow_array(&self, logical_type: DataTypeImpl) -> ArrayRef {
let arrow_type = logical_type.arrow_type();

if let ArrowType::Struct(f) = &arrow_type {
let arrays = self
.values
.iter()
.zip(f.iter())
.map(|(v, f)| {
let f = from_arrow_field(f);
v.as_arrow_array(f)
})
.collect();
Box::new(StructArray::from_data(arrow_type, arrays, None))
} else {
unreachable!()
}
}

fn arc(&self) -> ColumnRef {
Expand Down
Loading

1 comment on commit 4956b5f

@vercel
Copy link

@vercel vercel bot commented on 4956b5f Jul 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend.vercel.app
databend.rs
databend-git-main-databend.vercel.app
databend-databend.vercel.app

Please sign in to comment.