Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The consumer's generation number is not stored in the transaction #9590

Merged
merged 3 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 32 additions & 20 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,8 @@ void TPersQueue::ApplyNewConfig(const NKikimrPQ::TPQTabletConfig& newConfig,
{
Config = newConfig;

PQ_LOG_D("Apply new config " << Config.ShortDebugString());

ui32 cacheSize = CACHE_SIZE;
if (Config.HasCacheSize()) {
cacheSize = Config.GetCacheSize();
Expand Down Expand Up @@ -1630,6 +1632,30 @@ void TPersQueue::CreateTopicConverter(const NKikimrPQ::TPQTabletConfig& config,
Y_ABORT_UNLESS(topicConverter->IsValid(), "%s", topicConverter->GetReason().c_str());
}

void TPersQueue::UpdateReadRuleGenerations(NKikimrPQ::TPQTabletConfig& cfg) const
{
Y_ABORT_UNLESS(cfg.HasVersion());
const int curConfigVersion = cfg.GetVersion();

// set rr generation for provided read rules
THashMap<TString, std::pair<ui64, ui64>> existed; // map name -> rrVersion, rrGeneration
for (const auto& c : Config.GetConsumers()) {
existed[c.GetName()] = std::make_pair(c.GetVersion(), c.GetGeneration());
}

for (auto& c : *cfg.MutableConsumers()) {
auto it = existed.find(c.GetName());
ui64 generation = 0;
if (it != existed.end() && it->second.first == c.GetVersion()) {
generation = it->second.second;
} else {
generation = curConfigVersion;
}
c.SetGeneration(generation);
cfg.AddReadRuleGenerations(generation);
}
}

void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConfig> ev, const TActorId& sender, const TActorContext& ctx)
{
const auto& record = ev->GetRecord();
Expand All @@ -1642,7 +1668,7 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf
NKikimrPQ::TPQTabletConfig cfg = record.GetTabletConfig();

Y_ABORT_UNLESS(cfg.HasVersion());
int curConfigVersion = cfg.GetVersion();
const int curConfigVersion = cfg.GetVersion();

if (curConfigVersion == oldConfigVersion) { //already applied
LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID()
Expand Down Expand Up @@ -1741,25 +1767,7 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf

Migrate(cfg);

// set rr generation for provided read rules
{
THashMap<TString, std::pair<ui64, ui64>> existed; // map name -> rrVersion, rrGeneration
for (const auto& c : Config.GetConsumers()) {
existed[c.GetName()] = std::make_pair(c.GetVersion(), c.GetGeneration());
}

for (auto& c : *cfg.MutableConsumers()) {
auto it = existed.find(c.GetName());
ui64 generation = 0;
if (it != existed.end() && it->second.first == c.GetVersion()) {
generation = it->second.second;
} else {
generation = curConfigVersion;
}
c.SetGeneration(generation);
cfg.AddReadRuleGenerations(generation);
}
}
UpdateReadRuleGenerations(cfg);

LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID()
<< " Config update version " << cfg.GetVersion() << "(current " << Config.GetVersion() << ") received from actor " << sender
Expand Down Expand Up @@ -3725,6 +3733,10 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx)
tx.OnProposeTransaction(event, GetAllowedStep(),
TabletID());

if (tx.Kind == NKikimrPQ::TTransaction::KIND_CONFIG) {
UpdateReadRuleGenerations(tx.TabletConfig);
}

if (tx.WriteId.Defined()) {
const TWriteId& writeId = *tx.WriteId;
Y_ABORT_UNLESS(TxWrites.contains(writeId),
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/persqueue/pq_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {

bool AllSupportivePartitionsHaveBeenDeleted(const TMaybe<TWriteId>& writeId) const;
void DeleteWriteId(const TMaybe<TWriteId>& writeId);

void UpdateReadRuleGenerations(NKikimrPQ::TPQTabletConfig& cfg) const;
};


Expand Down
116 changes: 116 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/ut/ut_utils/ut_utils.h>
#include <ydb/core/cms/console/console.h>
#include <ydb/core/keyvalue/keyvalue_events.h>
#include <ydb/core/persqueue/key.h>
#include <ydb/core/persqueue/blob.h>
Expand Down Expand Up @@ -42,8 +43,14 @@ class TFixture : public NUnitTest::TBaseFixture {
void WaitForEvent();
};

struct TFeatureFlags {
bool EnablePQConfigTransactionsAtSchemeShard = true;
};

void SetUp(NUnitTest::TTestContext&) override;

void NotifySchemeShard(const TFeatureFlags& flags);

NTable::TSession CreateTableSession();
NTable::TTransaction BeginTx(NTable::TSession& session);
void CommitTx(NTable::TTransaction& tx, EStatus status = EStatus::SUCCESS);
Expand All @@ -67,6 +74,8 @@ class TFixture : public NUnitTest::TBaseFixture {
std::optional<size_t> maxPartitionCount = std::nullopt);
void DescribeTopic(const TString& path);

void AddConsumer(const TString& topic, const TVector<TString>& consumers);

void WriteToTopicWithInvalidTxId(bool invalidTxId);

TTopicWriteSessionPtr CreateTopicWriteSession(const TString& topicPath,
Expand Down Expand Up @@ -101,6 +110,8 @@ class TFixture : public NUnitTest::TBaseFixture {
NYdb::EStatus status);
void CloseTopicWriteSession(const TString& topicPath,
const TString& messageGroupId);
void CloseTopicReadSession(const TString& topicPath,
const TString& consumerName);

enum EEndOfTransaction {
Commit,
Expand Down Expand Up @@ -181,6 +192,8 @@ class TFixture : public NUnitTest::TBaseFixture {
ui64 tabletId,
const NPQ::TWriteId& writeId);

ui64 GetSchemeShardTabletId(const TActorId& actorId);

std::unique_ptr<TTopicSdkTestSetup> Setup;
std::unique_ptr<TDriver> Driver;

Expand All @@ -198,11 +211,27 @@ void TFixture::SetUp(NUnitTest::TTestContext&)
{
NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings();
settings.SetEnableTopicServiceTx(true);

Setup = std::make_unique<TTopicSdkTestSetup>(TEST_CASE_NAME, settings);

Driver = std::make_unique<TDriver>(Setup->MakeDriver());
}

void TFixture::NotifySchemeShard(const TFeatureFlags& flags)
{
auto request = std::make_unique<NConsole::TEvConsole::TEvConfigNotificationRequest>();
*request->Record.MutableConfig() = *Setup->GetServer().ServerSettings.AppConfig;
request->Record.MutableConfig()->MutableFeatureFlags()->SetEnablePQConfigTransactionsAtSchemeShard(flags.EnablePQConfigTransactionsAtSchemeShard);

auto& runtime = Setup->GetRuntime();
auto actorId = runtime.AllocateEdgeActor();

ui64 ssId = GetSchemeShardTabletId(actorId);

runtime.SendToPipe(ssId, actorId, request.release());
runtime.GrabEdgeEvent<NConsole::TEvConsole::TEvConfigNotificationResponse>();
}

NTable::TSession TFixture::CreateTableSession()
{
NTable::TTableClient client(GetDriver());
Expand Down Expand Up @@ -329,6 +358,20 @@ void TFixture::CreateTopic(const TString& path,
Setup->CreateTopic(path, consumer, partitionCount, maxPartitionCount);
}

void TFixture::AddConsumer(const TString& path,
const TVector<TString>& consumers)
{
NTopic::TTopicClient client(GetDriver());
NTopic::TAlterTopicSettings settings;

for (const auto& consumer : consumers) {
settings.BeginAddConsumer(consumer);
}

auto result = client.AlterTopic(path, settings).GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

void TFixture::DescribeTopic(const TString& path)
{
Setup->DescribeTopic(path);
Expand Down Expand Up @@ -663,6 +706,13 @@ void TFixture::CloseTopicWriteSession(const TString& topicPath,
TopicWriteSessions.erase(key);
}

void TFixture::CloseTopicReadSession(const TString& topicPath,
const TString& consumerName)
{
Y_UNUSED(consumerName);
TopicReadSessions.erase(topicPath);
}

void TFixture::WriteToTopic(const TString& topicPath,
const TString& messageGroupId,
const TString& message,
Expand Down Expand Up @@ -779,6 +829,37 @@ void TFixture::WaitForSessionClose(const TString& topicPath,
UNIT_ASSERT(context.AckCount() <= context.WriteCount);
}

ui64 TFixture::GetSchemeShardTabletId(const TActorId& actorId)
{
auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
navigate->DatabaseName = "/Root";

NSchemeCache::TSchemeCacheNavigate::TEntry entry;
entry.Path = SplitPath("/Root");
entry.SyncVersion = true;
entry.ShowPrivatePath = true;
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;

navigate->ResultSet.push_back(std::move(entry));
//navigate->UserToken = "root@builtin";
navigate->Cookie = 12345;

auto& runtime = Setup->GetRuntime();

runtime.Send(MakeSchemeCacheID(), actorId,
new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()),
0,
true);
auto response = runtime.GrabEdgeEvent<TEvTxProxySchemeCache::TEvNavigateKeySetResult>();

UNIT_ASSERT_VALUES_EQUAL(response->Request->Cookie, 12345);
UNIT_ASSERT_VALUES_EQUAL(response->Request->ErrorCount, 0);

auto& front = response->Request->ResultSet.front();

return front.Self->Info.GetSchemeshardId();
}

ui64 TFixture::GetTopicTabletId(const TActorId& actorId, const TString& topicPath, ui32 partition)
{
auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
Expand Down Expand Up @@ -2016,6 +2097,41 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture)
WriteMessagesInTx(0, 1);
}

Y_UNIT_TEST_F(ReadRuleGeneration, TFixture)
{
// There was a server
NotifySchemeShard({.EnablePQConfigTransactionsAtSchemeShard = false});

// Users have created their own topic on it
CreateTopic(TEST_TOPIC);

// And they wrote their messages into it
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-1");
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-2");
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-3");

// And he had a consumer
AddConsumer(TEST_TOPIC, {"consumer-1"});

// We read messages from the topic and committed offsets
auto messages = ReadFromTopic(TEST_TOPIC, "consumer-1", TDuration::Seconds(2));
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 3);
CloseTopicReadSession(TEST_TOPIC, "consumer-1");

// And then the Logbroker team turned on the feature flag
NotifySchemeShard({.EnablePQConfigTransactionsAtSchemeShard = true});

// Users continued to write to the topic
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-4");

// Users have added new consumers
AddConsumer(TEST_TOPIC, {"consumer-2"});

// And they wanted to continue reading their messages
messages = ReadFromTopic(TEST_TOPIC, "consumer-1", TDuration::Seconds(2));
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
}

}

}
Loading