Skip to content

Commit

Permalink
Add supports time zone adjustment when reading and writing Timestamp …
Browse files Browse the repository at this point in the history
…data
  • Loading branch information
wypb committed Oct 11, 2024
1 parent a976ba5 commit 75f04c7
Show file tree
Hide file tree
Showing 17 changed files with 341 additions and 117 deletions.
8 changes: 7 additions & 1 deletion velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,13 @@ VectorPtr newConstantFromString(
VELOX_USER_FAIL("{}", status.message());
});
if constexpr (kind == TypeKind::TIMESTAMP) {
copy.toGMT(Timestamp::defaultTimezone());
const tz::TimeZone* timezone;
if (sessionTimezone.empty()) {
timezone = &Timestamp::defaultTimezone();
} else {
timezone = tz::locateZone(sessionTimezone);
}
copy.toGMT(*timezone);
}
return std::make_shared<ConstantVector<T>>(
pool, size, false, type, std::move(copy));
Expand Down
34 changes: 31 additions & 3 deletions velox/dwio/dwrf/reader/ColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "velox/dwio/dwrf/reader/ColumnReader.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/dwio/common/IntCodecCommon.h"
#include "velox/dwio/common/IntDecoder.h"
#include "velox/dwio/common/ParallelFor.h"
Expand All @@ -35,6 +36,7 @@

namespace facebook::velox::dwrf {

using common::testutil::TestValue;
using dwio::common::IntDecoder;
using memory::MemoryPool;

Expand All @@ -52,7 +54,11 @@ void fillTimestamps(
const uint64_t* nullsPtr,
const int64_t* secondsPtr,
const uint64_t* nanosPtr,
vector_size_t numValues) {
vector_size_t numValues,
const tz::TimeZone* sessionTimezone,
const bool adjustTimestampToTimezone) {
TestValue::adjust(
"facebook::velox::dwrf::detail::fillTimestamps", (void*)sessionTimezone);
for (vector_size_t i = 0; i < numValues; i++) {
if (!nullsPtr || !bits::isBitNull(nullsPtr, i)) {
auto nanos = nanosPtr[i];
Expand All @@ -63,11 +69,21 @@ void fillTimestamps(
nanos *= 10;
}
}
auto seconds = secondsPtr[i] + dwio::common::EPOCH_OFFSET;

int64_t seconds;
if (sessionTimezone) {
seconds = secondsPtr[i] + dwio::common::UTC_EPOCH_OFFSET;
} else {
// Compatible with meta internal use cases.
seconds = secondsPtr[i] + dwio::common::EPOCH_OFFSET;
}
if (seconds < 0 && nanos != 0) {
seconds -= 1;
}
timestamps[i] = Timestamp(seconds, nanos);
if (adjustTimestampToTimezone && sessionTimezone) {
timestamps[i].toGMT(*sessionTimezone);
}
}
}
}
Expand Down Expand Up @@ -752,6 +768,9 @@ class TimestampColumnReader : public ColumnReader {
std::unique_ptr<dwio::common::IntDecoder</*isSigned*/ true>> seconds;
std::unique_ptr<dwio::common::IntDecoder</*isSigned*/ false>> nano;

const tz::TimeZone* sessionTimezone_{nullptr};
bool adjustTimestampToTimezone_{false};

BufferPtr secondsBuffer_;
BufferPtr nanosBuffer_;

Expand Down Expand Up @@ -797,6 +816,9 @@ TimestampColumnReader::TimestampColumnReader(
memoryPool_,
nanoVInts,
dwio::common::LONG_BYTE_SIZE);

sessionTimezone_ = stripe.sessionTimezone();
adjustTimestampToTimezone_ = stripe.adjustTimestampToTimezone();
}

uint64_t TimestampColumnReader::skip(uint64_t numValues) {
Expand Down Expand Up @@ -843,7 +865,13 @@ void TimestampColumnReader::next(
nano->next(reinterpret_cast<int64_t*>(nanosData), numValues, nullsPtr);
auto* valuesPtr = values->asMutable<Timestamp>();
detail::fillTimestamps(
valuesPtr, nullsPtr, secondsData, nanosData, numValues);
valuesPtr,
nullsPtr,
secondsData,
nanosData,
numValues,
sessionTimezone_,
adjustTimestampToTimezone_);
}

template <class T>
Expand Down
4 changes: 3 additions & 1 deletion velox/dwio/dwrf/reader/ColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,9 @@ void fillTimestamps(
const uint64_t* nulls,
const int64_t* seconds,
const uint64_t* nanos,
vector_size_t numValues);
vector_size_t numValues,
const tz::TimeZone* sessionTimezone,
const bool adjustTimestampToTimezone);

} // namespace detail
} // namespace facebook::velox::dwrf
20 changes: 18 additions & 2 deletions velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/dwio/common/BufferUtil.h"
#include "velox/dwio/dwrf/common/DecoderUtil.h"

Expand All @@ -28,7 +29,9 @@ SelectiveTimestampColumnReader::SelectiveTimestampColumnReader(
common::ScanSpec& scanSpec)
: SelectiveColumnReader(fileType->type(), fileType, params, scanSpec),
precision_(
params.stripeStreams().rowReaderOptions().timestampPrecision()) {
params.stripeStreams().rowReaderOptions().timestampPrecision()),
sessionTimezone_(params.sessionTimezone()),
adjustTimestampToTimezone_(params.adjustTimestampToTimezone()) {
EncodingKey encodingKey{fileType_->id(), params.flatMapContext().sequence};
auto& stripe = params.stripeStreams();
version_ = convertRleVersion(stripe.getEncoding(encodingKey).kind());
Expand Down Expand Up @@ -74,6 +77,10 @@ void SelectiveTimestampColumnReader::read(
VELOX_CHECK(
!scanSpec_->valueHook(),
"Selective reader for TIMESTAMP doesn't support aggregation pushdown yet");
common::testutil::TestValue::adjust(
"facebook::velox::dwrf::SelectiveTimestampColumnReader::read",
(void*)sessionTimezone_);

if (!resultNulls_ || !resultNulls_->unique() ||
resultNulls_->capacity() * 8 < rows.size()) {
// Make sure a dedicated resultNulls_ is allocated with enough capacity as
Expand Down Expand Up @@ -146,7 +153,13 @@ void SelectiveTimestampColumnReader::readHelper(
nanos *= 10;
}
}
auto seconds = secondsData[i] + EPOCH_OFFSET;
int64_t seconds = 0;
if (sessionTimezone_) {
seconds = secondsData[i] + UTC_EPOCH_OFFSET;
} else {
// Compatible with meta internal use cases.
seconds = secondsData[i] + EPOCH_OFFSET;
}
if (seconds < 0 && nanos != 0) {
seconds -= 1;
}
Expand All @@ -161,6 +174,9 @@ void SelectiveTimestampColumnReader::readHelper(
break;
}
rawTs[i] = Timestamp(seconds, nanos);
if (adjustTimestampToTimezone_ && sessionTimezone_) {
rawTs[i].toGMT(*sessionTimezone_);
}
}
}
values_ = tsValues;
Expand Down
3 changes: 3 additions & 0 deletions velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ class SelectiveTimestampColumnReader
// Values from copied from 'seconds_'. Nanos are in 'values_'.
BufferPtr secondsValues_;
RleVersion version_;

const tz::TimeZone* sessionTimezone_{nullptr};
bool adjustTimestampToTimezone_{false};
};

} // namespace facebook::velox::dwrf
2 changes: 2 additions & 0 deletions velox/dwio/dwrf/test/E2EReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ TEST_P(E2EReaderTest, SharedDictionaryFlatmapReadAsStruct) {
std::move(sink),
type,
config,
/*sessionTimezone=*/nullptr,
/*adjustTimestampToTimezone=*/false,
E2EWriterTestUtil::simpleFlushPolicyFactory(true));

auto seed = folly::Random::secureRand32();
Expand Down
100 changes: 96 additions & 4 deletions velox/dwio/dwrf/test/E2EWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ TEST_F(E2EWriterTest, DISABLED_TestFileCreation) {
type,
batches,
config,
/*sessionTimezone=*/nullptr,
/*adjustTimestampToTimezone=*/false,
dwrf::E2EWriterTestUtil::simpleFlushPolicyFactory(true));
}

Expand Down Expand Up @@ -375,6 +377,73 @@ TEST_F(E2EWriterTest, E2E) {
dwrf::E2EWriterTestUtil::testWriter(*leafPool_, type, batches, 1, 1, config);
}

TEST_F(E2EWriterTest, testTmestampTimezone) {
const size_t batchCount = 4;
size_t batchSize = 1100;

HiveTypeParser parser;
auto type = parser.parse("struct<timestamp_val:timestamp>");

auto config = std::make_shared<dwrf::Config>();
config->set(dwrf::Config::ROW_INDEX_STRIDE, static_cast<uint32_t>(1000));

std::vector<VectorPtr> batches;
for (size_t i = 0; i < batchCount; ++i) {
batches.push_back(
BatchMaker::createBatch(type, batchSize, *leafPool_, nullptr, i));
batchSize = 200;
}

for (bool adjustTimestampToTimezone : {false, true}) {
for (bool useSelectiveColumnReader : {false, true}) {
SCOPED_TRACE(fmt::format(
"useSelectiveColumnReader: {}, adjustTimestampToTimezone: {}",
useSelectiveColumnReader,
adjustTimestampToTimezone));

// Verify that the ColumnWriter has obtained the sessionTimezone
SCOPED_TESTVALUE_SET(
"facebook::velox::dwrf::Writer::Writer",
std::function<void(dwrf::WriterBase*)>(
([&](dwrf::WriterBase* writerBase) {
VELOX_CHECK_EQ(
writerBase->getContext().sessionTimezone()->name(),
"Asia/Shanghai");
})));

if (useSelectiveColumnReader) {
// Verify that the SelectiveTimestampColumnReader has obtained the
// sessionTimezone
SCOPED_TESTVALUE_SET(
"facebook::velox::dwrf::SelectiveTimestampColumnReader::read",
std::function<void(tz::TimeZone*)>(
([&](tz::TimeZone* sessionTimezone) {
VELOX_CHECK_EQ(sessionTimezone->name(), "Asia/Shanghai");
})));
} else {
// Verify that the ColumnReader has obtained the sessionTimezone
SCOPED_TESTVALUE_SET(
"facebook::velox::dwrf::detail::fillTimestamps",
std::function<void(tz::TimeZone*)>(
([&](tz::TimeZone* sessionTimezone) {
VELOX_CHECK_EQ(sessionTimezone->name(), "Asia/Shanghai");
})));
}

dwrf::E2EWriterTestUtil::testWriter(
*leafPool_,
type,
batches,
1,
1,
config,
"Asia/Shanghai",
useSelectiveColumnReader,
adjustTimestampToTimezone);
}
}
}

// Disabled because test is failing in continuous runs T193531984.
TEST_F(E2EWriterTest, DISABLED_DisableLinearHeuristics) {
const size_t batchCount = 100;
Expand Down Expand Up @@ -562,6 +631,8 @@ TEST_F(E2EWriterTest, PresentStreamIsSuppressedOnFlatMap) {
type,
dwrf::E2EWriterTestUtil::generateBatches(std::move(batch)),
config,
/*sessionTimezone=*/nullptr,
/*adjustTimestampToTimezone=*/false,
dwrf::E2EWriterTestUtil::simpleFlushPolicyFactory(true));

dwio::common::ReaderOptions readerOpts{leafPool_.get()};
Expand Down Expand Up @@ -672,6 +743,9 @@ TEST_F(E2EWriterTest, FlatMapBackfill) {
1,
1,
config,
/*sessionTimezoneName=*/"",
/*useSelectiveColumnReader=*/false,
/*adjustTimestampToTimezone=*/false,
dwrf::E2EWriterTestUtil::simpleFlushPolicyFactory(false));
}

Expand Down Expand Up @@ -721,6 +795,9 @@ void testFlatMapWithNulls(
1,
1,
config,
/*sessionTimezoneName=*/"",
/*useSelectiveColumnReader=*/false,
/*adjustTimestampToTimezone=*/false,
dwrf::E2EWriterTestUtil::simpleFlushPolicyFactory(false));
}

Expand Down Expand Up @@ -784,6 +861,9 @@ TEST_F(E2EWriterTest, FlatMapEmpty) {
1,
1,
config,
/*sessionTimezoneName=*/"",
/*useSelectiveColumnReader=*/false,
/*adjustTimestampToTimezone=*/false,
dwrf::E2EWriterTestUtil::simpleFlushPolicyFactory(false));
}

Expand Down Expand Up @@ -978,9 +1058,12 @@ TEST_F(E2EWriterTest, OversizeRows) {
1,
1,
config,
/*sessionTimezoneName=*/"",
/*useSelectiveColumnReader=*/false,
/*adjustTimestampToTimezone=*/false,
/*flushPolicyFactory=*/nullptr,
/*layoutPlannerFactory=*/nullptr,
/*memoryBudget=*/std::numeric_limits<int64_t>::max(),
/*writerMemoryCap=*/std::numeric_limits<int64_t>::max(),
false);
}

Expand Down Expand Up @@ -1010,9 +1093,12 @@ TEST_F(E2EWriterTest, OversizeBatches) {
10,
10,
config,
/*sessionTimezoneName=*/"",
/*useSelectiveColumnReader=*/false,
/*adjustTimestampToTimezone=*/false,
/*flushPolicyFactory=*/nullptr,
/*layoutPlannerFactory=*/nullptr,
/*memoryBudget=*/std::numeric_limits<int64_t>::max(),
/*writerMemoryCap=*/std::numeric_limits<int64_t>::max(),
false);

// Test splitting multiple huge batches.
Expand All @@ -1026,9 +1112,12 @@ TEST_F(E2EWriterTest, OversizeBatches) {
15,
16,
config,
/*sessionTimezoneName=*/"",
/*useSelectiveColumnReader=*/false,
/*adjustTimestampToTimezone=*/false,
/*flushPolicyFactory=*/nullptr,
/*layoutPlannerFactory=*/nullptr,
/*memoryBudget=*/std::numeric_limits<int64_t>::max(),
/*writerMemoryCap=*/std::numeric_limits<int64_t>::max(),
false);
}

Expand Down Expand Up @@ -1086,9 +1175,12 @@ TEST_F(E2EWriterTest, OverflowLengthIncrements) {
1,
1,
config,
/*sessionTimezoneName=*/"",
/*useSelectiveColumnReader=*/false,
/*adjustTimestampToTimezone=*/false,
/*flushPolicyFactory=*/nullptr,
/*layoutPlannerFactory=*/nullptr,
/*memoryBudget=*/std::numeric_limits<int64_t>::max(),
/*writerMemoryCap=*/std::numeric_limits<int64_t>::max(),
false);
}

Expand Down
2 changes: 2 additions & 0 deletions velox/dwio/dwrf/test/ReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2087,6 +2087,8 @@ createWriterReader(
asRowType(batches[0]->type()),
batches,
config,
/*sessionTimezone=*/nullptr,
/*adjustTimestampToTimezone=*/false,
std::move(flushPolicy));
std::string data(sinkPtr->data(), sinkPtr->size());
auto input = std::make_unique<BufferedInput>(
Expand Down
7 changes: 5 additions & 2 deletions velox/dwio/dwrf/test/WriterExtendedTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,13 @@ void testWriterDefaultFlushPolicy(
numStripesLower,
numStripesUpper,
config,
/*sessionTimezoneName=*/"",
/*useSelectiveColumnReader=*/false,
/*adjustTimestampToTimezone=*/false,
/*flushPolicyFactory=*/nullptr,
/*layoutPlannerFactory=*/nullptr,
memoryBudget,
false);
/*writerMemoryCap=*/memoryBudget,
/*verifyContent=*/false);
}

class E2EWriterTest : public testing::Test {
Expand Down
Loading

0 comments on commit 75f04c7

Please sign in to comment.