Skip to content

Commit

Permalink
Merge 7faead1 into 1c15550
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev authored Jul 20, 2024
2 parents 1c15550 + 7faead1 commit 4e13ff9
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 7 deletions.
36 changes: 36 additions & 0 deletions ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,42 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3);

bool firstPartitionFound = false;
for (const auto& partition : describe.GetTopicDescription().GetPartitions()) {
if (partition.GetPartitionId() == 0) {
firstPartitionFound = true;
UNIT_ASSERT(!partition.GetActive());
UNIT_ASSERT_EQUAL(partition.GetChildPartitionIds().size(), 2);
auto childIds = partition.GetChildPartitionIds();
std::sort(childIds.begin(), childIds.end());
UNIT_ASSERT_EQUAL(childIds[0], 1);
UNIT_ASSERT_EQUAL(childIds[1], 2);
}
}

UNIT_ASSERT(firstPartitionFound);

TString secondPartitionTo = "";
TString thirdPartitionFrom = "";
for (const auto& partition : describe.GetTopicDescription().GetPartitions()) {
if (partition.GetPartitionId() == 1 || partition.GetPartitionId() == 2) {
UNIT_ASSERT(partition.GetActive());
if (partition.GetPartitionId() == 1) {
UNIT_ASSERT(partition.GetToBound().Defined() && !partition.GetToBound()->Empty());
secondPartitionTo = *partition.GetToBound();
}
if (partition.GetPartitionId() == 2) {
UNIT_ASSERT(partition.GetFromBound().Defined() && !partition.GetFromBound()->Empty());
thirdPartitionFrom = *partition.GetFromBound();
}
UNIT_ASSERT_EQUAL(partition.GetParentPartitionIds().size(), 1);
UNIT_ASSERT_EQUAL(partition.GetParentPartitionIds()[0], 0);
}
}

UNIT_ASSERT(!secondPartitionTo.Empty());
UNIT_ASSERT(!thirdPartitionFrom.Empty());

auto writeSession2 = CreateWriteSession(client, "producer-1", 1, TEST_TOPIC, false);
UNIT_ASSERT(writeSession2->Write(Msg(msg, 3)));
UNIT_ASSERT(writeSession2->Write(Msg(msg, 4)));
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ class TAlterPQ: public TSubOperation {
return nullptr;
}

if (!alterConfig.HasPartitionStrategy() && tabletConfig->HasPartitionStrategy()) {
alterConfig.MutablePartitionStrategy()->CopyFrom(tabletConfig->GetPartitionStrategy());
}

if (alterConfig.GetPartitionConfig().HasLifetimeSeconds()) {
const auto lifetimeSeconds = alterConfig.GetPartitionConfig().GetLifetimeSeconds();
if (lifetimeSeconds <= 0 || (ui32)lifetimeSeconds > TSchemeShard::MaxPQLifetimeSeconds) {
Expand Down
9 changes: 9 additions & 0 deletions ydb/public/api/protos/ydb_topic.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,13 @@ message DescribeTopicResponse {
Ydb.Operations.Operation operation = 1;
}

message PartitionKeyRange {
// Inclusive left border. Emptiness means -inf.
optional bytes from_bound = 1;
// Exclusive right border. Emptiness means +inf.
optional bytes to_bound = 2;
}

// Describe topic result message that will be inside DescribeTopicResponse.operation.
message DescribeTopicResult {
// Description of scheme object.
Expand Down Expand Up @@ -1087,6 +1094,8 @@ message DescribeTopicResult {

// Partition location, filled only when include_location in request is true.
PartitionLocation partition_location = 6;

PartitionKeyRange key_range = 7;
}

message TopicStats {
Expand Down
5 changes: 5 additions & 0 deletions ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,10 @@ namespace {
config.Opts->AddLongOption("starting-message-timestamp", "Unix timestamp starting from '1970-01-01 00:00:00' from which read is allowed")
.Optional()
.StoreResult(&StartingMessageTimestamp_);
config.Opts->AddLongOption("important", "Is consumer important")
.Optional()
.DefaultValue(false)
.StoreResult(&IsImportant_);
config.Opts->SetFreeArgsNum(1);
SetFreeArgTitle(0, "<topic-path>", "Topic path");
AddAllowedCodecs(config, AllowedCodecs);
Expand Down Expand Up @@ -537,6 +541,7 @@ namespace {
codecs.push_back(NTopic::ECodec::RAW);
}
consumerSettings.SetSupportedCodecs(codecs);
consumerSettings.SetImportant(IsImportant_);

readRuleSettings.AppendAddConsumers(consumerSettings);

Expand Down
1 change: 1 addition & 0 deletions ydb/public/lib/ydb_cli/commands/ydb_service_topic.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ namespace NYdb::NConsoleClient {

private:
TString ConsumerName_;
bool IsImportant_;
TMaybe<ui64> StartingMessageTimestamp_;
};

Expand Down
25 changes: 25 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,13 +395,22 @@ TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionI
for (const auto& partId : partitionInfo.parent_partition_ids()) {
ParentPartitionIds_.push_back(partId);
}

if (partitionInfo.has_partition_stats()) {
PartitionStats_ = TPartitionStats{partitionInfo.partition_stats()};
}

if (partitionInfo.has_partition_location()) {
PartitionLocation_ = TPartitionLocation{partitionInfo.partition_location()};
}

if (partitionInfo.has_key_range() && partitionInfo.key_range().has_from_bound()) {
FromBound_ = TString(partitionInfo.key_range().from_bound());
}

if (partitionInfo.has_key_range() && partitionInfo.key_range().has_to_bound()) {
ToBound_ = TString(partitionInfo.key_range().to_bound());
}
}

TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeConsumerResult::PartitionInfo& partitionInfo)
Expand Down Expand Up @@ -437,6 +446,14 @@ const TMaybe<TPartitionLocation>& TPartitionInfo::GetPartitionLocation() const {
return PartitionLocation_;
}

const TVector<ui64> TPartitionInfo::GetChildPartitionIds() const {
return ChildPartitionIds_;
}

const TVector<ui64> TPartitionInfo::GetParentPartitionIds() const {
return ParentPartitionIds_;
}

bool TPartitionInfo::GetActive() const {
return Active_;
}
Expand All @@ -445,6 +462,14 @@ ui64 TPartitionInfo::GetPartitionId() const {
return PartitionId_;
}

const TMaybe<TString>& TPartitionInfo::GetFromBound() const {
return FromBound_;
}

const TMaybe<TString>& TPartitionInfo::GetToBound() const {
return ToBound_;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// TTopicClient

Expand Down
16 changes: 14 additions & 2 deletions ydb/public/sdk/cpp/client/ydb_topic/include/control_plane.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,21 @@ class TPartitionInfo {
const TMaybe<TPartitionConsumerStats>& GetPartitionConsumerStats() const;
const TMaybe<TPartitionLocation>& GetPartitionLocation() const;

const TMaybe<TString>& GetFromBound() const;
const TMaybe<TString>& GetToBound() const;

private:
ui64 PartitionId_;
bool Active_;
TVector<ui64> ChildPartitionIds_;
TVector<ui64> ParentPartitionIds_;

TMaybe<TPartitionStats> PartitionStats_;
TMaybe<TPartitionConsumerStats> PartitionConsumerStats_;
TMaybe<TPartitionLocation> PartitionLocation_;

TMaybe<TString> FromBound_;
TMaybe<TString> ToBound_;
};

struct TAlterPartitioningSettings;
Expand Down Expand Up @@ -206,11 +213,11 @@ class TPartitioningSettings {
public:
TPartitioningSettings() : MinActivePartitions_(0), MaxActivePartitions_(0), PartitionCountLimit_(0), AutoPartitioningSettings_(){}
TPartitioningSettings(const Ydb::Topic::PartitioningSettings& settings);
TPartitioningSettings(ui64 minActivePartitions, ui64 maxActivePartitions, TAutoPartitioningSettings autoscalingSettings = {})
TPartitioningSettings(ui64 minActivePartitions, ui64 maxActivePartitions, TAutoPartitioningSettings autoPartitioning = {})
: MinActivePartitions_(minActivePartitions)
, MaxActivePartitions_(maxActivePartitions)
, PartitionCountLimit_(0)
, AutoPartitioningSettings_(autoscalingSettings)
, AutoPartitioningSettings_(autoPartitioning)
{
}

Expand Down Expand Up @@ -459,6 +466,11 @@ struct TConsumerSettings {
return *this;
}

TConsumerSettings& SetImportant(bool isImportant) {
Important_ = isImportant;
return *this;
}

TSettings& EndAddConsumer() { return Parent_; };

private:
Expand Down
25 changes: 20 additions & 5 deletions ydb/services/persqueue_v1/actors/schema_actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1070,10 +1070,26 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv

if (response.PQGroupInfo) {
const auto& pqDescr = response.PQGroupInfo->Description;
for(ui32 i = 0; i < pqDescr.GetTotalGroupCount(); ++i) {
auto part = Result.add_partitions();
part->set_partition_id(i);
part->set_active(true);
for (auto& sourcePart: pqDescr.GetPartitions()) {
auto destPart = Result.add_partitions();
destPart->set_partition_id(sourcePart.GetPartitionId());
destPart->set_active(sourcePart.GetStatus() == ::NKikimrPQ::ETopicPartitionStatus::Active);
if (sourcePart.HasKeyRange()) {
if (sourcePart.GetKeyRange().HasFromBound()) {
destPart->mutable_key_range()->set_from_bound(sourcePart.GetKeyRange().GetFromBound());
}
if (sourcePart.GetKeyRange().HasToBound()) {
destPart->mutable_key_range()->set_to_bound(sourcePart.GetKeyRange().GetToBound());
}
}

for (size_t i = 0; i < sourcePart.ChildPartitionIdsSize(); ++i) {
destPart->add_child_partition_ids(static_cast<int64_t>(sourcePart.GetChildPartitionIds(i)));
}

for (size_t i = 0; i < sourcePart.ParentPartitionIdsSize(); ++i) {
destPart->add_parent_partition_ids(static_cast<int64_t>(sourcePart.GetParentPartitionIds(i)));
}
}

const auto &config = pqDescr.GetPQTabletConfig();
Expand Down Expand Up @@ -1401,7 +1417,6 @@ void TDescribePartitionActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TE
for (auto partData : record.GetPartResult()) {
if ((ui32)partData.GetPartition() != Settings.Partitions[0])
continue;

Y_ABORT_UNLESS((ui32)(partData.GetPartition()) == Settings.Partitions[0]);
partResult->set_partition_id(partData.GetPartition());
partResult->set_active(true);
Expand Down

0 comments on commit 4e13ff9

Please sign in to comment.