diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs index e9257d045ee..8f75ea6ad9c 100644 --- a/src/io/parquet/read/deserialize/simple.rs +++ b/src/io/parquet/read/deserialize/simple.rs @@ -279,6 +279,37 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( }) } +/// Unify the timestamp unit from parquet TimeUnit into arrow's TimeUnit +/// Returns (a int64 factor, is_multiplier) +fn unifiy_timestmap_unit( + logical_type: &Option, + time_unit: TimeUnit, +) -> (i64, bool) { + if let Some(PrimitiveLogicalType::Timestamp { unit, .. }) = logical_type { + match (*unit, time_unit) { + (ParquetTimeUnit::Milliseconds, TimeUnit::Millisecond) + | (ParquetTimeUnit::Microseconds, TimeUnit::Microsecond) + | (ParquetTimeUnit::Nanoseconds, TimeUnit::Nanosecond) => (1, true), + + (ParquetTimeUnit::Milliseconds, TimeUnit::Second) + | (ParquetTimeUnit::Microseconds, TimeUnit::Millisecond) + | (ParquetTimeUnit::Nanoseconds, TimeUnit::Microsecond) => (1000, false), + + (ParquetTimeUnit::Microseconds, TimeUnit::Second) + | (ParquetTimeUnit::Nanoseconds, TimeUnit::Millisecond) => (1_000_000, false), + + (ParquetTimeUnit::Nanoseconds, TimeUnit::Second) => (1_000_000_000, false), + + (ParquetTimeUnit::Milliseconds, TimeUnit::Microsecond) + | (ParquetTimeUnit::Microseconds, TimeUnit::Nanosecond) => (1_000, true), + + (ParquetTimeUnit::Milliseconds, TimeUnit::Nanosecond) => (1_000_000, true), + } + } else { + (1, true) + } +} + fn timestamp<'a, I: 'a + DataPages>( pages: I, physical_type: &PhysicalType, @@ -288,17 +319,16 @@ fn timestamp<'a, I: 'a + DataPages>( time_unit: TimeUnit, ) -> Result> { if physical_type == &PhysicalType::Int96 { - if time_unit == TimeUnit::Nanosecond { - return Ok(dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - int96_to_i64_ns, - )))); - } else { - return Err(Error::nyi("Can't decode int96 to timestamp other than ns")); - } + let iter = primitive::Iter::new(pages, data_type, chunk_size, int96_to_i64_ns); + + let (factor, is_multiplier) = unifiy_timestmap_unit(logical_type, time_unit); + return match (factor, is_multiplier) { + (1, _) => Ok(dyn_iter(iden(iter))), + (a, true) => Ok(dyn_iter(op(iter, move |x| x * a))), + (a, false) => Ok(dyn_iter(op(iter, move |x| x / a))), + }; }; + if physical_type != &PhysicalType::Int64 { return Err(Error::nyi( "Can't decode a timestamp from a non-int64 parquet type", @@ -306,36 +336,12 @@ fn timestamp<'a, I: 'a + DataPages>( } let iter = primitive::Iter::new(pages, data_type, chunk_size, |x: i64| x); - - let unit = if let Some(PrimitiveLogicalType::Timestamp { unit, .. }) = logical_type { - unit - } else { - return Ok(dyn_iter(iden(iter))); - }; - - Ok(match (unit, time_unit) { - (ParquetTimeUnit::Milliseconds, TimeUnit::Second) => dyn_iter(op(iter, |x| x / 1_000)), - (ParquetTimeUnit::Microseconds, TimeUnit::Second) => dyn_iter(op(iter, |x| x / 1_000_000)), - (ParquetTimeUnit::Nanoseconds, TimeUnit::Second) => { - dyn_iter(op(iter, |x| x / 1_000_000_000)) - } - - (ParquetTimeUnit::Milliseconds, TimeUnit::Millisecond) => dyn_iter(iden(iter)), - (ParquetTimeUnit::Microseconds, TimeUnit::Millisecond) => dyn_iter(op(iter, |x| x / 1_000)), - (ParquetTimeUnit::Nanoseconds, TimeUnit::Millisecond) => { - dyn_iter(op(iter, |x| x / 1_000_000)) - } - - (ParquetTimeUnit::Milliseconds, TimeUnit::Microsecond) => dyn_iter(op(iter, |x| x * 1_000)), - (ParquetTimeUnit::Microseconds, TimeUnit::Microsecond) => dyn_iter(iden(iter)), - (ParquetTimeUnit::Nanoseconds, TimeUnit::Microsecond) => dyn_iter(op(iter, |x| x / 1_000)), - - (ParquetTimeUnit::Milliseconds, TimeUnit::Nanosecond) => { - dyn_iter(op(iter, |x| x * 1_000_000)) - } - (ParquetTimeUnit::Microseconds, TimeUnit::Nanosecond) => dyn_iter(op(iter, |x| x * 1_000)), - (ParquetTimeUnit::Nanoseconds, TimeUnit::Nanosecond) => dyn_iter(iden(iter)), - }) + let (factor, is_multiplier) = unifiy_timestmap_unit(logical_type, time_unit); + match (factor, is_multiplier) { + (1, _) => Ok(dyn_iter(iden(iter))), + (a, true) => Ok(dyn_iter(op(iter, move |x| x * a))), + (a, false) => Ok(dyn_iter(op(iter, move |x| x / a))), + } } fn timestamp_dict<'a, K: DictionaryKey, I: 'a + DataPages>( @@ -347,130 +353,38 @@ fn timestamp_dict<'a, K: DictionaryKey, I: 'a + DataPages>( time_unit: TimeUnit, ) -> Result> { if physical_type == &PhysicalType::Int96 { - if time_unit == TimeUnit::Nanosecond { - return Ok(dyn_iter(primitive::DictIter::::new( + let (factor, is_multiplier) = unifiy_timestmap_unit(logical_type, time_unit); + return match (factor, is_multiplier) { + (a, true) => Ok(dyn_iter(primitive::DictIter::::new( pages, DataType::Timestamp(TimeUnit::Nanosecond, None), chunk_size, - int96_to_i64_ns, - ))); - } else { - return Err(Error::nyi("Can't decode int96 to timestamp other than ns")); - } + move |x| int96_to_i64_ns(x) * a, + ))), + (a, false) => Ok(dyn_iter(primitive::DictIter::::new( + pages, + DataType::Timestamp(TimeUnit::Nanosecond, None), + chunk_size, + move |x| int96_to_i64_ns(x) / a, + ))), + }; }; - let unit = if let Some(PrimitiveLogicalType::Timestamp { unit, .. }) = logical_type { - unit - } else { - return Ok(dyn_iter(primitive::DictIter::::new( + let (factor, is_multiplier) = unifiy_timestmap_unit(logical_type, time_unit); + match (factor, is_multiplier) { + (a, true) => Ok(dyn_iter(primitive::DictIter::::new( pages, data_type, chunk_size, - |x: i64| x, - ))); - }; - - Ok(match (unit, time_unit) { - (ParquetTimeUnit::Milliseconds, TimeUnit::Second) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, - chunk_size, - |x: i64| x / 1_000, - )) - } - (ParquetTimeUnit::Microseconds, TimeUnit::Second) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, - chunk_size, - |x: i64| x / 1_000_000, - )) - } - (ParquetTimeUnit::Nanoseconds, TimeUnit::Second) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, - chunk_size, - |x: i64| x / 1_000_000_000, - )) - } - - (ParquetTimeUnit::Milliseconds, TimeUnit::Millisecond) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, - chunk_size, - |x: i64| x, - )) - } - (ParquetTimeUnit::Microseconds, TimeUnit::Millisecond) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, - chunk_size, - |x: i64| x / 1_000, - )) - } - (ParquetTimeUnit::Nanoseconds, TimeUnit::Millisecond) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, - chunk_size, - |x: i64| x / 1_000_000, - )) - } - - (ParquetTimeUnit::Milliseconds, TimeUnit::Microsecond) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, - chunk_size, - |x: i64| x * 1_000, - )) - } - (ParquetTimeUnit::Microseconds, TimeUnit::Microsecond) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, - chunk_size, - |x: i64| x, - )) - } - (ParquetTimeUnit::Nanoseconds, TimeUnit::Microsecond) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, - chunk_size, - |x: i64| x / 1_000, - )) - } - - (ParquetTimeUnit::Milliseconds, TimeUnit::Nanosecond) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, - chunk_size, - |x: i64| x * 1_000_000, - )) - } - (ParquetTimeUnit::Microseconds, TimeUnit::Nanosecond) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, - chunk_size, - |x: i64| x * 1_000, - )) - } - (ParquetTimeUnit::Nanoseconds, TimeUnit::Nanosecond) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, - chunk_size, - |x: i64| x, - )) - } - }) + move |x: i64| x * a, + ))), + (a, false) => Ok(dyn_iter(primitive::DictIter::::new( + pages, + data_type, + chunk_size, + move |x: i64| x / a, + ))), + } } fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(