Skip to content

Commit

Permalink
Merge bed96a5 into d1ae2d7
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev authored Apr 16, 2024
2 parents d1ae2d7 + bed96a5 commit 2acfa72
Show file tree
Hide file tree
Showing 20 changed files with 606 additions and 28 deletions.
11 changes: 11 additions & 0 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ struct TEvPQ {
EvReadingPartitionStatusRequest,
EvProcessChangeOwnerRequests,
EvWakeupReleasePartition,
EvPartitionScaleStatusChanged,
EvPartitionScaleRequestDone,
EvEnd
};

Expand Down Expand Up @@ -1112,6 +1114,15 @@ struct TEvPQ {
ui32 PartitionId;
ui64 Cookie;
};

struct TEvPartitionScaleStatusChanged : public TEventPB<TEvPartitionScaleStatusChanged, NKikimrPQ::TEvPartitionScaleStatusChanged, EvPartitionScaleStatusChanged> {
TEvPartitionScaleStatusChanged() = default;

TEvPartitionScaleStatusChanged(ui32 partitionId, NKikimrPQ::EScaleStatus scaleStatus) {
Record.SetPartitionId(partitionId);
Record.SetScaleStatus(scaleStatus);
}
};
};

} //NKikimr
7 changes: 7 additions & 0 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,9 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
}
}
}

result.SetScaleStatus(SplitMergeEnabled(TabletConfig) ? ScaleStatus :NKikimrPQ::EScaleStatus::NORMAL);

ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionStatusResponse(result, Partition));
}

Expand Down Expand Up @@ -2073,6 +2076,10 @@ void TPartition::EndChangePartitionConfig(NKikimrPQ::TPQTabletConfig&& config,

Y_ABORT_UNLESS(Config.GetPartitionConfig().GetTotalPartitions() > 0);

if (Config.GetPartitionStrategy().GetScaleThresholdSeconds() != SplitMergeAvgWriteBytes->GetDuration().Seconds()) {
InitSplitMergeSlidingWindow();
}

Send(ReadQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));
Send(WriteQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));
TotalPartitionWriteSpeed = config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond();
Expand Down
10 changes: 9 additions & 1 deletion ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class TPartition : public TActorBootstrapped<TPartition> {

private:
static const ui32 MAX_ERRORS_COUNT_TO_STORE = 10;
static const ui32 SCALE_REQUEST_REPEAT_MIN_SECONDS = 60;

private:
struct THasDataReq;
Expand Down Expand Up @@ -333,7 +334,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
const TActorContext& ctx);

void Initialize(const TActorContext& ctx);

void InitSplitMergeSlidingWindow();
template <typename T>
void EmplacePendingRequest(T&& body, const TActorContext& ctx) {
const auto now = ctx.Now();
Expand All @@ -358,6 +359,9 @@ class TPartition : public TActorBootstrapped<TPartition> {

void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx);

NKikimrPQ::EScaleStatus CheckScaleStatus(const TActorContext& ctx);
void ChangeScaleStatusIfNeeded(NKikimrPQ::EScaleStatus scaleStatus);

TString LogPrefix() const;

void Handle(TEvPQ::TEvProcessChangeOwnerRequests::TPtr& ev, const TActorContext& ctx);
Expand Down Expand Up @@ -720,6 +724,10 @@ class TPartition : public TActorBootstrapped<TPartition> {
NSlidingWindow::TSlidingWindow<NSlidingWindow::TSumOperation<ui64>> AvgReadBytes;
TVector<NSlidingWindow::TSlidingWindow<NSlidingWindow::TSumOperation<ui64>>> AvgQuotaBytes;

std::unique_ptr<NSlidingWindow::TSlidingWindow<NSlidingWindow::TSumOperation<ui64>>> SplitMergeAvgWriteBytes;
TInstant LastScaleRequestTime = TInstant::Zero();
NKikimrPQ::EScaleStatus ScaleStatus = NKikimrPQ::EScaleStatus::NORMAL;

ui64 ReservedSize;
std::deque<THolder<TEvPQ::TEvReserveBytes>> ReserveRequests;

Expand Down
6 changes: 6 additions & 0 deletions ydb/core/persqueue/partition_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,8 @@ void TPartition::Initialize(const TActorContext& ctx) {
LastUsedStorageMeterTimestamp = ctx.Now();
WriteTimestampEstimate = ManageWriteTimestampEstimate ? ctx.Now() : TInstant::Zero();

InitSplitMergeSlidingWindow();

CloudId = Config.GetYcCloudId();
DbId = Config.GetYdbDatabaseId();
DbPath = Config.GetYdbDatabasePath();
Expand Down Expand Up @@ -956,6 +958,10 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
);
}

void TPartition::InitSplitMergeSlidingWindow() {
using Tui64SumSlidingWindow = NSlidingWindow::TSlidingWindow<NSlidingWindow::TSumOperation<ui64>>;
SplitMergeAvgWriteBytes = std::make_unique<Tui64SumSlidingWindow>(TDuration::Seconds(Config.GetPartitionStrategy().GetScaleThresholdSeconds()), 1000);
}

//
// Functions
Expand Down
132 changes: 132 additions & 0 deletions ydb/core/persqueue/partition_scale_manager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
#include "ydb/core/persqueue/partition_scale_manager.h"

namespace NKikimr {
namespace NPQ {


TPartitionScaleManager::TPartitionScaleManager(
const TString& topicName,
const TString& databasePath,
NKikimrPQ::TUpdateBalancerConfig& balancerConfig
)
: TopicName(topicName)
, DatabasePath(databasePath)
, BalancerConfig(balancerConfig) {

}

void TPartitionScaleManager::HandleScaleStatusChange(const TPartitionInfo& partition, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx) {
if (scaleStatus == NKikimrPQ::EScaleStatus::NEED_SPLIT) {
PartitionsToSplit.emplace(partition.Id, partition);
TrySendScaleRequest(ctx);
} else {
PartitionsToSplit.erase(partition.Id);
}
}

void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) {
TInstant delayDeadline = LastResponseTime + RequestTimeout;
if (!DatabasePath || RequestInflight || delayDeadline > ctx.Now()) {
return;
}

auto splitMergePair = BuildScaleRequest();
if (splitMergePair.first.empty() && splitMergePair.second.empty()) {
return;
}

RequestInflight = true;
CurrentScaleRequest = ctx.Register(new TPartitionScaleRequest(
TopicName,
DatabasePath,
BalancerConfig.PathId,
BalancerConfig.PathVersion,
splitMergePair.first,
splitMergePair.second,
ctx.SelfID
));
}


using TPartitionSplit = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit;
using TPartitionMerge = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge;

std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> TPartitionScaleManager::BuildScaleRequest() {
std::vector<TPartitionSplit> splitsToApply;
std::vector<TPartitionMerge> mergesToApply;

size_t allowedSplitsCount = BalancerConfig.PartitionCountLimit > BalancerConfig.CurPartitions ? BalancerConfig.PartitionCountLimit - BalancerConfig.CurPartitions : 0;
auto itSplit = PartitionsToSplit.begin();
while (allowedSplitsCount > 0 && itSplit != PartitionsToSplit.end()) {
const auto partitionId = itSplit->first;
const auto& partition = itSplit->second;

if (BalancerConfig.PartitionGraph.GetPartition(partitionId)->Children.empty()) {
TPartitionSplit split;
split.set_partition(partition.Id);
split.set_splitboundary(GetRangeMid(partition.KeyRange.FromBound ? *partition.KeyRange.FromBound : "", partition.KeyRange.ToBound ?*partition.KeyRange.ToBound : ""));

splitsToApply.push_back(split);

allowedSplitsCount--;
itSplit++;
} else {
itSplit = PartitionsToSplit.erase(itSplit);
}
}

return {splitsToApply, mergesToApply};
}

void TPartitionScaleManager::HandleScaleRequestResult(TPartitionScaleRequest::TEvPartitionScaleRequestDone::TPtr& ev, const TActorContext& ctx) {
RequestInflight = false;
LastResponseTime = ctx.Now();
auto result = ev->Get();
if (result->Status == TEvTxUserProxy::TResultStatus::ExecComplete) {
TrySendScaleRequest(ctx);
} else {
ui64 newTimeout = RequestTimeout.MilliSeconds() == 0 ? MIN_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT + RandomNumber<ui64>(50) : RequestTimeout.MilliSeconds() * 2;
RequestTimeout = TDuration::MilliSeconds(std::min(newTimeout, static_cast<ui64>(MAX_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT)));
ctx.Schedule(RequestTimeout, new TEvents::TEvWakeup(TRY_SCALE_REQUEST_WAKE_UP_TAG));
}
}

void TPartitionScaleManager::Die(const TActorContext& ctx) {
if (CurrentScaleRequest) {
ctx.Send(CurrentScaleRequest, new TEvents::TEvPoisonPill());
}
}

void TPartitionScaleManager::UpdateBalancerConfig(NKikimrPQ::TUpdateBalancerConfig& config) {
BalancerConfig = TBalancerConfig(config);
}

void TPartitionScaleManager::UpdateDatabasePath(const TString& dbPath) {
DatabasePath = dbPath;
}

TString TPartitionScaleManager::GetRangeMid(const TString& from, const TString& to) {
TStringBuilder result;

unsigned char fromPadding = 0;
unsigned char toPadding = 255;

size_t maxSize = std::max(from.size(), to.size());
for (size_t i = 0; i < maxSize; ++i) {
ui16 fromChar = i < from.size() ? static_cast<ui16>(from[i]) : fromPadding;
unsigned char toChar = i < to.size() ? static_cast<unsigned char>(to[i]) : toPadding;

ui16 sum = fromChar + toChar;

result += static_cast<unsigned char>(sum / 2);
}

if (result == from) {
result += static_cast<unsigned char>(127);
}

return result;
}

} // namespace NPQ
} // namespace NKikimr
91 changes: 91 additions & 0 deletions ydb/core/persqueue/partition_scale_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#pragma once

#include <ydb/core/base/path.h>
#include "ydb/core/persqueue/utils.h"
#include <ydb/core/persqueue/partition_scale_request.h>
#include <ydb/core/protos/pqconfig.pb.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/tx/schemeshard/schemeshard_info_types.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/actorsystem.h>

#include <util/system/types.h>
#include <util/generic/fwd.h>
#include <util/generic/string.h>

#include <unordered_map>
#include <utility>

namespace NKikimr {
namespace NPQ {

class TPartitionScaleManager {

public:
struct TPartitionInfo {
ui32 Id;
NSchemeShard::TTopicTabletInfo::TKeyRange KeyRange;
};

private:
struct TBalancerConfig {
TBalancerConfig(
NKikimrPQ::TUpdateBalancerConfig& config
)
: PathId(config.GetPathId())
, PathVersion(config.GetVersion())
, PartitionGraph(MakePartitionGraph(config))
, PartitionCountLimit(config.GetTabletConfig().GetPartitionStrategy().GetMaxPartitionCount())
, MinActivePartitions(config.GetTabletConfig().GetPartitionStrategy().GetMinPartitionCount())
, CurPartitions(config.PartitionsSize()) {
}

ui64 PathId;
int PathVersion;
TPartitionGraph PartitionGraph;
ui64 PartitionCountLimit;
ui64 MinActivePartitions;
ui64 CurPartitions;
};

public:
TPartitionScaleManager(const TString& topicPath, const TString& databasePath, NKikimrPQ::TUpdateBalancerConfig& balancerConfig);

public:
void HandleScaleStatusChange(const TPartitionInfo& partition, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx);
void HandleScaleRequestResult(TPartitionScaleRequest::TEvPartitionScaleRequestDone::TPtr& ev, const TActorContext& ctx);
void TrySendScaleRequest(const TActorContext& ctx);
void UpdateBalancerConfig(NKikimrPQ::TUpdateBalancerConfig& config);
void UpdateDatabasePath(const TString& dbPath);
void Die(const TActorContext& ctx);

static TString GetRangeMid(const TString& from, const TString& to);

private:
using TPartitionSplit = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit;
using TPartitionMerge = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge;

std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> BuildScaleRequest();

public:
static const ui64 TRY_SCALE_REQUEST_WAKE_UP_TAG = 10;

private:
static const ui32 MIN_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT = 10;
static const ui32 MAX_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT = 1000;

const TString TopicName;
TString DatabasePath;
TActorId CurrentScaleRequest;
TDuration RequestTimeout = TDuration::MilliSeconds(0);
TInstant LastResponseTime = TInstant::Zero();

std::unordered_map<ui64, TPartitionInfo> PartitionsToSplit;

TBalancerConfig BalancerConfig;
bool RequestInflight = false;
};

} // namespace NPQ
} // namespace NKikimr
Loading

0 comments on commit 2acfa72

Please sign in to comment.