Skip to content

Commit

Permalink
Fixes after review
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev committed Apr 16, 2024
1 parent f660532 commit bed96a5
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 31 deletions.
4 changes: 2 additions & 2 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -1118,9 +1118,9 @@ struct TEvPQ {
struct TEvPartitionScaleStatusChanged : public TEventPB<TEvPartitionScaleStatusChanged, NKikimrPQ::TEvPartitionScaleStatusChanged, EvPartitionScaleStatusChanged> {
TEvPartitionScaleStatusChanged() = default;

TEvPartitionScaleStatusChanged(NKikimrPQ::EScaleStatus scaleStatus, ui32 partitionId) {
Record.SetScaleStatus(scaleStatus);
TEvPartitionScaleStatusChanged(ui32 partitionId, NKikimrPQ::EScaleStatus scaleStatus) {
Record.SetPartitionId(partitionId);
Record.SetScaleStatus(scaleStatus);
}
};
};
Expand Down
8 changes: 2 additions & 6 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -790,11 +790,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
}
}

if (SplitMergeEnabled(TabletConfig)) {
result.SetScaleStatus(ScaleStatus);
} else {
result.SetScaleStatus(NKikimrPQ::EScaleStatus::NORMAL);
}
result.SetScaleStatus(SplitMergeEnabled(TabletConfig) ? ScaleStatus :NKikimrPQ::EScaleStatus::NORMAL);

ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionStatusResponse(result, Partition));
}
Expand Down Expand Up @@ -2081,7 +2077,7 @@ void TPartition::EndChangePartitionConfig(NKikimrPQ::TPQTabletConfig&& config,
Y_ABORT_UNLESS(Config.GetPartitionConfig().GetTotalPartitions() > 0);

if (Config.GetPartitionStrategy().GetScaleThresholdSeconds() != SplitMergeAvgWriteBytes->GetDuration().Seconds()) {
SplitMergeAvgWriteBytes = std::make_unique<NSlidingWindow::TSlidingWindow<NSlidingWindow::TSumOperation<ui64>>>(TDuration::Seconds(Config.GetPartitionStrategy().GetScaleThresholdSeconds()), 1000);
InitSplitMergeSlidingWindow();
}

Send(ReadQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
const TActorContext& ctx);

void Initialize(const TActorContext& ctx);

void InitSplitMergeSlidingWindow();
template <typename T>
void EmplacePendingRequest(T&& body, const TActorContext& ctx) {
const auto now = ctx.Now();
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/persqueue/partition_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ void TPartition::Initialize(const TActorContext& ctx) {
LastUsedStorageMeterTimestamp = ctx.Now();
WriteTimestampEstimate = ManageWriteTimestampEstimate ? ctx.Now() : TInstant::Zero();

SplitMergeAvgWriteBytes = std::make_unique<NSlidingWindow::TSlidingWindow<NSlidingWindow::TSumOperation<ui64>>>(TDuration::Seconds(Config.GetPartitionStrategy().GetScaleThresholdSeconds()), 1000);
InitSplitMergeSlidingWindow();

CloudId = Config.GetYcCloudId();
DbId = Config.GetYdbDatabaseId();
Expand Down Expand Up @@ -958,6 +958,10 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
);
}

void TPartition::InitSplitMergeSlidingWindow() {
using Tui64SumSlidingWindow = NSlidingWindow::TSlidingWindow<NSlidingWindow::TSumOperation<ui64>>;
SplitMergeAvgWriteBytes = std::make_unique<Tui64SumSlidingWindow>(TDuration::Seconds(Config.GetPartitionStrategy().GetScaleThresholdSeconds()), 1000);
}

//
// Functions
Expand Down
19 changes: 16 additions & 3 deletions ydb/core/persqueue/partition_scale_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,16 @@ namespace NKikimr {
namespace NPQ {


TPartitionScaleManager::TPartitionScaleManager(const TString& topicName, const TString& databasePath, NKikimrPQ::TUpdateBalancerConfig& balancerConfig)
: TopicName(topicName), DatabasePath(databasePath), BalancerConfig(balancerConfig) {}
TPartitionScaleManager::TPartitionScaleManager(
const TString& topicName,
const TString& databasePath,
NKikimrPQ::TUpdateBalancerConfig& balancerConfig
)
: TopicName(topicName)
, DatabasePath(databasePath)
, BalancerConfig(balancerConfig) {

}

void TPartitionScaleManager::HandleScaleStatusChange(const TPartitionInfo& partition, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx) {
if (scaleStatus == NKikimrPQ::EScaleStatus::NEED_SPLIT) {
Expand All @@ -17,7 +25,8 @@ void TPartitionScaleManager::HandleScaleStatusChange(const TPartitionInfo& parti
}

void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) {
if (RequestInflight || (LastResponseTime + RequestTimeout > ctx.Now())) {
TInstant delayDeadline = LastResponseTime + RequestTimeout;
if (!DatabasePath || RequestInflight || delayDeadline > ctx.Now()) {
return;
}

Expand Down Expand Up @@ -92,6 +101,10 @@ void TPartitionScaleManager::UpdateBalancerConfig(NKikimrPQ::TUpdateBalancerConf
BalancerConfig = TBalancerConfig(config);
}

void TPartitionScaleManager::UpdateDatabasePath(const TString& dbPath) {
DatabasePath = dbPath;
}

TString TPartitionScaleManager::GetRangeMid(const TString& from, const TString& to) {
TStringBuilder result;

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/persqueue/partition_scale_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class TPartitionScaleManager {
void HandleScaleRequestResult(TPartitionScaleRequest::TEvPartitionScaleRequestDone::TPtr& ev, const TActorContext& ctx);
void TrySendScaleRequest(const TActorContext& ctx);
void UpdateBalancerConfig(NKikimrPQ::TUpdateBalancerConfig& config);
void UpdateDatabasePath(const TString& dbPath);
void Die(const TActorContext& ctx);

static TString GetRangeMid(const TString& from, const TString& to);
Expand All @@ -75,7 +76,7 @@ class TPartitionScaleManager {
static const ui32 MAX_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT = 1000;

const TString TopicName;
const TString DatabasePath;
TString DatabasePath;
TActorId CurrentScaleRequest;
TDuration RequestTimeout = TDuration::MilliSeconds(0);
TInstant LastResponseTime = TInstant::Zero();
Expand Down
20 changes: 18 additions & 2 deletions ydb/core/persqueue/partition_scale_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,24 @@
namespace NKikimr {
namespace NPQ {

TPartitionScaleRequest::TPartitionScaleRequest(TString topicName, TString databasePath, ui64 pathId, ui64 pathVersion, std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit> splits, const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge> merges, NActors::TActorId parentActorId)
: Topic(topicName), DatabasePath(databasePath), PathId(pathId), PathVersion(pathVersion), Splits(splits), Merges(merges), ParentActorId(parentActorId) {}
TPartitionScaleRequest::TPartitionScaleRequest(
TString topicName,
TString databasePath,
ui64 pathId,
ui64 pathVersion,
std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit> splits,
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge> merges,
NActors::TActorId parentActorId
)
: Topic(topicName)
, DatabasePath(databasePath)
, PathId(pathId)
, PathVersion(pathVersion)
, Splits(splits)
, Merges(merges)
, ParentActorId(parentActorId) {

}

void TPartitionScaleRequest::Bootstrap(const NActors::TActorContext &ctx) {
SendProposeRequest(ctx);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ void TPartition::ChangeScaleStatusIfNeeded(NKikimrPQ::EScaleStatus scaleStatus)
if (scaleStatus == ScaleStatus || LastScaleRequestTime + TDuration::Seconds(SCALE_REQUEST_REPEAT_MIN_SECONDS) > TInstant::Now()) {
return;
}
Send(Tablet, new TEvPQ::TEvPartitionScaleStatusChanged(scaleStatus, Partition.OriginalPartitionId));
Send(Tablet, new TEvPQ::TEvPartitionScaleStatusChanged(Partition.OriginalPartitionId, scaleStatus));
LastScaleRequestTime = TInstant::Now();
ScaleStatus = scaleStatus;
}
Expand Down
34 changes: 22 additions & 12 deletions ydb/core/persqueue/read_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ void TPersQueueReadBalancer::HandleWakeup(TEvents::TEvWakeup::TPtr& ev, const TA

switch (ev->Get()->Tag) {
case TPartitionScaleManager::TRY_SCALE_REQUEST_WAKE_UP_TAG: {
if (PartitionsScaleManager) {
if (PartitionsScaleManager && SplitMergeEnabled(TabletConfig)) {
PartitionsScaleManager->TrySendScaleRequest(ctx);
}
}
Expand Down Expand Up @@ -471,7 +471,6 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
PathId = record.GetPathId();
Topic = std::move(record.GetTopicName());
Path = std::move(record.GetPath());
Cerr << "\n Path: " << Path << " \n";
TxId = record.GetTxId();
TabletConfig = std::move(record.GetTabletConfig());
Migrate(TabletConfig);
Expand Down Expand Up @@ -507,13 +506,12 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
std::vector<std::pair<ui32, ui32>> newGroups;
std::vector<std::pair<ui64, TTabletInfo>> reallocatedTablets;

if (!PartitionsScaleManager) {
auto path = SplitPath(Path);
path.pop_back();
auto dbPath = CanonizePath(path);
PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Topic, dbPath, record);
} else {
PartitionsScaleManager->UpdateBalancerConfig(record);
if (SplitMergeEnabled(TabletConfig)) {
if (!PartitionsScaleManager) {
PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Topic, DatabasePath, record);
} else {
PartitionsScaleManager->UpdateBalancerConfig(record);
}
}

for (auto& p : record.GetTablets()) {
Expand Down Expand Up @@ -546,8 +544,10 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
if (it == PartitionsInfo.end()) {
Y_ABORT_UNLESS(group <= TotalGroups && group > prevGroups || TotalGroups == 0);
Y_ABORT_UNLESS(p.GetPartition() >= prevNextPartitionId && p.GetPartition() < NextPartitionId || NextPartitionId == 0);
partitionsInfo[p.GetPartition()] = {p.GetTabletId(), EPS_FREE, TActorId(), group, {}}; //savnik: check keyrange
partitionsInfo[p.GetPartition()].KeyRange.DeserializeFromProto(p.GetKeyRange());
partitionsInfo[p.GetPartition()] = {p.GetTabletId(), EPS_FREE, TActorId(), group, {}};
if(SplitMergeEnabled(TabletConfig)) {
partitionsInfo[p.GetPartition()].KeyRange.DeserializeFromProto(p.GetKeyRange());
}

newPartitions.push_back(TPartInfo{p.GetPartition(), p.GetTabletId(), group, partitionsInfo[p.GetPartition()].KeyRange});

Expand Down Expand Up @@ -758,7 +758,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, c
if (!PartitionsInfo.contains(partRes.GetPartition())) {
continue;
}
if (PartitionsScaleManager) {
if (SplitMergeEnabled(TabletConfig) && PartitionsScaleManager) {
TPartitionScaleManager::TPartitionInfo scalePartitionInfo = {
.Id = partitionId,
.KeyRange = PartitionsInfo[partRes.GetPartition()].KeyRange
Expand Down Expand Up @@ -1996,6 +1996,10 @@ void TPersQueueReadBalancer::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated
}
}

if (PartitionsScaleManager) {
PartitionsScaleManager->UpdateDatabasePath(DatabasePath);
}

if (SubDomainPathId && msg->PathId == *SubDomainPathId) {
const bool outOfSpace = msg->Result->GetPathDescription()
.GetDomainDescription()
Expand Down Expand Up @@ -2149,6 +2153,9 @@ void TPersQueueReadBalancer::Handle(TEvPQ::TEvWakeupReleasePartition::TPtr &ev,
}

void TPersQueueReadBalancer::Handle(TEvPQ::TEvPartitionScaleStatusChanged::TPtr& ev, const TActorContext& ctx) {
if (!SplitMergeEnabled(TabletConfig)) {
return;
}
auto& record = ev->Get()->Record;
auto partitionInfoIt = PartitionsInfo.find(record.GetPartitionId());
if (partitionInfoIt.IsEnd()) {
Expand All @@ -2165,6 +2172,9 @@ void TPersQueueReadBalancer::Handle(TEvPQ::TEvPartitionScaleStatusChanged::TPtr&
}

void TPersQueueReadBalancer::Handle(TPartitionScaleRequest::TEvPartitionScaleRequestDone::TPtr& ev, const TActorContext& ctx) {
if (!SplitMergeEnabled(TabletConfig)) {
return;
}
if (PartitionsScaleManager) {
PartitionsScaleManager->HandleScaleRequestResult(ev, ctx);
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/schemeshard/schemeshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4022,7 +4022,7 @@ NKikimrSchemeOp::TPathVersion TSchemeShard::GetPathVersion(const TPath& path) co
case NKikimrSchemeOp::EPathType::EPathTypePersQueueGroup:
Y_ABORT_UNLESS(Topics.contains(pathId));
result.SetPQVersion(Topics.at(pathId)->AlterVersion);
generalVersion += result.GetPQVersion(); //тут уже единичка
generalVersion += result.GetPQVersion();
break;
case NKikimrSchemeOp::EPathType::EPathTypeBlockStoreVolume:
Y_ABORT_UNLESS(BlockStoreVolumes.contains(pathId));
Expand Down Expand Up @@ -4151,7 +4151,7 @@ NKikimrSchemeOp::TPathVersion TSchemeShard::GetPathVersion(const TPath& path) co
generalVersion += result.GetChildrenVersion();

result.SetUserAttrsVersion(pathEl->UserAttrs->AlterVersion);
generalVersion += result.GetUserAttrsVersion(); //тут инкременится второй раз
generalVersion += result.GetUserAttrsVersion();

result.SetACLVersion(pathEl->ACLVersion); // do not add ACL version to the generalVersion here
result.SetEffectiveACLVersion(path.GetEffectiveACLVersion()); // ACL version is added to generalVersion here
Expand Down

0 comments on commit bed96a5

Please sign in to comment.