Skip to content

Commit

Permalink
Add sessionTimezone and adjustTimestampToTimezone to DWRF reader and …
Browse files Browse the repository at this point in the history
…writer options
  • Loading branch information
wypb committed Oct 10, 2024
1 parent 640592d commit 14dda78
Show file tree
Hide file tree
Showing 25 changed files with 211 additions and 140 deletions.
10 changes: 10 additions & 0 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ class ConnectorQueryCtx {
const std::string& planNodeId,
int driverId,
const std::string& sessionTimezone,
bool adjustTimestampToTimezone = false,
folly::CancellationToken cancellationToken = {})
: operatorPool_(operatorPool),
connectorPool_(connectorPool),
Expand All @@ -283,6 +284,7 @@ class ConnectorQueryCtx {
driverId_(driverId),
planNodeId_(planNodeId),
sessionTimezone_(sessionTimezone),
adjustTimestampToTimezone_(adjustTimestampToTimezone),
cancellationToken_(std::move(cancellationToken)) {
VELOX_CHECK_NOT_NULL(sessionProperties);
}
Expand Down Expand Up @@ -351,6 +353,13 @@ class ConnectorQueryCtx {
return sessionTimezone_;
}

/// Returns whether timestamps should be adjusted to the time zone reported
/// by sessionTimezone(). The flag exists for compatibility with Presto's
/// older timestamp handling.
bool adjustTimestampToTimezone() const {
return adjustTimestampToTimezone_;
}

/// Returns the cancellation token associated with this task.
const folly::CancellationToken& cancellationToken() const {
return cancellationToken_;
Expand Down Expand Up @@ -378,6 +387,7 @@ class ConnectorQueryCtx {
const int driverId_;
const std::string planNodeId_;
const std::string sessionTimezone_;
const bool adjustTimestampToTimezone_;
const folly::CancellationToken cancellationToken_;
bool selectiveNimbleReaderEnabled_{false};
};
Expand Down
2 changes: 2 additions & 0 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,8 @@ void configureReaderOptions(
const auto timezone = tz::locateZone(sessionTzName);
readerOptions.setSessionTimezone(timezone);
}
readerOptions.setAdjustTimestampToTimezone(
connectorQueryCtx->adjustTimestampToTimezone());
readerOptions.setSelectiveNimbleReaderEnabled(
connectorQueryCtx->selectiveNimbleReaderEnabled());

Expand Down
7 changes: 7 additions & 0 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,13 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
connectorSessionProperties,
options);

const auto& sessionTimeZoneName = connectorQueryCtx_->sessionTimezone();
if (!sessionTimeZoneName.empty()) {
options->sessionTimezone = tz::locateZone(sessionTimeZoneName);
}
options->adjustTimestampToTimezone =
connectorQueryCtx_->adjustTimestampToTimezone();

// Prevents the memory allocation during the writer creation.
WRITER_NON_RECLAIMABLE_SECTION_GUARD(writerInfo_.size() - 1);
auto writer = writerFactory_->createWriter(
Expand Down
9 changes: 6 additions & 3 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ VectorPtr newConstantFromString(
const TypePtr& type,
const std::optional<std::string>& value,
vector_size_t size,
velox::memory::MemoryPool* pool) {
velox::memory::MemoryPool* pool,
const std::string& sessionTimezone) {
using T = typename TypeTraits<kind>::NativeType;
if (!value.has_value()) {
return std::make_shared<ConstantVector<T>>(pool, size, true, type, T());
Expand Down Expand Up @@ -365,7 +366,8 @@ std::vector<TypePtr> SplitReader::adaptColumns(
infoColumnType,
iter->second,
1,
connectorQueryCtx_->memoryPool());
connectorQueryCtx_->memoryPool(),
connectorQueryCtx_->sessionTimezone());
childSpec->setConstantValue(constant);
} else if (!childSpec->isExplicitRowNumber()) {
auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName);
Expand Down Expand Up @@ -417,7 +419,8 @@ void SplitReader::setPartitionValue(
type,
value,
1,
connectorQueryCtx_->memoryPool());
connectorQueryCtx_->memoryPool(),
connectorQueryCtx_->sessionTimezone());
spec->setConstantValue(constant);
}

Expand Down
15 changes: 14 additions & 1 deletion velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,11 @@ class ReaderOptions : public io::ReaderOptions {
return *this;
}

/// Sets whether the reader should adjust timestamps to the session time
/// zone. Returns *this so that setter calls can be chained.
ReaderOptions& setAdjustTimestampToTimezone(bool adjustTimestampToTimezone) {
adjustTimestampToTimezone_ = adjustTimestampToTimezone;
return *this;
}

/// Gets the desired tail location.
uint64_t tailLocation() const {
return tailLocation_;
Expand Down Expand Up @@ -546,10 +551,14 @@ class ReaderOptions : public io::ReaderOptions {
return ioExecutor_;
}

const tz::TimeZone* getSessionTimezone() const {
const tz::TimeZone* sessionTimezone() const {
return sessionTimezone_;
}

/// Returns the flag stored by setAdjustTimestampToTimezone(), i.e. whether
/// timestamps should be adjusted to the session time zone.
bool adjustTimestampToTimezone() const {
return adjustTimestampToTimezone_;
}

bool fileColumnNamesReadAsLowerCase() const {
return fileColumnNamesReadAsLowerCase_;
}
Expand Down Expand Up @@ -604,6 +613,7 @@ class ReaderOptions : public io::ReaderOptions {
std::shared_ptr<random::RandomSkipTracker> randomSkip_;
std::shared_ptr<velox::common::ScanSpec> scanSpec_;
const tz::TimeZone* sessionTimezone_{nullptr};
bool adjustTimestampToTimezone_{false};
bool selectiveNimbleReaderEnabled_{false};
};

Expand Down Expand Up @@ -635,6 +645,9 @@ struct WriterOptions {
std::function<std::unique_ptr<dwio::common::FlushPolicy>()>
flushPolicyFactory;

const tz::TimeZone* sessionTimezone{nullptr};
bool adjustTimestampToTimezone{false};

virtual ~WriterOptions() = default;
};

Expand Down
8 changes: 8 additions & 0 deletions velox/dwio/dwrf/reader/DwrfData.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ class DwrfParams : public dwio::common::FormatParams {
return streamLabels_;
}

/// Returns the session time zone, forwarded from the underlying stripe
/// streams.
/// NOTE(review): presumably nullptr when no session time zone was
/// configured -- confirm against StripeStreams.
const tz::TimeZone* sessionTimezone() const {
return stripeStreams_.sessionTimezone();
}

/// Returns whether timestamps should be adjusted to the session time zone,
/// forwarded from the underlying stripe streams.
bool adjustTimestampToTimezone() const {
return stripeStreams_.adjustTimestampToTimezone();
}

private:
const StreamLabels& streamLabels_;
StripeStreams& stripeStreams_;
Expand Down
19 changes: 4 additions & 15 deletions velox/dwio/dwrf/reader/DwrfReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -794,26 +794,15 @@ std::optional<size_t> DwrfRowReader::estimatedRowSize() const {
DwrfReader::DwrfReader(
const ReaderOptions& options,
std::unique_ptr<dwio::common::BufferedInput> input)
: readerBase_(std::make_unique<ReaderBase>(
options.memoryPool(),
std::move(input),
options.decrypterFactory(),
options.footerEstimatedSize(),
options.filePreloadThreshold(),
options.fileFormat() == FileFormat::ORC ? FileFormat::ORC
: FileFormat::DWRF,
options.fileColumnNamesReadAsLowerCase(),
options.randomSkip(),
options.scanSpec())),
options_(options) {
: readerBase_(std::make_unique<ReaderBase>(options, std::move(input))) {
// If we are not using column names to map table columns to file columns,
// then we use indices. In that case we need to ensure the names completely
// match, because we are still mapping columns by names further down the
// code. So we rename column names in the file schema to match table schema.
// We test the options to have 'fileSchema' (actually table schema) as most
// of the unit tests fail to provide it.
if ((!options_.useColumnNamesForColumnMapping()) &&
(options_.fileSchema() != nullptr)) {
if ((!readerBase_->readerOptions().useColumnNamesForColumnMapping()) &&
(readerBase_->readerOptions().fileSchema() != nullptr)) {
updateColumnNamesFromTableSchema();
}
}
Expand Down Expand Up @@ -910,7 +899,7 @@ TypePtr updateColumnNames(
} // namespace

void DwrfReader::updateColumnNamesFromTableSchema() {
const auto& tableSchema = options_.fileSchema();
const auto& tableSchema = readerBase_->readerOptions().fileSchema();
const auto& fileSchema = readerBase_->schema();
readerBase_->setSchema(std::dynamic_pointer_cast<const RowType>(
updateColumnNames(fileSchema, tableSchema, "", "")));
Expand Down
1 change: 0 additions & 1 deletion velox/dwio/dwrf/reader/DwrfReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ class DwrfReader : public dwio::common::Reader {

private:
std::shared_ptr<ReaderBase> readerBase_;
const dwio::common::ReaderOptions options_;
};

class DwrfReaderFactory : public dwio::common::ReaderFactory {
Expand Down
48 changes: 16 additions & 32 deletions velox/dwio/dwrf/reader/ReaderBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,41 +80,24 @@ ReaderBase::ReaderBase(
MemoryPool& pool,
std::unique_ptr<dwio::common::BufferedInput> input,
FileFormat fileFormat)
: ReaderBase(
pool,
std::move(input),
nullptr,
dwio::common::ReaderOptions::kDefaultFooterEstimatedSize,
dwio::common::ReaderOptions::kDefaultFilePreloadThreshold,
fileFormat) {}
: ReaderBase(createReaderOptions(pool, fileFormat), std::move(input)) {}

ReaderBase::ReaderBase(
MemoryPool& pool,
std::unique_ptr<dwio::common::BufferedInput> input,
std::shared_ptr<DecrypterFactory> decryptorFactory,
uint64_t footerEstimatedSize,
uint64_t filePreloadThreshold,
FileFormat fileFormat,
bool fileColumnNamesReadAsLowerCase,
std::shared_ptr<random::RandomSkipTracker> randomSkip,
std::shared_ptr<velox::common::ScanSpec> scanSpec)
: pool_{pool},
arena_(std::make_unique<google::protobuf::Arena>()),
decryptorFactory_(decryptorFactory),
footerEstimatedSize_(footerEstimatedSize),
filePreloadThreshold_(filePreloadThreshold),
const dwio::common::ReaderOptions& options,
std::unique_ptr<dwio::common::BufferedInput> input)
: arena_(std::make_unique<google::protobuf::Arena>()),
options_{options},
input_(std::move(input)),
randomSkip_(std::move(randomSkip)),
scanSpec_(std::move(scanSpec)),
fileLength_(input_->getReadFile()->size()) {
process::TraceContext trace("ReaderBase::ReaderBase");
// TODO: make a config
DWIO_ENSURE(fileLength_ > 0, "ORC file is empty");
VELOX_CHECK_GE(fileLength_, 4, "File size too small");

const auto preloadFile = fileLength_ <= filePreloadThreshold_;
const uint64_t readSize =
preloadFile ? fileLength_ : std::min(fileLength_, footerEstimatedSize_);
const auto preloadFile = fileLength_ <= options_.filePreloadThreshold();
const uint64_t readSize = preloadFile
? fileLength_
: std::min(fileLength_, options_.footerEstimatedSize());
if (input_->supportSyncLoad()) {
input_->enqueue({fileLength_ - readSize, readSize, "footer"});
input_->load(preloadFile ? LogType::FILE : LogType::FOOTER);
Expand All @@ -135,7 +118,7 @@ ReaderBase::ReaderBase(
fileLength_,
"Corrupted file, Post script size is invalid");

if (fileFormat == FileFormat::DWRF) {
if (fileFormat() == FileFormat::DWRF) {
auto postScript = ProtoUtils::readProto<proto::PostScript>(
input_->read(fileLength_ - psLength_ - 1, psLength_, LogType::FOOTER));
postScript_ = std::make_unique<PostScript>(std::move(postScript));
Expand Down Expand Up @@ -173,7 +156,7 @@ ReaderBase::ReaderBase(

auto footerStream = input_->read(
fileLength_ - psLength_ - footerSize - 1, footerSize, LogType::FOOTER);
if (fileFormat == FileFormat::DWRF) {
if (fileFormat() == FileFormat::DWRF) {
auto footer =
google::protobuf::Arena::CreateMessage<proto::Footer>(arena_.get());
ProtoUtils::readProtoInto<proto::Footer>(
Expand All @@ -190,7 +173,7 @@ ReaderBase::ReaderBase(
}

schema_ = std::dynamic_pointer_cast<const RowType>(
convertType(*footer_, 0, fileColumnNamesReadAsLowerCase));
convertType(*footer_, 0, options_.fileColumnNamesReadAsLowerCase()));
VELOX_CHECK_NOT_NULL(schema_, "invalid schema");

// load stripe index/footer cache
Expand All @@ -204,8 +187,8 @@ ReaderBase::ReaderBase(
input_->read(cacheOffset, cacheSize, LogType::FOOTER));
input_->load(LogType::FOOTER);
} else {
auto cacheBuffer =
std::make_shared<dwio::common::DataBuffer<char>>(pool, cacheSize);
auto cacheBuffer = std::make_shared<dwio::common::DataBuffer<char>>(
options_.memoryPool(), cacheSize);
input_->read(cacheOffset, cacheSize, LogType::FOOTER)
->readFully(cacheBuffer->data(), cacheSize);
cache_ = std::make_unique<StripeMetadataCache>(
Expand All @@ -226,7 +209,8 @@ ReaderBase::ReaderBase(
}
}
// initialize file decrypter
handler_ = DecryptionHandler::create(*footer_, decryptorFactory_.get());
handler_ =
DecryptionHandler::create(*footer_, options_.decrypterFactory().get());
}

std::vector<uint64_t> ReaderBase::rowsPerStripe() const {
Expand Down
Loading

0 comments on commit 14dda78

Please sign in to comment.