diff --git a/ydb/library/backup/backup.cpp b/ydb/library/backup/backup.cpp index 8998f626374c..28125fb533ae 100644 --- a/ydb/library/backup/backup.cpp +++ b/ydb/library/backup/backup.cpp @@ -1,16 +1,18 @@ #include "backup.h" #include "db_iterator.h" -#include "query_builder.h" -#include "query_uploader.h" #include "util.h" #include #include #include #include +#include +#include #include +#include #include +#include #include #include @@ -62,17 +64,6 @@ static void VerifyStatus(TStatus status, TString explain = "") { } } -static TString NameFromDbPath(const TString& path) { - TPathSplitUnix split(path); - return TString{split.back()}; -} - -static TString ParentPathFromDbPath(const TString& path) { - TPathSplitUnix split(path); - split.pop_back(); - return split.Reconstruct(); -} - static TString JoinDatabasePath(const TString& basePath, const TString& path) { if (basePath.empty()) { return path; @@ -200,7 +191,7 @@ void PrintValue(IOutputStream& out, TValueParser& parser) { } TMaybe ProcessResultSet(TStringStream& ss, - TResultSetParser resultSetParser, TFile* dataFile, const NTable::TTableDescription* desc) { + TResultSetParser& resultSetParser, TFile* dataFile, const NTable::TTableDescription* desc) { TMaybe lastReadPK; TStackVec colParsers; @@ -304,7 +295,8 @@ TMaybe TryReadTable(TDriver driver, const NTable::TTableDescription& des if (resultSetCurrent.Truncated()) { nextResult = iter->ReadNext(); } - lastReadPK = ProcessResultSet(ss, resultSetCurrent, &tmpFile, &desc); + auto resultSetParser = TResultSetParser(resultSetCurrent); + lastReadPK = ProcessResultSet(ss, resultSetParser, &tmpFile, &desc); // Next if (resultSetCurrent.Truncated()) { @@ -677,7 +669,7 @@ void CheckedCreateBackupFolder(const TFsPath& folderPath) { // relDbPath - relative path to directory/table to be backuped // folderPath - relative path to folder in local filesystem where backup will be stored -void BackupFolder(TDriver driver, const TString& database, const TString& relDbPath, TFsPath folderPath, +void BackupFolder(const TDriver& driver, const TString& database, const TString& relDbPath, TFsPath folderPath, const TVector& exclusionPatterns, bool schemaOnly, bool useConsistentCopyTable, bool avoidCopy, bool savePartialResult, bool preservePoolKinds, bool ordered) { TString temporalBackupPostfix = CreateTemporalBackupName(); @@ -721,301 +713,4 @@ void BackupFolder(TDriver driver, const TString& database, const TString& relDbP LOG_I("Backup completed successfully"); } -//////////////////////////////////////////////////////////////////////////////// -// Restore -//////////////////////////////////////////////////////////////////////////////// - -TString ProcessColumnType(const TString& name, TTypeParser parser, NTable::TTableBuilder *builder, std::optional sequenceDescription) { - TStringStream ss; - ss << "name: " << name << "; "; - if (parser.GetKind() == TTypeParser::ETypeKind::Optional) { - ss << " optional; "; - parser.OpenOptional(); - } - if (sequenceDescription.has_value()) { - ss << "serial; "; - } - ss << "kind: " << parser.GetKind() << "; "; - switch (parser.GetKind()) { - case TTypeParser::ETypeKind::Primitive: - ss << " type_id: " << parser.GetPrimitive() << "; "; - if (builder) { - if (sequenceDescription.has_value()) { - builder->AddSerialColumn(name, parser.GetPrimitive(), std::move(*sequenceDescription)); - } else { - builder->AddNullableColumn(name, parser.GetPrimitive()); - } - } - break; - case TTypeParser::ETypeKind::Decimal: - ss << " decimal_type: {" - << " precision: " << ui32(parser.GetDecimal().Precision) - << " scale: " << ui32(parser.GetDecimal().Scale) - << "}; "; - if (builder) { - builder->AddNullableColumn(name, parser.GetDecimal()); - } - break; - default: - Y_ENSURE(false, "Unexpected type kind# " << parser.GetKind() << " for column name# " << name.Quote()); - } - return ss.Str(); -} - -NTable::TTableDescription TableDescriptionFromProto(const Ydb::Table::CreateTableRequest& proto) { - NTable::TTableBuilder builder; - - std::optional sequenceDescription; - for (const auto &col : proto.Getcolumns()) { - if (col.from_sequence().name() == "_serial_column_" + col.name()) { - NTable::TSequenceDescription currentSequenceDescription; - if (col.from_sequence().has_set_val()) { - NTable::TSequenceDescription::TSetVal setVal; - setVal.NextUsed = col.from_sequence().set_val().next_used(); - setVal.NextValue = col.from_sequence().set_val().next_value(); - currentSequenceDescription.SetVal = std::move(setVal); - } - sequenceDescription = std::move(currentSequenceDescription); - } - LOG_D("AddColumn: " << ProcessColumnType(col.Getname(), TType(col.Gettype()), &builder, std::move(sequenceDescription))); - } - - for (const auto &primary : proto.Getprimary_key()) { - LOG_D("SetPrimaryKeyColumn: name: " << primary); - } - builder.SetPrimaryKeyColumns({proto.Getprimary_key().cbegin(), proto.Getprimary_key().cend()}); - - return builder.Build(); -} - -NTable::TTableDescription TableDescriptionFromFile(const TString& filePath) { - TFile file(filePath, OpenExisting | RdOnly); - TString str = TString::Uninitialized(file.GetLength()); - file.Read(str.Detach(), file.GetLength()); - - Ydb::Table::CreateTableRequest proto; - google::protobuf::TextFormat::ParseFromString(str, &proto); - return TableDescriptionFromProto(proto); -} - -TString SerializeColumnsToString(const TVector& columns, TVector primary) { - Sort(primary); - TStringStream ss; - for (const auto& col : columns) { - ss << " "; - if (BinarySearch(primary.cbegin(), primary.cend(), col.Name)) { - ss << "primary; "; - } - ss << ProcessColumnType(col.Name, col.Type, nullptr, std::nullopt) << Endl; - } - // Cerr << "Parse column to : " << ss.Str() << Endl; - return ss.Str(); -} - -void CheckTableDescriptionIsSame(const NTable::TTableDescription& backupDesc, - const NTable::TTableDescription& realDesc) { - if (backupDesc.GetColumns() != realDesc.GetColumns() || - backupDesc.GetPrimaryKeyColumns() != realDesc.GetPrimaryKeyColumns()) { - LOG_E("Error"); - LOG_E("Table scheme from backup:"); - LOG_E(SerializeColumnsToString(backupDesc.GetColumns(), backupDesc.GetPrimaryKeyColumns())); - LOG_E("Table scheme from database:"); - LOG_E(SerializeColumnsToString(realDesc.GetColumns(), realDesc.GetPrimaryKeyColumns())); - } else { - LOG_E("Ok"); - } -} - -void UploadDataIntoTable(TDriver driver, const NTable::TTableDescription& tableDesc, const TString& relPath, - const TString& absPath, TFsPath folderPath, const TRestoreFolderParams& params) { - Y_ENSURE(!folderPath.Child(INCOMPLETE_DATA_FILE_NAME).Exists(), - "There is incomplete data file in folder, path# " << TString(folderPath).Quote()); - ui32 fileCounter = 0; - TFsPath dataFileName = folderPath.Child(CreateDataFileName(fileCounter++)); - - if (params.UseBulkUpsert) { - LOG_D("Going to BulkUpsert into table# " << absPath.Quote()); - } - while (dataFileName.Exists()) { - LOG_D("Going to read new data file, fileName# " << dataFileName); - - - TUploader::TOptions opts; - if (params.UploadBandwidthBPS) { - opts.Rate = (opts.Interval.Seconds() * params.UploadBandwidthBPS + IO_BUFFER_SIZE - 1) / IO_BUFFER_SIZE; - LOG_D("Custom bandwidth limit is specified, will use bandwidth# " - << HumanReadableSize(params.UploadBandwidthBPS, SF_BYTES) << "B/s" - << " RPS# " << double(opts.Rate) / opts.Interval.Seconds() << " reqs/s" - << " IO buffer size# " << HumanReadableSize(IO_BUFFER_SIZE, SF_BYTES)); - } - if (params.MaxUploadRps) { - opts.Rate = params.MaxUploadRps * opts.Interval.Seconds(); - } - opts.Rate = Max(1, opts.Rate); - - TQueryFromFileIterator it(relPath, dataFileName, tableDesc.GetColumns(), IO_BUFFER_SIZE, params.MaxRowsPerQuery, - params.MaxBytesPerQuery); - NTable::TTableClient client(driver); - TUploader uploader(opts, client, it.GetQueryString()); - if (!params.UseBulkUpsert) { - LOG_D("Query string:\n" << it.GetQueryString()); - } - - while (!it.Empty()) { - bool ok = false; - if (params.UseBulkUpsert) { - ok = uploader.Push(absPath, it.ReadNextGetValue()); - } else { - ok = uploader.Push(it.ReadNextGetParams()); - } - Y_ENSURE(ok, "Error in uploader.Push()"); - } - uploader.WaitAllJobs(); - dataFileName = folderPath.Child(CreateDataFileName(fileCounter++)); - } -} - -void RestoreTable(TDriver driver, const TString& database, const TString& prefix, TFsPath folderPath, - const TRestoreFolderParams& params) { - Y_ENSURE(!folderPath.Child(INCOMPLETE_FILE_NAME).Exists(), - "There is incomplete file in folder, path# " << TString(folderPath).Quote()); - NTable::TTableClient client(driver); - - const TString relPath = JoinDatabasePath(prefix, folderPath.GetName()); - const TString absPath = JoinDatabasePath(database, relPath); - LOG_D("Restore table from folder: " << folderPath << " in database path# " << absPath.Quote()); - - NTable::TTableDescription tableDesc = TableDescriptionFromFile(folderPath.Child(SCHEME_FILE_NAME)); - - - if (params.OnlyCheck) { - LOG_E("Check table: " << absPath.Quote() << "..."); - NTable::TTableDescription tableDescReal = DescribeTable(driver, absPath); - CheckTableDescriptionIsSame(tableDesc, tableDescReal); - } else { - // Create Table - TStatus status = client.RetryOperationSync([absPath, &tableDesc](NTable::TSession session) { - auto result = session.CreateTable(absPath, std::move(tableDesc)).GetValueSync(); - return result; - }); - VerifyStatus(status, TStringBuilder() << "CreateTable on path: " << absPath.Quote()); - LOG_D("Table is created, path: " << absPath.Quote()); - if (!params.SchemaOnly) { - UploadDataIntoTable(driver, tableDesc, relPath, absPath, folderPath, params); - } - } -} - -void RestoreFolderImpl(TDriver driver, const TString& database, const TString& prefix, TFsPath folderPath, - const TRestoreFolderParams& params) { - LOG_D("Restore folder: " << folderPath); - Y_ENSURE(folderPath, "folderPath cannot be empty on restore, please specify path to folder containing backup"); - Y_ENSURE(folderPath.IsDirectory(), "Specified folderPath " << folderPath.GetPath().Quote() << " must be a folder"); - Y_ENSURE(!folderPath.Child(INCOMPLETE_FILE_NAME).Exists(), - "There is incomplete file in folder, path# " << TString(folderPath).Quote()); - - if (prefix != "/" && !params.OnlyCheck) { - LOG_D("Create prefix folder: " << prefix); - NScheme::TSchemeClient client(driver); - TString path = JoinDatabasePath(database, prefix); - TStatus status = client.MakeDirectory(path).GetValueSync(); - VerifyStatus(status, TStringBuilder() << "MakeDirectory on path: " << path.Quote()); - } - - if (folderPath.Child(SCHEME_FILE_NAME).Exists()) { - RestoreTable(driver, database, prefix, folderPath, params); - } else { - TVector children; - folderPath.List(children); - for (const auto& child : children) { - Y_ENSURE(folderPath.IsDirectory(), "Non directory and non table folder inside backup tree, " - "path: " << child.GetPath().Quote()); - if (child.Child(SCHEME_FILE_NAME).Exists()) { - RestoreTable(driver, database, prefix, child, params); - } else { - RestoreFolderImpl(driver, database, JoinDatabasePath(prefix, child.GetName()), child, params); - } - } - } -} - -static bool IsNamePresentedInDir(NScheme::TListDirectoryResult listResult, const TString& name) { - for (const auto& child : listResult.GetChildren()) { - if (child.Name == name) { - return true; - } - } - return false; -} - -void CheckTablesAbsence(NScheme::TSchemeClient client, const TString& database, const TString& prefix, TFsPath folderPath) { - Y_ENSURE(folderPath, "folderPath cannot be empty on restore, please specify path to folder containing backup"); - Y_ENSURE(folderPath.IsDirectory(), "Specified folderPath " << folderPath.GetPath().Quote() << " must be a folder"); - Y_ENSURE(!folderPath.Child(INCOMPLETE_FILE_NAME).Exists(), - "There is incomplete file in folder, path# " << TString(folderPath).Quote()); - - const TString path = JoinDatabasePath(database, prefix); - TString name = folderPath.GetName(); - - NScheme::TListDirectoryResult listResult = client.ListDirectory(path).GetValueSync(); - VerifyStatus(listResult, TStringBuilder() << "ListDirectory, path: " << path.Quote()); - - const bool isTable = folderPath.Child(SCHEME_FILE_NAME).Exists(); - if (isTable) { - Y_ENSURE(!IsNamePresentedInDir(listResult, name), "Table with name# " << name.Quote() - << " is presented in path# " << path.Quote()); - LOG_D("\tOk! Table " << name.Quote() << " is absent in database path# " << path.Quote()); - } else { - TVector children; - folderPath.List(children); - for (const auto& child : children) { - const bool isChildTable = child.Child(SCHEME_FILE_NAME).Exists(); - const TString childName = child.GetName(); - const bool isChildPresented = IsNamePresentedInDir(listResult, childName); - if (isChildTable) { - Y_ENSURE(!isChildPresented, "Table with name# " << childName.Quote() - << " is presented in path# " << path.Quote()); - LOG_D("\tOk! Table " << childName.Quote() << " is absent in database path# " - << path.Quote()); - } else { - if (isChildPresented) { - LOG_D("\tOk! Directory " << childName.Quote() << " is presented in database path# " - << path.Quote() << ", so check tables in that dir"); - CheckTablesAbsence(client, database, JoinDatabasePath(prefix, child.GetName()), child); - } else { - LOG_D("\tOk! Directory " << childName.Quote() << " is absent in database path# " - << path.Quote()); - } - } - } - } -} - -void RestoreFolder(TDriver driver, const TString& database, const TString& prefix, const TFsPath folderPath, - const TRestoreFolderParams& params) { - NScheme::TSchemeClient client(driver); - Y_ENSURE(prefix, "restore prefix cannot be empty, database# " << database.Quote() << " prefix# " << prefix.Quote()); - - if (params.CheckTablesAbsence && !params.OnlyCheck) { - LOG_D("Check absence of tables to be restored"); - if (prefix != "/") { - TString path = JoinDatabasePath(database, prefix); - TString parent = ParentPathFromDbPath(path); - TString name = NameFromDbPath(path); - LOG_D("Going to list parent# " << parent.Quote() << " for path path# " << path.Quote()); - NScheme::TListDirectoryResult listResult = client.ListDirectory(parent).GetValueSync(); - VerifyStatus(listResult, TStringBuilder() << "ListDirectory, path# " << parent.Quote()); - if (IsNamePresentedInDir(listResult, name)) { - CheckTablesAbsence(client, database, prefix, folderPath); - } else { - LOG_D("\tOk! restore directory# " << path.Quote() << " is absent in database"); - } - } else { - CheckTablesAbsence(client, database, prefix, folderPath); - } - LOG_D("Check done, everything is Ok"); - } - RestoreFolderImpl(driver, database, prefix, folderPath, params); -} - } // NYdb::NBackup diff --git a/ydb/library/backup/backup.h b/ydb/library/backup/backup.h index a2c15f5ccff4..9da17496f443 100644 --- a/ydb/library/backup/backup.h +++ b/ydb/library/backup/backup.h @@ -1,16 +1,24 @@ #pragma once -#include -#include -#include - -#include +#include +#include #include -#include +#include #include +class TRegExMatch; + namespace NYdb { + +class TDriver; +class TResultSetParser; +class TValue; + +namespace NTable { + class TTableDescription; +} + namespace NBackup { class TYdbErrorException : public yexception { @@ -23,39 +31,25 @@ class TYdbErrorException : public yexception { void LogToStderr() const; }; -void BackupFolder(TDriver driver, const TString& database, const TString& relDbPath, TFsPath folderPath, - const TVector& exclusionPatterns, - bool schemaOnly, bool useConsistentCopyTable, bool avoidCopy = false, bool savePartialResult = false, - bool preservePoolKinds = false, bool ordered = false); - -struct TRestoreFolderParams { - bool OnlyCheck = false; - bool SchemaOnly = false; - bool CheckTablesAbsence = true; - //////////////////////////////////////// - // Only one parameters set can be used. Either - ui64 UploadBandwidthBPS = 0; - // or - ui64 MaxRowsPerQuery = 0; - ui64 MaxBytesPerQuery = 0; - ui64 MaxUploadRps = 0; - //////////////////////////////////////// - bool UseBulkUpsert = false; - - bool CheckRps() const { - bool oldBPSLimit = UploadBandwidthBPS > 0; - bool newRpsLimit = MaxRowsPerQuery > 0 || MaxBytesPerQuery > 0 || MaxUploadRps > 0; - return !oldBPSLimit || !newRpsLimit; - } -}; - -void RestoreFolder(TDriver driver, const TString& database, const TString& prefix, const TFsPath folderPath, - const TRestoreFolderParams& params); +void BackupFolder( + const TDriver& driver, + const TString& database, + const TString& relDbPath, + TFsPath folderPath, + const TVector& exclusionPatterns, + bool schemaOnly, + bool useConsistentCopyTable, + bool avoidCopy = false, + bool savePartialResult = false, + bool preservePoolKinds = false, + bool ordered = false); // For unit-tests only -TMaybe ProcessResultSet(TStringStream& ss, TResultSetParser resultSetParser, - TFile* dataFile = nullptr, const NTable::TTableDescription* desc = nullptr); -void PrintValue(IOutputStream& out, TValueParser& parser); +TMaybe ProcessResultSet( + TStringStream& ss, + TResultSetParser& resultSetParser, + TFile* dataFile = nullptr, + const NTable::TTableDescription* desc = nullptr); } // NBackup } // NYdb diff --git a/ydb/library/backup/ut/ut.cpp b/ydb/library/backup/ut/ut.cpp index 081154307d63..ef6bb5bfccac 100644 --- a/ydb/library/backup/ut/ut.cpp +++ b/ydb/library/backup/ut/ut.cpp @@ -3,13 +3,14 @@ #include #include +#include #include +#include #include #include #include -#include namespace NYdb { @@ -20,10 +21,11 @@ void TestResultSetParsedOk(const TString& protoStr, const TString& expect) { google::protobuf::TextFormat::ParseFromString(protoStr, &proto); TResultSet result(proto); + auto resultSetParser = TResultSetParser(result); TStringStream got; got.Reserve(1 << 10); - NBackup::ProcessResultSet(got, result); + NBackup::ProcessResultSet(got, resultSetParser); UNIT_ASSERT(got.Size()); UNIT_ASSERT_NO_DIFF(got.Str(), expect); } diff --git a/ydb/library/backup/ya.make b/ydb/library/backup/ya.make index 945c042c4dea..b46410924448 100644 --- a/ydb/library/backup/ya.make +++ b/ydb/library/backup/ya.make @@ -5,16 +5,15 @@ PEERDIR( library/cpp/logger library/cpp/regex/pcre library/cpp/string_utils/quote - util ydb/library/dynumber - ydb/public/api/grpc ydb/public/api/protos ydb/public/lib/ydb_cli/common ydb/public/lib/ydb_cli/dump/util ydb/public/lib/yson_value - ydb/public/sdk/cpp/client/ydb_proto - ydb/public/sdk/cpp/client/ydb_scheme + ydb/public/sdk/cpp/client/ydb_driver + ydb/public/sdk/cpp/client/ydb_result ydb/public/sdk/cpp/client/ydb_table + ydb/public/sdk/cpp/client/ydb_value ) SRCS( diff --git a/ydb/services/ydb/backup_ut/ydb_backup_ut.cpp b/ydb/services/ydb/backup_ut/ydb_backup_ut.cpp index 8932ad76846d..350882830a7c 100644 --- a/ydb/services/ydb/backup_ut/ydb_backup_ut.cpp +++ b/ydb/services/ydb/backup_ut/ydb_backup_ut.cpp @@ -12,6 +12,7 @@ #include +#include #include #include