Skip to content

Commit

Permalink
support convert i96 values into other time unit of arrow's timestamp (j…
Browse files Browse the repository at this point in the history
  • Loading branch information
sundy-li authored Jul 16, 2022
1 parent 731e06a commit adac2b8
Showing 1 changed file with 70 additions and 156 deletions.
226 changes: 70 additions & 156 deletions src/io/parquet/read/deserialize/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,37 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
})
}

/// Works out how a timestamp stored with a Parquet time unit must be scaled
/// to be expressed in the requested Arrow [`TimeUnit`].
///
/// Returns `(factor, is_multiplier)`: when `is_multiplier` is `true` the raw
/// value has to be multiplied by `factor`, otherwise it has to be divided by
/// it. If `logical_type` is not a `Timestamp` (including `None`), no scaling
/// is requested and `(1, true)` is returned.
fn unifiy_timestmap_unit(
    logical_type: &Option<PrimitiveLogicalType>,
    time_unit: TimeUnit,
) -> (i64, bool) {
    let parquet_unit = match logical_type {
        Some(PrimitiveLogicalType::Timestamp { unit, .. }) => *unit,
        _ => return (1, true),
    };

    // Describe each unit by the power of ten of its ticks-per-second, so the
    // conversion factor is simply ten raised to the difference of the two.
    let source_exp: u32 = match parquet_unit {
        ParquetTimeUnit::Milliseconds => 3,
        ParquetTimeUnit::Microseconds => 6,
        ParquetTimeUnit::Nanoseconds => 9,
    };
    let target_exp: u32 = match time_unit {
        TimeUnit::Second => 0,
        TimeUnit::Millisecond => 3,
        TimeUnit::Microsecond => 6,
        TimeUnit::Nanosecond => 9,
    };

    if target_exp >= source_exp {
        // Same or finer target resolution: scale up (factor is 1 when equal).
        (10_i64.pow(target_exp - source_exp), true)
    } else {
        // Coarser target resolution: scale down.
        (10_i64.pow(source_exp - target_exp), false)
    }
}

fn timestamp<'a, I: 'a + DataPages>(
pages: I,
physical_type: &PhysicalType,
Expand All @@ -288,54 +319,29 @@ fn timestamp<'a, I: 'a + DataPages>(
time_unit: TimeUnit,
) -> Result<ArrayIter<'a>> {
if physical_type == &PhysicalType::Int96 {
if time_unit == TimeUnit::Nanosecond {
return Ok(dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
chunk_size,
int96_to_i64_ns,
))));
} else {
return Err(Error::nyi("Can't decode int96 to timestamp other than ns"));
}
let iter = primitive::Iter::new(pages, data_type, chunk_size, int96_to_i64_ns);

let (factor, is_multiplier) = unifiy_timestmap_unit(logical_type, time_unit);
return match (factor, is_multiplier) {
(1, _) => Ok(dyn_iter(iden(iter))),
(a, true) => Ok(dyn_iter(op(iter, move |x| x * a))),
(a, false) => Ok(dyn_iter(op(iter, move |x| x / a))),
};
};

if physical_type != &PhysicalType::Int64 {
return Err(Error::nyi(
"Can't decode a timestamp from a non-int64 parquet type",
));
}

let iter = primitive::Iter::new(pages, data_type, chunk_size, |x: i64| x);

let unit = if let Some(PrimitiveLogicalType::Timestamp { unit, .. }) = logical_type {
unit
} else {
return Ok(dyn_iter(iden(iter)));
};

Ok(match (unit, time_unit) {
(ParquetTimeUnit::Milliseconds, TimeUnit::Second) => dyn_iter(op(iter, |x| x / 1_000)),
(ParquetTimeUnit::Microseconds, TimeUnit::Second) => dyn_iter(op(iter, |x| x / 1_000_000)),
(ParquetTimeUnit::Nanoseconds, TimeUnit::Second) => {
dyn_iter(op(iter, |x| x / 1_000_000_000))
}

(ParquetTimeUnit::Milliseconds, TimeUnit::Millisecond) => dyn_iter(iden(iter)),
(ParquetTimeUnit::Microseconds, TimeUnit::Millisecond) => dyn_iter(op(iter, |x| x / 1_000)),
(ParquetTimeUnit::Nanoseconds, TimeUnit::Millisecond) => {
dyn_iter(op(iter, |x| x / 1_000_000))
}

(ParquetTimeUnit::Milliseconds, TimeUnit::Microsecond) => dyn_iter(op(iter, |x| x * 1_000)),
(ParquetTimeUnit::Microseconds, TimeUnit::Microsecond) => dyn_iter(iden(iter)),
(ParquetTimeUnit::Nanoseconds, TimeUnit::Microsecond) => dyn_iter(op(iter, |x| x / 1_000)),

(ParquetTimeUnit::Milliseconds, TimeUnit::Nanosecond) => {
dyn_iter(op(iter, |x| x * 1_000_000))
}
(ParquetTimeUnit::Microseconds, TimeUnit::Nanosecond) => dyn_iter(op(iter, |x| x * 1_000)),
(ParquetTimeUnit::Nanoseconds, TimeUnit::Nanosecond) => dyn_iter(iden(iter)),
})
let (factor, is_multiplier) = unifiy_timestmap_unit(logical_type, time_unit);
match (factor, is_multiplier) {
(1, _) => Ok(dyn_iter(iden(iter))),
(a, true) => Ok(dyn_iter(op(iter, move |x| x * a))),
(a, false) => Ok(dyn_iter(op(iter, move |x| x / a))),
}
}

fn timestamp_dict<'a, K: DictionaryKey, I: 'a + DataPages>(
Expand All @@ -347,130 +353,38 @@ fn timestamp_dict<'a, K: DictionaryKey, I: 'a + DataPages>(
time_unit: TimeUnit,
) -> Result<ArrayIter<'a>> {
if physical_type == &PhysicalType::Int96 {
if time_unit == TimeUnit::Nanosecond {
return Ok(dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
let (factor, is_multiplier) = unifiy_timestmap_unit(logical_type, time_unit);
return match (factor, is_multiplier) {
(a, true) => Ok(dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
pages,
DataType::Timestamp(TimeUnit::Nanosecond, None),
chunk_size,
int96_to_i64_ns,
)));
} else {
return Err(Error::nyi("Can't decode int96 to timestamp other than ns"));
}
move |x| int96_to_i64_ns(x) * a,
))),
(a, false) => Ok(dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
pages,
DataType::Timestamp(TimeUnit::Nanosecond, None),
chunk_size,
move |x| int96_to_i64_ns(x) / a,
))),
};
};

let unit = if let Some(PrimitiveLogicalType::Timestamp { unit, .. }) = logical_type {
unit
} else {
return Ok(dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
let (factor, is_multiplier) = unifiy_timestmap_unit(logical_type, time_unit);
match (factor, is_multiplier) {
(a, true) => Ok(dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
pages,
data_type,
chunk_size,
|x: i64| x,
)));
};

Ok(match (unit, time_unit) {
(ParquetTimeUnit::Milliseconds, TimeUnit::Second) => {
dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
pages,
data_type,
chunk_size,
|x: i64| x / 1_000,
))
}
(ParquetTimeUnit::Microseconds, TimeUnit::Second) => {
dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
pages,
data_type,
chunk_size,
|x: i64| x / 1_000_000,
))
}
(ParquetTimeUnit::Nanoseconds, TimeUnit::Second) => {
dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
pages,
data_type,
chunk_size,
|x: i64| x / 1_000_000_000,
))
}

(ParquetTimeUnit::Milliseconds, TimeUnit::Millisecond) => {
dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
pages,
data_type,
chunk_size,
|x: i64| x,
))
}
(ParquetTimeUnit::Microseconds, TimeUnit::Millisecond) => {
dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
pages,
data_type,
chunk_size,
|x: i64| x / 1_000,
))
}
(ParquetTimeUnit::Nanoseconds, TimeUnit::Millisecond) => {
dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
pages,
data_type,
chunk_size,
|x: i64| x / 1_000_000,
))
}

(ParquetTimeUnit::Milliseconds, TimeUnit::Microsecond) => {
dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
pages,
data_type,
chunk_size,
|x: i64| x * 1_000,
))
}
(ParquetTimeUnit::Microseconds, TimeUnit::Microsecond) => {
dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
pages,
data_type,
chunk_size,
|x: i64| x,
))
}
(ParquetTimeUnit::Nanoseconds, TimeUnit::Microsecond) => {
dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
pages,
data_type,
chunk_size,
|x: i64| x / 1_000,
))
}

(ParquetTimeUnit::Milliseconds, TimeUnit::Nanosecond) => {
dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
pages,
data_type,
chunk_size,
|x: i64| x * 1_000_000,
))
}
(ParquetTimeUnit::Microseconds, TimeUnit::Nanosecond) => {
dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
pages,
data_type,
chunk_size,
|x: i64| x * 1_000,
))
}
(ParquetTimeUnit::Nanoseconds, TimeUnit::Nanosecond) => {
dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
pages,
data_type,
chunk_size,
|x: i64| x,
))
}
})
move |x: i64| x * a,
))),
(a, false) => Ok(dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
pages,
data_type,
chunk_size,
move |x: i64| x / a,
))),
}
}

fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
Expand Down

0 comments on commit adac2b8

Please sign in to comment.