From ddf8d01460895f0e8bb956d9be95e15f17f49f0c Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Tue, 24 Sep 2024 14:15:33 +0000 Subject: [PATCH 1/5] fix describe and topics alter --- ydb/services/lib/actors/pq_schema_actor.cpp | 27 ++++++++++++++++--- .../persqueue_v1/actors/schema_actors.cpp | 2 +- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index 05a2275b58e4..851271426aad 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -1164,7 +1164,26 @@ namespace NKikimr::NGRpcProxy::V1 { auto pqTabletConfig = pqDescr.MutablePQTabletConfig(); NPQ::Migrate(*pqTabletConfig); auto partConfig = pqTabletConfig->MutablePartitionConfig(); - auto splitMergeFeatureEnabled = appData->FeatureFlags.GetEnableTopicSplitMerge(); + + auto finalAutoPartitioningEnabled = false; + if (appData->FeatureFlags.GetEnableTopicSplitMerge()) { + + auto reqHasAutoPartitioningStrategyChange = request.has_alter_partitioning_settings() && + request.alter_partitioning_settings().has_alter_auto_partitioning_settings() && + request.alter_partitioning_settings().alter_auto_partitioning_settings().has_set_strategy(); + + auto pqConfigHasAutoPartitioningStrategy = pqTabletConfig->HasPartitionStrategy() && + pqTabletConfig->GetPartitionStrategy().HasPartitionStrategyType() && + pqTabletConfig->GetPartitionStrategy().GetPartitionStrategyType(); + + if (reqHasAutoPartitioningStrategyChange) { + finalAutoPartitioningEnabled = request.alter_partitioning_settings().alter_auto_partitioning_settings().set_strategy() != ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED; + } else if (pqConfigHasAutoPartitioningStrategy) { + finalAutoPartitioningEnabled = pqTabletConfig->GetPartitionStrategy().GetPartitionStrategyType() != ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED; + } + + } + if (request.has_set_retention_storage_mb()) { CHECK_CDC; @@ -1178,12 +1197,12 @@ namespace NKikimr::NGRpcProxy::V1 { if (settings.has_set_min_active_partitions()) { auto minParts = IfEqualThenDefault(settings.set_min_active_partitions(), 0L, 1L); pqDescr.SetTotalGroupCount(minParts); - if (splitMergeFeatureEnabled) { + if (finalAutoPartitioningEnabled) { pqTabletConfig->MutablePartitionStrategy()->SetMinPartitionCount(minParts); } } - if (splitMergeFeatureEnabled) { + if (finalAutoPartitioningEnabled) { if (settings.has_set_max_active_partitions()) { pqTabletConfig->MutablePartitionStrategy()->SetMaxPartitionCount(settings.set_max_active_partitions()); } @@ -1219,7 +1238,7 @@ namespace NKikimr::NGRpcProxy::V1 { } } - if (splitMergeFeatureEnabled) { + if (finalAutoPartitioningEnabled) { auto code = ValidatePartitionStrategy(*pqTabletConfig, error); if (code) return code->YdbCode; } diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index c68c6fe205ad..63749dfc71a3 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -1094,7 +1094,7 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv } const auto &config = pqDescr.GetPQTabletConfig(); - if (AppData(TActivationContext::ActorContextFor(SelfId()))->FeatureFlags.GetEnableTopicSplitMerge()) { + if (AppData(TActivationContext::ActorContextFor(SelfId()))->FeatureFlags.GetEnableTopicSplitMerge() && NPQ::SplitMergeEnabled(config)) { Result.mutable_partitioning_settings()->set_min_active_partitions(config.GetPartitionStrategy().GetMinPartitionCount()); } else { Result.mutable_partitioning_settings()->set_min_active_partitions(pqDescr.GetTotalGroupCount()); From 29605a2a505fc1c0dd855f369e736c6d22e34e14 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Wed, 25 Sep 2024 07:49:20 +0000 Subject: [PATCH 2/5] fix --- .../datastreams/datastreams_proxy.cpp | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index 48277a3c5ff0..e7f8ac11a45d 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -471,14 +471,7 @@ namespace NKikimr::NDataStreams::V1 { Y_UNUSED(selfInfo); TString error; - if (!GetProtoRequest()->has_partitioning_settings()) { - if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error)) - { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast(NYds::EErrorCodes::BAD_REQUEST), error); - } - groupConfig.SetTotalGroupCount(GetProtoRequest()->target_shard_count()); - } switch (GetProtoRequest()->retention_case()) { case Ydb::DataStreams::V1::UpdateStreamRequest::RetentionCase::kRetentionPeriodHours: groupConfig.MutablePQTabletConfig()->MutablePartitionConfig()->SetLifetimeSeconds( @@ -520,7 +513,17 @@ namespace NKikimr::NDataStreams::V1 { } } - if (GetProtoRequest()->has_partitioning_settings()) { + if (!GetProtoRequest()->has_partitioning_settings() || + (GetProtoRequest()->partitioning_settings().has_auto_partitioning_settings() && + GetProtoRequest()->partitioning_settings().auto_partitioning_settings().strategy() == + Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED)) { + if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error)) + { + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast(NYds::EErrorCodes::BAD_REQUEST), error); + } + + groupConfig.SetTotalGroupCount(GetProtoRequest()->target_shard_count()); + } else { auto r = ValidatePartitioningSettings(GetProtoRequest()->partitioning_settings()); if (!r.empty()) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast(NYds::EErrorCodes::INVALID_ARGUMENT), r); From a59ef5149f768361cf54bed33c5c7ee8306db99b Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Wed, 25 Sep 2024 08:43:30 +0000 Subject: [PATCH 3/5] ut --- .../datastreams/datastreams_proxy.cpp | 5 +- ydb/services/datastreams/datastreams_ut.cpp | 62 ++++++++++++++++++- 2 files changed, 65 insertions(+), 2 deletions(-) diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index e7f8ac11a45d..15c9f61fd117 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -516,13 +516,15 @@ namespace NKikimr::NDataStreams::V1 { if (!GetProtoRequest()->has_partitioning_settings() || (GetProtoRequest()->partitioning_settings().has_auto_partitioning_settings() && GetProtoRequest()->partitioning_settings().auto_partitioning_settings().strategy() == - Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED)) { + Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED)) { + if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error)) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast(NYds::EErrorCodes::BAD_REQUEST), error); } groupConfig.SetTotalGroupCount(GetProtoRequest()->target_shard_count()); + } else { auto r = ValidatePartitioningSettings(GetProtoRequest()->partitioning_settings()); if (!r.empty()) { @@ -535,6 +537,7 @@ namespace NKikimr::NDataStreams::V1 { auto& as = s.auto_partitioning_settings(); switch(as.strategy()) { case Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_UNSPECIFIED: + break; case Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED: case Ydb::DataStreams::V1::AutoPartitioningStrategy::AutoPartitioningStrategy_INT_MAX_SENTINEL_DO_NOT_USE_: case Ydb::DataStreams::V1::AutoPartitioningStrategy::AutoPartitioningStrategy_INT_MIN_SENTINEL_DO_NOT_USE_: diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp index 7d978c7515fc..2e9e439f2e5f 100644 --- a/ydb/services/datastreams/datastreams_ut.cpp +++ b/ydb/services/datastreams/datastreams_ut.cpp @@ -2706,7 +2706,6 @@ Y_UNIT_TEST_SUITE(DataStreams) { TString streamName = "test-topic"; TString streamName2 = "test-topic-2"; - { NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); auto settings = NYdb::NTopic::TCreateTopicSettings() @@ -2807,6 +2806,67 @@ Y_UNIT_TEST_SUITE(DataStreams) { UNIT_ASSERT_VALUES_EQUAL(description.shards(4).parent_shard_id(), "shard-000001"); } + auto streamForAlterTest = "stream-alter-test"; + { + auto result = testServer.DataStreamsClient->CreateStream(streamForAlterTest, + NYDS_V1::TCreateStreamSettings() + .ShardCount(3) + ).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + if (result.GetStatus() != EStatus::SUCCESS) { + result.GetIssues().PrintTo(Cerr); + } + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = testServer.DataStreamsClient->DescribeStream(streamForAlterTest).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + Cerr << result.GetIssues().ToString() << "\n"; + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto& d = result.GetResult().stream_description(); + UNIT_ASSERT_VALUES_EQUAL(d.shards().size(), 3); + UNIT_ASSERT_VALUES_EQUAL(d.stream_status(), YDS_V1::StreamDescription::ACTIVE); + UNIT_ASSERT_VALUES_EQUAL(d.stream_name(), streamForAlterTest); + UNIT_ASSERT_VALUES_EQUAL(d.stream_arn(), streamForAlterTest); + + UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED); + } + + { + auto result = testServer.DataStreamsClient->UpdateStream(streamForAlterTest, + NYDS_V1::TUpdateStreamSettings() + .TargetShardCount(5) + .BeginConfigurePartitioningSettings() + .BeginConfigureAutoPartitioningSettings() + .Strategy(NYdb::NDataStreams::V1::EAutoPartitioningStrategy::Disabled) + .EndConfigureAutoPartitioningSettings() + .EndConfigurePartitioningSettings() + ).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + if (result.GetStatus() != EStatus::SUCCESS) { + result.GetIssues().PrintTo(Cerr); + } + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = testServer.DataStreamsClient->DescribeStream(streamForAlterTest).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + Cerr << result.GetIssues().ToString() << "\n"; + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto& d = result.GetResult().stream_description(); + UNIT_ASSERT_VALUES_EQUAL(d.shards().size(), 5); + UNIT_ASSERT_VALUES_EQUAL(d.stream_status(), YDS_V1::StreamDescription::ACTIVE); + UNIT_ASSERT_VALUES_EQUAL(d.stream_name(), streamForAlterTest); + UNIT_ASSERT_VALUES_EQUAL(d.stream_arn(), streamForAlterTest); + + UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED); + } + { auto result = testServer.DataStreamsClient->CreateStream(streamName2, NYDS_V1::TCreateStreamSettings() From 2b0adf9e0f85af535d67185bc1d4c53df06bce7c Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Wed, 25 Sep 2024 09:53:33 +0000 Subject: [PATCH 4/5] fix --- .../ut/ut_with_sdk/autoscaling_ut.cpp | 37 ++++++++++++ .../datastreams/datastreams_proxy.cpp | 10 +++- ydb/services/datastreams/datastreams_ut.cpp | 60 +++++++++++++++++++ ydb/services/lib/actors/pq_schema_actor.cpp | 19 +++--- 4 files changed, 115 insertions(+), 11 deletions(-) diff --git a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp index d363acca852b..87747e44d3a3 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp @@ -757,6 +757,43 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { } } + Y_UNIT_TEST(ControlPlane_BackCompatibility) { + auto topicName = "back-compatibility-test"; + + TTopicSdkTestSetup setup = CreateSetup(); + TTopicClient client = setup.MakeClient(); + + { + TCreateTopicSettings createSettings; + createSettings + .BeginConfigurePartitioningSettings() + .MinActivePartitions(3) + .EndConfigurePartitioningSettings(); + client.CreateTopic(topicName, createSettings).Wait(); + } + + { + auto describeAfterAlter = client.DescribeTopic(topicName).GetValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 3); + } + + { + TAlterTopicSettings alterSettings; + alterSettings + .BeginAlterPartitioningSettings() + .MinActivePartitions(5) + .EndAlterTopicPartitioningSettings(); + client.AlterTopic(topicName, alterSettings).Wait(); + } + + { + auto describeAfterAlter = client.DescribeTopic(topicName).GetValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 5); + } + } + Y_UNIT_TEST(ControlPlane_PauseAutoPartitioning) { auto topicName = "autoscalit-topic"; diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index 15c9f61fd117..9a6dc997fd8a 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -514,9 +514,14 @@ namespace NKikimr::NDataStreams::V1 { } if (!GetProtoRequest()->has_partitioning_settings() || + (GetProtoRequest()->partitioning_settings().has_auto_partitioning_settings() && - GetProtoRequest()->partitioning_settings().auto_partitioning_settings().strategy() == - Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED)) { + + (GetProtoRequest()->partitioning_settings().auto_partitioning_settings().strategy() == + Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED) || + + (GetProtoRequest()->partitioning_settings().auto_partitioning_settings().strategy() == + Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_UNSPECIFIED))) { if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error)) { @@ -537,7 +542,6 @@ namespace NKikimr::NDataStreams::V1 { auto& as = s.auto_partitioning_settings(); switch(as.strategy()) { case Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_UNSPECIFIED: - break; case Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED: case Ydb::DataStreams::V1::AutoPartitioningStrategy::AutoPartitioningStrategy_INT_MAX_SENTINEL_DO_NOT_USE_: case Ydb::DataStreams::V1::AutoPartitioningStrategy::AutoPartitioningStrategy_INT_MIN_SENTINEL_DO_NOT_USE_: diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp index 2e9e439f2e5f..4a7ae905c95b 100644 --- a/ydb/services/datastreams/datastreams_ut.cpp +++ b/ydb/services/datastreams/datastreams_ut.cpp @@ -2867,6 +2867,66 @@ Y_UNIT_TEST_SUITE(DataStreams) { UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED); } + { + auto result = testServer.DataStreamsClient->UpdateStream(streamForAlterTest, + NYDS_V1::TUpdateStreamSettings() + .TargetShardCount(10) + ).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + if (result.GetStatus() != EStatus::SUCCESS) { + result.GetIssues().PrintTo(Cerr); + } + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = testServer.DataStreamsClient->DescribeStream(streamForAlterTest).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + Cerr << result.GetIssues().ToString() << "\n"; + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto& d = result.GetResult().stream_description(); + UNIT_ASSERT_VALUES_EQUAL(d.shards().size(), 10); + UNIT_ASSERT_VALUES_EQUAL(d.stream_status(), YDS_V1::StreamDescription::ACTIVE); + UNIT_ASSERT_VALUES_EQUAL(d.stream_name(), streamForAlterTest); + UNIT_ASSERT_VALUES_EQUAL(d.stream_arn(), streamForAlterTest); + + UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED); + } + + { + auto result = testServer.DataStreamsClient->UpdateStream(streamForAlterTest, + NYDS_V1::TUpdateStreamSettings() + .TargetShardCount(15) + .BeginConfigurePartitioningSettings() + .BeginConfigureAutoPartitioningSettings() + .EndConfigureAutoPartitioningSettings() + .EndConfigurePartitioningSettings() + ).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + if (result.GetStatus() != EStatus::SUCCESS) { + result.GetIssues().PrintTo(Cerr); + } + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = testServer.DataStreamsClient->DescribeStream(streamForAlterTest).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + Cerr << result.GetIssues().ToString() << "\n"; + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto& d = result.GetResult().stream_description(); + UNIT_ASSERT_VALUES_EQUAL(d.shards().size(), 15); + UNIT_ASSERT_VALUES_EQUAL(d.stream_status(), YDS_V1::StreamDescription::ACTIVE); + UNIT_ASSERT_VALUES_EQUAL(d.stream_name(), streamForAlterTest); + UNIT_ASSERT_VALUES_EQUAL(d.stream_arn(), streamForAlterTest); + + UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED); + } + { auto result = testServer.DataStreamsClient->CreateStream(streamName2, NYDS_V1::TCreateStreamSettings() diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index 851271426aad..bbb641ac5625 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -1165,7 +1165,7 @@ namespace NKikimr::NGRpcProxy::V1 { NPQ::Migrate(*pqTabletConfig); auto partConfig = pqTabletConfig->MutablePartitionConfig(); - auto finalAutoPartitioningEnabled = false; + auto needHandleAutoPartitioning = false; if (appData->FeatureFlags.GetEnableTopicSplitMerge()) { auto reqHasAutoPartitioningStrategyChange = request.has_alter_partitioning_settings() && @@ -1176,10 +1176,13 @@ namespace NKikimr::NGRpcProxy::V1 { pqTabletConfig->GetPartitionStrategy().HasPartitionStrategyType() && pqTabletConfig->GetPartitionStrategy().GetPartitionStrategyType(); - if (reqHasAutoPartitioningStrategyChange) { - finalAutoPartitioningEnabled = request.alter_partitioning_settings().alter_auto_partitioning_settings().set_strategy() != ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED; - } else if (pqConfigHasAutoPartitioningStrategy) { - finalAutoPartitioningEnabled = pqTabletConfig->GetPartitionStrategy().GetPartitionStrategyType() != ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED; + if (pqConfigHasAutoPartitioningStrategy && pqTabletConfig->GetPartitionStrategy().GetPartitionStrategyType() != ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED) { + needHandleAutoPartitioning = true; + } else if (reqHasAutoPartitioningStrategyChange) { + auto strategy = request.alter_partitioning_settings().alter_auto_partitioning_settings().set_strategy(); + needHandleAutoPartitioning = strategy == ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_PAUSED || + strategy == ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP || + strategy == ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN; } } @@ -1197,12 +1200,12 @@ namespace NKikimr::NGRpcProxy::V1 { if (settings.has_set_min_active_partitions()) { auto minParts = IfEqualThenDefault(settings.set_min_active_partitions(), 0L, 1L); pqDescr.SetTotalGroupCount(minParts); - if (finalAutoPartitioningEnabled) { + if (needHandleAutoPartitioning) { pqTabletConfig->MutablePartitionStrategy()->SetMinPartitionCount(minParts); } } - if (finalAutoPartitioningEnabled) { + if (needHandleAutoPartitioning) { if (settings.has_set_max_active_partitions()) { pqTabletConfig->MutablePartitionStrategy()->SetMaxPartitionCount(settings.set_max_active_partitions()); } @@ -1238,7 +1241,7 @@ namespace NKikimr::NGRpcProxy::V1 { } } - if (finalAutoPartitioningEnabled) { + if (needHandleAutoPartitioning) { auto code = ValidatePartitionStrategy(*pqTabletConfig, error); if (code) return code->YdbCode; } From c39bccc8e8b3016958b0a62fbb045d0743e6755f Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Wed, 25 Sep 2024 10:24:49 +0000 Subject: [PATCH 5/5] fix --- ydb/services/datastreams/datastreams_proxy.cpp | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index 9a6dc997fd8a..4a92862a0c7c 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -514,14 +514,9 @@ namespace NKikimr::NDataStreams::V1 { } if (!GetProtoRequest()->has_partitioning_settings() || - (GetProtoRequest()->partitioning_settings().has_auto_partitioning_settings() && - - (GetProtoRequest()->partitioning_settings().auto_partitioning_settings().strategy() == - Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED) || - - (GetProtoRequest()->partitioning_settings().auto_partitioning_settings().strategy() == - Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_UNSPECIFIED))) { + (GetProtoRequest()->partitioning_settings().auto_partitioning_settings().strategy() == Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED) || + (GetProtoRequest()->partitioning_settings().auto_partitioning_settings().strategy() == Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_UNSPECIFIED))) { if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error)) {