Skip to content

Commit

Permalink
Merge 29605a2 into 55fae2a
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev authored Sep 25, 2024
2 parents 55fae2a + 29605a2 commit 1d731d8
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 13 deletions.
19 changes: 11 additions & 8 deletions ydb/services/datastreams/datastreams_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,14 +471,7 @@ namespace NKikimr::NDataStreams::V1 {
Y_UNUSED(selfInfo);

TString error;
if (!GetProtoRequest()->has_partitioning_settings()) {
if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error))
{
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::BAD_REQUEST), error);
}

groupConfig.SetTotalGroupCount(GetProtoRequest()->target_shard_count());
}
switch (GetProtoRequest()->retention_case()) {
case Ydb::DataStreams::V1::UpdateStreamRequest::RetentionCase::kRetentionPeriodHours:
groupConfig.MutablePQTabletConfig()->MutablePartitionConfig()->SetLifetimeSeconds(
Expand Down Expand Up @@ -520,7 +513,17 @@ namespace NKikimr::NDataStreams::V1 {
}
}

if (GetProtoRequest()->has_partitioning_settings()) {
if (!GetProtoRequest()->has_partitioning_settings() ||
(GetProtoRequest()->partitioning_settings().has_auto_partitioning_settings() &&
GetProtoRequest()->partitioning_settings().auto_partitioning_settings().strategy() ==
Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED)) {
if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error))
{
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::BAD_REQUEST), error);
}

groupConfig.SetTotalGroupCount(GetProtoRequest()->target_shard_count());
} else {
auto r = ValidatePartitioningSettings(GetProtoRequest()->partitioning_settings());
if (!r.empty()) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), r);
Expand Down
27 changes: 23 additions & 4 deletions ydb/services/lib/actors/pq_schema_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1164,7 +1164,26 @@ namespace NKikimr::NGRpcProxy::V1 {
auto pqTabletConfig = pqDescr.MutablePQTabletConfig();
NPQ::Migrate(*pqTabletConfig);
auto partConfig = pqTabletConfig->MutablePartitionConfig();
auto splitMergeFeatureEnabled = appData->FeatureFlags.GetEnableTopicSplitMerge();

auto finalAutoPartitioningEnabled = false;
if (appData->FeatureFlags.GetEnableTopicSplitMerge()) {

auto reqHasAutoPartitioningStrategyChange = request.has_alter_partitioning_settings() &&
request.alter_partitioning_settings().has_alter_auto_partitioning_settings() &&
request.alter_partitioning_settings().alter_auto_partitioning_settings().has_set_strategy();

auto pqConfigHasAutoPartitioningStrategy = pqTabletConfig->HasPartitionStrategy() &&
pqTabletConfig->GetPartitionStrategy().HasPartitionStrategyType() &&
pqTabletConfig->GetPartitionStrategy().GetPartitionStrategyType();

if (reqHasAutoPartitioningStrategyChange) {
finalAutoPartitioningEnabled = request.alter_partitioning_settings().alter_auto_partitioning_settings().set_strategy() != ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED;
} else if (pqConfigHasAutoPartitioningStrategy) {
finalAutoPartitioningEnabled = pqTabletConfig->GetPartitionStrategy().GetPartitionStrategyType() != ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED;
}

}


if (request.has_set_retention_storage_mb()) {
CHECK_CDC;
Expand All @@ -1178,12 +1197,12 @@ namespace NKikimr::NGRpcProxy::V1 {
if (settings.has_set_min_active_partitions()) {
auto minParts = IfEqualThenDefault<i64>(settings.set_min_active_partitions(), 0L, 1L);
pqDescr.SetTotalGroupCount(minParts);
if (splitMergeFeatureEnabled) {
if (finalAutoPartitioningEnabled) {
pqTabletConfig->MutablePartitionStrategy()->SetMinPartitionCount(minParts);
}
}

if (splitMergeFeatureEnabled) {
if (finalAutoPartitioningEnabled) {
if (settings.has_set_max_active_partitions()) {
pqTabletConfig->MutablePartitionStrategy()->SetMaxPartitionCount(settings.set_max_active_partitions());
}
Expand Down Expand Up @@ -1219,7 +1238,7 @@ namespace NKikimr::NGRpcProxy::V1 {
}
}

if (splitMergeFeatureEnabled) {
if (finalAutoPartitioningEnabled) {
auto code = ValidatePartitionStrategy(*pqTabletConfig, error);
if (code) return code->YdbCode;
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/services/persqueue_v1/actors/schema_actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1094,7 +1094,7 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv
}

const auto &config = pqDescr.GetPQTabletConfig();
if (AppData(TActivationContext::ActorContextFor(SelfId()))->FeatureFlags.GetEnableTopicSplitMerge()) {
if (AppData(TActivationContext::ActorContextFor(SelfId()))->FeatureFlags.GetEnableTopicSplitMerge() && NPQ::SplitMergeEnabled(config)) {
Result.mutable_partitioning_settings()->set_min_active_partitions(config.GetPartitionStrategy().GetMinPartitionCount());
} else {
Result.mutable_partitioning_settings()->set_min_active_partitions(pqDescr.GetTotalGroupCount());
Expand Down

0 comments on commit 1d731d8

Please sign in to comment.