diff --git a/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp
index 5cd0be58bd6b..1d4d196f3f4e 100644
--- a/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp
@@ -34,9 +34,8 @@ void TKafkaFetchActor::SendFetchRequests(const TActorContext& ctx) {
     for (size_t topicIndex = 0; topicIndex < Response->Responses.size(); topicIndex++) {
         TVector partPQRequests;
         PrepareFetchRequestData(topicIndex, partPQRequests);
-
-        NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, *Context->UserToken);
-
+        auto chargeExtraRU = topicIndex == 0 && Context->Config.GetChargeExtraRUOnRequest();
+        NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, *Context->UserToken, chargeExtraRU);
         auto fetchActor = NKikimr::NPQ::CreatePQFetchRequestActor(request, NKikimr::MakeSchemeCacheID(), ctx.SelfID);
         auto actorId = ctx.Register(fetchActor);
         PendingResponses++;
@@ -55,7 +54,6 @@ void TKafkaFetchActor::PrepareFetchRequestData(const size_t topicIndex, TVector<
     for (size_t partIndex = 0; partIndex < topicKafkaRequest.Partitions.size(); partIndex++) {
         auto& partKafkaRequest = topicKafkaRequest.Partitions[partIndex];
         KAFKA_LOG_D(TStringBuilder() << "Fetch actor: New request. Topic: " << topicKafkaRequest.Topic.value() << " Partition: " << partKafkaRequest.Partition << " FetchOffset: " << partKafkaRequest.FetchOffset << " PartitionMaxBytes: " << partKafkaRequest.PartitionMaxBytes);
-
         auto& partPQRequest = partPQRequests[partIndex];
         partPQRequest.Topic = NormalizePath(Context->DatabasePath, topicKafkaRequest.Topic.value()); // FIXME(savnik): handle empty topic
         partPQRequest.Partition = partKafkaRequest.Partition;
@@ -113,9 +111,9 @@ void TKafkaFetchActor::HandleSuccessResponse(const NKikimr::TEvPQ::TEvFetchRespo
         partKafkaResponse.ErrorCode = ConvertErrorCode(partPQResponse.GetReadResult().GetErrorCode());

         if (partPQResponse.GetReadResult().GetErrorCode() != NPersQueue::NErrorCode::EErrorCode::OK) {
-            KAFKA_LOG_ERROR("Fetch actor: Failed to get responses for topic: " << topicResponse.Topic <<
-                ", partition: " << partPQResponse.GetPartition() <<
-                ". Code: " << static_cast(partPQResponse.GetReadResult().GetErrorCode()) <<
+            KAFKA_LOG_ERROR("Fetch actor: Failed to get responses for topic: " << topicResponse.Topic <<
+                ", partition: " << partPQResponse.GetPartition() <<
+                ". Code: " << static_cast(partPQResponse.GetReadResult().GetErrorCode()) <<
                 ". Reason: " + partPQResponse.GetReadResult().GetErrorReason());
         }

@@ -174,7 +172,7 @@ void TKafkaFetchActor::FillRecordsBatch(const NKikimrClient::TPersQueueFetchResp
         record.TimestampDelta = lastTimestamp - baseTimestamp;
         record.Length = record.Size(TKafkaRecord::MessageMeta::PresentVersions.Max) - SizeOfZeroVarint;

-        KAFKA_LOG_D("Fetch actor: Record info. OffsetDelta: " << record.OffsetDelta <<
+        KAFKA_LOG_D("Fetch actor: Record info. OffsetDelta: " << record.OffsetDelta <<
OffsetDelta: " << record.OffsetDelta << ", TimestampDelta: " << record.TimestampDelta << ", Length: " << record.Length); } @@ -187,11 +185,11 @@ void TKafkaFetchActor::FillRecordsBatch(const NKikimrClient::TPersQueueFetchResp //recordsBatch.Attributes https://kafka.apache.org/documentation/#recordbatch recordsBatch.BatchLength = recordsBatch.Size(TKafkaRecordBatch::MessageMeta::PresentVersions.Max) - BatchFirstTwoFieldsSize; - KAFKA_LOG_D("Fetch actor: RecordBatch info. BaseOffset: " << recordsBatch.BaseOffset << ", LastOffsetDelta: " << recordsBatch.LastOffsetDelta << - ", BaseTimestamp: " << recordsBatch.BaseTimestamp << ", MaxTimestamp: " << recordsBatch.MaxTimestamp << + KAFKA_LOG_D("Fetch actor: RecordBatch info. BaseOffset: " << recordsBatch.BaseOffset << ", LastOffsetDelta: " << recordsBatch.LastOffsetDelta << + ", BaseTimestamp: " << recordsBatch.BaseTimestamp << ", MaxTimestamp: " << recordsBatch.MaxTimestamp << ", BaseSequence: " << recordsBatch.BaseSequence << ", BatchLength: " << recordsBatch.BatchLength); auto topicWithoutDb = GetTopicNameWithoutDb(Context->DatabasePath, partPQResponse.GetTopic()); - ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(recordsBatch.Records.size(), BuildLabels(Context, "", topicWithoutDb, "api.kafka.fetch.messages", ""))); + ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(recordsBatch.Records.size(), BuildLabels(Context, "", topicWithoutDb, "api.kafka.fetch.messages", ""))); } void TKafkaFetchActor::RespondIfRequired(const TActorContext& ctx) { diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp index 452beeadc61d..0e028dcb29a1 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp @@ -43,7 +43,7 @@ void TKafkaProduceActor::LogEvent(IEventHandle& ev) { void TKafkaProduceActor::SendMetrics(const TString& topicName, size_t delta, const TString& name, const TActorContext& ctx) { auto topicWithoutDb = GetTopicNameWithoutDb(Context->DatabasePath, topicName); ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(delta, BuildLabels(Context, "", topicWithoutDb, TStringBuilder() << "api.kafka.produce." 
<< name, ""))); - ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(delta, BuildLabels(Context, "", topicWithoutDb, "api.kafka.produce.total_messages", ""))); + ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(delta, BuildLabels(Context, "", topicWithoutDb, "api.kafka.produce.total_messages", ""))); } void TKafkaProduceActor::Bootstrap(const NActors::TActorContext& /*ctx*/) { @@ -82,7 +82,7 @@ void TKafkaProduceActor::PassAway() { void TKafkaProduceActor::CleanTopics(const TActorContext& ctx) { const auto now = ctx.Now(); - std::map newTopics; + std::map newTopics; for(auto& [topicPath, topicInfo] : Topics) { if (topicInfo.ExpirationTime > now) { newTopics[topicPath] = std::move(topicInfo); @@ -242,7 +242,8 @@ size_t TKafkaProduceActor::EnqueueInitialization() { THolder Convert(const TProduceRequestData::TTopicProduceData::TPartitionProduceData& data, const TString& topicName, ui64 cookie, - const TString& clientDC) { + const TString& clientDC, + bool chargeExtraRU) { auto ev = MakeHolder(); auto& request = ev->Record; @@ -254,6 +255,9 @@ THolder Convert(const TProduceRequestData:: partitionRequest->SetPartition(data.Index); // partitionRequest->SetCmdWriteOffset(); partitionRequest->SetCookie(cookie); + if (chargeExtraRU) { + partitionRequest->SetChargeExtraRUCount(1); + } ui64 totalSize = 0; @@ -319,11 +323,11 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co pendingRequest->StartTime = ctx.Now(); size_t position = 0; + bool chargeExtraRU = Context->Config.GetChargeExtraRUOnRequest(); for(const auto& topicData : r->TopicData) { const TString& topicPath = NormalizePath(Context->DatabasePath, *topicData.Name); for(const auto& partitionData : topicData.PartitionData) { const auto partitionId = partitionData.Index; - auto writer = PartitionWriter(topicPath, partitionId, ctx); if (OK == writer.first) { auto ownCookie = ++Cookie; @@ -336,7 +340,8 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co pendingRequest->WaitAcceptingCookies.insert(ownCookie); pendingRequest->WaitResultCookies.insert(ownCookie); - auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC); + auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC, chargeExtraRU); + chargeExtraRU = false; Send(writer.second, std::move(ev)); } else { @@ -443,7 +448,7 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) { // We send the results in the order of receipt of the request while (!PendingRequests.empty()) { auto pendingRequest = PendingRequests.front(); - + // We send the response by timeout. This is possible, for example, if the event was lost or the PartitionWrite died. bool expired = expireTime > pendingRequest->StartTime; diff --git a/ydb/core/persqueue/fetch_request_actor.cpp b/ydb/core/persqueue/fetch_request_actor.cpp index 4ddd00ac3b6b..d37acaaacd42 100644 --- a/ydb/core/persqueue/fetch_request_actor.cpp +++ b/ydb/core/persqueue/fetch_request_actor.cpp @@ -75,7 +75,7 @@ struct TEvPrivate { bool CanProcessFetchRequest; //any partitions answered that it has data or WaitMs timeout occured ui32 FetchRequestReadsDone; - ui64 FetchRequestCurrentReadTablet; + ui64 FetchRequestCurrentReadTablet; ui64 CurrentCookie; ui32 FetchRequestBytesLeft; THolder Response; @@ -145,10 +145,10 @@ struct TEvPrivate { break; case EWakeupTag::RlNoResource: - // Re-requesting the quota. We do this until we get a quota. + // Re-requesting the quota. We do this until we get a quota. 
                 RequestDataQuota(PendingQuotaAmount, ctx);
                 break;
-
+
             default:
                 Y_VERIFY_DEBUG_S(false, "Unsupported tag: " << static_cast(tag));
         }
@@ -171,7 +171,7 @@ struct TEvPrivate {
     void SendSchemeCacheRequest(const TActorContext& ctx) {
         LOG_DEBUG_S(ctx, NKikimrServices::PQ_FETCH_REQUEST, "SendSchemeCacheRequest");
-
+
         auto schemeCacheRequest = std::make_unique(1);
         schemeCacheRequest->DatabaseName = Settings.Database;
@@ -250,7 +250,7 @@ struct TEvPrivate {
                 ), ctx
             );;
         }
-
+
         auto& description = entry.PQGroupInfo->Description;
         auto& topicInfo = TopicInfo[path];
         topicInfo.BalancerTabletId = description.GetBalancerTabletID();
@@ -345,7 +345,7 @@ struct TEvPrivate {
         if (HandlePipeError(tabletId, ctx))
             return;

-        auto reason = TStringBuilder() << "Client pipe to " << tabletId << " connection error, Status"
+        auto reason = TStringBuilder() << "Client pipe to " << tabletId << " connection error, Status"
                       << NKikimrProto::EReplyStatus_Name(msg->Status).data() << ", Marker# PQ6";
         return SendReplyAndDie(CreateErrorReply(Ydb::StatusIds::INTERNAL_ERROR, reason), ctx);
@@ -423,7 +423,7 @@ struct TEvPrivate {
         preq->Record.SetRequestId(reqId);
         auto partReq = preq->Record.MutablePartitionRequest();
         partReq->SetCookie(CurrentCookie);
-
+
         partReq->SetTopic(topic);
         partReq->SetPartition(part);
         auto read = partReq->MutableCmdRead();
@@ -480,7 +480,7 @@ struct TEvPrivate {
         SetMeteringMode(it->second.PQInfo->Description.GetPQTabletConfig().GetMeteringMode());

         if (IsQuotaRequired()) {
-            PendingQuotaAmount = 1 + CalcRuConsumption(GetPayloadSize(record));
+            PendingQuotaAmount = CalcRuConsumption(GetPayloadSize(record)) + (Settings.ChargeExtraRU ? 1 : 0);
             RequestDataQuota(PendingQuotaAmount, ctx);
         } else {
             ProceedFetchRequest(ctx);
diff --git a/ydb/core/persqueue/fetch_request_actor.h b/ydb/core/persqueue/fetch_request_actor.h
index e6adac4f9d62..178901d6f50d 100644
--- a/ydb/core/persqueue/fetch_request_actor.h
+++ b/ydb/core/persqueue/fetch_request_actor.h
@@ -15,7 +15,7 @@ struct TPartitionFetchRequest {
     ui64 Offset;
     ui64 MaxBytes;
     ui64 ReadTimestampMs;
-
+
     TPartitionFetchRequest(const TString& topic, ui32 partition, ui64 offset, ui64 maxBytes, ui64 readTimestampMs = 0, const TString& clientId = NKikimr::NPQ::CLIENTID_WITHOUT_CONSUMER)
         : Topic(topic)
         , ClientId(clientId)
@@ -35,11 +35,12 @@ struct TFetchRequestSettings {
     ui64 MaxWaitTimeMs;
     ui64 TotalMaxBytes;
     TRlContext RlCtx;
+    bool ChargeExtraRU;
     ui64 RequestId = 0;

     TFetchRequestSettings(
         const TString& database, const TVector& partitions, ui64 maxWaitTimeMs, ui64 totalMaxBytes, TRlContext rlCtx,
-        const TMaybe& user = {}, ui64 requestId = 0
+        const TMaybe& user = {}, ui64 requestId = 0, bool chargeExtraRU = false
     )
         : Database(database)
         , Partitions(partitions)
@@ -47,6 +48,7 @@ struct TFetchRequestSettings {
         , MaxWaitTimeMs(maxWaitTimeMs)
         , TotalMaxBytes(totalMaxBytes)
         , RlCtx(rlCtx)
+        , ChargeExtraRU(chargeExtraRU)
         , RequestId(requestId)
     {}
 };
diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp
index 67bfcaf10801..2c840ae5a1c3 100644
--- a/ydb/core/persqueue/writer/writer.cpp
+++ b/ydb/core/persqueue/writer/writer.cpp
@@ -526,7 +526,7 @@ class TPartitionWriter: public TActorBootstrapped, private TRl
             if (needToRequestQuota) {
                 ++processed;
-                PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize());
+                PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize()) + (it->second.GetPartitionRequest().HasChargeExtraRUCount() ? it->second.GetPartitionRequest().GetChargeExtraRUCount() : 0);
                 PendingQuota.emplace_back(it->first);
             }
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 12a11ff85b1e..aa9c88b4b77c 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1715,6 +1715,7 @@ message TKafkaProxyConfig {
     }

     optional TProxy Proxy = 7;
+    optional bool ChargeExtraRUOnRequest = 10 [default = false];
 }

 message TAwsCompatibilityConfig {
diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto
index 87b3fa3cb2c9..74adbd999810 100644
--- a/ydb/core/protos/msgbus_pq.proto
+++ b/ydb/core/protos/msgbus_pq.proto
@@ -170,6 +170,7 @@ message TPersQueuePartitionRequest {
     optional bool IsDirectWrite = 18 [default = false];
     optional uint64 PutUnitsSize = 19;
     optional int64 InitialSeqNo = 26;
+    optional int32 ChargeExtraRUCount = 27 [default = 0];
 }

 message TPersQueueMetaRequest {