From 3d042adfb7d6c68c0705a175470231bb0e8b5a0f Mon Sep 17 00:00:00 2001 From: niksaveliev Date: Thu, 18 Apr 2024 12:15:52 +0500 Subject: [PATCH] Kafka read with balance ut (#3732) --- ydb/core/kafka_proxy/ut/ut_protocol.cpp | 127 +++++++++++++++++++++--- 1 file changed, 114 insertions(+), 13 deletions(-) diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index 4bc681ee730b..908ad80a7c36 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -60,6 +60,12 @@ void Print(const TBuffer& buffer) { Cerr << ">>>>> Packet sent: " << sb << Endl; } +struct TReadInfo { + std::vector Partitions; + TString MemberId; + i32 GenerationId; +}; + template class TTestServer { public: @@ -268,6 +274,23 @@ void AssertMessageMeta(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent UNIT_ASSERT_C(false, "Field " << field << " not found in message meta"); } +void AssertPartitionsIsUniqueAndCountIsExpected(std::vector readInfos, ui32 expectedPartitionsCount, TString topic) { + std::unordered_set partitions; + ui32 partitionsCount = 0; + for (TReadInfo readInfo: readInfos) { + for (auto topicPartitions: readInfo.Partitions) { + if (topicPartitions.Topic == topic) { + for (auto partition: topicPartitions.Partitions) { + partitions.emplace(partition); + partitionsCount++; + } + } + } + } + UNIT_ASSERT_VALUES_EQUAL(partitionsCount, expectedPartitionsCount); + UNIT_ASSERT_VALUES_EQUAL(partitions.size(), expectedPartitionsCount); +} + std::vector Read(std::shared_ptr reader) { std::vector result; while (true) { @@ -487,12 +510,6 @@ class TTestClient { return WriteAndRead(header, request); } - struct TReadInfo { - std::vector Partitions; - TString MemberId; - i32 GenerationId; - }; - TReadInfo JoinAndSyncGroup(std::vector& topics, TString& groupId, i32 heartbeatTimeout = 1000000) { auto joinResponse = JoinGroup(topics, groupId, heartbeatTimeout); auto memberId = joinResponse->MemberId; @@ -1202,7 +1219,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { TString notExistsTopicName = "/Root/not-exists"; - ui64 minActivePartitions = 10; + ui64 minActivePartitions = 12; TString group = "consumer-0"; TString notExistsGroup = "consumer-not-exists"; @@ -1236,6 +1253,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { TTestClient clientA(testServer.Port); TTestClient clientB(testServer.Port); + TTestClient clientC(testServer.Port); + TTestClient clientD(testServer.Port); { auto msg = clientA.ApiVersions(); @@ -1272,26 +1291,108 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { auto readInfoB = clientB.JoinAndSyncGroup(topics, group); UNIT_ASSERT_VALUES_EQUAL(readInfoB.Partitions.size(), 0); - // clientA gets RABALANCE status, because of new reader. We need to release some partitions + // clientA gets RABALANCE status, because of new reader. We need to release some partitions for new client clientA.WaitRebalance(readInfoA.MemberId, readInfoA.GenerationId, group); // clientA now gets half of partitions readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/2); UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); - // some partitions now released, and we can give them to clientB + // some partitions now released, and we can give them to clientB. clientB now gets half of partitions clientB.WaitRebalance(readInfoB.MemberId, readInfoB.GenerationId, group); readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/2); UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); - // cleintA leave group and all partitions goes to clientB + AssertPartitionsIsUniqueAndCountIsExpected({readInfoA, readInfoB}, minActivePartitions, topicName); + + // clientC join group, and get 0 partitions, becouse it's all at clientA and clientB + UNIT_ASSERT_VALUES_EQUAL(clientC.SaslHandshake()->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(clientC.SaslAuthenticate("ouruser@/Root", "ourUserPassword")->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + auto readInfoC = clientC.JoinAndSyncGroup(topics, group); + UNIT_ASSERT_VALUES_EQUAL(readInfoC.Partitions.size(), 0); + + // all clients gets RABALANCE status, because of new reader. We need to release some partitions for new client + clientA.WaitRebalance(readInfoA.MemberId, readInfoA.GenerationId, group); + clientB.WaitRebalance(readInfoB.MemberId, readInfoB.GenerationId, group); + + // all clients now gets minActivePartitions/3 partitions + readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/3); + UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + + readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/3); + UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + + readInfoC = clientC.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/3); + UNIT_ASSERT_VALUES_EQUAL(clientC.Heartbeat(readInfoC.MemberId, readInfoC.GenerationId, group)->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + + AssertPartitionsIsUniqueAndCountIsExpected({readInfoA, readInfoB, readInfoC}, minActivePartitions, topicName); + + // clientD join group, and get 0 partitions, becouse it's all at clientA, clientB and clientC + UNIT_ASSERT_VALUES_EQUAL(clientD.SaslHandshake()->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(clientD.SaslAuthenticate("ouruser@/Root", "ourUserPassword")->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + auto readInfoD = clientD.JoinAndSyncGroup(topics, group); + UNIT_ASSERT_VALUES_EQUAL(readInfoD.Partitions.size(), 0); + + // all clients gets RABALANCE status, because of new reader. We need to release some partitions + clientA.WaitRebalance(readInfoA.MemberId, readInfoA.GenerationId, group); + clientB.WaitRebalance(readInfoB.MemberId, readInfoB.GenerationId, group); + clientC.WaitRebalance(readInfoC.MemberId, readInfoC.GenerationId, group); + + // all clients now gets minActivePartitions/4 partitions + readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/4); + UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + + readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/4); + UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + + readInfoC = clientC.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/4); + UNIT_ASSERT_VALUES_EQUAL(clientC.Heartbeat(readInfoC.MemberId, readInfoC.GenerationId, group)->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + + readInfoD = clientD.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/4); + UNIT_ASSERT_VALUES_EQUAL(clientD.Heartbeat(readInfoD.MemberId, readInfoD.GenerationId, group)->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + + AssertPartitionsIsUniqueAndCountIsExpected({readInfoA, readInfoB, readInfoC, readInfoD}, minActivePartitions, topicName); + + + // cleintA leave group and all partitions goes to clientB, clientB and clientD UNIT_ASSERT_VALUES_EQUAL(clientA.LeaveGroup(readInfoA.MemberId, group)->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + + // all other clients gets RABALANCE status, because one clientA leave group. clientB.WaitRebalance(readInfoB.MemberId, readInfoB.GenerationId, group); - readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions); + clientC.WaitRebalance(readInfoC.MemberId, readInfoC.GenerationId, group); + clientD.WaitRebalance(readInfoD.MemberId, readInfoD.GenerationId, group); + + // all other clients now gets minActivePartitions/3 partitions + readInfoB = clientB.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/3); UNIT_ASSERT_VALUES_EQUAL(clientB.Heartbeat(readInfoB.MemberId, readInfoB.GenerationId, group)->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); - // clientB leave group - UNIT_ASSERT_VALUES_EQUAL(clientB.LeaveGroup(readInfoA.MemberId, group)->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + readInfoC = clientC.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/3); + UNIT_ASSERT_VALUES_EQUAL(clientC.Heartbeat(readInfoC.MemberId, readInfoC.GenerationId, group)->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + + readInfoD = clientD.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions/3); + UNIT_ASSERT_VALUES_EQUAL(clientD.Heartbeat(readInfoD.MemberId, readInfoD.GenerationId, group)->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + + AssertPartitionsIsUniqueAndCountIsExpected({readInfoB, readInfoC, readInfoD}, minActivePartitions, topicName); + + + // all other clients leaves the group + UNIT_ASSERT_VALUES_EQUAL(clientB.LeaveGroup(readInfoB.MemberId, group)->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(clientC.LeaveGroup(readInfoC.MemberId, group)->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(clientD.LeaveGroup(readInfoD.MemberId, group)->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + } + + //release partition before lock + { + std::vector topics; + topics.push_back(topicName); + + auto readInfoA = clientA.JoinGroup(topics, group); + Sleep(TDuration::MilliSeconds(200)); + auto readInfoB = clientB.JoinGroup(topics, group); + Sleep(TDuration::MilliSeconds(200)); + + UNIT_ASSERT_VALUES_EQUAL(clientA.LeaveGroup(readInfoA->MemberId.value(), group)->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(clientB.LeaveGroup(readInfoB->MemberId.value(), group)->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); } {