Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workload read without consumer and metrics fixes #1792

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
@@ -769,7 +769,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
auto& userInfo = userInfoPair.second;
if (!userInfo.LabeledCounters)
continue;
if (!userInfo.HasReadRule && !userInfo.Important)
if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important)
continue;
auto* cac = ac->AddConsumerAggregatedCounters();
cac->SetConsumer(userInfo.User);
@@ -1124,7 +1124,7 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
auto& userInfo = userInfoPair.second;
if (!userInfo.LabeledCounters)
continue;
if (!userInfo.HasReadRule && !userInfo.Important)
if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important)
continue;
bool haveChanges = false;
userInfo.EndOffset = EndOffset;
@@ -1228,6 +1228,12 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_USAGE].Set(quotaUsage);
}
}

if (userInfoPair.first == CLIENTID_WITHOUT_CONSUMER ) {
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_BYTES].Set(userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_BYTES].Get());
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_USAGE].Set(userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_USAGE].Get());
}

if (haveChanges) {
ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCounters(Partition, *userInfo.LabeledCounters));
}
@@ -1339,6 +1345,14 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Set(quotaUsage);
}
}

if (PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_BYTES].Get()) {
ui64 quotaUsage = ui64(AvgReadBytes.GetValue()) * 1000000 / PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_BYTES].Get() / 60;
if (quotaUsage != PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Get()) {
haveChanges = true;
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Set(quotaUsage);
}
}
return haveChanges;
}

@@ -1849,7 +1863,7 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorC
if (LastOffsetHasBeenCommited(userInfo)) {
SendReadingFinished(user);
}
} else {
} else if (user != CLIENTID_WITHOUT_CONSUMER) {
auto ui = UsersInfoStorage->GetIfExists(user);
if (ui && ui->LabeledCounters) {
ScheduleDropPartitionLabeledCounters(ui->LabeledCounters->GetGroup());
4 changes: 2 additions & 2 deletions ydb/core/persqueue/partition_init.cpp
Original file line number Diff line number Diff line change
@@ -883,7 +883,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "name", false);
WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "name", false);
if (IsQuotingEnabled()) {
subgroups.push_back({"name", "api.grpc.topic.stream_write.topic_throttled_milliseconds"});
subgroups.push_back({"name", "topic.write.topic_throttled_milliseconds"});
TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
new NKikimr::NPQ::TPercentileCounter(
NPersQueue::GetCountersForTopic(counters, IsServerless), {},
@@ -896,7 +896,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
subgroups.pop_back();
}

subgroups.push_back({"name", "api.grpc.topic.stream_write.partition_throttled_milliseconds"});
subgroups.push_back({"name", "topic.write.partition_throttled_milliseconds"});
PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
new NKikimr::NPQ::TPercentileCounter(
NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, "bin",
4 changes: 2 additions & 2 deletions ydb/core/persqueue/ut/counters_ut.cpp
Original file line number Diff line number Diff line change
@@ -138,8 +138,8 @@ Y_UNIT_TEST(PartitionWriteQuota) {
TStringStream histogramStr;
histogram->OutputHtml(histogramStr);
Cerr << "**** Total histogram: **** \n " << histogramStr.Str() << "**** **** **** ****" << Endl;
UNIT_ASSERT_VALUES_EQUAL(histogram->FindNamedCounter("Interval", "1000ms")->Val(), 3);
UNIT_ASSERT_VALUES_EQUAL(histogram->FindNamedCounter("Interval", "2500ms")->Val(), 1);
UNIT_ASSERT_VALUES_EQUAL(histogram->FindNamedCounter("Interval", "0ms")->Val(),2);
UNIT_ASSERT_VALUES_EQUAL(histogram->FindNamedCounter("Interval", "2500ms")->Val(), 5);
}
}

34 changes: 17 additions & 17 deletions ydb/core/persqueue/ut/resources/counters_datastreams.html
Original file line number Diff line number Diff line change
@@ -25,30 +25,15 @@
bin=60000: 0
bin=999999: 0

name=api.grpc.topic.stream_write.partition_throttled_milliseconds:
bin=0: 30
bin=1: 0
bin=10: 0
bin=100: 0
bin=1000: 0
bin=10000: 0
bin=20: 0
bin=2500: 0
bin=5: 0
bin=50: 0
bin=500: 0
bin=5000: 0
bin=999999: 0

name=topic.write.lag_milliseconds:
bin=100: 0
bin=1000: 10
bin=1000: 0
bin=10000: 0
bin=180000: 0
bin=200: 0
bin=2000: 0
bin=30000: 0
bin=500: 20
bin=500: 30
bin=5000: 0
bin=60000: 0
bin=999999: 0
@@ -68,4 +53,19 @@
bin=5242880: 0
bin=67108864: 0
bin=99999999: 0

name=topic.write.partition_throttled_milliseconds:
bin=0: 30
bin=1: 0
bin=10: 0
bin=100: 0
bin=1000: 0
bin=10000: 0
bin=20: 0
bin=2500: 0
bin=5: 0
bin=50: 0
bin=500: 0
bin=5000: 0
bin=999999: 0
</pre>
28 changes: 14 additions & 14 deletions ydb/core/persqueue/ut/resources/counters_pqproxy.html
Original file line number Diff line number Diff line change
@@ -14,26 +14,26 @@
Account=asdfgs:
Duration=10000ms: 0
Duration=1000ms: 0
Duration=100ms: 0
Duration=100ms: 3
Duration=1500ms: 0
Duration=2000ms: 0
Duration=200ms: 0
Duration=30000ms: 0
Duration=5000ms: 0
Duration=500ms: 3
Duration=500ms: 0
Duration=550ms: 0
Duration=99999999ms: 0

Account=total:
Duration=10000ms: 0
Duration=1000ms: 0
Duration=100ms: 0
Duration=100ms: 3
Duration=1500ms: 0
Duration=2000ms: 0
Duration=200ms: 0
Duration=30000ms: 0
Duration=5000ms: 0
Duration=500ms: 3
Duration=500ms: 0
Duration=550ms: 0
Duration=99999999ms: 0

@@ -543,29 +543,29 @@

sensor=TimeLagsOriginal:
Interval=10000ms: 0
Interval=1000ms: 10
Interval=1000ms: 0
Interval=100ms: 0
Interval=180000ms: 0
Interval=2000ms: 0
Interval=200ms: 0
Interval=30000ms: 0
Interval=5000ms: 0
Interval=500ms: 20
Interval=500ms: 30
Interval=60000ms: 0
Interval=999999ms: 0

OriginDC=cluster:

sensor=TimeLagsOriginal:
Interval=10000ms: 0
Interval=1000ms: 10
Interval=1000ms: 0
Interval=100ms: 0
Interval=180000ms: 0
Interval=2000ms: 0
Interval=200ms: 0
Interval=30000ms: 0
Interval=5000ms: 0
Interval=500ms: 20
Interval=500ms: 30
Interval=60000ms: 0
Interval=999999ms: 0

@@ -577,14 +577,14 @@

sensor=TimeLagsOriginal:
Interval=10000ms: 0
Interval=1000ms: 10
Interval=1000ms: 0
Interval=100ms: 0
Interval=180000ms: 0
Interval=2000ms: 0
Interval=200ms: 0
Interval=30000ms: 0
Interval=5000ms: 0
Interval=500ms: 20
Interval=500ms: 30
Interval=60000ms: 0
Interval=999999ms: 0

@@ -598,14 +598,14 @@

sensor=TimeLagsOriginal:
Interval=10000ms: 0
Interval=1000ms: 10
Interval=1000ms: 0
Interval=100ms: 0
Interval=180000ms: 0
Interval=2000ms: 0
Interval=200ms: 0
Interval=30000ms: 0
Interval=5000ms: 0
Interval=500ms: 20
Interval=500ms: 30
Interval=60000ms: 0
Interval=999999ms: 0

@@ -621,14 +621,14 @@

sensor=TimeLagsOriginal:
Interval=10000ms: 0
Interval=1000ms: 10
Interval=1000ms: 0
Interval=100ms: 0
Interval=180000ms: 0
Interval=2000ms: 0
Interval=200ms: 0
Interval=30000ms: 0
Interval=5000ms: 0
Interval=500ms: 20
Interval=500ms: 30
Interval=60000ms: 0
Interval=999999ms: 0

Original file line number Diff line number Diff line change
@@ -14,26 +14,26 @@
Account=federationAccount:
Duration=10000ms: 0
Duration=1000ms: 0
Duration=100ms: 0
Duration=100ms: 3
Duration=1500ms: 0
Duration=2000ms: 0
Duration=200ms: 0
Duration=30000ms: 0
Duration=5000ms: 0
Duration=500ms: 3
Duration=500ms: 0
Duration=550ms: 0
Duration=99999999ms: 0

Account=total:
Duration=10000ms: 0
Duration=1000ms: 0
Duration=100ms: 0
Duration=100ms: 3
Duration=1500ms: 0
Duration=2000ms: 0
Duration=200ms: 0
Duration=30000ms: 0
Duration=5000ms: 0
Duration=500ms: 3
Duration=500ms: 0
Duration=550ms: 0
Duration=99999999ms: 0
</pre>
2 changes: 2 additions & 0 deletions ydb/core/persqueue/ut/resources/counters_topics.html
Original file line number Diff line number Diff line change
@@ -16,6 +16,8 @@
name=topic.partition.read.inflight_throttled_microseconds_max: 0
name=topic.partition.read.speed_limit_bytes_per_second: 20000000000
name=topic.partition.read.throttled_microseconds_max: 0
name=topic.partition.read_without_consumer.speed_limit_bytes_per_second: 0
name=topic.partition.read_without_consumer.throttled_microseconds_max: 0
name=topic.partition.storage_bytes_max: 0
name=topic.partition.total_count: 2
name=topic.partition.uptime_milliseconds_min: 30000
3 changes: 3 additions & 0 deletions ydb/core/protos/counters_pq.proto
Original file line number Diff line number Diff line change
@@ -237,4 +237,7 @@ enum EPartitionLabeledCounters {
METRIC_READ_QUOTA_PARTITION_TOTAL_BYTES = 38 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MIN SVName: "topic.partition.read.speed_limit_bytes_per_second"}];

METRIC_READ_INFLIGHT_LIMIT_THROTTLED = 39 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MAX SVName: "topic.partition.read.inflight_throttled_microseconds_max"}];

METRIC_READ_QUOTA_NO_CONSUMER_BYTES = 40 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MIN SVName: "topic.partition.read_without_consumer.speed_limit_bytes_per_second"}];
METRIC_READ_QUOTA_NO_CONSUMER_USAGE = 41 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MAX SVName: "topic.partition.read_without_consumer.throttled_microseconds_max"}];
}
Original file line number Diff line number Diff line change
@@ -206,6 +206,7 @@ void TTopicOperationsScenario::StartConsumerThreads(std::vector<std::future<void
.UseTopicCommit = OnlyTableInTx,
.UseTableSelect = UseTableSelect && !OnlyTopicInTx,
.UseTableUpsert = !OnlyTopicInTx,
.ReadWithoutConsumer = ReadWithoutConsumer,
.CommitPeriod = CommitPeriod,
.CommitMessages = CommitMessages
};
Original file line number Diff line number Diff line change
@@ -70,6 +70,7 @@ class TTopicOperationsScenario {
bool OnlyTopicInTx = false;
bool OnlyTableInTx = false;
bool UseTableSelect = true;
bool ReadWithoutConsumer = false;

protected:
void CreateTopic(const TString& database,
Original file line number Diff line number Diff line change
@@ -24,23 +24,33 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInsta
auto topicClient = std::make_unique<NYdb::NTopic::TTopicClient>(params.Driver);
std::optional<TTransactionSupport> txSupport;

auto consumerName = TCommandWorkloadTopicDescribe::GenerateConsumerName(params.ConsumerPrefix, params.ConsumerIdx);
auto describeTopicResult = TCommandWorkloadTopicDescribe::DescribeTopic(params.Database, params.TopicName, params.Driver);
auto consumers = describeTopicResult.GetConsumers();
NYdb::NTopic::TReadSessionSettings settings;

if (!params.ReadWithoutConsumer) {
auto consumerName = TCommandWorkloadTopicDescribe::GenerateConsumerName(params.ConsumerPrefix, params.ConsumerIdx);
auto consumers = describeTopicResult.GetConsumers();

if (!std::any_of(consumers.begin(), consumers.end(), [consumerName](const auto& consumer) { return consumer.GetConsumerName() == consumerName; }))
{
WRITE_LOG(params.Log, ELogPriority::TLOG_EMERG, TStringBuilder() << "Topic '" << params.TopicName << "' doesn't have a consumer '" << consumerName << "'. Run command 'workload init' with parameter '--consumers'.");
exit(EXIT_FAILURE);
if (!std::any_of(consumers.begin(), consumers.end(), [consumerName](const auto& consumer) { return consumer.GetConsumerName() == consumerName; }))
{
WRITE_LOG(params.Log, ELogPriority::TLOG_EMERG, TStringBuilder() << "Topic '" << params.TopicName << "' doesn't have a consumer '" << consumerName << "'. Run command 'workload init' with parameter '--consumers'.");
exit(EXIT_FAILURE);
}
settings.ConsumerName(consumerName).AppendTopics(params.TopicName);
} else {
NYdb::NTopic::TTopicReadSettings topic = params.TopicName;
auto partitions = describeTopicResult.GetPartitions();
for(auto partition: partitions) {
topic.AppendPartitionIds(partition.GetPartitionId());
}
settings.WithoutConsumer().AppendTopics(topic);
}


if (params.UseTransactions) {
txSupport.emplace(params.Driver, params.ReadOnlyTableName, params.TableName);
}

NYdb::NTopic::TReadSessionSettings settings;
settings.ConsumerName(consumerName).AppendTopics(params.TopicName);

auto readSession = topicClient->CreateReadSession(settings);
WRITE_LOG(params.Log, ELogPriority::TLOG_INFO, "Reader session was created.");

@@ -93,7 +103,7 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInsta
<< " createTime " << message.GetCreateTime() << " fullTimeMs " << fullTime);
}

if (!txSupport || params.UseTopicCommit) {
if (!params.ReadWithoutConsumer && (!txSupport || params.UseTopicCommit)) {
dataEvent->Commit();
}
} else if (auto* createPartitionStreamEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) {
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@ namespace NYdb {
bool UseTopicCommit = false;
bool UseTableSelect = true;
bool UseTableUpsert = true;
bool ReadWithoutConsumer = false;
size_t CommitPeriod = 15;
size_t CommitMessages = 1'000'000;
};
Original file line number Diff line number Diff line change
@@ -34,6 +34,9 @@ void TCommandWorkloadTopicRunRead::Config(TConfig& config)
config.Opts->AddLongOption("topic", "Topic name.")
.DefaultValue(TOPIC)
.StoreResult(&Scenario.TopicName);
config.Opts->AddLongOption("no-consumer", "Read without consumer")
.Hidden()
.StoreTrue(&Scenario.ReadWithoutConsumer);

// Specific params
config.Opts->AddLongOption("consumer-prefix", "Use consumers with names '<consumer-prefix>-0' ... '<consumer-prefix>-<n-1>' where n is set in the '--consumers' option.")
Original file line number Diff line number Diff line change
@@ -463,7 +463,7 @@ namespace NKikimr::NPersQueueTests {
"topic.write.bytes",
"topic.write.messages",
"api.grpc.topic.stream_write.bytes",
"api.grpc.topic.stream_write.partition_throttled_milliseconds",
"topic.write.partition_throttled_milliseconds",
"topic.write.message_size_bytes",
"api.grpc.topic.stream_write.messages",
"topic.write.lag_milliseconds",
Loading