From cca613b80dadd9c988ef9d9fea65f9ad1be63954 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Fri, 19 Apr 2024 12:03:54 +0000 Subject: [PATCH 1/6] Kafka api charge extra RU on request --- ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp | 2 +- ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp | 10 ++++++---- ydb/core/kafka_proxy/actors/kafka_produce_actor.h | 2 +- ydb/core/persqueue/fetch_request_actor.cpp | 2 +- ydb/core/persqueue/fetch_request_actor.h | 4 +++- ydb/core/persqueue/writer/writer.cpp | 2 +- ydb/core/persqueue/writer/writer.h | 2 ++ 7 files changed, 15 insertions(+), 9 deletions(-) diff --git a/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp index 5cd0be58bd6b..4a0c1b37b9cd 100644 --- a/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp @@ -35,7 +35,7 @@ void TKafkaFetchActor::SendFetchRequests(const TActorContext& ctx) { TVector partPQRequests; PrepareFetchRequestData(topicIndex, partPQRequests); - NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, *Context->UserToken); + NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, *Context->UserToken, topicIndex == 0); auto fetchActor = NKikimr::NPQ::CreatePQFetchRequestActor(request, NKikimr::MakeSchemeCacheID(), ctx.SelfID); auto actorId = ctx.Register(fetchActor); diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp index 452beeadc61d..fd6b607b748a 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp @@ -319,12 +319,13 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co pendingRequest->StartTime = ctx.Now(); size_t position = 0; + bool chargeExtraRu = true; for(const auto& topicData : r->TopicData) { const TString& topicPath = NormalizePath(Context->DatabasePath, *topicData.Name); for(const auto& partitionData : topicData.PartitionData) { const auto partitionId = partitionData.Index; - - auto writer = PartitionWriter(topicPath, partitionId, ctx); + auto writer = PartitionWriter(topicPath, partitionId, chargeExtraRu, ctx); + chargeExtraRu = false; if (OK == writer.first) { auto ownCookie = ++Cookie; auto& cookieInfo = Cookies[ownCookie]; @@ -555,7 +556,7 @@ void TKafkaProduceActor::ProcessInitializationRequests(const TActorContext& ctx) ctx.Send(MakeSchemeCacheID(), MakeHolder(request.release())); } -std::pair TKafkaProduceActor::PartitionWriter(const TString& topicPath, ui32 partitionId, const TActorContext& ctx) { +std::pair TKafkaProduceActor::PartitionWriter(const TString& topicPath, ui32 partitionId, bool chargeExtraRu, const TActorContext& ctx) { auto it = Topics.find(topicPath); if (it == Topics.end()) { KAFKA_LOG_ERROR("Produce actor: Internal error: topic '" << topicPath << "' isn`t initialized"); @@ -584,7 +585,8 @@ std::pair TKafkaProduceActor::Partit opts.WithDeduplication(false) .WithSourceId(SourceId) .WithTopicPath(topicPath) - .WithCheckRequestUnits(topicInfo.MeteringMode, Context->RlContext); + .WithCheckRequestUnits(topicInfo.MeteringMode, Context->RlContext) + .WithChargeExtraRu(chargeExtraRu); auto* writerActor = CreatePartitionWriter(SelfId(), partition->TabletId, partitionId, opts); auto& writerInfo = 
partitionWriters[partitionId]; diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h index f1eea1c150de..f4c4bdf39336 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h @@ -127,7 +127,7 @@ class TKafkaProduceActor: public NActors::TActorBootstrapped void CleanTopics(const TActorContext& ctx); void CleanWriters(const TActorContext& ctx); - std::pair PartitionWriter(const TString& topicPath, ui32 partitionId, const TActorContext& ctx); + std::pair PartitionWriter(const TString& topicPath, ui32 partitionId, bool chargeExtraRu, const TActorContext& ctx); TString LogPrefix(); void LogEvent(IEventHandle& ev); diff --git a/ydb/core/persqueue/fetch_request_actor.cpp b/ydb/core/persqueue/fetch_request_actor.cpp index 4ddd00ac3b6b..b729b649891e 100644 --- a/ydb/core/persqueue/fetch_request_actor.cpp +++ b/ydb/core/persqueue/fetch_request_actor.cpp @@ -480,7 +480,7 @@ struct TEvPrivate { SetMeteringMode(it->second.PQInfo->Description.GetPQTabletConfig().GetMeteringMode()); if (IsQuotaRequired()) { - PendingQuotaAmount = 1 + CalcRuConsumption(GetPayloadSize(record)); + PendingQuotaAmount = CalcRuConsumption(GetPayloadSize(record)) + Settings.ChargeExtraRu ? 1 : 0; RequestDataQuota(PendingQuotaAmount, ctx); } else { ProceedFetchRequest(ctx); diff --git a/ydb/core/persqueue/fetch_request_actor.h b/ydb/core/persqueue/fetch_request_actor.h index e6adac4f9d62..80dec380e529 100644 --- a/ydb/core/persqueue/fetch_request_actor.h +++ b/ydb/core/persqueue/fetch_request_actor.h @@ -35,11 +35,12 @@ struct TFetchRequestSettings { ui64 MaxWaitTimeMs; ui64 TotalMaxBytes; TRlContext RlCtx; + bool ChargeExtraRu; ui64 RequestId = 0; TFetchRequestSettings( const TString& database, const TVector& partitions, ui64 maxWaitTimeMs, ui64 totalMaxBytes, TRlContext rlCtx, - const TMaybe& user = {}, ui64 requestId = 0 + const TMaybe& user = {}, ui64 requestId = 0, bool chargeExtraRu = false ) : Database(database) , Partitions(partitions) @@ -47,6 +48,7 @@ struct TFetchRequestSettings { , MaxWaitTimeMs(maxWaitTimeMs) , TotalMaxBytes(totalMaxBytes) , RlCtx(rlCtx) + , ChargeExtraRu(chargeExtraRu) , RequestId(requestId) {} }; diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp index 67bfcaf10801..f03f86326d9f 100644 --- a/ydb/core/persqueue/writer/writer.cpp +++ b/ydb/core/persqueue/writer/writer.cpp @@ -526,7 +526,7 @@ class TPartitionWriter: public TActorBootstrapped, private TRl if (needToRequestQuota) { ++processed; - PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize()); + PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize()) + Opts.ChargeExtraRu ? 
1 : 0; PendingQuota.emplace_back(it->first); } diff --git a/ydb/core/persqueue/writer/writer.h b/ydb/core/persqueue/writer/writer.h index 9cd28cd5c40a..4cb7101879fc 100644 --- a/ydb/core/persqueue/writer/writer.h +++ b/ydb/core/persqueue/writer/writer.h @@ -173,6 +173,7 @@ struct TPartitionWriterOpts { bool CheckState = false; bool AutoRegister = false; bool UseDeduplication = true; + bool ChargeExtraRu = false; TString SourceId; std::optional ExpectedGeneration; @@ -194,6 +195,7 @@ struct TPartitionWriterOpts { TPartitionWriterOpts& WithCheckState(bool value) { CheckState = value; return *this; } TPartitionWriterOpts& WithAutoRegister(bool value) { AutoRegister = value; return *this; } TPartitionWriterOpts& WithDeduplication(bool value) { UseDeduplication = value; return *this; } + TPartitionWriterOpts& WithChargeExtraRu(bool value) { ChargeExtraRu = value; return *this; } TPartitionWriterOpts& WithSourceId(const TString& value) { SourceId = value; return *this; } TPartitionWriterOpts& WithExpectedGeneration(ui32 value) { ExpectedGeneration = value; return *this; } TPartitionWriterOpts& WithExpectedGeneration(std::optional value) { ExpectedGeneration = value; return *this; } From a847457eb6eef15347ef4be47bfc60dc11c5ed26 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Wed, 1 May 2024 10:57:32 +0000 Subject: [PATCH 2/6] Move charge extra ru to TPersQueuePartitionRequest --- .../kafka_proxy/actors/kafka_produce_actor.cpp | 18 +++++++++++------- .../kafka_proxy/actors/kafka_produce_actor.h | 2 +- ydb/core/persqueue/writer/writer.cpp | 2 +- ydb/core/persqueue/writer/writer.h | 2 -- ydb/core/protos/msgbus_pq.proto | 2 ++ 5 files changed, 15 insertions(+), 11 deletions(-) diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp index fd6b607b748a..8e3d32c69d28 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp @@ -242,7 +242,8 @@ size_t TKafkaProduceActor::EnqueueInitialization() { THolder Convert(const TProduceRequestData::TTopicProduceData::TPartitionProduceData& data, const TString& topicName, ui64 cookie, - const TString& clientDC) { + const TString& clientDC, + bool chargeExtraRU) { auto ev = MakeHolder(); auto& request = ev->Record; @@ -254,6 +255,10 @@ THolder Convert(const TProduceRequestData:: partitionRequest->SetPartition(data.Index); // partitionRequest->SetCmdWriteOffset(); partitionRequest->SetCookie(cookie); + if (chargeExtraRU) { + partitionRequest->SetChargeExtraRuCount(1); + chargeExtraRU = false; + } ui64 totalSize = 0; @@ -324,8 +329,7 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co const TString& topicPath = NormalizePath(Context->DatabasePath, *topicData.Name); for(const auto& partitionData : topicData.PartitionData) { const auto partitionId = partitionData.Index; - auto writer = PartitionWriter(topicPath, partitionId, chargeExtraRu, ctx); - chargeExtraRu = false; + auto writer = PartitionWriter(topicPath, partitionId, ctx); if (OK == writer.first) { auto ownCookie = ++Cookie; auto& cookieInfo = Cookies[ownCookie]; @@ -337,7 +341,8 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co pendingRequest->WaitAcceptingCookies.insert(ownCookie); pendingRequest->WaitResultCookies.insert(ownCookie); - auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC); + auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC, chargeExtraRu); + chargeExtraRu = 
false; Send(writer.second, std::move(ev)); } else { @@ -556,7 +561,7 @@ void TKafkaProduceActor::ProcessInitializationRequests(const TActorContext& ctx) ctx.Send(MakeSchemeCacheID(), MakeHolder(request.release())); } -std::pair TKafkaProduceActor::PartitionWriter(const TString& topicPath, ui32 partitionId, bool chargeExtraRu, const TActorContext& ctx) { +std::pair TKafkaProduceActor::PartitionWriter(const TString& topicPath, ui32 partitionId, const TActorContext& ctx) { auto it = Topics.find(topicPath); if (it == Topics.end()) { KAFKA_LOG_ERROR("Produce actor: Internal error: topic '" << topicPath << "' isn`t initialized"); @@ -585,8 +590,7 @@ std::pair TKafkaProduceActor::Partit opts.WithDeduplication(false) .WithSourceId(SourceId) .WithTopicPath(topicPath) - .WithCheckRequestUnits(topicInfo.MeteringMode, Context->RlContext) - .WithChargeExtraRu(chargeExtraRu); + .WithCheckRequestUnits(topicInfo.MeteringMode, Context->RlContext); auto* writerActor = CreatePartitionWriter(SelfId(), partition->TabletId, partitionId, opts); auto& writerInfo = partitionWriters[partitionId]; diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h index f4c4bdf39336..f1eea1c150de 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h @@ -127,7 +127,7 @@ class TKafkaProduceActor: public NActors::TActorBootstrapped void CleanTopics(const TActorContext& ctx); void CleanWriters(const TActorContext& ctx); - std::pair PartitionWriter(const TString& topicPath, ui32 partitionId, bool chargeExtraRu, const TActorContext& ctx); + std::pair PartitionWriter(const TString& topicPath, ui32 partitionId, const TActorContext& ctx); TString LogPrefix(); void LogEvent(IEventHandle& ev); diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp index f03f86326d9f..4b2771ee40a0 100644 --- a/ydb/core/persqueue/writer/writer.cpp +++ b/ydb/core/persqueue/writer/writer.cpp @@ -526,7 +526,7 @@ class TPartitionWriter: public TActorBootstrapped, private TRl if (needToRequestQuota) { ++processed; - PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize()) + Opts.ChargeExtraRu ? 1 : 0; + PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize()) + it->second.GetPartitionRequest().HasChargeExtraRuCount() ? 
it->second.GetPartitionRequest().GetChargeExtraRuCount() : 0; PendingQuota.emplace_back(it->first); } diff --git a/ydb/core/persqueue/writer/writer.h b/ydb/core/persqueue/writer/writer.h index 4cb7101879fc..9cd28cd5c40a 100644 --- a/ydb/core/persqueue/writer/writer.h +++ b/ydb/core/persqueue/writer/writer.h @@ -173,7 +173,6 @@ struct TPartitionWriterOpts { bool CheckState = false; bool AutoRegister = false; bool UseDeduplication = true; - bool ChargeExtraRu = false; TString SourceId; std::optional ExpectedGeneration; @@ -195,7 +194,6 @@ struct TPartitionWriterOpts { TPartitionWriterOpts& WithCheckState(bool value) { CheckState = value; return *this; } TPartitionWriterOpts& WithAutoRegister(bool value) { AutoRegister = value; return *this; } TPartitionWriterOpts& WithDeduplication(bool value) { UseDeduplication = value; return *this; } - TPartitionWriterOpts& WithChargeExtraRu(bool value) { ChargeExtraRu = value; return *this; } TPartitionWriterOpts& WithSourceId(const TString& value) { SourceId = value; return *this; } TPartitionWriterOpts& WithExpectedGeneration(ui32 value) { ExpectedGeneration = value; return *this; } TPartitionWriterOpts& WithExpectedGeneration(std::optional value) { ExpectedGeneration = value; return *this; } diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto index 87b3fa3cb2c9..b52963aef7d0 100644 --- a/ydb/core/protos/msgbus_pq.proto +++ b/ydb/core/protos/msgbus_pq.proto @@ -169,7 +169,9 @@ message TPersQueuePartitionRequest { optional bool IsDirectWrite = 18 [default = false]; optional uint64 PutUnitsSize = 19; + optional int64 InitialSeqNo = 26; + optional int32 ChargeExtraRuCount = 27 [default = 0]; } message TPersQueueMetaRequest { From 8805f2443491c4ae482e743774e09aec9f52b9a1 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Wed, 1 May 2024 11:01:11 +0000 Subject: [PATCH 3/6] fix --- ydb/core/protos/msgbus_pq.proto | 1 - 1 file changed, 1 deletion(-) diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto index b52963aef7d0..a0ee479258fe 100644 --- a/ydb/core/protos/msgbus_pq.proto +++ b/ydb/core/protos/msgbus_pq.proto @@ -169,7 +169,6 @@ message TPersQueuePartitionRequest { optional bool IsDirectWrite = 18 [default = false]; optional uint64 PutUnitsSize = 19; - optional int64 InitialSeqNo = 26; optional int32 ChargeExtraRuCount = 27 [default = 0]; } From b561642a8cb3d97b6e048f901ba037f9959a6de2 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Sun, 12 May 2024 22:56:22 +0000 Subject: [PATCH 4/6] Move charge extra RU parameter to config --- .../kafka_proxy/actors/kafka_fetch_actor.cpp | 20 +++++++++---------- .../actors/kafka_produce_actor.cpp | 15 +++++++------- ydb/core/persqueue/fetch_request_actor.cpp | 16 +++++++-------- ydb/core/persqueue/fetch_request_actor.h | 8 ++++---- ydb/core/persqueue/writer/writer.cpp | 2 +- ydb/core/protos/config.proto | 1 + ydb/core/protos/msgbus_pq.proto | 2 +- 7 files changed, 31 insertions(+), 33 deletions(-) diff --git a/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp index 4a0c1b37b9cd..1d4d196f3f4e 100644 --- a/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp @@ -34,9 +34,8 @@ void TKafkaFetchActor::SendFetchRequests(const TActorContext& ctx) { for (size_t topicIndex = 0; topicIndex < Response->Responses.size(); topicIndex++) { TVector partPQRequests; PrepareFetchRequestData(topicIndex, partPQRequests); - - NKikimr::NPQ::TFetchRequestSettings 
request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, *Context->UserToken, topicIndex == 0); - + auto chargeExtraRU = topicIndex == 0 && Context->Config.GetChargeExtraRUOnRequest(); + NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, *Context->UserToken, chargeExtraRU); auto fetchActor = NKikimr::NPQ::CreatePQFetchRequestActor(request, NKikimr::MakeSchemeCacheID(), ctx.SelfID); auto actorId = ctx.Register(fetchActor); PendingResponses++; @@ -55,7 +54,6 @@ void TKafkaFetchActor::PrepareFetchRequestData(const size_t topicIndex, TVector< for (size_t partIndex = 0; partIndex < topicKafkaRequest.Partitions.size(); partIndex++) { auto& partKafkaRequest = topicKafkaRequest.Partitions[partIndex]; KAFKA_LOG_D(TStringBuilder() << "Fetch actor: New request. Topic: " << topicKafkaRequest.Topic.value() << " Partition: " << partKafkaRequest.Partition << " FetchOffset: " << partKafkaRequest.FetchOffset << " PartitionMaxBytes: " << partKafkaRequest.PartitionMaxBytes); - auto& partPQRequest = partPQRequests[partIndex]; partPQRequest.Topic = NormalizePath(Context->DatabasePath, topicKafkaRequest.Topic.value()); // FIXME(savnik): handle empty topic partPQRequest.Partition = partKafkaRequest.Partition; @@ -113,9 +111,9 @@ void TKafkaFetchActor::HandleSuccessResponse(const NKikimr::TEvPQ::TEvFetchRespo partKafkaResponse.ErrorCode = ConvertErrorCode(partPQResponse.GetReadResult().GetErrorCode()); if (partPQResponse.GetReadResult().GetErrorCode() != NPersQueue::NErrorCode::EErrorCode::OK) { - KAFKA_LOG_ERROR("Fetch actor: Failed to get responses for topic: " << topicResponse.Topic << - ", partition: " << partPQResponse.GetPartition() << - ". Code: " << static_cast(partPQResponse.GetReadResult().GetErrorCode()) << + KAFKA_LOG_ERROR("Fetch actor: Failed to get responses for topic: " << topicResponse.Topic << + ", partition: " << partPQResponse.GetPartition() << + ". Code: " << static_cast(partPQResponse.GetReadResult().GetErrorCode()) << ". Reason: " + partPQResponse.GetReadResult().GetErrorReason()); } @@ -174,7 +172,7 @@ void TKafkaFetchActor::FillRecordsBatch(const NKikimrClient::TPersQueueFetchResp record.TimestampDelta = lastTimestamp - baseTimestamp; record.Length = record.Size(TKafkaRecord::MessageMeta::PresentVersions.Max) - SizeOfZeroVarint; - KAFKA_LOG_D("Fetch actor: Record info. OffsetDelta: " << record.OffsetDelta << + KAFKA_LOG_D("Fetch actor: Record info. OffsetDelta: " << record.OffsetDelta << ", TimestampDelta: " << record.TimestampDelta << ", Length: " << record.Length); } @@ -187,11 +185,11 @@ void TKafkaFetchActor::FillRecordsBatch(const NKikimrClient::TPersQueueFetchResp //recordsBatch.Attributes https://kafka.apache.org/documentation/#recordbatch recordsBatch.BatchLength = recordsBatch.Size(TKafkaRecordBatch::MessageMeta::PresentVersions.Max) - BatchFirstTwoFieldsSize; - KAFKA_LOG_D("Fetch actor: RecordBatch info. BaseOffset: " << recordsBatch.BaseOffset << ", LastOffsetDelta: " << recordsBatch.LastOffsetDelta << - ", BaseTimestamp: " << recordsBatch.BaseTimestamp << ", MaxTimestamp: " << recordsBatch.MaxTimestamp << + KAFKA_LOG_D("Fetch actor: RecordBatch info. 
BaseOffset: " << recordsBatch.BaseOffset << ", LastOffsetDelta: " << recordsBatch.LastOffsetDelta << + ", BaseTimestamp: " << recordsBatch.BaseTimestamp << ", MaxTimestamp: " << recordsBatch.MaxTimestamp << ", BaseSequence: " << recordsBatch.BaseSequence << ", BatchLength: " << recordsBatch.BatchLength); auto topicWithoutDb = GetTopicNameWithoutDb(Context->DatabasePath, partPQResponse.GetTopic()); - ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(recordsBatch.Records.size(), BuildLabels(Context, "", topicWithoutDb, "api.kafka.fetch.messages", ""))); + ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(recordsBatch.Records.size(), BuildLabels(Context, "", topicWithoutDb, "api.kafka.fetch.messages", ""))); } void TKafkaFetchActor::RespondIfRequired(const TActorContext& ctx) { diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp index 8e3d32c69d28..0e028dcb29a1 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp @@ -43,7 +43,7 @@ void TKafkaProduceActor::LogEvent(IEventHandle& ev) { void TKafkaProduceActor::SendMetrics(const TString& topicName, size_t delta, const TString& name, const TActorContext& ctx) { auto topicWithoutDb = GetTopicNameWithoutDb(Context->DatabasePath, topicName); ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(delta, BuildLabels(Context, "", topicWithoutDb, TStringBuilder() << "api.kafka.produce." << name, ""))); - ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(delta, BuildLabels(Context, "", topicWithoutDb, "api.kafka.produce.total_messages", ""))); + ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(delta, BuildLabels(Context, "", topicWithoutDb, "api.kafka.produce.total_messages", ""))); } void TKafkaProduceActor::Bootstrap(const NActors::TActorContext& /*ctx*/) { @@ -82,7 +82,7 @@ void TKafkaProduceActor::PassAway() { void TKafkaProduceActor::CleanTopics(const TActorContext& ctx) { const auto now = ctx.Now(); - std::map newTopics; + std::map newTopics; for(auto& [topicPath, topicInfo] : Topics) { if (topicInfo.ExpirationTime > now) { newTopics[topicPath] = std::move(topicInfo); @@ -256,8 +256,7 @@ THolder Convert(const TProduceRequestData:: // partitionRequest->SetCmdWriteOffset(); partitionRequest->SetCookie(cookie); if (chargeExtraRU) { - partitionRequest->SetChargeExtraRuCount(1); - chargeExtraRU = false; + partitionRequest->SetChargeExtraRUCount(1); } ui64 totalSize = 0; @@ -324,7 +323,7 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co pendingRequest->StartTime = ctx.Now(); size_t position = 0; - bool chargeExtraRu = true; + bool chargeExtraRU = Context->Config.GetChargeExtraRUOnRequest(); for(const auto& topicData : r->TopicData) { const TString& topicPath = NormalizePath(Context->DatabasePath, *topicData.Name); for(const auto& partitionData : topicData.PartitionData) { @@ -341,8 +340,8 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co pendingRequest->WaitAcceptingCookies.insert(ownCookie); pendingRequest->WaitResultCookies.insert(ownCookie); - auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC, chargeExtraRu); - chargeExtraRu = false; + auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC, chargeExtraRU); + chargeExtraRU = false; Send(writer.second, std::move(ev)); } else { @@ -449,7 +448,7 @@ void 
TKafkaProduceActor::SendResults(const TActorContext& ctx) { // We send the results in the order of receipt of the request while (!PendingRequests.empty()) { auto pendingRequest = PendingRequests.front(); - + // We send the response by timeout. This is possible, for example, if the event was lost or the PartitionWrite died. bool expired = expireTime > pendingRequest->StartTime; diff --git a/ydb/core/persqueue/fetch_request_actor.cpp b/ydb/core/persqueue/fetch_request_actor.cpp index b729b649891e..d37acaaacd42 100644 --- a/ydb/core/persqueue/fetch_request_actor.cpp +++ b/ydb/core/persqueue/fetch_request_actor.cpp @@ -75,7 +75,7 @@ struct TEvPrivate { bool CanProcessFetchRequest; //any partitions answered that it has data or WaitMs timeout occured ui32 FetchRequestReadsDone; - ui64 FetchRequestCurrentReadTablet; + ui64 FetchRequestCurrentReadTablet; ui64 CurrentCookie; ui32 FetchRequestBytesLeft; THolder Response; @@ -145,10 +145,10 @@ struct TEvPrivate { break; case EWakeupTag::RlNoResource: - // Re-requesting the quota. We do this until we get a quota. + // Re-requesting the quota. We do this until we get a quota. RequestDataQuota(PendingQuotaAmount, ctx); break; - + default: Y_VERIFY_DEBUG_S(false, "Unsupported tag: " << static_cast(tag)); } @@ -171,7 +171,7 @@ struct TEvPrivate { void SendSchemeCacheRequest(const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_FETCH_REQUEST, "SendSchemeCacheRequest"); - + auto schemeCacheRequest = std::make_unique(1); schemeCacheRequest->DatabaseName = Settings.Database; @@ -250,7 +250,7 @@ struct TEvPrivate { ), ctx );; } - + auto& description = entry.PQGroupInfo->Description; auto& topicInfo = TopicInfo[path]; topicInfo.BalancerTabletId = description.GetBalancerTabletID(); @@ -345,7 +345,7 @@ struct TEvPrivate { if (HandlePipeError(tabletId, ctx)) return; - auto reason = TStringBuilder() << "Client pipe to " << tabletId << " connection error, Status" + auto reason = TStringBuilder() << "Client pipe to " << tabletId << " connection error, Status" << NKikimrProto::EReplyStatus_Name(msg->Status).data() << ", Marker# PQ6"; return SendReplyAndDie(CreateErrorReply(Ydb::StatusIds::INTERNAL_ERROR, reason), ctx); @@ -423,7 +423,7 @@ struct TEvPrivate { preq->Record.SetRequestId(reqId); auto partReq = preq->Record.MutablePartitionRequest(); partReq->SetCookie(CurrentCookie); - + partReq->SetTopic(topic); partReq->SetPartition(part); auto read = partReq->MutableCmdRead(); @@ -480,7 +480,7 @@ struct TEvPrivate { SetMeteringMode(it->second.PQInfo->Description.GetPQTabletConfig().GetMeteringMode()); if (IsQuotaRequired()) { - PendingQuotaAmount = CalcRuConsumption(GetPayloadSize(record)) + Settings.ChargeExtraRu ? 1 : 0; + PendingQuotaAmount = CalcRuConsumption(GetPayloadSize(record)) + Settings.ChargeExtraRU ? 
1 : 0; RequestDataQuota(PendingQuotaAmount, ctx); } else { ProceedFetchRequest(ctx); diff --git a/ydb/core/persqueue/fetch_request_actor.h b/ydb/core/persqueue/fetch_request_actor.h index 80dec380e529..178901d6f50d 100644 --- a/ydb/core/persqueue/fetch_request_actor.h +++ b/ydb/core/persqueue/fetch_request_actor.h @@ -15,7 +15,7 @@ struct TPartitionFetchRequest { ui64 Offset; ui64 MaxBytes; ui64 ReadTimestampMs; - + TPartitionFetchRequest(const TString& topic, ui32 partition, ui64 offset, ui64 maxBytes, ui64 readTimestampMs = 0, const TString& clientId = NKikimr::NPQ::CLIENTID_WITHOUT_CONSUMER) : Topic(topic) , ClientId(clientId) @@ -35,12 +35,12 @@ struct TFetchRequestSettings { ui64 MaxWaitTimeMs; ui64 TotalMaxBytes; TRlContext RlCtx; - bool ChargeExtraRu; + bool ChargeExtraRU; ui64 RequestId = 0; TFetchRequestSettings( const TString& database, const TVector& partitions, ui64 maxWaitTimeMs, ui64 totalMaxBytes, TRlContext rlCtx, - const TMaybe& user = {}, ui64 requestId = 0, bool chargeExtraRu = false + const TMaybe& user = {}, ui64 requestId = 0, bool chargeExtraRU = false ) : Database(database) , Partitions(partitions) @@ -48,7 +48,7 @@ struct TFetchRequestSettings { , MaxWaitTimeMs(maxWaitTimeMs) , TotalMaxBytes(totalMaxBytes) , RlCtx(rlCtx) - , ChargeExtraRu(chargeExtraRu) + , ChargeExtraRU(chargeExtraRU) , RequestId(requestId) {} }; diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp index 4b2771ee40a0..2c840ae5a1c3 100644 --- a/ydb/core/persqueue/writer/writer.cpp +++ b/ydb/core/persqueue/writer/writer.cpp @@ -526,7 +526,7 @@ class TPartitionWriter: public TActorBootstrapped, private TRl if (needToRequestQuota) { ++processed; - PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize()) + it->second.GetPartitionRequest().HasChargeExtraRuCount() ? it->second.GetPartitionRequest().GetChargeExtraRuCount() : 0; + PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize()) + it->second.GetPartitionRequest().HasChargeExtraRUCount() ? 
it->second.GetPartitionRequest().GetChargeExtraRUCount() : 0; PendingQuota.emplace_back(it->first); } diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 50aa843c062d..9d2783293528 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1701,6 +1701,7 @@ message TKafkaProxyConfig { } optional TProxy Proxy = 7; + optional bool ChargeExtraRUOnRequest = 10 [default = false]; } message TAwsCompatibilityConfig { diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto index a0ee479258fe..74adbd999810 100644 --- a/ydb/core/protos/msgbus_pq.proto +++ b/ydb/core/protos/msgbus_pq.proto @@ -170,7 +170,7 @@ message TPersQueuePartitionRequest { optional bool IsDirectWrite = 18 [default = false]; optional uint64 PutUnitsSize = 19; optional int64 InitialSeqNo = 26; - optional int32 ChargeExtraRuCount = 27 [default = 0]; + optional int32 ChargeExtraRUCount = 27 [default = 0]; } message TPersQueueMetaRequest { From 840f2c1173142f95264150a69f24e600bdb2dc13 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Mon, 13 May 2024 06:10:23 +0000 Subject: [PATCH 5/6] Fix --- ydb/core/persqueue/fetch_request_actor.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/ydb/core/persqueue/fetch_request_actor.cpp b/ydb/core/persqueue/fetch_request_actor.cpp index d37acaaacd42..7dc6841c391d 100644 --- a/ydb/core/persqueue/fetch_request_actor.cpp +++ b/ydb/core/persqueue/fetch_request_actor.cpp @@ -481,6 +481,7 @@ struct TEvPrivate { if (IsQuotaRequired()) { PendingQuotaAmount = CalcRuConsumption(GetPayloadSize(record)) + Settings.ChargeExtraRU ? 1 : 0; + Settings.ChargeExtraRU = 0; RequestDataQuota(PendingQuotaAmount, ctx); } else { ProceedFetchRequest(ctx); From 09b5e2c350416cb4f59c30176cdcd6b4e789068e Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Mon, 13 May 2024 06:13:32 +0000 Subject: [PATCH 6/6] Fix --- ydb/core/persqueue/fetch_request_actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/persqueue/fetch_request_actor.cpp b/ydb/core/persqueue/fetch_request_actor.cpp index 7dc6841c391d..896b0dabeb27 100644 --- a/ydb/core/persqueue/fetch_request_actor.cpp +++ b/ydb/core/persqueue/fetch_request_actor.cpp @@ -481,7 +481,7 @@ struct TEvPrivate { if (IsQuotaRequired()) { PendingQuotaAmount = CalcRuConsumption(GetPayloadSize(record)) + Settings.ChargeExtraRU ? 1 : 0; - Settings.ChargeExtraRU = 0; + Settings.ChargeExtraRU = false; RequestDataQuota(PendingQuotaAmount, ctx); } else { ProceedFetchRequest(ctx);
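
For reference, here is a minimal, self-contained sketch of the quota accounting the fetch path in this series converges on. The names `CalcRuConsumption`, `ChargeExtraRU`, and `PendingQuotaAmount` are taken from the diffs above; the RU formula, the `TFetchQuotaState` struct, and the `main` harness are hypothetical stand-ins, not the YDB implementation. The sketch assumes the extra request unit is meant to be added on top of the payload-derived consumption and charged at most once per Kafka request, so it parenthesizes the ternary explicitly: in C++, `a + cond ? 1 : 0` groups as `(a + cond) ? 1 : 0`.

```cpp
#include <cstdint>
#include <iostream>

// Hypothetical stand-in for the payload-to-RU conversion referenced in the
// patches; assume 1 RU per started MiB of payload (the real formula is in YDB).
static std::uint64_t CalcRuConsumption(std::uint64_t payloadBytes) {
    constexpr std::uint64_t bytesPerRu = 1u << 20;
    return (payloadBytes + bytesPerRu - 1) / bytesPerRu;
}

// Sketch of the per-request state the fetch path carries: ChargeExtraRU is set
// only for the first sub-request of a Kafka request (topicIndex == 0) and is
// cleared once the extra unit has been charged, as in the later patches.
struct TFetchQuotaState {
    bool ChargeExtraRU = false;
    std::uint64_t PendingQuotaAmount = 0;
};

// Computes the quota to request for one fetch sub-request. The parentheses
// around the ternary are what make the extra unit additive; without them the
// expression would group as (CalcRuConsumption(...) + ChargeExtraRU) ? 1 : 0.
void AccountFetchQuota(TFetchQuotaState& state, std::uint64_t payloadBytes) {
    state.PendingQuotaAmount =
        CalcRuConsumption(payloadBytes) + (state.ChargeExtraRU ? 1 : 0);
    state.ChargeExtraRU = false;  // charge the extra RU at most once per request
}

int main() {
    TFetchQuotaState state{/*ChargeExtraRU=*/true};

    AccountFetchQuota(state, 3u << 20);             // first topic: payload RUs + 1 extra
    std::cout << state.PendingQuotaAmount << "\n";  // prints 4

    AccountFetchQuota(state, 3u << 20);             // later topics: payload RUs only
    std::cout << state.PendingQuotaAmount << "\n";  // prints 3
}
```

In the series itself, whether the extra unit is charged at all is gated by the new `ChargeExtraRUOnRequest` flag added to `TKafkaProxyConfig` in patch 4, and the produce path carries the same intent through the `ChargeExtraRUCount` field added to `TPersQueuePartitionRequest`; the sketch assumes that config check has already happened wherever `ChargeExtraRU` is set to true.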