Skip to content

Commit

Permalink
Get rid of unused code, cleanup dependencies (#10777)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Oct 23, 2024
1 parent 515f32e commit 78f6425
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 356 deletions.
321 changes: 8 additions & 313 deletions ydb/library/backup/backup.cpp
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
#include "backup.h"
#include "db_iterator.h"
#include "query_builder.h"
#include "query_uploader.h"
#include "util.h"

#include <ydb/public/api/protos/ydb_table.pb.h>
#include <ydb/public/lib/ydb_cli/common/recursive_remove.h>
#include <ydb/public/lib/ydb_cli/dump/util/util.h>
#include <ydb/public/lib/yson_value/ydb_yson_value.h>
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>

#include <library/cpp/containers/stack_vector/stack_vec.h>
#include <library/cpp/regex/pcre/regexp.h>
#include <library/cpp/string_utils/quote/quote.h>

#include <util/datetime/base.h>
Expand Down Expand Up @@ -62,17 +64,6 @@ static void VerifyStatus(TStatus status, TString explain = "") {
}
}

static TString NameFromDbPath(const TString& path) {
TPathSplitUnix split(path);
return TString{split.back()};
}

static TString ParentPathFromDbPath(const TString& path) {
TPathSplitUnix split(path);
split.pop_back();
return split.Reconstruct();
}

static TString JoinDatabasePath(const TString& basePath, const TString& path) {
if (basePath.empty()) {
return path;
Expand Down Expand Up @@ -200,7 +191,7 @@ void PrintValue(IOutputStream& out, TValueParser& parser) {
}

TMaybe<TValue> ProcessResultSet(TStringStream& ss,
TResultSetParser resultSetParser, TFile* dataFile, const NTable::TTableDescription* desc) {
TResultSetParser& resultSetParser, TFile* dataFile, const NTable::TTableDescription* desc) {
TMaybe<TValue> lastReadPK;

TStackVec<TValueParser*, 32> colParsers;
Expand Down Expand Up @@ -304,7 +295,8 @@ TMaybe<TValue> TryReadTable(TDriver driver, const NTable::TTableDescription& des
if (resultSetCurrent.Truncated()) {
nextResult = iter->ReadNext();
}
lastReadPK = ProcessResultSet(ss, resultSetCurrent, &tmpFile, &desc);
auto resultSetParser = TResultSetParser(resultSetCurrent);
lastReadPK = ProcessResultSet(ss, resultSetParser, &tmpFile, &desc);

// Next
if (resultSetCurrent.Truncated()) {
Expand Down Expand Up @@ -677,7 +669,7 @@ void CheckedCreateBackupFolder(const TFsPath& folderPath) {

// relDbPath - relative path to directory/table to be backuped
// folderPath - relative path to folder in local filesystem where backup will be stored
void BackupFolder(TDriver driver, const TString& database, const TString& relDbPath, TFsPath folderPath,
void BackupFolder(const TDriver& driver, const TString& database, const TString& relDbPath, TFsPath folderPath,
const TVector<TRegExMatch>& exclusionPatterns,
bool schemaOnly, bool useConsistentCopyTable, bool avoidCopy, bool savePartialResult, bool preservePoolKinds, bool ordered) {
TString temporalBackupPostfix = CreateTemporalBackupName();
Expand Down Expand Up @@ -721,301 +713,4 @@ void BackupFolder(TDriver driver, const TString& database, const TString& relDbP
LOG_I("Backup completed successfully");
}

////////////////////////////////////////////////////////////////////////////////
// Restore
////////////////////////////////////////////////////////////////////////////////

TString ProcessColumnType(const TString& name, TTypeParser parser, NTable::TTableBuilder *builder, std::optional<NTable::TSequenceDescription> sequenceDescription) {
TStringStream ss;
ss << "name: " << name << "; ";
if (parser.GetKind() == TTypeParser::ETypeKind::Optional) {
ss << " optional; ";
parser.OpenOptional();
}
if (sequenceDescription.has_value()) {
ss << "serial; ";
}
ss << "kind: " << parser.GetKind() << "; ";
switch (parser.GetKind()) {
case TTypeParser::ETypeKind::Primitive:
ss << " type_id: " << parser.GetPrimitive() << "; ";
if (builder) {
if (sequenceDescription.has_value()) {
builder->AddSerialColumn(name, parser.GetPrimitive(), std::move(*sequenceDescription));
} else {
builder->AddNullableColumn(name, parser.GetPrimitive());
}
}
break;
case TTypeParser::ETypeKind::Decimal:
ss << " decimal_type: {"
<< " precision: " << ui32(parser.GetDecimal().Precision)
<< " scale: " << ui32(parser.GetDecimal().Scale)
<< "}; ";
if (builder) {
builder->AddNullableColumn(name, parser.GetDecimal());
}
break;
default:
Y_ENSURE(false, "Unexpected type kind# " << parser.GetKind() << " for column name# " << name.Quote());
}
return ss.Str();
}

NTable::TTableDescription TableDescriptionFromProto(const Ydb::Table::CreateTableRequest& proto) {
NTable::TTableBuilder builder;

std::optional<NTable::TSequenceDescription> sequenceDescription;
for (const auto &col : proto.Getcolumns()) {
if (col.from_sequence().name() == "_serial_column_" + col.name()) {
NTable::TSequenceDescription currentSequenceDescription;
if (col.from_sequence().has_set_val()) {
NTable::TSequenceDescription::TSetVal setVal;
setVal.NextUsed = col.from_sequence().set_val().next_used();
setVal.NextValue = col.from_sequence().set_val().next_value();
currentSequenceDescription.SetVal = std::move(setVal);
}
sequenceDescription = std::move(currentSequenceDescription);
}
LOG_D("AddColumn: " << ProcessColumnType(col.Getname(), TType(col.Gettype()), &builder, std::move(sequenceDescription)));
}

for (const auto &primary : proto.Getprimary_key()) {
LOG_D("SetPrimaryKeyColumn: name: " << primary);
}
builder.SetPrimaryKeyColumns({proto.Getprimary_key().cbegin(), proto.Getprimary_key().cend()});

return builder.Build();
}

NTable::TTableDescription TableDescriptionFromFile(const TString& filePath) {
TFile file(filePath, OpenExisting | RdOnly);
TString str = TString::Uninitialized(file.GetLength());
file.Read(str.Detach(), file.GetLength());

Ydb::Table::CreateTableRequest proto;
google::protobuf::TextFormat::ParseFromString(str, &proto);
return TableDescriptionFromProto(proto);
}

TString SerializeColumnsToString(const TVector<TColumn>& columns, TVector<TString> primary) {
Sort(primary);
TStringStream ss;
for (const auto& col : columns) {
ss << " ";
if (BinarySearch(primary.cbegin(), primary.cend(), col.Name)) {
ss << "primary; ";
}
ss << ProcessColumnType(col.Name, col.Type, nullptr, std::nullopt) << Endl;
}
// Cerr << "Parse column to : " << ss.Str() << Endl;
return ss.Str();
}

void CheckTableDescriptionIsSame(const NTable::TTableDescription& backupDesc,
const NTable::TTableDescription& realDesc) {
if (backupDesc.GetColumns() != realDesc.GetColumns() ||
backupDesc.GetPrimaryKeyColumns() != realDesc.GetPrimaryKeyColumns()) {
LOG_E("Error");
LOG_E("Table scheme from backup:");
LOG_E(SerializeColumnsToString(backupDesc.GetColumns(), backupDesc.GetPrimaryKeyColumns()));
LOG_E("Table scheme from database:");
LOG_E(SerializeColumnsToString(realDesc.GetColumns(), realDesc.GetPrimaryKeyColumns()));
} else {
LOG_E("Ok");
}
}

void UploadDataIntoTable(TDriver driver, const NTable::TTableDescription& tableDesc, const TString& relPath,
const TString& absPath, TFsPath folderPath, const TRestoreFolderParams& params) {
Y_ENSURE(!folderPath.Child(INCOMPLETE_DATA_FILE_NAME).Exists(),
"There is incomplete data file in folder, path# " << TString(folderPath).Quote());
ui32 fileCounter = 0;
TFsPath dataFileName = folderPath.Child(CreateDataFileName(fileCounter++));

if (params.UseBulkUpsert) {
LOG_D("Going to BulkUpsert into table# " << absPath.Quote());
}
while (dataFileName.Exists()) {
LOG_D("Going to read new data file, fileName# " << dataFileName);


TUploader::TOptions opts;
if (params.UploadBandwidthBPS) {
opts.Rate = (opts.Interval.Seconds() * params.UploadBandwidthBPS + IO_BUFFER_SIZE - 1) / IO_BUFFER_SIZE;
LOG_D("Custom bandwidth limit is specified, will use bandwidth# "
<< HumanReadableSize(params.UploadBandwidthBPS, SF_BYTES) << "B/s"
<< " RPS# " << double(opts.Rate) / opts.Interval.Seconds() << " reqs/s"
<< " IO buffer size# " << HumanReadableSize(IO_BUFFER_SIZE, SF_BYTES));
}
if (params.MaxUploadRps) {
opts.Rate = params.MaxUploadRps * opts.Interval.Seconds();
}
opts.Rate = Max<ui64>(1, opts.Rate);

TQueryFromFileIterator it(relPath, dataFileName, tableDesc.GetColumns(), IO_BUFFER_SIZE, params.MaxRowsPerQuery,
params.MaxBytesPerQuery);
NTable::TTableClient client(driver);
TUploader uploader(opts, client, it.GetQueryString());
if (!params.UseBulkUpsert) {
LOG_D("Query string:\n" << it.GetQueryString());
}

while (!it.Empty()) {
bool ok = false;
if (params.UseBulkUpsert) {
ok = uploader.Push(absPath, it.ReadNextGetValue());
} else {
ok = uploader.Push(it.ReadNextGetParams());
}
Y_ENSURE(ok, "Error in uploader.Push()");
}
uploader.WaitAllJobs();
dataFileName = folderPath.Child(CreateDataFileName(fileCounter++));
}
}

void RestoreTable(TDriver driver, const TString& database, const TString& prefix, TFsPath folderPath,
const TRestoreFolderParams& params) {
Y_ENSURE(!folderPath.Child(INCOMPLETE_FILE_NAME).Exists(),
"There is incomplete file in folder, path# " << TString(folderPath).Quote());
NTable::TTableClient client(driver);

const TString relPath = JoinDatabasePath(prefix, folderPath.GetName());
const TString absPath = JoinDatabasePath(database, relPath);
LOG_D("Restore table from folder: " << folderPath << " in database path# " << absPath.Quote());

NTable::TTableDescription tableDesc = TableDescriptionFromFile(folderPath.Child(SCHEME_FILE_NAME));


if (params.OnlyCheck) {
LOG_E("Check table: " << absPath.Quote() << "...");
NTable::TTableDescription tableDescReal = DescribeTable(driver, absPath);
CheckTableDescriptionIsSame(tableDesc, tableDescReal);
} else {
// Create Table
TStatus status = client.RetryOperationSync([absPath, &tableDesc](NTable::TSession session) {
auto result = session.CreateTable(absPath, std::move(tableDesc)).GetValueSync();
return result;
});
VerifyStatus(status, TStringBuilder() << "CreateTable on path: " << absPath.Quote());
LOG_D("Table is created, path: " << absPath.Quote());
if (!params.SchemaOnly) {
UploadDataIntoTable(driver, tableDesc, relPath, absPath, folderPath, params);
}
}
}

void RestoreFolderImpl(TDriver driver, const TString& database, const TString& prefix, TFsPath folderPath,
const TRestoreFolderParams& params) {
LOG_D("Restore folder: " << folderPath);
Y_ENSURE(folderPath, "folderPath cannot be empty on restore, please specify path to folder containing backup");
Y_ENSURE(folderPath.IsDirectory(), "Specified folderPath " << folderPath.GetPath().Quote() << " must be a folder");
Y_ENSURE(!folderPath.Child(INCOMPLETE_FILE_NAME).Exists(),
"There is incomplete file in folder, path# " << TString(folderPath).Quote());

if (prefix != "/" && !params.OnlyCheck) {
LOG_D("Create prefix folder: " << prefix);
NScheme::TSchemeClient client(driver);
TString path = JoinDatabasePath(database, prefix);
TStatus status = client.MakeDirectory(path).GetValueSync();
VerifyStatus(status, TStringBuilder() << "MakeDirectory on path: " << path.Quote());
}

if (folderPath.Child(SCHEME_FILE_NAME).Exists()) {
RestoreTable(driver, database, prefix, folderPath, params);
} else {
TVector<TFsPath> children;
folderPath.List(children);
for (const auto& child : children) {
Y_ENSURE(folderPath.IsDirectory(), "Non directory and non table folder inside backup tree, "
"path: " << child.GetPath().Quote());
if (child.Child(SCHEME_FILE_NAME).Exists()) {
RestoreTable(driver, database, prefix, child, params);
} else {
RestoreFolderImpl(driver, database, JoinDatabasePath(prefix, child.GetName()), child, params);
}
}
}
}

static bool IsNamePresentedInDir(NScheme::TListDirectoryResult listResult, const TString& name) {
for (const auto& child : listResult.GetChildren()) {
if (child.Name == name) {
return true;
}
}
return false;
}

void CheckTablesAbsence(NScheme::TSchemeClient client, const TString& database, const TString& prefix, TFsPath folderPath) {
Y_ENSURE(folderPath, "folderPath cannot be empty on restore, please specify path to folder containing backup");
Y_ENSURE(folderPath.IsDirectory(), "Specified folderPath " << folderPath.GetPath().Quote() << " must be a folder");
Y_ENSURE(!folderPath.Child(INCOMPLETE_FILE_NAME).Exists(),
"There is incomplete file in folder, path# " << TString(folderPath).Quote());

const TString path = JoinDatabasePath(database, prefix);
TString name = folderPath.GetName();

NScheme::TListDirectoryResult listResult = client.ListDirectory(path).GetValueSync();
VerifyStatus(listResult, TStringBuilder() << "ListDirectory, path: " << path.Quote());

const bool isTable = folderPath.Child(SCHEME_FILE_NAME).Exists();
if (isTable) {
Y_ENSURE(!IsNamePresentedInDir(listResult, name), "Table with name# " << name.Quote()
<< " is presented in path# " << path.Quote());
LOG_D("\tOk! Table " << name.Quote() << " is absent in database path# " << path.Quote());
} else {
TVector<TFsPath> children;
folderPath.List(children);
for (const auto& child : children) {
const bool isChildTable = child.Child(SCHEME_FILE_NAME).Exists();
const TString childName = child.GetName();
const bool isChildPresented = IsNamePresentedInDir(listResult, childName);
if (isChildTable) {
Y_ENSURE(!isChildPresented, "Table with name# " << childName.Quote()
<< " is presented in path# " << path.Quote());
LOG_D("\tOk! Table " << childName.Quote() << " is absent in database path# "
<< path.Quote());
} else {
if (isChildPresented) {
LOG_D("\tOk! Directory " << childName.Quote() << " is presented in database path# "
<< path.Quote() << ", so check tables in that dir");
CheckTablesAbsence(client, database, JoinDatabasePath(prefix, child.GetName()), child);
} else {
LOG_D("\tOk! Directory " << childName.Quote() << " is absent in database path# "
<< path.Quote());
}
}
}
}
}

void RestoreFolder(TDriver driver, const TString& database, const TString& prefix, const TFsPath folderPath,
const TRestoreFolderParams& params) {
NScheme::TSchemeClient client(driver);
Y_ENSURE(prefix, "restore prefix cannot be empty, database# " << database.Quote() << " prefix# " << prefix.Quote());

if (params.CheckTablesAbsence && !params.OnlyCheck) {
LOG_D("Check absence of tables to be restored");
if (prefix != "/") {
TString path = JoinDatabasePath(database, prefix);
TString parent = ParentPathFromDbPath(path);
TString name = NameFromDbPath(path);
LOG_D("Going to list parent# " << parent.Quote() << " for path path# " << path.Quote());
NScheme::TListDirectoryResult listResult = client.ListDirectory(parent).GetValueSync();
VerifyStatus(listResult, TStringBuilder() << "ListDirectory, path# " << parent.Quote());
if (IsNamePresentedInDir(listResult, name)) {
CheckTablesAbsence(client, database, prefix, folderPath);
} else {
LOG_D("\tOk! restore directory# " << path.Quote() << " is absent in database");
}
} else {
CheckTablesAbsence(client, database, prefix, folderPath);
}
LOG_D("Check done, everything is Ok");
}
RestoreFolderImpl(driver, database, prefix, folderPath, params);
}

} // NYdb::NBackup
Loading

0 comments on commit 78f6425

Please sign in to comment.