Skip to content

Commit

Permalink
Add support to read plain encoded INT96 timestamp from Parquet file (o…
Browse files Browse the repository at this point in the history
  • Loading branch information
mskapilks authored and glutenperfbot committed Jan 8, 2024
1 parent 2ff4e78 commit f3f106f
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 14 deletions.
12 changes: 11 additions & 1 deletion velox/dwio/common/DirectDecoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,17 @@ class DirectDecoder : public IntDecoder<isSigned> {
} else if constexpr (std::is_same_v<
typename Visitor::DataType,
int128_t>) {
toSkip = visitor.process(super::template readInt<int128_t>(), atEnd);
if (super::numBytes == 12 /* INT96 */) {
int128_t encoded = super::template readInt<int128_t>();
int32_t days = encoded & ((1ULL << 32) - 1);
uint64_t nanos = static_cast<uint64_t>(encoded >> 32);

auto timestamp = Timestamp::fromDaysAndNanos(days, nanos);
toSkip =
visitor.process(*reinterpret_cast<int128_t*>(&timestamp), atEnd);
} else {
toSkip = visitor.process(super::template readInt<int128_t>(), atEnd);
}
} else {
toSkip = visitor.process(super::template readInt<int64_t>(), atEnd);
}
Expand Down
37 changes: 36 additions & 1 deletion velox/dwio/common/IntDecoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ class IntDecoder {
template <typename T>
T readInt();

template <typename T>
T readInt96();

template <typename T>
T readVInt();

Expand Down Expand Up @@ -453,12 +456,44 @@ inline T IntDecoder<isSigned>::readInt() {
return readLittleEndianFromBigEndian<T>();
} else {
if constexpr (std::is_same_v<T, int128_t>) {
VELOX_NYI();
if (numBytes == 12) {
// TODO:: Do we need to handle useVInts case?
return readInt96<T>();
} else {
VELOX_NYI();
}
}
return readLongLE();
}
}

// Reads 12 bytes through readByte() and packs them into a single integer.
//
// The first eight bytes (least-significant byte first) form an unsigned 64-bit
// value that ends up in bits 32..95 of the result. The following four bytes
// (least-significant byte first) form a 32-bit value that ends up in bits
// 0..31. Callers recover the two parts with `value >> 32` and
// `value & 0xFFFFFFFF` respectively.
//
// The 32-bit part is accumulated and merged as an unsigned quantity on
// purpose: OR-ing a negative int32_t into the 128-bit result would
// sign-extend it and overwrite the upper 64-bit part with ones.
template <bool isSigned>
template <typename T>
inline T IntDecoder<isSigned>::readInt96() {
  // Upper part: eight bytes, little-endian.
  uint64_t high64 = 0;
  for (uint32_t shift = 0; shift < 64; shift += 8) {
    const auto byte = static_cast<unsigned char>(readByte());
    // Widen before shifting so shifts of 32 bits or more are well defined
    // regardless of the declared type of BASE_256_MASK.
    high64 |= static_cast<uint64_t>(byte & BASE_256_MASK) << shift;
  }

  // Lower part: four bytes, little-endian, kept as a raw bit pattern.
  uint32_t low32 = 0;
  for (uint32_t shift = 0; shift < 32; shift += 8) {
    const auto byte = static_cast<unsigned char>(readByte());
    low32 |= static_cast<uint32_t>(byte & BASE_256_MASK) << shift;
  }

  int128_t result = high64;
  // low32 is unsigned, so it zero-extends and leaves bits 32..95 untouched.
  return (result << 32) | low32;
}

template <bool isSigned>
template <typename T>
inline T IntDecoder<isSigned>::readVInt() {
Expand Down
14 changes: 3 additions & 11 deletions velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -414,11 +414,7 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
// We start from the end to allow in-place expansion.
auto values = dictionary_.values->asMutable<Timestamp>();
auto parquetValues = dictionary_.values->asMutable<char>();
static constexpr int64_t kJulianToUnixEpochDays = 2440588LL;
static constexpr int64_t kSecondsPerDay = 86400LL;
static constexpr int64_t kNanosPerSecond =
Timestamp::kNanosecondsInMillisecond *
Timestamp::kMillisecondsInSecond;

for (auto i = dictionary_.numValues - 1; i >= 0; --i) {
// Convert the timestamp into seconds and nanos since the Unix epoch,
// 00:00:00.000000 on 1 January 1970.
Expand All @@ -432,12 +428,8 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
&days,
parquetValues + i * sizeof(Int96Timestamp) + sizeof(uint64_t),
sizeof(int32_t));
int64_t seconds = (days - kJulianToUnixEpochDays) * kSecondsPerDay;
if (nanos > Timestamp::kMaxNanos) {
seconds += nanos / kNanosPerSecond;
nanos -= (nanos / kNanosPerSecond) * kNanosPerSecond;
}
values[i] = Timestamp(seconds, nanos);

values[i] = Timestamp::fromDaysAndNanos(days, nanos);
}
break;
}
Expand Down
Binary file not shown.
Binary file not shown.
27 changes: 27 additions & 0 deletions velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,33 @@ TEST_F(ParquetTableScanTest, timestampFilter) {
"testInt128() is not supported");
}

// Loads two example Parquet files holding an INT96 "time" column (judging by
// the file names, one dictionary-encoded and one plain-encoded) and expects
// each of them to yield the same rows as the DuckDB reference table.
TEST_F(ParquetTableScanTest, timestampINT96) {
  auto expectedTimes =
      makeFlatVector<Timestamp>({Timestamp(1, 0), Timestamp(2, 0)});
  auto expectedRows = makeRowVector({"time"}, {expectedTimes});
  createDuckDbTable("expected", {expectedRows});

  auto arrayOfTimestamps = makeArrayVector<Timestamp>({{}});

  // Same load-and-compare sequence for every file, in the original order.
  for (const char* fileName :
       {"timestamp_dict_int96.parquet", "timestamp_plain_int96.parquet"}) {
    loadData(
        getExampleFilePath(fileName),
        ROW({"time"}, {TIMESTAMP()}),
        makeRowVector({"time"}, {arrayOfTimestamps}));
    assertSelect({"time"}, "SELECT time from expected");
  }
}

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
folly::init(&argc, &argv, false);
Expand Down
2 changes: 1 addition & 1 deletion velox/functions/sparksql/tests/DecimalUtilTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class DecimalUtilTest : public testing::Test {
R expectedResult,
bool expectedOverflow) {
R r;
bool overflow;
bool overflow = false;
DecimalUtil::divideWithRoundUp<R, A, B>(r, a, b, aRescale, overflow);
ASSERT_EQ(overflow, expectedOverflow);
ASSERT_EQ(r, expectedResult);
Expand Down
15 changes: 15 additions & 0 deletions velox/type/Timestamp.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,21 @@ struct Timestamp {
VELOX_USER_DCHECK_LE(nanos, kMaxNanos, "Timestamp nanos out of range");
}

// Builds a Timestamp, expressed as seconds and nanos since the Unix epoch
// (00:00:00.000000 on 1 January 1970), from a Julian day number and a
// nanosecond count.
//
// @param days Julian day number; 2440588 maps to zero seconds.
// @param nanos Nanosecond count. Values above kMaxNanos are split into whole
// seconds, which are added to the result, and a sub-second remainder.
static Timestamp fromDaysAndNanos(int32_t days, uint64_t nanos) {
  static constexpr int64_t kJulianToUnixEpochDays = 2440588LL;
  static constexpr int64_t kSecondsPerDay = 86400LL;
  static constexpr int64_t kNanosPerSecond =
      Timestamp::kNanosecondsInMillisecond * Timestamp::kMillisecondsInSecond;

  const int64_t daySeconds = (days - kJulianToUnixEpochDays) * kSecondsPerDay;

  // Common case: the nanos already fit below one second, nothing to carry.
  if (nanos <= Timestamp::kMaxNanos) {
    return Timestamp(daySeconds, nanos);
  }

  // Carry the whole seconds contained in nanos over to the seconds field.
  const uint64_t carrySeconds = nanos / kNanosPerSecond;
  const int64_t seconds = static_cast<int64_t>(daySeconds + carrySeconds);
  return Timestamp(seconds, nanos - carrySeconds * kNanosPerSecond);
}

// Returns the current unix timestamp (ms precision).
static Timestamp now();

Expand Down

0 comments on commit f3f106f

Please sign in to comment.