Skip to content

Commit

Permalink
Merge 09b5e2c into a421b6d
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev authored May 13, 2024
2 parents a421b6d + 09b5e2c commit edd91c6
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 28 deletions.
20 changes: 9 additions & 11 deletions ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ void TKafkaFetchActor::SendFetchRequests(const TActorContext& ctx) {
for (size_t topicIndex = 0; topicIndex < Response->Responses.size(); topicIndex++) {
TVector<NKikimr::NPQ::TPartitionFetchRequest> partPQRequests;
PrepareFetchRequestData(topicIndex, partPQRequests);

NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, *Context->UserToken);

auto chargeExtraRU = topicIndex == 0 && Context->Config.GetChargeExtraRUOnRequest();
NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, *Context->UserToken, chargeExtraRU);
auto fetchActor = NKikimr::NPQ::CreatePQFetchRequestActor(request, NKikimr::MakeSchemeCacheID(), ctx.SelfID);
auto actorId = ctx.Register(fetchActor);
PendingResponses++;
Expand All @@ -55,7 +54,6 @@ void TKafkaFetchActor::PrepareFetchRequestData(const size_t topicIndex, TVector<
for (size_t partIndex = 0; partIndex < topicKafkaRequest.Partitions.size(); partIndex++) {
auto& partKafkaRequest = topicKafkaRequest.Partitions[partIndex];
KAFKA_LOG_D(TStringBuilder() << "Fetch actor: New request. Topic: " << topicKafkaRequest.Topic.value() << " Partition: " << partKafkaRequest.Partition << " FetchOffset: " << partKafkaRequest.FetchOffset << " PartitionMaxBytes: " << partKafkaRequest.PartitionMaxBytes);

auto& partPQRequest = partPQRequests[partIndex];
partPQRequest.Topic = NormalizePath(Context->DatabasePath, topicKafkaRequest.Topic.value()); // FIXME(savnik): handle empty topic
partPQRequest.Partition = partKafkaRequest.Partition;
Expand Down Expand Up @@ -113,9 +111,9 @@ void TKafkaFetchActor::HandleSuccessResponse(const NKikimr::TEvPQ::TEvFetchRespo
partKafkaResponse.ErrorCode = ConvertErrorCode(partPQResponse.GetReadResult().GetErrorCode());

if (partPQResponse.GetReadResult().GetErrorCode() != NPersQueue::NErrorCode::EErrorCode::OK) {
KAFKA_LOG_ERROR("Fetch actor: Failed to get responses for topic: " << topicResponse.Topic <<
", partition: " << partPQResponse.GetPartition() <<
". Code: " << static_cast<size_t>(partPQResponse.GetReadResult().GetErrorCode()) <<
KAFKA_LOG_ERROR("Fetch actor: Failed to get responses for topic: " << topicResponse.Topic <<
", partition: " << partPQResponse.GetPartition() <<
". Code: " << static_cast<size_t>(partPQResponse.GetReadResult().GetErrorCode()) <<
". Reason: " + partPQResponse.GetReadResult().GetErrorReason());
}

Expand Down Expand Up @@ -174,7 +172,7 @@ void TKafkaFetchActor::FillRecordsBatch(const NKikimrClient::TPersQueueFetchResp
record.TimestampDelta = lastTimestamp - baseTimestamp;

record.Length = record.Size(TKafkaRecord::MessageMeta::PresentVersions.Max) - SizeOfZeroVarint;
KAFKA_LOG_D("Fetch actor: Record info. OffsetDelta: " << record.OffsetDelta <<
KAFKA_LOG_D("Fetch actor: Record info. OffsetDelta: " << record.OffsetDelta <<
", TimestampDelta: " << record.TimestampDelta << ", Length: " << record.Length);
}

Expand All @@ -187,11 +185,11 @@ void TKafkaFetchActor::FillRecordsBatch(const NKikimrClient::TPersQueueFetchResp
//recordsBatch.Attributes https://kafka.apache.org/documentation/#recordbatch

recordsBatch.BatchLength = recordsBatch.Size(TKafkaRecordBatch::MessageMeta::PresentVersions.Max) - BatchFirstTwoFieldsSize;
KAFKA_LOG_D("Fetch actor: RecordBatch info. BaseOffset: " << recordsBatch.BaseOffset << ", LastOffsetDelta: " << recordsBatch.LastOffsetDelta <<
", BaseTimestamp: " << recordsBatch.BaseTimestamp << ", MaxTimestamp: " << recordsBatch.MaxTimestamp <<
KAFKA_LOG_D("Fetch actor: RecordBatch info. BaseOffset: " << recordsBatch.BaseOffset << ", LastOffsetDelta: " << recordsBatch.LastOffsetDelta <<
", BaseTimestamp: " << recordsBatch.BaseTimestamp << ", MaxTimestamp: " << recordsBatch.MaxTimestamp <<
", BaseSequence: " << recordsBatch.BaseSequence << ", BatchLength: " << recordsBatch.BatchLength);
auto topicWithoutDb = GetTopicNameWithoutDb(Context->DatabasePath, partPQResponse.GetTopic());
ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(recordsBatch.Records.size(), BuildLabels(Context, "", topicWithoutDb, "api.kafka.fetch.messages", "")));
ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(recordsBatch.Records.size(), BuildLabels(Context, "", topicWithoutDb, "api.kafka.fetch.messages", "")));
}

void TKafkaFetchActor::RespondIfRequired(const TActorContext& ctx) {
Expand Down
17 changes: 11 additions & 6 deletions ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ void TKafkaProduceActor::LogEvent(IEventHandle& ev) {
void TKafkaProduceActor::SendMetrics(const TString& topicName, size_t delta, const TString& name, const TActorContext& ctx) {
auto topicWithoutDb = GetTopicNameWithoutDb(Context->DatabasePath, topicName);
ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(delta, BuildLabels(Context, "", topicWithoutDb, TStringBuilder() << "api.kafka.produce." << name, "")));
ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(delta, BuildLabels(Context, "", topicWithoutDb, "api.kafka.produce.total_messages", "")));
ctx.Send(MakeKafkaMetricsServiceID(), new TEvKafka::TEvUpdateCounter(delta, BuildLabels(Context, "", topicWithoutDb, "api.kafka.produce.total_messages", "")));
}

void TKafkaProduceActor::Bootstrap(const NActors::TActorContext& /*ctx*/) {
Expand Down Expand Up @@ -82,7 +82,7 @@ void TKafkaProduceActor::PassAway() {
void TKafkaProduceActor::CleanTopics(const TActorContext& ctx) {
const auto now = ctx.Now();

std::map<TString, TTopicInfo> newTopics;
std::map<TString, TTopicInfo> newTopics;
for(auto& [topicPath, topicInfo] : Topics) {
if (topicInfo.ExpirationTime > now) {
newTopics[topicPath] = std::move(topicInfo);
Expand Down Expand Up @@ -242,7 +242,8 @@ size_t TKafkaProduceActor::EnqueueInitialization() {
THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::TTopicProduceData::TPartitionProduceData& data,
const TString& topicName,
ui64 cookie,
const TString& clientDC) {
const TString& clientDC,
bool chargeExtraRU) {
auto ev = MakeHolder<TEvPartitionWriter::TEvWriteRequest>();
auto& request = ev->Record;

Expand All @@ -254,6 +255,9 @@ THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::
partitionRequest->SetPartition(data.Index);
// partitionRequest->SetCmdWriteOffset();
partitionRequest->SetCookie(cookie);
if (chargeExtraRU) {
partitionRequest->SetChargeExtraRUCount(1);
}

ui64 totalSize = 0;

Expand Down Expand Up @@ -319,11 +323,11 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
pendingRequest->StartTime = ctx.Now();

size_t position = 0;
bool chargeExtraRU = Context->Config.GetChargeExtraRUOnRequest();
for(const auto& topicData : r->TopicData) {
const TString& topicPath = NormalizePath(Context->DatabasePath, *topicData.Name);
for(const auto& partitionData : topicData.PartitionData) {
const auto partitionId = partitionData.Index;

auto writer = PartitionWriter(topicPath, partitionId, ctx);
if (OK == writer.first) {
auto ownCookie = ++Cookie;
Expand All @@ -336,7 +340,8 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
pendingRequest->WaitAcceptingCookies.insert(ownCookie);
pendingRequest->WaitResultCookies.insert(ownCookie);

auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC);
auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC, chargeExtraRU);
chargeExtraRU = false;

Send(writer.second, std::move(ev));
} else {
Expand Down Expand Up @@ -443,7 +448,7 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
// We send the results in the order of receipt of the request
while (!PendingRequests.empty()) {
auto pendingRequest = PendingRequests.front();

// We send the response by timeout. This is possible, for example, if the event was lost or the PartitionWrite died.
bool expired = expireTime > pendingRequest->StartTime;

Expand Down
17 changes: 9 additions & 8 deletions ydb/core/persqueue/fetch_request_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ struct TEvPrivate {

bool CanProcessFetchRequest; //any partitions answered that it has data or WaitMs timeout occured
ui32 FetchRequestReadsDone;
ui64 FetchRequestCurrentReadTablet;
ui64 FetchRequestCurrentReadTablet;
ui64 CurrentCookie;
ui32 FetchRequestBytesLeft;
THolder<TEvPQ::TEvFetchResponse> Response;
Expand Down Expand Up @@ -145,10 +145,10 @@ struct TEvPrivate {
break;

case EWakeupTag::RlNoResource:
// Re-requesting the quota. We do this until we get a quota.
// Re-requesting the quota. We do this until we get a quota.
RequestDataQuota(PendingQuotaAmount, ctx);
break;

default:
Y_VERIFY_DEBUG_S(false, "Unsupported tag: " << static_cast<ui64>(tag));
}
Expand All @@ -171,7 +171,7 @@ struct TEvPrivate {

void SendSchemeCacheRequest(const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_FETCH_REQUEST, "SendSchemeCacheRequest");

auto schemeCacheRequest = std::make_unique<TSchemeCacheNavigate>(1);
schemeCacheRequest->DatabaseName = Settings.Database;

Expand Down Expand Up @@ -250,7 +250,7 @@ struct TEvPrivate {
), ctx
);;
}

auto& description = entry.PQGroupInfo->Description;
auto& topicInfo = TopicInfo[path];
topicInfo.BalancerTabletId = description.GetBalancerTabletID();
Expand Down Expand Up @@ -345,7 +345,7 @@ struct TEvPrivate {
if (HandlePipeError(tabletId, ctx))
return;

auto reason = TStringBuilder() << "Client pipe to " << tabletId << " connection error, Status"
auto reason = TStringBuilder() << "Client pipe to " << tabletId << " connection error, Status"
<< NKikimrProto::EReplyStatus_Name(msg->Status).data()
<< ", Marker# PQ6";
return SendReplyAndDie(CreateErrorReply(Ydb::StatusIds::INTERNAL_ERROR, reason), ctx);
Expand Down Expand Up @@ -423,7 +423,7 @@ struct TEvPrivate {
preq->Record.SetRequestId(reqId);
auto partReq = preq->Record.MutablePartitionRequest();
partReq->SetCookie(CurrentCookie);

partReq->SetTopic(topic);
partReq->SetPartition(part);
auto read = partReq->MutableCmdRead();
Expand Down Expand Up @@ -480,7 +480,8 @@ struct TEvPrivate {
SetMeteringMode(it->second.PQInfo->Description.GetPQTabletConfig().GetMeteringMode());

if (IsQuotaRequired()) {
PendingQuotaAmount = 1 + CalcRuConsumption(GetPayloadSize(record));
PendingQuotaAmount = CalcRuConsumption(GetPayloadSize(record)) + Settings.ChargeExtraRU ? 1 : 0;
Settings.ChargeExtraRU = false;
RequestDataQuota(PendingQuotaAmount, ctx);
} else {
ProceedFetchRequest(ctx);
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/persqueue/fetch_request_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ struct TPartitionFetchRequest {
ui64 Offset;
ui64 MaxBytes;
ui64 ReadTimestampMs;

TPartitionFetchRequest(const TString& topic, ui32 partition, ui64 offset, ui64 maxBytes, ui64 readTimestampMs = 0, const TString& clientId = NKikimr::NPQ::CLIENTID_WITHOUT_CONSUMER)
: Topic(topic)
, ClientId(clientId)
Expand All @@ -35,18 +35,20 @@ struct TFetchRequestSettings {
ui64 MaxWaitTimeMs;
ui64 TotalMaxBytes;
TRlContext RlCtx;
bool ChargeExtraRU;

ui64 RequestId = 0;
TFetchRequestSettings(
const TString& database, const TVector<TPartitionFetchRequest>& partitions, ui64 maxWaitTimeMs, ui64 totalMaxBytes, TRlContext rlCtx,
const TMaybe<NACLib::TUserToken>& user = {}, ui64 requestId = 0
const TMaybe<NACLib::TUserToken>& user = {}, ui64 requestId = 0, bool chargeExtraRU = false
)
: Database(database)
, Partitions(partitions)
, User(user)
, MaxWaitTimeMs(maxWaitTimeMs)
, TotalMaxBytes(totalMaxBytes)
, RlCtx(rlCtx)
, ChargeExtraRU(chargeExtraRU)
, RequestId(requestId)
{}
};
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/writer/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl

if (needToRequestQuota) {
++processed;
PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize());
PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize()) + it->second.GetPartitionRequest().HasChargeExtraRUCount() ? it->second.GetPartitionRequest().GetChargeExtraRUCount() : 0;
PendingQuota.emplace_back(it->first);
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1715,6 +1715,7 @@ message TKafkaProxyConfig {
}

optional TProxy Proxy = 7;
optional bool ChargeExtraRUOnRequest = 10 [default = false];
}

message TAwsCompatibilityConfig {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/msgbus_pq.proto
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ message TPersQueuePartitionRequest {
optional bool IsDirectWrite = 18 [default = false];
optional uint64 PutUnitsSize = 19;
optional int64 InitialSeqNo = 26;
optional int32 ChargeExtraRUCount = 27 [default = 0];
}

message TPersQueueMetaRequest {
Expand Down

0 comments on commit edd91c6

Please sign in to comment.