diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 1162ff585eb7..8dffb13eb4ba 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -769,7 +769,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext auto& userInfo = userInfoPair.second; if (!userInfo.LabeledCounters) continue; - if (!userInfo.HasReadRule && !userInfo.Important) + if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important) continue; auto* cac = ac->AddConsumerAggregatedCounters(); cac->SetConsumer(userInfo.User); @@ -1124,7 +1124,7 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) { auto& userInfo = userInfoPair.second; if (!userInfo.LabeledCounters) continue; - if (!userInfo.HasReadRule && !userInfo.Important) + if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important) continue; bool haveChanges = false; userInfo.EndOffset = EndOffset; @@ -1228,6 +1228,12 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) { userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_USAGE].Set(quotaUsage); } } + + if (userInfoPair.first == CLIENTID_WITHOUT_CONSUMER ) { + PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_BYTES].Set(userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_BYTES].Get()); + PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_USAGE].Set(userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_USAGE].Get()); + } + if (haveChanges) { ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCounters(Partition, *userInfo.LabeledCounters)); } @@ -1339,6 +1345,14 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) { PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Set(quotaUsage); } } + + if (PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_BYTES].Get()) { + ui64 quotaUsage = ui64(AvgReadBytes.GetValue()) * 1000000 / PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_BYTES].Get() / 60; + if (quotaUsage != PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Get()) { + haveChanges = true; + PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Set(quotaUsage); + } + } return haveChanges; } @@ -1849,7 +1863,7 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorC if (LastOffsetHasBeenCommited(userInfo)) { SendReadingFinished(user); } - } else { + } else if (user != CLIENTID_WITHOUT_CONSUMER) { auto ui = UsersInfoStorage->GetIfExists(user); if (ui && ui->LabeledCounters) { ScheduleDropPartitionLabeledCounters(ui->LabeledCounters->GetGroup()); diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp index 76e431156940..407bdb7a92a5 100644 --- a/ydb/core/persqueue/partition_init.cpp +++ b/ydb/core/persqueue/partition_init.cpp @@ -883,7 +883,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "name", false); WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "name", false); if (IsQuotingEnabled()) { - subgroups.push_back({"name", "api.grpc.topic.stream_write.topic_throttled_milliseconds"}); + subgroups.push_back({"name", "topic.write.topic_throttled_milliseconds"}); TopicWriteQuotaWaitCounter = THolder( new NKikimr::NPQ::TPercentileCounter( NPersQueue::GetCountersForTopic(counters, IsServerless), {}, @@ -896,7 +896,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { subgroups.pop_back(); } - subgroups.push_back({"name", "api.grpc.topic.stream_write.partition_throttled_milliseconds"}); + subgroups.push_back({"name", "topic.write.partition_throttled_milliseconds"}); PartitionWriteQuotaWaitCounter = THolder( new NKikimr::NPQ::TPercentileCounter( NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, "bin", diff --git a/ydb/core/persqueue/ut/resources/counters_datastreams.html b/ydb/core/persqueue/ut/resources/counters_datastreams.html index 4293a0c49e47..6f7e35c67c88 100644 --- a/ydb/core/persqueue/ut/resources/counters_datastreams.html +++ b/ydb/core/persqueue/ut/resources/counters_datastreams.html @@ -25,21 +25,6 @@ bin=60000: 0 bin=999999: 0 - name=api.grpc.topic.stream_write.partition_throttled_milliseconds: - bin=0: 30 - bin=1: 0 - bin=10: 0 - bin=100: 0 - bin=1000: 0 - bin=10000: 0 - bin=20: 0 - bin=2500: 0 - bin=5: 0 - bin=50: 0 - bin=500: 0 - bin=5000: 0 - bin=999999: 0 - name=topic.write.lag_milliseconds: bin=100: 0 bin=1000: 10 @@ -68,4 +53,19 @@ bin=5242880: 0 bin=67108864: 0 bin=99999999: 0 + + name=topic.write.partition_throttled_milliseconds: + bin=0: 30 + bin=1: 0 + bin=10: 0 + bin=100: 0 + bin=1000: 0 + bin=10000: 0 + bin=20: 0 + bin=2500: 0 + bin=5: 0 + bin=50: 0 + bin=500: 0 + bin=5000: 0 + bin=999999: 0 diff --git a/ydb/core/persqueue/ut/resources/counters_topics.html b/ydb/core/persqueue/ut/resources/counters_topics.html index 4e5528739e00..50fd401c7e27 100644 --- a/ydb/core/persqueue/ut/resources/counters_topics.html +++ b/ydb/core/persqueue/ut/resources/counters_topics.html @@ -16,6 +16,8 @@ name=topic.partition.read.inflight_throttled_microseconds_max: 0 name=topic.partition.read.speed_limit_bytes_per_second: 20000000000 name=topic.partition.read.throttled_microseconds_max: 0 + name=topic.partition.read_without_consumer.speed_limit_bytes_per_second: 0 + name=topic.partition.read_without_consumer.throttled_microseconds_max: 0 name=topic.partition.storage_bytes_max: 0 name=topic.partition.total_count: 2 name=topic.partition.uptime_milliseconds_min: 30000 diff --git a/ydb/core/protos/counters_pq.proto b/ydb/core/protos/counters_pq.proto index c381e0f1eb97..8c975d9a4b7b 100644 --- a/ydb/core/protos/counters_pq.proto +++ b/ydb/core/protos/counters_pq.proto @@ -237,4 +237,7 @@ enum EPartitionLabeledCounters { METRIC_READ_QUOTA_PARTITION_TOTAL_BYTES = 38 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MIN SVName: "topic.partition.read.speed_limit_bytes_per_second"}]; METRIC_READ_INFLIGHT_LIMIT_THROTTLED = 39 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MAX SVName: "topic.partition.read.inflight_throttled_microseconds_max"}]; + + METRIC_READ_QUOTA_NO_CONSUMER_BYTES = 40 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MIN SVName: "topic.partition.read_without_consumer.speed_limit_bytes_per_second"}]; + METRIC_READ_QUOTA_NO_CONSUMER_USAGE = 41 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MAX SVName: "topic.partition.read_without_consumer.throttled_microseconds_max"}]; } diff --git a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp index 01860f43c19c..037e921ccf7a 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_operations_scenario.cpp @@ -206,6 +206,7 @@ void TTopicOperationsScenario::StartConsumerThreads(std::vector(params.Driver); std::optional txSupport; - auto consumerName = TCommandWorkloadTopicDescribe::GenerateConsumerName(params.ConsumerPrefix, params.ConsumerIdx); auto describeTopicResult = TCommandWorkloadTopicDescribe::DescribeTopic(params.Database, params.TopicName, params.Driver); - auto consumers = describeTopicResult.GetConsumers(); + NYdb::NTopic::TReadSessionSettings settings; + + if (!params.ReadWithoutConsumer) { + auto consumerName = TCommandWorkloadTopicDescribe::GenerateConsumerName(params.ConsumerPrefix, params.ConsumerIdx); + auto consumers = describeTopicResult.GetConsumers(); - if (!std::any_of(consumers.begin(), consumers.end(), [consumerName](const auto& consumer) { return consumer.GetConsumerName() == consumerName; })) - { - WRITE_LOG(params.Log, ELogPriority::TLOG_EMERG, TStringBuilder() << "Topic '" << params.TopicName << "' doesn't have a consumer '" << consumerName << "'. Run command 'workload init' with parameter '--consumers'."); - exit(EXIT_FAILURE); + if (!std::any_of(consumers.begin(), consumers.end(), [consumerName](const auto& consumer) { return consumer.GetConsumerName() == consumerName; })) + { + WRITE_LOG(params.Log, ELogPriority::TLOG_EMERG, TStringBuilder() << "Topic '" << params.TopicName << "' doesn't have a consumer '" << consumerName << "'. Run command 'workload init' with parameter '--consumers'."); + exit(EXIT_FAILURE); + } + settings.ConsumerName(consumerName).AppendTopics(params.TopicName); + } else { + NYdb::NTopic::TTopicReadSettings topic = params.TopicName; + auto partitions = describeTopicResult.GetPartitions(); + for(auto partition: partitions) { + topic.AppendPartitionIds(partition.GetPartitionId()); + } + settings.WithoutConsumer().AppendTopics(topic); } + if (params.UseTransactions) { txSupport.emplace(params.Driver, params.ReadOnlyTableName, params.TableName); } - NYdb::NTopic::TReadSessionSettings settings; - settings.ConsumerName(consumerName).AppendTopics(params.TopicName); - auto readSession = topicClient->CreateReadSession(settings); WRITE_LOG(params.Log, ELogPriority::TLOG_INFO, "Reader session was created."); @@ -93,7 +103,7 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInsta << " createTime " << message.GetCreateTime() << " fullTimeMs " << fullTime); } - if (!txSupport || params.UseTopicCommit) { + if (!params.ReadWithoutConsumer && (!txSupport || params.UseTopicCommit)) { dataEvent->Commit(); } } else if (auto* createPartitionStreamEvent = std::get_if(&event)) { diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h index 09ced96f1786..39b091856dab 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_reader.h @@ -30,6 +30,7 @@ namespace NYdb { bool UseTopicCommit = false; bool UseTableSelect = true; bool UseTableUpsert = true; + bool ReadWithoutConsumer = false; size_t CommitPeriod = 15; size_t CommitMessages = 1'000'000; }; diff --git a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp index 555ca5eb488b..11ac51580b02 100644 --- a/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp +++ b/ydb/public/lib/ydb_cli/commands/topic_workload/topic_workload_run_read.cpp @@ -34,6 +34,9 @@ void TCommandWorkloadTopicRunRead::Config(TConfig& config) config.Opts->AddLongOption("topic", "Topic name.") .DefaultValue(TOPIC) .StoreResult(&Scenario.TopicName); + config.Opts->AddLongOption("no-consumer", "Read without consumer") + .Hidden() + .StoreTrue(&Scenario.ReadWithoutConsumer); // Specific params config.Opts->AddLongOption("consumer-prefix", "Use consumers with names '-0' ... '-' where n is set in the '--consumers' option.") diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp index 5258dfd286b2..8e617380ae17 100644 --- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp @@ -463,7 +463,7 @@ namespace NKikimr::NPersQueueTests { "topic.write.bytes", "topic.write.messages", "api.grpc.topic.stream_write.bytes", - "api.grpc.topic.stream_write.partition_throttled_milliseconds", + "topic.write.partition_throttled_milliseconds", "topic.write.message_size_bytes", "api.grpc.topic.stream_write.messages", "topic.write.lag_milliseconds",