Skip to content

Commit

Permalink
Set position to state only if all data portions succesfully send
Browse files Browse the repository at this point in the history
  • Loading branch information
iddqdex committed Nov 24, 2024
1 parent 398fb41 commit 8a71cc6
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 13 deletions.
38 changes: 25 additions & 13 deletions ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,22 +161,34 @@ class TWorkloadCommandImport::TUploadCommand::TFileWriter: public IWriter {
void TWorkloadCommandImport::TUploadCommand::ProcessDataGenerator(std::shared_ptr<NYdbWorkload::IBulkDataGenerator> dataGen) noexcept try {
TAtomic counter = 0;
for (auto portions = dataGen->GenerateDataPortion(); !portions.empty() && !AtomicGet(ErrorsCount); portions = dataGen->GenerateDataPortion()) {
TVector<TAsyncStatus> sendingResults;
for (const auto& data: portions) {
AtomicIncrement(counter);
Writer->WriteDataPortion(data).Apply(
[data, this, &counter, g = MakeAtomicShared<TGuard<TFastSemaphore>>(*InFlightSemaphore)](const TAsyncStatus& result) {
const auto& res = result.GetValueSync();
data->SetSendResult(res);
auto guard = Guard(Lock);
if (!res.IsSuccess()) {
Cerr << "Bulk upset to " << data->GetTable() << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl;
AtomicIncrement(ErrorsCount);
} else if (data->GetSize()) {
Bar->AddProgress(data->GetSize());
}
AtomicDecrement(counter);
});
sendingResults.emplace_back(Writer->WriteDataPortion(data).Apply([&counter, g = MakeAtomicShared<TGuard<TFastSemaphore>>(*InFlightSemaphore)](const TAsyncStatus& result) {
AtomicDecrement(counter);
return result.GetValueSync();
}));
}
NThreading::WaitAll(sendingResults).Apply([this, sendingResults, portions](const NThreading::TFuture<void>&) {
bool success = true;
for (size_t i = 0; i < portions.size(); ++i) {
const auto& data = portions[i];
const auto& res = sendingResults[i].GetValueSync();
auto guard = Guard(Lock);
if (!res.IsSuccess()) {
Cerr << "Bulk upset to " << data->GetTable() << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl;
AtomicIncrement(ErrorsCount);
success = false;
} else if (data->GetSize()) {
Bar->AddProgress(data->GetSize());
}
}
if (success) {
for (size_t i = 0; i < portions.size(); ++i) {
portions[i]->SetSendResult(sendingResults[i].GetValueSync());
}
}
});
if (AtomicGet(ErrorsCount)) {
break;
}
Expand Down
6 changes: 6 additions & 0 deletions ydb/tests/olap/load/test_tpch.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,18 @@ class TestTpch100(TpchSuiteBase):


class TestTpch1000(TpchSuiteBase):
tables_size: dict[str, int] = {
'lineitem': 5999989709,
}
scale: int = 1000
check_canonical: bool = False
timeout = max(TpchSuiteBase.timeout, 3600.)


class TestTpch10000(TpchSuiteBase):
tables_size: dict[str, int] = {
'lineitem': 59999994267,
}
scale: int = 10000
iterations: int = 2
check_canonical: bool = False
Expand Down

0 comments on commit 8a71cc6

Please sign in to comment.