Skip to content

Commit

Permalink
Kafka read with balance ut (#3732)
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev authored Apr 18, 2024
1 parent b22ab25 commit 3d042ad
Showing 1 changed file with 114 additions and 13 deletions.
127 changes: 114 additions & 13 deletions ydb/core/kafka_proxy/ut/ut_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ void Print(const TBuffer& buffer) {
Cerr << ">>>>> Packet sent: " << sb << Endl;
}

// State a test client keeps about its membership in a consumer group.
// Shared between the test client (which produces it when joining a group) and
// free helper functions that inspect the resulting assignments.
struct TReadInfo {
    // Topic/partition assignments handed to this member; each entry carries a
    // Topic and the list of its assigned Partitions.
    std::vector<TConsumerProtocolAssignment::TopicPartition> Partitions;
    // Member id to pass to follow-up requests (heartbeat, rebalance wait, leave group).
    TString MemberId;
    // Group generation to pass to heartbeat / rebalance-wait requests.
    // Zero-initialized so that a default-constructed TReadInfo never exposes
    // an indeterminate value.
    i32 GenerationId = 0;
};

template <class TKikimr, bool secure>
class TTestServer {
public:
Expand Down Expand Up @@ -268,6 +274,23 @@ void AssertMessageMeta(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent
UNIT_ASSERT_C(false, "Field " << field << " not found in message meta");
}

void AssertPartitionsIsUniqueAndCountIsExpected(std::vector<TReadInfo> readInfos, ui32 expectedPartitionsCount, TString topic) {
std::unordered_set<int> partitions;
ui32 partitionsCount = 0;
for (TReadInfo readInfo: readInfos) {
for (auto topicPartitions: readInfo.Partitions) {
if (topicPartitions.Topic == topic) {
for (auto partition: topicPartitions.Partitions) {
partitions.emplace(partition);
partitionsCount++;
}
}
}
}
UNIT_ASSERT_VALUES_EQUAL(partitionsCount, expectedPartitionsCount);
UNIT_ASSERT_VALUES_EQUAL(partitions.size(), expectedPartitionsCount);
}

std::vector<NTopic::TReadSessionEvent::TDataReceivedEvent> Read(std::shared_ptr<NYdb::NTopic::IReadSession> reader) {
std::vector<NTopic::TReadSessionEvent::TDataReceivedEvent> result;
while (true) {
Expand Down Expand Up @@ -487,12 +510,6 @@ class TTestClient {
return WriteAndRead<TSyncGroupResponseData>(header, request);
}

// Group-membership state returned by JoinAndSyncGroup.
// NOTE(review): GenerationId has no initializer, so a default-constructed
// instance holds an indeterminate value until it is assigned.
struct TReadInfo {
// Topic/partition assignments received for this member.
std::vector<TConsumerProtocolAssignment::TopicPartition> Partitions;
// Member id used in follow-up group requests.
TString MemberId;
// Group generation used in follow-up group requests.
i32 GenerationId;
};

TReadInfo JoinAndSyncGroup(std::vector<TString>& topics, TString& groupId, i32 heartbeatTimeout = 1000000) {
auto joinResponse = JoinGroup(topics, groupId, heartbeatTimeout);
auto memberId = joinResponse->MemberId;
Expand Down Expand Up @@ -1202,7 +1219,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {

TString notExistsTopicName = "/Root/not-exists";

ui64 minActivePartitions = 10;
ui64 minActivePartitions = 12;

TString group = "consumer-0";
TString notExistsGroup = "consumer-not-exists";
Expand Down Expand Up @@ -1236,6 +1253,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {

TTestClient clientA(testServer.Port);
TTestClient clientB(testServer.Port);
TTestClient clientC(testServer.Port);
TTestClient clientD(testServer.Port);

{
auto msg = clientA.ApiVersions();
Expand Down Expand Up @@ -1272,26 +1291,108 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
auto readInfoB = clientB.JoinAndSyncGroup(topics, group);
UNIT_ASSERT_VALUES_EQUAL(readInfoB.Partitions.size(), 0);

// clientA gets REBALANCE status because of the new reader. We need to release some partitions
// clientA gets REBALANCE status because of the new reader. We need to release some partitions for the new client
clientA.WaitRebalance(readInfoA.MemberId, readInfoA.GenerationId, group);

// clientA now gets half of partitions
readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/2);
UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

// some partitions now released, and we can give them to clientB
// some partitions now released, and we can give them to clientB. clientB now gets half of partitions
clientB.WaitRebalance(readInfoB.MemberId, readInfoB.GenerationId, group);
readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/2);
UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

// clientA leaves the group and all partitions go to clientB
AssertPartitionsIsUniqueAndCountIsExpected({readInfoA, readInfoB}, minActivePartitions, topicName);

// clientC joins the group and gets 0 partitions, because they are all held by clientA and clientB
UNIT_ASSERT_VALUES_EQUAL(clientC.SaslHandshake()->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
UNIT_ASSERT_VALUES_EQUAL(clientC.SaslAuthenticate("ouruser@/Root", "ourUserPassword")->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
auto readInfoC = clientC.JoinAndSyncGroup(topics, group);
UNIT_ASSERT_VALUES_EQUAL(readInfoC.Partitions.size(), 0);

// all clients get REBALANCE status because of the new reader. We need to release some partitions for the new client
clientA.WaitRebalance(readInfoA.MemberId, readInfoA.GenerationId, group);
clientB.WaitRebalance(readInfoB.MemberId, readInfoB.GenerationId, group);

// all clients now gets minActivePartitions/3 partitions
readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/3);
UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/3);
UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

readInfoC = clientC.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/3);
UNIT_ASSERT_VALUES_EQUAL(clientC.Heartbeat(readInfoC.MemberId, readInfoC.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

AssertPartitionsIsUniqueAndCountIsExpected({readInfoA, readInfoB, readInfoC}, minActivePartitions, topicName);

// clientD joins the group and gets 0 partitions, because they are all held by clientA, clientB and clientC
UNIT_ASSERT_VALUES_EQUAL(clientD.SaslHandshake()->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
UNIT_ASSERT_VALUES_EQUAL(clientD.SaslAuthenticate("ouruser@/Root", "ourUserPassword")->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
auto readInfoD = clientD.JoinAndSyncGroup(topics, group);
UNIT_ASSERT_VALUES_EQUAL(readInfoD.Partitions.size(), 0);

// all clients get REBALANCE status because of the new reader. We need to release some partitions
clientA.WaitRebalance(readInfoA.MemberId, readInfoA.GenerationId, group);
clientB.WaitRebalance(readInfoB.MemberId, readInfoB.GenerationId, group);
clientC.WaitRebalance(readInfoC.MemberId, readInfoC.GenerationId, group);

// all clients now gets minActivePartitions/4 partitions
readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/4);
UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/4);
UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

readInfoC = clientC.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/4);
UNIT_ASSERT_VALUES_EQUAL(clientC.Heartbeat(readInfoC.MemberId, readInfoC.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

readInfoD = clientD.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/4);
UNIT_ASSERT_VALUES_EQUAL(clientD.Heartbeat(readInfoD.MemberId, readInfoD.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

AssertPartitionsIsUniqueAndCountIsExpected({readInfoA, readInfoB, readInfoC, readInfoD}, minActivePartitions, topicName);


// clientA leaves the group and all partitions go to clientB, clientC and clientD
UNIT_ASSERT_VALUES_EQUAL(clientA.LeaveGroup(readInfoA.MemberId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

// all other clients get REBALANCE status, because clientA left the group.
clientB.WaitRebalance(readInfoB.MemberId, readInfoB.GenerationId, group);
readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions);
clientC.WaitRebalance(readInfoC.MemberId, readInfoC.GenerationId, group);
clientD.WaitRebalance(readInfoD.MemberId, readInfoD.GenerationId, group);

// all other clients now gets minActivePartitions/3 partitions
readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/3);
UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

// clientB leave group
UNIT_ASSERT_VALUES_EQUAL(clientB.LeaveGroup(readInfoA.MemberId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
readInfoC = clientC.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/3);
UNIT_ASSERT_VALUES_EQUAL(clientC.Heartbeat(readInfoC.MemberId, readInfoC.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

readInfoD = clientD.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/3);
UNIT_ASSERT_VALUES_EQUAL(clientD.Heartbeat(readInfoD.MemberId, readInfoD.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

AssertPartitionsIsUniqueAndCountIsExpected({readInfoB, readInfoC, readInfoD}, minActivePartitions, topicName);


// all other clients leaves the group
UNIT_ASSERT_VALUES_EQUAL(clientB.LeaveGroup(readInfoB.MemberId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
UNIT_ASSERT_VALUES_EQUAL(clientC.LeaveGroup(readInfoC.MemberId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
UNIT_ASSERT_VALUES_EQUAL(clientD.LeaveGroup(readInfoD.MemberId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
}

//release partition before lock
{
std::vector<TString> topics;
topics.push_back(topicName);

auto readInfoA = clientA.JoinGroup(topics, group);
Sleep(TDuration::MilliSeconds(200));
auto readInfoB = clientB.JoinGroup(topics, group);
Sleep(TDuration::MilliSeconds(200));

UNIT_ASSERT_VALUES_EQUAL(clientA.LeaveGroup(readInfoA->MemberId.value(), group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
UNIT_ASSERT_VALUES_EQUAL(clientB.LeaveGroup(readInfoB->MemberId.value(), group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
}

{
Expand Down

0 comments on commit 3d042ad

Please sign in to comment.