Skip to content

Commit

Permalink
Optimize waiting for index creation during restore in YDB CLI (#8936)
Browse files Browse the repository at this point in the history
  • Loading branch information
pixcc authored Sep 10, 2024
1 parent 0766f41 commit ce3201c
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 69 deletions.
108 changes: 39 additions & 69 deletions ydb/public/lib/ydb_cli/dump/restore_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,60 +51,26 @@ Ydb::Scheme::ModifyPermissionsRequest ReadPermissions(const TString& fsPath) {
return proto;
}

// NOTE(review): this span appears to come from a rendered commit diff whose +/- markers were lost,
// so the lines of two helpers (HasRunningIndexBuilds and WaitForIndexBuild) are interleaved and the
// braces do not pair up as displayed. Confirm against the real file before relying on it.
//
// HasRunningIndexBuilds: pages through build-index operations (pageSize entries per request) and
// returns true when an operation whose Metadata().Path equals dbPath is in one of the states
// listed in the switch on Metadata().State; returns false once paging ends (pageToken == "0").
bool HasRunningIndexBuilds(TOperationClient& client, const TString& dbPath) {
const ui64 pageSize = 100;
TString pageToken;

do {
// Each page is requested up to maxRetries + 1 times; a failure on the last attempt aborts via Y_ENSURE.
const ui32 maxRetries = 8;
TDuration retrySleep = TDuration::MilliSeconds(100);

for (ui32 retryNumber = 0; retryNumber <= maxRetries; ++retryNumber) {
auto operations = client.List<TBuildIndexOperation>(pageSize, pageToken).GetValueSync();

if (!operations.IsSuccess()) {
if (retryNumber == maxRetries) {
Y_ENSURE(false, "Cannot list operations");
}

switch (operations.GetStatus()) {
case EStatus::ABORTED:
break;
// WaitForIndexBuild: repeatedly fetches the build-index operation `id` and hands back
// operation.Status() from the switch's default branch; the remaining paths reach a backoff call
// before the next attempt. NOTE(review): the exact set of retried statuses cannot be read
// reliably from this interleaved view — verify against the real file.
TStatus WaitForIndexBuild(TOperationClient& client, const TOperation::TOperationId& id) {
TDuration retrySleep = TDuration::MilliSeconds(100);
while (true) {
auto operation = client.Get<TBuildIndexOperation>(id).GetValueSync();
if (!operation.Status().IsTransportError()) {
switch (operation.Status().GetStatus()) {
case EStatus::OVERLOADED:
case EStatus::CLIENT_RESOURCE_EXHAUSTED:
case EStatus::UNAVAILABLE:
case EStatus::TRANSPORT_UNAVAILABLE:
NConsoleClient::ExponentialBackoff(retrySleep);
break;
default:
Y_ENSURE(false, "Unexpected status while trying to list operations: " << operations.GetStatus());
}

continue;
}

for (const auto& operation : operations.GetList()) {
// Only operations that target the table being restored are of interest.
if (operation.Metadata().Path != dbPath) {
continue;
}

switch (operation.Metadata().State) {
case EBuildIndexState::Preparing:
case EBuildIndexState::TransferData:
case EBuildIndexState::Applying:
case EBuildIndexState::Cancellation:
case EBuildIndexState::Rejection:
return true;
case EStatus::STATUS_UNDEFINED:
break; // retry
default:
break;
}
}

pageToken = operations.NextPageToken();
return operation.Status();
}
}
} while (pageToken != "0");
// Backoff between polls; the second argument is presumably an upper bound on the delay — TODO confirm.
NConsoleClient::ExponentialBackoff(retrySleep, TDuration::Minutes(1));
}
}

return false;
bool IsOperationStarted(TStatus operationStatus) {
return operationStatus.IsSuccess() || operationStatus.GetStatus() == EStatus::STATUS_UNDEFINED;
}

} // anonymous
Expand Down Expand Up @@ -416,36 +382,40 @@ TRestoreResult TRestoreClient::RestoreData(const TFsPath& fsPath, const TString&

TRestoreResult TRestoreClient::RestoreIndexes(const TString& dbPath, const TTableDescription& desc) {
TMaybe<TTableDescription> actualDesc;
auto descResult = DescribeTable(TableClient, dbPath, actualDesc);
if (!descResult.IsSuccess()) {
return Result<TRestoreResult>(dbPath, std::move(descResult));
}

for (const auto& index : desc.GetIndexDescriptions()) {
// check (and wait) for unexpected index builds
while (HasRunningIndexBuilds(OperationClient, dbPath)) {
actualDesc.Clear();
Sleep(TDuration::Minutes(1));
}

if (!actualDesc) {
auto descResult = DescribeTable(TableClient, dbPath, actualDesc);
if (!descResult.IsSuccess()) {
return Result<TRestoreResult>(dbPath, std::move(descResult));
}
}

if (FindPtr(actualDesc->GetIndexDescriptions(), index)) {
continue;
}

auto status = TableClient.RetryOperationSync([&dbPath, &index](TSession session) {
TOperation::TOperationId buildIndexId;
auto buildIndexStatus = TableClient.RetryOperationSync([&, &outId = buildIndexId](TSession session) {
auto settings = TAlterTableSettings().AppendAddIndexes(index);
return session.AlterTableLong(dbPath, settings).GetValueSync().Status();
auto result = session.AlterTableLong(dbPath, settings).GetValueSync();
if (IsOperationStarted(result.Status())) {
outId = result.Id();
}
return result.Status();
});
if (!status.IsSuccess() && status.GetStatus() != EStatus::STATUS_UNDEFINED) {
return Result<TRestoreResult>(dbPath, std::move(status));

if (!IsOperationStarted(buildIndexStatus)) {
return Result<TRestoreResult>(dbPath, std::move(buildIndexStatus));
}

auto waitForIndexBuildStatus = WaitForIndexBuild(OperationClient, buildIndexId);
if (!waitForIndexBuildStatus.IsSuccess()) {
return Result<TRestoreResult>(dbPath, std::move(waitForIndexBuildStatus));
}

// wait for expected index build
while (HasRunningIndexBuilds(OperationClient, dbPath)) {
Sleep(TDuration::Minutes(1));
auto forgetStatus = NConsoleClient::RetryFunction([&]() {
return OperationClient.Forget(buildIndexId).GetValueSync();
});
if (!forgetStatus.IsSuccess()) {
return Result<TRestoreResult>(dbPath, std::move(forgetStatus));
}
}

Expand Down
56 changes: 56 additions & 0 deletions ydb/services/ydb/backup_ut/ydb_backup_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <google/protobuf/util/message_differencer.h>

using namespace NYdb;
using namespace NYdb::NOperation;
using namespace NYdb::NTable;

namespace NYdb::NTable {
Expand Down Expand Up @@ -98,12 +99,30 @@ auto CreateMinPartitionsChecker(ui32 expectedMinPartitions, const TString& debug
};
}

// Builds a predicate over a table description: it yields true when one of the table's index
// descriptions is named indexName and false otherwise. The name is copied into the closure.
auto CreateHasIndexChecker(const TString& indexName) {
    return [indexName](const TTableDescription& tableDescription) {
        bool found = false;
        for (const auto& candidate : tableDescription.GetIndexDescriptions()) {
            if (candidate.GetIndexName() == indexName) {
                found = true;
                break;
            }
        }
        return found;
    };
}

// Describes the table at `path` (using `settings`) and passes the description to `checker`.
//
// A checker whose result is convertible to bool is treated as a predicate: the test fails when it
// yields false. Previously the result was silently discarded, so a predicate-style checker such as
// the one produced by CreateHasIndexChecker could never fail a test. A checker that returns void
// (and performs its own assertions) is invoked exactly as before.
void CheckTableDescription(TSession& session, const TString& path, auto&& checker,
    const TDescribeTableSettings& settings = {}
) {
    // The requires-expression is unevaluated, so the table is described once on either branch.
    if constexpr (requires { static_cast<bool>(checker(GetTableDescription(session, path, settings))); }) {
        UNIT_ASSERT_C(
            checker(GetTableDescription(session, path, settings)),
            "table description check failed for " << path
        );
    } else {
        checker(GetTableDescription(session, path, settings));
    }
}

// Lists build-index operations through a freshly created operation client and fails the test
// when the listing request is unsuccessful or when the returned list is not empty.
void CheckBuildIndexOperationsCleared(TDriver& driver) {
    TOperationClient client{driver};

    // The asserted expressions are kept as-is so that failure output stays the same.
    const auto result = client.List<TBuildIndexOperation>().GetValueSync();

    UNIT_ASSERT_C(result.IsSuccess(), "issues:\n" << result.GetIssues().ToString());
    UNIT_ASSERT_C(result.GetList().empty(), "Build index operations aren't cleared:\n" << result.ToJsonString());
}

using TBackupFunction = std::function<void(const char*)>;
using TRestoreFunction = std::function<void(const char*)>;

Expand Down Expand Up @@ -417,6 +436,43 @@ Y_UNIT_TEST_SUITE(BackupRestore) {
}

// TO DO: test index impl table split boundaries restoration from a backup

Y_UNIT_TEST(BasicRestoreTableWithIndex) {
    // Scenario: create a table with a global index, back it up, drop it, restore it from the
    // backup, then run the index checker on the restored table and verify that no build-index
    // operations are left in the operation list.
    TKikimrWithGrpcAndRootSchema server;
    const auto endpoint = Sprintf("localhost:%d", server.GetPort());
    TDriver driver(TDriverConfig().SetEndpoint(endpoint));
    TTableClient tableClient(driver);
    auto session = tableClient.GetSession().ExtractValueSync().GetSession();

    constexpr const char* tablePath = "/Root/table";
    constexpr const char* indexName = "byValue";

    // The SQL text is kept exactly as in the original, including its line breaks.
    const auto createTableQuery = Sprintf(R"(
CREATE TABLE `%s` (
Key Uint32,
Value Uint32,
PRIMARY KEY (Key),
INDEX %s GLOBAL ON (Value)
);
)",
        tablePath, indexName
    );
    ExecuteDataDefinitionQuery(session, createTableQuery);

    TTempDir backupDir;
    const auto& pathToBackup = backupDir.Path();
    // BackupFolder is a stand-in: the intent is to call NDump::TClient::Dump once it is implemented.
    NYdb::NBackup::BackupFolder(driver, "/Root", ".", pathToBackup, {}, false, false);

    NDump::TClient backupClient(driver);

    // Drop the table so that the restore has to recreate it.
    ExecuteDataDefinitionQuery(session, Sprintf(R"(
DROP TABLE `%s`;
)", tablePath
    ));
    Restore(backupClient, pathToBackup, "/Root");

    CheckTableDescription(session, tablePath, CreateHasIndexChecker(indexName));
    CheckBuildIndexOperationsCleared(driver);
}
}

Y_UNIT_TEST_SUITE(BackupRestoreS3) {
Expand Down

0 comments on commit ce3201c

Please sign in to comment.