Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[backups] add take incremental backup in cb alter #4922

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -865,9 +865,12 @@ message TAlterContinuousBackup {
message TStop {
}

message TTakeIncrementalBackup {
}

oneof Action {
TStop Stop = 2;
// TODO(innokentii): something like TakeIncremental
TTakeIncrementalBackup TakeIncrementalBackup = 3;
}
}

Expand Down
6 changes: 6 additions & 0 deletions ydb/core/protos/pqconfig.proto
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,10 @@ enum EConsumerScalingSupport {
FULL_SUPPORT = 3;
}

message TOffloadConfig {

}

message TPQTabletConfig {
optional uint64 CacheSize = 1 [default = 104857600]; //100Mb, per tablet
optional TPartitionConfig PartitionConfig = 2; //mandatory
Expand Down Expand Up @@ -400,6 +404,8 @@ message TPQTabletConfig {
optional TPartitionStrategy PartitionStrategy = 35;

repeated TPartition AllPartitions = 36; // filled by schemeshard

optional TOffloadConfig OffloadConfig = 38;
}

message THeartbeat {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,30 @@

namespace NKikimr::NSchemeShard {

void DoAlterPqPart(const TOperationId& opId, const TPath& topicPath, TTopicInfo::TPtr topic, TVector<ISubOperation::TPtr>& result)
{
auto outTx = TransactionTemplate(topicPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup);
// outTx.SetFailOnExist(!acceptExisted);

outTx.SetAllowAccessToPrivatePaths(true);

auto& desc = *outTx.MutableAlterPersQueueGroup();
desc.SetPathId(topicPath.Base()->PathId.LocalPathId);

NKikimrPQ::TPQTabletConfig tabletConfig;
if (!topic->TabletConfig.empty()) {
bool parseOk = ParseFromStringNoSizeLimit(tabletConfig, topic->TabletConfig);
Y_ABORT_UNLESS(parseOk, "Previously serialized pq tablet config cannot be parsed");
}

auto& pqConfig = *desc.MutablePQTabletConfig();
pqConfig.CopyFrom(tabletConfig);
pqConfig.ClearPartitionKeySchema();
pqConfig.MutableOffloadConfig();

result.push_back(CreateAlterPQ(NextPartId(opId, result), outTx));
}

TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) {
Y_ABORT_UNLESS(tx.GetOperationType() == NKikimrSchemeOp::EOperationType::ESchemeOpAlterContinuousBackup);

Expand All @@ -24,6 +48,8 @@ TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, cons
}

const auto [tablePath, streamPath] = std::get<NCdc::TStreamPaths>(checksResult);
const auto topicPath = streamPath.Child("streamImpl");
TTopicInfo::TPtr topic = context.SS->Topics.at(topicPath.Base()->PathId);

TString errStr;
if (!context.SS->CheckApplyIf(tx, errStr)) {
Expand All @@ -40,6 +66,7 @@ TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, cons

switch (cbOp.GetActionCase()) {
case NKikimrSchemeOp::TAlterContinuousBackup::kStop:
case NKikimrSchemeOp::TAlterContinuousBackup::kTakeIncrementalBackup:
alterCdcStreamOp.MutableDisable();
break;
default:
Expand All @@ -51,6 +78,10 @@ TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, cons

NCdc::DoAlterStream(alterCdcStreamOp, opId, workingDirPath, tablePath, result);

if (cbOp.GetActionCase() == NKikimrSchemeOp::TAlterContinuousBackup::kTakeIncrementalBackup) {
DoAlterPqPart(opId, topicPath, topic, result);
}

return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,48 @@ Y_UNIT_TEST_SUITE(TContinuousBackupTests) {
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/continuousBackupImpl"), {NLs::PathNotExist});
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/continuousBackupImpl/streamImpl"), {NLs::PathNotExist});
}

Y_UNIT_TEST(TakeIncrementalBackup) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
ui64 txId = 100;

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint64" }
Columns { Name: "value" Type: "Uint64" }
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

TestCreateContinuousBackup(runtime, ++txId, "/MyRoot", R"(
TableName: "Table"
ContinuousBackupDescription {
}
)");
env.TestWaitNotification(runtime, txId);

TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/continuousBackupImpl"), {
NLs::PathExist,
NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeUpdate),
NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto),
NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady),
NLs::StreamVirtualTimestamps(false),
});
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/continuousBackupImpl/streamImpl"), {
NLs::PathExist,
NLs::HasNotOffloadConfig,
});

TestAlterContinuousBackup(runtime, ++txId, "/MyRoot", R"(
TableName: "Table"
TakeIncrementalBackup {}
)");
env.TestWaitNotification(runtime, txId);

TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/continuousBackupImpl/streamImpl"), {
NLs::PathExist,
NLs::HasOffloadConfig,
});
}
} // TCdcStreamWithInitialScanTests
5 changes: 5 additions & 0 deletions ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1251,6 +1251,11 @@ TCheckFunc ServerlessComputeResourcesMode(NKikimrSubDomains::EServerlessComputeR
};
}

void HasOffloadConfigBase(const NKikimrScheme::TEvDescribeSchemeResult& record, TInverseTag inverse) {
UNIT_ASSERT(inverse.Value xor record.GetPathDescription().GetPersQueueGroup()
.GetPQTabletConfig().HasOffloadConfig());
}

#undef DESCRIBE_ASSERT_EQUAL
#undef DESCRIBE_ASSERT_GE
#undef DESCRIBE_ASSERT
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,14 @@ namespace NLs {
TCheckFunc SharedHive(ui64 sharedHiveId);
TCheckFunc ServerlessComputeResourcesMode(NKikimrSubDomains::EServerlessComputeResourcesMode serverlessComputeResourcesMode);

struct TInverseTag {
bool Value = false;
};

void HasOffloadConfigBase(const NKikimrScheme::TEvDescribeSchemeResult& record, TInverseTag inverse);
inline void HasOffloadConfig(const NKikimrScheme::TEvDescribeSchemeResult& record) { return HasOffloadConfigBase(record, {}); };
inline void HasNotOffloadConfig(const NKikimrScheme::TEvDescribeSchemeResult& record) { return HasOffloadConfigBase(record, {.Value = true}); };

template<class TCheck>
void PerformAllChecks(const NKikimrScheme::TEvDescribeSchemeResult& result, TCheck&& check) {
check(result);
Expand Down
Loading