Kafka api charge extra RU on request #3929

Merged
20 changes: 9 additions & 11 deletions ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp
@@ -34,9 +34,8 @@ void TKafkaFetchActor::SendFetchRequests(const TActorContext& ctx) {
for (size_t topicIndex = 0; topicIndex < Response->Responses.size(); topicIndex++) {
TVector<NKikimr::NPQ::TPartitionFetchRequest> partPQRequests;
PrepareFetchRequestData(topicIndex, partPQRequests);

NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, *Context->UserToken);

auto chargeExtraRU = topicIndex == 0 && Context->Config.GetChargeExtraRUOnRequest();
NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, *Context->UserToken, chargeExtraRU);
auto fetchActor = NKikimr::NPQ::CreatePQFetchRequestActor(request, NKikimr::MakeSchemeCacheID(), ctx.SelfID);
auto actorId = ctx.Register(fetchActor);
PendingResponses++;
@@ -55,7 +54,6 @@ void TKafkaFetchActor::PrepareFetchRequestData(const size_t topicIndex, TVector<
for (size_t partIndex = 0; partIndex < topicKafkaRequest.Partitions.size(); partIndex++) {
auto& partKafkaRequest = topicKafkaRequest.Partitions[partIndex];
KAFKA_LOG_D(TStringBuilder() << "Fetch actor: New request. Topic: " << topicKafkaRequest.Topic.value() << " Partition: " << partKafkaRequest.Partition << " FetchOffset: " << partKafkaRequest.FetchOffset << " PartitionMaxBytes: " << partKafkaRequest.PartitionMaxBytes);

auto& partPQRequest = partPQRequests[partIndex];
partPQRequest.Topic = NormalizePath(Context->DatabasePath, topicKafkaRequest.Topic.value()); // FIXME(savnik): handle empty topic
partPQRequest.Partition = partKafkaRequest.Partition;
@@ -113,9 +111,9 @@ void TKafkaFetchActor::HandleSuccessResponse(const NKikimr::TEvPQ::TEvFetchRespo
partKafkaResponse.ErrorCode = ConvertErrorCode(partPQResponse.GetReadResult().GetErrorCode());

if (partPQResponse.GetReadResult().GetErrorCode() != NPersQueue::NErrorCode::EErrorCode::OK) {
KAFKA_LOG_ERROR("Fetch actor: Failed to get responses for topic: " << topicResponse.Topic <<
", partition: " << partPQResponse.GetPartition() <<
". Code: " << static_cast<size_t>(partPQResponse.GetReadResult().GetErrorCode()) <<
KAFKA_LOG_ERROR("Fetch actor: Failed to get responses for topic: " << topicResponse.Topic <<
", partition: " << partPQResponse.GetPartition() <<
". Code: " << static_cast<size_t>(partPQResponse.GetReadResult().GetErrorCode()) <<
". Reason: " + partPQResponse.GetReadResult().GetErrorReason());
}

@@ -174,7 +172,7 @@ void TKafkaFetchActor::FillRecordsBatch(const NKikimrClient::TPersQueueFetchResp
record.TimestampDelta = lastTimestamp - baseTimestamp;

record.Length = record.Size(TKafkaRecord::MessageMeta::PresentVersions.Max) - SizeOfZeroVarint;
KAFKA_LOG_D("Fetch actor: Record info. OffsetDelta: " << record.OffsetDelta <<
KAFKA_LOG_D("Fetch actor: Record info. OffsetDelta: " << record.OffsetDelta <<
", TimestampDelta: " << record.TimestampDelta << ", Length: " << record.Length);
}

@@ -187,11 +185,11 @@ void TKafkaFetchActor::FillRecordsBatch(const NKikimrClient::TPersQueueFetchResp
//recordsBatch.Attributes https://kafka.apache.org/documentation/#recordbatch

recordsBatch.BatchLength = recordsBatch.Size(TKafkaRecordBatch::MessageMeta::PresentVersions.Max) - BatchFirstTwoFieldsSize;
KAFKA_LOG_D("Fetch actor: RecordBatch info. BaseOffset: " << recordsBatch.BaseOffset << ", LastOffsetDelta: " << recordsBatch.LastOffsetDelta <<
", BaseTimestamp: " << recordsBatch.BaseTimestamp << ", MaxTimestamp: " << recordsBatch.MaxTimestamp <<
KAFKA_LOG_D("Fetch actor: RecordBatch info. BaseOffset: " << recordsBatch.BaseOffset << ", LastOffsetDelta: " << recordsBatch.LastOffsetDelta <<
", BaseTimestamp: " << recordsBatch.BaseTimestamp << ", MaxTimestamp: " << recordsBatch.MaxTimestamp <<
", BaseSequence: " << recordsBatch.BaseSequence << ", BatchLength: " << recordsBatch.BatchLength);
auto topicWithoutDb = GetTopicNameWithoutDb(Context->DatabasePath, partPQResponse.GetTopic());
ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(recordsBatch.Records.size(), BuildLabels(Context, "", topicWithoutDb, "api.kafka.fetch.messages", "")));
}

void TKafkaFetchActor::RespondIfRequired(const TActorContext& ctx) {
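In the fetch path above, the extra request unit is attached only to the first topic sub-request of a FETCH (topicIndex == 0) and only when ChargeExtraRUOnRequest is enabled, so a multi-topic fetch is billed at most one extra RU per request. A minimal, self-contained sketch of that gating, using stand-in types rather than the YDB ones:

```cpp
#include <cstddef>
#include <iostream>
#include <vector>

// Stand-in for TKafkaProxyConfig::GetChargeExtraRUOnRequest().
struct TProxyConfigStub {
    bool ChargeExtraRUOnRequest = false;
};

// Mirrors the loop in SendFetchRequests: only the first per-topic
// fetch request of a Kafka FETCH carries the extra-RU flag.
std::vector<bool> ExtraRuFlagsPerTopic(const TProxyConfigStub& cfg, size_t topicCount) {
    std::vector<bool> flags(topicCount, false);
    if (cfg.ChargeExtraRUOnRequest && topicCount > 0) {
        flags[0] = true;
    }
    return flags;
}

int main() {
    TProxyConfigStub cfg;
    cfg.ChargeExtraRUOnRequest = true;
    for (bool f : ExtraRuFlagsPerTopic(cfg, 3)) {
        std::cout << f << ' ';   // prints: 1 0 0
    }
    std::cout << '\n';
}
```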
17 changes: 11 additions & 6 deletions ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
@@ -43,7 +43,7 @@ void TKafkaProduceActor::LogEvent(IEventHandle& ev) {
void TKafkaProduceActor::SendMetrics(const TString& topicName, size_t delta, const TString& name, const TActorContext& ctx) {
auto topicWithoutDb = GetTopicNameWithoutDb(Context->DatabasePath, topicName);
ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(delta, BuildLabels(Context, "", topicWithoutDb, TStringBuilder() << "api.kafka.produce." << name, "")));
ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(delta, BuildLabels(Context, "", topicWithoutDb, "api.kafka.produce.total_messages", "")));
}

void TKafkaProduceActor::Bootstrap(const NActors::TActorContext& /*ctx*/) {
@@ -82,7 +82,7 @@ void TKafkaProduceActor::PassAway() {
void TKafkaProduceActor::CleanTopics(const TActorContext& ctx) {
const auto now = ctx.Now();

std::map<TString, TTopicInfo> newTopics;
for(auto& [topicPath, topicInfo] : Topics) {
if (topicInfo.ExpirationTime > now) {
newTopics[topicPath] = std::move(topicInfo);
@@ -242,7 +242,8 @@ size_t TKafkaProduceActor::EnqueueInitialization() {
THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::TTopicProduceData::TPartitionProduceData& data,
const TString& topicName,
ui64 cookie,
const TString& clientDC) {
const TString& clientDC,
bool chargeExtraRU) {
auto ev = MakeHolder<TEvPartitionWriter::TEvWriteRequest>();
auto& request = ev->Record;

@@ -254,6 +255,9 @@ THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::
partitionRequest->SetPartition(data.Index);
// partitionRequest->SetCmdWriteOffset();
partitionRequest->SetCookie(cookie);
if (chargeExtraRU) {
partitionRequest->SetChargeExtraRU(true);
}

ui64 totalSize = 0;

@@ -319,11 +323,11 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
pendingRequest->StartTime = ctx.Now();

size_t position = 0;
bool chargeExtraRU = Context->Config.GetChargeExtraRUOnRequest();
for(const auto& topicData : r->TopicData) {
const TString& topicPath = NormalizePath(Context->DatabasePath, *topicData.Name);
for(const auto& partitionData : topicData.PartitionData) {
const auto partitionId = partitionData.Index;

auto writer = PartitionWriter(topicPath, partitionId, ctx);
if (OK == writer.first) {
auto ownCookie = ++Cookie;
@@ -336,7 +340,8 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
pendingRequest->WaitAcceptingCookies.insert(ownCookie);
pendingRequest->WaitResultCookies.insert(ownCookie);

auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC);
auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC, chargeExtraRU);
chargeExtraRU = false;

Send(writer.second, std::move(ev));
} else {
@@ -443,7 +448,7 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
// We send the results in the order of receipt of the request
while (!PendingRequests.empty()) {
auto pendingRequest = PendingRequests.front();

// We send the response by timeout. This is possible, for example, if the event was lost or the PartitionWrite died.
bool expired = expireTime > pendingRequest->StartTime;

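On the produce path the flag is consumed by the first partition write of the request: chargeExtraRU starts as the config value and is reset to false right after the first Convert call, so later writes in the same PRODUCE request are billed only for their payload. A small, self-contained model of the resulting accounting, with a hypothetical helper in place of the YDB code:

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

// Models the produce-side billing: each partition write costs its
// payload-based RUs, and the first write of the request adds one
// extra RU when the feature is enabled.
uint64_t TotalProduceRequestUnits(const std::vector<uint64_t>& payloadRuPerWrite,
                                  bool chargeExtraRuOnRequest) {
    uint64_t total = 0;
    bool chargeExtraRU = chargeExtraRuOnRequest;
    for (uint64_t ru : payloadRuPerWrite) {
        total += ru + (chargeExtraRU ? 1 : 0);
        chargeExtraRU = false;  // the flag is consumed by the first write
    }
    return total;
}

int main() {
    // Three partition writes costing 2, 1 and 4 RU by payload size:
    // 2 + 1 (extra) + 1 + 4 = 8 RU for the whole request.
    std::cout << TotalProduceRequestUnits({2, 1, 4}, /*chargeExtraRuOnRequest=*/true) << '\n';  // 8
}
```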
17 changes: 9 additions & 8 deletions ydb/core/persqueue/fetch_request_actor.cpp
@@ -75,7 +75,7 @@ struct TEvPrivate {

bool CanProcessFetchRequest; //any partitions answered that it has data or WaitMs timeout occured
ui32 FetchRequestReadsDone;
ui64 FetchRequestCurrentReadTablet;
ui64 CurrentCookie;
ui32 FetchRequestBytesLeft;
THolder<TEvPQ::TEvFetchResponse> Response;
@@ -145,10 +145,10 @@ struct TEvPrivate {
break;

case EWakeupTag::RlNoResource:
// Re-requesting the quota. We do this until we get a quota.
RequestDataQuota(PendingQuotaAmount, ctx);
break;

default:
Y_VERIFY_DEBUG_S(false, "Unsupported tag: " << static_cast<ui64>(tag));
}
@@ -171,7 +171,7 @@ struct TEvPrivate {

void SendSchemeCacheRequest(const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_FETCH_REQUEST, "SendSchemeCacheRequest");

auto schemeCacheRequest = std::make_unique<TSchemeCacheNavigate>(1);
schemeCacheRequest->DatabaseName = Settings.Database;

@@ -250,7 +250,7 @@ struct TEvPrivate {
), ctx
);;
}

auto& description = entry.PQGroupInfo->Description;
auto& topicInfo = TopicInfo[path];
topicInfo.BalancerTabletId = description.GetBalancerTabletID();
@@ -345,7 +345,7 @@ struct TEvPrivate {
if (HandlePipeError(tabletId, ctx))
return;

auto reason = TStringBuilder() << "Client pipe to " << tabletId << " connection error, Status"
<< NKikimrProto::EReplyStatus_Name(msg->Status).data()
<< ", Marker# PQ6";
return SendReplyAndDie(CreateErrorReply(Ydb::StatusIds::INTERNAL_ERROR, reason), ctx);
@@ -423,7 +423,7 @@ struct TEvPrivate {
preq->Record.SetRequestId(reqId);
auto partReq = preq->Record.MutablePartitionRequest();
partReq->SetCookie(CurrentCookie);

partReq->SetTopic(topic);
partReq->SetPartition(part);
auto read = partReq->MutableCmdRead();
@@ -480,7 +480,8 @@ struct TEvPrivate {
SetMeteringMode(it->second.PQInfo->Description.GetPQTabletConfig().GetMeteringMode());

if (IsQuotaRequired()) {
PendingQuotaAmount = 1 + CalcRuConsumption(GetPayloadSize(record));
PendingQuotaAmount = CalcRuConsumption(GetPayloadSize(record)) + (Settings.ChargeExtraRU ? 1 : 0);
Settings.ChargeExtraRU = false;
RequestDataQuota(PendingQuotaAmount, ctx);
} else {
ProceedFetchRequest(ctx);
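Inside the fetch request actor the quota requested from the rate limiter changes from 1 + CalcRuConsumption(payload) to CalcRuConsumption(payload) plus one RU only while Settings.ChargeExtraRU is set, and the flag is cleared after the first partition read so the extra unit is charged once per request. A self-contained sketch of that quota computation; the CalcRuConsumption stand-in below is an assumption, the real implementation lives in the persqueue code:

```cpp
#include <cstdint>
#include <iostream>

// Hypothetical stand-in: 1 RU per started MiB of payload.
uint64_t CalcRuConsumptionStub(uint64_t payloadBytes) {
    constexpr uint64_t ruUnitBytes = 1 << 20;
    return (payloadBytes + ruUnitBytes - 1) / ruUnitBytes;
}

// Mirrors the change in fetch_request_actor.cpp: the extra RU is added
// only while the ChargeExtraRU flag is still set, then the flag is cleared.
uint64_t NextPendingQuota(uint64_t payloadBytes, bool& chargeExtraRU) {
    uint64_t quota = CalcRuConsumptionStub(payloadBytes) + (chargeExtraRU ? 1 : 0);
    chargeExtraRU = false;
    return quota;
}

int main() {
    bool chargeExtraRU = true;                                       // from TFetchRequestSettings
    std::cout << NextPendingQuota(3 << 20, chargeExtraRU) << '\n';   // 3 + 1 = 4
    std::cout << NextPendingQuota(1 << 20, chargeExtraRU) << '\n';   // 1 (flag already consumed)
}
```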
6 changes: 4 additions & 2 deletions ydb/core/persqueue/fetch_request_actor.h
@@ -15,7 +15,7 @@ struct TPartitionFetchRequest {
ui64 Offset;
ui64 MaxBytes;
ui64 ReadTimestampMs;

TPartitionFetchRequest(const TString& topic, ui32 partition, ui64 offset, ui64 maxBytes, ui64 readTimestampMs = 0, const TString& clientId = NKikimr::NPQ::CLIENTID_WITHOUT_CONSUMER)
: Topic(topic)
, ClientId(clientId)
@@ -35,18 +35,20 @@ struct TFetchRequestSettings {
ui64 MaxWaitTimeMs;
ui64 TotalMaxBytes;
TRlContext RlCtx;
bool ChargeExtraRU;

ui64 RequestId = 0;
TFetchRequestSettings(
const TString& database, const TVector<TPartitionFetchRequest>& partitions, ui64 maxWaitTimeMs, ui64 totalMaxBytes, TRlContext rlCtx,
const TMaybe<NACLib::TUserToken>& user = {}, ui64 requestId = 0
const TMaybe<NACLib::TUserToken>& user = {}, ui64 requestId = 0, bool chargeExtraRU = false
)
: Database(database)
, Partitions(partitions)
, User(user)
, MaxWaitTimeMs(maxWaitTimeMs)
, TotalMaxBytes(totalMaxBytes)
, RlCtx(rlCtx)
, ChargeExtraRU(chargeExtraRU)
, RequestId(requestId)
{}
};
2 changes: 1 addition & 1 deletion ydb/core/persqueue/writer/writer.cpp
@@ -526,7 +526,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl

if (needToRequestQuota) {
++processed;
PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize());
PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize()) + (it->second.GetPartitionRequest().GetChargeExtraRU() ? 1 : 0);
PendingQuota.emplace_back(it->first);
}

1 change: 1 addition & 0 deletions ydb/core/protos/config.proto
@@ -1701,6 +1701,7 @@ message TKafkaProxyConfig {
}

optional TProxy Proxy = 7;
optional bool ChargeExtraRUOnRequest = 10 [default = false];
}

message TAwsCompatibilityConfig {
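The new TKafkaProxyConfig field defaults to false, so existing clusters keep the old billing until the flag is explicitly enabled; the proxy only consults the generated GetChargeExtraRUOnRequest() accessor, as seen in the fetch and produce actors above. A tiny illustration of that default-off, opt-in behaviour, using a stand-in struct rather than the generated protobuf class:

```cpp
#include <iostream>
#include <optional>

// Stand-in for the accessor semantics of
// `optional bool ChargeExtraRUOnRequest = 10 [default = false];`:
// an unset field reads as the declared default.
struct TKafkaProxyConfigStub {
    std::optional<bool> ChargeExtraRUOnRequest;
    bool GetChargeExtraRUOnRequest() const {
        return ChargeExtraRUOnRequest.value_or(false);
    }
};

int main() {
    TKafkaProxyConfigStub unset;             // old configs: extra RU stays off
    TKafkaProxyConfigStub enabled;
    enabled.ChargeExtraRUOnRequest = true;   // explicitly opted in

    std::cout << unset.GetChargeExtraRUOnRequest() << ' '
              << enabled.GetChargeExtraRUOnRequest() << '\n';  // 0 1
}
```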
1 change: 1 addition & 0 deletions ydb/core/protos/msgbus_pq.proto
@@ -170,6 +170,7 @@ message TPersQueuePartitionRequest {
optional bool IsDirectWrite = 18 [default = false];
optional uint64 PutUnitsSize = 19;
optional int64 InitialSeqNo = 26;
optional bool ChargeExtraRU = 27 [default = false];
}

message TPersQueueMetaRequest {