From dcce157282bd05632ed7a543c2378d634bcc9e4e Mon Sep 17 00:00:00 2001 From: niksaveliev Date: Thu, 29 Feb 2024 16:26:13 +0600 Subject: [PATCH] Workload read without consumer and metrics fixes (#1792) --- ydb/core/persqueue/partition.cpp | 20 +++++++++-- ydb/core/persqueue/partition_init.cpp | 4 +-- ydb/core/persqueue/ut/counters_ut.cpp | 4 +-- .../ut/resources/counters_datastreams.html | 34 +++++++++---------- .../ut/resources/counters_pqproxy.html | 28 +++++++-------- .../counters_pqproxy_firstclass.html | 8 ++--- .../ut/resources/counters_topics.html | 2 ++ ydb/core/protos/counters_pq.proto | 3 ++ .../commands/topic_operations_scenario.cpp | 1 + .../commands/topic_operations_scenario.h | 1 + .../topic_workload/topic_workload_reader.cpp | 30 ++++++++++------ .../topic_workload/topic_workload_reader.h | 1 + .../topic_workload_run_read.cpp | 3 ++ .../persqueue_new_schemecache_ut.cpp | 2 +- 14 files changed, 88 insertions(+), 53 deletions(-) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 2d069b876db0..a5a25bec724d 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -769,7 +769,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext auto& userInfo = userInfoPair.second; if (!userInfo.LabeledCounters) continue; - if (!userInfo.HasReadRule && !userInfo.Important) + if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important) continue; auto* cac = ac->AddConsumerAggregatedCounters(); cac->SetConsumer(userInfo.User); @@ -1124,7 +1124,7 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) { auto& userInfo = userInfoPair.second; if (!userInfo.LabeledCounters) continue; - if (!userInfo.HasReadRule && !userInfo.Important) + if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important) continue; bool haveChanges = false; userInfo.EndOffset = EndOffset; @@ -1228,6 +1228,12 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) { userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_USAGE].Set(quotaUsage); } } + + if (userInfoPair.first == CLIENTID_WITHOUT_CONSUMER ) { + PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_BYTES].Set(userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_BYTES].Get()); + PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_USAGE].Set(userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_USAGE].Get()); + } + if (haveChanges) { ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCounters(Partition, *userInfo.LabeledCounters)); } @@ -1339,6 +1345,14 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) { PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Set(quotaUsage); } } + + if (PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_BYTES].Get()) { + ui64 quotaUsage = ui64(AvgReadBytes.GetValue()) * 1000000 / PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_BYTES].Get() / 60; + if (quotaUsage != PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Get()) { + haveChanges = true; + PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Set(quotaUsage); + } + } return haveChanges; } @@ -1853,7 +1867,7 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorC if (LastOffsetHasBeenCommited(userInfo)) { SendReadingFinished(user); } - } else { + } else if (user != CLIENTID_WITHOUT_CONSUMER) { auto ui = UsersInfoStorage->GetIfExists(user); if (ui && ui->LabeledCounters) { ScheduleDropPartitionLabeledCounters(ui->LabeledCounters->GetGroup()); diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp index 76e431156940..407bdb7a92a5 100644 --- a/ydb/core/persqueue/partition_init.cpp +++ b/ydb/core/persqueue/partition_init.cpp @@ -883,7 +883,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "name", false); WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "name", false); if (IsQuotingEnabled()) { - subgroups.push_back({"name", "api.grpc.topic.stream_write.topic_throttled_milliseconds"}); + subgroups.push_back({"name", "topic.write.topic_throttled_milliseconds"}); TopicWriteQuotaWaitCounter = THolder( new NKikimr::NPQ::TPercentileCounter( NPersQueue::GetCountersForTopic(counters, IsServerless), {}, @@ -896,7 +896,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { subgroups.pop_back(); } - subgroups.push_back({"name", "api.grpc.topic.stream_write.partition_throttled_milliseconds"}); + subgroups.push_back({"name", "topic.write.partition_throttled_milliseconds"}); PartitionWriteQuotaWaitCounter = THolder( new NKikimr::NPQ::TPercentileCounter( NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, "bin", diff --git a/ydb/core/persqueue/ut/counters_ut.cpp b/ydb/core/persqueue/ut/counters_ut.cpp index d634bbd66842..74c5ed95981b 100644 --- a/ydb/core/persqueue/ut/counters_ut.cpp +++ b/ydb/core/persqueue/ut/counters_ut.cpp @@ -138,8 +138,8 @@ Y_UNIT_TEST(PartitionWriteQuota) { TStringStream histogramStr; histogram->OutputHtml(histogramStr); Cerr << "**** Total histogram: **** \n " << histogramStr.Str() << "**** **** **** ****" << Endl; - UNIT_ASSERT_VALUES_EQUAL(histogram->FindNamedCounter("Interval", "1000ms")->Val(), 3); - UNIT_ASSERT_VALUES_EQUAL(histogram->FindNamedCounter("Interval", "2500ms")->Val(), 1); + UNIT_ASSERT_VALUES_EQUAL(histogram->FindNamedCounter("Interval", "0ms")->Val(),2); + UNIT_ASSERT_VALUES_EQUAL(histogram->FindNamedCounter("Interval", "2500ms")->Val(), 5); } } diff --git a/ydb/core/persqueue/ut/resources/counters_datastreams.html b/ydb/core/persqueue/ut/resources/counters_datastreams.html index 4293a0c49e47..9de064cfaf99 100644 --- a/ydb/core/persqueue/ut/resources/counters_datastreams.html +++ b/ydb/core/persqueue/ut/resources/counters_datastreams.html @@ -25,30 +25,15 @@ bin=60000: 0 bin=999999: 0 - name=api.grpc.topic.stream_write.partition_throttled_milliseconds: - bin=0: 30 - bin=1: 0 - bin=10: 0 - bin=100: 0 - bin=1000: 0 - bin=10000: 0 - bin=20: 0 - bin=2500: 0 - bin=5: 0 - bin=50: 0 - bin=500: 0 - bin=5000: 0 - bin=999999: 0 - name=topic.write.lag_milliseconds: bin=100: 0 - bin=1000: 10 + bin=1000: 0 bin=10000: 0 bin=180000: 0 bin=200: 0 bin=2000: 0 bin=30000: 0 - bin=500: 20 + bin=500: 30 bin=5000: 0 bin=60000: 0 bin=999999: 0 @@ -68,4 +53,19 @@ bin=5242880: 0 bin=67108864: 0 bin=99999999: 0 + + name=topic.write.partition_throttled_milliseconds: + bin=0: 30 + bin=1: 0 + bin=10: 0 + bin=100: 0 + bin=1000: 0 + bin=10000: 0 + bin=20: 0 + bin=2500: 0 + bin=5: 0 + bin=50: 0 + bin=500: 0 + bin=5000: 0 + bin=999999: 0 diff --git a/ydb/core/persqueue/ut/resources/counters_pqproxy.html b/ydb/core/persqueue/ut/resources/counters_pqproxy.html index de2bdae32895..530620620daa 100644 --- a/ydb/core/persqueue/ut/resources/counters_pqproxy.html +++ b/ydb/core/persqueue/ut/resources/counters_pqproxy.html @@ -14,26 +14,26 @@ Account=asdfgs: Duration=10000ms: 0 Duration=1000ms: 0 - Duration=100ms: 0 + Duration=100ms: 3 Duration=1500ms: 0 Duration=2000ms: 0 Duration=200ms: 0 Duration=30000ms: 0 Duration=5000ms: 0 - Duration=500ms: 3 + Duration=500ms: 0 Duration=550ms: 0 Duration=99999999ms: 0 Account=total: Duration=10000ms: 0 Duration=1000ms: 0 - Duration=100ms: 0 + Duration=100ms: 3 Duration=1500ms: 0 Duration=2000ms: 0 Duration=200ms: 0 Duration=30000ms: 0 Duration=5000ms: 0 - Duration=500ms: 3 + Duration=500ms: 0 Duration=550ms: 0 Duration=99999999ms: 0 @@ -543,14 +543,14 @@ sensor=TimeLagsOriginal: Interval=10000ms: 0 - Interval=1000ms: 10 + Interval=1000ms: 0 Interval=100ms: 0 Interval=180000ms: 0 Interval=2000ms: 0 Interval=200ms: 0 Interval=30000ms: 0 Interval=5000ms: 0 - Interval=500ms: 20 + Interval=500ms: 30 Interval=60000ms: 0 Interval=999999ms: 0 @@ -558,14 +558,14 @@ sensor=TimeLagsOriginal: Interval=10000ms: 0 - Interval=1000ms: 10 + Interval=1000ms: 0 Interval=100ms: 0 Interval=180000ms: 0 Interval=2000ms: 0 Interval=200ms: 0 Interval=30000ms: 0 Interval=5000ms: 0 - Interval=500ms: 20 + Interval=500ms: 30 Interval=60000ms: 0 Interval=999999ms: 0 @@ -577,14 +577,14 @@ sensor=TimeLagsOriginal: Interval=10000ms: 0 - Interval=1000ms: 10 + Interval=1000ms: 0 Interval=100ms: 0 Interval=180000ms: 0 Interval=2000ms: 0 Interval=200ms: 0 Interval=30000ms: 0 Interval=5000ms: 0 - Interval=500ms: 20 + Interval=500ms: 30 Interval=60000ms: 0 Interval=999999ms: 0 @@ -598,14 +598,14 @@ sensor=TimeLagsOriginal: Interval=10000ms: 0 - Interval=1000ms: 10 + Interval=1000ms: 0 Interval=100ms: 0 Interval=180000ms: 0 Interval=2000ms: 0 Interval=200ms: 0 Interval=30000ms: 0 Interval=5000ms: 0 - Interval=500ms: 20 + Interval=500ms: 30 Interval=60000ms: 0 Interval=999999ms: 0 @@ -621,14 +621,14 @@ sensor=TimeLagsOriginal: Interval=10000ms: 0 - Interval=1000ms: 10 + Interval=1000ms: 0 Interval=100ms: 0 Interval=180000ms: 0 Interval=2000ms: 0 Interval=200ms: 0 Interval=30000ms: 0 Interval=5000ms: 0 - Interval=500ms: 20 + Interval=500ms: 30 Interval=60000ms: 0 Interval=999999ms: 0 diff --git a/ydb/core/persqueue/ut/resources/counters_pqproxy_firstclass.html b/ydb/core/persqueue/ut/resources/counters_pqproxy_firstclass.html index 9ad3c45a168b..6fbbfec01a67 100644 --- a/ydb/core/persqueue/ut/resources/counters_pqproxy_firstclass.html +++ b/ydb/core/persqueue/ut/resources/counters_pqproxy_firstclass.html @@ -14,26 +14,26 @@ Account=federationAccount: Duration=10000ms: 0 Duration=1000ms: 0 - Duration=100ms: 0 + Duration=100ms: 3 Duration=1500ms: 0 Duration=2000ms: 0 Duration=200ms: 0 Duration=30000ms: 0 Duration=5000ms: 0 - Duration=500ms: 3 + Duration=500ms: 0 Duration=550ms: 0 Duration=99999999ms: 0 Account=total: Duration=10000ms: 0 Duration=1000ms: 0 - Duration=100ms: 0 + Duration=100ms: 3 Duration=1500ms: 0 Duration=2000ms: 0 Duration=200ms: 0 Duration=30000ms: 0 Duration=5000ms: 0 - Duration=500ms: 3 + Duration=500ms: 0 Duration=550ms: 0 Duration=99999999ms: 0 diff --git a/ydb/core/persqueue/ut/resources/counters_topics.html b/ydb/core/persqueue/ut/resources/counters_topics.html index 4e5528739e00..50fd401c7e27 100644 --- a/ydb/core/persqueue/ut/resources/counters_topics.html +++ b/ydb/core/persqueue/ut/resources/counters_topics.html @@ -16,6 +16,8 @@ name=topic.partition.read.inflight_throttled_microseconds_max: 0 name=topic.partition.read.speed_limit_bytes_per_second: 20000000000 name=topic.partition.read.throttled_microseconds_max: 0 + name=topic.partition.read_without_consumer.speed_limit_bytes_per_second: 0 + name=topic.partition.read_without_consumer.throttled_microseconds_max: 0 name=topic.partition.storage_bytes_max: 0 name=topic.partition.total_count: 2 name=topic.partition.uptime_milliseconds_min: 30000 diff --git a/ydb/core/protos/counters_pq.proto b/ydb/core/protos/counters_pq.proto index c381e0f1eb97..8c975d9a4b7b 100644 --- a/ydb/core/protos/counters_pq.proto +++ b/ydb/core/protos/counters_pq.proto @@ -237,4 +237,7 @@ enum EPartitionLabeledCounters { METRIC_READ_QUOTA_PARTITION_TOTAL_BYTES = 38 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MIN SVName: "topic.partition.read.speed_limit_bytes_per_second"}]; METRIC_READ_INFLIGHT_LIMIT_THROTTLED = 39 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MAX SVName: "topic.partition.read.inflight_throttled_microseconds_max"}]; + + METRIC_READ_QUOTA_NO_CONSUMER_BYTES = 40 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MIN SVName: "topic.partition.read_without_consumer.speed_limit_bytes_per_second"}]; + METRIC_READ_QUOTA_NO_CONSUMER_USAGE = 41 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MAX SVName: "topic.partition.read_without_consumer.throttled_microseconds_max"}]; } diff --git a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp index 01860f43c19c..037e921ccf7a 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp @@ -206,6 +206,7 @@ void TTopicOperationsScenario::StartConsumerThreads(std::vector(params.Driver); std::optional txSupport; - auto consumerName = TCommandWorkloadTopicDescribe::GenerateConsumerName(params.ConsumerPrefix, params.ConsumerIdx); auto describeTopicResult = TCommandWorkloadTopicDescribe::DescribeTopic(params.Database, params.TopicName, params.Driver); - auto consumers = describeTopicResult.GetConsumers(); + NYdb::NTopic::TReadSessionSettings settings; + + if (!params.ReadWithoutConsumer) { + auto consumerName = TCommandWorkloadTopicDescribe::GenerateConsumerName(params.ConsumerPrefix, params.ConsumerIdx); + auto consumers = describeTopicResult.GetConsumers(); - if (!std::any_of(consumers.begin(), consumers.end(), [consumerName](const auto& consumer) { return consumer.GetConsumerName() == consumerName; })) - { - WRITE_LOG(params.Log, ELogPriority::TLOG_EMERG, TStringBuilder() << "Topic '" << params.TopicName << "' doesn't have a consumer '" << consumerName << "'. Run command 'workload init' with parameter '--consumers'."); - exit(EXIT_FAILURE); + if (!std::any_of(consumers.begin(), consumers.end(), [consumerName](const auto& consumer) { return consumer.GetConsumerName() == consumerName; })) + { + WRITE_LOG(params.Log, ELogPriority::TLOG_EMERG, TStringBuilder() << "Topic '" << params.TopicName << "' doesn't have a consumer '" << consumerName << "'. Run command 'workload init' with parameter '--consumers'."); + exit(EXIT_FAILURE); + } + settings.ConsumerName(consumerName).AppendTopics(params.TopicName); + } else { + NYdb::NTopic::TTopicReadSettings topic = params.TopicName; + auto partitions = describeTopicResult.GetPartitions(); + for(auto partition: partitions) { + topic.AppendPartitionIds(partition.GetPartitionId()); + } + settings.WithoutConsumer().AppendTopics(topic); } + if (params.UseTransactions) { txSupport.emplace(params.Driver, params.ReadOnlyTableName, params.TableName); } - NYdb::NTopic::TReadSessionSettings settings; - settings.ConsumerName(consumerName).AppendTopics(params.TopicName); - auto readSession = topicClient->CreateReadSession(settings); WRITE_LOG(params.Log, ELogPriority::TLOG_INFO, "Reader session was created."); @@ -93,7 +103,7 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInsta << " createTime " << message.GetCreateTime() << " fullTimeMs " << fullTime); } - if (!txSupport || params.UseTopicCommit) { + if (!params.ReadWithoutConsumer && (!txSupport || params.UseTopicCommit)) { dataEvent->Commit(); } } else if (auto* createPartitionStreamEvent = std::get_if(&event)) { diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h index 09ced96f1786..39b091856dab 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h @@ -30,6 +30,7 @@ namespace NYdb { bool UseTopicCommit = false; bool UseTableSelect = true; bool UseTableUpsert = true; + bool ReadWithoutConsumer = false; size_t CommitPeriod = 15; size_t CommitMessages = 1'000'000; }; diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp index 555ca5eb488b..11ac51580b02 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp @@ -34,6 +34,9 @@ void TCommandWorkloadTopicRunRead::Config(TConfig& config) config.Opts->AddLongOption("topic", "Topic name.") .DefaultValue(TOPIC) .StoreResult(&Scenario.TopicName); + config.Opts->AddLongOption("no-consumer", "Read without consumer") + .Hidden() + .StoreTrue(&Scenario.ReadWithoutConsumer); // Specific params config.Opts->AddLongOption("consumer-prefix", "Use consumers with names '-0' ... '-' where n is set in the '--consumers' option.") diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp index 5258dfd286b2..8e617380ae17 100644 --- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp @@ -463,7 +463,7 @@ namespace NKikimr::NPersQueueTests { "topic.write.bytes", "topic.write.messages", "api.grpc.topic.stream_write.bytes", - "api.grpc.topic.stream_write.partition_throttled_milliseconds", + "topic.write.partition_throttled_milliseconds", "topic.write.message_size_bytes", "api.grpc.topic.stream_write.messages", "topic.write.lag_milliseconds",