Skip to content

Commit

Permalink
Merge 3b1f52c into 3a59d95
Browse files Browse the repository at this point in the history
  • Loading branch information
dcherednik authored May 22, 2024
2 parents 3a59d95 + 3b1f52c commit ab225b5
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 68 deletions.
2 changes: 0 additions & 2 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,6 @@

#include <util/system/hostname.h>

#include <thread>

namespace NKikimr {

namespace NKikimrServicesInitializers {
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,10 @@ message TCreateCdcStream {
optional TCdcStreamDescription StreamDescription = 2;
optional uint64 RetentionPeriodSeconds = 3 [default = 86400]; // 1d by default
optional uint32 TopicPartitions = 4;
oneof IndexMode {
google.protobuf.Empty AllIndexes = 5; // Create topic per each index
string IndexName = 6;
}
}

message TAlterCdcStream {
Expand Down Expand Up @@ -1524,6 +1528,7 @@ message TIndexBuildControl {

message TLockConfig {
optional string Name = 1;
optional bool AllowIndexImplLock = 2;
}

message TLockGuard {
Expand Down
218 changes: 165 additions & 53 deletions ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,22 +105,38 @@ class TNewCdcStream: public TSubOperation {
}
}

TString BuildWorkingDir() const {
if (Transaction.GetCreateCdcStream().HasIndexName()) {
return Transaction.GetWorkingDir() + "/"
+ Transaction.GetCreateCdcStream().GetIndexName() + "/indexImplTable";
} else {
return Transaction.GetWorkingDir();
}
}

public:
using TSubOperation::TSubOperation;

THolder<TProposeResponse> Propose(const TString& owner, TOperationContext& context) override {
const auto& workingDir = Transaction.GetWorkingDir();
const auto& op = Transaction.GetCreateCdcStream();
const auto& streamDesc = op.GetStreamDescription();
const auto& streamName = streamDesc.GetName();
const auto acceptExisted = !Transaction.GetFailOnExist();

auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID());

if (op.HasAllIndexes()) {
result->SetError(NKikimrScheme::StatusInvalidParameter,
"Illigal part operation with all indexes flag");
return result;
}

const auto& workingDir = BuildWorkingDir();

LOG_N("TNewCdcStream Propose"
<< ": opId# " << OperationId
<< ", stream# " << workingDir << "/" << streamName);

auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID());

const auto tablePath = TPath::Resolve(workingDir, context.SS);
{
const auto checks = tablePath.Check();
Expand All @@ -130,11 +146,17 @@ class TNewCdcStream: public TSubOperation {
.IsAtLocalSchemeShard()
.IsResolved()
.NotDeleted()
.IsTable()
.NotAsyncReplicaTable()
.IsCommonSensePath()
.NotUnderDeleting();

if (op.HasIndexName() && op.GetIndexName()) {
checks.IsInsideTableIndexPath();
} else {
checks
.IsTable()
.IsCommonSensePath();
}

if (!checks) {
result->SetError(checks.GetStatus(), checks.GetError());
return result;
Expand Down Expand Up @@ -507,17 +529,35 @@ class TNewCdcStreamAtTable: public TSubOperation {
}

THolder<TProposeResponse> Propose(const TString&, TOperationContext& context) override {
const auto& workingDir = Transaction.GetWorkingDir();
auto workingDir = Transaction.GetWorkingDir();
const auto& op = Transaction.GetCreateCdcStream();
const auto& tableName = op.GetTableName();
auto tableName = op.GetTableName();
const auto& streamName = op.GetStreamDescription().GetName();

auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID());
bool isIndexTable = false;

if (op.HasAllIndexes()) {
result->SetError(NKikimrScheme::StatusInvalidParameter,
"Illigal part operation with all indexes flag");
return result;
}

if (op.HasIndexName()) {
if (!op.GetIndexName()) {
result->SetError(NKikimrScheme::StatusInvalidParameter,
"Unexpected empty index name");
return result;
}
isIndexTable = true;
workingDir += ("/" + tableName + "/" + op.GetIndexName());
tableName = "indexImplTable";
}

LOG_N("TNewCdcStreamAtTable Propose"
<< ": opId# " << OperationId
<< ", stream# " << workingDir << "/" << tableName << "/" << streamName);

auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID());

const auto workingDirPath = TPath::Resolve(workingDir, context.SS);
{
const auto checks = workingDirPath.Check();
Expand All @@ -526,10 +566,15 @@ class TNewCdcStreamAtTable: public TSubOperation {
.IsAtLocalSchemeShard()
.IsResolved()
.NotDeleted()
.IsCommonSensePath()
.IsLikeDirectory()
.NotUnderDeleting();

if (isIndexTable) {
checks.IsInsideTableIndexPath();
} else {
checks.IsCommonSensePath();
}

if (!checks) {
result->SetError(checks.GetStatus(), checks.GetError());
return result;
Expand All @@ -547,10 +592,12 @@ class TNewCdcStreamAtTable: public TSubOperation {
.NotDeleted()
.IsTable()
.NotAsyncReplicaTable()
.IsCommonSensePath()
.NotUnderDeleting();

if (checks) {
if (!isIndexTable) {
checks.IsCommonSensePath();
}
if (InitialScan) {
checks.IsUnderTheSameOperation(OperationId.GetTxId()); // lock op
} else {
Expand Down Expand Up @@ -632,17 +679,18 @@ class TNewCdcStreamAtTable: public TSubOperation {

private:
const bool InitialScan;

}; // TNewCdcStreamAtTable

void DoCreateLock(const TOperationId& opId, const TPath& workingDirPath, const TPath& tablePath,
void DoCreateLock(const TOperationId opId, const TPath& workingDirPath, const TPath& tablePath, bool allowIndexImplLock,
TVector<ISubOperation::TPtr>& result)
{
auto outTx = TransactionTemplate(workingDirPath.PathString(),
NKikimrSchemeOp::EOperationType::ESchemeOpCreateLock);
outTx.SetFailOnExist(false);
outTx.SetInternal(true);
outTx.MutableLockConfig()->SetName(tablePath.LeafName());
auto cfg = outTx.MutableLockConfig();
cfg->SetName(tablePath.LeafName());
cfg->SetAllowIndexImplLock(allowIndexImplLock);

result.push_back(CreateLock(NextPartId(opId, result), outTx));
}
Expand Down Expand Up @@ -704,30 +752,34 @@ void DoCreatePqPart(const TOperationId& opId, const TPath& streamPath, const TSt
result.push_back(CreateNewPQ(NextPartId(opId, result), outTx));
}

void FillModifySchemaForCdc(NKikimrSchemeOp::TModifyScheme& outTx, const NKikimrSchemeOp::TCreateCdcStream& op,
const TOperationId& opId, const TString& indexName, bool acceptExisted, bool initialScan)
{
outTx.SetFailOnExist(!acceptExisted);
outTx.MutableCreateCdcStream()->CopyFrom(op);
if (indexName) {
outTx.MutableCreateCdcStream()->SetIndexName(indexName);
} else {
outTx.MutableCreateCdcStream()->ClearIndexMode();
}

if (initialScan) {
outTx.MutableLockGuard()->SetOwnerTxId(ui64(opId.GetTxId()));
}
}

void DoCreateStream(const NKikimrSchemeOp::TCreateCdcStream& op, const TOperationId& opId, const TPath& workingDirPath, const TPath& tablePath,
const bool acceptExisted, const bool initialScan, TVector<ISubOperation::TPtr>& result)
const bool acceptExisted, const bool initialScan, const TString& indexName, TVector<ISubOperation::TPtr>& result)
{
{
auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamImpl);
outTx.SetFailOnExist(!acceptExisted);
outTx.MutableCreateCdcStream()->CopyFrom(op);

if (initialScan) {
outTx.MutableLockGuard()->SetOwnerTxId(ui64(opId.GetTxId()));
}

FillModifySchemaForCdc(outTx, op, opId, indexName, acceptExisted, initialScan);
result.push_back(CreateNewCdcStreamImpl(NextPartId(opId, result), outTx));
}

{
auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable);
outTx.SetFailOnExist(!acceptExisted);
outTx.MutableCreateCdcStream()->CopyFrom(op);

if (initialScan) {
outTx.MutableLockGuard()->SetOwnerTxId(ui64(opId.GetTxId()));
}

FillModifySchemaForCdc(outTx, op, opId, indexName, acceptExisted, initialScan);
result.push_back(CreateNewCdcStreamAtTable(NextPartId(opId, result), outTx, initialScan));
}
}
Expand Down Expand Up @@ -785,6 +837,36 @@ ISubOperation::TPtr RejectOnTablePathChecks(const TOperationId& opId, const TPat
return nullptr;
}

void CalcBoundaries(const TTableInfo& table, TVector<TString>& boundaries) {
const auto& partitions = table.GetPartitions();
boundaries.reserve(partitions.size() - 1);

for (ui32 i = 0; i < partitions.size(); ++i) {
const auto& partition = partitions.at(i);
if (i != partitions.size() - 1) {
boundaries.push_back(partition.EndOfRange);
}
}
}

bool FillBoundaries(const TTableInfo& table, const ::NKikimrSchemeOp::TCreateCdcStream& op, TVector<TString>& boundaries, TString& errStr) {
if (op.HasTopicPartitions()) {
const auto& keyColumns = table.KeyColumnIds;
const auto& columns = table.Columns;

Y_ABORT_UNLESS(!keyColumns.empty());
Y_ABORT_UNLESS(columns.contains(keyColumns.at(0)));
const auto firstKeyColumnType = columns.at(keyColumns.at(0)).PType;

if (!TSchemeShard::FillUniformPartitioning(boundaries, keyColumns.size(), firstKeyColumnType, op.GetTopicPartitions(), AppData()->TypeRegistry, errStr)) {
return false;
}
} else {
CalcBoundaries(table, boundaries);
}
return true;
}

} // anonymous

std::variant<TStreamPaths, ISubOperation::TPtr> DoNewStreamPathChecks(
Expand Down Expand Up @@ -889,46 +971,76 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran
<< "Initial scan is not supported yet")};
}

Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId));
auto table = context.SS->Tables.at(tablePath.Base()->PathId);

TVector<TString> boundaries;
if (op.HasTopicPartitions()) {
if (op.GetTopicPartitions() <= 0) {
return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, "Topic partitions count must be greater than 0")};
}
}

const auto& keyColumns = table->KeyColumnIds;
const auto& columns = table->Columns;

Y_ABORT_UNLESS(!keyColumns.empty());
Y_ABORT_UNLESS(columns.contains(keyColumns.at(0)));
const auto firstKeyColumnType = columns.at(keyColumns.at(0)).PType;
std::vector<TString> candidates;

if (!TSchemeShard::FillUniformPartitioning(boundaries, keyColumns.size(), firstKeyColumnType, op.GetTopicPartitions(), AppData()->TypeRegistry, errStr)) {
return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)};
if (op.GetIndexModeCase() == NKikimrSchemeOp::TCreateCdcStream::kAllIndexes) {
candidates.reserve(tablePath->GetChildren().size());
for (const auto& child : tablePath->GetChildren()) {
candidates.emplace_back(child.first);
}
} else {
const auto& partitions = table->GetPartitions();
boundaries.reserve(partitions.size() - 1);

for (ui32 i = 0; i < partitions.size(); ++i) {
const auto& partition = partitions.at(i);
if (i != partitions.size() - 1) {
boundaries.push_back(partition.EndOfRange);
}
} else if (op.GetIndexModeCase() == NKikimrSchemeOp::TCreateCdcStream::kIndexName) {
auto it = tablePath->GetChildren().find(op.GetIndexName());
if (it == tablePath->GetChildren().end()) {
return {CreateReject(opId, NKikimrScheme::StatusSchemeError,
"requested particular path hasn't been found")};
}
candidates.emplace_back(it->first);
}

TVector<ISubOperation::TPtr> result;

for (const auto& name : candidates) {
const TPath indexPath = tablePath.Child(name);
if (!indexPath.IsTableIndex() || indexPath.IsDeleted()) {
continue;
}

const TPath indexImplPath = indexPath.Child("indexImplTable");
if (!indexImplPath) {
return {CreateReject(opId, NKikimrScheme::StatusSchemeError,
"indexImplTable hasn't been found")};
}

Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId));
auto indexImplTable = context.SS->Tables.at(indexImplPath.Base()->PathId);

const TPath indexStreamPath = indexImplPath.Child(streamName);
if (auto reject = RejectOnCdcChecks(opId, indexStreamPath, acceptExisted)) {
return {reject};
}

if (initialScan) {
DoCreateLock(opId, indexPath, indexImplPath, true, result);
}

TVector<TString> boundaries;
if (!FillBoundaries(*indexImplTable, op, boundaries, errStr)) {
return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)};
}

DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, name, result);
DoCreatePqPart(opId, indexStreamPath, streamName, indexImplTable, op, boundaries, acceptExisted, result);
}

if (initialScan) {
DoCreateLock(opId, workingDirPath, tablePath, result);
DoCreateLock(opId, workingDirPath, tablePath, false, result);
}

DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, result);
DoCreatePqPart(opId, streamPath, streamName, table, op, boundaries, acceptExisted, result);
Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId));
auto table = context.SS->Tables.at(tablePath.Base()->PathId);
TVector<TString> boundaries;
if (!FillBoundaries(*table, op, boundaries, errStr)) {
return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)};
}

DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, {}, result);
DoCreatePqPart(opId, streamPath, streamName, table, op, boundaries, acceptExisted, result);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ void DoCreateStream(
const TPath& tablePath,
const bool acceptExisted,
const bool initialScan,
const TString& indexName,
TVector<ISubOperation::TPtr>& result);

void DoCreatePqPart(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ TVector<ISubOperation::TPtr> CreateNewContinuousBackup(TOperationId opId, const

TVector<ISubOperation::TPtr> result;

NCdc::DoCreateStream(createCdcStreamOp, opId, workingDirPath, tablePath, acceptExisted, false, result);
NCdc::DoCreateStream(createCdcStreamOp, opId, workingDirPath, tablePath, acceptExisted, false, {}, result);
NCdc::DoCreatePqPart(opId, streamPath, NBackup::CB_CDC_STREAM_NAME, table, createCdcStreamOp, boundaries, acceptExisted, result);

return result;
Expand Down
Loading

0 comments on commit ab225b5

Please sign in to comment.