Skip to content

Commit

Permalink
The consumer's generation number is not stored in the transaction (yd…
Browse files Browse the repository at this point in the history
  • Loading branch information
Alek5andr-Kotov committed Sep 23, 2024
1 parent 16c4b26 commit 46cdd05
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 22 deletions.
56 changes: 34 additions & 22 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,8 @@ void TPersQueue::ApplyNewConfig(const NKikimrPQ::TPQTabletConfig& newConfig,
{
Config = newConfig;

PQ_LOG_D("Apply new config " << Config.ShortDebugString());

ui32 cacheSize = CACHE_SIZE;
if (Config.HasCacheSize()) {
cacheSize = Config.GetCacheSize();
Expand Down Expand Up @@ -1630,6 +1632,32 @@ void TPersQueue::CreateTopicConverter(const NKikimrPQ::TPQTabletConfig& config,
Y_ABORT_UNLESS(topicConverter->IsValid(), "%s", topicConverter->GetReason().c_str());
}

void TPersQueue::UpdateReadRuleGenerations(NKikimrPQ::TPQTabletConfig& cfg) const
{
Y_ABORT_UNLESS(cfg.HasVersion());
const int curConfigVersion = cfg.GetVersion();

// set rr generation for provided read rules
THashMap<TString, std::pair<ui64, ui64>> existed; // map name -> rrVersion, rrGeneration
for (const auto& c : Config.GetConsumers()) {
existed[c.GetName()] = std::make_pair(c.GetVersion(), c.GetGeneration());
}

for (auto& c : *cfg.MutableConsumers()) {
auto it = existed.find(c.GetName());
ui64 generation = 0;
if (it != existed.end() && it->second.first == c.GetVersion()) {
generation = it->second.second;
} else {
generation = curConfigVersion;
}
c.SetGeneration(generation);
if (ReadRuleCompatible()) {
cfg.AddReadRuleGenerations(generation);
}
}
}

void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConfig> ev, const TActorId& sender, const TActorContext& ctx)
{
const auto& record = ev->GetRecord();
Expand All @@ -1642,7 +1670,7 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf
NKikimrPQ::TPQTabletConfig cfg = record.GetTabletConfig();

Y_ABORT_UNLESS(cfg.HasVersion());
int curConfigVersion = cfg.GetVersion();
const int curConfigVersion = cfg.GetVersion();

if (curConfigVersion == oldConfigVersion) { //already applied
LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID()
Expand Down Expand Up @@ -1741,27 +1769,7 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf

Migrate(cfg);

// set rr generation for provided read rules
{
THashMap<TString, std::pair<ui64, ui64>> existed; // map name -> rrVersion, rrGeneration
for (const auto& c : Config.GetConsumers()) {
existed[c.GetName()] = std::make_pair(c.GetVersion(), c.GetGeneration());
}

for (auto& c : *cfg.MutableConsumers()) {
auto it = existed.find(c.GetName());
ui64 generation = 0;
if (it != existed.end() && it->second.first == c.GetVersion()) {
generation = it->second.second;
} else {
generation = curConfigVersion;
}
c.SetGeneration(generation);
if (ReadRuleCompatible()) {
cfg.AddReadRuleGenerations(generation);
}
}
}
UpdateReadRuleGenerations(cfg);

LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID()
<< " Config update version " << cfg.GetVersion() << "(current " << Config.GetVersion() << ") received from actor " << sender
Expand Down Expand Up @@ -3727,6 +3735,10 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx)
tx.OnProposeTransaction(event, GetAllowedStep(),
TabletID());

if (tx.Kind == NKikimrPQ::TTransaction::KIND_CONFIG) {
UpdateReadRuleGenerations(tx.TabletConfig);
}

if (tx.WriteId.Defined()) {
const TWriteId& writeId = *tx.WriteId;
Y_ABORT_UNLESS(TxWrites.contains(writeId),
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/persqueue/pq_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {

bool AllSupportivePartitionsHaveBeenDeleted(const TMaybe<TWriteId>& writeId) const;
void DeleteWriteId(const TMaybe<TWriteId>& writeId);

void UpdateReadRuleGenerations(NKikimrPQ::TPQTabletConfig& cfg) const;
};


Expand Down
116 changes: 116 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/ut/ut_utils/ut_utils.h>
#include <ydb/core/cms/console/console.h>
#include <ydb/core/keyvalue/keyvalue_events.h>
#include <ydb/core/persqueue/key.h>
#include <ydb/core/persqueue/blob.h>
Expand Down Expand Up @@ -37,8 +38,14 @@ class TFixture : public NUnitTest::TBaseFixture {
void Write(const TString& message, NTable::TTransaction* tx = nullptr);
};

struct TFeatureFlags {
bool EnablePQConfigTransactionsAtSchemeShard = true;
};

void SetUp(NUnitTest::TTestContext&) override;

void NotifySchemeShard(const TFeatureFlags& flags);

NTable::TSession CreateTableSession();
NTable::TTransaction BeginTx(NTable::TSession& session);
void CommitTx(NTable::TTransaction& tx, EStatus status = EStatus::SUCCESS);
Expand All @@ -62,6 +69,8 @@ class TFixture : public NUnitTest::TBaseFixture {
std::optional<size_t> maxPartitionCount = std::nullopt);
void DescribeTopic(const TString& path);

void AddConsumer(const TString& topic, const TVector<TString>& consumers);

void WriteToTopicWithInvalidTxId(bool invalidTxId);

TTopicWriteSessionPtr CreateTopicWriteSession(const TString& topicPath,
Expand Down Expand Up @@ -95,6 +104,8 @@ class TFixture : public NUnitTest::TBaseFixture {
NYdb::EStatus status);
void CloseTopicWriteSession(const TString& topicPath,
const TString& messageGroupId);
void CloseTopicReadSession(const TString& topicPath,
const TString& consumerName);

enum EEndOfTransaction {
Commit,
Expand Down Expand Up @@ -175,6 +186,8 @@ class TFixture : public NUnitTest::TBaseFixture {
ui64 tabletId,
const NPQ::TWriteId& writeId);

ui64 GetSchemeShardTabletId(const TActorId& actorId);

std::unique_ptr<TTopicSdkTestSetup> Setup;
std::unique_ptr<TDriver> Driver;

Expand All @@ -192,11 +205,27 @@ void TFixture::SetUp(NUnitTest::TTestContext&)
{
NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings();
settings.SetEnableTopicServiceTx(true);

Setup = std::make_unique<TTopicSdkTestSetup>(TEST_CASE_NAME, settings);

Driver = std::make_unique<TDriver>(Setup->MakeDriver());
}

void TFixture::NotifySchemeShard(const TFeatureFlags& flags)
{
auto request = std::make_unique<NConsole::TEvConsole::TEvConfigNotificationRequest>();
*request->Record.MutableConfig() = *Setup->GetServer().ServerSettings.AppConfig;
request->Record.MutableConfig()->MutableFeatureFlags()->SetEnablePQConfigTransactionsAtSchemeShard(flags.EnablePQConfigTransactionsAtSchemeShard);

auto& runtime = Setup->GetRuntime();
auto actorId = runtime.AllocateEdgeActor();

ui64 ssId = GetSchemeShardTabletId(actorId);

runtime.SendToPipe(ssId, actorId, request.release());
runtime.GrabEdgeEvent<NConsole::TEvConsole::TEvConfigNotificationResponse>();
}

NTable::TSession TFixture::CreateTableSession()
{
NTable::TTableClient client(GetDriver());
Expand Down Expand Up @@ -323,6 +352,20 @@ void TFixture::CreateTopic(const TString& path,
Setup->CreateTopic(path, consumer, partitionCount, maxPartitionCount);
}

void TFixture::AddConsumer(const TString& path,
const TVector<TString>& consumers)
{
NTopic::TTopicClient client(GetDriver());
NTopic::TAlterTopicSettings settings;

for (const auto& consumer : consumers) {
settings.BeginAddConsumer(consumer);
}

auto result = client.AlterTopic(path, settings).GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

void TFixture::DescribeTopic(const TString& path)
{
Setup->DescribeTopic(path);
Expand Down Expand Up @@ -645,6 +688,13 @@ void TFixture::CloseTopicWriteSession(const TString& topicPath,
TopicWriteSessions.erase(key);
}

void TFixture::CloseTopicReadSession(const TString& topicPath,
const TString& consumerName)
{
Y_UNUSED(consumerName);
TopicReadSessions.erase(topicPath);
}

void TFixture::WriteToTopic(const TString& topicPath,
const TString& messageGroupId,
const TString& message,
Expand Down Expand Up @@ -763,6 +813,37 @@ void TFixture::WaitForSessionClose(const TString& topicPath,
UNIT_ASSERT(context.AckCount <= context.WriteCount);
}

ui64 TFixture::GetSchemeShardTabletId(const TActorId& actorId)
{
auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
navigate->DatabaseName = "/Root";

NSchemeCache::TSchemeCacheNavigate::TEntry entry;
entry.Path = SplitPath("/Root");
entry.SyncVersion = true;
entry.ShowPrivatePath = true;
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;

navigate->ResultSet.push_back(std::move(entry));
//navigate->UserToken = "root@builtin";
navigate->Cookie = 12345;

auto& runtime = Setup->GetRuntime();

runtime.Send(MakeSchemeCacheID(), actorId,
new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()),
0,
true);
auto response = runtime.GrabEdgeEvent<TEvTxProxySchemeCache::TEvNavigateKeySetResult>();

UNIT_ASSERT_VALUES_EQUAL(response->Request->Cookie, 12345);
UNIT_ASSERT_VALUES_EQUAL(response->Request->ErrorCount, 0);

auto& front = response->Request->ResultSet.front();

return front.Self->Info.GetSchemeshardId();
}

ui64 TFixture::GetTopicTabletId(const TActorId& actorId, const TString& topicPath, ui32 partition)
{
auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
Expand Down Expand Up @@ -1998,6 +2079,41 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture)
WriteMessagesInTx(0, 1);
}

Y_UNIT_TEST_F(ReadRuleGeneration, TFixture)
{
// There was a server
NotifySchemeShard({.EnablePQConfigTransactionsAtSchemeShard = false});

// Users have created their own topic on it
CreateTopic(TEST_TOPIC);

// And they wrote their messages into it
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-1");
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-2");
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-3");

// And he had a consumer
AddConsumer(TEST_TOPIC, {"consumer-1"});

// We read messages from the topic and committed offsets
auto messages = ReadFromTopic(TEST_TOPIC, "consumer-1", TDuration::Seconds(2));
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 3);
CloseTopicReadSession(TEST_TOPIC, "consumer-1");

// And then the Logbroker team turned on the feature flag
NotifySchemeShard({.EnablePQConfigTransactionsAtSchemeShard = true});

// Users continued to write to the topic
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-4");

// Users have added new consumers
AddConsumer(TEST_TOPIC, {"consumer-2"});

// And they wanted to continue reading their messages
messages = ReadFromTopic(TEST_TOPIC, "consumer-1", TDuration::Seconds(2));
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
}

}

}

0 comments on commit 46cdd05

Please sign in to comment.