diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 7d6330148c0a..178315afa537 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -715,6 +715,8 @@ void TPersQueue::ApplyNewConfig(const NKikimrPQ::TPQTabletConfig& newConfig, { Config = newConfig; + PQ_LOG_D("Apply new config " << Config.ShortDebugString()); + ui32 cacheSize = CACHE_SIZE; if (Config.HasCacheSize()) { cacheSize = Config.GetCacheSize(); @@ -1630,6 +1632,32 @@ void TPersQueue::CreateTopicConverter(const NKikimrPQ::TPQTabletConfig& config, Y_ABORT_UNLESS(topicConverter->IsValid(), "%s", topicConverter->GetReason().c_str()); } +void TPersQueue::UpdateReadRuleGenerations(NKikimrPQ::TPQTabletConfig& cfg) const +{ + Y_ABORT_UNLESS(cfg.HasVersion()); + const int curConfigVersion = cfg.GetVersion(); + + // set rr generation for provided read rules + THashMap> existed; // map name -> rrVersion, rrGeneration + for (const auto& c : Config.GetConsumers()) { + existed[c.GetName()] = std::make_pair(c.GetVersion(), c.GetGeneration()); + } + + for (auto& c : *cfg.MutableConsumers()) { + auto it = existed.find(c.GetName()); + ui64 generation = 0; + if (it != existed.end() && it->second.first == c.GetVersion()) { + generation = it->second.second; + } else { + generation = curConfigVersion; + } + c.SetGeneration(generation); + if (ReadRuleCompatible()) { + cfg.AddReadRuleGenerations(generation); + } + } +} + void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr ev, const TActorId& sender, const TActorContext& ctx) { const auto& record = ev->GetRecord(); @@ -1642,7 +1670,7 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr> existed; // map name -> rrVersion, rrGeneration - for (const auto& c : Config.GetConsumers()) { - existed[c.GetName()] = std::make_pair(c.GetVersion(), c.GetGeneration()); - } - - for (auto& c : *cfg.MutableConsumers()) { - auto it = existed.find(c.GetName()); - ui64 generation = 0; - if (it != existed.end() && it->second.first == c.GetVersion()) { - generation = it->second.second; - } else { - generation = curConfigVersion; - } - c.SetGeneration(generation); - if (ReadRuleCompatible()) { - cfg.AddReadRuleGenerations(generation); - } - } - } + UpdateReadRuleGenerations(cfg); LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Config update version " << cfg.GetVersion() << "(current " << Config.GetVersion() << ") received from actor " << sender @@ -3727,6 +3735,10 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx) tx.OnProposeTransaction(event, GetAllowedStep(), TabletID()); + if (tx.Kind == NKikimrPQ::TTransaction::KIND_CONFIG) { + UpdateReadRuleGenerations(tx.TabletConfig); + } + if (tx.WriteId.Defined()) { const TWriteId& writeId = *tx.WriteId; Y_ABORT_UNLESS(TxWrites.contains(writeId), diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index 1b57caaf9303..e7fabe310422 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -530,6 +530,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { bool AllSupportivePartitionsHaveBeenDeleted(const TMaybe& writeId) const; void DeleteWriteId(const TMaybe& writeId); + + void UpdateReadRuleGenerations(NKikimrPQ::TPQTabletConfig& cfg) const; }; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp index 5618ade5e90a..0379754e32ba 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -37,8 +38,14 @@ class TFixture : public NUnitTest::TBaseFixture { void Write(const TString& message, NTable::TTransaction* tx = nullptr); }; + struct TFeatureFlags { + bool EnablePQConfigTransactionsAtSchemeShard = true; + }; + void SetUp(NUnitTest::TTestContext&) override; + void NotifySchemeShard(const TFeatureFlags& flags); + NTable::TSession CreateTableSession(); NTable::TTransaction BeginTx(NTable::TSession& session); void CommitTx(NTable::TTransaction& tx, EStatus status = EStatus::SUCCESS); @@ -62,6 +69,8 @@ class TFixture : public NUnitTest::TBaseFixture { std::optional maxPartitionCount = std::nullopt); void DescribeTopic(const TString& path); + void AddConsumer(const TString& topic, const TVector& consumers); + void WriteToTopicWithInvalidTxId(bool invalidTxId); TTopicWriteSessionPtr CreateTopicWriteSession(const TString& topicPath, @@ -95,6 +104,8 @@ class TFixture : public NUnitTest::TBaseFixture { NYdb::EStatus status); void CloseTopicWriteSession(const TString& topicPath, const TString& messageGroupId); + void CloseTopicReadSession(const TString& topicPath, + const TString& consumerName); enum EEndOfTransaction { Commit, @@ -175,6 +186,8 @@ class TFixture : public NUnitTest::TBaseFixture { ui64 tabletId, const NPQ::TWriteId& writeId); + ui64 GetSchemeShardTabletId(const TActorId& actorId); + std::unique_ptr Setup; std::unique_ptr Driver; @@ -192,11 +205,27 @@ void TFixture::SetUp(NUnitTest::TTestContext&) { NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings(); settings.SetEnableTopicServiceTx(true); + Setup = std::make_unique(TEST_CASE_NAME, settings); Driver = std::make_unique(Setup->MakeDriver()); } +void TFixture::NotifySchemeShard(const TFeatureFlags& flags) +{ + auto request = std::make_unique(); + *request->Record.MutableConfig() = *Setup->GetServer().ServerSettings.AppConfig; + request->Record.MutableConfig()->MutableFeatureFlags()->SetEnablePQConfigTransactionsAtSchemeShard(flags.EnablePQConfigTransactionsAtSchemeShard); + + auto& runtime = Setup->GetRuntime(); + auto actorId = runtime.AllocateEdgeActor(); + + ui64 ssId = GetSchemeShardTabletId(actorId); + + runtime.SendToPipe(ssId, actorId, request.release()); + runtime.GrabEdgeEvent(); +} + NTable::TSession TFixture::CreateTableSession() { NTable::TTableClient client(GetDriver()); @@ -323,6 +352,20 @@ void TFixture::CreateTopic(const TString& path, Setup->CreateTopic(path, consumer, partitionCount, maxPartitionCount); } +void TFixture::AddConsumer(const TString& path, + const TVector& consumers) +{ + NTopic::TTopicClient client(GetDriver()); + NTopic::TAlterTopicSettings settings; + + for (const auto& consumer : consumers) { + settings.BeginAddConsumer(consumer); + } + + auto result = client.AlterTopic(path, settings).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); +} + void TFixture::DescribeTopic(const TString& path) { Setup->DescribeTopic(path); @@ -645,6 +688,13 @@ void TFixture::CloseTopicWriteSession(const TString& topicPath, TopicWriteSessions.erase(key); } +void TFixture::CloseTopicReadSession(const TString& topicPath, + const TString& consumerName) +{ + Y_UNUSED(consumerName); + TopicReadSessions.erase(topicPath); +} + void TFixture::WriteToTopic(const TString& topicPath, const TString& messageGroupId, const TString& message, @@ -763,6 +813,37 @@ void TFixture::WaitForSessionClose(const TString& topicPath, UNIT_ASSERT(context.AckCount <= context.WriteCount); } +ui64 TFixture::GetSchemeShardTabletId(const TActorId& actorId) +{ + auto navigate = std::make_unique(); + navigate->DatabaseName = "/Root"; + + NSchemeCache::TSchemeCacheNavigate::TEntry entry; + entry.Path = SplitPath("/Root"); + entry.SyncVersion = true; + entry.ShowPrivatePath = true; + entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList; + + navigate->ResultSet.push_back(std::move(entry)); + //navigate->UserToken = "root@builtin"; + navigate->Cookie = 12345; + + auto& runtime = Setup->GetRuntime(); + + runtime.Send(MakeSchemeCacheID(), actorId, + new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()), + 0, + true); + auto response = runtime.GrabEdgeEvent(); + + UNIT_ASSERT_VALUES_EQUAL(response->Request->Cookie, 12345); + UNIT_ASSERT_VALUES_EQUAL(response->Request->ErrorCount, 0); + + auto& front = response->Request->ResultSet.front(); + + return front.Self->Info.GetSchemeshardId(); +} + ui64 TFixture::GetTopicTabletId(const TActorId& actorId, const TString& topicPath, ui32 partition) { auto navigate = std::make_unique(); @@ -1998,6 +2079,41 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture) WriteMessagesInTx(0, 1); } +Y_UNIT_TEST_F(ReadRuleGeneration, TFixture) +{ + // There was a server + NotifySchemeShard({.EnablePQConfigTransactionsAtSchemeShard = false}); + + // Users have created their own topic on it + CreateTopic(TEST_TOPIC); + + // And they wrote their messages into it + WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-1"); + WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-2"); + WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-3"); + + // And he had a consumer + AddConsumer(TEST_TOPIC, {"consumer-1"}); + + // We read messages from the topic and committed offsets + auto messages = ReadFromTopic(TEST_TOPIC, "consumer-1", TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 3); + CloseTopicReadSession(TEST_TOPIC, "consumer-1"); + + // And then the Logbroker team turned on the feature flag + NotifySchemeShard({.EnablePQConfigTransactionsAtSchemeShard = true}); + + // Users continued to write to the topic + WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-4"); + + // Users have added new consumers + AddConsumer(TEST_TOPIC, {"consumer-2"}); + + // And they wanted to continue reading their messages + messages = ReadFromTopic(TEST_TOPIC, "consumer-1", TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1); +} + } }