Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Feb 20, 2024
1 parent d55c48b commit bef8dc4
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 33 deletions.
10 changes: 5 additions & 5 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,19 +137,19 @@ bool HiveConfig::ignoreMissingFiles(const Config* session) const {
}

int64_t HiveConfig::maxCoalescedBytes() const {
return config_->get<int64_t>(kMaxCoalescedBytes, 128 << 20);
return config_->get<int64_t>(kMaxCoalescedBytes, 50 << 10);
}

int32_t HiveConfig::maxCoalescedDistanceBytes() const {
return config_->get<int32_t>(kMaxCoalescedDistanceBytes, 512 << 10);
return config_->get<int32_t>(kMaxCoalescedDistanceBytes, 0);
}

int32_t HiveConfig::prefetchRowGroups() const {
return config_->get<int32_t>(kPrefetchRowGroups, 1);
}

int32_t HiveConfig::loadQuantum() const {
return config_->get<int32_t>(kLoadQuantum, 8 << 20);
return config_->get<int32_t>(kLoadQuantum, 50 << 10);
}

int32_t HiveConfig::numCacheFileHandles() const {
Expand Down Expand Up @@ -195,11 +195,11 @@ uint64_t HiveConfig::sortWriterMaxOutputBytes(const Config* session) const {
}

uint64_t HiveConfig::footerEstimatedSize() const {
return config_->get<uint64_t>(kFooterEstimatedSize, 1UL << 20);
return config_->get<uint64_t>(kFooterEstimatedSize, 100);
}

uint64_t HiveConfig::filePreloadThreshold() const {
return config_->get<uint64_t>(kFilePreloadThreshold, 8UL << 20);
return config_->get<uint64_t>(kFilePreloadThreshold, 100);
}

bool HiveConfig::s3UseProxyFromEnv() const {
Expand Down
27 changes: 14 additions & 13 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ void configureReaderOptions(
const RowTypePtr& fileSchema,
const std::shared_ptr<HiveConnectorSplit>& hiveSplit,
const std::unordered_map<std::string, std::string>& tableParameters) {
readerOptions.setLoadQuantum(hiveConfig->loadQuantum());
readerOptions.setMaxCoalesceBytes(hiveConfig->maxCoalescedBytes());
readerOptions.setMaxCoalesceDistance(hiveConfig->maxCoalescedDistanceBytes());
readerOptions.setFileColumnNamesReadAsLowerCase(
Expand Down Expand Up @@ -586,19 +587,19 @@ std::unique_ptr<dwio::common::BufferedInput> createBufferedInput(
const ConnectorQueryCtx* connectorQueryCtx,
std::shared_ptr<io::IoStatistics> ioStats,
folly::Executor* executor) {
if (connectorQueryCtx->cache()) {
return std::make_unique<dwio::common::CachedBufferedInput>(
fileHandle.file,
dwio::common::MetricsLog::voidLog(),
fileHandle.uuid.id(),
connectorQueryCtx->cache(),
Connector::getTracker(
connectorQueryCtx->scanId(), readerOpts.loadQuantum()),
fileHandle.groupId.id(),
ioStats,
executor,
readerOpts);
}
// if (connectorQueryCtx->cache()) {
// return std::make_unique<dwio::common::CachedBufferedInput>(
// fileHandle.file,
// dwio::common::MetricsLog::voidLog(),
// fileHandle.uuid.id(),
// connectorQueryCtx->cache(),
// Connector::getTracker(
// connectorQueryCtx->scanId(), readerOpts.loadQuantum()),
// fileHandle.groupId.id(),
// ioStats,
// executor,
// readerOpts);
// }
return std::make_unique<dwio::common::DirectBufferedInput>(
fileHandle.file,
dwio::common::MetricsLog::voidLog(),
Expand Down
14 changes: 5 additions & 9 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,7 @@ HiveDataSource::HiveDataSource(
for (auto& [k, v] : hiveTableHandle_->subfieldFilters()) {
filters.emplace(k.clone(), v->clone());
}
auto remainingFilter = extractFiltersFromRemainingFilter(
hiveTableHandle_->remainingFilter(),
expressionEvaluator_,
false,
filters);
auto remainingFilter = hiveTableHandle_->remainingFilter();

std::vector<common::Subfield> remainingFilterSubfields;
if (remainingFilter) {
Expand Down Expand Up @@ -207,10 +203,10 @@ HiveDataSource::HiveDataSource(
hiveTableHandle_->dataColumns(),
partitionKeys_,
pool_);
if (remainingFilter) {
metadataFilter_ = std::make_shared<common::MetadataFilter>(
*scanSpec_, *remainingFilter, expressionEvaluator_);
}
// if (remainingFilter) {
// metadataFilter_ = std::make_shared<common::MetadataFilter>(
// *scanSpec_, *remainingFilter, expressionEvaluator_);
// }

ioStats_ = std::make_shared<io::IoStatistics>();
}
Expand Down
21 changes: 20 additions & 1 deletion velox/dwio/common/DirectBufferedInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/

#include "velox/dwio/common/DirectBufferedInput.h"
#include <iostream>
#include <thread>
#include "velox/common/memory/Allocation.h"
#include "velox/common/process/TraceContext.h"
#include "velox/dwio/common/DirectInputStream.h"
Expand All @@ -32,6 +34,8 @@ using cache::TrackingId;
std::unique_ptr<SeekableInputStream> DirectBufferedInput::enqueue(
Region region,
const StreamIdentifier* sid = nullptr) {
std::cout << "enqueue, region.offset: " << region.offset
<< ", length: " << region.length << std::endl;
if (!coalescedLoads_.empty()) {
// Results of previous load are no more available here.
coalescedLoads_.clear();
Expand Down Expand Up @@ -78,7 +82,8 @@ namespace {

// True if the percentage is high enough to warrant prefetch.
bool isPrefetchablePct(int32_t pct) {
return pct >= FLAGS_cache_prefetch_min_pct;
return true;
// return pct >= FLAGS_cache_prefetch_min_pct;
}

int32_t adjustedReadPct(const cache::TrackingData& trackingData) {
Expand Down Expand Up @@ -180,12 +185,25 @@ void DirectBufferedInput::makeLoads(
readRegion(ranges, shouldPrefetch);
});
if (shouldPrefetch && executor_) {
std::cout << "coalescedLoads_.size(): " << coalescedLoads_.size()
<< std::endl;
for (auto i = 0; i < coalescedLoads_.size(); ++i) {
auto& load = coalescedLoads_[i];
if (load->state() == CoalescedLoad::State::kPlanned) {
const auto& requests =
std::dynamic_pointer_cast<DirectCoalescedLoad>(load)->requests();
std::cout << "requests.size(): " << requests.size() << std::endl;
for (const auto& request : requests) {
std::cout << "request region offset: " << request.region.offset
<< ", length: " << request.region.length << std::endl;
}
executor_->add([pendingLoad = load]() {
std::cout << "log1" << std::endl;
// std::cout << "thread " << std::this_thread::get_id() << std::endl;
process::TraceContext trace("Read Ahead");
pendingLoad->loadOrFuture(nullptr);
std::cout << "log2" << std::endl;
// std::cout << "thread " << std::this_thread::get_id() << std::endl;
});
}
}
Expand Down Expand Up @@ -272,6 +290,7 @@ std::vector<cache::CachePin> DirectCoalescedLoad::loadData(bool isPrefetch) {
lastEnd = region.offset + request.loadSize;
size += std::min<int32_t>(loadQuantum_, region.length);
}
std::cout << "input read" << std::endl;
input_->read(buffers, requests_[0].region.offset, LogType::FILE);
ioStats_->read().increment(size);
ioStats_->incRawOverreadBytes(overread);
Expand Down
3 changes: 3 additions & 0 deletions velox/dwio/common/DirectInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "velox/common/time/Timer.h"
#include "velox/dwio/common/DirectBufferedInput.h"
#include "velox/dwio/common/DirectInputStream.h"
#include <iostream>

using ::facebook::velox::common::Region;

Expand Down Expand Up @@ -132,6 +133,8 @@ makeRanges(size_t size, memory::Allocation& data, std::string& tinyData) {
} // namespace

void DirectInputStream::loadSync() {
std::cout << "loadSync loadedRegion_.offset: " << loadedRegion_.offset
<< ", loadedRegion_.length: " << loadedRegion_.length << std::endl;
if (region_.length < DirectBufferedInput::kTinySize &&
data_.numPages() == 0) {
tinyData_.resize(region_.length);
Expand Down
Binary file not shown.
78 changes: 73 additions & 5 deletions velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/type/tests/SubfieldFiltersBuilder.h"

#include <iostream>
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/exec/PlanNodeStats.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;
Expand All @@ -42,7 +44,9 @@ class ParquetTableScanTest : public HiveConnectorTestBase {
connector::getConnectorFactory(
connector::hive::HiveConnectorFactory::kHiveConnectorName)
->newConnector(
kHiveConnectorId, std::make_shared<core::MemConfig>());
kHiveConnectorId,
std::make_shared<core::MemConfig>(),
executor_.get());
connector::registerConnector(hiveConnector);
}

Expand Down Expand Up @@ -122,6 +126,9 @@ class ParquetTableScanTest : public HiveConnectorTestBase {
filePath, 1, dwio::common::FileFormat::PARQUET)[0];
}

std::shared_ptr<folly::Executor> executor_ =
std::make_shared<folly::CPUThreadPoolExecutor>(1);

private:
RowTypePtr getRowType(std::vector<std::string>&& outputColumnNames) const {
std::vector<TypePtr> types;
Expand Down Expand Up @@ -408,11 +415,8 @@ TEST_F(ParquetTableScanTest, readAsLowerCase) {
.tableScan(ROW({"a"}, {BIGINT()}), {}, "")
.planNode();
CursorParameters params;
std::shared_ptr<folly::Executor> executor =
std::make_shared<folly::CPUThreadPoolExecutor>(
std::thread::hardware_concurrency());
std::shared_ptr<core::QueryCtx> queryCtx =
std::make_shared<core::QueryCtx>(executor.get());
std::make_shared<core::QueryCtx>(executor_.get());
std::unordered_map<std::string, std::string> session = {
{std::string(
connector::hive::HiveConfig::kFileColumnNamesReadAsLowerCaseSession),
Expand Down Expand Up @@ -441,6 +445,70 @@ TEST_F(ParquetTableScanTest, readAsLowerCase) {
ASSERT_TRUE(waitForTaskCompletion(result.first->task().get()));
assertEqualResults(
result.second, {makeRowVector({"a"}, {makeFlatVector<int64_t>({0, 1})})});
if (result.first) {
result.first->task()->requestCancel().wait();
// result.first->task().reset();
}
std::cout << "pool: " << queryCtx->pool()->treeMemoryUsage() << std::endl;
}

TEST_F(ParquetTableScanTest, readRowGroup) {
auto plan = PlanBuilder(pool_.get())
.tableScan(
ROW({"a"}, {INTEGER()}),
{},
"b == c",
ROW({"b", "c", "a"}, {INTEGER(), INTEGER(), INTEGER()}))
.planNode();
CursorParameters params;
auto pool = memory::deprecatedDefaultMemoryManager().addRootPool("scan");
std::unordered_map<std::string, std::shared_ptr<Config>> connectorConfigs =
{};
std::shared_ptr<core::QueryCtx> queryCtx = std::make_shared<core::QueryCtx>(
executor_.get(), core::QueryConfig{{}}, connectorConfigs, nullptr, pool);
std::unordered_map<std::string, std::string> session = {
{std::string(connector::hive::HiveConfig::kLoadQuantum), "51200"},
{std::string(connector::hive::HiveConfig::kMaxCoalescedDistanceBytes),
"0"},
{std::string(connector::hive::HiveConfig::kMaxCoalescedBytes), "51200"}};
queryCtx->setConnectorSessionOverridesUnsafe(
kHiveConnectorId, std::move(session));
params.queryCtx = queryCtx;
params.planNode = plan;
const int numSplitsPerFile = 1;

bool noMoreSplits = false;
auto addSplits = [&](exec::Task* task) {
if (!noMoreSplits) {
auto const splits = HiveConnectorTestBase::makeHiveConnectorSplits(
{getExampleFilePath("row_group.parquet")},
numSplitsPerFile,
dwio::common::FileFormat::PARQUET);
for (const auto& split : splits) {
task->addSplit("0", exec::Split(split));
}
task->noMoreSplits("0");
}
noMoreSplits = true;
};
auto result = readCursor(params, addSplits);
ASSERT_TRUE(waitForTaskCompletion(result.first->task().get()));
// assertEqualResults(
// result.second, {makeRowVector({"a"}, {makeFlatVector<int64_t>({0,
// 1})})});
std::cout << printPlanWithStats(
*plan, result.first->task()->taskStats(), true)
<< std::endl;
// for (const auto& rv : result.second) {
// for (int32_t i = 0; i < rv->size(); ++i) {
// std::cout << rv->toString(i) << std::endl;
// }
// }
if (result.first) {
result.first->task()->requestCancel().wait();
// result.first->task().reset();
}
std::cout << "pool: " << pool->treeMemoryUsage() << std::endl;
}

int main(int argc, char** argv) {
Expand Down
3 changes: 3 additions & 0 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
#include "velox/exec/Operator.h"
#include "velox/exec/Task.h"

#include <iostream>

using facebook::velox::common::testutil::TestValue;

namespace facebook::velox::exec {
Expand Down Expand Up @@ -829,6 +831,7 @@ void Driver::closeOperators() {
}

void Driver::close() {
std::cout << "close" << std::endl;
if (closed_) {
// Already closed.
return;
Expand Down

0 comments on commit bef8dc4

Please sign in to comment.