Skip to content

Commit

Permalink
Add sessionTimezone and adjustTimestampToTimezone to DWRF reader and writer options
Browse files Browse the repository at this point in the history
  • Loading branch information
wypb committed Oct 9, 2024
1 parent 483d36e commit 33f25c4
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 60 deletions.
4 changes: 2 additions & 2 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ class ConnectorQueryCtx {
/// Whether to adjust Timestamp values to the time zone obtained through
/// sessionTimezone(). This exists for compatibility with the legacy
/// behavior of Presto.
const bool adjustTimestampToTimezone() const {
bool adjustTimestampToTimezone() const {
return adjustTimestampToTimezone_;
}

Expand Down Expand Up @@ -387,7 +387,7 @@ class ConnectorQueryCtx {
const int driverId_;
const std::string planNodeId_;
const std::string sessionTimezone_;
bool adjustTimestampToTimezone_;
const bool adjustTimestampToTimezone_;
const folly::CancellationToken cancellationToken_;
bool selectiveNimbleReaderEnabled_{false};
};
Expand Down
6 changes: 3 additions & 3 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -744,9 +744,9 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
connectorSessionProperties,
options);

const auto& sessionTzName = connectorQueryCtx_->sessionTimezone();
if (!sessionTzName.empty()) {
options->sessionTimezone = tz::locateZone(sessionTzName);
const auto& sessionTimeZoneName = connectorQueryCtx_->sessionTimezone();
if (!sessionTimeZoneName.empty()) {
options->sessionTimezone = tz::locateZone(sessionTimeZoneName);
}
options->adjustTimestampToTimezone =
connectorQueryCtx_->adjustTimestampToTimezone();
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/reader/ReaderBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,13 @@ class ReaderBase {
return options;
}

const dwio::common::ReaderOptions options_;
std::unique_ptr<google::protobuf::Arena> arena_;
std::unique_ptr<PostScript> postScript_;
std::unique_ptr<FooterWrapper> footer_ = nullptr;
std::unique_ptr<StripeMetadataCache> cache_;
std::unique_ptr<encryption::DecryptionHandler> handler_;

const dwio::common::ReaderOptions options_;
const std::unique_ptr<dwio::common::BufferedInput> input_;
const uint64_t fileLength_;

Expand Down
85 changes: 31 additions & 54 deletions velox/dwio/dwrf/reader/StripeStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ class StrideIndexProvider {
virtual uint64_t getStrideIndex() const = 0;
};

/**
* StreamInformation Implementation
*/
/// StreamInformation Implementation
class StreamInformationImpl : public StreamInformation {
public:
static const StreamInformationImpl& getNotFound() {
Expand Down Expand Up @@ -91,45 +89,36 @@ class StripeStreams {
public:
virtual ~StripeStreams() = default;

/**
* Get the DwrfFormat for the stream
* @return DwrfFormat
*/
/// Get the DwrfFormat for the stream
///
/// @return DwrfFormat
virtual DwrfFormat format() const = 0;

/**
* get column selector for current stripe reading session
* @return column selector will hold column projection info
*/
/// Get the column selector for the current stripe reading session.
///
/// @return The column selector, which holds the column projection info.
virtual const dwio::common::ColumnSelector& getColumnSelector() const = 0;

/**
* Session timezone used for reading Timestamp.
*/
/// Session timezone used for reading Timestamp.
virtual const tz::TimeZone* sessionTimezone() const = 0;

/**
* Whether to adjust Timestamp to the timeZone obtained via
* sessionTimezone(). This is used to be compatible with the
* old logic of Presto.
*/
/// Whether to adjust Timestamp values to the time zone obtained via
/// sessionTimezone(). This exists for compatibility with the legacy
/// behavior of Presto.
virtual bool adjustTimestampToTimezone() const = 0;

// Get row reader options
/// Get row reader options
virtual const dwio::common::RowReaderOptions& rowReaderOptions() const = 0;

/**
* Get the encoding for the given column for this stripe.
*/
/// Get the encoding for the given column for this stripe.
virtual const proto::ColumnEncoding& getEncoding(
const EncodingKey&) const = 0;

/**
* Get the stream for the given column/kind in this stripe.
* @param streamId stream identifier object
* @param throwIfNotFound fail if a stream is required and not found
* @return the new stream
*/
/// Get the stream for the given column/kind in this stripe.
///
/// @param si stream identifier object
/// @param throwIfNotFound fail if a stream is required and not found
/// @return the new stream
virtual std::unique_ptr<dwio::common::SeekableInputStream> getStream(
const DwrfStreamIdentifier& si,
std::string_view label,
Expand All @@ -149,35 +138,27 @@ class StripeStreams {

virtual std::shared_ptr<StripeDictionaryCache> getStripeDictionaryCache() = 0;

/**
* visit all streams of given node and execute visitor logic
* return number of streams visited
*/
/// Visits all streams of the given node, executing the visitor logic, and
/// returns the number of streams visited.
virtual uint32_t visitStreamsOfNode(
uint32_t node,
std::function<void(const StreamInformation&)> visitor) const = 0;

/**
* Get the value of useVInts for the given column in this stripe.
* Defaults to true.
* @param streamId stream identifier
*/
/// Get the value of useVInts for the given column in this stripe.
/// Defaults to true.
/// @param streamId stream identifier
virtual bool getUseVInts(const DwrfStreamIdentifier& streamId) const = 0;

/**
* Get the memory pool for this reader.
*/
/// Get the memory pool for this reader.
virtual memory::MemoryPool& getMemoryPool() const = 0;

/**
* Get stride index provider which is used by string dictionary reader to
* get the row index stride index where next() happens
*/
/// Get the stride index provider, which is used by the string dictionary
/// reader to get the index of the row index stride where next() happens.
virtual const StrideIndexProvider& getStrideIndexProvider() const = 0;

virtual int64_t stripeRows() const = 0;

// Number of rows per row group. Last row group may have fewer rows.
/// Number of rows per row group. Last row group may have fewer rows.
virtual uint32_t rowsPerRowGroup() const = 0;
};

Expand All @@ -193,7 +174,7 @@ class StripeStreamsBase : public StripeStreams {
return *pool_;
}

// For now just return DWRF, will refine when ORC has better support
/// For now, just return DWRF; this will be refined when ORC has better support.
/// @return DwrfFormat::kDwrf unconditionally.
DwrfFormat format() const override {
  return DwrfFormat::kDwrf;
}
Expand Down Expand Up @@ -224,9 +205,7 @@ struct StripeReadState {
stripeMetadata{std::move(_stripeMetadata)} {}
};

/**
* StripeStream Implementation
*/
/// StripeStream Implementation
class StripeStreamsImpl : public StripeStreamsBase {
public:
static constexpr int64_t kUnknownStripeRows = -1;
Expand Down Expand Up @@ -288,7 +267,7 @@ class StripeStreamsImpl : public StripeStreamsBase {
return encodingKeyIt->second;
}

// load data into buffer according to read plan
/// Load data into buffer according to the read plan.
void loadReadPlan();

std::unique_ptr<dwio::common::SeekableInputStream> getCompressedStream(
Expand Down Expand Up @@ -382,9 +361,7 @@ class StripeStreamsImpl : public StripeStreamsBase {
decryptedEncodings_;
};

/**
* StripeInformation Implementation
*/
/// StripeInformation Implementation
class StripeInformationImpl : public StripeInformation {
uint64_t offset;
uint64_t indexLength;
Expand Down

0 comments on commit 33f25c4

Please sign in to comment.