diff --git a/ydb/core/persqueue/partition_scale_manager.cpp b/ydb/core/persqueue/partition_scale_manager.cpp index 92a18074f32b..8d03b87e1d04 100644 --- a/ydb/core/persqueue/partition_scale_manager.cpp +++ b/ydb/core/persqueue/partition_scale_manager.cpp @@ -9,22 +9,23 @@ namespace NPQ { TPartitionScaleManager::TPartitionScaleManager( const TString& topicName, const TString& databasePath, - NKikimrPQ::TUpdateBalancerConfig& balancerConfig + ui64 pathId, + int version, + const NKikimrPQ::TPQTabletConfig& config ) : TopicName(topicName) , DatabasePath(databasePath) - , BalancerConfig(balancerConfig) { - + , BalancerConfig(pathId, version, config) { } -void TPartitionScaleManager::HandleScaleStatusChange(const TPartitionInfo& partition, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx) { +void TPartitionScaleManager::HandleScaleStatusChange(const ui32 partitionId, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx) { if (scaleStatus == NKikimrPQ::EScaleStatus::NEED_SPLIT) { LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::HandleScaleStatusChange " - << "need to split partition " << partition.Id); - PartitionsToSplit.emplace(partition.Id, partition); + << "need to split partition " << partitionId); + PartitionsToSplit.insert(partitionId); TrySendScaleRequest(ctx); } else { - PartitionsToSplit.erase(partition.Id); + PartitionsToSplit.erase(partitionId); } } @@ -66,35 +67,33 @@ std::pair, std::vector> TPartition std::vector mergesToApply; size_t allowedSplitsCount = BalancerConfig.MaxActivePartitions > BalancerConfig.CurPartitions ? BalancerConfig.MaxActivePartitions - BalancerConfig.CurPartitions : 0; - auto itSplit = PartitionsToSplit.begin(); - while (allowedSplitsCount > 0 && itSplit != PartitionsToSplit.end()) { - const auto partitionId = itSplit->first; - const auto& partition = itSplit->second; - - if (BalancerConfig.PartitionGraph.GetPartition(partitionId)->Children.empty()) { - auto from = partition.KeyRange.FromBound ? *partition.KeyRange.FromBound : ""; - auto to = partition.KeyRange.ToBound ?*partition.KeyRange.ToBound : ""; + auto partitionId = PartitionsToSplit.begin(); + while (allowedSplitsCount > 0 && partitionId != PartitionsToSplit.end()) { + auto* node = BalancerConfig.PartitionGraph.GetPartition(*partitionId); + if (node->Children.empty()) { + auto from = node->From; + auto to = node->To; auto mid = MiddleOf(from, to); if (mid.empty()) { - itSplit = PartitionsToSplit.erase(itSplit); + partitionId = PartitionsToSplit.erase(partitionId); LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, - "TPartitionScaleManager::BuildScaleRequest wrong partition key range. Can't get mid. Topic# " << TopicName << ", partition# " << partitionId); + "TPartitionScaleManager::BuildScaleRequest wrong partition key range. Can't get mid. Topic# " << TopicName << ", partition# " << *partitionId); continue; } LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::BuildScaleRequest partition split ranges. From# '" << ToHex(from) << "'. To# '" << ToHex(to) << "'. Mid# '" << ToHex(mid) - << "'. Topic# " << TopicName << ". Partition# " << partitionId); + << "'. Topic# " << TopicName << ". Partition# " << *partitionId); TPartitionSplit split; - split.set_partition(partition.Id); + split.set_partition(*partitionId); split.set_splitboundary(mid); splitsToApply.push_back(split); - allowedSplitsCount--; - itSplit++; + --allowedSplitsCount; + ++partitionId; } else { - itSplit = PartitionsToSplit.erase(itSplit); + partitionId = PartitionsToSplit.erase(partitionId); } } @@ -122,8 +121,8 @@ void TPartitionScaleManager::Die(const TActorContext& ctx) { } } -void TPartitionScaleManager::UpdateBalancerConfig(NKikimrPQ::TUpdateBalancerConfig& config) { - BalancerConfig = TBalancerConfig(config); +void TPartitionScaleManager::UpdateBalancerConfig(ui64 pathId, int version, const NKikimrPQ::TPQTabletConfig& config) { + BalancerConfig = TBalancerConfig(pathId, version, config); } void TPartitionScaleManager::UpdateDatabasePath(const TString& dbPath) { diff --git a/ydb/core/persqueue/partition_scale_manager.h b/ydb/core/persqueue/partition_scale_manager.h index 23e67079a179..dc46b38f0831 100644 --- a/ydb/core/persqueue/partition_scale_manager.h +++ b/ydb/core/persqueue/partition_scale_manager.h @@ -21,24 +21,21 @@ namespace NKikimr { namespace NPQ { class TPartitionScaleManager { - -public: - struct TPartitionInfo { - ui32 Id; - NSchemeShard::TTopicTabletInfo::TKeyRange KeyRange; - }; - private: struct TBalancerConfig { TBalancerConfig( - NKikimrPQ::TUpdateBalancerConfig& config + ui64 pathId, + int version, + const NKikimrPQ::TPQTabletConfig& config ) - : PathId(config.GetPathId()) - , PathVersion(config.GetVersion()) + : PathId(pathId) + , PathVersion(version) , PartitionGraph(MakePartitionGraph(config)) - , MaxActivePartitions(config.GetTabletConfig().GetPartitionStrategy().GetMaxPartitionCount()) - , MinActivePartitions(config.GetTabletConfig().GetPartitionStrategy().GetMinPartitionCount()) - , CurPartitions(config.PartitionsSize()) { + , MaxActivePartitions(config.GetPartitionStrategy().GetMaxPartitionCount()) + , MinActivePartitions(config.GetPartitionStrategy().GetMinPartitionCount()) + , CurPartitions(std::count_if(config.GetAllPartitions().begin(), config.GetAllPartitions().end(), [](auto& p) { + return p.GetStatus() != NKikimrPQ::ETopicPartitionStatus::Inactive; + })) { } ui64 PathId; @@ -50,13 +47,13 @@ class TPartitionScaleManager { }; public: - TPartitionScaleManager(const TString& topicPath, const TString& databasePath, NKikimrPQ::TUpdateBalancerConfig& balancerConfig); + TPartitionScaleManager(const TString& topicPath, const TString& databasePath, ui64 pathId, int version, const NKikimrPQ::TPQTabletConfig& config); public: - void HandleScaleStatusChange(const TPartitionInfo& partition, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx); + void HandleScaleStatusChange(const ui32 partition, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx); void HandleScaleRequestResult(TPartitionScaleRequest::TEvPartitionScaleRequestDone::TPtr& ev, const TActorContext& ctx); void TrySendScaleRequest(const TActorContext& ctx); - void UpdateBalancerConfig(NKikimrPQ::TUpdateBalancerConfig& config); + void UpdateBalancerConfig(ui64 pathId, int version, const NKikimrPQ::TPQTabletConfig& config); void UpdateDatabasePath(const TString& dbPath); void Die(const TActorContext& ctx); @@ -79,7 +76,7 @@ class TPartitionScaleManager { TDuration RequestTimeout = TDuration::MilliSeconds(0); TInstant LastResponseTime = TInstant::Zero(); - std::unordered_map PartitionsToSplit; + std::unordered_set PartitionsToSplit; TBalancerConfig BalancerConfig; bool RequestInflight = false; diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 150f7cacb11a..6eb2179e2d36 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -572,11 +572,12 @@ NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& ctx) { } void TPartition::ChangeScaleStatusIfNeeded(NKikimrPQ::EScaleStatus scaleStatus) { - if (scaleStatus == ScaleStatus || LastScaleRequestTime + TDuration::Seconds(SCALE_REQUEST_REPEAT_MIN_SECONDS) > TInstant::Now()) { + auto now = TInstant::Now(); + if (scaleStatus == ScaleStatus || LastScaleRequestTime + TDuration::Seconds(SCALE_REQUEST_REPEAT_MIN_SECONDS) > now) { return; } Send(Tablet, new TEvPQ::TEvPartitionScaleStatusChanged(Partition.OriginalPartitionId, scaleStatus)); - LastScaleRequestTime = TInstant::Now(); + LastScaleRequestTime = now; ScaleStatus = scaleStatus; } diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp index fa103152a911..495277081072 100644 --- a/ydb/core/persqueue/read_balancer.cpp +++ b/ydb/core/persqueue/read_balancer.cpp @@ -503,9 +503,9 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr if (SplitMergeEnabled(TabletConfig)) { if (!PartitionsScaleManager) { - PartitionsScaleManager = std::make_unique(Topic, DatabasePath, record); + PartitionsScaleManager = std::make_unique(Topic, DatabasePath, PathId, Version, TabletConfig); } else { - PartitionsScaleManager->UpdateBalancerConfig(record); + PartitionsScaleManager->UpdateBalancerConfig(PathId, Version, TabletConfig); } } @@ -533,13 +533,10 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr if (it == PartitionsInfo.end()) { Y_ABORT_UNLESS(p.GetPartition() >= prevNextPartitionId && p.GetPartition() < NextPartitionId || NextPartitionId == 0); - partitionsInfo[p.GetPartition()] = {p.GetTabletId(), {}}; - if (SplitMergeEnabled(TabletConfig) && p.HasKeyRange()) { - partitionsInfo[p.GetPartition()].KeyRange.DeserializeFromProto(p.GetKeyRange()); - } + partitionsInfo[p.GetPartition()] = {p.GetTabletId()}; newPartitionsIds.push_back(p.GetPartition()); - newPartitions.push_back(TPartInfo{p.GetPartition(), p.GetTabletId(), 0, partitionsInfo[p.GetPartition()].KeyRange}); + newPartitions.push_back(TPartInfo{p.GetPartition(), p.GetTabletId(), 0, p.GetKeyRange()}); ++NumActiveParts; @@ -701,11 +698,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, c } if (SplitMergeEnabled(TabletConfig) && PartitionsScaleManager) { - TPartitionScaleManager::TPartitionInfo scalePartitionInfo = { - .Id = partitionId, - .KeyRange = PartitionsInfo[partitionId].KeyRange - }; - PartitionsScaleManager->HandleScaleStatusChange(scalePartitionInfo, partRes.GetScaleStatus(), ctx); + PartitionsScaleManager->HandleScaleStatusChange(partitionId, partRes.GetScaleStatus(), ctx); } AggregatedStats.AggrStats(partitionId, partRes.GetPartitionSize(), partRes.GetUsedReserveSize()); @@ -1248,17 +1241,13 @@ void TPersQueueReadBalancer::Handle(TEvPQ::TEvPartitionScaleStatusChanged::TPtr& return; } auto& record = ev->Get()->Record; - auto partitionInfoIt = PartitionsInfo.find(record.GetPartitionId()); - if (partitionInfoIt == PartitionsInfo.end()) { + auto* node = PartitionGraph.GetPartition(record.GetPartitionId()); + if (!node) { return; } if (PartitionsScaleManager) { - TPartitionScaleManager::TPartitionInfo scalePartitionInfo = { - .Id = record.GetPartitionId(), - .KeyRange = partitionInfoIt->second.KeyRange - }; - PartitionsScaleManager->HandleScaleStatusChange(scalePartitionInfo, record.GetScaleStatus(), ctx); + PartitionsScaleManager->HandleScaleStatusChange(record.GetPartitionId(), record.GetScaleStatus(), ctx); } } diff --git a/ydb/core/persqueue/read_balancer.h b/ydb/core/persqueue/read_balancer.h index cb01f070a93b..e91c14d7b734 100644 --- a/ydb/core/persqueue/read_balancer.h +++ b/ydb/core/persqueue/read_balancer.h @@ -177,7 +177,6 @@ class TPersQueueReadBalancer : public TActor, public TTa public: struct TPartitionInfo { ui64 TabletId; - NSchemeShard::TTopicTabletInfo::TKeyRange KeyRange; }; private: diff --git a/ydb/core/persqueue/read_balancer__txinit.h b/ydb/core/persqueue/read_balancer__txinit.h index 881d1a092d4e..cc9a26ad4678 100644 --- a/ydb/core/persqueue/read_balancer__txinit.h +++ b/ydb/core/persqueue/read_balancer__txinit.h @@ -58,6 +58,10 @@ struct TPersQueueReadBalancer::TTxInit : public ITransaction { Self->Consumers[consumer.GetName()]; } Self->PartitionGraph = MakePartitionGraph(Self->TabletConfig); + + if (SplitMergeEnabled(Self->TabletConfig)) { + Self->PartitionsScaleManager = std::make_unique(Self->Topic, Self->DatabasePath, Self->PathId, Self->Version, Self->TabletConfig); + } Self->UpdateConfigCounters(); } Self->Inited = true; @@ -71,7 +75,7 @@ struct TPersQueueReadBalancer::TTxInit : public ITransaction { ui32 part = partsRowset.GetValue(); ui64 tabletId = partsRowset.GetValue(); - partitionsInfo[part] = {tabletId, {}}; + partitionsInfo[part] = {tabletId}; Self->AggregatedStats.AggrStats(part, partsRowset.GetValue(), partsRowset.GetValue()); diff --git a/ydb/core/persqueue/read_balancer__txwrite.h b/ydb/core/persqueue/read_balancer__txwrite.h index 29eb5243a619..b4177be6da72 100644 --- a/ydb/core/persqueue/read_balancer__txwrite.h +++ b/ydb/core/persqueue/read_balancer__txwrite.h @@ -15,9 +15,9 @@ struct TPartInfo { ui32 PartitionId; ui32 Group; ui64 TabletId; - NSchemeShard::TTopicTabletInfo::TKeyRange KeyRange; + NKikimrPQ::TPartitionKeyRange KeyRange; - TPartInfo(const ui32 partitionId, const ui64 tabletId, const ui32 group, NSchemeShard::TTopicTabletInfo::TKeyRange keyRange) + TPartInfo(const ui32 partitionId, const ui64 tabletId, const ui32 group, const NKikimrPQ::TPartitionKeyRange& keyRange) : PartitionId(partitionId) , Group(group) , TabletId(tabletId) diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp index 310330e33b7e..4decfc2a6c7c 100644 --- a/ydb/core/persqueue/utils.cpp +++ b/ydb/core/persqueue/utils.cpp @@ -202,7 +202,7 @@ std::unordered_map BuildGraph(const TCollection& pa } for (const auto& p : partitions) { - result.emplace(GetPartitionId(p), TPartitionGraph::Node(GetPartitionId(p), p.GetTabletId())); + result.emplace(GetPartitionId(p), TPartitionGraph::Node(GetPartitionId(p), p.GetTabletId(), p.GetKeyRange().GetFromBound(), p.GetKeyRange().GetToBound())); } std::deque queue; @@ -248,9 +248,11 @@ std::unordered_map BuildGraph(const TCollection& pa return result; } -TPartitionGraph::Node::Node(ui32 id, ui64 tabletId) +TPartitionGraph::Node::Node(ui32 id, ui64 tabletId, const TString& from, const TString& to) : Id(id) - , TabletId(tabletId) { + , TabletId(tabletId) + , From(from) + , To(to) { } bool TPartitionGraph::Node::IsRoot() const { diff --git a/ydb/core/persqueue/utils.h b/ydb/core/persqueue/utils.h index 390b7bbcdb8d..c81442f1128c 100644 --- a/ydb/core/persqueue/utils.h +++ b/ydb/core/persqueue/utils.h @@ -33,9 +33,12 @@ class TPartitionGraph { Node() = default; Node(Node&&) = default; Node(ui32 id, ui64 tabletId); + Node(ui32 id, ui64 tabletId, const TString& from, const TString& to); ui32 Id; ui64 TabletId; + TString From; + TString To; // Direct parents of this node std::vector Parents;