diff --git a/ydb/core/tx/datashard/datashard_repl_apply.cpp b/ydb/core/tx/datashard/datashard_repl_apply.cpp index 48aa0db04e9b..92533d7befde 100644 --- a/ydb/core/tx/datashard/datashard_repl_apply.cpp +++ b/ydb/core/tx/datashard/datashard_repl_apply.cpp @@ -24,13 +24,20 @@ class TDataShard::TTxApplyReplicationChanges : public TTransactionBaseState != TShardState::Ready && !Self->IsReplicated()) { + if (Self->State != TShardState::Ready) { Result = MakeHolder( NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_WRONG_STATE); return true; } + if (!Self->IsReplicated()) { + Result = MakeHolder( + NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, + NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST); + return true; + } + const auto& msg = Ev->Get()->Record; const auto& tableId = msg.GetTableId(); diff --git a/ydb/core/tx/datashard/datashard_ut_replication.cpp b/ydb/core/tx/datashard/datashard_ut_replication.cpp index e72f6203b329..b0395077e915 100644 --- a/ydb/core/tx/datashard/datashard_ut_replication.cpp +++ b/ydb/core/tx/datashard/datashard_ut_replication.cpp @@ -281,6 +281,29 @@ Y_UNIT_TEST_SUITE(DataShardReplication) { ); } + Y_UNIT_TEST(ApplyChangesToCommonTable) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + + InitRoot(server, sender); + CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions()); + + auto shards = GetTableShards(server, sender, "/Root/table-1"); + auto tableId = ResolveTableId(server, sender, "/Root/table-1"); + + ApplyChanges(server, shards.at(0), tableId, "my-source", { + TChange{ .Offset = 0, .WriteTxId = 0, .Key = 1, .Value = 11 }, + }, NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED); + } + } } // namespace NKikimr diff --git a/ydb/core/tx/replication/controller/dst_creator.cpp b/ydb/core/tx/replication/controller/dst_creator.cpp index 7fa0132512d2..fe7ab716806a 100644 --- a/ydb/core/tx/replication/controller/dst_creator.cpp +++ b/ydb/core/tx/replication/controller/dst_creator.cpp @@ -112,7 +112,8 @@ class TDstCreator: public TActorBootstrapped { if (bootstrap) { GetTableProfiles(); } else { - Send(YdbProxy, new TEvYdbProxy::TEvDescribeTableRequest(SrcPath, {})); + Send(YdbProxy, new TEvYdbProxy::TEvDescribeTableRequest(SrcPath, NYdb::NTable::TDescribeTableSettings() + .WithKeyShardBoundary(true))); } break; } diff --git a/ydb/core/tx/replication/controller/dst_creator_ut.cpp b/ydb/core/tx/replication/controller/dst_creator_ut.cpp index c54e59393ac7..f750e9c7cc08 100644 --- a/ydb/core/tx/replication/controller/dst_creator_ut.cpp +++ b/ydb/core/tx/replication/controller/dst_creator_ut.cpp @@ -80,6 +80,7 @@ Y_UNIT_TEST_SUITE(DstCreator) { }, .ReplicationConfig = Nothing(), })); + env.GetRuntime().Register(CreateDstCreator( env.GetSender(), env.GetSchemeshardId("/Root/Table"), env.GetYdbProxy(), env.GetPathId("/Root"), 1 /* rid */, 1 /* tid */, TReplication::ETargetKind::Table, "/Root/Table", "/Root/Replicated" @@ -93,6 +94,38 @@ Y_UNIT_TEST_SUITE(DstCreator) { UNIT_ASSERT_VALUES_EQUAL(replicatedSelf.GetOwner(), "user@builtin"); } + Y_UNIT_TEST(SamePartitionCount) { + TEnv env; + env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE); + + env.CreateTable("/Root", *MakeTableDescription({ + .Name = "Table", + .KeyColumns = {"key"}, + .Columns = { + {.Name = "key", .Type = "Uint32"}, + {.Name = "value", .Type = "Utf8"}, + }, + .ReplicationConfig = Nothing(), + .UniformPartitions = 2, + })); + + env.GetRuntime().Register(CreateDstCreator( + env.GetSender(), env.GetSchemeshardId("/Root/Table"), env.GetYdbProxy(), env.GetPathId("/Root"), + 1 /* rid */, 1 /* tid */, TReplication::ETargetKind::Table, "/Root/Table", "/Root/Replicated" + )); + + auto ev = env.GetRuntime().GrabEdgeEvent(env.GetSender()); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrScheme::StatusSuccess); + + auto originalDesc = env.GetDescription("/Root/Table"); + const auto& originalTable = originalDesc.GetPathDescription(); + UNIT_ASSERT_VALUES_EQUAL(originalTable.TablePartitionsSize(), 2); + + auto replicatedDesc = env.GetDescription("/Root/Replicated"); + const auto& replicatedTable = replicatedDesc.GetPathDescription(); + UNIT_ASSERT_VALUES_EQUAL(originalTable.TablePartitionsSize(), replicatedTable.TablePartitionsSize()); + } + Y_UNIT_TEST(NonExistentSrc) { TEnv env; env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE); diff --git a/ydb/core/tx/replication/service/table_writer.cpp b/ydb/core/tx/replication/service/table_writer.cpp index 345c9f19c632..e80919c2c31c 100644 --- a/ydb/core/tx/replication/service/table_writer.cpp +++ b/ydb/core/tx/replication/service/table_writer.cpp @@ -93,7 +93,7 @@ class TTablePartitionWriter: public TActorBootstrapped { event->Record.SetSource(source); } - Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), TabletId, false)); + Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), TabletId, true, ++SubscribeCookie)); Become(&TThis::StateWaitingStatus); } @@ -117,7 +117,11 @@ class TTablePartitionWriter: public TActorBootstrapped { << ": status# " << static_cast(record.GetStatus()) << ", reason# " << static_cast(record.GetReason()) << ", error# " << record.GetErrorDescription()); - return Leave(IsHardError(record.GetReason())); + if (IsHardError(record.GetReason())) { + return Leave(true); + } else { + return DelayedLeave(); + } } } @@ -133,11 +137,16 @@ class TTablePartitionWriter: public TActorBootstrapped { } void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { - if (TabletId == ev->Get()->TabletId) { - Leave(); + if (TabletId == ev->Get()->TabletId && ev->Cookie == SubscribeCookie) { + DelayedLeave(); } } + void DelayedLeave() { + static constexpr TDuration delay = TDuration::MilliSeconds(50); + this->Schedule(delay, new TEvents::TEvWakeup()); + } + void Leave(bool hardError = false) { LOG_I("Leave" << ": hard error# " << hardError); @@ -177,6 +186,7 @@ class TTablePartitionWriter: public TActorBootstrapped { STATEFN(StateBase) { switch (ev->GetTypeRewrite()) { hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); + sFunc(TEvents::TEvWakeup, Leave); sFunc(TEvents::TEvPoison, PassAway); } } @@ -188,6 +198,7 @@ class TTablePartitionWriter: public TActorBootstrapped { mutable TMaybe LogPrefix; TActorId LeaderPipeCache; + ui64 SubscribeCookie = 0; TMemoryPool MemoryPool; }; // TTablePartitionWriter diff --git a/ydb/core/tx/replication/ut_helpers/test_table.cpp b/ydb/core/tx/replication/ut_helpers/test_table.cpp index 8460f97181bd..e77b6b5a4a16 100644 --- a/ydb/core/tx/replication/ut_helpers/test_table.cpp +++ b/ydb/core/tx/replication/ut_helpers/test_table.cpp @@ -57,6 +57,10 @@ void TTestTableDescription::SerializeTo(NKikimrSchemeOp::TTableDescription& prot if (ReplicationConfig) { ReplicationConfig->SerializeTo(*proto.MutableReplicationConfig()); } + + if (UniformPartitions) { + proto.SetUniformPartitionsCount(*UniformPartitions); + } } THolder MakeTableDescription(const TTestTableDescription& desc) { diff --git a/ydb/core/tx/replication/ut_helpers/test_table.h b/ydb/core/tx/replication/ut_helpers/test_table.h index ea7083aec960..1df27de901d1 100644 --- a/ydb/core/tx/replication/ut_helpers/test_table.h +++ b/ydb/core/tx/replication/ut_helpers/test_table.h @@ -44,6 +44,7 @@ struct TTestTableDescription { TVector KeyColumns; TVector Columns; TMaybe ReplicationConfig = TReplicationConfig::Default(); + TMaybe UniformPartitions = Nothing(); void SerializeTo(NKikimrSchemeOp::TTableDescription& proto) const; }; diff --git a/ydb/library/yql/sql/v1/sql_ut.cpp b/ydb/library/yql/sql/v1/sql_ut.cpp index 88a519410dcf..82387a20781d 100644 --- a/ydb/library/yql/sql/v1/sql_ut.cpp +++ b/ydb/library/yql/sql/v1/sql_ut.cpp @@ -2395,7 +2395,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { Y_UNIT_TEST(AlterTableAddIndexWithIsNotSupported) { ExpectFailWithError("USE plato; ALTER TABLE table ADD INDEX idx LOCAL WITH (a=b, c=d, e=f) ON (col)", - "
:1:40: Error: local: alternative is not implemented yet: 714:7: local_index\n"); + "
:1:40: Error: local: alternative is not implemented yet: 715:7: local_index\n"); } Y_UNIT_TEST(OptionalAliases) {