diff --git a/velox/dwio/common/DirectDecoder.h b/velox/dwio/common/DirectDecoder.h
index b966a31d70d9..286e4c967b84 100644
--- a/velox/dwio/common/DirectDecoder.h
+++ b/velox/dwio/common/DirectDecoder.h
@@ -98,7 +98,19 @@ class DirectDecoder : public IntDecoder<isSigned> {
       } else if constexpr (std::is_same_v<
                                typename Visitor::DataType,
                                int128_t>) {
-        toSkip = visitor.process(super::template readInt<int128_t>(), atEnd);
+        if (super::numBytes == 12 /* INT96 */) {
+          // The value read here carries days in its low 32 bits and nanos in
+          // the bits above; the visitor receives the Timestamp as an int128_t.
+          int128_t encoded = super::template readInt<int128_t>();
+          int32_t days = encoded & ((1ULL << 32) - 1);
+          uint64_t nanos = static_cast<uint64_t>(encoded >> 32);
+
+          auto timestamp = Timestamp::fromDaysAndNanos(days, nanos);
+          toSkip =
+              visitor.process(*reinterpret_cast<int128_t*>(&timestamp), atEnd);
+        } else {
+          toSkip = visitor.process(super::template readInt<int128_t>(), atEnd);
+        }
       } else {
         toSkip = visitor.process(super::template readInt<int64_t>(), atEnd);
       }
diff --git a/velox/dwio/common/IntDecoder.h b/velox/dwio/common/IntDecoder.h
index 1a6f6f597a26..a4287b5f22cc 100644
--- a/velox/dwio/common/IntDecoder.h
+++ b/velox/dwio/common/IntDecoder.h
@@ -167,6 +167,9 @@ class IntDecoder {
   template <typename T>
   T readInt();
 
+  template <typename T>
+  T readInt96();
+
   template <typename T>
   T readVInt();
 
@@ -453,12 +456,50 @@ inline T IntDecoder<isSigned>::readInt() {
     return readLittleEndianFromBigEndian<T>();
   } else {
     if constexpr (std::is_same_v<T, int128_t>) {
-      VELOX_NYI();
+      if (numBytes == 12) {
+        // TODO: Do we need to handle useVInts case?
+        return readInt96<T>();
+      } else {
+        VELOX_NYI();
+      }
     }
     return readLongLE();
   }
 }
 
+// Reads 12 bytes, least-significant byte first, and packs them into a single
+// value: the first 8 bytes (unsigned) land in bits 32..95 and the last 4
+// bytes land in bits 0..31.
+template <bool isSigned>
+template <typename T>
+inline T IntDecoder<isSigned>::readInt96() {
+  int64_t offset = 0;
+  unsigned char ch;
+
+  // read unsigned byte 64
+  uint64_t part1 = 0;
+  for (uint32_t i = 0; i < 8; ++i) {
+    ch = readByte();
+    // Shift in uint64_t so the result does not depend on BASE_256_MASK's type.
+    part1 |= static_cast<uint64_t>(ch & BASE_256_MASK) << offset;
+    offset += 8;
+  }
+
+  // read signed byte 32
+  int32_t part2 = 0;
+  offset = 0;
+  for (uint32_t i = 0; i < 4; ++i) {
+    ch = readByte();
+    part2 |= (ch & BASE_256_MASK) << offset;
+    offset += 8;
+  }
+
+  int128_t result = part1;
+  // Zero-extend part2: OR-ing a negative int32_t would sign-extend and
+  // overwrite the bits that hold part1.
+  return (result << 32) | static_cast<uint32_t>(part2);
+}
+
 template <bool isSigned>
 template <typename T>
 inline T IntDecoder<isSigned>::readVInt() {
diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp
index 79476b1ff9b5..771ecb28c2bf 100644
--- a/velox/dwio/parquet/reader/PageReader.cpp
+++ b/velox/dwio/parquet/reader/PageReader.cpp
@@ -414,11 +414,7 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
       // We start from the end to allow in-place expansion.
       auto values = dictionary_.values->asMutable<Timestamp>();
       auto parquetValues = dictionary_.values->asMutable<char>();
-      static constexpr int64_t kJulianToUnixEpochDays = 2440588LL;
-      static constexpr int64_t kSecondsPerDay = 86400LL;
-      static constexpr int64_t kNanosPerSecond =
-          Timestamp::kNanosecondsInMillisecond *
-          Timestamp::kMillisecondsInSecond;
+
       for (auto i = dictionary_.numValues - 1; i >= 0; --i) {
         // Convert the timestamp into seconds and nanos since the Unix epoch,
         // 00:00:00.000000 on 1 January 1970.
@@ -432,12 +428,8 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
             &days,
             parquetValues + i * sizeof(Int96Timestamp) + sizeof(uint64_t),
             sizeof(int32_t));
-        int64_t seconds = (days - kJulianToUnixEpochDays) * kSecondsPerDay;
-        if (nanos > Timestamp::kMaxNanos) {
-          seconds += nanos / kNanosPerSecond;
-          nanos -= (nanos / kNanosPerSecond) * kNanosPerSecond;
-        }
-        values[i] = Timestamp(seconds, nanos);
+
+        values[i] = Timestamp::fromDaysAndNanos(days, nanos);
       }
       break;
     }
diff --git a/velox/dwio/parquet/tests/examples/timestamp_dict_int96.parquet b/velox/dwio/parquet/tests/examples/timestamp_dict_int96.parquet
new file mode 100644
index 000000000000..661cb7a28522
Binary files /dev/null and b/velox/dwio/parquet/tests/examples/timestamp_dict_int96.parquet differ
diff --git a/velox/dwio/parquet/tests/examples/timestamp_plain_int96.parquet b/velox/dwio/parquet/tests/examples/timestamp_plain_int96.parquet
new file mode 100644
index 000000000000..f2aa666b7d71
Binary files /dev/null and b/velox/dwio/parquet/tests/examples/timestamp_plain_int96.parquet differ
diff --git a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
index 9ccccff4d882..27f9be390bdb 100644
--- a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
+++ b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
@@ -625,6 +625,33 @@ TEST_F(ParquetTableScanTest, timestampFilter) {
       "testInt128() is not supported");
 }
 
+TEST_F(ParquetTableScanTest, timestampINT96) {
+  auto a = makeFlatVector<Timestamp>({Timestamp(1, 0), Timestamp(2, 0)});
+  auto expected = makeRowVector({"time"}, {a});
+  createDuckDbTable("expected", {expected});
+
+  auto vector = makeArrayVector<Timestamp>({{}});
+  loadData(
+      getExampleFilePath("timestamp_dict_int96.parquet"),
+      ROW({"time"}, {TIMESTAMP()}),
+      makeRowVector(
+          {"time"},
+          {
+              vector,
+          }));
+  assertSelect({"time"}, "SELECT time from expected");
+
+  loadData(
+      getExampleFilePath("timestamp_plain_int96.parquet"),
+      ROW({"time"}, {TIMESTAMP()}),
+      makeRowVector(
+          {"time"},
+          {
+              vector,
+          }));
+  assertSelect({"time"}, "SELECT time from expected");
+}
+
 int main(int argc, char** argv) {
   testing::InitGoogleTest(&argc, argv);
   folly::init(&argc, &argv, false);
diff --git a/velox/functions/sparksql/tests/DecimalUtilTest.cpp b/velox/functions/sparksql/tests/DecimalUtilTest.cpp
index 34de356ae7ab..833b88605a20 100644
--- a/velox/functions/sparksql/tests/DecimalUtilTest.cpp
+++ b/velox/functions/sparksql/tests/DecimalUtilTest.cpp
@@ -30,7 +30,7 @@ class DecimalUtilTest : public testing::Test {
       R expectedResult,
       bool expectedOverflow) {
     R r;
-    bool overflow;
+    bool overflow = false;
     DecimalUtil::divideWithRoundUp(r, a, b, aRescale, overflow);
     ASSERT_EQ(overflow, expectedOverflow);
     ASSERT_EQ(r, expectedResult);
diff --git a/velox/type/Timestamp.h b/velox/type/Timestamp.h
index 5db3bcf30833..e774f9339aeb 100644
--- a/velox/type/Timestamp.h
+++ b/velox/type/Timestamp.h
@@ -100,6 +100,24 @@ struct Timestamp {
     VELOX_USER_DCHECK_LE(nanos, kMaxNanos, "Timestamp nanos out of range");
   }
 
+  // Builds a Timestamp from a day count, rebased by kJulianToUnixEpochDays,
+  // and a nanosecond count. When 'nanos' exceeds kMaxNanos, the whole seconds
+  // it contains are moved into the seconds field.
+  static Timestamp fromDaysAndNanos(int32_t days, uint64_t nanos) {
+    static constexpr int64_t kJulianToUnixEpochDays = 2440588LL;
+    static constexpr int64_t kSecondsPerDay = 86400LL;
+    static constexpr int64_t kNanosPerSecond =
+        Timestamp::kNanosecondsInMillisecond * Timestamp::kMillisecondsInSecond;
+
+    int64_t seconds = (days - kJulianToUnixEpochDays) * kSecondsPerDay;
+    if (nanos > Timestamp::kMaxNanos) {
+      seconds += nanos / kNanosPerSecond;
+      nanos -= (nanos / kNanosPerSecond) * kNanosPerSecond;
+    }
+
+    return Timestamp(seconds, nanos);
+  }
+
   // Returns the current unix timestamp (ms precision).
   static Timestamp now();
 