Skip to content

Commit

Permalink
YQ-2744: fix row mode in workload tpch init (ydb-platform#1266)
Browse files Browse the repository at this point in the history
* YQ-2744: fix row mode in workload tpch init

* Add cleaning by path

* Update interface

* Integrate queries into cli
  • Loading branch information
EgorkaZ authored and shnikd committed Feb 6, 2024
1 parent 1f1d945 commit 189f854
Show file tree
Hide file tree
Showing 5 changed files with 1,279 additions and 49 deletions.
81 changes: 44 additions & 37 deletions ydb/public/lib/ydb_cli/commands/tpch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,27 @@ namespace {

TVector<TString> TTpchCommandRun::GetQueries() const {
TVector<TString> queries;
TFsPath queriesDir(ExternalQueriesDir);
TVector<TString> queriesList;
queriesDir.ListNames(queriesList);
std::sort(queriesList.begin(), queriesList.end(), [](const TString& l, const TString& r) {
auto leftNum = l.substr(1);
auto rightNum = r.substr(1);
return std::stoi(leftNum) < std::stoi(rightNum);
});
for (auto&& queryFileName : queriesList) {
const TString expectedFileName = "q" + ::ToString(getQueryNumber(queries.size())) + ".sql";
Y_ABORT_UNLESS(queryFileName == expectedFileName, "incorrect files naming. have to be q<number>.sql where number in [1, N], where N is requests count");
TFileInput fInput(ExternalQueriesDir + "/" + expectedFileName);
auto query = fInput.ReadAll();
if (!ExternalQueriesDir.Empty()) {
TFsPath queriesDir(ExternalQueriesDir);
TVector<TString> queriesList;
queriesDir.ListNames(queriesList);
std::sort(queriesList.begin(), queriesList.end(), [](const TString& l, const TString& r) {
auto leftNum = l.substr(1);
auto rightNum = r.substr(1);
return std::stoi(leftNum) < std::stoi(rightNum);
});
for (auto&& queryFileName : queriesList) {
const TString expectedFileName = "q" + ::ToString(getQueryNumber(queries.size())) + ".sql";
Y_ABORT_UNLESS(queryFileName == expectedFileName, "incorrect files naming. have to be q<number>.sql where number in [1, N], where N is requests count");
TFileInput fInput(ExternalQueriesDir + "/" + expectedFileName);
queries.emplace_back(fInput.ReadAll());
}
} else {
queries = StringSplitter(NResource::Find("tpch_queries.sql")).SplitByString("-- end query").ToList<TString>();
}

for (auto& query : queries) {
SubstGlobal(query, "{path}", TablesPath);
queries.emplace_back(query);
}
return queries;
}
Expand Down Expand Up @@ -125,11 +131,11 @@ bool TTpchCommandRun::RunBench(TConfig& config)
testInfo.ColdTime.MilliSeconds() * 0.001, testInfo.Min.MilliSeconds() * 0.001, testInfo.Max.MilliSeconds() * 0.001,
testInfo.Mean * 0.001, testInfo.Std * 0.001) << Endl;
if (collectJsonSensors) {
jsonReport.AppendValue(GetSensorValue("ColdTime", testInfo.ColdTime, queryN));
jsonReport.AppendValue(GetSensorValue("Min", testInfo.Min, queryN));
jsonReport.AppendValue(GetSensorValue("Max", testInfo.Max, queryN));
jsonReport.AppendValue(GetSensorValue("Mean", testInfo.Mean, queryN));
jsonReport.AppendValue(GetSensorValue("Std", testInfo.Std, queryN));
jsonReport.AppendValue(GetSensorValue("ColdTime", testInfo.ColdTime, getQueryNumber(queryN)));
jsonReport.AppendValue(GetSensorValue("Min", testInfo.Min, getQueryNumber(queryN)));
jsonReport.AppendValue(GetSensorValue("Max", testInfo.Max, getQueryNumber(queryN)));
jsonReport.AppendValue(GetSensorValue("Mean", testInfo.Mean, getQueryNumber(queryN)));
jsonReport.AppendValue(GetSensorValue("Std", testInfo.Std, getQueryNumber(queryN)));
}
}

Expand Down Expand Up @@ -222,14 +228,12 @@ void TTpchCommandInit::Config(TConfig& config) {
"column - use column-based storage engine.\n"
"s3 - use cloud tpc bucket")
.DefaultValue("row").StoreResult(&StoreType);
config.Opts->AddLongOption('s', "scale", "TPC-H dataset scale. One of 1, 10, 100, 1000. Default is 1")
config.Opts->AddLongOption("s3-prefix", "Root path to TPC-H dataset in s3 storage")
.Optional()
.DefaultValue("1")
.StoreResult(&Scale);
config.Opts->AddLongOption('b', "bucket", "S3 bucket with TPC-H dataset")
.StoreResult(&S3Prefix);
config.Opts->AddLongOption('e', "s3-endpoint", "Endpoint of S3 bucket with TPC-H dataset")
.Optional()
.DefaultValue("")
.StoreResult(&Bucket);
.StoreResult(&S3Endpoint);
};

void TTpchCommandInit::SetPartitionByCols(TString& createSql) {
Expand All @@ -256,7 +260,7 @@ void TTpchCommandInit::SetPartitionByCols(TString& createSql) {

int TTpchCommandInit::Run(TConfig& config) {
StoreType = to_lower(StoreType);
TString storageType = "";
TString storageType = "-- ";
TString notNull = "";
TString createExternalDataSource;
TString external;
Expand All @@ -266,20 +270,19 @@ int TTpchCommandInit::Run(TConfig& config) {
storageType = "STORE = COLUMN, --";
notNull = "NOT NULL";
} else if (StoreType == "s3") {
storageType = R"(DATA_SOURCE = "_tpc_s3_external_source", FORMAT = "parquet", LOCATION = )";
storageType = fmt::format(R"(DATA_SOURCE = "{}_tpc_s3_external_source", FORMAT = "parquet", LOCATION = )", TablesPath);
notNull = "NOT NULL";
createExternalDataSource = fmt::format(R"(
CREATE EXTERNAL DATA SOURCE `_tpc_s3_external_source` WITH (
CREATE EXTERNAL DATA SOURCE `{}_tpc_s3_external_source` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="https://storage.yandexcloud.net/{}/",
LOCATION="{}",
AUTH_METHOD="NONE"
);
)", Bucket);
)", TablesPath, S3Endpoint);
external = "EXTERNAL";
partitioning = "--";
primaryKey = "--";
} else if (StoreType != "row") {
storageType = "-- ";
throw yexception() << "Incorrect storage type. Available options: \"row\", \"column\"." << Endl;
}

Expand All @@ -292,9 +295,9 @@ int TTpchCommandInit::Run(TConfig& config) {
SubstGlobal(createSql, "{external}", external);
SubstGlobal(createSql, "{notnull}", notNull);
SubstGlobal(createSql, "{partitioning}", partitioning);
SubstGlobal(createSql, "{primary_key}", primaryKey);
SubstGlobal(createSql, "{path}", TablesPath);
SubstGlobal(createSql, "{scale}", Scale);
SubstGlobal(createSql, "{primary_key}", primaryKey);
SubstGlobal(createSql, "{s3_prefix}", S3Prefix);
SubstGlobal(createSql, "{store}", storageType);
SetPartitionByCols(createSql);

Expand All @@ -319,15 +322,18 @@ void TTpchCommandClean::Config(TConfig& config) {
config.Opts->AddLongOption('e', "external", "Drop tables as external. Use if initialized with external storage")
.Optional()
.StoreTrue(&IsExternal);
config.Opts->AddLongOption('p', "path", "Folder name where benchmark tables are located")
.Optional()
.StoreResult(&TablesPath);
};

int TTpchCommandClean::Run(TConfig& config) {
auto driver = CreateDriver(config);
TTableClient client(driver);

TString dropDdl;
for (auto& table : Tables) {
TString fullPath = FullTablePath(config.Database, table);
for (const auto& table : Tables) {
TString fullPath = FullTablePath(config.Database, fmt::format("{}{}", TablesPath, table));
fmt::format_to(std::back_inserter(dropDdl), "DROP {} TABLE `{}`", IsExternal ? "EXTERNAL" : "", fullPath);

ThrowOnError(client.RetryOperationSync([&dropDdl](TSession session) {
Expand All @@ -337,8 +343,9 @@ int TTpchCommandClean::Run(TConfig& config) {
}

if (IsExternal) {
ThrowOnError(client.RetryOperationSync([](TSession session) {
return session.ExecuteSchemeQuery("DROP EXTERNAL DATA SOURCE `_tpc_s3_external_source`;").GetValueSync();
TString fullPath = FullTablePath(config.Database, fmt::format("{}_tpc_s3_external_source", TablesPath));
ThrowOnError(client.RetryOperationSync([&](TSession session) {
return session.ExecuteSchemeQuery(fmt::format("DROP EXTERNAL DATA SOURCE `{}`;", fullPath)).GetValueSync();
}));
}

Expand Down
7 changes: 3 additions & 4 deletions ydb/public/lib/ydb_cli/commands/tpch.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ class TTpchCommandInit : public NYdb::NConsoleClient::TYdbCommand {

TString TablesPath;
TString StoreType;
TString Scale;
TString Bucket;
TString S3Endpoint;
TString S3Prefix;
};

class TTpchCommandClean : public NYdb::NConsoleClient::TYdbCommand {
Expand All @@ -31,15 +31,14 @@ class TTpchCommandClean : public NYdb::NConsoleClient::TYdbCommand {
std::vector<TString> Tables = {"customer", "lineitem", "nation", "orders",
"region", "part", "partsupp", "supplier"};
bool IsExternal = false;
TString TablesPath;
};

class TTpchCommandRun : public NYdb::NConsoleClient::TYdbCommand {
protected:
TSet<ui32> QueriesToRun;
TSet<ui32> QueriesToSkip;
TVector<TString> QuerySettings;
TString ExternalQueries;
TString ExternalQueriesFile;
TString ExternalQueriesDir;
TString ExternalVariablesString;
TString QueryExecuterType;
Expand Down
Loading

0 comments on commit 189f854

Please sign in to comment.