Skip to content

Commit

Permalink
Merge 223c39d into 00dcc23
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev authored Feb 12, 2024
2 parents 00dcc23 + 223c39d commit ecf65aa
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 18 deletions.
6 changes: 3 additions & 3 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
auto& userInfo = userInfoPair.second;
if (!userInfo.LabeledCounters)
continue;
if (!userInfo.HasReadRule && !userInfo.Important)
if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important)
continue;
auto* cac = ac->AddConsumerAggregatedCounters();
cac->SetConsumer(userInfo.User);
Expand Down Expand Up @@ -1083,7 +1083,7 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
auto& userInfo = userInfoPair.second;
if (!userInfo.LabeledCounters)
continue;
if (!userInfo.HasReadRule && !userInfo.Important)
if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important)
continue;
bool haveChanges = false;
userInfo.EndOffset = EndOffset;
Expand Down Expand Up @@ -1805,7 +1805,7 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorC
} else {
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_HIT].Increment(1);
}
} else {
} else if (user != CLIENTID_WITHOUT_CONSUMER) {
auto ui = UsersInfoStorage->GetIfExists(user);
if (ui && ui->LabeledCounters) {
ScheduleDropPartitionLabeledCounters(ui->LabeledCounters->GetGroup());
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/partition_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "name", false);
WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "name", false);
if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) {
subgroups.push_back({"name", "api.grpc.topic.stream_write.topic_throttled_milliseconds"});
subgroups.push_back({"name", "topic.write.topic_throttled_milliseconds"});
TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
new NKikimr::NPQ::TPercentileCounter(
NPersQueue::GetCountersForTopic(counters, IsServerless), {},
Expand All @@ -899,7 +899,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
subgroups.pop_back();
}

subgroups.push_back({"name", "api.grpc.topic.stream_write.partition_throttled_milliseconds"});
subgroups.push_back({"name", "topic.write.partition_throttled_milliseconds"});
PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
new NKikimr::NPQ::TPercentileCounter(
NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, "bin",
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/persqueue/read_balancer.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "read_balancer.h"

#include <ydb/core/persqueue/events/internal.h>
#include "ydb/core/persqueue/user_info.h"
#include <ydb/core/protos/counters_pq.pb.h>
#include <ydb/core/base/feature_flags.h>
#include <ydb/core/tablet/tablet_exception.h>
Expand Down Expand Up @@ -65,6 +66,7 @@ bool TPersQueueReadBalancer::TTxInit::Execute(TTransactionContext& txc, const TA
for (const auto& rr : Self->TabletConfig.GetReadRules()) {
Self->Consumers[rr];
}
Self->Consumers[CLIENTID_WITHOUT_CONSUMER];
}
Self->Inited = true;
if (!dataRowset.Next())
Expand Down Expand Up @@ -536,7 +538,8 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
Consumers[rr];
}
}

Consumers[CLIENTID_WITHOUT_CONSUMER];

TVector<std::pair<ui32, TPartInfo>> newPartitions;
TVector<ui32> deletedPartitions;
TVector<std::pair<ui64, TTabletInfo>> newTablets;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/ut/resources/counters_datastreams.html
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
bin=60000: 0
bin=999999: 0

name=api.grpc.topic.stream_write.partition_throttled_milliseconds:
name=topic.write.partition_throttled_milliseconds:
bin=0: 30
bin=1: 0
bin=10: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ void TTopicOperationsScenario::StartConsumerThreads(std::vector<std::future<void
.UseTopicCommit = OnlyTableInTx,
.UseTableSelect = UseTableSelect && !OnlyTopicInTx,
.UseTableUpsert = !OnlyTopicInTx,
.ReadWithoutConsumer = ReadWithoutConsumer,
.CommitPeriod = CommitPeriod,
.CommitMessages = CommitMessages
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class TTopicOperationsScenario {
bool OnlyTopicInTx = false;
bool OnlyTableInTx = false;
bool UseTableSelect = true;
bool ReadWithoutConsumer = false;

protected:
void CreateTopic(const TString& database,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,33 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInsta
auto topicClient = std::make_unique<NYdb::NTopic::TTopicClient>(params.Driver);
std::optional<TTransactionSupport> txSupport;

auto consumerName = TCommandWorkloadTopicDescribe::GenerateConsumerName(params.ConsumerPrefix, params.ConsumerIdx);
auto describeTopicResult = TCommandWorkloadTopicDescribe::DescribeTopic(params.Database, params.TopicName, params.Driver);
auto consumers = describeTopicResult.GetConsumers();
NYdb::NTopic::TReadSessionSettings settings;

if (!params.ReadWithoutConsumer) {
auto consumerName = TCommandWorkloadTopicDescribe::GenerateConsumerName(params.ConsumerPrefix, params.ConsumerIdx);
auto consumers = describeTopicResult.GetConsumers();

if (!std::any_of(consumers.begin(), consumers.end(), [consumerName](const auto& consumer) { return consumer.GetConsumerName() == consumerName; }))
{
WRITE_LOG(params.Log, ELogPriority::TLOG_EMERG, TStringBuilder() << "Topic '" << params.TopicName << "' doesn't have a consumer '" << consumerName << "'. Run command 'workload init' with parameter '--consumers'.");
exit(EXIT_FAILURE);
if (!std::any_of(consumers.begin(), consumers.end(), [consumerName](const auto& consumer) { return consumer.GetConsumerName() == consumerName; }))
{
WRITE_LOG(params.Log, ELogPriority::TLOG_EMERG, TStringBuilder() << "Topic '" << params.TopicName << "' doesn't have a consumer '" << consumerName << "'. Run command 'workload init' with parameter '--consumers'.");
exit(EXIT_FAILURE);
}
settings.ConsumerName(consumerName).AppendTopics(params.TopicName);
} else {
NYdb::NTopic::TTopicReadSettings topic = params.TopicName;
auto partitions = describeTopicResult.GetPartitions();
for(auto partition: partitions) {
topic.AppendPartitionIds(partition.GetPartitionId());
}
settings.WithoutConsumer().AppendTopics(topic);
}


if (params.UseTransactions) {
txSupport.emplace(params.Driver, params.ReadOnlyTableName, params.TableName);
}

NYdb::NTopic::TReadSessionSettings settings;
settings.ConsumerName(consumerName).AppendTopics(params.TopicName);

auto readSession = topicClient->CreateReadSession(settings);
WRITE_LOG(params.Log, ELogPriority::TLOG_INFO, "Reader session was created.");

Expand Down Expand Up @@ -93,7 +103,7 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInsta
<< " createTime " << message.GetCreateTime() << " fullTimeMs " << fullTime);
}

if (!txSupport || params.UseTopicCommit) {
if (!params.ReadWithoutConsumer && (!txSupport || params.UseTopicCommit)) {
dataEvent->Commit();
}
} else if (auto* createPartitionStreamEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ namespace NYdb {
bool UseTopicCommit = false;
bool UseTableSelect = true;
bool UseTableUpsert = true;
bool ReadWithoutConsumer = false;
size_t CommitPeriod = 15;
size_t CommitMessages = 1'000'000;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ void TCommandWorkloadTopicRunRead::Config(TConfig& config)
config.Opts->AddLongOption("topic", "Topic name.")
.DefaultValue(TOPIC)
.StoreResult(&Scenario.TopicName);
config.Opts->AddLongOption("no-consumer", "Read without consumer")
.Hidden()
.StoreTrue(&Scenario.ReadWithoutConsumer);

// Specific params
config.Opts->AddLongOption("consumer-prefix", "Use consumers with names '<consumer-prefix>-0' ... '<consumer-prefix>-<n-1>' where n is set in the '--consumers' option.")
Expand Down
2 changes: 1 addition & 1 deletion ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ namespace NKikimr::NPersQueueTests {
"topic.write.bytes",
"topic.write.messages",
"api.grpc.topic.stream_write.bytes",
"api.grpc.topic.stream_write.partition_throttled_milliseconds",
"topic.write.partition_throttled_milliseconds",
"topic.write.message_size_bytes",
"api.grpc.topic.stream_write.messages",
"topic.write.lag_milliseconds",
Expand Down

0 comments on commit ecf65aa

Please sign in to comment.