From eebeb709048ef40d05a7a1afb701ae6aff6ad670 Mon Sep 17 00:00:00 2001 From: qyryq Date: Wed, 23 Oct 2024 18:01:02 +0300 Subject: [PATCH] persqueue: add user-agent counters (#10603) --- ydb/core/base/counters.cpp | 1 + .../ut/ut_utils/data_plane_helpers.cpp | 44 +++- .../ut/ut_utils/data_plane_helpers.h | 11 +- .../deprecated/persqueue_v0/grpc_pq_actor.h | 5 + .../persqueue_v0/grpc_pq_read_actor.cpp | 16 ++ .../persqueue_v0/grpc_pq_write_actor.cpp | 17 ++ ydb/services/persqueue_v1/actors/helpers.cpp | 28 ++ ydb/services/persqueue_v1/actors/helpers.h | 2 + .../actors/read_session_actor.cpp | 27 ++ .../persqueue_v1/actors/read_session_actor.h | 6 + .../actors/write_session_actor.cpp | 28 +- .../persqueue_v1/actors/write_session_actor.h | 9 +- .../persqueue_new_schemecache_ut.cpp | 240 +++++++++++++++++- ydb/services/persqueue_v1/persqueue_ut.cpp | 51 +++- 14 files changed, 475 insertions(+), 10 deletions(-) diff --git a/ydb/core/base/counters.cpp b/ydb/core/base/counters.cpp index ddb144838703..915810b80074 100644 --- a/ydb/core/base/counters.cpp +++ b/ydb/core/base/counters.cpp @@ -40,6 +40,7 @@ static const THashSet DATABASE_SERVICES TString("pqproxy|readSession"), TString("pqproxy|schemecache"), TString("pqproxy|mirrorWriteTimeLag"), + TString("pqproxy|userAgents"), TString("datastreams"), }}; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_public/ut/ut_utils/data_plane_helpers.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_public/ut/ut_utils/data_plane_helpers.cpp index 0ef9b8750f36..75fd7ae041b3 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_public/ut/ut_utils/data_plane_helpers.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_public/ut/ut_utils/data_plane_helpers.cpp @@ -1,4 +1,6 @@ #include "data_plane_helpers.h" +#include +#include namespace NKikimr::NPersQueueTests { @@ -51,7 +53,8 @@ namespace NKikimr::NPersQueueTests { std::optional partitionGroup, std::optional codec, std::optional reconnectOnFailure, - THashMap sessionMeta + THashMap 
sessionMeta, + const TString& userAgent ) { auto settings = TWriteSessionSettings().Path(topic).MessageGroupId(sourceId); if (partitionGroup) settings.PartitionGroupId(*partitionGroup); @@ -66,6 +69,9 @@ namespace NKikimr::NPersQueueTests { } settings.MaxMemoryUsage(1024*1024*1024*1024ll); settings.Meta_.Fields = sessionMeta; + if (!userAgent.empty()) { + settings.Header({{NYdb::YDB_APPLICATION_NAME, userAgent}}); + } return CreateSimpleWriter(driver, settings); } @@ -79,6 +85,21 @@ namespace NKikimr::NPersQueueTests { return TPersQueueClient(driver, clientSettings).CreateReadSession(TReadSessionSettings(settings).DisableClusterDiscovery(true)); } + std::shared_ptr CreateReader( + NYdb::TDriver& driver, + const NYdb::NTopic::TReadSessionSettings& settings, + std::shared_ptr creds, + const TString& userAgent + ) { + NYdb::NTopic::TTopicClientSettings clientSettings; + if (creds) clientSettings.CredentialsProviderFactory(creds); + auto readerSettings = settings; + if (!userAgent.empty()) { + readerSettings.Header({{NYdb::YDB_APPLICATION_NAME, userAgent}}); + } + return NYdb::NTopic::TTopicClient(driver, clientSettings).CreateReadSession(readerSettings); + } + TMaybe GetNextMessageSkipAssignment(std::shared_ptr& reader, TDuration timeout) { while (true) { auto future = reader->WaitEvent(); @@ -99,4 +120,25 @@ namespace NKikimr::NPersQueueTests { } return {}; } + + TMaybe GetNextMessageSkipAssignment(std::shared_ptr& reader, TDuration timeout) { + while (true) { + auto future = reader->WaitEvent(); + future.Wait(timeout); + + TMaybe event = reader->GetEvent(false, 1); + if (!event) + return {}; + if (auto e = std::get_if(&*event)) { + return *e; + } else if (auto* e = std::get_if(&*event)) { + e->Confirm(); + } else if (auto* e = std::get_if(&*event)) { + e->Confirm(); + } else if (std::get_if(&*event)) { + return {}; + } + } + return {}; + } } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_public/ut/ut_utils/data_plane_helpers.h 
b/ydb/public/sdk/cpp/client/ydb_persqueue_public/ut/ut_utils/data_plane_helpers.h index 90996a89f991..3b39a0d94946 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_public/ut/ut_utils/data_plane_helpers.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_public/ut/ut_utils/data_plane_helpers.h @@ -3,6 +3,7 @@ #include #include #include +#include namespace NKikimr::NPersQueueTests { @@ -34,16 +35,24 @@ namespace NKikimr::NPersQueueTests { std::optional partitionGroup = {}, std::optional codec = {}, std::optional reconnectOnFailure = {}, - THashMap sessionMeta = {} + THashMap sessionMeta = {}, + const TString& userAgent = {} ); std::shared_ptr CreateReader( NYdb::TDriver& driver, const NYdb::NPersQueue::TReadSessionSettings& settings, std::shared_ptr creds = nullptr + ); + std::shared_ptr CreateReader( + NYdb::TDriver& driver, + const NYdb::NTopic::TReadSessionSettings& settings, + std::shared_ptr creds = nullptr, + const TString& userAgent = "" ); TMaybe GetNextMessageSkipAssignment(std::shared_ptr& reader, TDuration timeout = TDuration::Max()); + TMaybe GetNextMessageSkipAssignment(std::shared_ptr& reader, TDuration timeout = TDuration::Max()); } diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h b/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h index 6fef53f8ad4e..150dd11b52b8 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h @@ -465,6 +465,7 @@ class TWriteSessionActor : public NActors::TActorBootstrapped ConverterFactory; NPersQueue::TDiscoveryConverterPtr DiscoveryConverter; @@ -699,6 +701,7 @@ class TReadSessionActor : public TActorBootstrapped { void SendAuthRequest(const TActorContext& ctx); void CreateInitAndAuthActor(const TActorContext& ctx); + void SetupBytesReadByUserAgentCounter(); void SetupCounters(); void SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic); void SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic, const TString& 
cloudId, const TString& dbId, @@ -746,6 +749,7 @@ class TReadSessionActor : public TActorBootstrapped { TString Session; TString PeerName; TString Database; + TString UserAgent; bool ClientsideLocksAllowed; bool BalanceRightNow; @@ -929,6 +933,7 @@ class TReadSessionActor : public TActorBootstrapped { NKikimr::NPQ::TPercentileCounter InitLatency; NKikimr::NPQ::TPercentileCounter CommitLatency; NKikimr::NPQ::TMultiCounter SLIBigLatency; + NYdb::NPersQueue::TCounterPtr BytesReadByUserAgent; NKikimr::NPQ::TPercentileCounter ReadLatency; NKikimr::NPQ::TPercentileCounter ReadLatencyFromDisk; diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp index 7c75c017b97a..a09abd643fcd 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp @@ -14,6 +14,8 @@ #include #include +#include + #include #include @@ -655,6 +657,7 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReadInit::TPtr& ev, const TActorCo Session = session; ProtocolVersion = init.GetProtocolVersion(); CommitsDisabled = init.GetCommitsDisabled(); + UserAgent = init.GetVersion(); if (ProtocolVersion >= NPersQueue::TReadRequest::ReadParamsInInit) { ReadSettingsInited = true; @@ -835,6 +838,14 @@ void TReadSessionActor::RegisterSessions(const TActorContext& ctx) { } } +void TReadSessionActor::SetupBytesReadByUserAgentCounter() { + BytesReadByUserAgent = GetServiceCounters(Counters, "pqproxy|userAgents") + ->GetSubgroup("host", "") + ->GetSubgroup("protocol", "pqv0") + ->GetSubgroup("consumer", ClientPath) + ->GetSubgroup("user_agent", V1::CleanupCounterValueString(UserAgent)) + ->GetExpiringNamedCounter("sensor", "BytesReadByUserAgent", true); +} void TReadSessionActor::SetupCounters() { @@ -864,6 +875,8 @@ void TReadSessionActor::SetupCounters() if (ProtocolVersion < NPersQueue::TReadRequest::Batching) { ++(*SessionsWithOldBatchingVersion); } + + 
SetupBytesReadByUserAgentCounter(); } @@ -1525,6 +1538,9 @@ bool TReadSessionActor::ProcessAnswer(const TActorContext& ctx, TFormedReadRespo Y_ABORT_UNLESS(formedResponse->RequestsInfly == 0); i64 diff = formedResponse->Response.ByteSize(); + + BytesReadByUserAgent->Add(diff); + const bool hasMessages = HasMessages(formedResponse->Response.GetBatchedData()); if (hasMessages) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " assign read id " << ReadIdToResponse << " to read request " << formedResponse->Guid); diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp index 9f24ba293679..6e040bb8efa0 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -256,6 +257,16 @@ void TWriteSessionActor::InitAfterDiscovery(const TActorContext& ctx) { } +void TWriteSessionActor::SetupBytesWrittenByUserAgentCounter() { + BytesWrittenByUserAgent = GetServiceCounters(Counters, "pqproxy|userAgents") + ->GetSubgroup("host", "") + ->GetSubgroup("protocol", "pqv0") + ->GetSubgroup("topic", FullConverter->GetFederationPath()) + ->GetSubgroup("user_agent", V1::CleanupCounterValueString(UserAgent)) + ->GetExpiringNamedCounter("sensor", "BytesWrittenByUserAgent", true); +} + + void TWriteSessionActor::SetupCounters() { if (SessionsCreated) { @@ -286,6 +297,8 @@ void TWriteSessionActor::SetupCounters() SessionsCreated.Inc(); SessionsActive.Inc(); + + SetupBytesWrittenByUserAgentCounter(); } @@ -307,6 +320,8 @@ void TWriteSessionActor::SetupCounters(const TString& cloudId, const TString& db SessionsCreated.Inc(); SessionsActive.Inc(); + + SetupBytesWrittenByUserAgentCounter(); } @@ -851,6 +866,8 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWrite::TPtr& ev, const TActorCont BytesInflight.Inc(diff); BytesInflightTotal.Inc(diff); + 
// Sanitizes an arbitrary client-supplied string (e.g. a user-agent header)
// so that it can be used as a monitoring counter label value.
//
// The internal monitoring system requires label values to be no longer than
// 200 characters and prohibits some ASCII characters. Prohibited characters
// (| * ? " ' ` \) are dropped, and the result is truncated to the first 200
// characters that remain after the drop.
//
// @param value  Raw label value; may be empty or arbitrarily long.
// @return       A copy of `value` without prohibited characters, at most
//               200 characters long.
TString CleanupCounterValueString(const TString& value) {
    constexpr size_t valueLengthLimit = 200;

    TString clean;
    for (const char c : value) {
        bool prohibited = false;
        switch (c) {
            case '|':
            case '*':
            case '?':
            case '"':
            case '\'':
            case '`':
            case '\\':
                prohibited = true;
                break;
            default:
                break;
        }
        if (prohibited) {
            continue;
        }

        clean.push_back(c);
        // This check must live outside the switch: a `break` inside a switch
        // only leaves the switch, so the loop would keep appending characters
        // and the length limit would never be enforced.
        if (clean.size() >= valueLengthLimit) {
            break;
        }
    }
    return clean;
}
Request->GetStreamCtx()->GetPeerMetaValues(NYdb::YDB_APPLICATION_NAME); !values.empty()) { + UserAgent = values[0]; + } + if (auto values = Request->GetStreamCtx()->GetPeerMetaValues(NYdb::YDB_SDK_BUILD_INFO_HEADER); !values.empty()) { + SdkBuildInfo = values[0]; + } } template @@ -884,6 +891,18 @@ void TReadSessionActor::Handle(typename TEvReadInit::TPtr& } } +template +void TReadSessionActor::SetupBytesReadByUserAgentCounter() { + static constexpr auto protocol = UseMigrationProtocol ? "pqv1" : "topic"; + BytesReadByUserAgent = GetServiceCounters(Counters, "pqproxy|userAgents") + ->GetSubgroup("host", "") + ->GetSubgroup("protocol", protocol) + ->GetSubgroup("consumer", ClientPath) + ->GetSubgroup("sdk_build_info", CleanupCounterValueString(SdkBuildInfo)) + ->GetSubgroup("user_agent", CleanupCounterValueString(UserAgent)) + ->GetExpiringNamedCounter("sensor", "BytesReadByUserAgent", true); +} + template void TReadSessionActor::SetupCounters() { if (SessionsCreated) { @@ -913,6 +932,8 @@ void TReadSessionActor::SetupCounters() { ++(*SessionsCreated); ++(*SessionsActive); PartsPerSession.IncFor(Partitions.size(), 1); // for 0 + + SetupBytesReadByUserAgentCounter(); } template @@ -937,6 +958,8 @@ void TReadSessionActor::SetupTopicCounters(const NPersQueu topicCounters.CommitLatency = CommitLatency; topicCounters.SLIBigLatency = SLIBigLatency; topicCounters.SLITotal = SLITotal; + + SetupBytesReadByUserAgentCounter(); } template @@ -960,6 +983,8 @@ void TReadSessionActor::SetupTopicCounters(const NPersQueu topicCounters.CommitLatency = CommitLatency; topicCounters.SLIBigLatency = SLIBigLatency; topicCounters.SLITotal = SLITotal; + + SetupBytesReadByUserAgentCounter(); } template @@ -1956,6 +1981,8 @@ void TReadSessionActor::ProcessAnswer(typename TFormedRead formedResponse->Response.mutable_read_response()->set_bytes_size(sizeEstimation); } + BytesReadByUserAgent->Add(sizeEstimation); + if (formedResponse->IsDirectRead) { auto it = 
Partitions.find(formedResponse->AssignId); if (it == Partitions.end()) { diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.h b/ydb/services/persqueue_v1/actors/read_session_actor.h index 76e195c11eeb..fd6d22d1a86e 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.h +++ b/ydb/services/persqueue_v1/actors/read_session_actor.h @@ -321,6 +321,7 @@ class TReadSessionActor void CloseSession(PersQueue::ErrorCode::ErrorCode code, const TString& reason, const TActorContext& ctx); void SendLockPartitionToSelf(ui32 partitionId, TString topicName, TTopicHolder topic, const TActorContext& ctx); + void SetupBytesReadByUserAgentCounter(); void SetupCounters(); void SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic); void SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic, @@ -343,6 +344,9 @@ class TReadSessionActor const TString ClientDC; const TInstant StartTimestamp; + TString SdkBuildInfo; + TString UserAgent = UseMigrationProtocol ? "pqv1 server" : "topic server"; + TActorId SchemeCache; TActorId NewSchemeCache; @@ -425,6 +429,8 @@ class TReadSessionActor ::NMonitoring::TDynamicCounters::TCounterPtr Errors; ::NMonitoring::TDynamicCounters::TCounterPtr PipeReconnects; ::NMonitoring::TDynamicCounters::TCounterPtr BytesInflight; + ::NMonitoring::TDynamicCounters::TCounterPtr BytesReadByUserAgent; + ui64 BytesInflight_; ui64 RequestedBytes; ui32 ReadsInfly; diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.cpp b/ydb/services/persqueue_v1/actors/write_session_actor.cpp index 5231ee7628e9..ba7a717d1b5b 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.cpp @@ -201,6 +201,13 @@ TWriteSessionActor::TWriteSessionActor( , LastSourceIdUpdate(TInstant::Zero()) { Y_ASSERT(Request); + + if (auto values = Request->GetStreamCtx()->GetPeerMetaValues(NYdb::YDB_APPLICATION_NAME); !values.empty()) { + UserAgent = values[0]; + } + if (auto values = 
Request->GetStreamCtx()->GetPeerMetaValues(NYdb::YDB_SDK_BUILD_INFO_HEADER); !values.empty()) { + SdkBuildInfo = values[0]; + } } template @@ -485,7 +492,18 @@ void TWriteSessionActor::InitAfterDiscovery(const TActorCo SLITotal = NKikimr::NPQ::TMultiCounter(subGroup, Aggr, {}, {"RequestsTotal"}, true, "sensor", false); SLIErrors = NKikimr::NPQ::TMultiCounter(subGroup, Aggr, {}, {"RequestsError"}, true, "sensor", false); SLITotal.Inc(); +} +template +void TWriteSessionActor::SetupBytesWrittenByUserAgentCounter(const TString& topicPath) { + static constexpr auto protocol = UseMigrationProtocol ? "pqv1" : "topic"; + BytesWrittenByUserAgent = GetServiceCounters(Counters, "pqproxy|userAgents") + ->GetSubgroup("host", "") + ->GetSubgroup("protocol", protocol) + ->GetSubgroup("topic", topicPath) + ->GetSubgroup("sdk_build_info", CleanupCounterValueString(SdkBuildInfo)) + ->GetSubgroup("user_agent", CleanupCounterValueString(UserAgent)) + ->GetExpiringNamedCounter("sensor", "BytesWrittenByUserAgent", true); } template @@ -517,10 +535,12 @@ void TWriteSessionActor::SetupCounters() } SessionsCreated.Inc(); SessionsActive.Inc(); + + SetupBytesWrittenByUserAgentCounter(FullConverter->GetFederationPath()); } template -void TWriteSessionActor::SetupCounters(const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId) +void TWriteSessionActor::SetupCounters(const TActorContext& ctx, const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId) { if (SessionsCreated) { return; @@ -536,6 +556,8 @@ void TWriteSessionActor::SetupCounters(const TString& clou SessionsCreated.Inc(); SessionsActive.Inc(); + + SetupBytesWrittenByUserAgentCounter(NPersQueue::GetFullTopicPath(ctx, dbPath, FullConverter->GetPrimaryPath())); } template @@ -590,7 +612,7 @@ void TWriteSessionActor::Handle(TEvDescribeTopicsResponse: if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { const 
auto& tabletConfig = Config.GetPQTabletConfig(); - SetupCounters(tabletConfig.GetYcCloudId(), tabletConfig.GetYdbDatabaseId(), + SetupCounters(ctx, tabletConfig.GetYcCloudId(), tabletConfig.GetYdbDatabaseId(), tabletConfig.GetYdbDatabasePath(), entry.DomainInfo->IsServerless(), tabletConfig.GetYcFolderId()); } else { @@ -1246,6 +1268,8 @@ void TWriteSessionActor::SendWriteRequest(typename TWriteR ctx.Send(PartitionWriterCache, std::move(event)); + BytesWrittenByUserAgent->Add(request->ByteSize); + SentRequests.push_back(std::move(request)); } diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h index 8654beea4bff..aef94f37da55 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.h +++ b/ydb/services/persqueue_v1/actors/write_session_actor.h @@ -61,8 +61,8 @@ class TWriteSessionActor // Codec ID size in bytes static constexpr ui32 CODEC_ID_SIZE = 1; - //TODO: get user agent from headers - static constexpr auto UserAgent = UseMigrationProtocol ? "pqv1 server" : "topic server"; + TString UserAgent = UseMigrationProtocol ? "pqv1 server" : "topic server"; + TString SdkBuildInfo; static constexpr auto ProtoName = UseMigrationProtocol ? 
"v1" : "topic"; public: @@ -162,8 +162,9 @@ class TWriteSessionActor void PrepareRequest(THolder&& ev, const TActorContext& ctx); void SendWriteRequest(typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx); + void SetupBytesWrittenByUserAgentCounter(const TString& topicPath); void SetupCounters(); - void SetupCounters(const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId); + void SetupCounters(const TActorContext& ctx, const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId); private: void CreatePartitionWriterCache(const TActorContext& ctx); @@ -235,6 +236,8 @@ class TWriteSessionActor NKikimr::NPQ::TMultiCounter Errors; std::vector CodecCounters; + NYdb::NPersQueue::TCounterPtr BytesWrittenByUserAgent; + TIntrusiveConstPtr Token; TString Auth; // Got 'update_token_request', authentication or authorization in progress, diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp index 9398c47c5a43..c6459123ee15 100644 --- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp @@ -1,4 +1,5 @@ #include "actors/read_session_actor.h" +#include "actors/helpers.h" #include #include #include @@ -405,6 +406,29 @@ namespace NKikimr::NPersQueueTests { UNIT_ASSERT(equal); }; + auto checkUserAgentCounters = + [cloudId, folderId, databaseId] + (auto monPort, const TString& sensor, const TString& protocol, const TString& userAgent, const TString& topic, const TString& consumer) { + auto counters = SendQuery(monPort, "/counters/counters=pqproxy/subsystem=userAgents/json"); + const auto sensors = counters["sensors"].GetArray(); + for (const auto& s : sensors) { + const auto& labels = s["labels"]; + if (labels["sensor"].GetString() != sensor) { + continue; + } + 
UNIT_ASSERT_VALUES_EQUAL(labels["host"].GetString(), ""); + UNIT_ASSERT_VALUES_EQUAL(labels["protocol"].GetString(), protocol); + if (!topic.empty()) { + UNIT_ASSERT_VALUES_EQUAL(labels["topic"].GetString(), topic); + } else if (!consumer.empty()) { + UNIT_ASSERT_VALUES_EQUAL(labels["consumer"].GetString(), consumer); + } else { + UNIT_FAIL("Neither topic nor consumer were provided"); + } + UNIT_ASSERT_VALUES_EQUAL(labels["user_agent"].GetString(), NGRpcProxy::V1::CleanupCounterValueString(userAgent)); + } + }; + { NYdb::NScheme::TSchemeClient schemeClient(*ydbDriver); NYdb::NScheme::TPermissions permissions("user@builtin", {"ydb.generic.read", "ydb.generic.write"}); @@ -415,13 +439,15 @@ namespace NKikimr::NPersQueueTests { UNIT_ASSERT(result.IsSuccess()); } + static constexpr auto userAgent = "test-client/v0.1 ' ?*'\"`| "; + { auto newDriverCfg = driverCfg; newDriverCfg.SetAuthToken("user@builtin"); ydbDriver = MakeHolder(newDriverCfg); - auto writer = CreateSimpleWriter(*ydbDriver, fullTopicName, "123", 1); + auto writer = CreateSimpleWriter(*ydbDriver, fullTopicName, "123", 1, {}, {}, {}, userAgent); for (int i = 0; i < 4; ++i) { bool res = writer->Write(TString(10, 'a')); UNIT_ASSERT(res); @@ -429,6 +455,7 @@ namespace NKikimr::NPersQueueTests { NYdb::NPersQueue::TReadSessionSettings settings; settings.ConsumerName(consumerName).AppendTopics(topicName); + settings.Header({{NYdb::YDB_APPLICATION_NAME, userAgent}}); auto reader = CreateReader(*ydbDriver, settings); auto msg = GetNextMessageSkipAssignment(reader); @@ -495,6 +522,217 @@ namespace NKikimr::NPersQueueTests { }, topicName, consumerName, "", "" ); + + checkUserAgentCounters(monPort, "BytesWrittenByUserAgent", "pqv1", userAgent, fullTopicName, ""); + checkUserAgentCounters(monPort, "BytesReadByUserAgent", "pqv1", userAgent, "", consumerName); + } + }; + + testWriteStat1stClass("user1"); + testWriteStat1stClass("some@random@consumer"); + } + + Y_UNIT_TEST(TestWriteStat1stClassTopicAPI) { + auto 
testWriteStat1stClass = [](const TString& consumerName) { + TTestServer server(false); + server.ServerSettings.PQConfig.SetTopicsAreFirstClassCitizen(true); + server.StartServer(); + server.EnableLogs({NKikimrServices::PQ_READ_PROXY, NKikimrServices::TX_PROXY_SCHEME_CACHE}); + + const TString topicName{"account2/topic2"}; + const TString fullTopicName{"/Root/account2/topic2"}; + const TString folderId{"somefolder"}; + const TString cloudId{"somecloud"}; + const TString databaseId{"root"}; + UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK, + server.AnnoyingClient->AlterUserAttributes("/", "Root", + {{"folder_id", folderId}, + {"cloud_id", cloudId}, + {"database_id", databaseId}})); + + server.AnnoyingClient->SetNoConfigMode(); + server.AnnoyingClient->FullInit(); + server.AnnoyingClient->InitUserRegistry(); + server.AnnoyingClient->MkDir("/Root", "account2"); + server.AnnoyingClient->CreateTopicNoLegacy(fullTopicName, 5); + + const auto monPort = TPortManager().GetPort(); + auto Counters = server.CleverServer->GetGRpcServerRootCounters(); + NActors::TSyncHttpMon Monitoring({ + .Port = monPort, + .Address = "localhost", + .Threads = 3, + .Title = "root", + .Host = "localhost", + }); + Monitoring.RegisterCountersPage("counters", "Counters", Counters); + Monitoring.Start(); + + auto driverCfg = NYdb::TDriverConfig() + .SetEndpoint(TStringBuilder() << "localhost:" << server.GrpcPort) + .SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)) + .SetDatabase("/Root"); + + auto ydbDriver = MakeHolder(driverCfg); + auto client = MakeHolder(*ydbDriver); + + { + auto res = client->AlterTopic(fullTopicName, + NYdb::NTopic::TAlterTopicSettings() + .BeginAddConsumer(consumerName) + .EndAddConsumer() + ); + res.Wait(); + UNIT_ASSERT(res.GetValue().IsSuccess()); + } + + auto checkCounters = + [cloudId, folderId, databaseId](auto monPort, + const std::set& canonicalSensorNames, + const TString& stream, const TString& consumer, + const TString& host, const TString& shard) { + 
auto counters = GetCounters1stClass(monPort, "datastreams", "%2FRoot", cloudId, + databaseId, folderId, stream, consumer, host, + shard); + const auto sensors = counters["sensors"].GetArray(); + std::set sensorNames; + std::transform(sensors.begin(), sensors.end(), + std::inserter(sensorNames, sensorNames.begin()), + [](auto& el) { + return el["labels"]["name"].GetString(); + }); + auto equal = sensorNames == canonicalSensorNames; + UNIT_ASSERT(equal); + }; + + auto checkUserAgentCounters = + [cloudId, folderId, databaseId] + (auto monPort, const TString& sensor, const TString& protocol, const TString& userAgent, const TString& topic, const TString& consumer) { + auto counters = SendQuery(monPort, "/counters/counters=pqproxy/subsystem=userAgents/json"); + const auto sensors = counters["sensors"].GetArray(); + for (const auto& s : sensors) { + const auto& labels = s["labels"]; + if (labels["sensor"].GetString() != sensor) { + continue; + } + UNIT_ASSERT_VALUES_EQUAL(labels["host"].GetString(), ""); + UNIT_ASSERT_VALUES_EQUAL(labels["protocol"].GetString(), protocol); + if (!topic.empty()) { + UNIT_ASSERT_VALUES_EQUAL(labels["topic"].GetString(), topic); + } else if (!consumer.empty()) { + UNIT_ASSERT_VALUES_EQUAL(labels["consumer"].GetString(), consumer); + } else { + UNIT_FAIL("Neither topic nor consumer were provided"); + } + UNIT_ASSERT_VALUES_EQUAL(labels["user_agent"].GetString(), NGRpcProxy::V1::CleanupCounterValueString(userAgent)); + } + }; + + { + NYdb::NScheme::TSchemeClient schemeClient(*ydbDriver); + NYdb::NScheme::TPermissions permissions("user@builtin", {"ydb.generic.read", "ydb.generic.write"}); + + auto result = schemeClient.ModifyPermissions("/Root", + NYdb::NScheme::TModifyPermissionsSettings().AddGrantPermissions(permissions)).ExtractValueSync(); + Cerr << result.GetIssues().ToString() << "\n"; + UNIT_ASSERT(result.IsSuccess()); + } + + static constexpr auto userAgent = "test-client/v0.1 ' ?*'\"`| "; + + { + auto newDriverCfg = driverCfg; + 
newDriverCfg.SetAuthToken("user@builtin"); + + ydbDriver = MakeHolder(newDriverCfg); + + auto topicClient = NYdb::NTopic::TTopicClient(*ydbDriver); + auto writer = topicClient.CreateSimpleBlockingWriteSession(NYdb::NTopic::TWriteSessionSettings() + .Path(fullTopicName) + .MessageGroupId("123") + .ProducerId("123") + .Codec(NYdb::NPersQueue::ECodec::RAW) + .DirectWriteToPartition(false) + .Header({{NYdb::YDB_APPLICATION_NAME, userAgent}}) + ); + + for (int i = 0; i < 4; ++i) { + bool res = writer->Write(TString(10, 'a')); + UNIT_ASSERT(res); + } + + NYdb::NTopic::TReadSessionSettings settings; + settings.ConsumerName(consumerName).AppendTopics(topicName); + + auto reader = CreateReader(*ydbDriver, settings, nullptr, userAgent); + + auto msg = GetNextMessageSkipAssignment(reader); + UNIT_ASSERT(msg); + + checkCounters(monPort, + { + "api.grpc.topic.stream_read.commits", + "api.grpc.topic.stream_read.partition_session.errors", + "api.grpc.topic.stream_read.partition_session.started", + "api.grpc.topic.stream_read.partition_session.stopped", + "api.grpc.topic.stream_read.partition_session.count", + "api.grpc.topic.stream_read.partition_session.starting_count", + "api.grpc.topic.stream_read.partition_session.stopping_count", + "api.grpc.topic.stream_write.errors", + "api.grpc.topic.stream_write.sessions_active_count", + "api.grpc.topic.stream_write.sessions_created", + }, + topicName, "", "", "" + ); + + checkCounters(monPort, + { + "api.grpc.topic.stream_read.commits", + "api.grpc.topic.stream_read.partition_session.errors", + "api.grpc.topic.stream_read.partition_session.started", + "api.grpc.topic.stream_read.partition_session.stopped", + "api.grpc.topic.stream_read.partition_session.count", + "api.grpc.topic.stream_read.partition_session.starting_count", + "api.grpc.topic.stream_read.partition_session.stopping_count", + + }, + topicName, consumerName, "", "" + ); + + checkCounters(server.CleverServer->GetRuntime()->GetMonPort(), + { + "topic.read.lag_milliseconds", 
+ "topic.write.bytes", + "topic.write.messages", + "topic.write.discarded_bytes", + "topic.write.discarded_messages", + "api.grpc.topic.stream_write.bytes", + "topic.write.partition_throttled_milliseconds", + "topic.write.message_size_bytes", + "api.grpc.topic.stream_write.messages", + "topic.write.lag_milliseconds", + "topic.write.uncompressed_bytes", + "api.grpc.topic.stream_read.bytes", + "api.grpc.topic.stream_read.messages", + "topic.read.bytes", + "topic.read.messages", + }, + topicName, "", "", "" + ); + + checkCounters(server.CleverServer->GetRuntime()->GetMonPort(), + { + "topic.read.lag_milliseconds", + "api.grpc.topic.stream_read.bytes", + "api.grpc.topic.stream_read.messages", + "topic.read.bytes", + "topic.read.messages", + }, + topicName, consumerName, "", "" + ); + + checkUserAgentCounters(monPort, "BytesWrittenByUserAgent", "topic", userAgent, fullTopicName, ""); + checkUserAgentCounters(monPort, "BytesReadByUserAgent", "topic", userAgent, "", consumerName); } }; diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 2b6a3b517db6..41c561d4a8d6 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -1,4 +1,5 @@ #include "actors/read_session_actor.h" +#include "actors/helpers.h" #include #include #include @@ -43,6 +44,7 @@ #include #include #include +#include #include @@ -3698,6 +3700,34 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; UNIT_ASSERT(equal); }; + auto checkUserAgentCounters = []( + auto monPort, + const TString& sensor, + const TString& protocol, + const TString& userAgent, + const TString& topic, + const TString& consumer + ) { + auto counters = SendQuery(monPort, "/counters/counters=pqproxy/subsystem=userAgents/json"); + const auto sensors = counters["sensors"].GetArray(); + for (const auto& s : sensors) { + const auto& labels = s["labels"]; + if (labels["sensor"].GetString() != sensor) { + continue; + } + 
UNIT_ASSERT_VALUES_EQUAL(labels["host"].GetString(), ""); + UNIT_ASSERT_VALUES_EQUAL(labels["protocol"].GetString(), protocol); + if (!topic.empty()) { + UNIT_ASSERT_VALUES_EQUAL(labels["topic"].GetString(), topic); + } else if (!consumer.empty()) { + UNIT_ASSERT_VALUES_EQUAL(labels["consumer"].GetString(), consumer); + } else { + UNIT_FAIL("Neither topic nor consumer were provided"); + } + UNIT_ASSERT_VALUES_EQUAL(labels["user_agent"].GetString(), NGRpcProxy::V1::CleanupCounterValueString(userAgent)); + } + }; + auto settings = PQSettings(0, 1, "10"); settings.PQConfig.MutableQuotingConfig()->SetEnableQuoting(true); @@ -3729,7 +3759,18 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; auto driver = server.AnnoyingClient->GetDriver(); - auto writer = CreateWriter(*driver, "account/topic1", "base64:AAAAaaaa____----12", 0, "raw"); + static constexpr auto userAgent = "test-client/v0.1 ' ?*'\"`| "; + + auto writer = CreateWriter( + *driver, + NYdb::NPersQueue::TWriteSessionSettings() + .Path("account/topic1") + .MessageGroupId("base64:AAAAaaaa____----12") + .PartitionGroupId(0) + .Codec(NYdb::NPersQueue::ECodec::RAW) + .Header({{NYdb::YDB_APPLICATION_NAME, userAgent}}), + nullptr + ); auto msg = writer->GetEvent(true); UNIT_ASSERT(msg); // ReadyToAcceptEvent @@ -3785,10 +3826,13 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; "", "cluster", "", "" ); + checkUserAgentCounters(monPort, "BytesWrittenByUserAgent", "pqv1", userAgent, "account/topic1", ""); + { NYdb::NPersQueue::TReadSessionSettings settings; settings.ConsumerName(originallyProvidedConsumerName) - .AppendTopics(TString("account/topic1")).ReadOriginal({"dc1"}); + .AppendTopics(TString("account/topic1")).ReadOriginal({"dc1"}) + .Header({{NYdb::YDB_APPLICATION_NAME, userAgent}}); auto reader = CreateReader(*driver, settings); @@ -3868,6 +3912,9 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; }, "", "Dc1", consumerName, consumerPath ); + + 
checkUserAgentCounters(server.CleverServer->GetRuntime()->GetMonPort(), + "BytesReadByUserAgent", "pqv1", userAgent, "", consumerPath); } };