diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h index 6aa1b55c9e2c..cf233702b2e5 100644 --- a/velox/connectors/Connector.h +++ b/velox/connectors/Connector.h @@ -269,6 +269,7 @@ class ConnectorQueryCtx { const std::string& planNodeId, int driverId, const std::string& sessionTimezone, + bool adjustTimestampToTimezone = false, folly::CancellationToken cancellationToken = {}) : operatorPool_(operatorPool), connectorPool_(connectorPool), @@ -283,6 +284,7 @@ class ConnectorQueryCtx { driverId_(driverId), planNodeId_(planNodeId), sessionTimezone_(sessionTimezone), + adjustTimestampToTimezone_(adjustTimestampToTimezone), cancellationToken_(std::move(cancellationToken)) { VELOX_CHECK_NOT_NULL(sessionProperties); } @@ -351,6 +353,13 @@ class ConnectorQueryCtx { return sessionTimezone_; } + /// Whether to adjust Timestamp to the timeZone obtained through + /// sessionTimezone(). This is used to be compatible with the + /// old logic of Presto. + const bool adjustTimestampToTimezone() const { + return adjustTimestampToTimezone_; + } + /// Returns the cancellation token associated with this task. const folly::CancellationToken& cancellationToken() const { return cancellationToken_; @@ -378,6 +387,7 @@ class ConnectorQueryCtx { const int driverId_; const std::string planNodeId_; const std::string sessionTimezone_; + bool adjustTimestampToTimezone_; const folly::CancellationToken cancellationToken_; bool selectiveNimbleReaderEnabled_{false}; }; diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index b6de7566dc11..907519dd0060 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -563,6 +563,8 @@ void configureReaderOptions( const auto timezone = tz::locateZone(sessionTzName); readerOptions.setSessionTimezone(timezone); } + readerOptions.setAdjustTimestampToTimezone( + connectorQueryCtx->adjustTimestampToTimezone()); readerOptions.setSelectiveNimbleReaderEnabled( connectorQueryCtx->selectiveNimbleReaderEnabled()); diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index 4c75432b6400..1ee3f122aa0c 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -743,6 +743,13 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) { connectorSessionProperties, options); + const auto& sessionTzName = connectorQueryCtx_->sessionTimezone(); + if (!sessionTzName.empty()) { + options->sessionTimezone = tz::locateZone(sessionTzName); + } + options->adjustTimestampToTimezone = + connectorQueryCtx_->adjustTimestampToTimezone(); + // Prevents the memory allocation during the writer creation. WRITER_NON_RECLAIMABLE_SECTION_GUARD(writerInfo_.size() - 1); auto writer = writerFactory_->createWriter( diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index dcdb9441fc3d..0bea3cc30898 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -34,7 +34,8 @@ VectorPtr newConstantFromString( const TypePtr& type, const std::optional& value, vector_size_t size, - velox::memory::MemoryPool* pool) { + velox::memory::MemoryPool* pool, + const std::string& sessionTimezone) { using T = typename TypeTraits::NativeType; if (!value.has_value()) { return std::make_shared>(pool, size, true, type, T()); @@ -365,7 +366,8 @@ std::vector SplitReader::adaptColumns( infoColumnType, iter->second, 1, - connectorQueryCtx_->memoryPool()); + connectorQueryCtx_->memoryPool(), + connectorQueryCtx_->sessionTimezone()); childSpec->setConstantValue(constant); } else if (!childSpec->isExplicitRowNumber()) { auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName); @@ -417,7 +419,8 @@ void SplitReader::setPartitionValue( type, value, 1, - connectorQueryCtx_->memoryPool()); + connectorQueryCtx_->memoryPool(), + connectorQueryCtx_->sessionTimezone()); spec->setConstantValue(constant); } diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index 0a4a9dbcff36..8ba90fa9ac2a 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -507,6 +507,11 @@ class ReaderOptions : public io::ReaderOptions { return *this; } + ReaderOptions& setAdjustTimestampToTimezone(bool adjustTimestampToTimezone) { + adjustTimestampToTimezone_ = adjustTimestampToTimezone; + return *this; + } + /// Gets the desired tail location. uint64_t tailLocation() const { return tailLocation_; @@ -546,10 +551,14 @@ class ReaderOptions : public io::ReaderOptions { return ioExecutor_; } - const tz::TimeZone* getSessionTimezone() const { + const tz::TimeZone* sessionTimezone() const { return sessionTimezone_; } + bool adjustTimestampToTimezone() const { + return adjustTimestampToTimezone_; + } + bool fileColumnNamesReadAsLowerCase() const { return fileColumnNamesReadAsLowerCase_; } @@ -604,6 +613,7 @@ class ReaderOptions : public io::ReaderOptions { std::shared_ptr randomSkip_; std::shared_ptr scanSpec_; const tz::TimeZone* sessionTimezone_{nullptr}; + bool adjustTimestampToTimezone_{false}; bool selectiveNimbleReaderEnabled_{false}; }; @@ -635,6 +645,9 @@ struct WriterOptions { std::function()> flushPolicyFactory; + const tz::TimeZone* sessionTimezone{nullptr}; + bool adjustTimestampToTimezone{false}; + virtual ~WriterOptions() = default; }; diff --git a/velox/dwio/dwrf/reader/DwrfData.h b/velox/dwio/dwrf/reader/DwrfData.h index 995cc80ac410..e330edbf7118 100644 --- a/velox/dwio/dwrf/reader/DwrfData.h +++ b/velox/dwio/dwrf/reader/DwrfData.h @@ -149,6 +149,14 @@ class DwrfParams : public dwio::common::FormatParams { return streamLabels_; } + const tz::TimeZone* sessionTimezone() const { + return stripeStreams_.sessionTimezone(); + } + + bool adjustTimestampToTimezone() const { + return stripeStreams_.adjustTimestampToTimezone(); + } + private: const StreamLabels& streamLabels_; StripeStreams& stripeStreams_; diff --git a/velox/dwio/dwrf/reader/DwrfReader.cpp b/velox/dwio/dwrf/reader/DwrfReader.cpp index d20d67bbb0a9..185c7934f905 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.cpp +++ b/velox/dwio/dwrf/reader/DwrfReader.cpp @@ -783,26 +783,15 @@ std::optional DwrfRowReader::estimatedRowSize() const { DwrfReader::DwrfReader( const ReaderOptions& options, std::unique_ptr input) - : readerBase_(std::make_unique( - options.memoryPool(), - std::move(input), - options.decrypterFactory(), - options.footerEstimatedSize(), - options.filePreloadThreshold(), - options.fileFormat() == FileFormat::ORC ? FileFormat::ORC - : FileFormat::DWRF, - options.fileColumnNamesReadAsLowerCase(), - options.randomSkip(), - options.scanSpec())), - options_(options) { + : readerBase_(std::make_unique(options, std::move(input))) { // If we are not using column names to map table columns to file columns, // then we use indices. In that case we need to ensure the names completely // match, because we are still mapping columns by names further down the // code. So we rename column names in the file schema to match table schema. // We test the options to have 'fileSchema' (actually table schema) as most // of the unit tests fail to provide it. - if ((!options_.useColumnNamesForColumnMapping()) && - (options_.fileSchema() != nullptr)) { + if ((!readerBase_->readerOptions().useColumnNamesForColumnMapping()) && + (readerBase_->readerOptions().fileSchema() != nullptr)) { updateColumnNamesFromTableSchema(); } } @@ -899,7 +888,7 @@ TypePtr updateColumnNames( } // namespace void DwrfReader::updateColumnNamesFromTableSchema() { - const auto& tableSchema = options_.fileSchema(); + const auto& tableSchema = readerBase_->readerOptions().fileSchema(); const auto& fileSchema = readerBase_->schema(); readerBase_->setSchema(std::dynamic_pointer_cast( updateColumnNames(fileSchema, tableSchema, "", ""))); diff --git a/velox/dwio/dwrf/reader/DwrfReader.h b/velox/dwio/dwrf/reader/DwrfReader.h index 43a4dc5fae7b..c1ea1182b099 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.h +++ b/velox/dwio/dwrf/reader/DwrfReader.h @@ -346,7 +346,6 @@ class DwrfReader : public dwio::common::Reader { private: std::shared_ptr readerBase_; - const dwio::common::ReaderOptions options_; }; class DwrfReaderFactory : public dwio::common::ReaderFactory { diff --git a/velox/dwio/dwrf/reader/ReaderBase.cpp b/velox/dwio/dwrf/reader/ReaderBase.cpp index ff2b13c9dd7c..f90291726c6e 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.cpp +++ b/velox/dwio/dwrf/reader/ReaderBase.cpp @@ -77,44 +77,21 @@ FooterStatisticsImpl::FooterStatisticsImpl( } ReaderBase::ReaderBase( - MemoryPool& pool, - std::unique_ptr input, - FileFormat fileFormat) - : ReaderBase( - pool, - std::move(input), - nullptr, - dwio::common::ReaderOptions::kDefaultFooterEstimatedSize, - dwio::common::ReaderOptions::kDefaultFilePreloadThreshold, - fileFormat) {} - -ReaderBase::ReaderBase( - MemoryPool& pool, - std::unique_ptr input, - std::shared_ptr decryptorFactory, - uint64_t footerEstimatedSize, - uint64_t filePreloadThreshold, - FileFormat fileFormat, - bool fileColumnNamesReadAsLowerCase, - std::shared_ptr randomSkip, - std::shared_ptr scanSpec) - : pool_{pool}, + const dwio::common::ReaderOptions& options, + std::unique_ptr input) + : options_{options}, arena_(std::make_unique()), - decryptorFactory_(decryptorFactory), - footerEstimatedSize_(footerEstimatedSize), - filePreloadThreshold_(filePreloadThreshold), input_(std::move(input)), - randomSkip_(std::move(randomSkip)), - scanSpec_(std::move(scanSpec)), fileLength_(input_->getReadFile()->size()) { process::TraceContext trace("ReaderBase::ReaderBase"); // TODO: make a config DWIO_ENSURE(fileLength_ > 0, "ORC file is empty"); VELOX_CHECK_GE(fileLength_, 4, "File size too small"); - const auto preloadFile = fileLength_ <= filePreloadThreshold_; - const uint64_t readSize = - preloadFile ? fileLength_ : std::min(fileLength_, footerEstimatedSize_); + const auto preloadFile = fileLength_ <= options_.filePreloadThreshold(); + const uint64_t readSize = preloadFile + ? fileLength_ + : std::min(fileLength_, options_.footerEstimatedSize()); if (input_->supportSyncLoad()) { input_->enqueue({fileLength_ - readSize, readSize, "footer"}); input_->load(preloadFile ? LogType::FILE : LogType::FOOTER); @@ -135,7 +112,7 @@ ReaderBase::ReaderBase( fileLength_, "Corrupted file, Post script size is invalid"); - if (fileFormat == FileFormat::DWRF) { + if (fileFormat() == FileFormat::DWRF) { auto postScript = ProtoUtils::readProto( input_->read(fileLength_ - psLength_ - 1, psLength_, LogType::FOOTER)); postScript_ = std::make_unique(std::move(postScript)); @@ -173,7 +150,7 @@ ReaderBase::ReaderBase( auto footerStream = input_->read( fileLength_ - psLength_ - footerSize - 1, footerSize, LogType::FOOTER); - if (fileFormat == FileFormat::DWRF) { + if (fileFormat() == FileFormat::DWRF) { auto footer = google::protobuf::Arena::CreateMessage(arena_.get()); ProtoUtils::readProtoInto( @@ -190,7 +167,7 @@ ReaderBase::ReaderBase( } schema_ = std::dynamic_pointer_cast( - convertType(*footer_, 0, fileColumnNamesReadAsLowerCase)); + convertType(*footer_, 0, options_.fileColumnNamesReadAsLowerCase())); VELOX_CHECK_NOT_NULL(schema_, "invalid schema"); // load stripe index/footer cache @@ -204,8 +181,8 @@ ReaderBase::ReaderBase( input_->read(cacheOffset, cacheSize, LogType::FOOTER)); input_->load(LogType::FOOTER); } else { - auto cacheBuffer = - std::make_shared>(pool, cacheSize); + auto cacheBuffer = std::make_shared>( + options_.memoryPool(), cacheSize); input_->read(cacheOffset, cacheSize, LogType::FOOTER) ->readFully(cacheBuffer->data(), cacheSize); cache_ = std::make_unique( @@ -226,7 +203,8 @@ ReaderBase::ReaderBase( } } // initialize file decrypter - handler_ = DecryptionHandler::create(*footer_, decryptorFactory_.get()); + handler_ = + DecryptionHandler::create(*footer_, options_.decrypterFactory().get()); } std::vector ReaderBase::rowsPerStripe() const { diff --git a/velox/dwio/dwrf/reader/ReaderBase.h b/velox/dwio/dwrf/reader/ReaderBase.h index 34287d3fcd71..8e5b3387f37e 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.h +++ b/velox/dwio/dwrf/reader/ReaderBase.h @@ -62,23 +62,8 @@ class ReaderBase { public: /// Creates reader base from buffered input. ReaderBase( - memory::MemoryPool& pool, - std::unique_ptr input, - std::shared_ptr - decryptorFactory = nullptr, - uint64_t footerEstimatedSize = - dwio::common::ReaderOptions::kDefaultFooterEstimatedSize, - uint64_t filePreloadThreshold = - dwio::common::ReaderOptions::kDefaultFilePreloadThreshold, - dwio::common::FileFormat fileFormat = dwio::common::FileFormat::DWRF, - bool fileColumnNamesReadAsLowerCase = false, - std::shared_ptr randomSkip = nullptr, - std::shared_ptr scanSpec = nullptr); - - ReaderBase( - memory::MemoryPool& pool, - std::unique_ptr input, - dwio::common::FileFormat fileFormat); + const dwio::common::ReaderOptions& options, + std::unique_ptr input); /// Creates reader base from metadata. ReaderBase( @@ -88,7 +73,7 @@ class ReaderBase { const proto::Footer* footer, std::unique_ptr cache, std::unique_ptr handler = nullptr) - : pool_{pool}, + : options_{dwio::common::ReaderOptions(&pool)}, postScript_{std::move(ps)}, footer_{std::make_unique(footer)}, cache_{std::move(cache)}, @@ -105,12 +90,17 @@ class ReaderBase { } // for testing - explicit ReaderBase(memory::MemoryPool& pool) : pool_{pool}, fileLength_{0} {} + explicit ReaderBase(const dwio::common::ReaderOptions& options) + : options_{options}, fileLength_{0} {} virtual ~ReaderBase() = default; + const dwio::common::ReaderOptions& readerOptions() const { + return options_; + } + memory::MemoryPool& memoryPool() const { - return pool_; + return options_.memoryPool(); } const PostScript& postScript() const { @@ -125,14 +115,23 @@ class ReaderBase { return schema_; } + dwio::common::FileFormat fileFormat() const { + if (options_.fileFormat() == dwio::common::FileFormat::ORC) { + return dwio::common::FileFormat::ORC; + } + + return dwio::common::FileFormat::DWRF; + } + void setSchema(RowTypePtr newSchema) { schema_ = std::move(newSchema); } const std::shared_ptr& schemaWithId() const { if (!schemaWithId_) { - if (scanSpec_) { - schemaWithId_ = dwio::common::TypeWithId::create(schema_, *scanSpec_); + if (options_.scanSpec()) { + schemaWithId_ = + dwio::common::TypeWithId::create(schema_, *options_.scanSpec()); } else { schemaWithId_ = dwio::common::TypeWithId::create(schema_); } @@ -153,7 +152,7 @@ class ReaderBase { } uint64_t footerEstimatedSize() const { - return footerEstimatedSize_; + return options_.footerEstimatedSize(); } uint64_t fileLength() const { @@ -213,7 +212,7 @@ class ReaderBase { compressionKind(), std::move(compressed), compressionBlockSize(), - pool_, + options_.memoryPool(), streamDebugInfo, decrypter); } @@ -237,7 +236,7 @@ class ReaderBase { } const std::shared_ptr& randomSkip() const { - return randomSkip_; + return options_.randomSkip(); } private: @@ -246,22 +245,14 @@ class ReaderBase { uint32_t index = 0, bool fileColumnNamesReadAsLowerCase = false); - memory::MemoryPool& pool_; + const dwio::common::ReaderOptions options_; std::unique_ptr arena_; std::unique_ptr postScript_; std::unique_ptr footer_ = nullptr; std::unique_ptr cache_; - // Keeps factory alive for possibly async prefetch. - std::shared_ptr decryptorFactory_; std::unique_ptr handler_; - const uint64_t footerEstimatedSize_{ - dwio::common::ReaderOptions::kDefaultFooterEstimatedSize}; - const uint64_t filePreloadThreshold_{ - dwio::common::ReaderOptions::kDefaultFilePreloadThreshold}; const std::unique_ptr input_; - const std::shared_ptr randomSkip_; - const std::shared_ptr scanSpec_; const uint64_t fileLength_; RowTypePtr schema_; diff --git a/velox/dwio/dwrf/reader/StripeStream.h b/velox/dwio/dwrf/reader/StripeStream.h index 362a0b43d097..832acd40b2b3 100644 --- a/velox/dwio/dwrf/reader/StripeStream.h +++ b/velox/dwio/dwrf/reader/StripeStream.h @@ -103,6 +103,18 @@ class StripeStreams { */ virtual const dwio::common::ColumnSelector& getColumnSelector() const = 0; + /** + * Session timezone used for reading Timestamp. + */ + virtual const tz::TimeZone* sessionTimezone() const = 0; + + /** + * Whether to adjust Timestamp to the timeZone obtained via + * sessionTimezone(). This is used to be compatible with the + * old logic of Presto. + */ + virtual bool adjustTimestampToTimezone() const = 0; + // Get row reader options virtual const dwio::common::RowReaderOptions& rowReaderOptions() const = 0; @@ -250,6 +262,14 @@ class StripeStreamsImpl : public StripeStreamsBase { return *selector_; } + const tz::TimeZone* sessionTimezone() const override { + return readState_->readerBase->readerOptions().sessionTimezone(); + } + + bool adjustTimestampToTimezone() const override { + return readState_->readerBase->readerOptions().adjustTimestampToTimezone(); + } + const dwio::common::RowReaderOptions& rowReaderOptions() const override { return opts_; } diff --git a/velox/dwio/dwrf/test/ColumnWriterTest.cpp b/velox/dwio/dwrf/test/ColumnWriterTest.cpp index 93cc5f053308..e33c595ef770 100644 --- a/velox/dwio/dwrf/test/ColumnWriterTest.cpp +++ b/velox/dwio/dwrf/test/ColumnWriterTest.cpp @@ -154,6 +154,14 @@ class TestStripeStreams : public StripeStreamsBase { return selector_; } + const tz::TimeZone* sessionTimezone() const override { + return context_.sessionTimezone(); + } + + bool adjustTimestampToTimezone() const override { + return context_.adjustTimestampToTimezone(); + } + const RowReaderOptions& rowReaderOptions() const override { return options_; } diff --git a/velox/dwio/dwrf/test/E2EWriterTest.cpp b/velox/dwio/dwrf/test/E2EWriterTest.cpp index f8cccab6575d..22d1c624c946 100644 --- a/velox/dwio/dwrf/test/E2EWriterTest.cpp +++ b/velox/dwio/dwrf/test/E2EWriterTest.cpp @@ -21,17 +21,15 @@ #include "velox/common/testutil/TestValue.h" #include "velox/dwio/common/Options.h" #include "velox/dwio/common/Statistics.h" -#include "velox/dwio/common/TypeWithId.h" #include "velox/dwio/common/encryption/TestProvider.h" #include "velox/dwio/common/tests/utils/BatchMaker.h" #include "velox/dwio/common/tests/utils/MapBuilder.h" #include "velox/dwio/dwrf/common/Config.h" +#include "velox/dwio/dwrf/reader/ColumnReader.h" #include "velox/dwio/dwrf/reader/DwrfReader.h" #include "velox/dwio/dwrf/test/OrcTest.h" #include "velox/dwio/dwrf/test/utils/E2EWriterTestUtil.h" -#include "velox/dwio/dwrf/writer/Writer.h" #include "velox/type/fbhive/HiveTypeParser.h" -#include "velox/vector/FlatVector.h" #include "velox/vector/fuzzer/VectorFuzzer.h" #include "velox/vector/tests/utils/VectorMaker.h" @@ -62,7 +60,7 @@ class E2EWriterTest : public testing::Test { leafPool_ = rootPool_->addLeafChild("leaf"); } - std::unique_ptr createReader( + static std::unique_ptr createReader( const MemorySink& sink, const dwio::common::ReaderOptions& opts) { std::string_view data(sink.data(), sink.size()); diff --git a/velox/dwio/dwrf/test/OrcTest.h b/velox/dwio/dwrf/test/OrcTest.h index 942ad269242a..8e21bb813568 100644 --- a/velox/dwio/dwrf/test/OrcTest.h +++ b/velox/dwio/dwrf/test/OrcTest.h @@ -108,6 +108,14 @@ class MockStripeStreams : public StripeStreams { return *getColumnSelectorProxy(); } + const tz::TimeZone* sessionTimezone() const override { + return nullptr; + } + + bool adjustTimestampToTimezone() const override { + return false; + } + const dwio::common::RowReaderOptions& rowReaderOptions() const override { auto ptr = getRowReaderOptionsProxy(); return ptr ? *ptr : options_; diff --git a/velox/dwio/dwrf/test/ReaderBaseTests.cpp b/velox/dwio/dwrf/test/ReaderBaseTests.cpp index 17bb111780a6..07b9128e1f88 100644 --- a/velox/dwio/dwrf/test/ReaderBaseTests.cpp +++ b/velox/dwio/dwrf/test/ReaderBaseTests.cpp @@ -212,8 +212,9 @@ std::unique_ptr createCorruptedFileReader( sink.write(std::move(buf)); auto readFile = std::make_shared( std::string(sink.data(), sink.size())); + facebook::velox::dwio::common::ReaderOptions readerOpts{pool.get()}; return std::make_unique( - *pool, std::make_unique(readFile, *pool)); + readerOpts, std::make_unique(readFile, *pool)); } class ReaderBaseTest : public Test { diff --git a/velox/dwio/dwrf/test/StripeReaderBaseTests.cpp b/velox/dwio/dwrf/test/StripeReaderBaseTests.cpp index 1ed91ed929fa..ff67e9f20ea8 100644 --- a/velox/dwio/dwrf/test/StripeReaderBaseTests.cpp +++ b/velox/dwio/dwrf/test/StripeReaderBaseTests.cpp @@ -65,7 +65,6 @@ class StripeLoadKeysTest : public Test { auto handler = DecryptionHandler::create(FooterWrapper(footer_.get()), &factory); pool_ = memoryManager()->addLeafPool(); - reader_ = std::make_unique( *pool_, std::make_unique( diff --git a/velox/dwio/dwrf/test/TestStripeStream.cpp b/velox/dwio/dwrf/test/TestStripeStream.cpp index 58dd3bab49ea..b456b7c30af8 100644 --- a/velox/dwio/dwrf/test/TestStripeStream.cpp +++ b/velox/dwio/dwrf/test/TestStripeStream.cpp @@ -635,6 +635,14 @@ class TestStripeStreams : public StripeStreamsBase { VELOX_UNSUPPORTED(); } + const facebook::velox::tz::TimeZone* sessionTimezone() const override { + VELOX_UNSUPPORTED(); + } + + bool adjustTimestampToTimezone() const override { + return false; + } + const facebook::velox::dwio::common::RowReaderOptions& rowReaderOptions() const override { VELOX_UNSUPPORTED(); diff --git a/velox/dwio/dwrf/test/WriterTest.cpp b/velox/dwio/dwrf/test/WriterTest.cpp index 3a8fee8e6d6b..1570f2988865 100644 --- a/velox/dwio/dwrf/test/WriterTest.cpp +++ b/velox/dwio/dwrf/test/WriterTest.cpp @@ -61,7 +61,8 @@ class WriterTest : public Test { std::string data(sinkPtr_->data(), sinkPtr_->size()); auto readFile = std::make_shared(std::move(data)); auto input = std::make_unique(std::move(readFile), *pool_); - return std::make_unique(*pool_, std::move(input)); + dwio::common::ReaderOptions readerOpts{pool_.get()}; + return std::make_unique(readerOpts, std::move(input)); } auto& getContext() { diff --git a/velox/dwio/dwrf/writer/Writer.cpp b/velox/dwio/dwrf/writer/Writer.cpp index ab92dc52d11a..9f062302993b 100644 --- a/velox/dwio/dwrf/writer/Writer.cpp +++ b/velox/dwio/dwrf/writer/Writer.cpp @@ -75,8 +75,12 @@ Writer::Writer( *options.encryptionSpec, options.encrypterFactory.get()) : nullptr); - writerBase_->initContext(options.config, pool, std::move(handler)); - + writerBase_->initContext( + options.config, + pool, + options.sessionTimezone, + options.adjustTimestampToTimezone, + std::move(handler)); auto& context = writerBase_->getContext(); VELOX_CHECK_EQ( context.getTotalMemoryUsage(), diff --git a/velox/dwio/dwrf/writer/Writer.h b/velox/dwio/dwrf/writer/Writer.h index 66c38c99a516..56ea96088fbc 100644 --- a/velox/dwio/dwrf/writer/Writer.h +++ b/velox/dwio/dwrf/writer/Writer.h @@ -42,6 +42,8 @@ struct WriterOptions : public dwio::common::WriterOptions { WriterContext& context, const velox::dwio::common::TypeWithId& type)> columnWriterFactory; + const tz::TimeZone* sessionTimezone{nullptr}; + bool adjustTimestampToTimezone{false}; }; class Writer : public dwio::common::Writer { diff --git a/velox/dwio/dwrf/writer/WriterBase.h b/velox/dwio/dwrf/writer/WriterBase.h index 29ba724a45d2..2545f5ce4c65 100644 --- a/velox/dwio/dwrf/writer/WriterBase.h +++ b/velox/dwio/dwrf/writer/WriterBase.h @@ -74,9 +74,16 @@ class WriterBase { void initContext( const std::shared_ptr& config, std::shared_ptr pool, + const tz::TimeZone* sessionTimezone = nullptr, + const bool adjustTimestampToTimezone = false, std::unique_ptr handler = nullptr) { context_ = std::make_unique( - config, std::move(pool), sink_->metricsLog(), std::move(handler)); + config, + std::move(pool), + sink_->metricsLog(), + sessionTimezone, + adjustTimestampToTimezone, + std::move(handler)); writerSink_ = std::make_unique( *sink_, context_->getMemoryPool(MemoryUsageCategory::OUTPUT_STREAM), diff --git a/velox/dwio/dwrf/writer/WriterContext.cpp b/velox/dwio/dwrf/writer/WriterContext.cpp index 68cb916d55e2..ce02aacf2921 100644 --- a/velox/dwio/dwrf/writer/WriterContext.cpp +++ b/velox/dwio/dwrf/writer/WriterContext.cpp @@ -27,6 +27,8 @@ WriterContext::WriterContext( const std::shared_ptr& config, std::shared_ptr pool, const dwio::common::MetricsLogPtr& metricLogger, + const tz::TimeZone* sessionTimezone, + const bool adjustTimestampToTimezone, std::unique_ptr handler) : config_{config}, pool_{std::move(pool)}, @@ -52,6 +54,8 @@ WriterContext::WriterContext( // metadata with dwio::common::request::AccessDescriptor upstream and // pass down the metric log. metricLogger_{metricLogger}, + sessionTimezone_{sessionTimezone}, + adjustTimestampToTimezone_{adjustTimestampToTimezone}, handler_{std::move(handler)} { const bool forceLowMemoryMode{getConfig(Config::FORCE_LOW_MEMORY_MODE)}; const bool disableLowMemoryMode{getConfig(Config::DISABLE_LOW_MEMORY_MODE)}; diff --git a/velox/dwio/dwrf/writer/WriterContext.h b/velox/dwio/dwrf/writer/WriterContext.h index 4c93409d8d03..9ba444d53175 100644 --- a/velox/dwio/dwrf/writer/WriterContext.h +++ b/velox/dwio/dwrf/writer/WriterContext.h @@ -43,6 +43,8 @@ class WriterContext : public CompressionBufferPool { std::shared_ptr pool, const dwio::common::MetricsLogPtr& metricLogger = dwio::common::MetricsLog::voidLog(), + const tz::TimeZone* sessionTimezone = nullptr, + const bool adjustTimestampToTimezone = false, std::unique_ptr handler = nullptr); ~WriterContext() override; @@ -595,6 +597,14 @@ class WriterContext : public CompressionBufferPool { return compressionBuffer_.get(); } + const tz::TimeZone* sessionTimezone() const { + return sessionTimezone_; + } + + bool adjustTimestampToTimezone() const { + return adjustTimestampToTimezone_; + } + private: void validateConfigs() const; @@ -628,6 +638,8 @@ class WriterContext : public CompressionBufferPool { const bool streamSizeAboveThresholdCheckEnabled_; const uint64_t rawDataSizePerBatch_; const dwio::common::MetricsLogPtr metricLogger_; + const tz::TimeZone* sessionTimezone_; + const bool adjustTimestampToTimezone_; // Map needs referential stability because reference to map value is stored by // another class. diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index e40617f7991e..58845903d2f7 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -67,7 +67,7 @@ class ReaderBase { } const tz::TimeZone* sessionTimezone() const { - return options_.getSessionTimezone(); + return options_.sessionTimezone(); } /// Ensures that streams are enqueued and loading for the row group at diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index bb78e9bef57c..fe7f40e0305c 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -68,6 +68,7 @@ OperatorCtx::createConnectorQueryCtx( planNodeId, driverCtx_->driverId, driverCtx_->queryConfig().sessionTimezone(), + driverCtx_->queryConfig().adjustTimestampToTimezone(), task->getCancellationToken()); connectorQueryCtx->setSelectiveNimbleReaderEnabled( driverCtx_->queryConfig().selectiveNimbleReaderEnabled());