From b147f8ef7a577417b1e736f366c444393b2ab71a Mon Sep 17 00:00:00 2001 From: wypb Date: Fri, 11 Oct 2024 12:12:34 +0800 Subject: [PATCH] Add supports time zone adjustment when reading and writing Timestamp data --- velox/connectors/hive/SplitReader.cpp | 8 +- velox/dwio/dwrf/reader/ColumnReader.cpp | 34 +++++- velox/dwio/dwrf/reader/ColumnReader.h | 4 +- .../reader/SelectiveTimestampColumnReader.cpp | 20 +++- .../reader/SelectiveTimestampColumnReader.h | 3 + velox/dwio/dwrf/test/E2EReaderTest.cpp | 2 + velox/dwio/dwrf/test/E2EWriterTest.cpp | 100 +++++++++++++++++- velox/dwio/dwrf/test/ReaderTest.cpp | 2 + velox/dwio/dwrf/test/WriterExtendedTests.cpp | 7 +- .../dwrf/test/utils/E2EWriterTestUtil.cpp | 34 +++++- .../dwio/dwrf/test/utils/E2EWriterTestUtil.h | 87 ++++++++------- velox/dwio/dwrf/writer/ColumnWriter.cpp | 28 ++++- velox/dwio/dwrf/writer/Writer.cpp | 2 + velox/dwio/dwrf/writer/Writer.h | 2 - velox/exec/tests/TableScanTest.cpp | 94 +++++++++------- velox/type/Timestamp.cpp | 7 +- velox/type/Timestamp.h | 24 ++--- 17 files changed, 341 insertions(+), 117 deletions(-) diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index 31b596467817..e63427798dda 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -56,7 +56,13 @@ VectorPtr newConstantFromString( VELOX_USER_FAIL("{}", status.message()); }); if constexpr (kind == TypeKind::TIMESTAMP) { - copy.toGMT(Timestamp::defaultTimezone()); + const tz::TimeZone* timezone; + if (sessionTimezone.empty()) { + timezone = &Timestamp::defaultTimezone(); + } else { + timezone = tz::locateZone(sessionTimezone); + } + copy.toGMT(*timezone); } return std::make_shared>( pool, size, false, type, std::move(copy)); diff --git a/velox/dwio/dwrf/reader/ColumnReader.cpp b/velox/dwio/dwrf/reader/ColumnReader.cpp index 9a11e9ebb567..1996f57705b9 100644 --- a/velox/dwio/dwrf/reader/ColumnReader.cpp +++ b/velox/dwio/dwrf/reader/ColumnReader.cpp @@ -15,6 +15,7 @@ */ #include "velox/dwio/dwrf/reader/ColumnReader.h" +#include "velox/common/testutil/TestValue.h" #include "velox/dwio/common/IntCodecCommon.h" #include "velox/dwio/common/IntDecoder.h" #include "velox/dwio/common/ParallelFor.h" @@ -35,6 +36,7 @@ namespace facebook::velox::dwrf { +using common::testutil::TestValue; using dwio::common::IntDecoder; using memory::MemoryPool; @@ -52,7 +54,11 @@ void fillTimestamps( const uint64_t* nullsPtr, const int64_t* secondsPtr, const uint64_t* nanosPtr, - vector_size_t numValues) { + vector_size_t numValues, + const tz::TimeZone* sessionTimezone, + const bool adjustTimestampToTimezone) { + TestValue::adjust( + "facebook::velox::dwrf::detail::fillTimestamps", (void*)sessionTimezone); for (vector_size_t i = 0; i < numValues; i++) { if (!nullsPtr || !bits::isBitNull(nullsPtr, i)) { auto nanos = nanosPtr[i]; @@ -63,11 +69,21 @@ void fillTimestamps( nanos *= 10; } } - auto seconds = secondsPtr[i] + dwio::common::EPOCH_OFFSET; + + int64_t seconds; + if (sessionTimezone) { + seconds = secondsPtr[i] + dwio::common::UTC_EPOCH_OFFSET; + } else { + // Compatible with meta internal use cases. + seconds = secondsPtr[i] + dwio::common::EPOCH_OFFSET; + } if (seconds < 0 && nanos != 0) { seconds -= 1; } timestamps[i] = Timestamp(seconds, nanos); + if (adjustTimestampToTimezone && sessionTimezone) { + timestamps[i].toGMT(*sessionTimezone); + } } } } @@ -752,6 +768,9 @@ class TimestampColumnReader : public ColumnReader { std::unique_ptr> seconds; std::unique_ptr> nano; + const tz::TimeZone* sessionTimezone_{nullptr}; + bool adjustTimestampToTimezone_{false}; + BufferPtr secondsBuffer_; BufferPtr nanosBuffer_; @@ -797,6 +816,9 @@ TimestampColumnReader::TimestampColumnReader( memoryPool_, nanoVInts, dwio::common::LONG_BYTE_SIZE); + + sessionTimezone_ = stripe.sessionTimezone(); + adjustTimestampToTimezone_ = stripe.adjustTimestampToTimezone(); } uint64_t TimestampColumnReader::skip(uint64_t numValues) { @@ -843,7 +865,13 @@ void TimestampColumnReader::next( nano->next(reinterpret_cast(nanosData), numValues, nullsPtr); auto* valuesPtr = values->asMutable(); detail::fillTimestamps( - valuesPtr, nullsPtr, secondsData, nanosData, numValues); + valuesPtr, + nullsPtr, + secondsData, + nanosData, + numValues, + sessionTimezone_, + adjustTimestampToTimezone_); } template diff --git a/velox/dwio/dwrf/reader/ColumnReader.h b/velox/dwio/dwrf/reader/ColumnReader.h index 47d9ec3a667f..1ab014c9a400 100644 --- a/velox/dwio/dwrf/reader/ColumnReader.h +++ b/velox/dwio/dwrf/reader/ColumnReader.h @@ -192,7 +192,9 @@ void fillTimestamps( const uint64_t* nulls, const int64_t* seconds, const uint64_t* nanos, - vector_size_t numValues); + vector_size_t numValues, + const tz::TimeZone* sessionTimezone, + const bool adjustTimestampToTimezone); } // namespace detail } // namespace facebook::velox::dwrf diff --git a/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.cpp index 689584d706f1..dd2f5b3b0e1f 100644 --- a/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.cpp @@ -15,6 +15,7 @@ */ #include "velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.h" +#include "velox/common/testutil/TestValue.h" #include "velox/dwio/common/BufferUtil.h" #include "velox/dwio/dwrf/common/DecoderUtil.h" @@ -28,7 +29,9 @@ SelectiveTimestampColumnReader::SelectiveTimestampColumnReader( common::ScanSpec& scanSpec) : SelectiveColumnReader(fileType->type(), fileType, params, scanSpec), precision_( - params.stripeStreams().rowReaderOptions().timestampPrecision()) { + params.stripeStreams().rowReaderOptions().timestampPrecision()), + sessionTimezone_(params.sessionTimezone()), + adjustTimestampToTimezone_(params.adjustTimestampToTimezone()) { EncodingKey encodingKey{fileType_->id(), params.flatMapContext().sequence}; auto& stripe = params.stripeStreams(); version_ = convertRleVersion(stripe.getEncoding(encodingKey).kind()); @@ -74,6 +77,10 @@ void SelectiveTimestampColumnReader::read( VELOX_CHECK( !scanSpec_->valueHook(), "Selective reader for TIMESTAMP doesn't support aggregation pushdown yet"); + common::testutil::TestValue::adjust( + "facebook::velox::dwrf::SelectiveTimestampColumnReader::read", + (void*)sessionTimezone_); + if (!resultNulls_ || !resultNulls_->unique() || resultNulls_->capacity() * 8 < rows.size()) { // Make sure a dedicated resultNulls_ is allocated with enough capacity as @@ -146,7 +153,13 @@ void SelectiveTimestampColumnReader::readHelper( nanos *= 10; } } - auto seconds = secondsData[i] + EPOCH_OFFSET; + int64_t seconds = 0; + if (sessionTimezone_) { + seconds = secondsData[i] + UTC_EPOCH_OFFSET; + } else { + // Compatible with meta internal use cases. + seconds = secondsData[i] + EPOCH_OFFSET; + } if (seconds < 0 && nanos != 0) { seconds -= 1; } @@ -161,6 +174,9 @@ void SelectiveTimestampColumnReader::readHelper( break; } rawTs[i] = Timestamp(seconds, nanos); + if (adjustTimestampToTimezone_ && sessionTimezone_) { + rawTs[i].toGMT(*sessionTimezone_); + } } } values_ = tsValues; diff --git a/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.h b/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.h index b3f1b8e9c845..45014d5e53c2 100644 --- a/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.h @@ -61,6 +61,9 @@ class SelectiveTimestampColumnReader // Values from copied from 'seconds_'. Nanos are in 'values_'. BufferPtr secondsValues_; RleVersion version_; + + const tz::TimeZone* sessionTimezone_{nullptr}; + bool adjustTimestampToTimezone_{false}; }; } // namespace facebook::velox::dwrf diff --git a/velox/dwio/dwrf/test/E2EReaderTest.cpp b/velox/dwio/dwrf/test/E2EReaderTest.cpp index 3e3ab15f1964..9861e11c1181 100644 --- a/velox/dwio/dwrf/test/E2EReaderTest.cpp +++ b/velox/dwio/dwrf/test/E2EReaderTest.cpp @@ -142,6 +142,8 @@ TEST_P(E2EReaderTest, SharedDictionaryFlatmapReadAsStruct) { std::move(sink), type, config, + /*sessionTimezone=*/nullptr, + /*adjustTimestampToTimezone=*/false, E2EWriterTestUtil::simpleFlushPolicyFactory(true)); auto seed = folly::Random::secureRand32(); diff --git a/velox/dwio/dwrf/test/E2EWriterTest.cpp b/velox/dwio/dwrf/test/E2EWriterTest.cpp index 22d1c624c946..aaf7e09a915d 100644 --- a/velox/dwio/dwrf/test/E2EWriterTest.cpp +++ b/velox/dwio/dwrf/test/E2EWriterTest.cpp @@ -315,6 +315,8 @@ TEST_F(E2EWriterTest, DISABLED_TestFileCreation) { type, batches, config, + /*sessionTimezone=*/nullptr, + /*adjustTimestampToTimezone=*/false, dwrf::E2EWriterTestUtil::simpleFlushPolicyFactory(true)); } @@ -375,6 +377,73 @@ TEST_F(E2EWriterTest, E2E) { dwrf::E2EWriterTestUtil::testWriter(*leafPool_, type, batches, 1, 1, config); } +TEST_F(E2EWriterTest, testTmestampTimezone) { + const size_t batchCount = 4; + size_t batchSize = 1100; + + HiveTypeParser parser; + auto type = parser.parse("struct"); + + auto config = std::make_shared(); + config->set(dwrf::Config::ROW_INDEX_STRIDE, static_cast(1000)); + + std::vector batches; + for (size_t i = 0; i < batchCount; ++i) { + batches.push_back( + BatchMaker::createBatch(type, batchSize, *leafPool_, nullptr, i)); + batchSize = 200; + } + + for (bool adjustTimestampToTimezone : {false, true}) { + for (bool useSelectiveColumnReader : {false, true}) { + SCOPED_TRACE(fmt::format( + "useSelectiveColumnReader: {}, adjustTimestampToTimezone: {}", + useSelectiveColumnReader, + adjustTimestampToTimezone)); + + // Verify that the ColumnWriter has obtained the sessionTimezone + SCOPED_TESTVALUE_SET( + "facebook::velox::dwrf::Writer::Writer", + std::function( + ([&](dwrf::WriterBase* writerBase) { + VELOX_CHECK_EQ( + writerBase->getContext().sessionTimezone()->name(), + "Asia/Shanghai"); + }))); + + if (useSelectiveColumnReader) { + // Verify that the SelectiveTimestampColumnReader has obtained the + // sessionTimezone + SCOPED_TESTVALUE_SET( + "facebook::velox::dwrf::SelectiveTimestampColumnReader::read", + std::function( + ([&](tz::TimeZone* sessionTimezone) { + VELOX_CHECK_EQ(sessionTimezone->name(), "Asia/Shanghai"); + }))); + } else { + // Verify that the ColumnReader has obtained the sessionTimezone + SCOPED_TESTVALUE_SET( + "facebook::velox::dwrf::detail::fillTimestamps", + std::function( + ([&](tz::TimeZone* sessionTimezone) { + VELOX_CHECK_EQ(sessionTimezone->name(), "Asia/Shanghai"); + }))); + } + + dwrf::E2EWriterTestUtil::testWriter( + *leafPool_, + type, + batches, + 1, + 1, + config, + "Asia/Shanghai", + useSelectiveColumnReader, + adjustTimestampToTimezone); + } + } +} + // Disabled because test is failing in continuous runs T193531984. TEST_F(E2EWriterTest, DISABLED_DisableLinearHeuristics) { const size_t batchCount = 100; @@ -562,6 +631,8 @@ TEST_F(E2EWriterTest, PresentStreamIsSuppressedOnFlatMap) { type, dwrf::E2EWriterTestUtil::generateBatches(std::move(batch)), config, + /*sessionTimezone=*/nullptr, + /*adjustTimestampToTimezone=*/false, dwrf::E2EWriterTestUtil::simpleFlushPolicyFactory(true)); dwio::common::ReaderOptions readerOpts{leafPool_.get()}; @@ -672,6 +743,9 @@ TEST_F(E2EWriterTest, FlatMapBackfill) { 1, 1, config, + /*sessionTimezoneName=*/"", + /*useSelectiveColumnReader=*/false, + /*adjustTimestampToTimezone=*/false, dwrf::E2EWriterTestUtil::simpleFlushPolicyFactory(false)); } @@ -721,6 +795,9 @@ void testFlatMapWithNulls( 1, 1, config, + /*sessionTimezoneName=*/"", + /*useSelectiveColumnReader=*/false, + /*adjustTimestampToTimezone=*/false, dwrf::E2EWriterTestUtil::simpleFlushPolicyFactory(false)); } @@ -784,6 +861,9 @@ TEST_F(E2EWriterTest, FlatMapEmpty) { 1, 1, config, + /*sessionTimezoneName=*/"", + /*useSelectiveColumnReader=*/false, + /*adjustTimestampToTimezone=*/false, dwrf::E2EWriterTestUtil::simpleFlushPolicyFactory(false)); } @@ -978,9 +1058,12 @@ TEST_F(E2EWriterTest, OversizeRows) { 1, 1, config, + /*sessionTimezoneName=*/"", + /*useSelectiveColumnReader=*/false, + /*adjustTimestampToTimezone=*/false, /*flushPolicyFactory=*/nullptr, /*layoutPlannerFactory=*/nullptr, - /*memoryBudget=*/std::numeric_limits::max(), + /*writerMemoryCap=*/std::numeric_limits::max(), false); } @@ -1010,9 +1093,12 @@ TEST_F(E2EWriterTest, OversizeBatches) { 10, 10, config, + /*sessionTimezoneName=*/"", + /*useSelectiveColumnReader=*/false, + /*adjustTimestampToTimezone=*/false, /*flushPolicyFactory=*/nullptr, /*layoutPlannerFactory=*/nullptr, - /*memoryBudget=*/std::numeric_limits::max(), + /*writerMemoryCap=*/std::numeric_limits::max(), false); // Test splitting multiple huge batches. @@ -1026,9 +1112,12 @@ TEST_F(E2EWriterTest, OversizeBatches) { 15, 16, config, + /*sessionTimezoneName=*/"", + /*useSelectiveColumnReader=*/false, + /*adjustTimestampToTimezone=*/false, /*flushPolicyFactory=*/nullptr, /*layoutPlannerFactory=*/nullptr, - /*memoryBudget=*/std::numeric_limits::max(), + /*writerMemoryCap=*/std::numeric_limits::max(), false); } @@ -1086,9 +1175,12 @@ TEST_F(E2EWriterTest, OverflowLengthIncrements) { 1, 1, config, + /*sessionTimezoneName=*/"", + /*useSelectiveColumnReader=*/false, + /*adjustTimestampToTimezone=*/false, /*flushPolicyFactory=*/nullptr, /*layoutPlannerFactory=*/nullptr, - /*memoryBudget=*/std::numeric_limits::max(), + /*writerMemoryCap=*/std::numeric_limits::max(), false); } diff --git a/velox/dwio/dwrf/test/ReaderTest.cpp b/velox/dwio/dwrf/test/ReaderTest.cpp index daeb070d1c7c..4f56f1e8dfb1 100644 --- a/velox/dwio/dwrf/test/ReaderTest.cpp +++ b/velox/dwio/dwrf/test/ReaderTest.cpp @@ -2087,6 +2087,8 @@ createWriterReader( asRowType(batches[0]->type()), batches, config, + /*sessionTimezone=*/nullptr, + /*adjustTimestampToTimezone=*/false, std::move(flushPolicy)); std::string data(sinkPtr->data(), sinkPtr->size()); auto input = std::make_unique( diff --git a/velox/dwio/dwrf/test/WriterExtendedTests.cpp b/velox/dwio/dwrf/test/WriterExtendedTests.cpp index ddf383b449eb..13290388053e 100644 --- a/velox/dwio/dwrf/test/WriterExtendedTests.cpp +++ b/velox/dwio/dwrf/test/WriterExtendedTests.cpp @@ -83,10 +83,13 @@ void testWriterDefaultFlushPolicy( numStripesLower, numStripesUpper, config, + /*sessionTimezoneName=*/"", + /*useSelectiveColumnReader=*/false, + /*adjustTimestampToTimezone=*/false, /*flushPolicyFactory=*/nullptr, /*layoutPlannerFactory=*/nullptr, - memoryBudget, - false); + /*writerMemoryCap=*/memoryBudget, + /*verifyContent=*/false); } class E2EWriterTest : public testing::Test { diff --git a/velox/dwio/dwrf/test/utils/E2EWriterTestUtil.cpp b/velox/dwio/dwrf/test/utils/E2EWriterTestUtil.cpp index 1504cd1f6185..46b18f10a4fa 100644 --- a/velox/dwio/dwrf/test/utils/E2EWriterTestUtil.cpp +++ b/velox/dwio/dwrf/test/utils/E2EWriterTestUtil.cpp @@ -33,6 +33,8 @@ namespace facebook::velox::dwrf { std::unique_ptr sink, const std::shared_ptr& type, const std::shared_ptr& config, + const tz::TimeZone* sessionTimezone, + bool adjustTimestampToTimezone, std::function()> flushPolicyFactory, std::function(const TypeWithId&)> layoutPlannerFactory, @@ -40,6 +42,8 @@ namespace facebook::velox::dwrf { // write file to memory dwrf::WriterOptions options; options.config = config; + options.sessionTimezone = sessionTimezone; + options.adjustTimestampToTimezone = adjustTimestampToTimezone; options.schema = type; options.memoryBudget = writerMemoryCap; options.flushPolicyFactory = flushPolicyFactory; @@ -67,6 +71,8 @@ namespace facebook::velox::dwrf { const std::shared_ptr& type, const std::vector& batches, const std::shared_ptr& config, + const tz::TimeZone* sessionTimezone, + bool adjustTimestampToTimezone, std::function()> flushPolicyFactory, std::function(const TypeWithId&)> layoutPlannerFactory, @@ -75,6 +81,8 @@ namespace facebook::velox::dwrf { std::move(sink), type, config, + sessionTimezone, + adjustTimestampToTimezone, std::move(flushPolicyFactory), std::move(layoutPlannerFactory), writerMemoryCap); @@ -90,6 +98,9 @@ namespace facebook::velox::dwrf { size_t numStripesLower, size_t numStripesUpper, const std::shared_ptr& config, + const std::string& sessionTimezoneName, + bool useSelectiveColumnReader, + bool adjustTimestampToTimezone, std::function()> flushPolicyFactory, std::function(const TypeWithId&)> layoutPlannerFactory, @@ -100,14 +111,21 @@ namespace facebook::velox::dwrf { 200 * 1024 * 1024, FileSink::Options{.pool = &pool}); auto sinkPtr = sink.get(); + const tz::TimeZone* sessionTimezone = nullptr; + if (!sessionTimezoneName.empty()) { + sessionTimezone = tz::locateZone(sessionTimezoneName); + } + // Writer owns sink. Keeping writer alive to avoid deleting the sink. auto writer = writeData( std::move(sink), type, batches, config, - flushPolicyFactory, - layoutPlannerFactory, + sessionTimezone, + adjustTimestampToTimezone, + std::move(flushPolicyFactory), + std::move(layoutPlannerFactory), writerMemoryCap); // read it back and compare auto readFile = std::make_shared( @@ -115,7 +133,17 @@ namespace facebook::velox::dwrf { auto input = std::make_unique(readFile, pool); dwio::common::ReaderOptions readerOpts{&pool}; + readerOpts.setSessionTimezone(sessionTimezone); + readerOpts.setAdjustTimestampToTimezone(adjustTimestampToTimezone); + readerOpts.setFileFormat(dwio::common::FileFormat::DWRF); RowReaderOptions rowReaderOpts; + + if (useSelectiveColumnReader) { + auto spec = std::make_shared(""); + spec->addAllChildFields(*type); + rowReaderOpts.setScanSpec(spec); + rowReaderOpts.setTimestampPrecision(TimestampPrecision::kNanoseconds); + } auto reader = std::make_unique(readerOpts, std::move(input)); EXPECT_GE(numStripesUpper, reader->getNumberOfStripes()); EXPECT_LE(numStripesLower, reader->getNumberOfStripes()); @@ -147,7 +175,7 @@ namespace facebook::velox::dwrf { auto batchIndex = 0; auto rowIndex = 0; - VectorPtr batch; + auto batch = BaseVector::create(type, 0, &pool); while (dwrfRowReader->next(1000, batch)) { for (int32_t i = 0; i < batch->size(); ++i) { ASSERT_TRUE(batches[batchIndex]->equalValueAt(batch.get(), rowIndex, i)) diff --git a/velox/dwio/dwrf/test/utils/E2EWriterTestUtil.h b/velox/dwio/dwrf/test/utils/E2EWriterTestUtil.h index 5eb2a9faa0b6..a13cda9263f4 100644 --- a/velox/dwio/dwrf/test/utils/E2EWriterTestUtil.h +++ b/velox/dwio/dwrf/test/utils/E2EWriterTestUtil.h @@ -23,71 +23,75 @@ namespace facebook::velox::dwrf { class E2EWriterTestUtil { public: - /** - * Creates and returns the writer so that the caller can control - * its life cycle. The writer is constructed with the supplied parameters. - * sink the container for writer output, could be - * in-memory or on-disk. - * type schema of the output data - * config ORC configs - * flushPolicyFactory supplies the policy writer use to determine - * when to flush the current stripe and start a - * new one - * layoutPlannerFactory supplies the layout planner and determine how - * order of the data streams prior to flush - * writerMemoryCap total memory budget for the writer - */ + /// Creates and returns the writer so that the caller can control + /// its life cycle. The writer is constructed with the supplied parameters. + /// sink the container for writer output, could be + /// in-memory or on-disk. + /// type schema of the output data + /// config ORC configs + /// sessionTimezone Session timezone used for writing Timestamp + /// adjustTimestampToTimezone Whether to adjust Timestamp to the timeZone + /// obtained through sessionTimezone + /// flushPolicyFactory supplies the policy writer use to determine + /// when to flush the current stripe and start + /// a new one + /// layoutPlannerFactory supplies the layout planner and determine + /// how order of the data streams prior to + /// flush + /// writerMemoryCap total memory budget for the writer static std::unique_ptr createWriter( std::unique_ptr sink, const std::shared_ptr& type, const std::shared_ptr& config, + const tz::TimeZone* sessionTimezone, + bool adjustTimestampToTimezone = false, std::function()> flushPolicyFactory = nullptr, std::function( const dwio::common::TypeWithId&)> layoutPlannerFactory = nullptr, const int64_t writerMemoryCap = std::numeric_limits::max()); - /** - * Writes data and returns the same writer so that the caller can control - * its life cycle. Parameters: - * writer the writer to write data into - * batches generated data - */ + /// Writes data and returns the same writer so that the caller can control + /// its life cycle. Parameters: + /// writer the writer to write data into + /// batches generated data static std::unique_ptr writeData( std::unique_ptr writer, const std::vector& batches); - /* - * Combines createWriter and writeData. - * The writer is constructed with the supplied parameters. - * sink the container for writer output, could be - * in-memory or on-disk. - * type schema of the output data - * batches generated data - * config ORC configs - * flushPolicyFactory supplies the policy writer use to determine - * when to flush the current stripe and start a - * new one - * layoutPlannerFactory supplies the layout planner and determine how - * order of the data streams prior to flush - * writerMemoryCap total memory budget for the writer - */ + /// Combines createWriter and writeData. + /// The writer is constructed with the supplied parameters. + /// sink the container for writer output, could be + /// in-memory or on-disk. + /// type schema of the output data + /// batches generated data + /// config ORC configs + /// sessionTimezone Session timezone used for writing Timestamp + /// adjustTimestampToTimezone Whether to adjust Timestamp to the timeZone + /// obtained through sessionTimezone + /// flushPolicyFactory supplies the policy writer use to determine + /// when to flush the current stripe and start + /// a new one + /// layoutPlannerFactory supplies the layout planner and determine + /// how order of the data streams prior to + /// flush + /// writerMemoryCap total memory budget for the writer static std::unique_ptr writeData( std::unique_ptr sink, const std::shared_ptr& type, const std::vector& batches, const std::shared_ptr& config, + const tz::TimeZone* sessionTimezone = nullptr, + bool adjustTimestampToTimezone = false, std::function()> flushPolicyFactory = nullptr, std::function( const dwio::common::TypeWithId&)> layoutPlannerFactory = nullptr, const int64_t writerMemoryCap = std::numeric_limits::max()); - /** - * Creates a writer with the supplied configuration and check the IO - * characteristics and the content of its output. Uses writeData to perform - * the data write and pass through most of the parameters. - */ + /// Creates a writer with the supplied configuration and check the IO + /// characteristics and the content of its output. Uses writeData to perform + /// the data write and pass through most of the parameters. static void testWriter( memory::MemoryPool& pool, const std::shared_ptr& type, @@ -97,6 +101,9 @@ class E2EWriterTestUtil { size_t numStripesLower, size_t numStripesUpper, const std::shared_ptr& config, + const std::string& sessionTimezoneName = "", + bool useSelectiveColumnReader = false, + bool adjustTimestampToTimezone = false, std::function()> flushPolicyFactory = nullptr, std::function( diff --git a/velox/dwio/dwrf/writer/ColumnWriter.cpp b/velox/dwio/dwrf/writer/ColumnWriter.cpp index 425f9b5df289..c7f3ad7c7393 100644 --- a/velox/dwio/dwrf/writer/ColumnWriter.cpp +++ b/velox/dwio/dwrf/writer/ColumnWriter.cpp @@ -672,7 +672,9 @@ class TimestampColumnWriter : public BaseColumnWriter { RleVersion_1, newStream(StreamKind::StreamKind_NANO_DATA), context.getConfig(Config::USE_VINTS), - LONG_BYTE_SIZE)} { + LONG_BYTE_SIZE)}, + sessionTimezone_{context.sessionTimezone()}, + adjustTimestampToTimezone_(context.adjustTimestampToTimezone()) { reset(); } @@ -695,14 +697,21 @@ class TimestampColumnWriter : public BaseColumnWriter { private: std::unique_ptr> seconds_; std::unique_ptr> nanos_; + const tz::TimeZone* sessionTimezone_{nullptr}; + bool adjustTimestampToTimezone_{false}; }; namespace { -FOLLY_ALWAYS_INLINE int64_t formatTime(int64_t seconds, uint64_t nanos) { +FOLLY_ALWAYS_INLINE int64_t formatTime( + Timestamp timestamp, + const tz::TimeZone* sessionTimezone, + bool adjustTimestampToTimezone) { + auto seconds = timestamp.getSeconds(); + auto nanos = timestamp.getNanos(); DWIO_ENSURE(seconds >= MIN_SECONDS); if (seconds < 0 && nanos != 0) { - // Adding 1 for neagive seconds is there to imitate the Java ORC writer. + // Adding 1 for negative seconds is there to imitate the Java ORC writer. // Consider the case where -1500 milliseconds need to be represented. // In java world, due to a bug (-1500/1000 will result in -1 or rounding up) // Due to this Java represented -1500 millis as -1 for seconds and 5*10^8 @@ -715,6 +724,13 @@ FOLLY_ALWAYS_INLINE int64_t formatTime(int64_t seconds, uint64_t nanos) { seconds += 1; } + if (adjustTimestampToTimezone && sessionTimezone) { + timestamp.toTimezone(*sessionTimezone); + return timestamp.getSeconds() - UTC_EPOCH_OFFSET; + } else if (!adjustTimestampToTimezone && sessionTimezone) { + return seconds - UTC_EPOCH_OFFSET; + } + // Compatible with meta internal use cases. return seconds - EPOCH_OFFSET; } @@ -752,7 +768,8 @@ uint64_t TimestampColumnWriter::write( for (auto& pos : ranges) { if (!decodedVector.isNullAt(pos)) { auto ts = decodedVector.valueAt(pos); - seconds_->writeValue(formatTime(ts.getSeconds(), ts.getNanos())); + seconds_->writeValue( + formatTime(ts, sessionTimezone_, adjustTimestampToTimezone_)); nanos_->writeValue(formatNanos(ts.getNanos())); ++count; } @@ -760,7 +777,8 @@ uint64_t TimestampColumnWriter::write( } else { for (auto& pos : ranges) { auto ts = decodedVector.valueAt(pos); - seconds_->writeValue(formatTime(ts.getSeconds(), ts.getNanos())); + seconds_->writeValue( + formatTime(ts, sessionTimezone_, adjustTimestampToTimezone_)); nanos_->writeValue(formatNanos(ts.getNanos())); ++count; } diff --git a/velox/dwio/dwrf/writer/Writer.cpp b/velox/dwio/dwrf/writer/Writer.cpp index 9f062302993b..502902c433f5 100644 --- a/velox/dwio/dwrf/writer/Writer.cpp +++ b/velox/dwio/dwrf/writer/Writer.cpp @@ -81,6 +81,8 @@ Writer::Writer( options.sessionTimezone, options.adjustTimestampToTimezone, std::move(handler)); + common::testutil::TestValue::adjust( + "facebook::velox::dwrf::Writer::Writer", writerBase_.get()); auto& context = writerBase_->getContext(); VELOX_CHECK_EQ( context.getTotalMemoryUsage(), diff --git a/velox/dwio/dwrf/writer/Writer.h b/velox/dwio/dwrf/writer/Writer.h index 56ea96088fbc..66c38c99a516 100644 --- a/velox/dwio/dwrf/writer/Writer.h +++ b/velox/dwio/dwrf/writer/Writer.h @@ -42,8 +42,6 @@ struct WriterOptions : public dwio::common::WriterOptions { WriterContext& context, const velox::dwio::common::TypeWithId& type)> columnWriterFactory; - const tz::TimeZone* sessionTimezone{nullptr}; - bool adjustTimestampToTimezone{false}; }; class Writer : public dwio::common::Writer { diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index bbacb0def04d..e51b11dfc709 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -4739,39 +4739,61 @@ TEST_F(TableScanTest, varbinaryPartitionKey) { TEST_F(TableScanTest, timestampPartitionKey) { const char* inputs[] = {"2023-10-14 07:00:00.0", "2024-01-06 04:00:00.0"}; - auto expected = makeRowVector( - {"t"}, - { - makeFlatVector( - std::end(inputs) - std::begin(inputs), - [&](auto i) { - auto t = util::fromTimestampString( - inputs[i], util::TimestampParseMode::kPrestoCast) - .thenOrThrow( - folly::identity, [&](const Status& status) { - VELOX_USER_FAIL("{}", status.message()); - }); - t.toGMT(Timestamp::defaultTimezone()); - return t; - }), - }); - auto vectors = makeVectors(1, 1); - auto filePath = TempFilePath::create(); - writeToFile(filePath->getPath(), vectors); - ColumnHandleMap assignments = {{"t", partitionKey("t", TIMESTAMP())}}; - std::vector> splits; - for (auto& t : inputs) { - splits.push_back(HiveConnectorSplitBuilder(filePath->getPath()) - .partitionKey("t", t) - .build()); + for (std::string sessionTimezone : + {"", "Asia/Shanghai", "America/Los_Angeles", "UTC"}) { + SCOPED_TRACE(fmt::format("sessionTimezone {}", sessionTimezone)); + + auto expected = makeRowVector( + {"t"}, + { + makeFlatVector( + std::end(inputs) - std::begin(inputs), + [&](auto i) { + auto t = util::fromTimestampString( + inputs[i], util::TimestampParseMode::kPrestoCast) + .thenOrThrow( + folly::identity, [&](const Status& status) { + VELOX_USER_FAIL("{}", status.message()); + }); + + if (sessionTimezone.empty()) { + t.toGMT(Timestamp::defaultTimezone()); + } else { + const auto timezone = tz::locateZone(sessionTimezone); + t.toGMT(*timezone); + } + + return t; + }), + }); + auto vectors = makeVectors(1, 1); + auto filePath = TempFilePath::create(); + writeToFile(filePath->getPath(), vectors); + ColumnHandleMap assignments = {{"t", partitionKey("t", TIMESTAMP())}}; + std::vector> splits; + for (auto& t : inputs) { + splits.push_back(HiveConnectorSplitBuilder(filePath->getPath()) + .partitionKey("t", t) + .build()); + } + auto plan = PlanBuilder() + .startTableScan() + .outputType(ROW({"t"}, {TIMESTAMP()})) + .assignments(assignments) + .endTableScan() + .planNode(); + + if (!sessionTimezone.empty()) { + AssertQueryBuilder(plan) + .config(core::QueryConfig::kSessionTimezone, sessionTimezone) + .splits(std::move(splits)) + .assertResults(expected); + } else { + AssertQueryBuilder(plan) + .splits(std::move(splits)) + .assertResults(expected); + } } - auto plan = PlanBuilder() - .startTableScan() - .outputType(ROW({"t"}, {TIMESTAMP()})) - .assignments(assignments) - .endTableScan() - .planNode(); - AssertQueryBuilder(plan).splits(std::move(splits)).assertResults(expected); } TEST_F(TableScanTest, partitionKeyNotMatchPartitionKeysHandle) { @@ -4883,8 +4905,8 @@ TEST_F(TableScanTest, dynamicFilters) { } TEST_F(TableScanTest, dynamicFilterWithRowIndexColumn) { - // This test ensures dynamic filters can be mapped to correct field when there - // is row_index column. + // This test ensures dynamic filters can be mapped to correct field when + // there is row_index column. auto aVector = makeRowVector({"a"}, {makeFlatVector(10, folly::identity)}); auto bVector = makeRowVector({"b"}, {makeFlatVector(10, [](auto i) { @@ -4969,8 +4991,8 @@ TEST_F(TableScanTest, DISABLED_memoryArbitrationWithSlowTableScan) { auto faultyFs = faultyFileSystem(); std::atomic_bool injectOnce{true}; faultyFs->setFileInjectionHook([&](FaultFileOperation* readOp) { - // Inject memory arbitration at the second read file so as to make sure the - // aggregation has accumulated state to spill. + // Inject memory arbitration at the second read file so as to make sure + // the aggregation has accumulated state to spill. if (readOp->path != filePaths.back()->getPath()) { return; } diff --git a/velox/type/Timestamp.cpp b/velox/type/Timestamp.cpp index ba5318def972..031f2be54dba 100644 --- a/velox/type/Timestamp.cpp +++ b/velox/type/Timestamp.cpp @@ -93,12 +93,7 @@ void Timestamp::toTimezone(const tz::TimeZone& zone) { const tz::TimeZone& Timestamp::defaultTimezone() { static const tz::TimeZone* kDefault = ({ - // TODO: We are hard-coding PST/PDT here to be aligned with the current - // behavior in DWRF reader/writer. Once they are fixed, we can use - // date::current_zone() here. - // - // See https://github.com/facebookincubator/velox/issues/8127 - auto* tz = tz::locateZone("America/Los_Angeles"); + auto* tz = tz::locateZone(date::current_zone()->name()); VELOX_CHECK_NOT_NULL(tz); tz; }); diff --git a/velox/type/Timestamp.h b/velox/type/Timestamp.h index 88a31e72f62e..07b23e8cd13b 100644 --- a/velox/type/Timestamp.h +++ b/velox/type/Timestamp.h @@ -117,10 +117,10 @@ struct Timestamp { /// and the number of nanoseconds. static Timestamp fromDaysAndNanos(int32_t days, int64_t nanos); - // date is the number of days since unix epoch. + /// date is the number of days since unix epoch. static Timestamp fromDate(int32_t date); - // Returns the current unix timestamp (ms precision). + /// Returns the current unix timestamp (ms precision). static Timestamp now(); static Timestamp create(const folly::dynamic& obj) { @@ -137,7 +137,7 @@ struct Timestamp { return nanos_; } - // Keep it in header for getting inlined. + /// Keep it in header for getting inlined. int64_t toNanos() const { // int64 can store around 292 years in nanos ~ till 2262-04-12. // When an integer overflow occurs in the calculation, @@ -173,7 +173,7 @@ struct Timestamp { return result; } - // Keep it in header for getting inlined. + /// Keep it in header for getting inlined. int64_t toMillisAllowOverflow() const { // Similar to the above toMillis() except that overflowed integer is allowed // as result. @@ -340,12 +340,12 @@ struct Timestamp { const TimestampToStringOptions& options, char* const startPosition); - // Assuming the timestamp represents a time at zone, converts it to the GMT - // time at the same moment. For example: - // - // Timestamp ts{0, 0}; - // ts.Timezone("America/Los_Angeles"); - // ts.toString(); // returns January 1, 1970 08:00:00 + /// Assuming the timestamp represents a time at zone, converts it to the GMT + /// time at the same moment. For example: + /// + /// Timestamp ts{0, 0}; + /// ts.Timezone("America/Los_Angeles"); + /// ts.toString(); // returns January 1, 1970 08:00:00 void toGMT(const tz::TimeZone& zone); /// Assuming the timestamp represents a GMT time, converts it to the time at @@ -413,7 +413,7 @@ struct Timestamp { VELOX_USER_FAIL("Timestamp nanos out of range"); } - // Needed for serialization of FlatVector + /// Needed for serialization of FlatVector operator StringView() const { return StringView("TODO: Implement"); } @@ -446,7 +446,7 @@ struct Timestamp { return obj; } - // Pretty printer for gtest. + /// Pretty printer for gtest. friend void PrintTo(const Timestamp& timestamp, std::ostream* os) { *os << "sec: " << timestamp.seconds_ << ", ns: " << timestamp.nanos_; }