From 33f25c41b82e943e015365cf3b79a60650d60fd1 Mon Sep 17 00:00:00 2001 From: wypb Date: Wed, 9 Oct 2024 19:46:22 +0800 Subject: [PATCH] Add sessionTimezone and adjustTimestampToTimezone to DWRF reader and writer options --- velox/connectors/Connector.h | 4 +- velox/connectors/hive/HiveDataSink.cpp | 6 +- velox/dwio/dwrf/reader/ReaderBase.h | 2 +- velox/dwio/dwrf/reader/StripeStream.h | 85 ++++++++++---------------- 4 files changed, 37 insertions(+), 60 deletions(-) diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h index cf233702b2e5..bfd2b9ca1a46 100644 --- a/velox/connectors/Connector.h +++ b/velox/connectors/Connector.h @@ -356,7 +356,7 @@ class ConnectorQueryCtx { /// Whether to adjust Timestamp to the timeZone obtained through /// sessionTimezone(). This is used to be compatible with the /// old logic of Presto. - const bool adjustTimestampToTimezone() const { + bool adjustTimestampToTimezone() const { return adjustTimestampToTimezone_; } @@ -387,7 +387,7 @@ class ConnectorQueryCtx { const int driverId_; const std::string planNodeId_; const std::string sessionTimezone_; - bool adjustTimestampToTimezone_; + const bool adjustTimestampToTimezone_; const folly::CancellationToken cancellationToken_; bool selectiveNimbleReaderEnabled_{false}; }; diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index 661538b15c55..5fd34cbf82ad 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -744,9 +744,9 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) { connectorSessionProperties, options); - const auto& sessionTzName = connectorQueryCtx_->sessionTimezone(); - if (!sessionTzName.empty()) { - options->sessionTimezone = tz::locateZone(sessionTzName); + const auto& sessionTimeZoneName = connectorQueryCtx_->sessionTimezone(); + if (!sessionTimeZoneName.empty()) { + options->sessionTimezone = tz::locateZone(sessionTimeZoneName); } options->adjustTimestampToTimezone = connectorQueryCtx_->adjustTimestampToTimezone(); diff --git a/velox/dwio/dwrf/reader/ReaderBase.h b/velox/dwio/dwrf/reader/ReaderBase.h index de68e1de0044..57454c177e12 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.h +++ b/velox/dwio/dwrf/reader/ReaderBase.h @@ -260,13 +260,13 @@ class ReaderBase { return options; } - const dwio::common::ReaderOptions options_; std::unique_ptr arena_; std::unique_ptr postScript_; std::unique_ptr footer_ = nullptr; std::unique_ptr cache_; std::unique_ptr handler_; + const dwio::common::ReaderOptions options_; const std::unique_ptr input_; const uint64_t fileLength_; diff --git a/velox/dwio/dwrf/reader/StripeStream.h b/velox/dwio/dwrf/reader/StripeStream.h index 832acd40b2b3..e7b0bde39283 100644 --- a/velox/dwio/dwrf/reader/StripeStream.h +++ b/velox/dwio/dwrf/reader/StripeStream.h @@ -33,9 +33,7 @@ class StrideIndexProvider { virtual uint64_t getStrideIndex() const = 0; }; -/** - * StreamInformation Implementation - */ +/// StreamInformation Implementation class StreamInformationImpl : public StreamInformation { public: static const StreamInformationImpl& getNotFound() { @@ -91,45 +89,36 @@ class StripeStreams { public: virtual ~StripeStreams() = default; - /** - * Get the DwrfFormat for the stream - * @return DwrfFormat - */ + /// Get the DwrfFormat for the stream + /// + /// @return DwrfFormat virtual DwrfFormat format() const = 0; - /** - * get column selector for current stripe reading session - * @return column selector will hold column projection info - */ + /// get column selector for current stripe reading session + /// + /// @return column selector will hold column projection info virtual const dwio::common::ColumnSelector& getColumnSelector() const = 0; - /** - * Session timezone used for reading Timestamp. - */ + /// Session timezone used for reading Timestamp. virtual const tz::TimeZone* sessionTimezone() const = 0; - /** - * Whether to adjust Timestamp to the timeZone obtained via - * sessionTimezone(). This is used to be compatible with the - * old logic of Presto. - */ + /// Whether to adjust Timestamp to the timeZone obtained via + /// sessionTimezone(). This is used to be compatible with the + /// old logic of Presto. virtual bool adjustTimestampToTimezone() const = 0; - // Get row reader options + /// Get row reader options virtual const dwio::common::RowReaderOptions& rowReaderOptions() const = 0; - /** - * Get the encoding for the given column for this stripe. - */ + /// Get the encoding for the given column for this stripe. virtual const proto::ColumnEncoding& getEncoding( const EncodingKey&) const = 0; - /** - * Get the stream for the given column/kind in this stripe. - * @param streamId stream identifier object - * @param throwIfNotFound fail if a stream is required and not found - * @return the new stream - */ + /// Get the stream for the given column/kind in this stripe. + /// + /// @param streamId stream identifier object + /// @param throwIfNotFound fail if a stream is required and not found + /// @return the new stream virtual std::unique_ptr getStream( const DwrfStreamIdentifier& si, std::string_view label, @@ -149,35 +138,27 @@ class StripeStreams { virtual std::shared_ptr getStripeDictionaryCache() = 0; - /** - * visit all streams of given node and execute visitor logic - * return number of streams visited - */ + /// visit all streams of given node and execute visitor logic + /// return number of streams visited virtual uint32_t visitStreamsOfNode( uint32_t node, std::function visitor) const = 0; - /** - * Get the value of useVInts for the given column in this stripe. - * Defaults to true. - * @param streamId stream identifier - */ + /// Get the value of useVInts for the given column in this stripe. + /// Defaults to true. + /// @param streamId stream identifier virtual bool getUseVInts(const DwrfStreamIdentifier& streamId) const = 0; - /** - * Get the memory pool for this reader. - */ + /// Get the memory pool for this reader. virtual memory::MemoryPool& getMemoryPool() const = 0; - /** - * Get stride index provider which is used by string dictionary reader to - * get the row index stride index where next() happens - */ + /// Get stride index provider which is used by string dictionary reader to + /// get the row index stride index where next() happens virtual const StrideIndexProvider& getStrideIndexProvider() const = 0; virtual int64_t stripeRows() const = 0; - // Number of rows per row group. Last row group may have fewer rows. + /// Number of rows per row group. Last row group may have fewer rows. virtual uint32_t rowsPerRowGroup() const = 0; }; @@ -193,7 +174,7 @@ class StripeStreamsBase : public StripeStreams { return *pool_; } - // For now just return DWRF, will refine when ORC has better support + /// For now just return DWRF, will refine when ORC has better support virtual DwrfFormat format() const override { return DwrfFormat::kDwrf; } @@ -224,9 +205,7 @@ struct StripeReadState { stripeMetadata{std::move(_stripeMetadata)} {} }; -/** - * StripeStream Implementation - */ +/// StripeStream Implementation class StripeStreamsImpl : public StripeStreamsBase { public: static constexpr int64_t kUnknownStripeRows = -1; @@ -288,7 +267,7 @@ class StripeStreamsImpl : public StripeStreamsBase { return encodingKeyIt->second; } - // load data into buffer according to read plan + /// load data into buffer according to read plan void loadReadPlan(); std::unique_ptr getCompressedStream( @@ -382,9 +361,7 @@ class StripeStreamsImpl : public StripeStreamsBase { decryptedEncodings_; }; -/** - * StripeInformation Implementation - */ +/// StripeInformation Implementation class StripeInformationImpl : public StripeInformation { uint64_t offset; uint64_t indexLength;