From 37146b6dd06559bfa4079a266e03b7362e9dfd21 Mon Sep 17 00:00:00 2001 From: wypb Date: Fri, 30 Aug 2024 11:19:03 +0800 Subject: [PATCH] Add sessionTimezone and adjustTimestampToTimezone to DWRF reader and writer options --- velox/connectors/Connector.h | 10 +++ velox/connectors/hive/HiveConnectorUtil.cpp | 2 + velox/connectors/hive/HiveDataSink.cpp | 7 ++ velox/connectors/hive/SplitReader.cpp | 9 +- velox/dwio/common/Options.h | 15 +++- velox/dwio/dwrf/reader/DwrfData.h | 8 ++ velox/dwio/dwrf/reader/DwrfReader.cpp | 19 +--- velox/dwio/dwrf/reader/DwrfReader.h | 1 - velox/dwio/dwrf/reader/ReaderBase.cpp | 46 ++++------ velox/dwio/dwrf/reader/ReaderBase.h | 66 +++++++------- velox/dwio/dwrf/reader/StripeStream.h | 89 +++++++++---------- velox/dwio/dwrf/test/ColumnWriterTest.cpp | 8 ++ velox/dwio/dwrf/test/E2EWriterTest.cpp | 6 +- velox/dwio/dwrf/test/OrcTest.h | 8 ++ velox/dwio/dwrf/test/ReaderBaseTests.cpp | 3 +- .../dwio/dwrf/test/StripeReaderBaseTests.cpp | 1 - velox/dwio/dwrf/test/TestStripeStream.cpp | 8 ++ velox/dwio/dwrf/test/WriterTest.cpp | 3 +- velox/dwio/dwrf/writer/Writer.cpp | 8 +- velox/dwio/dwrf/writer/Writer.h | 2 + velox/dwio/dwrf/writer/WriterBase.h | 9 +- velox/dwio/dwrf/writer/WriterContext.cpp | 4 + velox/dwio/dwrf/writer/WriterContext.h | 12 +++ velox/dwio/parquet/reader/ParquetReader.cpp | 2 +- velox/exec/Operator.cpp | 1 + 25 files changed, 209 insertions(+), 138 deletions(-) diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h index 6aa1b55c9e2c..bfd2b9ca1a46 100644 --- a/velox/connectors/Connector.h +++ b/velox/connectors/Connector.h @@ -269,6 +269,7 @@ class ConnectorQueryCtx { const std::string& planNodeId, int driverId, const std::string& sessionTimezone, + bool adjustTimestampToTimezone = false, folly::CancellationToken cancellationToken = {}) : operatorPool_(operatorPool), connectorPool_(connectorPool), @@ -283,6 +284,7 @@ class ConnectorQueryCtx { driverId_(driverId), planNodeId_(planNodeId), sessionTimezone_(sessionTimezone), + adjustTimestampToTimezone_(adjustTimestampToTimezone), cancellationToken_(std::move(cancellationToken)) { VELOX_CHECK_NOT_NULL(sessionProperties); } @@ -351,6 +353,13 @@ class ConnectorQueryCtx { return sessionTimezone_; } + /// Whether to adjust Timestamp to the timeZone obtained through + /// sessionTimezone(). This is used to be compatible with the + /// old logic of Presto. + bool adjustTimestampToTimezone() const { + return adjustTimestampToTimezone_; + } + /// Returns the cancellation token associated with this task. const folly::CancellationToken& cancellationToken() const { return cancellationToken_; @@ -378,6 +387,7 @@ class ConnectorQueryCtx { const int driverId_; const std::string planNodeId_; const std::string sessionTimezone_; + const bool adjustTimestampToTimezone_; const folly::CancellationToken cancellationToken_; bool selectiveNimbleReaderEnabled_{false}; }; diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index 09636155c5aa..219288b265f7 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -569,6 +569,8 @@ void configureReaderOptions( const auto timezone = tz::locateZone(sessionTzName); readerOptions.setSessionTimezone(timezone); } + readerOptions.setAdjustTimestampToTimezone( + connectorQueryCtx->adjustTimestampToTimezone()); readerOptions.setSelectiveNimbleReaderEnabled( connectorQueryCtx->selectiveNimbleReaderEnabled()); diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index b4ef37df8101..5fd34cbf82ad 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -744,6 +744,13 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) { connectorSessionProperties, options); + const auto& sessionTimeZoneName = connectorQueryCtx_->sessionTimezone(); + if (!sessionTimeZoneName.empty()) { + options->sessionTimezone = tz::locateZone(sessionTimeZoneName); + } + options->adjustTimestampToTimezone = + connectorQueryCtx_->adjustTimestampToTimezone(); + // Prevents the memory allocation during the writer creation. WRITER_NON_RECLAIMABLE_SECTION_GUARD(writerInfo_.size() - 1); auto writer = writerFactory_->createWriter( diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index dcdb9441fc3d..0bea3cc30898 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -34,7 +34,8 @@ VectorPtr newConstantFromString( const TypePtr& type, const std::optional& value, vector_size_t size, - velox::memory::MemoryPool* pool) { + velox::memory::MemoryPool* pool, + const std::string& sessionTimezone) { using T = typename TypeTraits::NativeType; if (!value.has_value()) { return std::make_shared>(pool, size, true, type, T()); @@ -365,7 +366,8 @@ std::vector SplitReader::adaptColumns( infoColumnType, iter->second, 1, - connectorQueryCtx_->memoryPool()); + connectorQueryCtx_->memoryPool(), + connectorQueryCtx_->sessionTimezone()); childSpec->setConstantValue(constant); } else if (!childSpec->isExplicitRowNumber()) { auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName); @@ -417,7 +419,8 @@ void SplitReader::setPartitionValue( type, value, 1, - connectorQueryCtx_->memoryPool()); + connectorQueryCtx_->memoryPool(), + connectorQueryCtx_->sessionTimezone()); spec->setConstantValue(constant); } diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index 0a4a9dbcff36..8ba90fa9ac2a 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -507,6 +507,11 @@ class ReaderOptions : public io::ReaderOptions { return *this; } + ReaderOptions& setAdjustTimestampToTimezone(bool adjustTimestampToTimezone) { + adjustTimestampToTimezone_ = adjustTimestampToTimezone; + return *this; + } + /// Gets the desired tail location. uint64_t tailLocation() const { return tailLocation_; @@ -546,10 +551,14 @@ class ReaderOptions : public io::ReaderOptions { return ioExecutor_; } - const tz::TimeZone* getSessionTimezone() const { + const tz::TimeZone* sessionTimezone() const { return sessionTimezone_; } + bool adjustTimestampToTimezone() const { + return adjustTimestampToTimezone_; + } + bool fileColumnNamesReadAsLowerCase() const { return fileColumnNamesReadAsLowerCase_; } @@ -604,6 +613,7 @@ class ReaderOptions : public io::ReaderOptions { std::shared_ptr randomSkip_; std::shared_ptr scanSpec_; const tz::TimeZone* sessionTimezone_{nullptr}; + bool adjustTimestampToTimezone_{false}; bool selectiveNimbleReaderEnabled_{false}; }; @@ -635,6 +645,9 @@ struct WriterOptions { std::function()> flushPolicyFactory; + const tz::TimeZone* sessionTimezone{nullptr}; + bool adjustTimestampToTimezone{false}; + virtual ~WriterOptions() = default; }; diff --git a/velox/dwio/dwrf/reader/DwrfData.h b/velox/dwio/dwrf/reader/DwrfData.h index 995cc80ac410..e330edbf7118 100644 --- a/velox/dwio/dwrf/reader/DwrfData.h +++ b/velox/dwio/dwrf/reader/DwrfData.h @@ -149,6 +149,14 @@ class DwrfParams : public dwio::common::FormatParams { return streamLabels_; } + const tz::TimeZone* sessionTimezone() const { + return stripeStreams_.sessionTimezone(); + } + + bool adjustTimestampToTimezone() const { + return stripeStreams_.adjustTimestampToTimezone(); + } + private: const StreamLabels& streamLabels_; StripeStreams& stripeStreams_; diff --git a/velox/dwio/dwrf/reader/DwrfReader.cpp b/velox/dwio/dwrf/reader/DwrfReader.cpp index b09c54433876..797c3551b625 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.cpp +++ b/velox/dwio/dwrf/reader/DwrfReader.cpp @@ -794,26 +794,15 @@ std::optional DwrfRowReader::estimatedRowSize() const { DwrfReader::DwrfReader( const ReaderOptions& options, std::unique_ptr input) - : readerBase_(std::make_unique( - options.memoryPool(), - std::move(input), - options.decrypterFactory(), - options.footerEstimatedSize(), - options.filePreloadThreshold(), - options.fileFormat() == FileFormat::ORC ? FileFormat::ORC - : FileFormat::DWRF, - options.fileColumnNamesReadAsLowerCase(), - options.randomSkip(), - options.scanSpec())), - options_(options) { + : readerBase_(std::make_unique(options, std::move(input))) { // If we are not using column names to map table columns to file columns, // then we use indices. In that case we need to ensure the names completely // match, because we are still mapping columns by names further down the // code. So we rename column names in the file schema to match table schema. // We test the options to have 'fileSchema' (actually table schema) as most // of the unit tests fail to provide it. - if ((!options_.useColumnNamesForColumnMapping()) && - (options_.fileSchema() != nullptr)) { + if ((!readerBase_->readerOptions().useColumnNamesForColumnMapping()) && + (readerBase_->readerOptions().fileSchema() != nullptr)) { updateColumnNamesFromTableSchema(); } } @@ -910,7 +899,7 @@ TypePtr updateColumnNames( } // namespace void DwrfReader::updateColumnNamesFromTableSchema() { - const auto& tableSchema = options_.fileSchema(); + const auto& tableSchema = readerBase_->readerOptions().fileSchema(); const auto& fileSchema = readerBase_->schema(); readerBase_->setSchema(std::dynamic_pointer_cast( updateColumnNames(fileSchema, tableSchema, "", ""))); diff --git a/velox/dwio/dwrf/reader/DwrfReader.h b/velox/dwio/dwrf/reader/DwrfReader.h index f156f0dd1b79..a18b6982fa59 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.h +++ b/velox/dwio/dwrf/reader/DwrfReader.h @@ -344,7 +344,6 @@ class DwrfReader : public dwio::common::Reader { private: std::shared_ptr readerBase_; - const dwio::common::ReaderOptions options_; }; class DwrfReaderFactory : public dwio::common::ReaderFactory { diff --git a/velox/dwio/dwrf/reader/ReaderBase.cpp b/velox/dwio/dwrf/reader/ReaderBase.cpp index ff2b13c9dd7c..249669b764a0 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.cpp +++ b/velox/dwio/dwrf/reader/ReaderBase.cpp @@ -80,41 +80,24 @@ ReaderBase::ReaderBase( MemoryPool& pool, std::unique_ptr input, FileFormat fileFormat) - : ReaderBase( - pool, - std::move(input), - nullptr, - dwio::common::ReaderOptions::kDefaultFooterEstimatedSize, - dwio::common::ReaderOptions::kDefaultFilePreloadThreshold, - fileFormat) {} + : ReaderBase(createReaderOptions(pool, fileFormat), std::move(input)) {} ReaderBase::ReaderBase( - MemoryPool& pool, - std::unique_ptr input, - std::shared_ptr decryptorFactory, - uint64_t footerEstimatedSize, - uint64_t filePreloadThreshold, - FileFormat fileFormat, - bool fileColumnNamesReadAsLowerCase, - std::shared_ptr randomSkip, - std::shared_ptr scanSpec) - : pool_{pool}, + const dwio::common::ReaderOptions& options, + std::unique_ptr input) + : options_{options}, arena_(std::make_unique()), - decryptorFactory_(decryptorFactory), - footerEstimatedSize_(footerEstimatedSize), - filePreloadThreshold_(filePreloadThreshold), input_(std::move(input)), - randomSkip_(std::move(randomSkip)), - scanSpec_(std::move(scanSpec)), fileLength_(input_->getReadFile()->size()) { process::TraceContext trace("ReaderBase::ReaderBase"); // TODO: make a config DWIO_ENSURE(fileLength_ > 0, "ORC file is empty"); VELOX_CHECK_GE(fileLength_, 4, "File size too small"); - const auto preloadFile = fileLength_ <= filePreloadThreshold_; - const uint64_t readSize = - preloadFile ? fileLength_ : std::min(fileLength_, footerEstimatedSize_); + const auto preloadFile = fileLength_ <= options_.filePreloadThreshold(); + const uint64_t readSize = preloadFile + ? fileLength_ + : std::min(fileLength_, options_.footerEstimatedSize()); if (input_->supportSyncLoad()) { input_->enqueue({fileLength_ - readSize, readSize, "footer"}); input_->load(preloadFile ? LogType::FILE : LogType::FOOTER); @@ -135,7 +118,7 @@ ReaderBase::ReaderBase( fileLength_, "Corrupted file, Post script size is invalid"); - if (fileFormat == FileFormat::DWRF) { + if (fileFormat() == FileFormat::DWRF) { auto postScript = ProtoUtils::readProto( input_->read(fileLength_ - psLength_ - 1, psLength_, LogType::FOOTER)); postScript_ = std::make_unique(std::move(postScript)); @@ -173,7 +156,7 @@ ReaderBase::ReaderBase( auto footerStream = input_->read( fileLength_ - psLength_ - footerSize - 1, footerSize, LogType::FOOTER); - if (fileFormat == FileFormat::DWRF) { + if (fileFormat() == FileFormat::DWRF) { auto footer = google::protobuf::Arena::CreateMessage(arena_.get()); ProtoUtils::readProtoInto( @@ -190,7 +173,7 @@ ReaderBase::ReaderBase( } schema_ = std::dynamic_pointer_cast( - convertType(*footer_, 0, fileColumnNamesReadAsLowerCase)); + convertType(*footer_, 0, options_.fileColumnNamesReadAsLowerCase())); VELOX_CHECK_NOT_NULL(schema_, "invalid schema"); // load stripe index/footer cache @@ -204,8 +187,8 @@ ReaderBase::ReaderBase( input_->read(cacheOffset, cacheSize, LogType::FOOTER)); input_->load(LogType::FOOTER); } else { - auto cacheBuffer = - std::make_shared>(pool, cacheSize); + auto cacheBuffer = std::make_shared>( + options_.memoryPool(), cacheSize); input_->read(cacheOffset, cacheSize, LogType::FOOTER) ->readFully(cacheBuffer->data(), cacheSize); cache_ = std::make_unique( @@ -226,7 +209,8 @@ ReaderBase::ReaderBase( } } // initialize file decrypter - handler_ = DecryptionHandler::create(*footer_, decryptorFactory_.get()); + handler_ = + DecryptionHandler::create(*footer_, options_.decrypterFactory().get()); } std::vector ReaderBase::rowsPerStripe() const { diff --git a/velox/dwio/dwrf/reader/ReaderBase.h b/velox/dwio/dwrf/reader/ReaderBase.h index 34287d3fcd71..57454c177e12 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.h +++ b/velox/dwio/dwrf/reader/ReaderBase.h @@ -62,23 +62,15 @@ class ReaderBase { public: /// Creates reader base from buffered input. ReaderBase( - memory::MemoryPool& pool, - std::unique_ptr input, - std::shared_ptr - decryptorFactory = nullptr, - uint64_t footerEstimatedSize = - dwio::common::ReaderOptions::kDefaultFooterEstimatedSize, - uint64_t filePreloadThreshold = - dwio::common::ReaderOptions::kDefaultFilePreloadThreshold, - dwio::common::FileFormat fileFormat = dwio::common::FileFormat::DWRF, - bool fileColumnNamesReadAsLowerCase = false, - std::shared_ptr randomSkip = nullptr, - std::shared_ptr scanSpec = nullptr); + const dwio::common::ReaderOptions& options, + std::unique_ptr input); + /// Creates reader base from buffered input. + /// It is kept here for backward compatibility with Meta's internal usage. ReaderBase( memory::MemoryPool& pool, std::unique_ptr input, - dwio::common::FileFormat fileFormat); + dwio::common::FileFormat fileFormat = dwio::common::FileFormat::DWRF); /// Creates reader base from metadata. ReaderBase( @@ -88,7 +80,7 @@ class ReaderBase { const proto::Footer* footer, std::unique_ptr cache, std::unique_ptr handler = nullptr) - : pool_{pool}, + : options_{dwio::common::ReaderOptions(&pool)}, postScript_{std::move(ps)}, footer_{std::make_unique(footer)}, cache_{std::move(cache)}, @@ -105,12 +97,17 @@ class ReaderBase { } // for testing - explicit ReaderBase(memory::MemoryPool& pool) : pool_{pool}, fileLength_{0} {} + explicit ReaderBase(const dwio::common::ReaderOptions& options) + : options_{options}, fileLength_{0} {} virtual ~ReaderBase() = default; + const dwio::common::ReaderOptions& readerOptions() const { + return options_; + } + memory::MemoryPool& memoryPool() const { - return pool_; + return options_.memoryPool(); } const PostScript& postScript() const { @@ -125,14 +122,23 @@ class ReaderBase { return schema_; } + dwio::common::FileFormat fileFormat() const { + if (options_.fileFormat() == dwio::common::FileFormat::ORC) { + return dwio::common::FileFormat::ORC; + } + + return dwio::common::FileFormat::DWRF; + } + void setSchema(RowTypePtr newSchema) { schema_ = std::move(newSchema); } const std::shared_ptr& schemaWithId() const { if (!schemaWithId_) { - if (scanSpec_) { - schemaWithId_ = dwio::common::TypeWithId::create(schema_, *scanSpec_); + if (options_.scanSpec()) { + schemaWithId_ = + dwio::common::TypeWithId::create(schema_, *options_.scanSpec()); } else { schemaWithId_ = dwio::common::TypeWithId::create(schema_); } @@ -153,7 +159,7 @@ class ReaderBase { } uint64_t footerEstimatedSize() const { - return footerEstimatedSize_; + return options_.footerEstimatedSize(); } uint64_t fileLength() const { @@ -213,7 +219,7 @@ class ReaderBase { compressionKind(), std::move(compressed), compressionBlockSize(), - pool_, + options_.memoryPool(), streamDebugInfo, decrypter); } @@ -237,7 +243,7 @@ class ReaderBase { } const std::shared_ptr& randomSkip() const { - return randomSkip_; + return options_.randomSkip(); } private: @@ -246,22 +252,22 @@ class ReaderBase { uint32_t index = 0, bool fileColumnNamesReadAsLowerCase = false); - memory::MemoryPool& pool_; + static dwio::common::ReaderOptions createReaderOptions( + memory::MemoryPool& pool, + dwio::common::FileFormat fileFormat) { + dwio::common::ReaderOptions options(&pool); + options.setFileFormat(fileFormat); + return options; + } + std::unique_ptr arena_; std::unique_ptr postScript_; std::unique_ptr footer_ = nullptr; std::unique_ptr cache_; - // Keeps factory alive for possibly async prefetch. - std::shared_ptr decryptorFactory_; std::unique_ptr handler_; - const uint64_t footerEstimatedSize_{ - dwio::common::ReaderOptions::kDefaultFooterEstimatedSize}; - const uint64_t filePreloadThreshold_{ - dwio::common::ReaderOptions::kDefaultFilePreloadThreshold}; + const dwio::common::ReaderOptions options_; const std::unique_ptr input_; - const std::shared_ptr randomSkip_; - const std::shared_ptr scanSpec_; const uint64_t fileLength_; RowTypePtr schema_; diff --git a/velox/dwio/dwrf/reader/StripeStream.h b/velox/dwio/dwrf/reader/StripeStream.h index 362a0b43d097..e7b0bde39283 100644 --- a/velox/dwio/dwrf/reader/StripeStream.h +++ b/velox/dwio/dwrf/reader/StripeStream.h @@ -33,9 +33,7 @@ class StrideIndexProvider { virtual uint64_t getStrideIndex() const = 0; }; -/** - * StreamInformation Implementation - */ +/// StreamInformation Implementation class StreamInformationImpl : public StreamInformation { public: static const StreamInformationImpl& getNotFound() { @@ -91,33 +89,36 @@ class StripeStreams { public: virtual ~StripeStreams() = default; - /** - * Get the DwrfFormat for the stream - * @return DwrfFormat - */ + /// Get the DwrfFormat for the stream + /// + /// @return DwrfFormat virtual DwrfFormat format() const = 0; - /** - * get column selector for current stripe reading session - * @return column selector will hold column projection info - */ + /// get column selector for current stripe reading session + /// + /// @return column selector will hold column projection info virtual const dwio::common::ColumnSelector& getColumnSelector() const = 0; - // Get row reader options + /// Session timezone used for reading Timestamp. + virtual const tz::TimeZone* sessionTimezone() const = 0; + + /// Whether to adjust Timestamp to the timeZone obtained via + /// sessionTimezone(). This is used to be compatible with the + /// old logic of Presto. + virtual bool adjustTimestampToTimezone() const = 0; + + /// Get row reader options virtual const dwio::common::RowReaderOptions& rowReaderOptions() const = 0; - /** - * Get the encoding for the given column for this stripe. - */ + /// Get the encoding for the given column for this stripe. virtual const proto::ColumnEncoding& getEncoding( const EncodingKey&) const = 0; - /** - * Get the stream for the given column/kind in this stripe. - * @param streamId stream identifier object - * @param throwIfNotFound fail if a stream is required and not found - * @return the new stream - */ + /// Get the stream for the given column/kind in this stripe. + /// + /// @param streamId stream identifier object + /// @param throwIfNotFound fail if a stream is required and not found + /// @return the new stream virtual std::unique_ptr getStream( const DwrfStreamIdentifier& si, std::string_view label, @@ -137,35 +138,27 @@ class StripeStreams { virtual std::shared_ptr getStripeDictionaryCache() = 0; - /** - * visit all streams of given node and execute visitor logic - * return number of streams visited - */ + /// visit all streams of given node and execute visitor logic + /// return number of streams visited virtual uint32_t visitStreamsOfNode( uint32_t node, std::function visitor) const = 0; - /** - * Get the value of useVInts for the given column in this stripe. - * Defaults to true. - * @param streamId stream identifier - */ + /// Get the value of useVInts for the given column in this stripe. + /// Defaults to true. + /// @param streamId stream identifier virtual bool getUseVInts(const DwrfStreamIdentifier& streamId) const = 0; - /** - * Get the memory pool for this reader. - */ + /// Get the memory pool for this reader. virtual memory::MemoryPool& getMemoryPool() const = 0; - /** - * Get stride index provider which is used by string dictionary reader to - * get the row index stride index where next() happens - */ + /// Get stride index provider which is used by string dictionary reader to + /// get the row index stride index where next() happens virtual const StrideIndexProvider& getStrideIndexProvider() const = 0; virtual int64_t stripeRows() const = 0; - // Number of rows per row group. Last row group may have fewer rows. + /// Number of rows per row group. Last row group may have fewer rows. virtual uint32_t rowsPerRowGroup() const = 0; }; @@ -181,7 +174,7 @@ class StripeStreamsBase : public StripeStreams { return *pool_; } - // For now just return DWRF, will refine when ORC has better support + /// For now just return DWRF, will refine when ORC has better support virtual DwrfFormat format() const override { return DwrfFormat::kDwrf; } @@ -212,9 +205,7 @@ struct StripeReadState { stripeMetadata{std::move(_stripeMetadata)} {} }; -/** - * StripeStream Implementation - */ +/// StripeStream Implementation class StripeStreamsImpl : public StripeStreamsBase { public: static constexpr int64_t kUnknownStripeRows = -1; @@ -250,6 +241,14 @@ class StripeStreamsImpl : public StripeStreamsBase { return *selector_; } + const tz::TimeZone* sessionTimezone() const override { + return readState_->readerBase->readerOptions().sessionTimezone(); + } + + bool adjustTimestampToTimezone() const override { + return readState_->readerBase->readerOptions().adjustTimestampToTimezone(); + } + const dwio::common::RowReaderOptions& rowReaderOptions() const override { return opts_; } @@ -268,7 +267,7 @@ class StripeStreamsImpl : public StripeStreamsBase { return encodingKeyIt->second; } - // load data into buffer according to read plan + /// load data into buffer according to read plan void loadReadPlan(); std::unique_ptr getCompressedStream( @@ -362,9 +361,7 @@ class StripeStreamsImpl : public StripeStreamsBase { decryptedEncodings_; }; -/** - * StripeInformation Implementation - */ +/// StripeInformation Implementation class StripeInformationImpl : public StripeInformation { uint64_t offset; uint64_t indexLength; diff --git a/velox/dwio/dwrf/test/ColumnWriterTest.cpp b/velox/dwio/dwrf/test/ColumnWriterTest.cpp index 93cc5f053308..e33c595ef770 100644 --- a/velox/dwio/dwrf/test/ColumnWriterTest.cpp +++ b/velox/dwio/dwrf/test/ColumnWriterTest.cpp @@ -154,6 +154,14 @@ class TestStripeStreams : public StripeStreamsBase { return selector_; } + const tz::TimeZone* sessionTimezone() const override { + return context_.sessionTimezone(); + } + + bool adjustTimestampToTimezone() const override { + return context_.adjustTimestampToTimezone(); + } + const RowReaderOptions& rowReaderOptions() const override { return options_; } diff --git a/velox/dwio/dwrf/test/E2EWriterTest.cpp b/velox/dwio/dwrf/test/E2EWriterTest.cpp index f8cccab6575d..22d1c624c946 100644 --- a/velox/dwio/dwrf/test/E2EWriterTest.cpp +++ b/velox/dwio/dwrf/test/E2EWriterTest.cpp @@ -21,17 +21,15 @@ #include "velox/common/testutil/TestValue.h" #include "velox/dwio/common/Options.h" #include "velox/dwio/common/Statistics.h" -#include "velox/dwio/common/TypeWithId.h" #include "velox/dwio/common/encryption/TestProvider.h" #include "velox/dwio/common/tests/utils/BatchMaker.h" #include "velox/dwio/common/tests/utils/MapBuilder.h" #include "velox/dwio/dwrf/common/Config.h" +#include "velox/dwio/dwrf/reader/ColumnReader.h" #include "velox/dwio/dwrf/reader/DwrfReader.h" #include "velox/dwio/dwrf/test/OrcTest.h" #include "velox/dwio/dwrf/test/utils/E2EWriterTestUtil.h" -#include "velox/dwio/dwrf/writer/Writer.h" #include "velox/type/fbhive/HiveTypeParser.h" -#include "velox/vector/FlatVector.h" #include "velox/vector/fuzzer/VectorFuzzer.h" #include "velox/vector/tests/utils/VectorMaker.h" @@ -62,7 +60,7 @@ class E2EWriterTest : public testing::Test { leafPool_ = rootPool_->addLeafChild("leaf"); } - std::unique_ptr createReader( + static std::unique_ptr createReader( const MemorySink& sink, const dwio::common::ReaderOptions& opts) { std::string_view data(sink.data(), sink.size()); diff --git a/velox/dwio/dwrf/test/OrcTest.h b/velox/dwio/dwrf/test/OrcTest.h index 942ad269242a..8e21bb813568 100644 --- a/velox/dwio/dwrf/test/OrcTest.h +++ b/velox/dwio/dwrf/test/OrcTest.h @@ -108,6 +108,14 @@ class MockStripeStreams : public StripeStreams { return *getColumnSelectorProxy(); } + const tz::TimeZone* sessionTimezone() const override { + return nullptr; + } + + bool adjustTimestampToTimezone() const override { + return false; + } + const dwio::common::RowReaderOptions& rowReaderOptions() const override { auto ptr = getRowReaderOptionsProxy(); return ptr ? *ptr : options_; diff --git a/velox/dwio/dwrf/test/ReaderBaseTests.cpp b/velox/dwio/dwrf/test/ReaderBaseTests.cpp index 17bb111780a6..07b9128e1f88 100644 --- a/velox/dwio/dwrf/test/ReaderBaseTests.cpp +++ b/velox/dwio/dwrf/test/ReaderBaseTests.cpp @@ -212,8 +212,9 @@ std::unique_ptr createCorruptedFileReader( sink.write(std::move(buf)); auto readFile = std::make_shared( std::string(sink.data(), sink.size())); + facebook::velox::dwio::common::ReaderOptions readerOpts{pool.get()}; return std::make_unique( - *pool, std::make_unique(readFile, *pool)); + readerOpts, std::make_unique(readFile, *pool)); } class ReaderBaseTest : public Test { diff --git a/velox/dwio/dwrf/test/StripeReaderBaseTests.cpp b/velox/dwio/dwrf/test/StripeReaderBaseTests.cpp index 1ed91ed929fa..ff67e9f20ea8 100644 --- a/velox/dwio/dwrf/test/StripeReaderBaseTests.cpp +++ b/velox/dwio/dwrf/test/StripeReaderBaseTests.cpp @@ -65,7 +65,6 @@ class StripeLoadKeysTest : public Test { auto handler = DecryptionHandler::create(FooterWrapper(footer_.get()), &factory); pool_ = memoryManager()->addLeafPool(); - reader_ = std::make_unique( *pool_, std::make_unique( diff --git a/velox/dwio/dwrf/test/TestStripeStream.cpp b/velox/dwio/dwrf/test/TestStripeStream.cpp index 58dd3bab49ea..b456b7c30af8 100644 --- a/velox/dwio/dwrf/test/TestStripeStream.cpp +++ b/velox/dwio/dwrf/test/TestStripeStream.cpp @@ -635,6 +635,14 @@ class TestStripeStreams : public StripeStreamsBase { VELOX_UNSUPPORTED(); } + const facebook::velox::tz::TimeZone* sessionTimezone() const override { + VELOX_UNSUPPORTED(); + } + + bool adjustTimestampToTimezone() const override { + return false; + } + const facebook::velox::dwio::common::RowReaderOptions& rowReaderOptions() const override { VELOX_UNSUPPORTED(); diff --git a/velox/dwio/dwrf/test/WriterTest.cpp b/velox/dwio/dwrf/test/WriterTest.cpp index 3a8fee8e6d6b..1570f2988865 100644 --- a/velox/dwio/dwrf/test/WriterTest.cpp +++ b/velox/dwio/dwrf/test/WriterTest.cpp @@ -61,7 +61,8 @@ class WriterTest : public Test { std::string data(sinkPtr_->data(), sinkPtr_->size()); auto readFile = std::make_shared(std::move(data)); auto input = std::make_unique(std::move(readFile), *pool_); - return std::make_unique(*pool_, std::move(input)); + dwio::common::ReaderOptions readerOpts{pool_.get()}; + return std::make_unique(readerOpts, std::move(input)); } auto& getContext() { diff --git a/velox/dwio/dwrf/writer/Writer.cpp b/velox/dwio/dwrf/writer/Writer.cpp index ab92dc52d11a..9f062302993b 100644 --- a/velox/dwio/dwrf/writer/Writer.cpp +++ b/velox/dwio/dwrf/writer/Writer.cpp @@ -75,8 +75,12 @@ Writer::Writer( *options.encryptionSpec, options.encrypterFactory.get()) : nullptr); - writerBase_->initContext(options.config, pool, std::move(handler)); - + writerBase_->initContext( + options.config, + pool, + options.sessionTimezone, + options.adjustTimestampToTimezone, + std::move(handler)); auto& context = writerBase_->getContext(); VELOX_CHECK_EQ( context.getTotalMemoryUsage(), diff --git a/velox/dwio/dwrf/writer/Writer.h b/velox/dwio/dwrf/writer/Writer.h index 66c38c99a516..56ea96088fbc 100644 --- a/velox/dwio/dwrf/writer/Writer.h +++ b/velox/dwio/dwrf/writer/Writer.h @@ -42,6 +42,8 @@ struct WriterOptions : public dwio::common::WriterOptions { WriterContext& context, const velox::dwio::common::TypeWithId& type)> columnWriterFactory; + const tz::TimeZone* sessionTimezone{nullptr}; + bool adjustTimestampToTimezone{false}; }; class Writer : public dwio::common::Writer { diff --git a/velox/dwio/dwrf/writer/WriterBase.h b/velox/dwio/dwrf/writer/WriterBase.h index 29ba724a45d2..2545f5ce4c65 100644 --- a/velox/dwio/dwrf/writer/WriterBase.h +++ b/velox/dwio/dwrf/writer/WriterBase.h @@ -74,9 +74,16 @@ class WriterBase { void initContext( const std::shared_ptr& config, std::shared_ptr pool, + const tz::TimeZone* sessionTimezone = nullptr, + const bool adjustTimestampToTimezone = false, std::unique_ptr handler = nullptr) { context_ = std::make_unique( - config, std::move(pool), sink_->metricsLog(), std::move(handler)); + config, + std::move(pool), + sink_->metricsLog(), + sessionTimezone, + adjustTimestampToTimezone, + std::move(handler)); writerSink_ = std::make_unique( *sink_, context_->getMemoryPool(MemoryUsageCategory::OUTPUT_STREAM), diff --git a/velox/dwio/dwrf/writer/WriterContext.cpp b/velox/dwio/dwrf/writer/WriterContext.cpp index 68cb916d55e2..ce02aacf2921 100644 --- a/velox/dwio/dwrf/writer/WriterContext.cpp +++ b/velox/dwio/dwrf/writer/WriterContext.cpp @@ -27,6 +27,8 @@ WriterContext::WriterContext( const std::shared_ptr& config, std::shared_ptr pool, const dwio::common::MetricsLogPtr& metricLogger, + const tz::TimeZone* sessionTimezone, + const bool adjustTimestampToTimezone, std::unique_ptr handler) : config_{config}, pool_{std::move(pool)}, @@ -52,6 +54,8 @@ WriterContext::WriterContext( // metadata with dwio::common::request::AccessDescriptor upstream and // pass down the metric log. metricLogger_{metricLogger}, + sessionTimezone_{sessionTimezone}, + adjustTimestampToTimezone_{adjustTimestampToTimezone}, handler_{std::move(handler)} { const bool forceLowMemoryMode{getConfig(Config::FORCE_LOW_MEMORY_MODE)}; const bool disableLowMemoryMode{getConfig(Config::DISABLE_LOW_MEMORY_MODE)}; diff --git a/velox/dwio/dwrf/writer/WriterContext.h b/velox/dwio/dwrf/writer/WriterContext.h index 4c93409d8d03..9ba444d53175 100644 --- a/velox/dwio/dwrf/writer/WriterContext.h +++ b/velox/dwio/dwrf/writer/WriterContext.h @@ -43,6 +43,8 @@ class WriterContext : public CompressionBufferPool { std::shared_ptr pool, const dwio::common::MetricsLogPtr& metricLogger = dwio::common::MetricsLog::voidLog(), + const tz::TimeZone* sessionTimezone = nullptr, + const bool adjustTimestampToTimezone = false, std::unique_ptr handler = nullptr); ~WriterContext() override; @@ -595,6 +597,14 @@ class WriterContext : public CompressionBufferPool { return compressionBuffer_.get(); } + const tz::TimeZone* sessionTimezone() const { + return sessionTimezone_; + } + + bool adjustTimestampToTimezone() const { + return adjustTimestampToTimezone_; + } + private: void validateConfigs() const; @@ -628,6 +638,8 @@ class WriterContext : public CompressionBufferPool { const bool streamSizeAboveThresholdCheckEnabled_; const uint64_t rawDataSizePerBatch_; const dwio::common::MetricsLogPtr metricLogger_; + const tz::TimeZone* sessionTimezone_; + const bool adjustTimestampToTimezone_; // Map needs referential stability because reference to map value is stored by // another class. diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index 74bd8fb62283..d024becf92d8 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -68,7 +68,7 @@ class ReaderBase { } const tz::TimeZone* sessionTimezone() const { - return options_.getSessionTimezone(); + return options_.sessionTimezone(); } std::optional version() const { diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 973fa4d99c31..8ea1a980790b 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -69,6 +69,7 @@ OperatorCtx::createConnectorQueryCtx( planNodeId, driverCtx_->driverId, driverCtx_->queryConfig().sessionTimezone(), + driverCtx_->queryConfig().adjustTimestampToTimezone(), task->getCancellationToken()); connectorQueryCtx->setSelectiveNimbleReaderEnabled( driverCtx_->queryConfig().selectiveNimbleReaderEnabled());