From 312992d8f0a594a4b866998c768b8917a88ab02a Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Mon, 29 Jul 2024 09:52:06 +0300 Subject: [PATCH] 24-3: Allow streams on index tables, replicate index tables (#7150) --- ydb/core/base/path.h | 8 + ydb/core/grpc_services/rpc_alter_table.cpp | 77 ++++- ydb/core/grpc_services/rpc_describe_table.cpp | 83 ++++- ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 64 ++++ ydb/core/protos/feature_flags.proto | 1 + ydb/core/protos/flat_scheme_op.proto | 5 - ydb/core/testlib/basics/feature_flags.h | 1 + .../tx/replication/controller/dst_alterer.cpp | 1 + .../tx/replication/controller/dst_creator.cpp | 144 +++++++-- .../replication/controller/dst_creator_ut.cpp | 64 ++-- .../tx/replication/controller/dst_remover.cpp | 10 +- .../replication/controller/private_events.cpp | 20 +- .../replication/controller/private_events.h | 19 +- .../tx/replication/controller/replication.cpp | 4 +- .../tx/replication/controller/replication.h | 1 + .../replication/controller/stream_creator.cpp | 12 +- .../replication/controller/stream_remover.cpp | 1 + .../controller/target_discoverer.cpp | 107 +++++-- .../replication/controller/target_table.cpp | 27 +- .../tx/replication/controller/target_table.h | 24 +- .../tx_discovery_targets_result.cpp | 18 +- ydb/core/tx/replication/controller/util.h | 36 --- ydb/core/tx/replication/controller/ya.make | 1 + .../replication/service/table_writer_ut.cpp | 5 +- ydb/core/tx/replication/ut_helpers/test_env.h | 5 +- ydb/core/tx/scheme_board/cache.cpp | 12 +- ...chemeshard__operation_alter_cdc_stream.cpp | 46 ++- .../schemeshard__operation_alter_cdc_stream.h | 6 +- ...ard__operation_alter_continuous_backup.cpp | 7 +- .../schemeshard__operation_alter_index.cpp | 7 +- .../schemeshard__operation_alter_table.cpp | 3 +- ...hemeshard__operation_create_cdc_stream.cpp | 230 +++++--------- ...schemeshard__operation_create_cdc_stream.h | 15 +- ...rd__operation_create_continuous_backup.cpp | 4 +- .../schemeshard__operation_create_lock.cpp | 6 +- ...schemeshard__operation_drop_cdc_stream.cpp | 58 ++-- .../schemeshard__operation_drop_cdc_stream.h | 6 +- ...hard__operation_drop_continuous_backup.cpp | 7 +- .../schemeshard__operation_drop_index.cpp | 15 +- ...emeshard__operation_drop_indexed_table.cpp | 98 +----- .../schemeshard__operation_drop_lock.cpp | 11 +- .../schemeshard__operation_part.cpp | 105 +++++++ .../schemeshard/schemeshard__operation_part.h | 4 + ydb/core/tx/schemeshard/schemeshard_path.cpp | 20 +- ydb/core/tx/schemeshard/schemeshard_path.h | 4 +- ydb/core/tx/schemeshard/schemeshard_utils.cpp | 8 + .../ut_cdc_stream/ut_cdc_stream.cpp | 292 ++++++++++++++++-- .../ut_cdc_stream_reboots.cpp | 141 +++++---- .../tx/schemeshard/ut_helpers/test_env.cpp | 1 + ydb/core/tx/schemeshard/ut_helpers/test_env.h | 1 + .../ut_replication/ut_replication.cpp | 46 +++ ydb/core/ydb_convert/table_description.cpp | 12 +- ydb/core/ydb_convert/table_description.h | 3 + 53 files changed, 1313 insertions(+), 593 deletions(-) diff --git a/ydb/core/base/path.h b/ydb/core/base/path.h index ca7c1403f56b..71263b8c6b79 100644 --- a/ydb/core/base/path.h +++ b/ydb/core/base/path.h @@ -37,4 +37,12 @@ inline TVector ChildPath(const TVector& parentPath, const TStr return path; } +inline TVector ChildPath(const TVector& parentPath, const TVector& childPath) { + auto path = parentPath; + for (const auto& childName : childPath) { + path.push_back(childName); + } + return path; +} + } diff --git a/ydb/core/grpc_services/rpc_alter_table.cpp b/ydb/core/grpc_services/rpc_alter_table.cpp index b18ade6d3f0b..290a129fdcbc 100644 --- a/ydb/core/grpc_services/rpc_alter_table.cpp +++ b/ydb/core/grpc_services/rpc_alter_table.cpp @@ -109,12 +109,12 @@ class TAlterTableRPC : public TRpcSchemeRequestActorServices.SchemeCache, ctx); } - void PrepareAlterUserAttrubutes() { + void GetProxyServices() { using namespace NTxProxy; Send(MakeTxProxyID(), new TEvTxUserProxy::TEvGetProxyServicesRequest); } @@ -222,13 +222,38 @@ class TAlterTableRPC : public TRpcSchemeRequestActor(ev)->Request->ResultSet.emplace_back(); - entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable; entry.Path = paths; } Send(schemeCache, ev); } + void Navigate(const TTableId& pathId) { + DatabaseName = Request_->GetDatabaseName() + .GetOrElse(DatabaseFromDomain(AppData())); + + auto ev = CreateNavigateForPath(DatabaseName); + { + auto& entry = static_cast(ev)->Request->ResultSet.emplace_back(); + entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId; + entry.TableId = pathId; + entry.ShowPrivatePath = true; + entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList; + } + + Send(MakeSchemeCacheID(), ev); + } + + static bool IsChangefeedOperation(EOp type) { + switch (type) { + case EOp::AddChangefeed: + case EOp::DropChangefeed: + return true; + default: + return false; + } + } + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { TXLOG_D("Handle TEvTxProxySchemeCache::TEvNavigateKeySetResult" << ", errors# " << ev->Get()->Request.Get()->ErrorCount); @@ -251,13 +276,48 @@ class TAlterTableRPC : public TRpcSchemeRequestActorResultSet.empty()); + const auto& entry = resp->ResultSet.back(); + + switch (entry.Kind) { + case NSchemeCache::TSchemeCacheNavigate::KindTable: + case NSchemeCache::TSchemeCacheNavigate::KindColumnTable: + case NSchemeCache::TSchemeCacheNavigate::KindExternalTable: + case NSchemeCache::TSchemeCacheNavigate::KindExternalDataSource: + case NSchemeCache::TSchemeCacheNavigate::KindView: + break; // table + case NSchemeCache::TSchemeCacheNavigate::KindIndex: + if (IsChangefeedOperation(OpType)) { + break; + } + [[fallthrough]]; + default: + Request_->RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, TStringBuilder() + << "Unable to nagivate: " << JoinPath(entry.Path) << " status: PathNotTable")); + return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); + } + switch (OpType) { case EOp::AddIndex: return AlterTableAddIndexOp(resp, ctx); case EOp::Attribute: - Y_ABORT_UNLESS(!resp->ResultSet.empty()); ResolvedPathId = resp->ResultSet.back().TableId.PathId; return AlterTable(ctx); + case EOp::AddChangefeed: + case EOp::DropChangefeed: + if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindIndex) { + AlterTable(ctx); + } else if (auto list = entry.ListNodeEntry) { + if (list->Children.size() != 1) { + return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); + } + + const auto& child = list->Children.at(0); + AlterTable(ctx, CanonizePath(ChildPath(NKikimr::SplitPath(GetProtoRequest()->path()), child.Name))); + } else { + Navigate(entry.TableId); + } + break; default: TXLOG_E("Got unexpected cache response"); return Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx); @@ -351,13 +411,14 @@ class TAlterTableRPC : public TRpcSchemeRequestActor& overridePath = {}) { const auto req = GetProtoRequest(); std::unique_ptr proposeRequest = CreateProposeTransaction(); auto modifyScheme = proposeRequest->Record.MutableTransaction()->MutableModifyScheme(); + modifyScheme->SetAllowAccessToPrivatePaths(overridePath.Defined()); Ydb::StatusIds::StatusCode code; TString error; - if (!BuildAlterTableModifyScheme(req, modifyScheme, Profiles, ResolvedPathId, code, error)) { + if (!BuildAlterTableModifyScheme(overridePath.GetOrElse(req->path()), req, modifyScheme, Profiles, ResolvedPathId, code, error)) { NYql::TIssues issues; issues.AddIssue(NYql::TIssue(error)); return Reply(code, issues, ctx); diff --git a/ydb/core/grpc_services/rpc_describe_table.cpp b/ydb/core/grpc_services/rpc_describe_table.cpp index 22879f460906..62e1dfd2d5c5 100644 --- a/ydb/core/grpc_services/rpc_describe_table.cpp +++ b/ydb/core/grpc_services/rpc_describe_table.cpp @@ -1,11 +1,11 @@ -#include "service_table.h" -#include - #include "rpc_calls.h" #include "rpc_scheme_base.h" - #include "service_table.h" -#include "rpc_common/rpc_common.h" + +#include +#include +#include +#include #include #include #include @@ -22,6 +22,20 @@ using TEvDescribeTableRequest = TGrpcRequestOperationCall { using TBase = TRpcSchemeRequestActor; + TString OverrideName; + + static bool ShowPrivatePath(const TString& path) { + if (AppData()->AllowPrivateTableDescribeForTest) { + return true; + } + + if (path.EndsWith("/indexImplTable")) { + return true; + } + + return false; + } + public: TDescribeTableRPC(IRequestOpCtx* msg) : TBase(msg) {} @@ -29,18 +43,63 @@ class TDescribeTableRPC : public TRpcSchemeRequestActorpath(); + const auto paths = NKikimr::SplitPath(path); + if (paths.empty()) { + Request_->RaiseIssue(NYql::TIssue("Invalid path")); + return Reply(Ydb::StatusIds::BAD_REQUEST, ctx); + } + + auto navigate = MakeHolder(); + navigate->DatabaseName = CanonizePath(Request_->GetDatabaseName().GetOrElse("")); + auto& entry = navigate->ResultSet.emplace_back(); + entry.Path = paths; + entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList; + entry.SyncVersion = true; + entry.ShowPrivatePath = ShowPrivatePath(path); + + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate)); Become(&TDescribeTableRPC::StateWork); } private: void StateWork(TAutoPtr& ev) { switch (ev->GetTypeRewrite()) { + HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); HFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle); default: TBase::StateWork(ev); } } + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { + auto* navigate = ev->Get()->Request.Get(); + + Y_ABORT_UNLESS(navigate->ResultSet.size() == 1); + const auto& entry = navigate->ResultSet.front(); + + if (navigate->ErrorCount > 0) { + switch (entry.Status) { + case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown: + case NSchemeCache::TSchemeCacheNavigate::EStatus::RootUnknown: + return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); + default: + return Reply(Ydb::StatusIds::UNAVAILABLE, ctx); + } + } + + if (entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindIndex) { + auto list = entry.ListNodeEntry; + if (!list || list->Children.size() != 1) { + return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); + } + + OverrideName = entry.Path.back(); + SendProposeRequest(CanonizePath(ChildPath(entry.Path, list->Children.at(0).Name)), ctx); + } else { + SendProposeRequest(GetProtoRequest()->path(), ctx); + } + } + void Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx) { const auto& record = ev->Get()->GetRecord(); const auto status = record.GetStatus(); @@ -53,9 +112,10 @@ class TDescribeTableRPC : public TRpcSchemeRequestActorset_name(pathDescription.GetSelf().GetName()); - selfEntry->set_type(static_cast(pathDescription.GetSelf().GetPathType())); ConvertDirectoryEntry(pathDescription.GetSelf(), selfEntry, true); + if (OverrideName) { + selfEntry->set_name(OverrideName); + } if (pathDescription.HasColumnTableDescription()) { const auto& tableDescription = pathDescription.GetColumnTableDescription(); @@ -136,9 +196,8 @@ class TDescribeTableRPC : public TRpcSchemeRequestActorpath(); std::unique_ptr navigateRequest(new TEvTxUserProxy::TEvNavigate()); SetAuthToken(navigateRequest, *Request_); @@ -153,9 +212,7 @@ class TDescribeTableRPC : public TRpcSchemeRequestActorMutableOptions()->SetReturnPartitionStats(true); } - if (AppData(ctx)->AllowPrivateTableDescribeForTest || path.EndsWith("/indexImplTable")) { - record->MutableOptions()->SetShowPrivateTable(true); - } + record->MutableOptions()->SetShowPrivateTable(ShowPrivatePath(path)); ctx.Send(MakeTxProxyID(), navigateRequest.release()); } diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 124d7c46f7d1..e42dae725d01 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -4075,6 +4075,70 @@ Y_UNIT_TEST_SUITE(KqpScheme) { } } + Y_UNIT_TEST(ChangefeedOnIndexTable) { + TKikimrRunner kikimr(TKikimrSettings() + .SetPQConfig(DefaultPQConfig()) + .SetEnableChangefeedsOnIndexTables(true)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { + auto query = R"( + --!syntax_v1 + CREATE TABLE `/Root/table` ( + Key Uint64, + Value String, + PRIMARY KEY (Key), + INDEX SyncIndex GLOBAL SYNC ON (`Value`), + INDEX AsyncIndex GLOBAL ASYNC ON (`Value`) + ); + )"; + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + const auto changefeed = TChangefeedDescription("feed", EChangefeedMode::KeysOnly, EChangefeedFormat::Json); + { + auto result = session.AlterTable("/Root/table/AsyncIndex", TAlterTableSettings() + .AppendAddChangefeeds(changefeed) + ).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); + } + { + auto result = session.AlterTable("/Root/table/SyncIndex", TAlterTableSettings() + .AppendAddChangefeeds(changefeed) + ).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + } + + Y_UNIT_TEST(DescribeIndexTable) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { + auto query = R"( + --!syntax_v1 + CREATE TABLE `/Root/table` ( + Key Uint64, + Value String, + PRIMARY KEY (Key), + INDEX SyncIndex GLOBAL SYNC ON (`Value`) + ); + )"; + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + auto desc = session.DescribeTable("/Root/table/SyncIndex").ExtractValueSync(); + UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(desc.GetEntry().Name, "SyncIndex"); + } + } + Y_UNIT_TEST(CreatedAt) { TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig())); auto scheme = NYdb::NScheme::TSchemeClient(kikimr.GetDriver(), TCommonClientSettings().Database("/Root")); diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto index 62fa1dd141f8..21b958196695 100644 --- a/ydb/core/protos/feature_flags.proto +++ b/ydb/core/protos/feature_flags.proto @@ -145,4 +145,5 @@ message TFeatureFlags { optional bool EnableColumnStatistics = 130 [default = false]; optional bool EnableSingleCompositeActionGroup = 131 [default = false]; optional bool EnableResourcePoolsOnServerless = 132 [default = false]; + optional bool EnableChangefeedsOnIndexTables = 134 [default = false]; } diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 7b4b630661de..1d385713a62a 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -919,10 +919,6 @@ message TCreateCdcStream { optional TCdcStreamDescription StreamDescription = 2; optional uint64 RetentionPeriodSeconds = 3 [default = 86400]; // 1d by default optional uint32 TopicPartitions = 4; - oneof IndexMode { - google.protobuf.Empty AllIndexes = 5; // Create topic per each index - string IndexName = 6; - } } message TAlterCdcStream { @@ -1637,7 +1633,6 @@ message TIndexBuildControl { message TLockConfig { optional string Name = 1; - optional bool AllowIndexImplLock = 2; } message TLockGuard { diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h index 1270874e4a02..25426b836d7c 100644 --- a/ydb/core/testlib/basics/feature_flags.h +++ b/ydb/core/testlib/basics/feature_flags.h @@ -61,6 +61,7 @@ class TTestFeatureFlagsHolder { FEATURE_FLAG_SETTER(EnableCMSRequestPriorities) FEATURE_FLAG_SETTER(EnableTableDatetime64) FEATURE_FLAG_SETTER(EnableResourcePools) + FEATURE_FLAG_SETTER(EnableChangefeedsOnIndexTables) #undef FEATURE_FLAG_SETTER }; diff --git a/ydb/core/tx/replication/controller/dst_alterer.cpp b/ydb/core/tx/replication/controller/dst_alterer.cpp index cb436259fae2..c03314f1d59e 100644 --- a/ydb/core/tx/replication/controller/dst_alterer.cpp +++ b/ydb/core/tx/replication/controller/dst_alterer.cpp @@ -41,6 +41,7 @@ class TDstAlterer: public TActorBootstrapped { switch (Kind) { case TReplication::ETargetKind::Table: + case TReplication::ETargetKind::IndexTable: tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterTable); PathIdFromPathId(DstPathId, tx.MutableAlterTable()->MutablePathId()); tx.MutableAlterTable()->MutableReplicationConfig()->SetMode( diff --git a/ydb/core/tx/replication/controller/dst_creator.cpp b/ydb/core/tx/replication/controller/dst_creator.cpp index 4b43cbad4fd5..cabb8c757334 100644 --- a/ydb/core/tx/replication/controller/dst_creator.cpp +++ b/ydb/core/tx/replication/controller/dst_creator.cpp @@ -8,6 +8,8 @@ #include #include #include +#include +#include #include #include #include @@ -116,6 +118,8 @@ class TDstCreator: public TActorBootstrapped { .WithKeyShardBoundary(true))); } break; + case TReplication::ETargetKind::IndexTable: + Y_ABORT("unreachable"); } } @@ -128,7 +132,7 @@ class TDstCreator: public TActorBootstrapped { } } - NKikimrScheme::EStatus ConvertStatus(NYdb::EStatus status) { + static NKikimrScheme::EStatus ConvertStatus(NYdb::EStatus status) { switch (status) { case NYdb::EStatus::SUCCESS: return NKikimrScheme::StatusSuccess; @@ -165,8 +169,20 @@ class TDstCreator: public TActorBootstrapped { Ydb::Table::CreateTableRequest scheme; result.GetTableDescription().SerializeTo(scheme); - // Disable index support until other replicator code be ready to process index replication - scheme.mutable_indexes()->Clear(); + + // filter out unsupported index types + auto& indexes = *scheme.mutable_indexes(); + for (auto it = indexes.begin(); it != indexes.end();) { + switch (it->type_case()) { + case Ydb::Table::TableIndex::kGlobalIndex: + case Ydb::Table::TableIndex::kGlobalUniqueIndex: + ++it; + continue; + default: + it = indexes.erase(it); + break; + } + } Ydb::StatusIds::StatusCode status; TString error; @@ -182,30 +198,37 @@ class TDstCreator: public TActorBootstrapped { TxBody.SetWorkingDir(pathPair.first); - NKikimrSchemeOp::TTableDescription* tableDesc = nullptr; + NKikimrSchemeOp::TTableDescription* desc = nullptr; if (scheme.indexes_size()) { + NeedToCheck = true; TxBody.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateIndexedTable); - tableDesc = TxBody.MutableCreateIndexedTable()->MutableTableDescription(); TxBody.SetInternal(true); + desc = TxBody.MutableCreateIndexedTable()->MutableTableDescription(); + if (!FillIndexDescription(*TxBody.MutableCreateIndexedTable(), scheme, status, error)) { + return Error(NKikimrScheme::StatusSchemeError, error); + } } else { TxBody.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateTable); - tableDesc = TxBody.MutableCreateTable(); + desc = TxBody.MutableCreateTable(); } - Ydb::StatusIds::StatusCode dummyCode; + Y_ABORT_UNLESS(desc); + desc->SetName(pathPair.second); - if (!FillIndexDescription(*TxBody.MutableCreateIndexedTable(), scheme, dummyCode, error)) { - return Error(NKikimrScheme::StatusSchemeError, error); + FillReplicationConfig(*desc->MutableReplicationConfig()); + if (scheme.indexes_size()) { + for (auto& index : *TxBody.MutableCreateIndexedTable()->MutableIndexDescription()) { + FillReplicationConfig(*index.MutableIndexImplTableDescription()->MutableReplicationConfig()); + } } - tableDesc->SetName(pathPair.second); + AllocateTxId(); + } + static void FillReplicationConfig(NKikimrSchemeOp::TTableReplicationConfig& replicationConfig) { // TODO: support other modes - auto& replicationConfig = *tableDesc->MutableReplicationConfig(); replicationConfig.SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY); replicationConfig.SetConsistency(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK); - - AllocateTxId(); } void AllocateTxId() { @@ -257,7 +280,9 @@ class TDstCreator: public TActorBootstrapped { switch (record.GetStatus()) { case NKikimrScheme::StatusAccepted: - DstPathId = TPathId(SchemeShardId, record.GetPathId()); + if (!NeedToCheck) { + DstPathId = TPathId(SchemeShardId, record.GetPathId()); + } Y_DEBUG_ABORT_UNLESS(TxId == record.GetTxId()); return SubscribeTx(record.GetTxId()); case NKikimrScheme::StatusMultipleModifications: @@ -338,6 +363,8 @@ class TDstCreator: public TActorBootstrapped { switch (Kind) { case TReplication::ETargetKind::Table: return CheckTableScheme(desc.GetTable(), error); + case TReplication::ETargetKind::IndexTable: + Y_ABORT("unreachable"); } } @@ -366,21 +393,30 @@ class TDstCreator: public TActorBootstrapped { return false; } - const auto& expected = TxBody.GetCreateTable(); + const NKikimrSchemeOp::TIndexedTableCreationConfig* indexedDesc = nullptr; + const NKikimrSchemeOp::TTableDescription* tableDesc = nullptr; + if (TxBody.GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateIndexedTable) { + indexedDesc = &TxBody.GetCreateIndexedTable(); + tableDesc = &indexedDesc->GetTableDescription(); + } else { + tableDesc = &TxBody.GetCreateTable(); + } + + Y_ABORT_UNLESS(tableDesc); // check key - if (expected.KeyColumnNamesSize() != got.KeyColumnNamesSize()) { + if (tableDesc->KeyColumnNamesSize() != got.KeyColumnNamesSize()) { error = TStringBuilder() << "Key columns size mismatch" - << ": expected: " << expected.KeyColumnNamesSize() + << ": expected: " << tableDesc->KeyColumnNamesSize() << ", got: " << got.KeyColumnNamesSize(); return false; } - for (ui32 i = 0; i < expected.KeyColumnNamesSize(); ++i) { - if (expected.GetKeyColumnNames(i) != got.GetKeyColumnNames(i)) { + for (ui32 i = 0; i < tableDesc->KeyColumnNamesSize(); ++i) { + if (tableDesc->GetKeyColumnNames(i) != got.GetKeyColumnNames(i)) { error = TStringBuilder() << "Key column name mismatch" << ": position: " << i - << ", expected: " << expected.GetKeyColumnNames(i) + << ", expected: " << tableDesc->GetKeyColumnNames(i) << ", got: " << got.GetKeyColumnNames(i); return false; } @@ -392,14 +428,14 @@ class TDstCreator: public TActorBootstrapped { columns.emplace(column.GetName(), column.GetType()); } - if (expected.ColumnsSize() != columns.size()) { + if (tableDesc->ColumnsSize() != columns.size()) { error = TStringBuilder() << "Columns size mismatch" - << ": expected: " << expected.ColumnsSize() + << ": expected: " << tableDesc->ColumnsSize() << ", got: " << columns.size(); return false; } - for (const auto& column : expected.GetColumns()) { + for (const auto& column : tableDesc->GetColumns()) { auto it = columns.find(column.GetName()); if (it == columns.end()) { error = TStringBuilder() << "Cannot find column" @@ -422,14 +458,25 @@ class TDstCreator: public TActorBootstrapped { indexes.emplace(index.GetName(), &index); } - if (expected.TableIndexesSize() != indexes.size()) { + if (!indexedDesc) { + if (!indexes.empty()) { + error = TStringBuilder() << "Indexes size mismatch" + << ": expected: " << 0 + << ", got: " << indexes.size(); + return false; + } + + return true; + } + + if (indexedDesc->IndexDescriptionSize() != indexes.size()) { error = TStringBuilder() << "Indexes size mismatch" - << ": expected: " << expected.TableIndexesSize() + << ": expected: " << indexedDesc->IndexDescriptionSize() << ", got: " << indexes.size(); return false; } - for (const auto& index : expected.GetTableIndexes()) { + for (const auto& index : indexedDesc->GetIndexDescription()) { auto it = indexes.find(index.GetName()); if (it == indexes.end()) { error = TStringBuilder() << "Cannot find index" @@ -487,6 +534,36 @@ class TDstCreator: public TActorBootstrapped { return true; } + void SubscribeDstPath() { + Subscriber = Register(CreateSchemeBoardSubscriber(SelfId(), DstPath)); + Become(&TThis::StateSubscribeDstPath); + } + + STATEFN(StateSubscribeDstPath) { + switch (ev->GetTypeRewrite()) { + hFunc(TSchemeBoardEvents::TEvNotifyUpdate, Handle); + default: + return StateBase(ev); + } + } + + void Handle(TSchemeBoardEvents::TEvNotifyUpdate::TPtr& ev) { + LOG_T("Handle " << ev->Get()->ToString()); + + const auto& desc = ev->Get()->DescribeSchemeResult; + if (desc.GetStatus() != NKikimrScheme::StatusSuccess) { + return; + } + + const auto& entryDesc = desc.GetPathDescription().GetSelf(); + if (!entryDesc.HasCreateFinished() || !entryDesc.GetCreateFinished()) { + return; + } + + DstPathId = ev->Get()->PathId; + return Success(); + } + void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { LOG_T("Handle " << ev->Get()->ToString()); @@ -525,6 +602,12 @@ class TDstCreator: public TActorBootstrapped { Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup); } + void PassAway() override { + if (const auto& actorId = std::exchange(Subscriber, {})) { + Send(actorId, new TEvents::TEvPoison()); + } + } + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::REPLICATION_CONTROLLER_DST_CREATOR; @@ -554,7 +637,13 @@ class TDstCreator: public TActorBootstrapped { } void Bootstrap() { - Resolve(PathId); + switch (Kind) { + case TReplication::ETargetKind::Table: + return Resolve(PathId); + case TReplication::ETargetKind::IndexTable: + // indexed table will be created along with its indexes + return SubscribeDstPath(); + } } STATEFN(StateBase) { @@ -586,6 +675,7 @@ class TDstCreator: public TActorBootstrapped { TActorId PipeCache; bool NeedToCheck = false; TPathId DstPathId; + TActorId Subscriber; }; // TDstCreator diff --git a/ydb/core/tx/replication/controller/dst_creator_ut.cpp b/ydb/core/tx/replication/controller/dst_creator_ut.cpp index d7fab275eb28..805cd2b1e3b3 100644 --- a/ydb/core/tx/replication/controller/dst_creator_ut.cpp +++ b/ydb/core/tx/replication/controller/dst_creator_ut.cpp @@ -62,8 +62,16 @@ Y_UNIT_TEST_SUITE(DstCreator) { CheckTableReplica(tableDesc, replicatedDesc); } - void WithSyncIndex(const TString& replicatedPath) { - TEnv env; + Y_UNIT_TEST(Basic) { + Basic("/Root/Replicated"); + } + + Y_UNIT_TEST(WithIntermediateDir) { + Basic("/Root/Dir/Replicated"); + } + + void WithIndex(const TString& replicatedPath, NKikimrSchemeOp::EIndexType indexType) { + TEnv env(TFeatureFlags().SetEnableChangefeedsOnIndexTables(true)); env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE); const auto tableDesc = TTestTableDescription{ @@ -79,25 +87,45 @@ Y_UNIT_TEST_SUITE(DstCreator) { const TString indexName = "index_by_value"; env.CreateTableWithIndex("/Root", *MakeTableDescription(tableDesc), - indexName, TVector{"value"}, NKikimrSchemeOp::EIndexTypeGlobal, - TVector{}, TDuration::Seconds(5000)); + indexName, TVector{"value"}, indexType); env.GetRuntime().Register(CreateDstCreator( env.GetSender(), env.GetSchemeshardId("/Root/Table"), env.GetYdbProxy(), env.GetPathId("/Root"), 1 /* rid */, 1 /* tid */, TReplication::ETargetKind::Table, "/Root/Table", replicatedPath )); - - auto ev = env.GetRuntime().GrabEdgeEvent(env.GetSender()); - UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrScheme::StatusSuccess); + { + auto ev = env.GetRuntime().GrabEdgeEvent(env.GetSender()); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrScheme::StatusSuccess); + } auto desc = env.GetDescription(replicatedPath); const auto& replicatedDesc = desc.GetPathDescription().GetTable(); CheckTableReplica(tableDesc, replicatedDesc); + switch (indexType) { + case NKikimrSchemeOp::EIndexTypeGlobal: + case NKikimrSchemeOp::EIndexTypeGlobalUnique: + UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.TableIndexesSize(), 1); + break; + default: + UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.TableIndexesSize(), 0); + return; + } + + env.GetRuntime().Register(CreateDstCreator( + env.GetSender(), env.GetSchemeshardId("/Root/Table"), env.GetYdbProxy(), env.GetPathId("/Root"), + 1 /* rid */, 2 /* tid */, TReplication::ETargetKind::IndexTable, + "/Root/Table/" + indexName + "/indexImplTable", replicatedPath + "/" + indexName + "/indexImplTable" + )); + { + auto ev = env.GetRuntime().GrabEdgeEvent(env.GetSender()); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrScheme::StatusSuccess); + } + { auto desc = env.GetDescription(replicatedPath + "/" + indexName); UNIT_ASSERT_VALUES_EQUAL(desc.GetPathDescription().GetTableIndex().GetName(), indexName); - UNIT_ASSERT_VALUES_EQUAL(desc.GetPathDescription().GetTableIndex().GetType(), NKikimrSchemeOp::EIndexType::EIndexTypeGlobal); + UNIT_ASSERT_VALUES_EQUAL(desc.GetPathDescription().GetTableIndex().GetType(), indexType); } { @@ -106,25 +134,19 @@ Y_UNIT_TEST_SUITE(DstCreator) { const auto& indexTableDesc = desc.GetPathDescription().GetTable(); UNIT_ASSERT_VALUES_EQUAL(indexTableDesc.KeyColumnNamesSize(), 2); } - } - - - Y_UNIT_TEST(Basic) { - Basic("/Root/Replicated"); } - Y_UNIT_TEST(WithIntermediateDir) { - Basic("/Root/Dir/Replicated"); - } -/* Y_UNIT_TEST(WithSyncIndex) { - WithSyncIndex("/Root/Replicated"); + WithIndex("/Root/Replicated", NKikimrSchemeOp::EIndexTypeGlobal); + } + + Y_UNIT_TEST(WithSyncIndexAndIntermediateDir) { + WithIndex("/Root/Dir/Replicated", NKikimrSchemeOp::EIndexTypeGlobal); } - Y_UNIT_TEST(WithSyncIndexWithIntermediateDir) { - WithSyncIndex("/Root/Dir/Replicated"); + Y_UNIT_TEST(WithAsyncIndex) { + WithIndex("/Root/Replicated", NKikimrSchemeOp::EIndexTypeGlobalAsync); } -*/ Y_UNIT_TEST(SameOwner) { TEnv env; diff --git a/ydb/core/tx/replication/controller/dst_remover.cpp b/ydb/core/tx/replication/controller/dst_remover.cpp index 2d0ec2d3b72f..538eddd812ff 100644 --- a/ydb/core/tx/replication/controller/dst_remover.cpp +++ b/ydb/core/tx/replication/controller/dst_remover.cpp @@ -43,6 +43,8 @@ class TDstRemover: public TActorBootstrapped { case TReplication::ETargetKind::Table: tx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropTable); break; + case TReplication::ETargetKind::IndexTable: + Y_ABORT("unreachable"); } Send(PipeCache, new TEvPipeCache::TEvForward(ev.Release(), SchemeShardId, true)); @@ -156,7 +158,13 @@ class TDstRemover: public TActorBootstrapped { if (!DstPathId) { Success(); } else { - AllocateTxId(); + switch (Kind) { + case TReplication::ETargetKind::Table: + return AllocateTxId(); + case TReplication::ETargetKind::IndexTable: + // indexed table will be removed along with its indexes + return Success(); + } } } diff --git a/ydb/core/tx/replication/controller/private_events.cpp b/ydb/core/tx/replication/controller/private_events.cpp index f562331cc26d..12807487c856 100644 --- a/ydb/core/tx/replication/controller/private_events.cpp +++ b/ydb/core/tx/replication/controller/private_events.cpp @@ -4,6 +4,20 @@ namespace NKikimr::NReplication::NController { +TEvPrivate::TEvDiscoveryTargetsResult::TAddEntry::TAddEntry( + const TString& srcPath, const TString& dstPath, TReplication::ETargetKind kind) + : SrcPath(srcPath) + , DstPath(dstPath) + , Kind(kind) +{ +} + +TEvPrivate::TEvDiscoveryTargetsResult::TFailedEntry::TFailedEntry(const TString& srcPath, const NYdb::TStatus& error) + : SrcPath(srcPath) + , Error(error) +{ +} + TEvPrivate::TEvDiscoveryTargetsResult::TEvDiscoveryTargetsResult(ui64 rid, TVector&& toAdd, TVector&& toDel) : ReplicationId(rid) , ToAdd(std::move(toAdd)) @@ -179,11 +193,11 @@ TString TEvPrivate::TEvDescribeTargetsResult::ToString() const { } Y_DECLARE_OUT_SPEC(, NKikimr::NReplication::NController::TEvPrivate::TEvDiscoveryTargetsResult::TAddEntry, stream, value) { - stream << value.first.Name << " (" << value.first.Type << ")"; + stream << value.SrcPath << " (" << value.Kind << ")"; } Y_DECLARE_OUT_SPEC(, NKikimr::NReplication::NController::TEvPrivate::TEvDiscoveryTargetsResult::TFailedEntry, stream, value) { - stream << value.first << ": " << value.second.GetStatus() << " ("; - value.second.GetIssues().PrintTo(stream, true); + stream << value.SrcPath << ": " << value.Error.GetStatus() << " ("; + value.Error.GetIssues().PrintTo(stream, true); stream << ")"; } diff --git a/ydb/core/tx/replication/controller/private_events.h b/ydb/core/tx/replication/controller/private_events.h index 7383d6f7ffc0..f69f08ae3f63 100644 --- a/ydb/core/tx/replication/controller/private_events.h +++ b/ydb/core/tx/replication/controller/private_events.h @@ -1,6 +1,7 @@ #pragma once -#include +#include "replication.h" + #include #include @@ -38,8 +39,20 @@ struct TEvPrivate { static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE)"); struct TEvDiscoveryTargetsResult: public TEventLocal { - using TAddEntry = std::pair; // src, dst - using TFailedEntry = std::pair; // src, error + struct TAddEntry { + TString SrcPath; + TString DstPath; + TReplication::ETargetKind Kind; + + explicit TAddEntry(const TString& srcPath, const TString& dstPath, TReplication::ETargetKind kind); + }; + + struct TFailedEntry { + TString SrcPath; + NYdb::TStatus Error; + + explicit TFailedEntry(const TString& srcPath, const NYdb::TStatus& error); + }; const ui64 ReplicationId; TVector ToAdd; diff --git a/ydb/core/tx/replication/controller/replication.cpp b/ydb/core/tx/replication/controller/replication.cpp index 9bd3a8606e1b..9acc0f7f9b7a 100644 --- a/ydb/core/tx/replication/controller/replication.cpp +++ b/ydb/core/tx/replication/controller/replication.cpp @@ -42,7 +42,9 @@ class TReplication::TImpl: public TLagProvider { ITarget* CreateTarget(TReplication* self, ui64 id, ETargetKind kind, Args&&... args) const { switch (kind) { case ETargetKind::Table: - return new TTableTarget(self, id, std::forward(args)...); + return new TTargetTable(self, id, std::forward(args)...); + case ETargetKind::IndexTable: + return new TTargetIndexTable(self, id, std::forward(args)...); } } diff --git a/ydb/core/tx/replication/controller/replication.h b/ydb/core/tx/replication/controller/replication.h index afdcfccebe16..c4e64436bb6d 100644 --- a/ydb/core/tx/replication/controller/replication.h +++ b/ydb/core/tx/replication/controller/replication.h @@ -32,6 +32,7 @@ class TReplication: public TSimpleRefCount { enum class ETargetKind: ui8 { Table, + IndexTable, }; enum class EDstState: ui8 { diff --git a/ydb/core/tx/replication/controller/stream_creator.cpp b/ydb/core/tx/replication/controller/stream_creator.cpp index 2e295fab12ed..086c2892a381 100644 --- a/ydb/core/tx/replication/controller/stream_creator.cpp +++ b/ydb/core/tx/replication/controller/stream_creator.cpp @@ -26,6 +26,7 @@ class TStreamCreator: public TActorBootstrapped { void CreateStream() { switch (Kind) { case TReplication::ETargetKind::Table: + case TReplication::ETargetKind::IndexTable: Send(YdbProxy, new TEvYdbProxy::TEvAlterTableRequest(SrcPath, NYdb::NTable::TAlterTableSettings() .AppendAddChangefeeds(Changefeed))); break; @@ -64,8 +65,17 @@ class TStreamCreator: public TActorBootstrapped { } } + TString BuildStreamPath() const { + switch (Kind) { + case TReplication::ETargetKind::Table: + return CanonizePath(ChildPath(SplitPath(SrcPath), Changefeed.GetName())); + case TReplication::ETargetKind::IndexTable: + return CanonizePath(ChildPath(SplitPath(SrcPath), {"indexImplTable", Changefeed.GetName()})); + } + } + void CreateConsumer() { - const auto streamPath = CanonizePath(ChildPath(SplitPath(SrcPath), Changefeed.GetName())); + const auto streamPath = BuildStreamPath(); const auto settings = NYdb::NTopic::TAlterTopicSettings() .BeginAddConsumer() .ConsumerName(ReplicationConsumerName) diff --git a/ydb/core/tx/replication/controller/stream_remover.cpp b/ydb/core/tx/replication/controller/stream_remover.cpp index 6a30ac851801..b1acdb46b04e 100644 --- a/ydb/core/tx/replication/controller/stream_remover.cpp +++ b/ydb/core/tx/replication/controller/stream_remover.cpp @@ -13,6 +13,7 @@ class TStreamRemover: public TActorBootstrapped { void DropStream() { switch (Kind) { case TReplication::ETargetKind::Table: + case TReplication::ETargetKind::IndexTable: Send(YdbProxy, new TEvYdbProxy::TEvAlterTableRequest(SrcPath, NYdb::NTable::TAlterTableSettings() .AppendDropChangefeeds(StreamName))); break; diff --git a/ydb/core/tx/replication/controller/target_discoverer.cpp b/ydb/core/tx/replication/controller/target_discoverer.cpp index c5f956555cac..9e8c5510aefb 100644 --- a/ydb/core/tx/replication/controller/target_discoverer.cpp +++ b/ydb/core/tx/replication/controller/target_discoverer.cpp @@ -3,11 +3,11 @@ #include "target_discoverer.h" #include "util.h" +#include +#include #include #include -#include - #include #include @@ -25,7 +25,7 @@ class TTargetDiscoverer: public TActorBootstrapped { auto it = Pending.find(ev->Cookie); if (it == Pending.end()) { - LOG_W("Unknown describe response" + LOG_W("Unknown describe path response" << ": cookie# " << ev->Cookie); return; } @@ -35,37 +35,95 @@ class TTargetDiscoverer: public TActorBootstrapped { const auto& result = ev->Get()->Result; if (result.IsSuccess()) { - LOG_D("Describe succeeded" + LOG_D("Describe path succeeded" << ": path# " << path.first); - auto entry = result.GetEntry(); + const auto& entry = result.GetEntry(); switch (entry.Type) { case NYdb::NScheme::ESchemeEntryType::SubDomain: case NYdb::NScheme::ESchemeEntryType::Directory: Pending.erase(it); return ListDirectory(path); + case NYdb::NScheme::ESchemeEntryType::Table: + return DescribeTable(ev->Cookie); default: break; } - entry.Name = path.first; // replace by full path + LOG_W("Unsupported entry type" + << ": path# " << path.first + << ", type# " << entry.Type); - if (const auto kind = TryTargetKindFromEntryType(entry.Type)) { - LOG_I("Add target" - << ": path# " << path.first - << ", kind# " << kind); - ToAdd.emplace_back(std::move(entry), path.second); + NYql::TIssues issues; + issues.AddIssue(TStringBuilder() << "Unsupported entry type: " << entry.Type); + Failed.emplace_back(path.first, NYdb::TStatus(NYdb::EStatus::UNSUPPORTED, std::move(issues))); + } else { + LOG_E("Describe path failed" + << ": path# " << path.first + << ", status# " << result.GetStatus() + << ", issues# " << result.GetIssues().ToOneLineString()); + + if (IsRetryableError(result)) { + return RetryDescribe(*it); } else { - LOG_W("Unsupported entry type" - << ": path# " << path.first - << ", type# " << entry.Type); + Failed.emplace_back(path.first, result); + } + } + + Pending.erase(it); + MaybeReply(); + } + + void DescribeTable(ui32 idx) { + Y_ABORT_UNLESS(idx < Paths.size()); + Send(YdbProxy, new TEvYdbProxy::TEvDescribeTableRequest(Paths.at(idx).first, {}), 0, idx); + Pending.insert(idx); + } - NYql::TIssues issues; - issues.AddIssue(TStringBuilder() << "Unsupported entry type: " << entry.Type); - Failed.emplace_back(path.first, NYdb::TStatus(NYdb::EStatus::UNSUPPORTED, std::move(issues))); + void Handle(TEvYdbProxy::TEvDescribeTableResponse::TPtr& ev) { + LOG_T("Handle " << ev->Get()->ToString()); + + auto it = Pending.find(ev->Cookie); + if (it == Pending.end()) { + LOG_W("Unknown describe table response" + << ": cookie# " << ev->Cookie); + return; + } + + Y_ABORT_UNLESS(*it < Paths.size()); + const auto& path = Paths.at(*it); + + const auto& result = ev->Get()->Result; + if (result.IsSuccess()) { + LOG_D("Describe table succeeded" + << ": path# " << path.first); + + const auto& target = ToAdd.emplace_back(path.first, path.second, TReplication::ETargetKind::Table); + LOG_I("Add target" + << ": srcPath# " << target.SrcPath + << ", dstPath# " << target.DstPath + << ", kind# " << target.Kind); + + for (const auto& index : result.GetTableDescription().GetIndexDescriptions()) { + switch (index.GetIndexType()) { + case NYdb::NTable::EIndexType::GlobalSync: + case NYdb::NTable::EIndexType::GlobalUnique: + break; + default: + continue; + } + + const auto& target = ToAdd.emplace_back( + CanonizePath(ChildPath(SplitPath(path.first), index.GetIndexName())), + CanonizePath(ChildPath(SplitPath(path.second), {index.GetIndexName(), "indexImplTable"})), + TReplication::ETargetKind::IndexTable); + LOG_I("Add target" + << ": srcPath# " << target.SrcPath + << ", dstPath# " << target.DstPath + << ", kind# " << target.Kind); } } else { - LOG_E("Describe failed" + LOG_E("Describe table failed" << ": path# " << path.first << ", status# " << result.GetStatus() << ", issues# " << result.GetIssues().ToOneLineString()); @@ -143,13 +201,13 @@ class TTargetDiscoverer: public TActorBootstrapped { path.second + '/' + child.Name)); } break; + case NYdb::NScheme::ESchemeEntryType::Table: + Paths.emplace_back( + path.first + '/' + child.Name, + path.second + '/' + child.Name); + DescribeTable(Paths.size() - 1); + break; default: - if (TryTargetKindFromEntryType(child.Type)) { - Paths.emplace_back( - path.first + '/' + child.Name, - path.second + '/' + child.Name); - DescribePath(Paths.size() - 1); - } break; } } @@ -225,6 +283,7 @@ class TTargetDiscoverer: public TActorBootstrapped { switch (ev->GetTypeRewrite()) { hFunc(TEvYdbProxy::TEvDescribePathResponse, Handle); hFunc(TEvYdbProxy::TEvListDirectoryResponse, Handle); + hFunc(TEvYdbProxy::TEvDescribeTableResponse, Handle); sFunc(TEvents::TEvWakeup, Retry); sFunc(TEvents::TEvPoison, PassAway); } diff --git a/ydb/core/tx/replication/controller/target_table.cpp b/ydb/core/tx/replication/controller/target_table.cpp index d44b699bf0d0..6201bc29fbfa 100644 --- a/ydb/core/tx/replication/controller/target_table.cpp +++ b/ydb/core/tx/replication/controller/target_table.cpp @@ -102,16 +102,35 @@ class TTableWorkerRegistar: public TActorBootstrapped { }; // TTableWorkerRegistar -TTableTarget::TTableTarget(TReplication* replication, ui64 id, const TString& srcPath, const TString& dstPath) - : TTargetWithStream(replication, ETargetKind::Table, id, srcPath, dstPath) +TTargetTableBase::TTargetTableBase(TReplication* replication, ETargetKind finalKind, + ui64 id, const TString& srcPath, const TString& dstPath) + : TTargetWithStream(replication, finalKind, id, srcPath, dstPath) { } -IActor* TTableTarget::CreateWorkerRegistar(const TActorContext& ctx) const { +IActor* TTargetTableBase::CreateWorkerRegistar(const TActorContext& ctx) const { auto replication = GetReplication(); return new TTableWorkerRegistar(ctx.SelfID, replication->GetYdbProxy(), replication->GetConfig().GetSrcConnectionParams(), replication->GetId(), GetId(), - CanonizePath(ChildPath(SplitPath(GetSrcPath()), GetStreamName())), GetDstPathId()); + BuildStreamPath(), GetDstPathId()); +} + +TTargetTable::TTargetTable(TReplication* replication, ui64 id, const TString& srcPath, const TString& dstPath) + : TTargetTableBase(replication, ETargetKind::Table, id, srcPath, dstPath) +{ +} + +TString TTargetTable::BuildStreamPath() const { + return CanonizePath(ChildPath(SplitPath(GetSrcPath()), GetStreamName())); +} + +TTargetIndexTable::TTargetIndexTable(TReplication* replication, ui64 id, const TString& srcPath, const TString& dstPath) + : TTargetTableBase(replication, ETargetKind::IndexTable, id, srcPath, dstPath) +{ +} + +TString TTargetIndexTable::BuildStreamPath() const { + return CanonizePath(ChildPath(SplitPath(GetSrcPath()), {"indexImplTable", GetStreamName()})); } } diff --git a/ydb/core/tx/replication/controller/target_table.h b/ydb/core/tx/replication/controller/target_table.h index 08b45d7b0153..30f57d5db31a 100644 --- a/ydb/core/tx/replication/controller/target_table.h +++ b/ydb/core/tx/replication/controller/target_table.h @@ -4,14 +4,32 @@ namespace NKikimr::NReplication::NController { -class TTableTarget: public TTargetWithStream { +class TTargetTableBase: public TTargetWithStream { public: - explicit TTableTarget(TReplication* replication, + explicit TTargetTableBase(TReplication* replication, ETargetKind finalKind, ui64 id, const TString& srcPath, const TString& dstPath); protected: IActor* CreateWorkerRegistar(const TActorContext& ctx) const override; + virtual TString BuildStreamPath() const = 0; +}; -}; // TTableTarget +class TTargetTable: public TTargetTableBase { +public: + explicit TTargetTable(TReplication* replication, + ui64 id, const TString& srcPath, const TString& dstPath); + +protected: + TString BuildStreamPath() const override; +}; + +class TTargetIndexTable: public TTargetTableBase { +public: + explicit TTargetIndexTable(TReplication* replication, + ui64 id, const TString& srcPath, const TString& dstPath); + +protected: + TString BuildStreamPath() const override; +}; } diff --git a/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp b/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp index 686de8fb261e..7b75428e2c71 100644 --- a/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp +++ b/ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp @@ -43,23 +43,19 @@ class TController::TTxDiscoveryTargetsResult: public TTxBase { if (Ev->Get()->IsSuccess()) { for (const auto& target : Ev->Get()->ToAdd) { - const auto kind = TargetKindFromEntryType(target.first.Type); - const auto& srcPath = target.first.Name; - const auto& dstPath = target.second; - - const auto tid = Replication->AddTarget(kind, srcPath, dstPath); + const auto tid = Replication->AddTarget(target.Kind, target.SrcPath, target.DstPath); db.Table().Key(rid, tid).Update( - NIceDb::TUpdate(kind), - NIceDb::TUpdate(srcPath), - NIceDb::TUpdate(dstPath) + NIceDb::TUpdate(target.Kind), + NIceDb::TUpdate(target.SrcPath), + NIceDb::TUpdate(target.DstPath) ); CLOG_N(ctx, "Add target" << ": rid# " << rid << ", tid# " << tid - << ", kind# " << kind - << ", srcPath# " << srcPath - << ", dstPath# " << dstPath); + << ", kind# " << target.Kind + << ", srcPath# " << target.SrcPath + << ", dstPath# " << target.DstPath); } } else { const auto error = JoinSeq(", ", Ev->Get()->Failed); diff --git a/ydb/core/tx/replication/controller/util.h b/ydb/core/tx/replication/controller/util.h index 30b92d243965..d0b3afba3199 100644 --- a/ydb/core/tx/replication/controller/util.h +++ b/ydb/core/tx/replication/controller/util.h @@ -1,48 +1,12 @@ #pragma once -#include "replication.h" - -#include #include -#include - #include -#include #include namespace NKikimr::NReplication::NController { -inline TMaybe TryTargetKindFromEntryType(NYdb::NScheme::ESchemeEntryType type) { - switch (type) { - case NYdb::NScheme::ESchemeEntryType::Table: - return TReplication::ETargetKind::Table; - case NYdb::NScheme::ESchemeEntryType::Unknown: - case NYdb::NScheme::ESchemeEntryType::Directory: - case NYdb::NScheme::ESchemeEntryType::PqGroup: - case NYdb::NScheme::ESchemeEntryType::SubDomain: - case NYdb::NScheme::ESchemeEntryType::RtmrVolume: - case NYdb::NScheme::ESchemeEntryType::BlockStoreVolume: - case NYdb::NScheme::ESchemeEntryType::CoordinationNode: - case NYdb::NScheme::ESchemeEntryType::Sequence: - case NYdb::NScheme::ESchemeEntryType::Replication: - case NYdb::NScheme::ESchemeEntryType::ColumnTable: - case NYdb::NScheme::ESchemeEntryType::ColumnStore: - case NYdb::NScheme::ESchemeEntryType::Topic: - case NYdb::NScheme::ESchemeEntryType::ExternalTable: - case NYdb::NScheme::ESchemeEntryType::ExternalDataSource: - case NYdb::NScheme::ESchemeEntryType::View: - case NYdb::NScheme::ESchemeEntryType::ResourcePool: - return Nothing(); - } -} - -inline TReplication::ETargetKind TargetKindFromEntryType(NYdb::NScheme::ESchemeEntryType type) { - auto res = TryTargetKindFromEntryType(type); - Y_VERIFY_S(res, "Unexpected entry type: " << static_cast(type)); - return *res; -} - inline TString& TruncatedIssue(TString& issue) { static constexpr ui32 sizeLimit = 2_KB; static constexpr TStringBuf ellipsis = "..."; diff --git a/ydb/core/tx/replication/controller/ya.make b/ydb/core/tx/replication/controller/ya.make index 52bdfbbabdcb..25b9453a277a 100644 --- a/ydb/core/tx/replication/controller/ya.make +++ b/ydb/core/tx/replication/controller/ya.make @@ -8,6 +8,7 @@ PEERDIR( ydb/core/tablet_flat ydb/core/tx/replication/common ydb/core/tx/replication/ydb_proxy + ydb/core/tx/scheme_board ydb/core/util ydb/core/ydb_convert ydb/services/metadata diff --git a/ydb/core/tx/replication/service/table_writer_ut.cpp b/ydb/core/tx/replication/service/table_writer_ut.cpp index 65c2c710e2a9..5b1b3de7b169 100644 --- a/ydb/core/tx/replication/service/table_writer_ut.cpp +++ b/ydb/core/tx/replication/service/table_writer_ut.cpp @@ -39,10 +39,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { } Y_UNIT_TEST(SupportedTypes) { - auto featureFlags = TFeatureFlags(); - featureFlags.SetEnableTableDatetime64(true); - - TEnv env(featureFlags); + TEnv env(TFeatureFlags().SetEnableTableDatetime64(true)); env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_SERVICE, NLog::PRI_DEBUG); env.CreateTable("/Root", *MakeTableDescription(TTestTableDescription{ diff --git a/ydb/core/tx/replication/ut_helpers/test_env.h b/ydb/core/tx/replication/ut_helpers/test_env.h index 6920d0f2e921..c9517b48667d 100644 --- a/ydb/core/tx/replication/ut_helpers/test_env.h +++ b/ydb/core/tx/replication/ut_helpers/test_env.h @@ -10,6 +10,9 @@ namespace NKikimr::NReplication::NTestHelpers { +class TFeatureFlags: public TTestFeatureFlagsHolder { +}; + template class TEnv { static constexpr char DomainName[] = "Root"; @@ -63,7 +66,7 @@ class TEnv { TEnv(const TFeatureFlags& featureFlags, bool init = true) : Settings(Tests::TServerSettings(PortManager.GetPort(), {}, MakePqConfig()) .SetDomainName(DomainName) - .SetFeatureFlags(featureFlags) + .SetFeatureFlags(featureFlags.FeatureFlags) ) , Server(Settings) , Client(Settings) diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp index 07062b4ce553..b521e7ed7e87 100644 --- a/ydb/core/tx/scheme_board/cache.cpp +++ b/ydb/core/tx/scheme_board/cache.cpp @@ -898,8 +898,6 @@ class TSchemeCache: public TMonitorableActor { default: return false; } - case NKikimrSchemeOp::EPathTypeTableIndex: - return true; default: return false; } @@ -2570,14 +2568,12 @@ class TSchemeCache: public TMonitorableActor { if (entry.RequestType == TNavigate::TEntry::ERequestType::ByPath) { auto pathExtractor = [this](TNavigate::TEntry& entry) { + NSysView::ISystemViewResolver::TSystemViewPath sysViewPath; if (AppData()->FeatureFlags.GetEnableSystemViews() - && (entry.Operation == TNavigate::OpPath || entry.Operation == TNavigate::OpTable)) + && SystemViewResolver->IsSystemViewPath(entry.Path, sysViewPath)) { - NSysView::ISystemViewResolver::TSystemViewPath sysViewPath; - if (SystemViewResolver->IsSystemViewPath(entry.Path, sysViewPath)) { - entry.TableId.SysViewInfo = sysViewPath.ViewName; - return CanonizePath(sysViewPath.Parent); - } + entry.TableId.SysViewInfo = sysViewPath.ViewName; + return CanonizePath(sysViewPath.Parent); } TString path = CanonizePath(entry.Path); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp index 4efa03b17955..5ace1b97d44b 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp @@ -143,9 +143,12 @@ class TAlterCdcStream: public TSubOperation { .NotDeleted() .IsTable() .NotAsyncReplicaTable() - .IsCommonSensePath() .NotUnderOperation(); + if (checks && !tablePath.IsInsideTableIndexPath()) { + checks.IsCommonSensePath(); + } + if (!checks) { result->SetError(checks.GetStatus(), checks.GetError()); return result; @@ -370,10 +373,13 @@ class TAlterCdcStreamAtTable: public TSubOperation { .NotDeleted() .IsTable() .NotAsyncReplicaTable() - .IsCommonSensePath() .NotUnderDeleting() .NotUnderOperation(); + if (checks && !tablePath.IsInsideTableIndexPath()) { + checks.IsCommonSensePath(); + } + if (!checks) { result->SetError(checks.GetStatus(), checks.GetError()); return result; @@ -476,10 +482,10 @@ class TAlterCdcStreamAtTable: public TSubOperation { } // anonymous std::variant DoAlterStreamPathChecks( - const TOperationId& opId, - const TPath& workingDirPath, - const TString& tableName, - const TString& streamName) + const TOperationId& opId, + const TPath& workingDirPath, + const TString& tableName, + const TString& streamName) { const auto tablePath = workingDirPath.Child(tableName); { @@ -492,9 +498,12 @@ std::variant DoAlterStreamPathChecks( .NotDeleted() .IsTable() .NotAsyncReplicaTable() - .IsCommonSensePath() .NotUnderOperation(); + if (checks && !tablePath.IsInsideTableIndexPath()) { + checks.IsCommonSensePath(); + } + if (!checks) { return CreateReject(opId, checks.GetStatus(), checks.GetError()); } @@ -521,27 +530,24 @@ std::variant DoAlterStreamPathChecks( } void DoAlterStream( - const NKikimrSchemeOp::TAlterCdcStream& op, - const TOperationId& opId, - const TPath& workingDirPath, - const TPath& tablePath, - TVector& result) + TVector& result, + const NKikimrSchemeOp::TAlterCdcStream& op, + const TOperationId& opId, + const TPath& workingDirPath, + const TPath& tablePath) { { auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamImpl); outTx.MutableAlterCdcStream()->CopyFrom(op); - if (op.HasGetReady()) { outTx.MutableLockGuard()->SetOwnerTxId(op.GetGetReady().GetLockTxId()); } result.push_back(CreateAlterCdcStreamImpl(NextPartId(opId, result), outTx)); } - { auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamAtTable); outTx.MutableAlterCdcStream()->CopyFrom(op); - if (op.HasGetReady()) { outTx.MutableLockGuard()->SetOwnerTxId(op.GetGetReady().GetLockTxId()); } @@ -601,7 +607,7 @@ TVector CreateAlterCdcStream(TOperationId opId, const TTxTr TVector result; - DoAlterStream(op, opId, workingDirPath, tablePath, result); + DoAlterStream(result, op, opId, workingDirPath, tablePath); if (op.HasGetReady()) { auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropLock); @@ -613,6 +619,14 @@ TVector CreateAlterCdcStream(TOperationId opId, const TTxTr result.push_back(DropLock(NextPartId(opId, result), outTx)); } + if (workingDirPath.IsTableIndex()) { + auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex); + outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName()); + outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady); + + result.push_back(CreateAlterTableIndex(NextPartId(opId, result), outTx)); + } + return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.h b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.h index 198d5ae35cc9..6154ee05ed17 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.h @@ -1,7 +1,7 @@ #pragma once -#include "schemeshard__operation_create_cdc_stream.h" // for TStreamPaths #include "schemeshard__operation_common.h" +#include "schemeshard__operation_create_cdc_stream.h" // for TStreamPaths #include "schemeshard__operation_part.h" #include "schemeshard_impl.h" @@ -17,10 +17,10 @@ std::variant DoAlterStreamPathChecks( const TString& streamName); void DoAlterStream( + TVector& result, const NKikimrSchemeOp::TAlterCdcStream& op, const TOperationId& opId, const TPath& workingDirPath, - const TPath& tablePath, - TVector& result); + const TPath& tablePath); } // namespace NKikimr::NSchemesShard::NCdc diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp index 4c7d2f282cad..69883a91a28a 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp @@ -1,9 +1,8 @@ -#include "schemeshard__operation_part.h" +#include "schemeshard__operation_alter_cdc_stream.h" #include "schemeshard__operation_common.h" +#include "schemeshard__operation_part.h" #include "schemeshard_impl.h" -#include "schemeshard__operation_alter_cdc_stream.h" - #include #include @@ -111,7 +110,7 @@ TVector CreateAlterContinuousBackup(TOperationId opId, cons TVector result; - NCdc::DoAlterStream(alterCdcStreamOp, opId, workingDirPath, tablePath, result); + NCdc::DoAlterStream(result, alterCdcStreamOp, opId, workingDirPath, tablePath); if (cbOp.GetActionCase() == NKikimrSchemeOp::TAlterContinuousBackup::kTakeIncrementalBackup) { DoCreateIncBackupTable(opId, backupTablePath, schema, result); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_index.cpp index 9e4bf359c59c..0415c18c18b0 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_index.cpp @@ -140,8 +140,11 @@ class TAlterTableIndex: public TSubOperation { .NotDeleted() .NotUnderDeleting() .IsCommonSensePath() - .IsTable() - .NotAsyncReplicaTable(); + .IsTable(); + + if (!Transaction.GetInternal()) { + checks.NotAsyncReplicaTable(); + } if (!checks) { result->SetError(checks.GetStatus(), checks.GetError()); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp index f512165507a5..ea7d79c2b0a0 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp @@ -731,7 +731,7 @@ TVector CreateConsistentAlterTable(TOperationId id, const T // Admins can alter indexImplTable unconditionally. // Regular users can only alter allowed fields. if (!IsSuperUser(context.UserToken.Get()) - && (!CheckAllowedFields(alter, {"Name", "PartitionConfig"}) + && (!CheckAllowedFields(alter, {"Name", "PathId", "PartitionConfig", "ReplicationConfig"}) || (alter.HasPartitionConfig() && !CheckAllowedFields(alter.GetPartitionConfig(), {"PartitioningPolicy"}) ) @@ -744,6 +744,7 @@ TVector CreateConsistentAlterTable(TOperationId id, const T { auto tableIndexAltering = TransactionTemplate(parent.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex); + tableIndexAltering.SetInternal(tx.GetInternal()); auto alterIndex = tableIndexAltering.MutableAlterTableIndex(); alterIndex->SetName(parent.LeafName()); alterIndex->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index a615d6b68a4a..d5a087b47313 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -105,38 +105,22 @@ class TNewCdcStream: public TSubOperation { } } - TString BuildWorkingDir() const { - if (Transaction.GetCreateCdcStream().HasIndexName()) { - return Transaction.GetWorkingDir() + "/" - + Transaction.GetCreateCdcStream().GetIndexName() + "/indexImplTable"; - } else { - return Transaction.GetWorkingDir(); - } - } - public: using TSubOperation::TSubOperation; THolder Propose(const TString& owner, TOperationContext& context) override { + const auto& workingDir = Transaction.GetWorkingDir(); const auto& op = Transaction.GetCreateCdcStream(); const auto& streamDesc = op.GetStreamDescription(); const auto& streamName = streamDesc.GetName(); const auto acceptExisted = !Transaction.GetFailOnExist(); - auto result = MakeHolder(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID()); - - if (op.HasAllIndexes()) { - result->SetError(NKikimrScheme::StatusInvalidParameter, - "Illigal part operation with all indexes flag"); - return result; - } - - const auto& workingDir = BuildWorkingDir(); - LOG_N("TNewCdcStream Propose" << ": opId# " << OperationId << ", stream# " << workingDir << "/" << streamName); + auto result = MakeHolder(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID()); + const auto tablePath = TPath::Resolve(workingDir, context.SS); { const auto checks = tablePath.Check(); @@ -146,15 +130,12 @@ class TNewCdcStream: public TSubOperation { .IsAtLocalSchemeShard() .IsResolved() .NotDeleted() + .IsTable() .NotAsyncReplicaTable() .NotUnderDeleting(); - if (op.HasIndexName() && op.GetIndexName()) { - checks.IsInsideTableIndexPath(); - } else { - checks - .IsTable() - .IsCommonSensePath(); + if (checks && !tablePath.IsInsideTableIndexPath()) { + checks.IsCommonSensePath(); } if (!checks) { @@ -529,35 +510,17 @@ class TNewCdcStreamAtTable: public TSubOperation { } THolder Propose(const TString&, TOperationContext& context) override { - auto workingDir = Transaction.GetWorkingDir(); + const auto& workingDir = Transaction.GetWorkingDir(); const auto& op = Transaction.GetCreateCdcStream(); - auto tableName = op.GetTableName(); + const auto& tableName = op.GetTableName(); const auto& streamName = op.GetStreamDescription().GetName(); - auto result = MakeHolder(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID()); - bool isIndexTable = false; - - if (op.HasAllIndexes()) { - result->SetError(NKikimrScheme::StatusInvalidParameter, - "Illigal part operation with all indexes flag"); - return result; - } - - if (op.HasIndexName()) { - if (!op.GetIndexName()) { - result->SetError(NKikimrScheme::StatusInvalidParameter, - "Unexpected empty index name"); - return result; - } - isIndexTable = true; - workingDir += ("/" + tableName + "/" + op.GetIndexName()); - tableName = "indexImplTable"; - } - LOG_N("TNewCdcStreamAtTable Propose" << ": opId# " << OperationId << ", stream# " << workingDir << "/" << tableName << "/" << streamName); + auto result = MakeHolder(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID()); + const auto workingDirPath = TPath::Resolve(workingDir, context.SS); { const auto checks = workingDirPath.Check(); @@ -569,9 +532,7 @@ class TNewCdcStreamAtTable: public TSubOperation { .IsLikeDirectory() .NotUnderDeleting(); - if (isIndexTable) { - checks.IsInsideTableIndexPath(); - } else { + if (checks && !workingDirPath.IsTableIndex()) { checks.IsCommonSensePath(); } @@ -595,7 +556,7 @@ class TNewCdcStreamAtTable: public TSubOperation { .NotUnderDeleting(); if (checks) { - if (!isIndexTable) { + if (!tablePath.IsInsideTableIndexPath()) { checks.IsCommonSensePath(); } if (InitialScan) { @@ -679,27 +640,34 @@ class TNewCdcStreamAtTable: public TSubOperation { private: const bool InitialScan; + }; // TNewCdcStreamAtTable -void DoCreateLock(const TOperationId opId, const TPath& workingDirPath, const TPath& tablePath, bool allowIndexImplLock, - TVector& result) +void DoCreateLock( + TVector& result, + const TOperationId opId, + const TPath& workingDirPath, + const TPath& tablePath) { - auto outTx = TransactionTemplate(workingDirPath.PathString(), - NKikimrSchemeOp::EOperationType::ESchemeOpCreateLock); + auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateLock); outTx.SetFailOnExist(false); outTx.SetInternal(true); - auto cfg = outTx.MutableLockConfig(); - cfg->SetName(tablePath.LeafName()); - cfg->SetAllowIndexImplLock(allowIndexImplLock); + outTx.MutableLockConfig()->SetName(tablePath.LeafName()); result.push_back(CreateLock(NextPartId(opId, result), outTx)); } } // anonymous -void DoCreatePqPart(const TOperationId& opId, const TPath& streamPath, const TString& streamName, - const TIntrusivePtr table, const NKikimrSchemeOp::TCreateCdcStream& op, - const TVector& boundaries, const bool acceptExisted, TVector& result) +void DoCreatePqPart( + TVector& result, + const NKikimrSchemeOp::TCreateCdcStream& op, + const TOperationId& opId, + const TPath& streamPath, + const TString& streamName, + TTableInfo::TCPtr table, + const TVector& boundaries, + const bool acceptExisted) { auto outTx = TransactionTemplate(streamPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreatePersQueueGroup); outTx.SetFailOnExist(!acceptExisted); @@ -752,34 +720,37 @@ void DoCreatePqPart(const TOperationId& opId, const TPath& streamPath, const TSt result.push_back(CreateNewPQ(NextPartId(opId, result), outTx)); } -void FillModifySchemaForCdc(NKikimrSchemeOp::TModifyScheme& outTx, const NKikimrSchemeOp::TCreateCdcStream& op, - const TOperationId& opId, const TString& indexName, bool acceptExisted, bool initialScan) +static void FillModifySchemaForCdc( + NKikimrSchemeOp::TModifyScheme& outTx, + const NKikimrSchemeOp::TCreateCdcStream& op, + const TOperationId& opId, + bool acceptExisted, + bool initialScan) { outTx.SetFailOnExist(!acceptExisted); outTx.MutableCreateCdcStream()->CopyFrom(op); - if (indexName) { - outTx.MutableCreateCdcStream()->SetIndexName(indexName); - } else { - outTx.MutableCreateCdcStream()->ClearIndexMode(); - } - if (initialScan) { outTx.MutableLockGuard()->SetOwnerTxId(ui64(opId.GetTxId())); } } -void DoCreateStream(const NKikimrSchemeOp::TCreateCdcStream& op, const TOperationId& opId, const TPath& workingDirPath, const TPath& tablePath, - const bool acceptExisted, const bool initialScan, const TString& indexName, TVector& result) +void DoCreateStream( + TVector& result, + const NKikimrSchemeOp::TCreateCdcStream& op, + const TOperationId& opId, + const TPath& workingDirPath, + const TPath& tablePath, + const bool acceptExisted, + const bool initialScan) { { auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamImpl); - FillModifySchemaForCdc(outTx, op, opId, indexName, acceptExisted, initialScan); + FillModifySchemaForCdc(outTx, op, opId, acceptExisted, initialScan); result.push_back(CreateNewCdcStreamImpl(NextPartId(opId, result), outTx)); } - { auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable); - FillModifySchemaForCdc(outTx, op, opId, indexName, acceptExisted, initialScan); + FillModifySchemaForCdc(outTx, op, opId, acceptExisted, initialScan); result.push_back(CreateNewCdcStreamAtTable(NextPartId(opId, result), outTx, initialScan)); } } @@ -826,10 +797,24 @@ ISubOperation::TPtr RejectOnTablePathChecks(const TOperationId& opId, const TPat .NotDeleted() .IsTable() .NotAsyncReplicaTable() - .IsCommonSensePath() .NotUnderDeleting() .NotUnderOperation(); + if (checks) { + if (!tablePath.IsInsideTableIndexPath()) { + checks.IsCommonSensePath(); + } else { + if (!tablePath.Parent().IsTableIndex(NKikimrSchemeOp::EIndexTypeGlobal)) { + return CreateReject(opId, NKikimrScheme::StatusPreconditionFailed, + "Cannot add changefeed to index table"); + } + if (!AppData()->FeatureFlags.GetEnableChangefeedsOnIndexTables()) { + return CreateReject(opId, NKikimrScheme::StatusPreconditionFailed, + "Changefeed on index table is not supported yet"); + } + } + } + if (!checks) { return CreateReject(opId, checks.GetStatus(), checks.GetError()); } @@ -837,19 +822,7 @@ ISubOperation::TPtr RejectOnTablePathChecks(const TOperationId& opId, const TPat return nullptr; } -void CalcBoundaries(const TTableInfo& table, TVector& boundaries) { - const auto& partitions = table.GetPartitions(); - boundaries.reserve(partitions.size() - 1); - - for (ui32 i = 0; i < partitions.size(); ++i) { - const auto& partition = partitions.at(i); - if (i != partitions.size() - 1) { - boundaries.push_back(partition.EndOfRange); - } - } -} - -bool FillBoundaries(const TTableInfo& table, const ::NKikimrSchemeOp::TCreateCdcStream& op, TVector& boundaries, TString& errStr) { +bool FillBoundaries(const TTableInfo& table, const NKikimrSchemeOp::TCreateCdcStream& op, TVector& boundaries, TString& errStr) { if (op.HasTopicPartitions()) { const auto& keyColumns = table.KeyColumnIds; const auto& columns = table.Columns; @@ -862,8 +835,17 @@ bool FillBoundaries(const TTableInfo& table, const ::NKikimrSchemeOp::TCreateCdc return false; } } else { - CalcBoundaries(table, boundaries); + const auto& partitions = table.GetPartitions(); + boundaries.reserve(partitions.size() - 1); + + for (ui32 i = 0; i < partitions.size(); ++i) { + const auto& partition = partitions.at(i); + if (i != partitions.size() - 1) { + boundaries.push_back(partition.EndOfRange); + } + } } + return true; } @@ -921,7 +903,6 @@ TVector CreateNewCdcStream(TOperationId opId, const TTxTran const auto& tableName = op.GetTableName(); const auto& streamDesc = op.GetStreamDescription(); const auto& streamName = streamDesc.GetName(); - const auto workingDirPath = TPath::Resolve(tx.GetWorkingDir(), context.SS); const auto checksResult = DoNewStreamPathChecks(opId, workingDirPath, tableName, streamName, acceptExisted); @@ -971,76 +952,35 @@ TVector CreateNewCdcStream(TOperationId opId, const TTxTran << "Initial scan is not supported yet")}; } - if (op.HasTopicPartitions()) { - if (op.GetTopicPartitions() <= 0) { - return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, "Topic partitions count must be greater than 0")}; - } - } - - std::vector candidates; - - if (op.GetIndexModeCase() == NKikimrSchemeOp::TCreateCdcStream::kAllIndexes) { - candidates.reserve(tablePath->GetChildren().size()); - for (const auto& child : tablePath->GetChildren()) { - candidates.emplace_back(child.first); - } - } else if (op.GetIndexModeCase() == NKikimrSchemeOp::TCreateCdcStream::kIndexName) { - auto it = tablePath->GetChildren().find(op.GetIndexName()); - if (it == tablePath->GetChildren().end()) { - return {CreateReject(opId, NKikimrScheme::StatusSchemeError, - "requested particular path hasn't been found")}; - } - candidates.emplace_back(it->first); + if (op.HasTopicPartitions() && op.GetTopicPartitions() <= 0) { + return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, "Topic partitions count must be greater than 0")}; } TVector result; - for (const auto& name : candidates) { - const TPath indexPath = tablePath.Child(name); - if (!indexPath.IsTableIndex() || indexPath.IsDeleted()) { - continue; - } - - const TPath indexImplPath = indexPath.Child("indexImplTable"); - if (!indexImplPath) { - return {CreateReject(opId, NKikimrScheme::StatusSchemeError, - "indexImplTable hasn't been found")}; - } - - Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId)); - auto indexImplTable = context.SS->Tables.at(indexImplPath.Base()->PathId); - - const TPath indexStreamPath = indexImplPath.Child(streamName); - if (auto reject = RejectOnCdcChecks(opId, indexStreamPath, acceptExisted)) { - return {reject}; - } - - if (initialScan) { - DoCreateLock(opId, indexPath, indexImplPath, true, result); - } - - TVector boundaries; - if (!FillBoundaries(*indexImplTable, op, boundaries, errStr)) { - return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)}; - } - - DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, name, result); - DoCreatePqPart(opId, indexStreamPath, streamName, indexImplTable, op, boundaries, acceptExisted, result); + if (initialScan) { + DoCreateLock(result, opId, workingDirPath, tablePath); } - if (initialScan) { - DoCreateLock(opId, workingDirPath, tablePath, false, result); + if (workingDirPath.IsTableIndex()) { + auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex); + outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName()); + outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady); + + result.push_back(CreateAlterTableIndex(NextPartId(opId, result), outTx)); } Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId)); auto table = context.SS->Tables.at(tablePath.Base()->PathId); + TVector boundaries; if (!FillBoundaries(*table, op, boundaries, errStr)) { return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)}; } - DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, {}, result); - DoCreatePqPart(opId, streamPath, streamName, table, op, boundaries, acceptExisted, result); + DoCreateStream(result, op, opId, workingDirPath, tablePath, acceptExisted, initialScan); + DoCreatePqPart(result, op, opId, streamPath, streamName, table, boundaries, acceptExisted); + return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h index 11a921d84168..635e57a28b63 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h @@ -1,7 +1,7 @@ #pragma once -#include "schemeshard__operation_part.h" #include "schemeshard__operation_common.h" +#include "schemeshard__operation_part.h" #include "schemeshard_impl.h" #include @@ -22,23 +22,22 @@ std::variant DoNewStreamPathChecks( bool acceptExisted); void DoCreateStream( + TVector& result, const NKikimrSchemeOp::TCreateCdcStream& op, const TOperationId& opId, const TPath& workingDirPath, const TPath& tablePath, const bool acceptExisted, - const bool initialScan, - const TString& indexName, - TVector& result); + const bool initialScan); void DoCreatePqPart( + TVector& result, + const NKikimrSchemeOp::TCreateCdcStream& op, const TOperationId& opId, const TPath& streamPath, const TString& streamName, - const TIntrusivePtr table, - const NKikimrSchemeOp::TCreateCdcStream& op, + TTableInfo::TCPtr table, const TVector& boundaries, - const bool acceptExisted, - TVector& result); + const bool acceptExisted); } // namespace NKikimr::NSchemesShard::NCdc diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp index 6bb280316138..ab2c187d24bf 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp @@ -68,8 +68,8 @@ TVector CreateNewContinuousBackup(TOperationId opId, const TVector result; - NCdc::DoCreateStream(createCdcStreamOp, opId, workingDirPath, tablePath, acceptExisted, false, {}, result); - NCdc::DoCreatePqPart(opId, streamPath, NBackup::CB_CDC_STREAM_NAME, table, createCdcStreamOp, boundaries, acceptExisted, result); + NCdc::DoCreateStream(result, createCdcStreamOp, opId, workingDirPath, tablePath, acceptExisted, false); + NCdc::DoCreatePqPart(result, createCdcStreamOp, opId, streamPath, NBackup::CB_CDC_STREAM_NAME, table, boundaries, acceptExisted); return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp index bd7ad540099d..c119f253f47f 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp @@ -127,9 +127,7 @@ class TCreateLock: public TSubOperation { .IsLikeDirectory() .FailOnRestrictedCreateInTempZone(); - if (op.GetAllowIndexImplLock()) { - checks.IsInsideTableIndexPath(); - } else { + if (checks && !parentPath.IsTableIndex()) { checks.IsCommonSensePath(); } @@ -151,7 +149,7 @@ class TCreateLock: public TSubOperation { .IsTable() .NotAsyncReplicaTable(); - if (!op.GetAllowIndexImplLock()) { + if (checks && !parentPath.IsTableIndex()) { checks.IsCommonSensePath(); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp index 07d9bd17e0f7..1654ca58bdba 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp @@ -150,10 +150,13 @@ class TDropCdcStream: public TSubOperation { .NotDeleted() .IsTable() .NotAsyncReplicaTable() - .IsCommonSensePath() .IsUnderOperation() .IsUnderTheSameOperation(OperationId.GetTxId()); + if (checks && !tablePath.IsInsideTableIndexPath()) { + checks.IsCommonSensePath(); + } + if (!checks) { result->SetError(checks.GetStatus(), checks.GetError()); return result; @@ -332,10 +335,13 @@ class TDropCdcStreamAtTable: public TSubOperation { .NotDeleted() .IsTable() .NotAsyncReplicaTable() - .IsCommonSensePath() .NotUnderDeleting() .NotUnderOperation(); + if (checks && !tablePath.IsInsideTableIndexPath()) { + checks.IsCommonSensePath(); + } + if (!checks) { result->SetError(checks.GetStatus(), checks.GetError()); return result; @@ -438,10 +444,10 @@ class TDropCdcStreamAtTable: public TSubOperation { } // anonymous std::variant DoDropStreamPathChecks( - const TOperationId& opId, - const TPath& workingDirPath, - const TString& tableName, - const TString& streamName) + const TOperationId& opId, + const TPath& workingDirPath, + const TString& tableName, + const TString& streamName) { const auto tablePath = workingDirPath.Child(tableName); { @@ -454,10 +460,13 @@ std::variant DoDropStreamPathChecks( .NotDeleted() .IsTable() .NotAsyncReplicaTable() - .IsCommonSensePath() .NotUnderDeleting() .NotUnderOperation(); + if (checks && !tablePath.IsInsideTableIndexPath()) { + checks.IsCommonSensePath(); + } + if (!checks) { return CreateReject(opId, checks.GetStatus(), checks.GetError()); } @@ -485,10 +494,11 @@ std::variant DoDropStreamPathChecks( } ISubOperation::TPtr DoDropStreamChecks( - const TOperationId& opId, - const TPath& tablePath, - const TTxId lockTxId, - TOperationContext& context) { + const TOperationId& opId, + const TPath& tablePath, + const TTxId lockTxId, + TOperationContext& context) +{ TString errStr; if (!context.SS->CheckLocks(tablePath.Base()->PathId, lockTxId, errStr)) { @@ -499,14 +509,14 @@ ISubOperation::TPtr DoDropStreamChecks( } void DoDropStream( - const NKikimrSchemeOp::TDropCdcStream& op, - const TOperationId& opId, - const TPath& workingDirPath, - const TPath& tablePath, - const TPath& streamPath, - const TTxId lockTxId, - TOperationContext& context, - TVector& result) + TVector& result, + const NKikimrSchemeOp::TDropCdcStream& op, + const TOperationId& opId, + const TPath& workingDirPath, + const TPath& tablePath, + const TPath& streamPath, + const TTxId lockTxId, + TOperationContext& context) { { auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStreamAtTable); @@ -529,6 +539,14 @@ void DoDropStream( result.push_back(DropLock(NextPartId(opId, result), outTx)); } + if (workingDirPath.IsTableIndex()) { + auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex); + outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName()); + outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady); + + result.push_back(CreateAlterTableIndex(NextPartId(opId, result), outTx)); + } + { auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStreamImpl); outTx.MutableDrop()->SetName(streamPath.Base()->Name); @@ -615,7 +633,7 @@ TVector CreateDropCdcStream(TOperationId opId, const TTxTra TVector result; - DoDropStream(op, opId, workingDirPath, tablePath, streamPath, lockTxId, context, result); + DoDropStream(result, op, opId, workingDirPath, tablePath, streamPath, lockTxId, context); return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.h b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.h index ec4720da71c0..12be7102684c 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.h @@ -1,8 +1,8 @@ #pragma once +#include "schemeshard__operation_common.h" #include "schemeshard__operation_create_cdc_stream.h" // for TStreamPaths #include "schemeshard__operation_part.h" -#include "schemeshard__operation_common.h" #include "schemeshard_impl.h" #include @@ -23,13 +23,13 @@ ISubOperation::TPtr DoDropStreamChecks( TOperationContext& context); void DoDropStream( + TVector& result, const NKikimrSchemeOp::TDropCdcStream& op, const TOperationId& opId, const TPath& workingDirPath, const TPath& tablePath, const TPath& streamPath, const TTxId lockTxId, - TOperationContext& context, - TVector& result); + TOperationContext& context); } // namespace NKikimr::NSchemesShard::NCdc diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_continuous_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_continuous_backup.cpp index 9e7ec8ac7e43..e0e882b84d67 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_continuous_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_continuous_backup.cpp @@ -1,8 +1,7 @@ -#include "schemeshard__operation_part.h" #include "schemeshard__operation_common.h" -#include "schemeshard_impl.h" - #include "schemeshard__operation_drop_cdc_stream.h" +#include "schemeshard__operation_part.h" +#include "schemeshard_impl.h" #include @@ -40,7 +39,7 @@ TVector CreateDropContinuousBackup(TOperationId opId, const TVector result; - NCdc::DoDropStream(dropCdcStreamOp, opId, workingDirPath, tablePath, streamPath, InvalidTxId, context, result); + NCdc::DoDropStream(result, dropCdcStreamOp, opId, workingDirPath, tablePath, streamPath, InvalidTxId, context); return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp index fb268a8b0619..a0d6ff35513f 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp @@ -473,21 +473,22 @@ TVector CreateDropIndex(TOperationId nextId, const TTxTrans result.push_back(CreateDropTableIndex(NextPartId(nextId, result), indexDropping)); } - for (const auto& items: indexPath.Base()->GetChildren()) { - Y_ABORT_UNLESS(context.SS->PathsById.contains(items.second)); - auto implPath = context.SS->PathsById.at(items.second); - if (implPath->Dropped()) { + for (const auto& [childName, childPathId] : indexPath.Base()->GetChildren()) { + TPath child = indexPath.Child(childName); + if (child.IsDeleted()) { continue; } - auto implTable = context.SS->PathsById.at(items.second); - Y_ABORT_UNLESS(implTable->IsTable()); + Y_ABORT_UNLESS(child.Base()->IsTable()); auto implTableDropping = TransactionTemplate(indexPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTable); auto operation = implTableDropping.MutableDrop(); - operation->SetName(items.first); + operation->SetName(child.LeafName()); result.push_back(CreateDropTable(NextPartId(nextId, result), implTableDropping)); + if (auto reject = CascadeDropTableChildren(result, nextId, child)) { + return {reject}; + } } return result; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp index 5f32acec8c03..c0ef94bbda58 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp @@ -443,102 +443,8 @@ TVector CreateDropIndexedTable(TOperationId nextId, const T TVector result; result.push_back(CreateDropTable(NextPartId(nextId, result), tx)); - - for (const auto& [childName, childPathId] : table.Base()->GetChildren()) { - TPath child = table.Child(childName); - { - TPath::TChecker checks = child.Check(); - checks - .NotEmpty() - .IsResolved(); - - if (checks) { - if (child.IsDeleted()) { - continue; - } - } - - if (child.IsTableIndex()) { - checks.IsTableIndex(); - } else if (child.IsCdcStream()) { - checks.IsCdcStream(); - } else if (child.IsSequence()) { - checks.IsSequence(); - } - - checks.NotDeleted() - .NotUnderDeleting() - .NotUnderOperation(); - - if (!checks) { - return {CreateReject(nextId, checks.GetStatus(), checks.GetError())}; - } - } - Y_ABORT_UNLESS(child.Base()->PathId == childPathId); - - if (child.IsSequence()) { - auto dropSequence = TransactionTemplate(table.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropSequence); - dropSequence.MutableDrop()->SetName(ToString(child->Name)); - - result.push_back(CreateDropSequence(NextPartId(nextId, result), dropSequence)); - continue; - } else if (child.IsTableIndex()) { - auto dropIndex = TransactionTemplate(table.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTableIndex); - dropIndex.MutableDrop()->SetName(ToString(child.Base()->Name)); - - result.push_back(CreateDropTableIndex(NextPartId(nextId, result), dropIndex)); - } else if (child.IsCdcStream()) { - auto dropStream = TransactionTemplate(table.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStreamImpl); - dropStream.MutableDrop()->SetName(ToString(child.Base()->Name)); - - result.push_back(CreateDropCdcStreamImpl(NextPartId(nextId, result), dropStream)); - } - - Y_ABORT_UNLESS(child.Base()->GetChildren().size() == 1); - for (auto& [implName, implPathId] : child.Base()->GetChildren()) { - Y_ABORT_UNLESS(implName == "indexImplTable" || implName == "streamImpl", - "unexpected name %s", implName.c_str()); - - TPath implPath = child.Child(implName); - { - TPath::TChecker checks = implPath.Check(); - checks - .NotEmpty() - .IsResolved() - .NotDeleted() - .NotUnderDeleting() - .NotUnderOperation(); - - if (checks) { - if (implPath.Base()->IsTable()) { - checks - .IsTable() - .IsInsideTableIndexPath(); - } else if (implPath.Base()->IsPQGroup()) { - checks - .IsPQGroup() - .IsInsideCdcStreamPath(); - } - } - - if (!checks) { - return {CreateReject(nextId, checks.GetStatus(), checks.GetError())}; - } - } - Y_ABORT_UNLESS(implPath.Base()->PathId == implPathId); - - if (implPath.Base()->IsTable()) { - auto dropIndexTable = TransactionTemplate(child.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTable); - dropIndexTable.MutableDrop()->SetName(ToString(implPath.Base()->Name)); - - result.push_back(CreateDropTable(NextPartId(nextId, result), dropIndexTable)); - } else if (implPath.Base()->IsPQGroup()) { - auto dropPQGroup = TransactionTemplate(child.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropPersQueueGroup); - dropPQGroup.MutableDrop()->SetName(ToString(implPath.Base()->Name)); - - result.push_back(CreateDropPQ(NextPartId(nextId, result), dropPQGroup)); - } - } + if (auto reject = CascadeDropTableChildren(result, nextId, table)) { + return {reject}; } return result; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp index c5026592a0ec..8dc9cf01b553 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_lock.cpp @@ -119,9 +119,12 @@ class TDropLock: public TSubOperation { .IsResolved() .NotDeleted() .NotUnderDeleting() - .IsCommonSensePath() .IsLikeDirectory(); + if (checks && !parentPath.IsTableIndex()) { + checks.IsCommonSensePath(); + } + if (!checks) { result->SetError(checks.GetStatus(), checks.GetError()); return result; @@ -134,10 +137,12 @@ class TDropLock: public TSubOperation { checks .IsAtLocalSchemeShard() .IsResolved() - .NotUnderDeleting() - .IsCommonSensePath(); + .NotUnderDeleting(); if (checks) { + if (!parentPath.IsTableIndex()) { + checks.IsCommonSensePath(); + } if (dstPath.IsUnderOperation()) { // may be part of a consistent operation checks.IsUnderTheSameOperation(OperationId.GetTxId()); } else { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_part.cpp index e06c7c1cca8a..a854295c8da1 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_part.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.cpp @@ -1,5 +1,6 @@ #include "schemeshard__operation_part.h" #include "schemeshard_impl.h" +#include "schemeshard_path.h" namespace NKikimr::NSchemeShard { @@ -99,4 +100,108 @@ void TSubOperationState::IgnoreMessages(TString debugHint, TSet mgsIds) { MsgToIgnore.swap(mgsIds); } +ISubOperation::TPtr CascadeDropTableChildren(TVector& result, const TOperationId& id, const TPath& table) { + for (const auto& [childName, childPathId] : table.Base()->GetChildren()) { + TPath child = table.Child(childName); + { + TPath::TChecker checks = child.Check(); + checks + .NotEmpty() + .IsResolved(); + + if (checks) { + if (child.IsDeleted()) { + continue; + } + } + + if (child.IsTableIndex()) { + checks.IsTableIndex(); + } else if (child.IsCdcStream()) { + checks.IsCdcStream(); + } else if (child.IsSequence()) { + checks.IsSequence(); + } + + checks.NotDeleted() + .NotUnderDeleting() + .NotUnderOperation(); + + if (!checks) { + return CreateReject(id, checks.GetStatus(), checks.GetError()); + } + } + Y_ABORT_UNLESS(child.Base()->PathId == childPathId); + + if (child.IsSequence()) { + auto dropSequence = TransactionTemplate(table.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropSequence); + dropSequence.MutableDrop()->SetName(ToString(child->Name)); + + result.push_back(CreateDropSequence(NextPartId(id, result), dropSequence)); + continue; + } else if (child.IsTableIndex()) { + auto dropIndex = TransactionTemplate(table.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTableIndex); + dropIndex.MutableDrop()->SetName(ToString(child.Base()->Name)); + + result.push_back(CreateDropTableIndex(NextPartId(id, result), dropIndex)); + } else if (child.IsCdcStream()) { + auto dropStream = TransactionTemplate(table.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStreamImpl); + dropStream.MutableDrop()->SetName(ToString(child.Base()->Name)); + + result.push_back(CreateDropCdcStreamImpl(NextPartId(id, result), dropStream)); + } + + Y_ABORT_UNLESS(child.Base()->GetChildren().size() == 1); + for (auto& [implName, implPathId] : child.Base()->GetChildren()) { + Y_ABORT_UNLESS(implName == "indexImplTable" || implName == "streamImpl", + "unexpected name %s", implName.c_str()); + + TPath implPath = child.Child(implName); + { + TPath::TChecker checks = implPath.Check(); + checks + .NotEmpty() + .IsResolved() + .NotDeleted() + .NotUnderDeleting() + .NotUnderOperation(); + + if (checks) { + if (implPath.Base()->IsTable()) { + checks + .IsTable() + .IsInsideTableIndexPath(); + } else if (implPath.Base()->IsPQGroup()) { + checks + .IsPQGroup() + .IsInsideCdcStreamPath(); + } + } + + if (!checks) { + return CreateReject(id, checks.GetStatus(), checks.GetError()); + } + } + Y_ABORT_UNLESS(implPath.Base()->PathId == implPathId); + + if (implPath.Base()->IsTable()) { + auto dropIndexTable = TransactionTemplate(child.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTable); + dropIndexTable.MutableDrop()->SetName(ToString(implPath.Base()->Name)); + + result.push_back(CreateDropTable(NextPartId(id, result), dropIndexTable)); + if (auto reject = CascadeDropTableChildren(result, id, implPath)) { + return reject; + } + } else if (implPath.Base()->IsPQGroup()) { + auto dropPQGroup = TransactionTemplate(child.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropPersQueueGroup); + dropPQGroup.MutableDrop()->SetName(ToString(implPath.Base()->Name)); + + result.push_back(CreateDropPQ(NextPartId(id, result), dropPQGroup)); + } + } + } + + return nullptr; +} + } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h index 386dcb20768a..12d67f768e2d 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h @@ -84,6 +84,7 @@ namespace NKikimr { namespace NSchemeShard { class TSchemeShard; +class TPath; struct TOperationContext { public: @@ -620,5 +621,8 @@ ISubOperation::TPtr CreateAlterResourcePool(TOperationId id, TTxState::ETxState ISubOperation::TPtr CreateDropResourcePool(TOperationId id, const TTxTransaction& tx); ISubOperation::TPtr CreateDropResourcePool(TOperationId id, TTxState::ETxState state); +// returns Reject in case of error, nullptr otherwise +ISubOperation::TPtr CascadeDropTableChildren(TVector& result, const TOperationId& id, const TPath& table); + } } diff --git a/ydb/core/tx/schemeshard/schemeshard_path.cpp b/ydb/core/tx/schemeshard/schemeshard_path.cpp index f45e6190eb5f..0704fbcb5cee 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path.cpp @@ -1596,20 +1596,22 @@ bool TPath::IsInsideCdcStreamPath() const { return false; } - ++item; - for (; item != Elements.rend(); ++item) { - if (!(*item)->IsDirectory() && !(*item)->IsSubDomainRoot()) { - return false; - } - } - return true; } -bool TPath::IsTableIndex() const { +bool TPath::IsTableIndex(const TMaybe& type) const { Y_ABORT_UNLESS(IsResolved()); - return Base()->IsTableIndex(); + if (!Base()->IsTableIndex()) { + return false; + } + + if (!type.Defined()) { + return true; + } + + Y_ABORT_UNLESS(SS->Indexes.contains(Base()->PathId)); + return SS->Indexes.at(Base()->PathId)->Type == *type; } bool TPath::IsBackupTable() const { diff --git a/ydb/core/tx/schemeshard/schemeshard_path.h b/ydb/core/tx/schemeshard/schemeshard_path.h index a1474fef7176..4e6aec639db3 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path.h +++ b/ydb/core/tx/schemeshard/schemeshard_path.h @@ -5,6 +5,8 @@ #include +#include + namespace NKikimr::NSchemeShard { class TSchemeShard; @@ -159,7 +161,7 @@ class TPath { bool AtLocalSchemeShardPath() const; bool IsInsideTableIndexPath() const; bool IsInsideCdcStreamPath() const; - bool IsTableIndex() const; + bool IsTableIndex(const TMaybe& type = {}) const; bool IsBackupTable() const; bool IsAsyncReplicaTable() const; bool IsCdcStream() const; diff --git a/ydb/core/tx/schemeshard/schemeshard_utils.cpp b/ydb/core/tx/schemeshard/schemeshard_utils.cpp index 13642fc1054e..74ce3f4b30bc 100644 --- a/ydb/core/tx/schemeshard/schemeshard_utils.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_utils.cpp @@ -317,6 +317,10 @@ NKikimrSchemeOp::TTableDescription CalcImplTableDesc( result.AddKeyColumnNames(keyName); } + if (indexTableDesc.HasReplicationConfig()) { + result.MutableReplicationConfig()->CopyFrom(indexTableDesc.GetReplicationConfig()); + } + return result; } @@ -384,6 +388,10 @@ NKikimrSchemeOp::TTableDescription CalcImplTableDesc( result.AddKeyColumnNames(keyName); } + if (indexTableDesc.HasReplicationConfig()) { + result.MutableReplicationConfig()->CopyFrom(indexTableDesc.GetReplicationConfig()); + } + return result; } diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp index a4e9f6a1b2c2..560b63103c65 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp @@ -591,15 +591,6 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { } )", {NKikimrScheme::StatusNameConflict}); - TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"( - TableName: "indexImplTable" - StreamDescription { - Name: "Stream" - Mode: ECdcStreamModeKeysOnly - Format: ECdcStreamFormatProto - } - )", {NKikimrScheme::StatusNameConflict}); - TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( TableName: "Table" StreamDescription { @@ -617,29 +608,6 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { )"); env.TestWaitNotification(runtime, txId); - TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( - TableName: "Table" - StreamDescription { - Name: "StreamWithIndex" - Mode: ECdcStreamModeKeysOnly - Format: ECdcStreamFormatProto - } - IndexName: "NotExistedIndex" - )", {NKikimrScheme::StatusSchemeError}); - - TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( - TableName: "Table" - StreamDescription { - Name: "StreamWithIndex" - Mode: ECdcStreamModeKeysOnly - Format: ECdcStreamFormatProto - } - IndexName: "Index" - )"); - env.TestWaitNotification(runtime, txId); - - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/StreamWithIndex/streamImpl"), {NLs::PathExist}); - TestDropTable(runtime, ++txId, "/MyRoot", "Table"); env.TestWaitNotification(runtime, txId); @@ -1232,6 +1200,266 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { } } + Y_UNIT_TEST(StreamOnIndexTableNegative) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableChangefeedsOnIndexTables(false)); + ui64 txId = 100; + + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "indexed" Type: "Uint64" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "Index" + KeyColumnNames: ["indexed"] + } + )"); + env.TestWaitNotification(runtime, txId); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"( + TableName: "indexImplTable" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )", {NKikimrScheme::StatusPreconditionFailed}); + } + + Y_UNIT_TEST(StreamOnIndexTable) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableChangefeedsOnIndexTables(true)); + ui64 txId = 100; + + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "indexed" Type: "Uint64" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "SyncIndex" + KeyColumnNames: ["indexed"] + } + IndexDescription { + Name: "AsyncIndex" + KeyColumnNames: ["indexed"] + Type: EIndexTypeGlobalAsync + } + )"); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex"), {NLs::PathVersionEqual(2)}); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable"), {NLs::PathVersionEqual(3)}); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/UnknownIndex", R"( + TableName: "indexImplTable" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )", {NKikimrScheme::StatusPathDoesNotExist}); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/AsyncIndex", R"( + TableName: "indexImplTable" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )", {NKikimrScheme::StatusPreconditionFailed}); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/SyncIndex", R"( + TableName: "indexImplTable" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )"); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex"), {NLs::PathVersionEqual(3)}); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable"), {NLs::PathVersionEqual(4)}); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), {NLs::PathExist}); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream/streamImpl"), {NLs::PathExist}); + + TestAlterCdcStream(runtime, ++txId, "/MyRoot/Table/UnknownIndex", R"( + TableName: "indexImplTable" + StreamName: "Stream" + Disable {} + )", {NKikimrScheme::StatusPathDoesNotExist}); + + TestAlterCdcStream(runtime, ++txId, "/MyRoot/Table/SyncIndex", R"( + TableName: "indexImplTable" + StreamName: "Stream" + Disable {} + )"); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex"), {NLs::PathVersionEqual(4)}); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable"), {NLs::PathVersionEqual(5)}); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), { + NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateDisabled), + }); + + TestDropCdcStream(runtime, ++txId, "/MyRoot/Table/UnknownIndex", R"( + TableName: "indexImplTable" + StreamName: "Stream" + )", {NKikimrScheme::StatusPathDoesNotExist}); + + TestDropCdcStream(runtime, ++txId, "/MyRoot/Table/SyncIndex", R"( + TableName: "indexImplTable" + StreamName: "Stream" + )"); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex"), {NLs::PathVersionEqual(5)}); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable"), {NLs::PathVersionEqual(6)}); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), {NLs::PathNotExist}); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream/streamImpl"), {NLs::PathNotExist}); + } + + Y_UNIT_TEST(StreamOnBuildingIndexTable) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableChangefeedsOnIndexTables(true)); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "indexed" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + THolder blockedBuildIndexRequest; + auto blockBuildIndexRequest = runtime.AddObserver([&](auto& ev) { + blockedBuildIndexRequest.Reset(ev.Release()); + }); + + AsyncBuildIndex(runtime, ++txId, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/Table", "Index", {"indexed"}); + const auto buildIndexId = txId; + { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&blockedBuildIndexRequest](IEventHandle&) { + return bool(blockedBuildIndexRequest); + }); + runtime.DispatchEvents(opts); + } + blockBuildIndexRequest.Remove(); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"( + TableName: "indexImplTable" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )", {NKikimrScheme::StatusMultipleModifications}); + + runtime.Send(blockedBuildIndexRequest.Release(), 0, true); + env.TestWaitNotification(runtime, buildIndexId); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"( + TableName: "indexImplTable" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )"); + env.TestWaitNotification(runtime, txId); + } + + Y_UNIT_TEST(DropIndexWithStream) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableChangefeedsOnIndexTables(true)); + ui64 txId = 100; + + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "indexed" Type: "Uint64" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "Index" + KeyColumnNames: ["indexed"] + } + )"); + env.TestWaitNotification(runtime, txId); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"( + TableName: "indexImplTable" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )"); + env.TestWaitNotification(runtime, txId); + + TestDropTableIndex(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + IndexName: "Index" + )"); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream"), { + NLs::PathNotExist, + }); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream/streamImpl"), { + NLs::PathNotExist, + }); + } + + Y_UNIT_TEST(DropTableWithIndexWithStream) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableChangefeedsOnIndexTables(true)); + ui64 txId = 100; + + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "indexed" Type: "Uint64" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "Index" + KeyColumnNames: ["indexed"] + } + )"); + env.TestWaitNotification(runtime, txId); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"( + TableName: "indexImplTable" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )"); + env.TestWaitNotification(runtime, txId); + + TestDropTable(runtime, ++txId, "/MyRoot", "Table"); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream"), { + NLs::PathNotExist, + }); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream/streamImpl"), { + NLs::PathNotExist, + }); + } + } // TCdcStreamTests Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) { diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp index 7444ec4dac15..47209679f222 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp @@ -6,40 +6,38 @@ using namespace NSchemeShardUT_Private; -static const TString createTableProto = R"( - Name: "Table" - Columns { Name: "key" Type: "Uint64" } - Columns { Name: "value" Type: "Uint64" } - KeyColumnNames: ["key"] -)"; - -static const TString createTableWithIndexProto = R"( - TableDescription { - Name: "Table" - Columns { Name: "key" Type: "Uint64" } - Columns { Name: "value" Type: "Uint64" } - KeyColumnNames: ["key"] - } - IndexDescription { - Name: "SyncIndex" - KeyColumnNames: ["value"] - } -)"; - Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { template - void CreateStream(const TMaybe& state = Nothing(), bool vt = false, bool tableWithIndex = false) { + void CreateStream(const TMaybe& state = Nothing(), bool vt = false, bool onIndex = false) { T t; - t.GetTestEnvOptions().EnableChangefeedInitialScan(true); + t.GetTestEnvOptions() + .EnableChangefeedInitialScan(true) + .EnableChangefeedsOnIndexTables(true); t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { { TInactiveZone inactive(activeZone); runtime.GetAppData().DisableCdcAutoSwitchingToReadyStateForTests = true; - if (tableWithIndex) { - TestCreateIndexedTable(runtime, ++t.TxId, "/MyRoot", createTableWithIndexProto); + if (!onIndex) { + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); } else { - TestCreateTable(runtime, ++t.TxId, "/MyRoot", createTableProto); + TestCreateIndexedTable(runtime, ++t.TxId, "/MyRoot", R"( + TableDescription { + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "indexed" Type: "Uint64" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "Index" + KeyColumnNames: ["indexed"] + } + )"); } t.TestEnv->TestWaitNotification(runtime, t.TxId); } @@ -58,24 +56,19 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { const bool ok = google::protobuf::TextFormat::PrintToString(streamDesc, &strDesc); UNIT_ASSERT_C(ok, "protobuf serialization failed"); - TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", Sprintf(R"( - TableName: "Table" + const TString path = !onIndex ? "/MyRoot" : "/MyRoot/Table/Index"; + const TString tableName = !onIndex ? "Table": "indexImplTable"; + + TestCreateCdcStream(runtime, ++t.TxId, path, Sprintf(R"( + TableName: "%s" StreamDescription { %s } - AllIndexes {} - )", strDesc.c_str())); + )", tableName.c_str(), strDesc.c_str())); t.TestEnv->TestWaitNotification(runtime, t.TxId); - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + TestDescribeResult(DescribePrivatePath(runtime, path + "/" + tableName + "/Stream"), { NLs::PathExist, NLs::StreamVirtualTimestamps(vt), }); - - if (tableWithIndex) { - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), { - NLs::PathExist, - NLs::StreamVirtualTimestamps(vt), - }); - } }); } @@ -83,15 +76,15 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { CreateStream(); } - Y_UNIT_TEST_WITH_REBOOTS(CreateStreamTableWithIndex) { - CreateStream(Nothing(), false, true); + Y_UNIT_TEST_WITH_REBOOTS(CreateStreamOnIndexTable) { + CreateStream({}, false, true); } Y_UNIT_TEST_WITH_REBOOTS(CreateStreamExplicitReady) { CreateStream(NKikimrSchemeOp::ECdcStreamStateReady); } - Y_UNIT_TEST_WITH_REBOOTS(CreateStreamExplicitReadyTableWithIndex) { + Y_UNIT_TEST_WITH_REBOOTS(CreateStreamOnIndexTableExplicitReady) { CreateStream(NKikimrSchemeOp::ECdcStreamStateReady, false, true); } @@ -99,7 +92,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { CreateStream(NKikimrSchemeOp::ECdcStreamStateScan); } - Y_UNIT_TEST_WITH_REBOOTS(CreateStreamWithInitialScanTableWithIndex) { + Y_UNIT_TEST_WITH_REBOOTS(CreateStreamOnIndexTableWithInitialScan) { CreateStream(NKikimrSchemeOp::ECdcStreamStateScan, false, true); } @@ -107,6 +100,10 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { CreateStream({}, true); } + Y_UNIT_TEST_WITH_REBOOTS(CreateStreamOnIndexTableWithVirtualTimestamps) { + CreateStream({}, true, true); + } + Y_UNIT_TEST_WITH_REBOOTS(CreateStreamWithAwsRegion) { T t; t.GetTestEnvOptions().EnableChangefeedDynamoDBStreamsFormat(true); @@ -293,21 +290,41 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { } template - void DropStream(const TMaybe& state = Nothing()) { + void DropStream(const TMaybe& state = Nothing(), bool onIndex = false) { T t; - t.GetTestEnvOptions().EnableChangefeedInitialScan(true); + t.GetTestEnvOptions() + .EnableChangefeedInitialScan(true) + .EnableChangefeedsOnIndexTables(true); t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + const TString path = !onIndex ? "/MyRoot" : "/MyRoot/Table/Index"; + const TString tableName = !onIndex ? "Table": "indexImplTable"; + { TInactiveZone inactive(activeZone); runtime.GetAppData().DisableCdcAutoSwitchingToReadyStateForTests = true; - TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( - Name: "Table" - Columns { Name: "key" Type: "Uint64" } - Columns { Name: "value" Type: "Uint64" } - KeyColumnNames: ["key"] - )"); + if (!onIndex) { + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + } else { + TestCreateIndexedTable(runtime, ++t.TxId, "/MyRoot", R"( + TableDescription { + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "indexed" Type: "Uint64" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "Index" + KeyColumnNames: ["indexed"] + } + )"); + } t.TestEnv->TestWaitNotification(runtime, t.TxId); NKikimrSchemeOp::TCdcStreamDescription streamDesc; @@ -323,20 +340,20 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { const bool ok = google::protobuf::TextFormat::PrintToString(streamDesc, &strDesc); UNIT_ASSERT_C(ok, "protobuf serialization failed"); - TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", Sprintf(R"( - TableName: "Table" + TestCreateCdcStream(runtime, ++t.TxId, path, Sprintf(R"( + TableName: "%s" StreamDescription { %s } - )", strDesc.c_str())); + )", tableName.c_str(), strDesc.c_str())); t.TestEnv->TestWaitNotification(runtime, t.TxId); } - TestDropCdcStream(runtime, ++t.TxId, "/MyRoot", R"( - TableName: "Table" + TestDropCdcStream(runtime, ++t.TxId, path, Sprintf(R"( + TableName: "%s" StreamName: "Stream" - )"); + )", tableName.c_str())); t.TestEnv->TestWaitNotification(runtime, t.TxId); - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {NLs::PathNotExist}); + TestDescribeResult(DescribePrivatePath(runtime, path + "/" + tableName + "/Stream"), {NLs::PathNotExist}); }); } @@ -344,14 +361,26 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { DropStream(); } + Y_UNIT_TEST_WITH_REBOOTS(DropStreamOnIndexTable) { + DropStream({}, true); + } + Y_UNIT_TEST_WITH_REBOOTS(DropStreamExplicitReady) { DropStream(NKikimrSchemeOp::ECdcStreamStateReady); } + Y_UNIT_TEST_WITH_REBOOTS(DropStreamOnIndexTableExplicitReady) { + DropStream(NKikimrSchemeOp::ECdcStreamStateReady, true); + } + Y_UNIT_TEST_WITH_REBOOTS(DropStreamCreatedWithInitialScan) { DropStream(NKikimrSchemeOp::ECdcStreamStateScan); } + Y_UNIT_TEST_WITH_REBOOTS(DropStreamOnIndexTableCreatedWithInitialScan) { + DropStream(NKikimrSchemeOp::ECdcStreamStateScan, true); + } + Y_UNIT_TEST_WITH_REBOOTS(CreateDropRecreate) { T t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp index afef5e0ffab5..a4f0b3a544d7 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp @@ -541,6 +541,7 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe app.SetEnableServerlessExclusiveDynamicNodes(opts.EnableServerlessExclusiveDynamicNodes_); app.SetEnableAddColumsWithDefaults(opts.EnableAddColumsWithDefaults_); app.SetEnableReplaceIfExistsForExternalEntities(opts.EnableReplaceIfExistsForExternalEntities_); + app.SetEnableChangefeedsOnIndexTables(opts.EnableChangefeedsOnIndexTables_); app.ColumnShardConfig.SetDisabledOnSchemeShard(false); diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.h b/ydb/core/tx/schemeshard/ut_helpers/test_env.h index 36a9d36888cb..388b50caa579 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.h +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.h @@ -65,6 +65,7 @@ namespace NSchemeShardUT_Private { OPTION(std::optional, EnableAddColumsWithDefaults, std::nullopt); OPTION(std::optional, EnableReplaceIfExistsForExternalEntities, std::nullopt); OPTION(std::optional, GraphBackendType, std::nullopt); + OPTION(std::optional, EnableChangefeedsOnIndexTables, std::nullopt); #undef OPTION }; diff --git a/ydb/core/tx/schemeshard/ut_replication/ut_replication.cpp b/ydb/core/tx/schemeshard/ut_replication/ut_replication.cpp index 8817995f8881..a9efb04c0fda 100644 --- a/ydb/core/tx/schemeshard/ut_replication/ut_replication.cpp +++ b/ydb/core/tx/schemeshard/ut_replication/ut_replication.cpp @@ -347,6 +347,52 @@ Y_UNIT_TEST_SUITE(TReplicationTests) { }); } + Y_UNIT_TEST(AlterReplicatedIndexTable) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + AsyncSend(runtime, TTestTxConfig::SchemeShard, InternalTransaction(CreateIndexedTableRequest(++txId, "/MyRoot", R"( + TableDescription { + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "indexed" Type: "Uint64" } + KeyColumnNames: ["key"] + ReplicationConfig { + Mode: REPLICATION_MODE_READ_ONLY + } + } + IndexDescription { + Name: "Index" + KeyColumnNames: ["indexed"] + IndexImplTableDescription { + ReplicationConfig { + Mode: REPLICATION_MODE_READ_ONLY + } + } + } + )"))); + TestModificationResults(runtime, txId, {NKikimrScheme::StatusAccepted}); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable"), { + NLs::ReplicationMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY), + }); + + AsyncSend(runtime, TTestTxConfig::SchemeShard, InternalTransaction(AlterTableRequest(++txId, "/MyRoot/Table/Index", R"( + Name: "indexImplTable" + ReplicationConfig { + Mode: REPLICATION_MODE_NONE + } + )"))); + TestModificationResults(runtime, txId, {NKikimrScheme::StatusAccepted}); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable"), { + NLs::ReplicationMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_NONE), + }); + } + Y_UNIT_TEST(CopyReplicatedTable) { TTestBasicRuntime runtime; TTestEnv env(runtime); diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp index 999840e630df..979b59f6fc60 100644 --- a/ydb/core/ydb_convert/table_description.cpp +++ b/ydb/core/ydb_convert/table_description.cpp @@ -166,7 +166,8 @@ bool BuildAlterTableAddIndexRequest(const Ydb::Table::AlterTableRequest* req, NK return true; } -bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKikimrSchemeOp::TModifyScheme* modifyScheme, const TTableProfiles& profiles, +bool BuildAlterTableModifyScheme(const TString& path, const Ydb::Table::AlterTableRequest* req, + NKikimrSchemeOp::TModifyScheme* modifyScheme, const TTableProfiles& profiles, const TPathId& resolvedPathId, Ydb::StatusIds::StatusCode& code, TString& error) { @@ -187,7 +188,7 @@ bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKiki const auto OpType = *ops.begin(); try { - pathPair = SplitPathIntoWorkingDirAndName(req->path()); + pathPair = SplitPathIntoWorkingDirAndName(path); } catch (const std::exception&) { code = Ydb::StatusIds::BAD_REQUEST; return false; @@ -230,7 +231,7 @@ bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKiki for(const auto& rename: req->rename_indexes()) { modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMoveIndex); auto& alter = *modifyScheme->MutableMoveIndex(); - alter.SetTablePath(req->path()); + alter.SetTablePath(path); alter.SetSrcPath(rename.source_name()); alter.SetDstPath(rename.destination_name()); alter.SetAllowOverwrite(rename.replace_destination()); @@ -366,6 +367,11 @@ bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKiki return true; } +bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKikimrSchemeOp::TModifyScheme* modifyScheme, + const TTableProfiles& profiles, const TPathId& resolvedPathId, Ydb::StatusIds::StatusCode& code, TString& error) +{ + return BuildAlterTableModifyScheme(req->path(), req, modifyScheme, profiles, resolvedPathId, code, error); +} template static Ydb::Type* AddColumn(Ydb::Table::ColumnMeta* newColumn, const TColumn& column) { diff --git a/ydb/core/ydb_convert/table_description.h b/ydb/core/ydb_convert/table_description.h index 1c1a3b761f0a..d76722e58446 100644 --- a/ydb/core/ydb_convert/table_description.h +++ b/ydb/core/ydb_convert/table_description.h @@ -31,6 +31,9 @@ struct TPathId; THashSet GetAlterOperationKinds(const Ydb::Table::AlterTableRequest* req); +bool BuildAlterTableModifyScheme(const TString& path, const Ydb::Table::AlterTableRequest* req, NKikimrSchemeOp::TModifyScheme* modifyScheme, + const TTableProfiles& profiles, const TPathId& resolvedPathId, + Ydb::StatusIds::StatusCode& status, TString& error); bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKikimrSchemeOp::TModifyScheme* modifyScheme, const TTableProfiles& profiles, const TPathId& resolvedPathId, Ydb::StatusIds::StatusCode& status, TString& error);