From 1115b5fc435cc2de473eca02fb0b3feb4643e266 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Mon, 1 Jul 2024 09:38:47 +0000 Subject: [PATCH] Add auto partitioning bounds to SDK and important consumer to CLI --- .../ut/ut_with_sdk/autoscaling_ut.cpp | 35 +++++++++++++++++++ .../schemeshard__operation_alter_pq.cpp | 4 +++ ydb/public/api/protos/ydb_topic.proto | 5 +++ .../ydb_cli/commands/ydb_service_topic.cpp | 5 +++ .../lib/ydb_cli/commands/ydb_service_topic.h | 1 + .../sdk/cpp/client/ydb_topic/impl/topic.cpp | 25 +++++++++++++ .../client/ydb_topic/include/control_plane.h | 16 +++++++-- .../persqueue_v1/actors/schema_actors.cpp | 25 ++++++++++--- 8 files changed, 109 insertions(+), 7 deletions(-) diff --git a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp index 80de3bd5b9c5..c6c7d9a3a482 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp @@ -800,6 +800,41 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync(); UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3); + bool firstPartitionFound = false; + for (const auto& partition : describe.GetTopicDescription().GetPartitions()) { + if (partition.GetPartitionId() == 0) { + firstPartitionFound = true; + UNIT_ASSERT(!partition.GetActive()); + UNIT_ASSERT_EQUAL(partition.GetChildPartitionIds().size(), 2); + auto childIds = partition.GetChildPartitionIds(); + std::sort(childIds.begin(), childIds.end()); + UNIT_ASSERT_EQUAL(childIds[0], 1); + UNIT_ASSERT_EQUAL(childIds[1], 2); + } + } + UNIT_ASSERT(firstPartitionFound); + + TString secondPartitionTo = ""; + TString thirdPartitionFrom = ""; + for (const auto& partition : describe.GetTopicDescription().GetPartitions()) { + if (partition.GetPartitionId() == 1 || partition.GetPartitionId() == 2) { + UNIT_ASSERT(partition.GetActive()); + if (partition.GetPartitionId() == 1) { + UNIT_ASSERT(partition.GetToBound().Defined() && !partition.GetToBound()->Empty()); + secondPartitionTo = *partition.GetToBound(); + } + if (partition.GetPartitionId() == 2) { + UNIT_ASSERT(partition.GetFromBound().Defined() && !partition.GetFromBound()->Empty()); + thirdPartitionFrom = *partition.GetFromBound(); + } + UNIT_ASSERT_EQUAL(partition.GetParentPartitionIds().size(), 1); + UNIT_ASSERT_EQUAL(partition.GetParentPartitionIds()[0], 0); + } + } + + UNIT_ASSERT(!secondPartitionTo.Empty()); + UNIT_ASSERT(!thirdPartitionFrom.Empty()); + auto writeSession2 = CreateWriteSession(client, "producer-1", 1, TEST_TOPIC, false); UNIT_ASSERT(writeSession2->Write(Msg(msg, 3))); UNIT_ASSERT(writeSession2->Write(Msg(msg, 4))); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp index dac4b58742ce..5246284f82c4 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp @@ -136,6 +136,10 @@ class TAlterPQ: public TSubOperation { return nullptr; } + if (!alterConfig.HasPartitionStrategy() && tabletConfig->HasPartitionStrategy()) { + alterConfig.MutablePartitionStrategy()->CopyFrom(tabletConfig->GetPartitionStrategy()); + } + if (alterConfig.GetPartitionConfig().HasLifetimeSeconds()) { const auto lifetimeSeconds = alterConfig.GetPartitionConfig().GetLifetimeSeconds(); if (lifetimeSeconds <= 0 || (ui32)lifetimeSeconds > TSchemeShard::MaxPQLifetimeSeconds) { diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto index ccccbd59d811..98fb38b118f7 100644 --- a/ydb/public/api/protos/ydb_topic.proto +++ b/ydb/public/api/protos/ydb_topic.proto @@ -1087,6 +1087,11 @@ message DescribeTopicResult { // Partition location, filled only when include_location in request is true. PartitionLocation partition_location = 6; + + // Inclusive left border. Emptiness means -inf. + optional bytes FromBound = 7; + // Exclusive right border. Emptiness means +inf. + optional bytes ToBound = 8; } message TopicStats { diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp index c9caba3825ba..15ea3748aa6d 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp @@ -507,6 +507,10 @@ namespace { config.Opts->AddLongOption("starting-message-timestamp", "Unix timestamp starting from '1970-01-01 00:00:00' from which read is allowed") .Optional() .StoreResult(&StartingMessageTimestamp_); + config.Opts->AddLongOption("important", "Is consumer important") + .Optional() + .DefaultValue(false) + .StoreResult(&IsImportant_); config.Opts->SetFreeArgsNum(1); SetFreeArgTitle(0, "", "Topic path"); AddAllowedCodecs(config, AllowedCodecs); @@ -537,6 +541,7 @@ namespace { codecs.push_back(NTopic::ECodec::RAW); } consumerSettings.SetSupportedCodecs(codecs); + consumerSettings.SetImportant(IsImportant_); readRuleSettings.AppendAddConsumers(consumerSettings); diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h index cebefe86f6af..029ae5d4e07a 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h @@ -122,6 +122,7 @@ namespace NYdb::NConsoleClient { private: TString ConsumerName_; + bool IsImportant_; TMaybe StartingMessageTimestamp_; }; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp index 486e6a747d39..a92e1ca8ee00 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp @@ -395,6 +395,7 @@ TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionI for (const auto& partId : partitionInfo.parent_partition_ids()) { ParentPartitionIds_.push_back(partId); } + if (partitionInfo.has_partition_stats()) { PartitionStats_ = TPartitionStats{partitionInfo.partition_stats()}; } @@ -402,6 +403,14 @@ TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionI if (partitionInfo.has_partition_location()) { PartitionLocation_ = TPartitionLocation{partitionInfo.partition_location()}; } + + if (partitionInfo.HasFromBound()) { + FromBound_ = TString(partitionInfo.GetFromBound()); + } + + if (partitionInfo.HasToBound()) { + ToBound_ = TString(partitionInfo.GetToBound()); + } } TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeConsumerResult::PartitionInfo& partitionInfo) @@ -437,6 +446,14 @@ const TMaybe& TPartitionInfo::GetPartitionLocation() const { return PartitionLocation_; } +const TVector TPartitionInfo::GetChildPartitionIds() const { + return ChildPartitionIds_; +} + +const TVector TPartitionInfo::GetParentPartitionIds() const { + return ParentPartitionIds_; +} + bool TPartitionInfo::GetActive() const { return Active_; } @@ -445,6 +462,14 @@ ui64 TPartitionInfo::GetPartitionId() const { return PartitionId_; } +const TMaybe& TPartitionInfo::GetFromBound() const { + return FromBound_; +} + +const TMaybe& TPartitionInfo::GetToBound() const { + return ToBound_; +} + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TTopicClient diff --git a/ydb/public/sdk/cpp/client/ydb_topic/include/control_plane.h b/ydb/public/sdk/cpp/client/ydb_topic/include/control_plane.h index e84c54dd3135..8c5cede81c18 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/include/control_plane.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/include/control_plane.h @@ -144,14 +144,21 @@ class TPartitionInfo { const TMaybe& GetPartitionConsumerStats() const; const TMaybe& GetPartitionLocation() const; + const TMaybe& GetFromBound() const; + const TMaybe& GetToBound() const; + private: ui64 PartitionId_; bool Active_; TVector ChildPartitionIds_; TVector ParentPartitionIds_; + TMaybe PartitionStats_; TMaybe PartitionConsumerStats_; TMaybe PartitionLocation_; + + TMaybe FromBound_; + TMaybe ToBound_; }; struct TAlterPartitioningSettings; @@ -206,11 +213,11 @@ class TPartitioningSettings { public: TPartitioningSettings() : MinActivePartitions_(0), MaxActivePartitions_(0), PartitionCountLimit_(0), AutoPartitioningSettings_(){} TPartitioningSettings(const Ydb::Topic::PartitioningSettings& settings); - TPartitioningSettings(ui64 minActivePartitions, ui64 maxActivePartitions, TAutoPartitioningSettings autoscalingSettings = {}) + TPartitioningSettings(ui64 minActivePartitions, ui64 maxActivePartitions, TAutoPartitioningSettings autoPartitioning = {}) : MinActivePartitions_(minActivePartitions) , MaxActivePartitions_(maxActivePartitions) , PartitionCountLimit_(0) - , AutoPartitioningSettings_(autoscalingSettings) + , AutoPartitioningSettings_(autoPartitioning) { } @@ -459,6 +466,11 @@ struct TConsumerSettings { return *this; } + TConsumerSettings& SetImportant(bool isImportant) { + Important_ = isImportant; + return *this; + } + TSettings& EndAddConsumer() { return Parent_; }; private: diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index a0e4d3e1bad8..1f40f36dde2f 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -1070,10 +1070,26 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv if (response.PQGroupInfo) { const auto& pqDescr = response.PQGroupInfo->Description; - for(ui32 i = 0; i < pqDescr.GetTotalGroupCount(); ++i) { - auto part = Result.add_partitions(); - part->set_partition_id(i); - part->set_active(true); + for (auto& sourcePart: pqDescr.GetPartitions()) { + auto destPart = Result.add_partitions(); + destPart->set_partition_id(sourcePart.GetPartitionId()); + destPart->set_active(sourcePart.GetStatus() == ::NKikimrPQ::ETopicPartitionStatus::Active); + if (sourcePart.HasKeyRange()) { + if (sourcePart.GetKeyRange().HasFromBound()) { + destPart->set_frombound(sourcePart.GetKeyRange().GetFromBound()); + } + if (sourcePart.GetKeyRange().HasToBound()) { + destPart->set_tobound(sourcePart.GetKeyRange().GetToBound()); + } + } + + for (size_t i = 0; i < sourcePart.ChildPartitionIdsSize(); ++i) { + destPart->add_child_partition_ids(static_cast(sourcePart.GetChildPartitionIds(i))); + } + + for (size_t i = 0; i < sourcePart.ParentPartitionIdsSize(); ++i) { + destPart->add_parent_partition_ids(static_cast(sourcePart.GetParentPartitionIds(i))); + } } const auto &config = pqDescr.GetPQTabletConfig(); @@ -1401,7 +1417,6 @@ void TDescribePartitionActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TE for (auto partData : record.GetPartResult()) { if ((ui32)partData.GetPartition() != Settings.Partitions[0]) continue; - Y_ABORT_UNLESS((ui32)(partData.GetPartition()) == Settings.Partitions[0]); partResult->set_partition_id(partData.GetPartition()); partResult->set_active(true);