Skip to content

Commit

Permalink
Refactoring - kafka_proxy use PartitionChooser
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov committed Dec 22, 2023
1 parent 2729ebb commit 0c63fbc
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 17 deletions.
19 changes: 6 additions & 13 deletions ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,7 @@ void TKafkaProduceActor::HandleInit(TEvTxProxySchemeCache::TEvNavigateKeySetResu
if (info.SecurityObject->CheckAccess(NACLib::EAccessRights::UpdateRow, *Context->UserToken)) {
topic.Status = OK;
topic.ExpirationTime = now + TOPIC_OK_EXPIRATION_INTERVAL;
for(auto& p : info.PQGroupInfo->Description.GetPartitions()) {
topic.partitions[p.GetPartitionId()] = p.GetTabletId();
}
topic.PartitionChooser = CreatePartitionChooser(info.PQGroupInfo->Description);
} else {
KAFKA_LOG_W("Produce actor: Unauthorized PRODUCE to topic '" << topicPath << "'");
topic.Status = UNAUTHORIZED;
Expand Down Expand Up @@ -178,7 +176,7 @@ void TKafkaProduceActor::Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TP
auto& topicInfo = Topics[path];
topicInfo.Status = NOT_FOUND;
topicInfo.ExpirationTime = ctx.Now() + TOPIC_NOT_FOUND_EXPIRATION_INTERVAL;
topicInfo.partitions.clear();
topicInfo.PartitionChooser.reset();
}

void TKafkaProduceActor::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx) {
Expand All @@ -192,10 +190,7 @@ void TKafkaProduceActor::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TP
}
topic.Status = OK;
topic.ExpirationTime = ctx.Now() + TOPIC_OK_EXPIRATION_INTERVAL;
topic.partitions.clear();
for (auto& p : e->Result->GetPathDescription().GetPersQueueGroup().GetPartitions()) {
topic.partitions[p.GetPartitionId()] = p.GetTabletId();
}
topic.PartitionChooser = CreatePartitionChooser(e->Result->GetPathDescription().GetPersQueueGroup());
}

void TKafkaProduceActor::Handle(TEvKafka::TEvProduceRequest::TPtr request, const TActorContext& ctx) {
Expand Down Expand Up @@ -578,19 +573,17 @@ std::pair<TKafkaProduceActor::ETopicStatus, TActorId> TKafkaProduceActor::Partit
return { OK, writerInfo.ActorId };
}

auto& partitions = topicInfo.partitions;
auto pit = partitions.find(partitionId);
if (pit == partitions.end()) {
auto* partition = topicInfo.PartitionChooser->GetPartition(partitionId);
if (!partition) {
return { NOT_FOUND, TActorId{} };
}

auto tabletId = pit->second;
TPartitionWriterOpts opts;
opts.WithDeduplication(false)
.WithSourceId(SourceId)
.WithTopicPath(topicPath)
.WithCheckRequestUnits(topicInfo.MeteringMode, Context->RlContext);
auto* writerActor = CreatePartitionWriter(SelfId(), tabletId, partitionId, opts);
auto* writerActor = CreatePartitionWriter(SelfId(), partition->TabletId, partitionId, opts);

auto& writerInfo = partitionWriters[partitionId];
writerInfo.ActorId = ctx.RegisterWithSameMailbox(writerActor);
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/kafka_proxy/actors/kafka_produce_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,7 @@ class TKafkaProduceActor: public NActors::TActorBootstrapped<TKafkaProduceActor>
TInstant ExpirationTime;

NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode;

// partitioId -> tabletId
std::unordered_map<ui32, ui64> partitions;
std::shared_ptr<IPartitionChooser> PartitionChooser;
};
std::map<TString, TTopicInfo> Topics;

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/writer/partition_chooser.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class IPartitionChooser {
virtual const TPartitionInfo* GetPartition(ui32 partitionId) const = 0;
};

std::shared_ptr<IPartitionChooser> CreatePartitionChooser(const NKikimrSchemeOp::TPersQueueGroupDescription& config, bool withoutHash);
std::shared_ptr<IPartitionChooser> CreatePartitionChooser(const NKikimrSchemeOp::TPersQueueGroupDescription& config, bool withoutHash = false);

NActors::IActor* CreatePartitionChooserActor(TActorId parentId,
const NKikimrSchemeOp::TPersQueueGroupDescription& config,
Expand Down
1 change: 1 addition & 0 deletions ydb/core/persqueue/writer/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl


IActor* CreatePartitionWriter(const TActorId& client,
// const NKikimrSchemeOp::TPersQueueGroupDescription& config,
ui64 tabletId,
ui32 partitionId,
const TPartitionWriterOpts& opts) {
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/persqueue/writer/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

#include <variant>

#include "partition_chooser.h"

namespace NKikimr::NPQ {

constexpr ui64 INVALID_WRITE_ID = Max<ui64>();
Expand Down Expand Up @@ -205,6 +207,7 @@ struct TPartitionWriterOpts {
};

IActor* CreatePartitionWriter(const TActorId& client,
// const NKikimrSchemeOp::TPersQueueGroupDescription& config,
ui64 tabletId,
ui32 partitionId,
const TPartitionWriterOpts& opts = {});
Expand Down

0 comments on commit 0c63fbc

Please sign in to comment.