From 4787e02dad56dcd83662cfae67827488a8b9e96d Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Mon, 12 Feb 2024 07:15:36 +0000 Subject: [PATCH 1/2] Workload read without consumer and metrics fixes --- ydb/core/persqueue/partition.cpp | 6 ++-- ydb/core/persqueue/partition_init.cpp | 4 +-- ydb/core/persqueue/read_balancer.cpp | 5 +++- .../commands/topic_operations_scenario.cpp | 1 + .../commands/topic_operations_scenario.h | 1 + .../topic_workload/topic_workload_reader.cpp | 30 ++++++++++++------- .../topic_workload/topic_workload_reader.h | 1 + .../topic_workload_run_read.cpp | 3 ++ 8 files changed, 35 insertions(+), 16 deletions(-) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index cd1546fcbec1..d8ce5cb77eb5 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -748,7 +748,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext auto& userInfo = userInfoPair.second; if (!userInfo.LabeledCounters) continue; - if (!userInfo.HasReadRule && !userInfo.Important) + if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important) continue; auto* cac = ac->AddConsumerAggregatedCounters(); cac->SetConsumer(userInfo.User); @@ -1083,7 +1083,7 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) { auto& userInfo = userInfoPair.second; if (!userInfo.LabeledCounters) continue; - if (!userInfo.HasReadRule && !userInfo.Important) + if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important) continue; bool haveChanges = false; userInfo.EndOffset = EndOffset; @@ -1805,7 +1805,7 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorC } else { TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_HIT].Increment(1); } - } else { + } else if (user != CLIENTID_WITHOUT_CONSUMER) { auto ui = UsersInfoStorage->GetIfExists(user); if (ui && ui->LabeledCounters) { ScheduleDropPartitionLabeledCounters(ui->LabeledCounters->GetGroup()); diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp index dd18acf6e45a..e41c5173b1be 100644 --- a/ydb/core/persqueue/partition_init.cpp +++ b/ydb/core/persqueue/partition_init.cpp @@ -886,7 +886,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "name", false); WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "name", false); if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) { - subgroups.push_back({"name", "api.grpc.topic.stream_write.topic_throttled_milliseconds"}); + subgroups.push_back({"name", "topic.write.topic_throttled_milliseconds"}); TopicWriteQuotaWaitCounter = THolder( new NKikimr::NPQ::TPercentileCounter( NPersQueue::GetCountersForTopic(counters, IsServerless), {}, @@ -899,7 +899,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { subgroups.pop_back(); } - subgroups.push_back({"name", "api.grpc.topic.stream_write.partition_throttled_milliseconds"}); + subgroups.push_back({"name", "topic.write.partition_throttled_milliseconds"}); PartitionWriteQuotaWaitCounter = THolder( new NKikimr::NPQ::TPercentileCounter( NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, "bin", diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp index 00b6ac32f89f..e911b15f5c15 100644 --- a/ydb/core/persqueue/read_balancer.cpp +++ b/ydb/core/persqueue/read_balancer.cpp @@ -1,6 +1,7 @@ #include "read_balancer.h" #include +#include "ydb/core/persqueue/user_info.h" #include #include #include @@ -65,6 +66,7 @@ bool TPersQueueReadBalancer::TTxInit::Execute(TTransactionContext& txc, const TA for (const auto& rr : Self->TabletConfig.GetReadRules()) { Self->Consumers[rr]; } + Self->Consumers[CLIENTID_WITHOUT_CONSUMER]; } Self->Inited = true; if (!dataRowset.Next()) @@ -536,7 +538,8 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr Consumers[rr]; } } - + Consumers[CLIENTID_WITHOUT_CONSUMER]; + TVector> newPartitions; TVector deletedPartitions; TVector> newTablets; diff --git a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp index 01860f43c19c..037e921ccf7a 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp @@ -206,6 +206,7 @@ void TTopicOperationsScenario::StartConsumerThreads(std::vector(params.Driver); std::optional txSupport; - auto consumerName = TCommandWorkloadTopicDescribe::GenerateConsumerName(params.ConsumerPrefix, params.ConsumerIdx); auto describeTopicResult = TCommandWorkloadTopicDescribe::DescribeTopic(params.Database, params.TopicName, params.Driver); - auto consumers = describeTopicResult.GetConsumers(); + NYdb::NTopic::TReadSessionSettings settings; + + if (!params.ReadWithoutConsumer) { + auto consumerName = TCommandWorkloadTopicDescribe::GenerateConsumerName(params.ConsumerPrefix, params.ConsumerIdx); + auto consumers = describeTopicResult.GetConsumers(); - if (!std::any_of(consumers.begin(), consumers.end(), [consumerName](const auto& consumer) { return consumer.GetConsumerName() == consumerName; })) - { - WRITE_LOG(params.Log, ELogPriority::TLOG_EMERG, TStringBuilder() << "Topic '" << params.TopicName << "' doesn't have a consumer '" << consumerName << "'. Run command 'workload init' with parameter '--consumers'."); - exit(EXIT_FAILURE); + if (!std::any_of(consumers.begin(), consumers.end(), [consumerName](const auto& consumer) { return consumer.GetConsumerName() == consumerName; })) + { + WRITE_LOG(params.Log, ELogPriority::TLOG_EMERG, TStringBuilder() << "Topic '" << params.TopicName << "' doesn't have a consumer '" << consumerName << "'. Run command 'workload init' with parameter '--consumers'."); + exit(EXIT_FAILURE); + } + settings.ConsumerName(consumerName).AppendTopics(params.TopicName); + } else { + NYdb::NTopic::TTopicReadSettings topic = params.TopicName; + auto partitions = describeTopicResult.GetPartitions(); + for(auto partition: partitions) { + topic.AppendPartitionIds(partition.GetPartitionId()); + } + settings.WithoutConsumer().AppendTopics(topic); } + if (params.UseTransactions) { txSupport.emplace(params.Driver, params.ReadOnlyTableName, params.TableName); } - NYdb::NTopic::TReadSessionSettings settings; - settings.ConsumerName(consumerName).AppendTopics(params.TopicName); - auto readSession = topicClient->CreateReadSession(settings); WRITE_LOG(params.Log, ELogPriority::TLOG_INFO, "Reader session was created."); @@ -93,7 +103,7 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInsta << " createTime " << message.GetCreateTime() << " fullTimeMs " << fullTime); } - if (!txSupport || params.UseTopicCommit) { + if (!params.ReadWithoutConsumer && (!txSupport || params.UseTopicCommit)) { dataEvent->Commit(); } } else if (auto* createPartitionStreamEvent = std::get_if(&event)) { diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h index 09ced96f1786..39b091856dab 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h @@ -30,6 +30,7 @@ namespace NYdb { bool UseTopicCommit = false; bool UseTableSelect = true; bool UseTableUpsert = true; + bool ReadWithoutConsumer = false; size_t CommitPeriod = 15; size_t CommitMessages = 1'000'000; }; diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp index 555ca5eb488b..11ac51580b02 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp @@ -34,6 +34,9 @@ void TCommandWorkloadTopicRunRead::Config(TConfig& config) config.Opts->AddLongOption("topic", "Topic name.") .DefaultValue(TOPIC) .StoreResult(&Scenario.TopicName); + config.Opts->AddLongOption("no-consumer", "Read without consumer") + .Hidden() + .StoreTrue(&Scenario.ReadWithoutConsumer); // Specific params config.Opts->AddLongOption("consumer-prefix", "Use consumers with names '-0' ... '-' where n is set in the '--consumers' option.") From 223c39d7254e2e21c5caae7ec7992c01171424fc Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Mon, 12 Feb 2024 10:31:03 +0000 Subject: [PATCH 2/2] Fix ut --- ydb/core/persqueue/ut/resources/counters_datastreams.html | 2 +- ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/persqueue/ut/resources/counters_datastreams.html b/ydb/core/persqueue/ut/resources/counters_datastreams.html index 4293a0c49e47..5b88044afcad 100644 --- a/ydb/core/persqueue/ut/resources/counters_datastreams.html +++ b/ydb/core/persqueue/ut/resources/counters_datastreams.html @@ -25,7 +25,7 @@ bin=60000: 0 bin=999999: 0 - name=api.grpc.topic.stream_write.partition_throttled_milliseconds: + name=topic.write.partition_throttled_milliseconds: bin=0: 30 bin=1: 0 bin=10: 0 diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp index 5258dfd286b2..8e617380ae17 100644 --- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp @@ -463,7 +463,7 @@ namespace NKikimr::NPersQueueTests { "topic.write.bytes", "topic.write.messages", "api.grpc.topic.stream_write.bytes", - "api.grpc.topic.stream_write.partition_throttled_milliseconds", + "topic.write.partition_throttled_milliseconds", "topic.write.message_size_bytes", "api.grpc.topic.stream_write.messages", "topic.write.lag_milliseconds",