Add tests for tpc {h | ds} data generator (#10564)
iddqdex authored Oct 21, 2024
1 parent b71da95 commit 3fb6997
Showing 15 changed files with 571 additions and 48 deletions.
45 changes: 25 additions & 20 deletions ydb/library/workload/tpcds/data_generator.cpp
@@ -54,19 +54,30 @@ TBulkDataGeneratorList TTpcdsWorkloadDataInitializerGenerator::DoGetBulkInitialD
return TBulkDataGeneratorList(gens.begin(), gens.end());
}

ui64 TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::CalcCountToGenerate(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum, bool useState) {
TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TPositions TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::CalcCountToGenerate(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum, bool useState) {
static const TSet<ui32> allowedModules{1, 2, 4};
TPositions result;
const auto* tdef = getTdefsByNumber(tableNum);
if (!tdef) {
return 0;
return result;
}
i64 position = 0;
split_work(tableNum, &result.FirstRow, &result.Count);
if (useState && owner.StateProcessor && owner.StateProcessor->GetState().contains(tdef->name)) {
position = owner.StateProcessor->GetState().at(tdef->name).Position;
result.Position = owner.StateProcessor->GetState().at(tdef->name).Position;
result.Count -= std::min<i64>(result.Count, result.Position);

// This magic is needed for SCD to work correctly. See setSCDKeys in ydb/library/benchmarks/gen/tpcds-dbgen/scd.c
while (result.Position && !allowedModules.contains((result.FirstRow + result.Position) % 6)) {
--result.Position;
++result.Count;
}
}
ds_key_t firstRow;
ds_key_t rowCount;
split_work(tableNum, &firstRow, &rowCount);
return rowCount > position ? (rowCount - position) : 0;
// This magic is needed for SCD to work correctly. See setSCDKeys in ydb/library/benchmarks/gen/tpcds-dbgen/scd.c
while (result.FirstRow > 1 && !allowedModules.contains((result.FirstRow + result.Position) % 6)) {
--result.FirstRow;
++result.Count;
}
return result;
}

TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::TContext(const TBulkDataGenerator& owner, int tableNum)
@@ -116,10 +116,10 @@ TString TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::GetFullTable
}

TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TBulkDataGenerator(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum)
: IBulkDataGenerator(getTdefsByNumber(tableNum)->name, CalcCountToGenerate(owner, tableNum, true))
: IBulkDataGenerator(getTdefsByNumber(tableNum)->name, CalcCountToGenerate(owner, tableNum, true).Count)
, TableNum(tableNum)
, Owner(owner)
, TableSize(CalcCountToGenerate(owner, tableNum, false))
, TableSize(CalcCountToGenerate(owner, tableNum, false).Count)
{}

TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::GenerateDataPortion() {
@@ -136,16 +147,10 @@ TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpcds
}

auto g = Guard(Lock);
ds_key_t firstRow;
ds_key_t rowCount;
split_work(TableNum, &firstRow, &rowCount);
auto positions = CalcCountToGenerate(Owner, TableNum, !Generated);
if (!Generated) {
ui32 toSkip = firstRow - 1;
if (!!Owner.StateProcessor && Owner.StateProcessor->GetState().contains(GetName())) {
Generated = Owner.StateProcessor->GetState().at(TString(GetName())).Position;
toSkip += Generated;
}
if (toSkip) {
Generated = positions.Position;
if (const ui32 toSkip = positions.FirstRow + positions.Position - 1) {
row_skip(TableNum, toSkip);
if (tdef->flags & FL_PARENT) {
row_skip(tdef->nParam, toSkip);
@@ -160,7 +165,7 @@ TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpcds
return result;
}
ctxs.front().SetCount(count);
ctxs.front().SetStart(firstRow + Generated);
ctxs.front().SetStart(positions.FirstRow + Generated);
Generated += count;
GenerateRows(ctxs, std::move(g));
for(auto& ctx: ctxs) {
7 changes: 6 additions & 1 deletion ydb/library/workload/tpcds/data_generator.h
@@ -62,7 +62,12 @@ class TTpcdsWorkloadDataInitializerGenerator: public TWorkloadDataInitializerBas

private:
TString GetFullTableName(const char* table) const;
static ui64 CalcCountToGenerate(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum, bool useState);
struct TPositions {
i64 FirstRow = 1;
ui64 Position = 0;
i64 Count = 0;
};
static TPositions CalcCountToGenerate(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum, bool useState);
const TTpcdsWorkloadDataInitializerGenerator& Owner;
ui64 TableSize;
};
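
For illustration, a minimal standalone sketch of the resume-position bookkeeping introduced above, with hypothetical inputs standing in for split_work and the persisted state (the real CalcCountToGenerate takes these from tpcds-dbgen and the state processor, and additionally re-aligns FirstRow itself):

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <set>

struct TPositions {
    int64_t FirstRow = 1;   // first row assigned to this generator (from split_work)
    uint64_t Position = 0;  // rows already generated according to the saved state
    int64_t Count = 0;      // rows still left to generate
};

// Hypothetical stand-in for CalcCountToGenerate: firstRow/rowCount would come from
// split_work(), savedPosition from the state processor.
TPositions CalcPositions(int64_t firstRow, int64_t rowCount, uint64_t savedPosition) {
    static const std::set<uint32_t> allowedModules{1, 2, 4};
    TPositions result;
    result.FirstRow = firstRow;
    result.Count = rowCount;
    result.Position = savedPosition;
    result.Count -= std::min<int64_t>(result.Count, result.Position);
    // Walk the resume point back until (FirstRow + Position) % 6 lands in an allowed
    // slot, so SCD key generation stays consistent (see setSCDKeys in scd.c).
    while (result.Position && !allowedModules.contains((result.FirstRow + result.Position) % 6)) {
        --result.Position;
        ++result.Count;
    }
    return result;
}

int main() {
    const auto p = CalcPositions(/*firstRow=*/1, /*rowCount=*/1000, /*savedPosition=*/500);
    std::cout << "resume at row " << p.FirstRow + p.Position << ", " << p.Count << " rows left\n";
}
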
18 changes: 16 additions & 2 deletions ydb/library/workload/tpch/driver.c
@@ -8,6 +8,7 @@
#define NO_FUNC (int (*) ()) NULL /* to clean up tdefs */
#define NO_LFUNC (long (*) ()) NULL /* to clean up tdefs */

void advanceStream(int nStream, DSS_HUGE nCalls, int bUse64Bit);
long sd_cust (int child, DSS_HUGE skip_count);
long sd_line (int child, DSS_HUGE skip_count);
long sd_order (int child, DSS_HUGE skip_count);
@@ -16,6 +17,19 @@ long sd_psupp (int child, DSS_HUGE skip_count);
long sd_supp (int child, DSS_HUGE skip_count);
long sd_order_line (int child, DSS_HUGE skip_count);
long sd_part_psupp (int child, DSS_HUGE skip_count);

long sd_region (int child, DSS_HUGE skip_count) {
(void)child;
advanceStream(R_CMNT_SD, 2 * skip_count, 0);
return 0;
}

long sd_nation (int child, DSS_HUGE skip_count) {
(void)child;
advanceStream(N_CMNT_SD, 2 * skip_count, 0);
return 0;
}

void ReadDistFromResource(const char* name, distribution* target);

tdef tdefs[] =
@@ -28,8 +42,8 @@
{"lineitem", "lineitem table", 150000 * ORDERS_PER_CUST, NO_FUNC, sd_line, NONE, 0}, //LINE 5
{"orders", "orders/lineitem tables", 150000 * ORDERS_PER_CUST, NO_FUNC, sd_order, LINE, 0}, //ORDER_LINE 6
{"part", "part/partsupplier tables", 200000, NO_FUNC, sd_part, PSUPP, 0}, //PART_PSUPP 7
{"nation", "nation table", NATIONS_MAX, NO_FUNC, NO_LFUNC, NONE, 0}, //NATION 8
{"region", "region table", NATIONS_MAX, NO_FUNC, NO_LFUNC, NONE, 0}, //REGION 9
{"nation", "nation table", NATIONS_MAX, NO_FUNC, sd_nation, NONE, 0}, //NATION 8
{"region", "region table", NATIONS_MAX, NO_FUNC, sd_region, NONE, 0}, //REGION 9
};


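The new sd_region/sd_nation entries give nation and region a seed-skip function, so a restarted generator can fast-forward the comment RNG stream instead of regenerating the skipped rows; the factor 2 suggests two draws per row, though that is an interpretation, not something stated in the diff. A toy stand-in (not the dbgen RNG) showing why advancing a deterministic stream by calls-per-row * skipped-rows reproduces the same rows:

#include <cstdint>
#include <iostream>

// Toy counter-based stream: the state is just how many values were drawn, so
// "skipping" rows reduces to advancing the counter (analogue of advanceStream()).
struct TToyStream {
    uint64_t Drawn = 0;
    uint64_t Next() { return Mix(++Drawn); }
    void Advance(uint64_t calls) { Drawn += calls; }
    static uint64_t Mix(uint64_t x) {
        x += 0x9E3779B97F4A7C15ull;
        x = (x ^ (x >> 30)) * 0xBF58476D1CE4E5B9ull;
        x = (x ^ (x >> 27)) * 0x94D049BB133111EBull;
        return x ^ (x >> 31);
    }
};

// Hypothetical row maker that draws twice per row, mirroring the 2 * skip_count above.
uint64_t MakeRow(TToyStream& s) { return s.Next() ^ s.Next(); }

int main() {
    TToyStream full;
    uint64_t row4 = 0;
    for (int i = 1; i <= 4; ++i) row4 = MakeRow(full);   // rows 1..4 generated from scratch

    TToyStream resumed;
    resumed.Advance(2 * 3);                              // skip rows 1..3: two draws per row
    const uint64_t row4Resumed = MakeRow(resumed);

    std::cout << (row4 == row4Resumed ? "match" : "mismatch") << '\n';
}
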
104 changes: 80 additions & 24 deletions ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp
@@ -24,6 +24,8 @@ void TWorkloadCommandImport::Config(TConfig& config) {
.DefaultValue(WorkloadParams.BulkSize).StoreResult(&WorkloadParams.BulkSize);
config.Opts->AddLongOption("max-in-flight", "Maximum number if data portions that can be simultaneously in process.")
.DefaultValue(UploadParams.MaxInFlight).StoreResult(&UploadParams.MaxInFlight);
config.Opts->AddLongOption('f', "file-output-path", "Path to a directory where tables are saved as files instead of being uploaded to the database.")
.StoreResult(&UploadParams.FileOutputPath);
}

TWorkloadCommandImport::TUploadParams::TUploadParams()
@@ -45,6 +47,11 @@ int TWorkloadCommandImport::TUploadCommand::DoRun(NYdbWorkload::IWorkloadQueryGe
auto dataGeneratorList = Initializer->GetBulkInitialData();
AtomicSet(ErrorsCount, 0);
InFlightSemaphore = MakeHolder<TFastSemaphore>(UploadParams.MaxInFlight);
if (UploadParams.FileOutputPath.IsDefined()) {
Writer = MakeHolder<TFileWriter>(*this);
} else {
Writer = MakeHolder<TDbWriter>(*this);
}
for (auto dataGen : dataGeneratorList) {
TThreadPoolParams params;
params.SetCatching(false);
@@ -69,39 +76,88 @@ int TWorkloadCommandImport::TUploadCommand::DoRun(NYdbWorkload::IWorkloadQueryGe
return AtomicGet(ErrorsCount) ? EXIT_FAILURE : EXIT_SUCCESS;
}

TAsyncStatus TWorkloadCommandImport::TUploadCommand::SendDataPortion(NYdbWorkload::IBulkDataGenerator::TDataPortionPtr portion) const {
auto convertResult = [](const NTable::TAsyncBulkUpsertResult& result) {
return TStatus(result.GetValueSync());
};
if (auto* value = std::get_if<TValue>(&portion->MutableData())) {
return TableClient->BulkUpsert(portion->GetTable(), std::move(*value)).Apply(convertResult);
class TWorkloadCommandImport::TUploadCommand::TDbWriter: public IWriter {
public:
using IWriter::IWriter;

TAsyncStatus WriteDataPortion(NYdbWorkload::IBulkDataGenerator::TDataPortionPtr portion) override {
auto convertResult = [](const NTable::TAsyncBulkUpsertResult& result) {
return TStatus(result.GetValueSync());
};
if (auto* value = std::get_if<TValue>(&portion->MutableData())) {
return Owner.TableClient->BulkUpsert(portion->GetTable(), std::move(*value)).Apply(convertResult);
}
NRetry::TRetryOperationSettings retrySettings;
retrySettings.RetryUndefined(true);
retrySettings.MaxRetries(30);
if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TCsv>(&portion->MutableData())) {
return Owner.TableClient->RetryOperation([value, portion, convertResult](NTable::TTableClient& client) {
NTable::TBulkUpsertSettings settings;
settings.FormatSettings(value->FormatString);
return client.BulkUpsert(portion->GetTable(), NTable::EDataFormat::CSV, value->Data, TString(), settings)
.Apply(convertResult);
}, retrySettings);
}
if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TArrow>(&portion->MutableData())) {
return Owner.TableClient->RetryOperation([value, portion, convertResult](NTable::TTableClient& client) {
return client.BulkUpsert(portion->GetTable(), NTable::EDataFormat::ApacheArrow, value->Data, value->Schema)
.Apply(convertResult);
}, retrySettings);
}
Y_FAIL_S("Invalid data portion");
}
};

class TWorkloadCommandImport::TUploadCommand::TFileWriter: public IWriter {
public:
TFileWriter(const TWorkloadCommandImport::TUploadCommand& owner)
:IWriter(owner)
{
Owner.UploadParams.FileOutputPath.ForceDelete();
Owner.UploadParams.FileOutputPath.MkDirs();
}
NRetry::TRetryOperationSettings retrySettings;
retrySettings.RetryUndefined(true);
retrySettings.MaxRetries(30);
if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TCsv>(&portion->MutableData())) {
return TableClient->RetryOperation([value, portion, convertResult](NTable::TTableClient& client) {
NTable::TBulkUpsertSettings settings;
settings.FormatSettings(value->FormatString);
return client.BulkUpsert(portion->GetTable(), NTable::EDataFormat::CSV, value->Data, TString(), settings)
.Apply(convertResult);
}, retrySettings);

TAsyncStatus WriteDataPortion(NYdbWorkload::IBulkDataGenerator::TDataPortionPtr portion) override {
if (auto* value = std::get_if<TValue>(&portion->MutableData())) {
return NThreading::MakeErrorFuture<TStatus>(std::make_exception_ptr(yexception() << "Not implemented"));
}
if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TCsv>(&portion->MutableData())) {
auto g = Guard(Lock);
auto [out, created] = GetOutput(portion->GetTable());
TStringBuf toWrite(value->Data);
if (!created) {
TStringBuf firstLine;
toWrite.ReadLine(firstLine);
}
out->Write(toWrite);
return NThreading::MakeFuture(TStatus(EStatus::SUCCESS, NYql::TIssues()));
}
if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TArrow>(&portion->MutableData())) {
return NThreading::MakeErrorFuture<TStatus>(std::make_exception_ptr(yexception() << "Not implemented"));
}
Y_FAIL_S("Invalid data portion");
}
if (auto* value = std::get_if<NYdbWorkload::IBulkDataGenerator::TDataPortion::TArrow>(&portion->MutableData())) {
return TableClient->RetryOperation([value, portion, convertResult](NTable::TTableClient& client) {
return client.BulkUpsert(portion->GetTable(), NTable::EDataFormat::ApacheArrow, value->Data, value->Schema)
.Apply(convertResult);
}, retrySettings);

private:
std::pair<TFileOutput*, bool> GetOutput(const TString& table) {
auto fname = TFsPath(table).Basename();
if (auto* result = MapFindPtr(CsvOutputs, fname)) {
return std::make_pair(result->Get(), false);
}
auto result = MakeAtomicShared<TFileOutput>(Owner.UploadParams.FileOutputPath / fname);
CsvOutputs[fname] = result;
return std::make_pair(result.Get(), true);
}
Y_FAIL_S("Invalid data portion");
}
TMap<TString, TAtomicSharedPtr<TFileOutput>> CsvOutputs;
TAdaptiveLock Lock;
};

void TWorkloadCommandImport::TUploadCommand::ProcessDataGenerator(std::shared_ptr<NYdbWorkload::IBulkDataGenerator> dataGen) noexcept try {
TAtomic counter = 0;
for (auto portions = dataGen->GenerateDataPortion(); !portions.empty() && !AtomicGet(ErrorsCount); portions = dataGen->GenerateDataPortion()) {
for (const auto& data: portions) {
AtomicIncrement(counter);
SendDataPortion(data).Apply(
Writer->WriteDataPortion(data).Apply(
[data, this, &counter, g = MakeAtomicShared<TGuard<TFastSemaphore>>(*InFlightSemaphore)](const TAsyncStatus& result) {
const auto& res = result.GetValueSync();
data->SetSendResult(res);
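
For illustration only, a standalone sketch of the CSV-append behaviour that TFileWriter implements above: each table accumulates into a single file, and every portion after the first has its header line stripped, so the file carries exactly one header. Names and types here are placeholders, not the YDB/util ones:

#include <fstream>
#include <map>
#include <string>
#include <string_view>

class TCsvFileSink {
public:
    explicit TCsvFileSink(std::string dir) : Dir(std::move(dir)) {}

    // Append one CSV portion to <dir>/<table>.csv, keeping only the first header.
    void Write(const std::string& table, std::string_view csvWithHeader) {
        auto [it, created] = Files.try_emplace(table, Dir + "/" + table + ".csv");
        std::string_view toWrite = csvWithHeader;
        if (!created) {
            // Drop everything up to and including the first newline (the header row).
            const auto eol = toWrite.find('\n');
            toWrite.remove_prefix(eol == std::string_view::npos ? toWrite.size() : eol + 1);
        }
        it->second << toWrite;
    }

private:
    std::string Dir;
    std::map<std::string, std::ofstream> Files;
};

int main() {
    TCsvFileSink sink(".");
    sink.Write("nation", "id,name\n0,ALGERIA\n");
    sink.Write("nation", "id,name\n1,ARGENTINA\n");  // header is skipped on append
}
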
15 changes: 14 additions & 1 deletion ydb/public/lib/ydb_cli/commands/ydb_workload_import.h
@@ -13,6 +13,7 @@ class TWorkloadCommandImport final: public TClientCommandTree {
TUploadParams();
ui32 Threads;
ui32 MaxInFlight = 128;
TFsPath FileOutputPath;
};
class TUploadCommand;

@@ -26,9 +27,20 @@ class TWorkloadCommandImport::TUploadCommand final: public TWorkloadCommandBase
virtual void Config(TConfig& config) override;

private:
class IWriter {
public:
IWriter(const TWorkloadCommandImport::TUploadCommand& owner)
: Owner(owner)
{}
virtual ~IWriter() = default;
virtual TAsyncStatus WriteDataPortion(NYdbWorkload::IBulkDataGenerator::TDataPortionPtr portion) = 0;
protected:
const TWorkloadCommandImport::TUploadCommand& Owner;
};
class TFileWriter;
class TDbWriter;
NTable::TSession GetSession();
int DoRun(NYdbWorkload::IWorkloadQueryGenerator& workloadGen, TConfig& config) override;
TAsyncStatus SendDataPortion(NYdbWorkload::IBulkDataGenerator::TDataPortionPtr portion) const;
void ProcessDataGenerator(std::shared_ptr<NYdbWorkload::IBulkDataGenerator> dataGen) noexcept;

const TUploadParams& UploadParams;
@@ -37,6 +49,7 @@ class TWorkloadCommandImport::TUploadCommand final: public TWorkloadCommandBase
TAdaptiveLock Lock;
THolder<TFastSemaphore> InFlightSemaphore;
TAtomic ErrorsCount;
THolder<IWriter> Writer;
};

}
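
A sketch of the selection pattern this interface enables, with plain placeholder types instead of the real TDataPortionPtr/TAsyncStatus: DoRun chooses a concrete writer once, based on whether --file-output-path was given, and the upload loop talks only to IWriter:

#include <iostream>
#include <memory>
#include <string>

// Placeholder interface mirroring IWriter::WriteDataPortion.
struct IWriter {
    virtual ~IWriter() = default;
    virtual void WriteDataPortion(const std::string& portion) = 0;
};

struct TDbWriter : IWriter {
    void WriteDataPortion(const std::string& portion) override {
        std::cout << "bulk upsert: " << portion << '\n';
    }
};

struct TFileWriter : IWriter {
    void WriteDataPortion(const std::string& portion) override {
        std::cout << "append to csv file: " << portion << '\n';
    }
};

int main(int argc, char**) {
    const bool toFiles = argc > 1;  // stand-in for --file-output-path being set
    std::unique_ptr<IWriter> writer;
    if (toFiles) {
        writer = std::make_unique<TFileWriter>();
    } else {
        writer = std::make_unique<TDbWriter>();
    }
    writer->WriteDataPortion("portion #1");  // the generator loop only sees IWriter
}
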
20 changes: 20 additions & 0 deletions ydb/tests/functional/tpc/canondata/result.json
@@ -0,0 +1,20 @@
{
"test_generator.TestTpcdsGenerator.test_s1": {
"uri": "file://test_generator.TestTpcdsGenerator.test_s1/s1.hash"
},
"test_generator.TestTpcdsGenerator.test_s1_parts": {
"uri": "file://test_generator.TestTpcdsGenerator.test_s1_parts/s1.hash"
},
"test_generator.TestTpcdsGenerator.test_s1_state": {
"uri": "file://test_generator.TestTpcdsGenerator.test_s1_state/s1.hash"
},
"test_generator.TestTpchGenerator.test_s1": {
"uri": "file://test_generator.TestTpchGenerator.test_s1/s1.hash"
},
"test_generator.TestTpchGenerator.test_s1_parts": {
"uri": "file://test_generator.TestTpchGenerator.test_s1_parts/s1.hash"
},
"test_generator.TestTpchGenerator.test_s1_state": {
"uri": "file://test_generator.TestTpchGenerator.test_s1_state/s1.hash"
}
}
@@ -0,0 +1,48 @@
call_center count: 6
call_center md5: 86db117a0bb48668acbe63c473e85d96
catalog_page count: 11718
catalog_page md5: 0bf750caa038dee0f1f9618414f8add1
catalog_returns count: 144067
catalog_returns md5: 78c1bd181f07d7067be644a301a1e15a
catalog_sales count: 1441548
catalog_sales md5: 47a7b34e4cd097c9b89457497550527c
customer count: 100000
customer md5: 4f35263f5c2e15d6ab687f14d1acfee7
customer_address count: 50000
customer_address md5: edda298b082245c2d0ce0bcd97af1335
customer_demographics count: 1920800
customer_demographics md5: 4f6182b865d1c183d50860387332c0b5
date_dim count: 73049
date_dim md5: f4ef03663ab568ddeb16309f493896c0
household_demographics count: 7200
household_demographics md5: b1c3ff23e00da09d7fc94ce5cd8abdd7
income_band count: 20
income_band md5: 5dbfb6a7379a3ccb81004d9abae0df5e
inventory count: 11745000
inventory md5: 24fe36237ddbad4d9be3136f9ec49299
item count: 18000
item md5: 364b883875279ed9ef3ab5dada368d7c
promotion count: 300
promotion md5: 1660520863026204779c646c58cb8870
reason count: 35
reason md5: 89493ae8b5ab9f63f750c1bdadc57089
ship_mode count: 20
ship_mode md5: 25d7c1abd229862398b88818f81f72fc
store count: 12
store md5: f342258aaec198b0ec4d6bb6e9f7991e
store_returns count: 287514
store_returns md5: 038278c999f980849c84e99da9e213c2
store_sales count: 2880404
store_sales md5: 5c5d3f51c925f546248c7cf68d9055cd
time_dim count: 86400
time_dim md5: 88f81e9a8618f855f4ff20d98e6ec122
warehouse count: 5
warehouse md5: 02268070dffd49682bc54c42580ac2ac
web_page count: 60
web_page md5: 4a2551b4b2243b5030e5f23a605db603
web_returns count: 71763
web_returns md5: 0b4934c14bed8f3048deb6873cc37921
web_sales count: 719384
web_sales md5: b866c9b742560f2d78630853dd2b81c4
web_site count: 30
web_site md5: 707d556c664272f685ee8d7ddbc46f61