Skip to content

Commit

Permalink
Fix bugs in progress bar working with import benchmarks (#10939)
Browse files Browse the repository at this point in the history
  • Loading branch information
iddqdex authored Oct 25, 2024
1 parent 40eaeda commit 3a89b73
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 34 deletions.
5 changes: 4 additions & 1 deletion ydb/library/workload/abstract/workload_query_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ class IBulkDataGenerator {
TString Schema;
};

using TDataType = std::variant<NYdb::TValue, TCsv, TArrow>;
struct TSkip {
};

using TDataType = std::variant<NYdb::TValue, TCsv, TArrow, TSkip>;

template<class T>
TDataPortion(const TString& table, T&& data, ui64 size)
Expand Down
31 changes: 19 additions & 12 deletions ydb/library/workload/clickbench/data_generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ TBulkDataGeneratorList TClickbenchWorkloadDataInitializerGenerator::DoGetBulkIni
}

TClickbenchWorkloadDataInitializerGenerator::TDataGenerartor::TDataGenerartor(const TClickbenchWorkloadDataInitializerGenerator& owner)
: IBulkDataGenerator("hits", CalcSize(owner))
: IBulkDataGenerator("hits", DataSetSize)
, Owner(owner)
{
if (Owner.GetDataFiles().IsDirectory()) {
Expand All @@ -52,6 +52,22 @@ IBulkDataGenerator::TDataPortions TClickbenchWorkloadDataInitializerGenerator::T
if (Files.empty()) {
return {};
}
if (FirstPortion) {
FirstPortion = false;
ui64 toSkip = 0;
if (Owner.StateProcessor) {
for (const auto& [file, state]: Owner.StateProcessor->GetState()) {
toSkip += state.Position;
}
}
if (toSkip) {
return { MakeIntrusive<TDataPortion>(
Owner.Params.GetFullTableName(nullptr),
TDataPortion::TSkip(),
toSkip
)};
}
}
index = std::hash<std::thread::id>{}(std::this_thread::get_id()) % Files.size();
file = Files[index];
}
Expand All @@ -69,16 +85,6 @@ IBulkDataGenerator::TDataPortions TClickbenchWorkloadDataInitializerGenerator::T
}
}

ui64 TClickbenchWorkloadDataInitializerGenerator::TDataGenerartor::CalcSize(const TClickbenchWorkloadDataInitializerGenerator& owner) {
ui64 result = 99997497;
if (owner.StateProcessor) {
for (const auto& [file, state]: owner.StateProcessor->GetState()) {
result -= state.Position;
}
}
return result;
}

class TClickbenchWorkloadDataInitializerGenerator::TDataGenerartor::TCsvFileBase: public TClickbenchWorkloadDataInitializerGenerator::TDataGenerartor::TFile {
public:
TCsvFileBase(TDataGenerartor& owner, const TString& path, const TString& delimiter, const TString& foramt)
Expand Down Expand Up @@ -116,7 +122,8 @@ class TClickbenchWorkloadDataInitializerGenerator::TDataGenerartor::TCsvFileBase
with_lock(Lock) {
TString line;
if (Owner.Owner.StateProcessor && Owner.Owner.StateProcessor->GetState().contains(Path)) {
while(Owner.Owner.StateProcessor->GetState().at(Path).Position > Readed && Decompressor->ReadLine(line)) {
auto position = Owner.Owner.StateProcessor->GetState().at(Path).Position;
while(position > Readed && Decompressor->ReadLine(line)) {
++Readed;
}
}
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/workload/clickbench/data_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ class TClickbenchWorkloadDataInitializerGenerator: public TWorkloadDataInitializ
class TTsvFile;
class TCsvFile;
void AddFile(const TFsPath& path);
static ui64 CalcSize(const TClickbenchWorkloadDataInitializerGenerator& owner);

private:
const TClickbenchWorkloadDataInitializerGenerator& Owner;
TVector<TFile::TPtr> Files;
TAdaptiveLock Lock;
bool FirstPortion = true;
static constexpr ui64 DataSetSize = 99997497;
};
};

Expand Down
12 changes: 7 additions & 5 deletions ydb/library/workload/tpcds/data_generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,10 @@ TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TPositions TTpcdsWor
split_work(tableNum, &result.FirstRow, &result.Count);
if (useState && owner.StateProcessor && owner.StateProcessor->GetState().contains(tdef->name)) {
result.Position = owner.StateProcessor->GetState().at(tdef->name).Position;
result.Count -= std::min<i64>(result.Count, result.Position);

//this magic is needed for SCD to work correctly. See setSCDKeys in ydb/library/benchmarks/gen/tpcds-dbgen/scd.c
while (result.Position && !allowedModules.contains((result.FirstRow + result.Position) % 6)) {
--result.Position;
++result.Count;
}
}
//this magic is needed for SCD to work correctly. See setSCDKeys in ydb/library/benchmarks/gen/tpcds-dbgen/scd.c
Expand Down Expand Up @@ -129,12 +127,11 @@ TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TBulkDataGenerator(c
: IBulkDataGenerator(getTdefsByNumber(tableNum)->name, CalcCountToGenerate(owner, tableNum, true).Count)
, TableNum(tableNum)
, Owner(owner)
, TableSize(CalcCountToGenerate(owner, tableNum, false).Count)
{}

TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::GenerateDataPortion() {
TDataPortions result;
if (TableSize == 0) {
if (GetSize() == 0) {
return result;
}
TContexts ctxs;
Expand All @@ -149,6 +146,11 @@ TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpcds
auto positions = CalcCountToGenerate(Owner, TableNum, !Generated);
if (!Generated) {
Generated = positions.Position;
result.push_back(MakeIntrusive<TDataPortion>(
GetFullTableName(tdef->name),
TDataPortion::TSkip(),
Generated
));
if (const ui32 toSkip = positions.FirstRow + positions.Position - 1) {
row_skip(TableNum, toSkip);
if (tdef->flags & FL_PARENT) {
Expand All @@ -159,7 +161,7 @@ TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpcds
resetCountCount();
}
}
const auto count = TableSize > Generated ? std::min(ui64(TableSize - Generated), Owner.Params.BulkSize) : 0;
const auto count = GetSize() > Generated ? std::min(ui64(GetSize() - Generated), Owner.Params.BulkSize) : 0;
if (!count) {
return result;
}
Expand Down
1 change: 0 additions & 1 deletion ydb/library/workload/tpcds/data_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ class TTpcdsWorkloadDataInitializerGenerator: public TWorkloadDataInitializerBas
};
static TPositions CalcCountToGenerate(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum, bool useState);
const TTpcdsWorkloadDataInitializerGenerator& Owner;
ui64 TableSize;
};
};

Expand Down
22 changes: 11 additions & 11 deletions ydb/library/workload/tpch/data_generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,19 @@ TBulkDataGeneratorList TTpchWorkloadDataInitializerGenerator::DoGetBulkInitialDa
}


ui64 TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::CalcCountToGenerate(const TTpchWorkloadDataInitializerGenerator& owner, int tableNum, bool useState) {
ui64 TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::CalcCountToGenerate(const TTpchWorkloadDataInitializerGenerator& owner, int tableNum) {
if (tableNum == NONE) {
return 0;
}
ui64 position = 0;
if (useState && owner.StateProcessor && owner.StateProcessor->GetState().contains(tdefs[tableNum].name)) {
position = owner.StateProcessor->GetState().at(tdefs[tableNum].name).Position;
}
if (tableNum >= NATION) {
return owner.GetProcessIndex() ? 0 : (tdefs[tableNum].base - position);
return owner.GetProcessIndex() ? 0 : tdefs[tableNum].base;
}
ui64 rowCount = tdefs[tableNum].base * owner.GetScale();
ui64 extraRows = 0;
if (owner.GetProcessIndex() + 1 >= owner.GetProcessCount()) {
extraRows = rowCount % owner.GetProcessCount();
}
return rowCount / owner.GetProcessCount() + extraRows - position;
return rowCount / owner.GetProcessCount() + extraRows;
}

TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::TContext(const TBulkDataGenerator& owner, int tableNum, TGeneratorStateProcessor* state)
Expand Down Expand Up @@ -112,15 +108,14 @@ TString TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::GetFullTableN
}

TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TBulkDataGenerator(const TTpchWorkloadDataInitializerGenerator& owner, int tableNum)
: IBulkDataGenerator(tdefs[tableNum].name, CalcCountToGenerate(owner, tableNum, true))
: IBulkDataGenerator(tdefs[tableNum].name, CalcCountToGenerate(owner, tableNum))
, TableNum(tableNum)
, Owner(owner)
, TableSize(CalcCountToGenerate(owner, tableNum, false))
{}

TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::GenerateDataPortion() {
TDataPortions result;
if (TableSize == 0) {
if (GetSize() == 0) {
return result;
}
TContexts ctxs;
Expand All @@ -138,11 +133,16 @@ TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpchWo
if (!!Owner.StateProcessor) {
if (const auto* state = MapFindPtr(Owner.StateProcessor->GetState(), GetName())) {
Generated = state->Position;
result.push_back(MakeIntrusive<TDataPortion>(
GetFullTableName(tdefs[TableNum].name),
TDataPortion::TSkip(),
Generated
));
GenSeed(TableNum, Generated);
}
}
}
const auto count = TableSize > Generated ? std::min(ui64(TableSize - Generated), Owner.Params.BulkSize) : 0;
const auto count = GetSize() > Generated ? std::min(ui64(GetSize() - Generated), Owner.Params.BulkSize) : 0;
if (!count) {
return result;
}
Expand Down
3 changes: 1 addition & 2 deletions ydb/library/workload/tpch/data_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,8 @@ class TTpchWorkloadDataInitializerGenerator: public TWorkloadDataInitializerBase

private:
TString GetFullTableName(const char* table) const;
static ui64 CalcCountToGenerate(const TTpchWorkloadDataInitializerGenerator& owner, int tableNum, bool useState);
static ui64 CalcCountToGenerate(const TTpchWorkloadDataInitializerGenerator& owner, int tableNum);
const TTpchWorkloadDataInitializerGenerator& Owner;
ui64 TableSize;
};

};
Expand Down
8 changes: 7 additions & 1 deletion ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ class TWorkloadCommandImport::TUploadCommand::TDbWriter: public IWriter {
auto convertResult = [](const NTable::TAsyncBulkUpsertResult& result) {
return TStatus(result.GetValueSync());
};
if (std::holds_alternative<NYdbWorkload::IBulkDataGenerator::TDataPortion::TSkip>(portion->MutableData())) {
return NThreading::MakeFuture(TStatus(EStatus::SUCCESS, NYql::TIssues()));
}
if (auto* value = std::get_if<TValue>(&portion->MutableData())) {
return Owner.TableClient->BulkUpsert(portion->GetTable(), std::move(*value)).Apply(convertResult);
}
Expand Down Expand Up @@ -118,6 +121,9 @@ class TWorkloadCommandImport::TUploadCommand::TFileWriter: public IWriter {
}

TAsyncStatus WriteDataPortion(NYdbWorkload::IBulkDataGenerator::TDataPortionPtr portion) override {
if (std::holds_alternative<NYdbWorkload::IBulkDataGenerator::TDataPortion::TSkip>(portion->MutableData())) {
return NThreading::MakeFuture(TStatus(EStatus::SUCCESS, NYql::TIssues()));
}
if (auto* value = std::get_if<TValue>(&portion->MutableData())) {
return NThreading::MakeErrorFuture<TStatus>(std::make_exception_ptr(yexception() << "Not implemented"));
}
Expand Down Expand Up @@ -165,7 +171,7 @@ void TWorkloadCommandImport::TUploadCommand::ProcessDataGenerator(std::shared_pt
if (!res.IsSuccess()) {
Cerr << "Bulk upset to " << data->GetTable() << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl;
AtomicIncrement(ErrorsCount);
} else {
} else if (data->GetSize()) {
Bar->AddProgress(data->GetSize());
}
AtomicDecrement(counter);
Expand Down

0 comments on commit 3a89b73

Please sign in to comment.