Skip to content

Commit

Permalink
Read without consumer from fed fix to 24-3 (ydb-platform#9138)
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev authored and uzhastik committed Sep 24, 2024
1 parent f23a5e1 commit c72e978
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 4 deletions.
2 changes: 1 addition & 1 deletion ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ void TReadInitAndAuthActor::HandleTopicsDescribeResponse(TEvDescribeTopicsRespon
}

// ToDo[migration] - separate option - ?
bool doCheckClientAcl = DoCheckACL && !AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen();
bool doCheckClientAcl = DoCheckACL && !AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() && !SkipReadRuleCheck;
if (doCheckClientAcl) {
CheckClientACL(ctx);
} else {
Expand Down
80 changes: 77 additions & 3 deletions ydb/services/persqueue_v1/persqueue_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kReadResponse, resp);
}


Y_UNIT_TEST(UpdatePartitionLocation) {
TPersQueueV1TestServer server;
SET_LOCALS;
Expand Down Expand Up @@ -6631,7 +6630,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};

Y_UNIT_TEST(PartitionsMapping) {
NPersQueue::TTestServer server;

TString topic = "topic1";
TString topicFullName = "rt3.dc1--" + topic;

Expand Down Expand Up @@ -6997,7 +6995,83 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
driver->Stop();
}

Y_UNIT_TEST(ReadWithoutConsumer) {
Y_UNIT_TEST(ReadWithoutConsumerFederation) {
const ui32 partititonsCount = 5;
const auto topic = "rt3.dc1--topic2";

TPersQueueV1TestServer server;
server.Server->AnnoyingClient->CreateTopic(topic, partititonsCount);

NACLib::TDiffACL acl;
acl.AddAccess(NACLib::EAccessType::Allow, NACLib::GenericFull, "user@" BUILTIN_ACL_DOMAIN);
server.Server->AnnoyingClient->ModifyACL("/Root/PQ", topic, acl.SerializeAsString());

auto writeSettings = NYdb::NPersQueue::TWriteSessionSettings()
.Path(topic)
.MessageGroupId("src_id");

auto writer = server.PersQueueClient->CreateSimpleBlockingWriteSession(writeSettings);

auto res = writer->Write("some_data");
UNIT_ASSERT(res);
writer->Close();

std::shared_ptr<grpc::Channel> Channel_;
std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> StubP_;

Channel_ = grpc::CreateChannel("localhost:" + ToString(server.Server->GrpcPort), grpc::InsecureChannelCredentials());
StubP_ = Ydb::Topic::V1::TopicService::NewStub(Channel_);

grpc::ClientContext rcontext;
rcontext.AddMetadata("x-ydb-auth-ticket", "user@" BUILTIN_ACL_DOMAIN);
auto readStream = StubP_->StreamRead(&rcontext);
UNIT_ASSERT(readStream);

{
Ydb::Topic::StreamReadMessage::FromClient req;
Ydb::Topic::StreamReadMessage::FromServer resp;
auto topicReadSettings = req.mutable_init_request()->add_topics_read_settings();
topicReadSettings->set_path(topic);
for (ui32 i = 0; i < partititonsCount; i++) {
topicReadSettings->add_partition_ids(i);
}

req.mutable_init_request()->set_consumer("");

if (!readStream->Write(req)) {
ythrow yexception() << "write fail";
}

UNIT_ASSERT(readStream->Read(&resp));
UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kInitResponse);
}
ui32 partitionsSigned = 0;

while (partitionsSigned != partititonsCount) {

Ydb::Topic::StreamReadMessage::FromServer resp;
UNIT_ASSERT(readStream->Read(&resp));
UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kStartPartitionSessionRequest, resp);
auto assignId = resp.start_partition_session_request().partition_session().partition_session_id();

Ydb::Topic::StreamReadMessage::FromClient req;
req.mutable_start_partition_session_response()->set_partition_session_id(assignId);
req.mutable_start_partition_session_response()->set_read_offset(0);
auto res = readStream->Write(req);
UNIT_ASSERT(res);
partitionsSigned++;
}

Ydb::Topic::StreamReadMessage::FromClient req;
req.mutable_read_request()->set_bytes_size(1);
readStream->Write(req);

Ydb::Topic::StreamReadMessage::FromServer resp;
UNIT_ASSERT(readStream->Read(&resp));
UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kReadResponse, resp);
}

Y_UNIT_TEST(ReadWithoutConsumerFirstClassCitizen) {
auto readToEndThenCommit = [] (NPersQueue::TTestServer& server, ui32 partitions, ui32 maxOffset, TString consumer, ui32 readByBytes) {
std::shared_ptr<grpc::Channel> Channel_;
std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> StubP_;
Expand Down

0 comments on commit c72e978

Please sign in to comment.