Commit

kafka offset commit

niksaveliev committed Jan 4, 2024
1 parent 30e9736 commit 7021134
Showing 8 changed files with 481 additions and 107 deletions.
3 changes: 3 additions & 0 deletions ydb/core/kafka_proxy/actors/actors.h
@@ -128,6 +128,9 @@ inline EKafkaErrors ConvertErrorCode(Ydb::PersQueue::ErrorCode::ErrorCode code)
return EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION;
case Ydb::PersQueue::ErrorCode::ErrorCode::ACCESS_DENIED:
return EKafkaErrors::TOPIC_AUTHORIZATION_FAILED;
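        // Kafka exposes a single error for both directions of an out-of-range commit,
        // so commits ahead of the end offset and commits behind the start offset both
        // map to OFFSET_OUT_OF_RANGE.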
case Ydb::PersQueue::ErrorCode::ErrorCode::SET_OFFSET_ERROR_COMMIT_TO_FUTURE:
case Ydb::PersQueue::ErrorCode::ErrorCode::SET_OFFSET_ERROR_COMMIT_TO_PAST:
return EKafkaErrors::OFFSET_OUT_OF_RANGE;
default:
return EKafkaErrors::UNKNOWN_SERVER_ERROR;
}
2 changes: 1 addition & 1 deletion ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
@@ -43,7 +43,7 @@ TApiVersionsResponseData::TPtr GetApiVersions() {
AddApiKey<TLeaveGroupRequestData>(response->ApiKeys, LEAVE_GROUP);
AddApiKey<THeartbeatRequestData>(response->ApiKeys, HEARTBEAT);
AddApiKey<TFindCoordinatorRequestData>(response->ApiKeys, FIND_COORDINATOR);
-    AddApiKey<TOffsetCommitRequestData>(response->ApiKeys, OFFSET_COMMIT);
+    AddApiKey<TOffsetCommitRequestData>(response->ApiKeys, OFFSET_COMMIT, {.MaxVersion=1});
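    // Presumably the v1 cap above keeps the proxy within request fields it can
    // honor: later OffsetCommit versions add e.g. retention time and group
    // instance id, which this proxy does not handle yet.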
AddApiKey<TOffsetFetchRequestData>(response->ApiKeys, OFFSET_FETCH);

return response;
188 changes: 177 additions & 11 deletions ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp
@@ -1,38 +1,204 @@
#include "kafka_offset_commit_actor.h"

#include <ydb/core/kafka_proxy/kafka_events.h>

namespace NKafka {


NActors::IActor* CreateKafkaOffsetCommitActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TOffsetCommitRequestData>& message) {
return new TKafkaOffsetCommitActor(context, correlationId, message);
}

-TOffsetCommitResponseData::TPtr TKafkaOffsetCommitActor::GetOffsetCommitResponse() {
-    TOffsetCommitResponseData::TPtr response = std::make_shared<TOffsetCommitResponseData>();
+TString TKafkaOffsetCommitActor::LogPrefix() {
+    return "TKafkaOffsetCommitActor";
+}

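// Die() poisons the auth actor and closes every open tablet pipe so that no pipe
// events arrive after the actor is gone.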
void TKafkaOffsetCommitActor::Die(const TActorContext& ctx) {
KAFKA_LOG_D("PassAway");
ctx.Send(AuthInitActor, new TEvents::TEvPoisonPill());
for (const auto& tabletToPipePair: TabletIdToPipe) {
NTabletPipe::CloseClient(ctx, tabletToPipePair.second);
}
TBase::Die(ctx);
}

void TKafkaOffsetCommitActor::Handle(NKikimr::NGRpcProxy::V1::TEvPQProxy::TEvCloseSession::TPtr& ev, const TActorContext& ctx) {
KAFKA_LOG_CRIT("Auth failed. reason# " << ev->Get()->Reason);
Error = ConvertErrorCode(ev->Get()->ErrorCode);
SendFailedForAllPartitions(Error, ctx);
}

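// Builds a response carrying the same error code for every partition of every topic
// in the original request, sends it to the connection, and terminates the actor.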
void TKafkaOffsetCommitActor::SendFailedForAllPartitions(EKafkaErrors error, const TActorContext& ctx) {
for (auto topicReq: Message->Topics) {
TOffsetCommitResponseData::TOffsetCommitResponseTopic topic;
topic.Name = topicReq.Name;
for (auto partitionRequest: topicReq.Partitions) {
TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition partition;
partition.PartitionIndex = partitionRequest.PartitionIndex;
-            partition.ErrorCode = NONE_ERROR;
+            partition.ErrorCode = error;
topic.Partitions.push_back(partition);
}
-        response->Topics.push_back(topic);
+        Response->Topics.push_back(topic);
}
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, Response, Error));
Die(ctx);
}

void TKafkaOffsetCommitActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) {
TEvTabletPipe::TEvClientConnected *msg = ev->Get();

if (msg->Status != NKikimrProto::OK) {
KAFKA_LOG_CRIT("Pipe to tablet is dead. status# " << ev->Get()->Status);
ProcessPipeProblem(msg->TabletId, ctx);
}
}

void TKafkaOffsetCommitActor::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {
KAFKA_LOG_CRIT("Pipe to tablet is destroyed");
ProcessPipeProblem(ev->Get()->TabletId, ctx);
}

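// A broken pipe can orphan any number of in-flight commits. Every cookie routed
// through the failed tablet that has not been answered yet is failed explicitly,
// so PendingResponses still drains to zero and the client gets a complete response.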
void TKafkaOffsetCommitActor::ProcessPipeProblem(ui64 tabletId, const TActorContext& ctx) {
auto cookiesIt = TabletIdToCookies.find(tabletId);
Y_ABORT_UNLESS(cookiesIt != TabletIdToCookies.end());

for (auto cookie: cookiesIt->second) {
auto requestInfoIt = CookieToRequestInfo.find(cookie);
Y_ABORT_UNLESS(requestInfoIt != CookieToRequestInfo.end());

if (!requestInfoIt->second.Done) {
requestInfoIt->second.Done = true;
AddPartitionResponse(EKafkaErrors::UNKNOWN_SERVER_ERROR, requestInfoIt->second.TopicName, requestInfoIt->second.PartitionId, ctx);
}
}
}

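// Auth succeeded: for every requested partition, resolve the owning PQ tablet,
// lazily open a pipe to it, and send a strict CmdSetClientOffset stamped with a
// unique cookie so the reply can be matched back to its topic and partition.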
void TKafkaOffsetCommitActor::Handle(NGRpcProxy::V1::TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TActorContext& ctx) {
KAFKA_LOG_D("Auth success. Topics count: " << ev->Get()->TopicAndTablets.size());
TopicAndTablets = std::move(ev->Get()->TopicAndTablets);

for (auto topicReq: Message->Topics) {
auto topicIt = TopicAndTablets.find(NormalizePath(Context->DatabasePath, topicReq.Name.value()));
for (auto partitionRequest: topicReq.Partitions) {
if (topicIt == TopicAndTablets.end()) {
AddPartitionResponse(UNKNOWN_TOPIC_OR_PARTITION, topicReq.Name.value(), partitionRequest.PartitionIndex, ctx);
continue;
}

auto tabletIdIt = topicIt->second.PartitionIdToTabletId.find(partitionRequest.PartitionIndex);
if (tabletIdIt == topicIt->second.PartitionIdToTabletId.end()) {
AddPartitionResponse(UNKNOWN_TOPIC_OR_PARTITION, topicReq.Name.value(), partitionRequest.PartitionIndex, ctx);
continue;
}

ui64 tabletId = tabletIdIt->second;

if (!TabletIdToPipe.contains(tabletId)) {
NTabletPipe::TClientConfig clientConfig;
clientConfig.RetryPolicy = RetryPolicyForPipes;
TabletIdToPipe[tabletId] = ctx.Register(NTabletPipe::CreateClient(ctx.SelfID, tabletId, clientConfig));
}

NKikimrClient::TPersQueueRequest request;
request.MutablePartitionRequest()->SetTopic(topicIt->second.TopicNameConverter->GetPrimaryPath());
request.MutablePartitionRequest()->SetPartition(partitionRequest.PartitionIndex);
request.MutablePartitionRequest()->SetCookie(NextCookie);

TRequestInfo info(topicReq.Name.value(), partitionRequest.PartitionIndex);

CookieToRequestInfo.emplace(std::make_pair(NextCookie, info));
TabletIdToCookies[tabletId].push_back(NextCookie);
NextCookie++;

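            // Strict commit: presumably the tablet rejects offsets outside the valid
            // range instead of accepting them, which is what surfaces the
            // COMMIT_TO_FUTURE/COMMIT_TO_PAST codes mapped in actors.h.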
auto commit = request.MutablePartitionRequest()->MutableCmdSetClientOffset();
commit->SetClientId(Message->GroupId.value());
commit->SetOffset(partitionRequest.CommittedOffset);
commit->SetStrict(true);

PendingResponses++;
KAFKA_LOG_D("Send commit request for group# " << Message->GroupId.value() <<
", topic# " << topicIt->second.TopicNameConverter->GetPrimaryPath() <<
", partition# " << partitionRequest.PartitionIndex <<
", offset# " << partitionRequest.CommittedOffset);

TAutoPtr<TEvPersQueue::TEvRequest> req(new TEvPersQueue::TEvRequest);
req->Record.Swap(&request);

NTabletPipe::SendData(ctx, TabletIdToPipe[tabletId], req.Release());
}
}
}

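// Reply from a PQ tablet: matched back to the originating request by cookie, with
// the legacy PersQueue error code translated into its Kafka equivalent.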
void TKafkaOffsetCommitActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx) {
const auto& partitionResult = ev->Get()->Record.GetPartitionResponse();
    auto requestInfo = CookieToRequestInfo.find(partitionResult.GetCookie());
    Y_ABORT_UNLESS(requestInfo != CookieToRequestInfo.end());
    requestInfo->second.Done = true;

if (ev->Get()->Record.GetErrorCode() != NPersQueue::NErrorCode::OK) {
KAFKA_LOG_CRIT("Commit offset error. status# " << EErrorCode_Name(ev->Get()->Record.GetErrorCode()) << ", reason# " << ev->Get()->Record.GetErrorReason());
}

-    return response;
+    AddPartitionResponse(ConvertErrorCode(NGRpcProxy::V1::ConvertOldCode(ev->Get()->Record.GetErrorCode())), requestInfo->second.TopicName, requestInfo->second.PartitionId, ctx);
}

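// Folds one partition result into the aggregated response, deduplicating topic
// entries through the ResponseTopicIds index. The last non-NONE_ERROR code seen
// becomes the response-level error; once the final pending partition reports in,
// the response is flushed and the actor dies.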
void TKafkaOffsetCommitActor::AddPartitionResponse(EKafkaErrors error, const TString& topicName, ui64 partitionId, const TActorContext& ctx) {
if (error != NONE_ERROR) {
Error = error;
}

PendingResponses--;
TOffsetCommitResponseData::TOffsetCommitResponseTopic::TOffsetCommitResponsePartition partitionResponse;
partitionResponse.PartitionIndex = partitionId;
partitionResponse.ErrorCode = error;

auto topicIdIt = ResponseTopicIds.find(topicName);

if (topicIdIt != ResponseTopicIds.end()) {
Response->Topics[topicIdIt->second].Partitions.push_back(partitionResponse);
} else {
ResponseTopicIds[topicName] = Response->Topics.size();

TOffsetCommitResponseData::TOffsetCommitResponseTopic topicResponse;
topicResponse.Name = topicName;
topicResponse.Partitions.push_back(partitionResponse);

Response->Topics.push_back(topicResponse);
}

if (PendingResponses == 0) {
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, Response, Error));
Die(ctx);
}
}

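// Bootstrap only kicks off authentication and topic resolution; the actual commit
// fan-out happens in the TEvAuthResultOk handler above.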
void TKafkaOffsetCommitActor::Bootstrap(const NActors::TActorContext& ctx) {
-    Y_UNUSED(Message);
-    auto response = GetOffsetCommitResponse();
THashSet<TString> topicsToResolve;
for (auto topicReq: Message->Topics) {
topicsToResolve.insert(NormalizePath(Context->DatabasePath, topicReq.Name.value()));
}

-    Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, EKafkaErrors::NONE_ERROR));
-    Die(ctx);
auto topicConverterFactory = std::make_shared<NPersQueue::TTopicNamesConverterFactory>(
NKikimr::AppData(ctx)->PQConfig, ""
);

auto topicHandler = std::make_unique<NPersQueue::TTopicsListController>(
topicConverterFactory
);

auto topicsToConverter = topicHandler->GetReadTopicsList(topicsToResolve, false, Context->DatabasePath);
if (!topicsToConverter.IsValid) {
KAFKA_LOG_CRIT("Commit offsets failed. reason# topicsToConverter is not valid");
Error = INVALID_REQUEST;
SendFailedForAllPartitions(Error, ctx);
return;
}

AuthInitActor = ctx.Register(new NKikimr::NGRpcProxy::V1::TReadInitAndAuthActor(
ctx, ctx.SelfID, Message->GroupId.value(), 0, "",
NKikimr::NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), NKikimr::MakeSchemeCacheID(), nullptr, Context->UserToken, topicsToConverter,
topicHandler->GetLocalCluster(), false)
);

Become(&TKafkaOffsetCommitActor::StateWork);
}

} // NKafka
72 changes: 70 additions & 2 deletions ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.h
@@ -1,24 +1,92 @@
#include "actors.h"

#include "ydb/core/base/tablet_pipe.h"
#include "ydb/core/grpc_services/local_rpc/local_rpc.h"
#include <ydb/core/kafka_proxy/kafka_events.h>
#include <ydb/core/persqueue/events/internal.h>
#include <ydb/library/aclib/aclib.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/public/api/protos/draft/persqueue_error_codes.pb.h>
#include "ydb/public/lib/base/msgbus_status.h"
#include <ydb/services/persqueue_v1/actors/events.h>
#include "ydb/services/persqueue_v1/actors/persqueue_utils.h"
#include <ydb/services/persqueue_v1/actors/read_init_auth_actor.h>


namespace NKafka {
using namespace NKikimr;

class TKafkaOffsetCommitActor: public NActors::TActorBootstrapped<TKafkaOffsetCommitActor> {

struct TRequestInfo {
TString TopicName = "";
ui64 PartitionId = 0;
bool Done = false;

TRequestInfo(const TString& topicName, ui64 partitionId)
: TopicName(topicName), PartitionId(partitionId) {}
};

public:
using TBase = NActors::TActorBootstrapped<TKafkaOffsetCommitActor>;
TKafkaOffsetCommitActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TOffsetCommitRequestData>& message)
: Context(context)
, CorrelationId(correlationId)
-        , Message(message) {
+        , Message(message)
+        , Response(new TOffsetCommitResponseData()) {
}

void Bootstrap(const NActors::TActorContext& ctx);
-    TOffsetCommitResponseData::TPtr GetOffsetCommitResponse();

private:
TString LogPrefix();
void Die(const TActorContext& ctx) override;

STATEFN(StateWork) {
KAFKA_LOG_T("Received event: " << (*ev.Get()).GetTypeName());
switch (ev->GetTypeRewrite()) {
HFunc(NGRpcProxy::V1::TEvPQProxy::TEvAuthResultOk, Handle);
HFunc(TEvPersQueue::TEvResponse, Handle);
HFunc(NKikimr::NGRpcProxy::V1::TEvPQProxy::TEvCloseSession, Handle);
HFunc(TEvTabletPipe::TEvClientConnected, Handle);
HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
}
}

void Handle(NGRpcProxy::V1::TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx);
void Handle(NKikimr::NGRpcProxy::V1::TEvPQProxy::TEvCloseSession::TPtr& ev, const TActorContext& ctx);
void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx);
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx);

void AddPartitionResponse(EKafkaErrors error, const TString& topicName, ui64 partitionId, const TActorContext& ctx);
void ProcessPipeProblem(ui64 tabletId, const TActorContext& ctx);
void SendFailedForAllPartitions(EKafkaErrors error, const TActorContext& ctx);

private:
const TContext::TPtr Context;
const ui64 CorrelationId;
const TMessagePtr<TOffsetCommitRequestData> Message;
const TOffsetCommitResponseData::TPtr Response;

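    // Scatter/gather bookkeeping: each outgoing commit is stamped with NextCookie,
    // CookieToRequestInfo maps a cookie back to its topic and partition, and
    // TabletIdToCookies lets a pipe failure fail every commit sent to that tablet.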
ui64 PendingResponses = 0;
ui64 NextCookie = 0;
std::unordered_map<ui64, TVector<ui64>> TabletIdToCookies;
std::unordered_map<ui64, TRequestInfo> CookieToRequestInfo;
std::unordered_map<TString, ui64> ResponseTopicIds;
NKikimr::NGRpcProxy::TTopicInitInfoMap TopicAndTablets;
std::unordered_map<ui64, TActorId> TabletIdToPipe;
TActorId AuthInitActor;
EKafkaErrors Error = NONE_ERROR;

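    // Pipe reconnect policy: first retry immediately, then exponential backoff
    // starting at 10 ms and doubling up to a 100 ms cap, for at most 6 attempts.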
static constexpr NTabletPipe::TClientRetryPolicy RetryPolicyForPipes = {
.RetryLimitCount = 6,
.MinRetryTime = TDuration::MilliSeconds(10),
.MaxRetryTime = TDuration::MilliSeconds(100),
.BackoffMultiplier = 2,
.DoFirstRetryInstantly = true
};
};

} // NKafka