Skip to content

Commit

Permalink
Add sessionTimezone and adjustTimestampToTimezone to DWRF reader and …
Browse files Browse the repository at this point in the history
…writer options
  • Loading branch information
wypb committed Sep 10, 2024
1 parent 4744994 commit 04a1e1f
Show file tree
Hide file tree
Showing 25 changed files with 193 additions and 110 deletions.
10 changes: 10 additions & 0 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ class ConnectorQueryCtx {
const std::string& planNodeId,
int driverId,
const std::string& sessionTimezone,
bool adjustTimestampToTimezone = false,
folly::CancellationToken cancellationToken = {})
: operatorPool_(operatorPool),
connectorPool_(connectorPool),
Expand All @@ -283,6 +284,7 @@ class ConnectorQueryCtx {
driverId_(driverId),
planNodeId_(planNodeId),
sessionTimezone_(sessionTimezone),
adjustTimestampToTimezone_(adjustTimestampToTimezone),
cancellationToken_(std::move(cancellationToken)) {
VELOX_CHECK_NOT_NULL(sessionProperties);
}
Expand Down Expand Up @@ -351,6 +353,13 @@ class ConnectorQueryCtx {
return sessionTimezone_;
}

/// Whether to adjust Timestamp to the timeZone obtained through
/// sessionTimezone(). This is used to be compatible with the
/// old logic of Presto.
const bool adjustTimestampToTimezone() const {
return adjustTimestampToTimezone_;
}

/// Returns the cancellation token associated with this task.
const folly::CancellationToken& cancellationToken() const {
return cancellationToken_;
Expand All @@ -370,6 +379,7 @@ class ConnectorQueryCtx {
const int driverId_;
const std::string planNodeId_;
const std::string sessionTimezone_;
bool adjustTimestampToTimezone_;
const folly::CancellationToken cancellationToken_;
};

Expand Down
2 changes: 2 additions & 0 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,8 @@ void configureReaderOptions(
const auto timezone = tz::locateZone(sessionTzName);
readerOptions.setSessionTimezone(timezone);
}
readerOptions.setAdjustTimestampToTimezone(
connectorQueryCtx->adjustTimestampToTimezone());

if (readerOptions.fileFormat() != dwio::common::FileFormat::UNKNOWN) {
VELOX_CHECK(
Expand Down
7 changes: 7 additions & 0 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,13 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
connectorSessionProperties,
options);

const auto& sessionTzName = connectorQueryCtx_->sessionTimezone();
if (!sessionTzName.empty()) {
options->sessionTimezone = tz::locateZone(sessionTzName);
}
options->adjustTimestampToTimezone =
connectorQueryCtx_->adjustTimestampToTimezone();

// Prevents the memory allocation during the writer creation.
WRITER_NON_RECLAIMABLE_SECTION_GUARD(writerInfo_.size() - 1);
auto writer = writerFactory_->createWriter(
Expand Down
9 changes: 6 additions & 3 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ VectorPtr newConstantFromString(
const TypePtr& type,
const std::optional<std::string>& value,
vector_size_t size,
velox::memory::MemoryPool* pool) {
velox::memory::MemoryPool* pool,
const std::string& sessionTimezone) {
using T = typename TypeTraits<kind>::NativeType;
if (!value.has_value()) {
return std::make_shared<ConstantVector<T>>(pool, size, true, type, T());
Expand Down Expand Up @@ -362,7 +363,8 @@ std::vector<TypePtr> SplitReader::adaptColumns(
infoColumnType,
iter->second,
1,
connectorQueryCtx_->memoryPool());
connectorQueryCtx_->memoryPool(),
connectorQueryCtx_->sessionTimezone());
childSpec->setConstantValue(constant);
} else {
auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName);
Expand Down Expand Up @@ -414,7 +416,8 @@ void SplitReader::setPartitionValue(
type,
value,
1,
connectorQueryCtx_->memoryPool());
connectorQueryCtx_->memoryPool(),
connectorQueryCtx_->sessionTimezone());
spec->setConstantValue(constant);
}

Expand Down
15 changes: 14 additions & 1 deletion velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,11 @@ class ReaderOptions : public io::ReaderOptions {
return *this;
}

ReaderOptions& setAdjustTimestampToTimezone(bool adjustTimestampToTimezone) {
adjustTimestampToTimezone_ = adjustTimestampToTimezone;
return *this;
}

/// Gets the desired tail location.
uint64_t tailLocation() const {
return tailLocation_;
Expand Down Expand Up @@ -541,10 +546,14 @@ class ReaderOptions : public io::ReaderOptions {
return ioExecutor_;
}

const tz::TimeZone* getSessionTimezone() const {
const tz::TimeZone* sessionTimezone() const {
return sessionTimezone_;
}

bool adjustTimestampToTimezone() const {
return adjustTimestampToTimezone_;
}

bool fileColumnNamesReadAsLowerCase() const {
return fileColumnNamesReadAsLowerCase_;
}
Expand Down Expand Up @@ -591,6 +600,7 @@ class ReaderOptions : public io::ReaderOptions {
std::shared_ptr<random::RandomSkipTracker> randomSkip_;
std::shared_ptr<velox::common::ScanSpec> scanSpec_;
const tz::TimeZone* sessionTimezone_{nullptr};
bool adjustTimestampToTimezone_{false};
};

struct WriterOptions {
Expand Down Expand Up @@ -621,6 +631,9 @@ struct WriterOptions {
std::function<std::unique_ptr<dwio::common::FlushPolicy>()>
flushPolicyFactory;

const tz::TimeZone* sessionTimezone{nullptr};
bool adjustTimestampToTimezone{false};

virtual ~WriterOptions() = default;
};

Expand Down
8 changes: 8 additions & 0 deletions velox/dwio/dwrf/reader/DwrfData.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ class DwrfParams : public dwio::common::FormatParams {
return streamLabels_;
}

const tz::TimeZone* sessionTimezone() const {
return stripeStreams_.sessionTimezone();
}

bool adjustTimestampToTimezone() const {
return stripeStreams_.adjustTimestampToTimezone();
}

private:
const StreamLabels& streamLabels_;
StripeStreams& stripeStreams_;
Expand Down
19 changes: 4 additions & 15 deletions velox/dwio/dwrf/reader/DwrfReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -783,26 +783,15 @@ std::optional<size_t> DwrfRowReader::estimatedRowSize() const {
DwrfReader::DwrfReader(
const ReaderOptions& options,
std::unique_ptr<dwio::common::BufferedInput> input)
: readerBase_(std::make_unique<ReaderBase>(
options.memoryPool(),
std::move(input),
options.decrypterFactory(),
options.footerEstimatedSize(),
options.filePreloadThreshold(),
options.fileFormat() == FileFormat::ORC ? FileFormat::ORC
: FileFormat::DWRF,
options.fileColumnNamesReadAsLowerCase(),
options.randomSkip(),
options.scanSpec())),
options_(options) {
: readerBase_(std::make_unique<ReaderBase>(options, std::move(input))) {
// If we are not using column names to map table columns to file columns,
// then we use indices. In that case we need to ensure the names completely
// match, because we are still mapping columns by names further down the
// code. So we rename column names in the file schema to match table schema.
// We test the options to have 'fileSchema' (actually table schema) as most
// of the unit tests fail to provide it.
if ((!options_.useColumnNamesForColumnMapping()) &&
(options_.fileSchema() != nullptr)) {
if ((!readerBase_->readerOptions().useColumnNamesForColumnMapping()) &&
(readerBase_->readerOptions().fileSchema() != nullptr)) {
updateColumnNamesFromTableSchema();
}
}
Expand Down Expand Up @@ -899,7 +888,7 @@ TypePtr updateColumnNames(
} // namespace

void DwrfReader::updateColumnNamesFromTableSchema() {
const auto& tableSchema = options_.fileSchema();
const auto& tableSchema = readerBase_->readerOptions().fileSchema();
const auto& fileSchema = readerBase_->schema();
readerBase_->setSchema(std::dynamic_pointer_cast<const RowType>(
updateColumnNames(fileSchema, tableSchema, "", "")));
Expand Down
1 change: 0 additions & 1 deletion velox/dwio/dwrf/reader/DwrfReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,6 @@ class DwrfReader : public dwio::common::Reader {

private:
std::shared_ptr<ReaderBase> readerBase_;
const dwio::common::ReaderOptions options_;
};

class DwrfReaderFactory : public dwio::common::ReaderFactory {
Expand Down
50 changes: 14 additions & 36 deletions velox/dwio/dwrf/reader/ReaderBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,44 +77,21 @@ FooterStatisticsImpl::FooterStatisticsImpl(
}

ReaderBase::ReaderBase(
MemoryPool& pool,
std::unique_ptr<dwio::common::BufferedInput> input,
FileFormat fileFormat)
: ReaderBase(
pool,
std::move(input),
nullptr,
dwio::common::ReaderOptions::kDefaultFooterEstimatedSize,
dwio::common::ReaderOptions::kDefaultFilePreloadThreshold,
fileFormat) {}

ReaderBase::ReaderBase(
MemoryPool& pool,
std::unique_ptr<dwio::common::BufferedInput> input,
std::shared_ptr<DecrypterFactory> decryptorFactory,
uint64_t footerEstimatedSize,
uint64_t filePreloadThreshold,
FileFormat fileFormat,
bool fileColumnNamesReadAsLowerCase,
std::shared_ptr<random::RandomSkipTracker> randomSkip,
std::shared_ptr<velox::common::ScanSpec> scanSpec)
: pool_{pool},
const dwio::common::ReaderOptions& options,
std::unique_ptr<dwio::common::BufferedInput> input)
: options_{options},
arena_(std::make_unique<google::protobuf::Arena>()),
decryptorFactory_(decryptorFactory),
footerEstimatedSize_(footerEstimatedSize),
filePreloadThreshold_(filePreloadThreshold),
input_(std::move(input)),
randomSkip_(std::move(randomSkip)),
scanSpec_(std::move(scanSpec)),
fileLength_(input_->getReadFile()->size()) {
process::TraceContext trace("ReaderBase::ReaderBase");
// TODO: make a config
DWIO_ENSURE(fileLength_ > 0, "ORC file is empty");
VELOX_CHECK_GE(fileLength_, 4, "File size too small");

const auto preloadFile = fileLength_ <= filePreloadThreshold_;
const uint64_t readSize =
preloadFile ? fileLength_ : std::min(fileLength_, footerEstimatedSize_);
const auto preloadFile = fileLength_ <= options_.filePreloadThreshold();
const uint64_t readSize = preloadFile
? fileLength_
: std::min(fileLength_, options_.footerEstimatedSize());
if (input_->supportSyncLoad()) {
input_->enqueue({fileLength_ - readSize, readSize, "footer"});
input_->load(preloadFile ? LogType::FILE : LogType::FOOTER);
Expand All @@ -135,7 +112,7 @@ ReaderBase::ReaderBase(
fileLength_,
"Corrupted file, Post script size is invalid");

if (fileFormat == FileFormat::DWRF) {
if (fileFormat() == FileFormat::DWRF) {
auto postScript = ProtoUtils::readProto<proto::PostScript>(
input_->read(fileLength_ - psLength_ - 1, psLength_, LogType::FOOTER));
postScript_ = std::make_unique<PostScript>(std::move(postScript));
Expand Down Expand Up @@ -173,7 +150,7 @@ ReaderBase::ReaderBase(

auto footerStream = input_->read(
fileLength_ - psLength_ - footerSize - 1, footerSize, LogType::FOOTER);
if (fileFormat == FileFormat::DWRF) {
if (fileFormat() == FileFormat::DWRF) {
auto footer =
google::protobuf::Arena::CreateMessage<proto::Footer>(arena_.get());
ProtoUtils::readProtoInto<proto::Footer>(
Expand All @@ -190,7 +167,7 @@ ReaderBase::ReaderBase(
}

schema_ = std::dynamic_pointer_cast<const RowType>(
convertType(*footer_, 0, fileColumnNamesReadAsLowerCase));
convertType(*footer_, 0, options_.fileColumnNamesReadAsLowerCase()));
VELOX_CHECK_NOT_NULL(schema_, "invalid schema");

// load stripe index/footer cache
Expand All @@ -204,8 +181,8 @@ ReaderBase::ReaderBase(
input_->read(cacheOffset, cacheSize, LogType::FOOTER));
input_->load(LogType::FOOTER);
} else {
auto cacheBuffer =
std::make_shared<dwio::common::DataBuffer<char>>(pool, cacheSize);
auto cacheBuffer = std::make_shared<dwio::common::DataBuffer<char>>(
options_.memoryPool(), cacheSize);
input_->read(cacheOffset, cacheSize, LogType::FOOTER)
->readFully(cacheBuffer->data(), cacheSize);
cache_ = std::make_unique<StripeMetadataCache>(
Expand All @@ -226,7 +203,8 @@ ReaderBase::ReaderBase(
}
}
// initialize file decrypter
handler_ = DecryptionHandler::create(*footer_, decryptorFactory_.get());
handler_ =
DecryptionHandler::create(*footer_, options_.decrypterFactory().get());
}

std::vector<uint64_t> ReaderBase::rowsPerStripe() const {
Expand Down
Loading

0 comments on commit 04a1e1f

Please sign in to comment.