Skip to content

Commit

Permalink
Fix autoscaling of topic: partition boundaries were empty after restart
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov committed Jun 4, 2024
1 parent c6c214c commit efad04c
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 68 deletions.
46 changes: 23 additions & 23 deletions ydb/core/persqueue/partition_scale_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,24 @@ namespace NPQ {
// Constructs the scale manager for a single topic.
//
// topicName      - stored in TopicName (used in log messages).
// databasePath   - stored in DatabasePath.
// pathId/version - forwarded to TBalancerConfig together with the tablet config.
// balancerConfig - tablet config forwarded to TBalancerConfig; it is only read
//                  here (taken by const reference).
TPartitionScaleManager::TPartitionScaleManager(
    const TString& topicName,
    const TString& databasePath,
    ui64 pathId,
    int version,
    const NKikimrPQ::TPQTabletConfig& balancerConfig
)
    : TopicName(topicName)
    , DatabasePath(databasePath)
    , BalancerConfig(pathId, version, balancerConfig) {

}

void TPartitionScaleManager::HandleScaleStatusChange(const TPartitionInfo& partition, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx) {
void TPartitionScaleManager::HandleScaleStatusChange(const ui32 partitionId, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx) {
if (scaleStatus == NKikimrPQ::EScaleStatus::NEED_SPLIT) {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::HandleScaleStatusChange "
<< "need to split partition " << partition.Id);
PartitionsToSplit.emplace(partition.Id, partition);
<< "need to split partition " << partitionId);
PartitionsToSplit.insert(partitionId);
TrySendScaleRequest(ctx);
} else {
PartitionsToSplit.erase(partition.Id);
PartitionsToSplit.erase(partitionId);
}
}

Expand Down Expand Up @@ -66,35 +68,33 @@ std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> TPartition
std::vector<TPartitionMerge> mergesToApply;

size_t allowedSplitsCount = BalancerConfig.MaxActivePartitions > BalancerConfig.CurPartitions ? BalancerConfig.MaxActivePartitions - BalancerConfig.CurPartitions : 0;
// Walk the pending split candidates while the partition budget allows it.
// Candidates that cannot be split (unknown to the graph, already have
// children, or have no computable middle key) are dropped from the set.
auto splitIt = PartitionsToSplit.begin();
while (allowedSplitsCount > 0 && splitIt != PartitionsToSplit.end()) {
    // Copy the id up front: after erase() the iterator refers to the next
    // element (or end()), so it must not be dereferenced to report this one.
    const ui32 partitionId = *splitIt;

    const auto* node = BalancerConfig.PartitionGraph.GetPartition(partitionId);
    if (!node || !node->Children.empty()) {
        // GetPartition() can return null for an id the graph does not know;
        // such an id is dropped the same way as a partition that already has
        // children.
        splitIt = PartitionsToSplit.erase(splitIt);
        continue;
    }

    auto from = node->From;
    auto to = node->To;
    auto mid = MiddleOf(from, to);
    if (mid.empty()) {
        splitIt = PartitionsToSplit.erase(splitIt);
        LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
            "TPartitionScaleManager::BuildScaleRequest wrong partition key range. Can't get mid. Topic# " << TopicName << ", partition# " << partitionId);
        continue;
    }
    LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
        "TPartitionScaleManager::BuildScaleRequest partition split ranges. From# '" << ToHex(from)
        << "'. To# '" << ToHex(to) << "'. Mid# '" << ToHex(mid)
        << "'. Topic# " << TopicName << ". Partition# " << partitionId);

    TPartitionSplit split;
    split.set_partition(partitionId);
    split.set_splitboundary(mid);
    splitsToApply.push_back(split);

    --allowedSplitsCount;
    ++splitIt;
}

Expand Down Expand Up @@ -122,8 +122,8 @@ void TPartitionScaleManager::Die(const TActorContext& ctx) {
}
}

void TPartitionScaleManager::UpdateBalancerConfig(NKikimrPQ::TUpdateBalancerConfig& config) {
BalancerConfig = TBalancerConfig(config);
void TPartitionScaleManager::UpdateBalancerConfig(ui64 pathId, int version, const NKikimrPQ::TPQTabletConfig& config) {
BalancerConfig = TBalancerConfig(pathId, version, config);
}

void TPartitionScaleManager::UpdateDatabasePath(const TString& dbPath) {
Expand Down
29 changes: 12 additions & 17 deletions ydb/core/persqueue/partition_scale_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,19 @@ namespace NKikimr {
namespace NPQ {

class TPartitionScaleManager {

public:
struct TPartitionInfo {
ui32 Id;
NSchemeShard::TTopicTabletInfo::TKeyRange KeyRange;
};

private:
struct TBalancerConfig {
TBalancerConfig(
NKikimrPQ::TUpdateBalancerConfig& config
ui64 pathId,
int version,
const NKikimrPQ::TPQTabletConfig& config
)
: PathId(config.GetPathId())
, PathVersion(config.GetVersion())
: PathId(pathId)
, PathVersion(version)
, PartitionGraph(MakePartitionGraph(config))
, MaxActivePartitions(config.GetTabletConfig().GetPartitionStrategy().GetMaxPartitionCount())
, MinActivePartitions(config.GetTabletConfig().GetPartitionStrategy().GetMinPartitionCount())
, CurPartitions(config.PartitionsSize()) {
, MaxActivePartitions(config.GetPartitionStrategy().GetMaxPartitionCount())
, MinActivePartitions(config.GetPartitionStrategy().GetMinPartitionCount())
, CurPartitions(config.AllPartitionsSize()) {
}

ui64 PathId;
Expand All @@ -50,13 +45,13 @@ class TPartitionScaleManager {
};

public:
TPartitionScaleManager(const TString& topicPath, const TString& databasePath, NKikimrPQ::TUpdateBalancerConfig& balancerConfig);
TPartitionScaleManager(const TString& topicPath, const TString& databasePath, ui64 pathId, int version, const NKikimrPQ::TPQTabletConfig& balancerConfig);

public:
void HandleScaleStatusChange(const TPartitionInfo& partition, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx);
void HandleScaleStatusChange(const ui32 partition, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx);
void HandleScaleRequestResult(TPartitionScaleRequest::TEvPartitionScaleRequestDone::TPtr& ev, const TActorContext& ctx);
void TrySendScaleRequest(const TActorContext& ctx);
void UpdateBalancerConfig(NKikimrPQ::TUpdateBalancerConfig& config);
void UpdateBalancerConfig(ui64 pathId, int version, const NKikimrPQ::TPQTabletConfig& config);
void UpdateDatabasePath(const TString& dbPath);
void Die(const TActorContext& ctx);

Expand All @@ -79,7 +74,7 @@ class TPartitionScaleManager {
TDuration RequestTimeout = TDuration::MilliSeconds(0);
TInstant LastResponseTime = TInstant::Zero();

std::unordered_map<ui64, TPartitionInfo> PartitionsToSplit;
std::unordered_set<ui32> PartitionsToSplit;

TBalancerConfig BalancerConfig;
bool RequestInflight = false;
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,11 +572,13 @@ NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& ctx) {
}

// Reports a scale status to the tablet, throttled by
// SCALE_REQUEST_REPEAT_MIN_SECONDS, and remembers what was sent and when.
void TPartition::ChangeScaleStatusIfNeeded(NKikimrPQ::EScaleStatus scaleStatus) {
    auto now = TInstant::Now();
    auto next = LastScaleRequestTime + TDuration::Seconds(SCALE_REQUEST_REPEAT_MIN_SECONDS);
    // Nothing is sent when the status is unchanged and the repeat interval has
    // already elapsed, or when the status changed but the interval has not
    // elapsed yet.
    // NOTE(review): as written, an unchanged status is re-sent on every call
    // that falls inside the interval, and each send restarts the interval.
    // Confirm the comparison in the first clause is not inverted.
    if ((scaleStatus == ScaleStatus && next < now) || (scaleStatus != ScaleStatus && next > now)) {
        return;
    }
    Send(Tablet, new TEvPQ::TEvPartitionScaleStatusChanged(Partition.OriginalPartitionId, scaleStatus));
    LastScaleRequestTime = now;
    ScaleStatus = scaleStatus;
}

Expand Down
27 changes: 8 additions & 19 deletions ydb/core/persqueue/read_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -503,9 +503,9 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr

if (SplitMergeEnabled(TabletConfig)) {
if (!PartitionsScaleManager) {
PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Topic, DatabasePath, record);
PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Topic, DatabasePath, PathId, Version, TabletConfig);
} else {
PartitionsScaleManager->UpdateBalancerConfig(record);
PartitionsScaleManager->UpdateBalancerConfig(PathId, Version, TabletConfig);
}
}

Expand Down Expand Up @@ -533,13 +533,10 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
if (it == PartitionsInfo.end()) {
Y_ABORT_UNLESS(p.GetPartition() >= prevNextPartitionId && p.GetPartition() < NextPartitionId || NextPartitionId == 0);

partitionsInfo[p.GetPartition()] = {p.GetTabletId(), {}};
if (SplitMergeEnabled(TabletConfig) && p.HasKeyRange()) {
partitionsInfo[p.GetPartition()].KeyRange.DeserializeFromProto(p.GetKeyRange());
}
partitionsInfo[p.GetPartition()] = {p.GetTabletId()};

newPartitionsIds.push_back(p.GetPartition());
newPartitions.push_back(TPartInfo{p.GetPartition(), p.GetTabletId(), 0, partitionsInfo[p.GetPartition()].KeyRange});
newPartitions.push_back(TPartInfo{p.GetPartition(), p.GetTabletId(), 0, p.GetKeyRange()});

++NumActiveParts;

Expand Down Expand Up @@ -701,11 +698,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, c
}

if (SplitMergeEnabled(TabletConfig) && PartitionsScaleManager) {
TPartitionScaleManager::TPartitionInfo scalePartitionInfo = {
.Id = partitionId,
.KeyRange = PartitionsInfo[partitionId].KeyRange
};
PartitionsScaleManager->HandleScaleStatusChange(scalePartitionInfo, partRes.GetScaleStatus(), ctx);
PartitionsScaleManager->HandleScaleStatusChange(partitionId, partRes.GetScaleStatus(), ctx);
}

AggregatedStats.AggrStats(partitionId, partRes.GetPartitionSize(), partRes.GetUsedReserveSize());
Expand Down Expand Up @@ -1248,17 +1241,13 @@ void TPersQueueReadBalancer::Handle(TEvPQ::TEvPartitionScaleStatusChanged::TPtr&
return;
}
auto& record = ev->Get()->Record;
auto partitionInfoIt = PartitionsInfo.find(record.GetPartitionId());
if (partitionInfoIt == PartitionsInfo.end()) {
auto* node = PartitionGraph.GetPartition(record.GetPartitionId());
if (!node) {
return;
}

if (PartitionsScaleManager) {
TPartitionScaleManager::TPartitionInfo scalePartitionInfo = {
.Id = record.GetPartitionId(),
.KeyRange = partitionInfoIt->second.KeyRange
};
PartitionsScaleManager->HandleScaleStatusChange(scalePartitionInfo, record.GetScaleStatus(), ctx);
PartitionsScaleManager->HandleScaleStatusChange(record.GetPartitionId(), record.GetScaleStatus(), ctx);
}
}

Expand Down
1 change: 0 additions & 1 deletion ydb/core/persqueue/read_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
public:
struct TPartitionInfo {
    // Tablet id recorded for the partition. Key ranges are no longer kept
    // here; the visible initialisers pass only the tablet id.
    ui64 TabletId;
};

private:
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/persqueue/read_balancer__txinit.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ struct TPersQueueReadBalancer::TTxInit : public ITransaction {
Self->Consumers[consumer.GetName()];
}
Self->PartitionGraph = MakePartitionGraph(Self->TabletConfig);

if (SplitMergeEnabled(Self->TabletConfig)) {
Self->PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Self->Topic, Self->DatabasePath, Self->PathId, Self->Version, Self->TabletConfig);
}
Self->UpdateConfigCounters();
}
Self->Inited = true;
Expand All @@ -71,7 +75,7 @@ struct TPersQueueReadBalancer::TTxInit : public ITransaction {
ui32 part = partsRowset.GetValue<Schema::Partitions::Partition>();
ui64 tabletId = partsRowset.GetValue<Schema::Partitions::TabletId>();

partitionsInfo[part] = {tabletId, {}};
partitionsInfo[part] = {tabletId};
Self->AggregatedStats.AggrStats(part, partsRowset.GetValue<Schema::Partitions::DataSize>(),
partsRowset.GetValue<Schema::Partitions::UsedReserveSize>());

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/read_balancer__txwrite.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ struct TPartInfo {
ui32 PartitionId;
ui32 Group;
ui64 TabletId;
NSchemeShard::TTopicTabletInfo::TKeyRange KeyRange;
NKikimrPQ::TPartitionKeyRange KeyRange;

TPartInfo(const ui32 partitionId, const ui64 tabletId, const ui32 group, NSchemeShard::TTopicTabletInfo::TKeyRange keyRange)
TPartInfo(const ui32 partitionId, const ui64 tabletId, const ui32 group, const NKikimrPQ::TPartitionKeyRange& keyRange)
: PartitionId(partitionId)
, Group(group)
, TabletId(tabletId)
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/persqueue/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ std::unordered_map<ui32, TPartitionGraph::Node> BuildGraph(const TCollection& pa
}

for (const auto& p : partitions) {
result.emplace(GetPartitionId(p), TPartitionGraph::Node(GetPartitionId(p), p.GetTabletId()));
result.emplace(GetPartitionId(p), TPartitionGraph::Node(GetPartitionId(p), p.GetTabletId(), p.GetKeyRange().GetFromBound(), p.GetKeyRange().GetToBound()));
}

std::deque<TPartitionGraph::Node*> queue;
Expand Down Expand Up @@ -248,9 +248,11 @@ std::unordered_map<ui32, TPartitionGraph::Node> BuildGraph(const TCollection& pa
return result;
}

// Two-argument overload, still declared in the header: builds a node with
// empty From/To bounds by delegating to the full constructor.
TPartitionGraph::Node::Node(ui32 id, ui64 tabletId)
    : Node(id, tabletId, TString(), TString()) {
}

// Builds a graph node carrying the partition id, its tablet id and the
// partition's key-range bounds.
// NOTE(review): `from` is taken by const value while `to` is taken by const
// reference; the header declaration matches, so unifying them requires
// changing both places together.
TPartitionGraph::Node::Node(ui32 id, ui64 tabletId, const TString from, const TString& to)
    : Id(id)
    , TabletId(tabletId)
    , From(from)
    , To(to) {
}

bool TPartitionGraph::Node::IsRoot() const {
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/persqueue/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ class TPartitionGraph {
Node() = default;
Node(Node&&) = default;
Node(ui32 id, ui64 tabletId);
Node(ui32 id, ui64 tabletId, const TString from, const TString& to);

ui32 Id;
ui64 TabletId;
TString From;
TString To;

// Direct parents of this node
std::vector<Node*> Parents;
Expand Down

0 comments on commit efad04c

Please sign in to comment.