Skip to content

Commit

Permalink
Kafka new metering (ydb-platform#4485)
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev authored and naspirato committed May 15, 2024
1 parent 911220b commit f38240d
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 16 deletions.
4 changes: 2 additions & 2 deletions ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ void TKafkaFetchActor::SendFetchRequests(const TActorContext& ctx) {
for (size_t topicIndex = 0; topicIndex < Response->Responses.size(); topicIndex++) {
TVector<NKikimr::NPQ::TPartitionFetchRequest> partPQRequests;
PrepareFetchRequestData(topicIndex, partPQRequests);
auto chargeExtraRU = topicIndex == 0 && Context->Config.GetChargeExtraRUOnRequest();
NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, *Context->UserToken, chargeExtraRU);
auto ruPerRequest = topicIndex == 0 && Context->Config.GetMeteringV2Enabled();
NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, *Context->UserToken, ruPerRequest);
auto fetchActor = NKikimr::NPQ::CreatePQFetchRequestActor(request, NKikimr::MakeSchemeCacheID(), ctx.SelfID);
auto actorId = ctx.Register(fetchActor);
PendingResponses++;
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::
const TString& topicName,
ui64 cookie,
const TString& clientDC,
bool chargeExtraRU) {
bool ruPerRequest) {
auto ev = MakeHolder<TEvPartitionWriter::TEvWriteRequest>();
auto& request = ev->Record;

Expand All @@ -255,8 +255,8 @@ THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::
partitionRequest->SetPartition(data.Index);
// partitionRequest->SetCmdWriteOffset();
partitionRequest->SetCookie(cookie);
if (chargeExtraRU) {
partitionRequest->SetChargeExtraRU(true);
if (ruPerRequest) {
partitionRequest->SetMeteringV2Enabled(true);
}

ui64 totalSize = 0;
Expand Down Expand Up @@ -323,7 +323,7 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
pendingRequest->StartTime = ctx.Now();

size_t position = 0;
bool chargeExtraRU = Context->Config.GetChargeExtraRUOnRequest();
bool ruPerRequest = Context->Config.GetMeteringV2Enabled();
for(const auto& topicData : r->TopicData) {
const TString& topicPath = NormalizePath(Context->DatabasePath, *topicData.Name);
for(const auto& partitionData : topicData.PartitionData) {
Expand All @@ -340,8 +340,8 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
pendingRequest->WaitAcceptingCookies.insert(ownCookie);
pendingRequest->WaitResultCookies.insert(ownCookie);

auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC, chargeExtraRU);
chargeExtraRU = false;
auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC, ruPerRequest);
ruPerRequest = false;

Send(writer.second, std::move(ev));
} else {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/fetch_request_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,8 @@ struct TEvPrivate {
SetMeteringMode(it->second.PQInfo->Description.GetPQTabletConfig().GetMeteringMode());

if (IsQuotaRequired()) {
PendingQuotaAmount = CalcRuConsumption(GetPayloadSize(record)) + (Settings.ChargeExtraRU ? 1 : 0);
Settings.ChargeExtraRU = false;
PendingQuotaAmount = CalcRuConsumption(GetPayloadSize(record)) + (Settings.RuPerRequest ? 1 : 0);
Settings.RuPerRequest = false;
RequestDataQuota(PendingQuotaAmount, ctx);
} else {
ProceedFetchRequest(ctx);
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/persqueue/fetch_request_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,20 @@ struct TFetchRequestSettings {
ui64 MaxWaitTimeMs;
ui64 TotalMaxBytes;
TRlContext RlCtx;
bool ChargeExtraRU;
bool RuPerRequest;

ui64 RequestId = 0;
TFetchRequestSettings(
const TString& database, const TVector<TPartitionFetchRequest>& partitions, ui64 maxWaitTimeMs, ui64 totalMaxBytes, TRlContext rlCtx,
const TMaybe<NACLib::TUserToken>& user = {}, ui64 requestId = 0, bool chargeExtraRU = false
const TMaybe<NACLib::TUserToken>& user = {}, ui64 requestId = 0, bool ruPerRequest = false
)
: Database(database)
, Partitions(partitions)
, User(user)
, MaxWaitTimeMs(maxWaitTimeMs)
, TotalMaxBytes(totalMaxBytes)
, RlCtx(rlCtx)
, ChargeExtraRU(chargeExtraRU)
, RuPerRequest(ruPerRequest)
, RequestId(requestId)
{}
};
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/writer/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl

if (needToRequestQuota) {
++processed;
PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize()) + (it->second.GetPartitionRequest().GetChargeExtraRU() ? 1 : 0);
PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize()) + (it->second.GetPartitionRequest().GetMeteringV2Enabled() ? 1 : 0);
PendingQuota.emplace_back(it->first);
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1715,7 +1715,7 @@ message TKafkaProxyConfig {
}

optional TProxy Proxy = 7;
optional bool ChargeExtraRUOnRequest = 10 [default = false];
optional bool MeteringV2Enabled = 10 [default = false];
}

message TAwsCompatibilityConfig {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/protos/msgbus_pq.proto
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ message TPersQueuePartitionRequest {
optional bool IsDirectWrite = 18 [default = false];
optional uint64 PutUnitsSize = 19;
optional int64 InitialSeqNo = 26;
optional bool ChargeExtraRU = 27 [default = false];
optional bool MeteringV2Enabled = 27 [default = false];
}

message TPersQueueMetaRequest {
Expand Down

0 comments on commit f38240d

Please sign in to comment.