Skip to content

Commit

Permalink
persqueue: add user-agent counters (#10603)
Browse files Browse the repository at this point in the history
  • Loading branch information
qyryq authored Oct 23, 2024
1 parent 78f6425 commit eebeb70
Show file tree
Hide file tree
Showing 14 changed files with 475 additions and 10 deletions.
1 change: 1 addition & 0 deletions ydb/core/base/counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ static const THashSet<TString> DATABASE_SERVICES
TString("pqproxy|readSession"),
TString("pqproxy|schemecache"),
TString("pqproxy|mirrorWriteTimeLag"),
TString("pqproxy|userAgents"),
TString("datastreams"),
}};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include "data_plane_helpers.h"
#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>

namespace NKikimr::NPersQueueTests {

Expand Down Expand Up @@ -51,7 +53,8 @@ namespace NKikimr::NPersQueueTests {
std::optional<ui32> partitionGroup,
std::optional<TString> codec,
std::optional<bool> reconnectOnFailure,
THashMap<TString, TString> sessionMeta
THashMap<TString, TString> sessionMeta,
const TString& userAgent
) {
auto settings = TWriteSessionSettings().Path(topic).MessageGroupId(sourceId);
if (partitionGroup) settings.PartitionGroupId(*partitionGroup);
Expand All @@ -66,6 +69,9 @@ namespace NKikimr::NPersQueueTests {
}
settings.MaxMemoryUsage(1024*1024*1024*1024ll);
settings.Meta_.Fields = sessionMeta;
if (!userAgent.empty()) {
settings.Header({{NYdb::YDB_APPLICATION_NAME, userAgent}});
}
return CreateSimpleWriter(driver, settings);
}

Expand All @@ -79,6 +85,21 @@ namespace NKikimr::NPersQueueTests {
return TPersQueueClient(driver, clientSettings).CreateReadSession(TReadSessionSettings(settings).DisableClusterDiscovery(true));
}

// Creates a read session through the Topic API client.
//
// driver    - YDB driver the topic client is built on.
// settings  - read session settings; copied, the caller's object is not modified.
// creds     - optional credentials provider factory; applied to the client
//             settings only when non-null.
// userAgent - when non-empty, sent as the NYdb::YDB_APPLICATION_NAME header
//             of the read session.
std::shared_ptr<NYdb::NTopic::IReadSession> CreateReader(
    NYdb::TDriver& driver,
    const NYdb::NTopic::TReadSessionSettings& settings,
    std::shared_ptr<NYdb::ICredentialsProviderFactory> creds,
    const TString& userAgent
) {
    NYdb::NTopic::TTopicClientSettings topicClientSettings;
    if (creds) {
        topicClientSettings.CredentialsProviderFactory(creds);
    }

    // Work on a private copy so the extra header never leaks into the caller's settings.
    auto sessionSettings = settings;
    const bool hasUserAgent = !userAgent.empty();
    if (hasUserAgent) {
        sessionSettings.Header({{NYdb::YDB_APPLICATION_NAME, userAgent}});
    }

    return NYdb::NTopic::TTopicClient(driver, topicClientSettings).CreateReadSession(sessionSettings);
}

TMaybe<TReadSessionEvent::TDataReceivedEvent> GetNextMessageSkipAssignment(std::shared_ptr<IReadSession>& reader, TDuration timeout) {
while (true) {
auto future = reader->WaitEvent();
Expand All @@ -99,4 +120,25 @@ namespace NKikimr::NPersQueueTests {
}
return {};
}

// Returns the next data event from a Topic API read session, confirming
// partition-session start/stop events along the way.
//
// Each loop iteration waits for an event for at most `timeout` (the timeout is
// applied per wait, not to the whole call), then fetches one event via
// GetEvent(false, 1).
//
// Returns an empty TMaybe when no event is available after the wait or when
// the session reports a TSessionClosedEvent. Any other event kind is ignored
// and the loop continues.
TMaybe<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent> GetNextMessageSkipAssignment(std::shared_ptr<NYdb::NTopic::IReadSession>& reader, TDuration timeout) {
    using TSessionEvent = NYdb::NTopic::TReadSessionEvent;

    for (;;) {
        auto eventReady = reader->WaitEvent();
        eventReady.Wait(timeout);

        TMaybe<TSessionEvent::TEvent> event = reader->GetEvent(false, 1);
        if (!event) {
            return {};
        }

        if (auto* data = std::get_if<TSessionEvent::TDataReceivedEvent>(&*event)) {
            return *data;
        }
        if (auto* start = std::get_if<TSessionEvent::TStartPartitionSessionEvent>(&*event)) {
            start->Confirm();
            continue;
        }
        if (auto* stop = std::get_if<TSessionEvent::TStopPartitionSessionEvent>(&*event)) {
            stop->Confirm();
            continue;
        }
        if (std::get_if<NYdb::NTopic::TSessionClosedEvent>(&*event)) {
            return {};
        }
        // Any other event kind: nothing to do, wait for the next one.
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>

namespace NKikimr::NPersQueueTests {

Expand Down Expand Up @@ -34,16 +35,24 @@ namespace NKikimr::NPersQueueTests {
std::optional<ui32> partitionGroup = {},
std::optional<TString> codec = {},
std::optional<bool> reconnectOnFailure = {},
THashMap<TString, TString> sessionMeta = {}
THashMap<TString, TString> sessionMeta = {},
const TString& userAgent = {}
);

// Creates a read session using the PersQueue public API types.
// `creds` is optional and defaults to nullptr.
// NOTE(review): the definition is not fully visible here; it appears to build a
// TPersQueueClient and to disable cluster discovery on the session settings — confirm.
std::shared_ptr<NYdb::NPersQueue::IReadSession> CreateReader(
NYdb::TDriver& driver,
const NYdb::NPersQueue::TReadSessionSettings& settings,
std::shared_ptr<NYdb::ICredentialsProviderFactory> creds = nullptr
);

// Creates a read session using the Topic API types.
// `creds` is optional (nullptr means no credentials provider factory is set on
// the client settings). A non-empty `userAgent` is sent as the
// NYdb::YDB_APPLICATION_NAME header of the read session; the default empty
// string adds no header.
std::shared_ptr<NYdb::NTopic::IReadSession> CreateReader(
NYdb::TDriver& driver,
const NYdb::NTopic::TReadSessionSettings& settings,
std::shared_ptr<NYdb::ICredentialsProviderFactory> creds = nullptr,
const TString& userAgent = ""
);

// Return the next data event from the given read session, or an empty TMaybe
// when none is obtained. One overload per client API (PersQueue and Topic).
// `timeout` defaults to TDuration::Max().
// The Topic overload confirms start/stop partition session events while waiting
// and applies `timeout` to each individual wait rather than to the whole call.
// NOTE(review): the PersQueue overload is presumably analogous — its body is
// only partially visible here; confirm.
TMaybe<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent> GetNextMessageSkipAssignment(std::shared_ptr<NYdb::NPersQueue::IReadSession>& reader, TDuration timeout = TDuration::Max());
TMaybe<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent> GetNextMessageSkipAssignment(std::shared_ptr<NYdb::NTopic::IReadSession>& reader, TDuration timeout = TDuration::Max());

}
5 changes: 5 additions & 0 deletions ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ class TWriteSessionActor : public NActors::TActorBootstrapped<TWriteSessionActor

void GenerateNextWriteRequest(const NActors::TActorContext& ctx);

void SetupBytesWrittenByUserAgentCounter();
void SetupCounters();
void SetupCounters(const TString& cloudId, const TString& dbId, const TString& dbPath,
bool isServerless, const TString& folderId);
Expand Down Expand Up @@ -570,6 +571,7 @@ class TWriteSessionActor : public NActors::TActorBootstrapped<TWriteSessionActor
TInstant StartTime;
NKikimr::NPQ::TPercentileCounter InitLatency;
NKikimr::NPQ::TMultiCounter SLIBigLatency;
NYdb::NPersQueue::TCounterPtr BytesWrittenByUserAgent;

THolder<NPersQueue::TTopicNamesConverterFactory> ConverterFactory;
NPersQueue::TDiscoveryConverterPtr DiscoveryConverter;
Expand Down Expand Up @@ -699,6 +701,7 @@ class TReadSessionActor : public TActorBootstrapped<TReadSessionActor> {
void SendAuthRequest(const TActorContext& ctx);
void CreateInitAndAuthActor(const TActorContext& ctx);

void SetupBytesReadByUserAgentCounter();
void SetupCounters();
void SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic);
void SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic, const TString& cloudId, const TString& dbId,
Expand Down Expand Up @@ -746,6 +749,7 @@ class TReadSessionActor : public TActorBootstrapped<TReadSessionActor> {
TString Session;
TString PeerName;
TString Database;
TString UserAgent;

bool ClientsideLocksAllowed;
bool BalanceRightNow;
Expand Down Expand Up @@ -929,6 +933,7 @@ class TReadSessionActor : public TActorBootstrapped<TReadSessionActor> {
NKikimr::NPQ::TPercentileCounter InitLatency;
NKikimr::NPQ::TPercentileCounter CommitLatency;
NKikimr::NPQ::TMultiCounter SLIBigLatency;
NYdb::NPersQueue::TCounterPtr BytesReadByUserAgent;

NKikimr::NPQ::TPercentileCounter ReadLatency;
NKikimr::NPQ::TPercentileCounter ReadLatencyFromDisk;
Expand Down
16 changes: 16 additions & 0 deletions ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include <ydb/library/actors/core/log.h>
#include <ydb/library/actors/interconnect/interconnect.h>
#include <ydb/services/persqueue_v1/actors/helpers.h>

#include <library/cpp/protobuf/util/repeated_field_utils.h>

#include <util/string/strip.h>
Expand Down Expand Up @@ -655,6 +657,7 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReadInit::TPtr& ev, const TActorCo
Session = session;
ProtocolVersion = init.GetProtocolVersion();
CommitsDisabled = init.GetCommitsDisabled();
UserAgent = init.GetVersion();

if (ProtocolVersion >= NPersQueue::TReadRequest::ReadParamsInInit) {
ReadSettingsInited = true;
Expand Down Expand Up @@ -835,6 +838,14 @@ void TReadSessionActor::RegisterSessions(const TActorContext& ctx) {
}
}

// Resolves the per-user-agent "bytes read" counter for this pqv0 read session
// and stores it in BytesReadByUserAgent.
//
// Counter path under the "pqproxy|userAgents" service group:
//   host="" / protocol="pqv0" / consumer=ClientPath / user_agent=<sanitized UserAgent>
//   sensor="BytesReadByUserAgent"
void TReadSessionActor::SetupBytesReadByUserAgentCounter() {
    auto serviceGroup = GetServiceCounters(Counters, "pqproxy|userAgents");
    auto hostGroup = serviceGroup->GetSubgroup("host", "");
    auto protocolGroup = hostGroup->GetSubgroup("protocol", "pqv0");
    auto consumerGroup = protocolGroup->GetSubgroup("consumer", ClientPath);

    // The user agent comes from the client, so it is sanitized before being used as a label value.
    auto userAgentGroup = consumerGroup->GetSubgroup("user_agent", V1::CleanupCounterValueString(UserAgent));

    BytesReadByUserAgent = userAgentGroup->GetExpiringNamedCounter("sensor", "BytesReadByUserAgent", true);
}

void TReadSessionActor::SetupCounters()
{
Expand Down Expand Up @@ -864,6 +875,8 @@ void TReadSessionActor::SetupCounters()
if (ProtocolVersion < NPersQueue::TReadRequest::Batching) {
++(*SessionsWithOldBatchingVersion);
}

SetupBytesReadByUserAgentCounter();
}


Expand Down Expand Up @@ -1525,6 +1538,9 @@ bool TReadSessionActor::ProcessAnswer(const TActorContext& ctx, TFormedReadRespo

Y_ABORT_UNLESS(formedResponse->RequestsInfly == 0);
i64 diff = formedResponse->Response.ByteSize();

BytesReadByUserAgent->Add(diff);

const bool hasMessages = HasMessages(formedResponse->Response.GetBatchedData());
if (hasMessages) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " assign read id " << ReadIdToResponse << " to read request " << formedResponse->Guid);
Expand Down
17 changes: 17 additions & 0 deletions ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <ydb/library/persqueue/topic_parser/topic_parser.h>
#include <ydb/library/persqueue/topic_parser/counters.h>
#include <ydb/services/lib/sharding/sharding.h>
#include <ydb/services/persqueue_v1/actors/helpers.h>

#include <ydb/library/actors/core/log.h>
#include <util/string/hex.h>
Expand Down Expand Up @@ -256,6 +257,16 @@ void TWriteSessionActor::InitAfterDiscovery(const TActorContext& ctx) {
}


// Resolves the per-user-agent "bytes written" counter for this pqv0 write
// session and stores it in BytesWrittenByUserAgent.
//
// Counter path under the "pqproxy|userAgents" service group:
//   host="" / protocol="pqv0" / topic=<federation path> / user_agent=<sanitized UserAgent>
//   sensor="BytesWrittenByUserAgent"
void TWriteSessionActor::SetupBytesWrittenByUserAgentCounter() {
    auto serviceGroup = GetServiceCounters(Counters, "pqproxy|userAgents");
    auto hostGroup = serviceGroup->GetSubgroup("host", "");
    auto protocolGroup = hostGroup->GetSubgroup("protocol", "pqv0");
    auto topicGroup = protocolGroup->GetSubgroup("topic", FullConverter->GetFederationPath());

    // The user agent comes from the client, so it is sanitized before being used as a label value.
    auto userAgentGroup = topicGroup->GetSubgroup("user_agent", V1::CleanupCounterValueString(UserAgent));

    BytesWrittenByUserAgent = userAgentGroup->GetExpiringNamedCounter("sensor", "BytesWrittenByUserAgent", true);
}


void TWriteSessionActor::SetupCounters()
{
if (SessionsCreated) {
Expand Down Expand Up @@ -286,6 +297,8 @@ void TWriteSessionActor::SetupCounters()

SessionsCreated.Inc();
SessionsActive.Inc();

SetupBytesWrittenByUserAgentCounter();
}


Expand All @@ -307,6 +320,8 @@ void TWriteSessionActor::SetupCounters(const TString& cloudId, const TString& db

SessionsCreated.Inc();
SessionsActive.Inc();

SetupBytesWrittenByUserAgentCounter();
}


Expand Down Expand Up @@ -851,6 +866,8 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWrite::TPtr& ev, const TActorCont
BytesInflight.Inc(diff);
BytesInflightTotal.Inc(diff);

BytesWrittenByUserAgent->Add(diff);

if (BytesInflight_ < MAX_BYTES_INFLIGHT) { //allow only one big request to be readed but not sended
Y_ABORT_UNLESS(NextRequestInited);
Handler->ReadyForNextRead();
Expand Down
28 changes: 28 additions & 0 deletions ydb/services/persqueue_v1/actors/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,32 @@ bool HasMessages(const Topic::StreamReadMessage::ReadResponse& data) {
return false;
}


// Sanitizes a string so that it can be used as a counter label value.
//
// The internal monitoring system requires label values to be no longer than
// 200 characters and prohibits some ASCII characters. This function drops the
// prohibited characters (| * ? " ' ` \) and keeps at most the first 200 of the
// remaining characters.
//
// value - arbitrary input string (e.g. a client-supplied user agent).
// Returns the sanitized copy; an empty input yields an empty result.
TString CleanupCounterValueString(const TString& value) {
    constexpr size_t valueLengthLimit = 200;

    TString clean;
    for (const char c : value) {
        // The limit is checked here rather than inside the switch: a `break`
        // within a switch only leaves the switch, not the enclosing loop.
        if (clean.size() >= valueLengthLimit) {
            break;
        }

        switch (c) {
            case '|':
            case '*':
            case '?':
            case '"':
            case '\'':
            case '`':
            case '\\':
                // Prohibited character: skip it.
                continue;
            default:
                clean.push_back(c);
                break;
        }
    }
    return clean;
}

}
2 changes: 2 additions & 0 deletions ydb/services/persqueue_v1/actors/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ bool HasMessages(const PersQueue::V1::MigrationStreamingReadServerMessage::DataB

bool HasMessages(const Topic::StreamReadMessage::ReadResponse& data);

TString CleanupCounterValueString(const TString& value);

}
27 changes: 27 additions & 0 deletions ydb/services/persqueue_v1/actors/read_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ TReadSessionActor<UseMigrationProtocol>::TReadSessionActor(
, AutoPartitioningSupport(false)
{
Y_ASSERT(Request);

if (auto values = Request->GetStreamCtx()->GetPeerMetaValues(NYdb::YDB_APPLICATION_NAME); !values.empty()) {
UserAgent = values[0];
}
if (auto values = Request->GetStreamCtx()->GetPeerMetaValues(NYdb::YDB_SDK_BUILD_INFO_HEADER); !values.empty()) {
SdkBuildInfo = values[0];
}
}

template <bool UseMigrationProtocol>
Expand Down Expand Up @@ -884,6 +891,18 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadInit::TPtr&
}
}

// Resolves the per-user-agent "bytes read" counter for this read session and
// stores it in BytesReadByUserAgent.
//
// Counter path under the "pqproxy|userAgents" service group:
//   host="" / protocol=("pqv1" for the migration protocol, otherwise "topic")
//   / consumer=ClientPath / sdk_build_info=<sanitized SdkBuildInfo>
//   / user_agent=<sanitized UserAgent>, sensor="BytesReadByUserAgent"
template<bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::SetupBytesReadByUserAgentCounter() {
    static constexpr auto protocolLabel = UseMigrationProtocol ? "pqv1" : "topic";

    auto serviceGroup = GetServiceCounters(Counters, "pqproxy|userAgents");
    auto hostGroup = serviceGroup->GetSubgroup("host", "");
    auto protocolGroup = hostGroup->GetSubgroup("protocol", protocolLabel);
    auto consumerGroup = protocolGroup->GetSubgroup("consumer", ClientPath);

    // Both values originate from request metadata, so they are sanitized before being used as label values.
    auto sdkGroup = consumerGroup->GetSubgroup("sdk_build_info", CleanupCounterValueString(SdkBuildInfo));
    auto userAgentGroup = sdkGroup->GetSubgroup("user_agent", CleanupCounterValueString(UserAgent));

    BytesReadByUserAgent = userAgentGroup->GetExpiringNamedCounter("sensor", "BytesReadByUserAgent", true);
}

template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::SetupCounters() {
if (SessionsCreated) {
Expand Down Expand Up @@ -913,6 +932,8 @@ void TReadSessionActor<UseMigrationProtocol>::SetupCounters() {
++(*SessionsCreated);
++(*SessionsActive);
PartsPerSession.IncFor(Partitions.size(), 1); // for 0

SetupBytesReadByUserAgentCounter();
}

template <bool UseMigrationProtocol>
Expand All @@ -937,6 +958,8 @@ void TReadSessionActor<UseMigrationProtocol>::SetupTopicCounters(const NPersQueu
topicCounters.CommitLatency = CommitLatency;
topicCounters.SLIBigLatency = SLIBigLatency;
topicCounters.SLITotal = SLITotal;

SetupBytesReadByUserAgentCounter();
}

template <bool UseMigrationProtocol>
Expand All @@ -960,6 +983,8 @@ void TReadSessionActor<UseMigrationProtocol>::SetupTopicCounters(const NPersQueu
topicCounters.CommitLatency = CommitLatency;
topicCounters.SLIBigLatency = SLIBigLatency;
topicCounters.SLITotal = SLITotal;

SetupBytesReadByUserAgentCounter();
}

template <bool UseMigrationProtocol>
Expand Down Expand Up @@ -1956,6 +1981,8 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(typename TFormedRead
formedResponse->Response.mutable_read_response()->set_bytes_size(sizeEstimation);
}

BytesReadByUserAgent->Add(sizeEstimation);

if (formedResponse->IsDirectRead) {
auto it = Partitions.find(formedResponse->AssignId);
if (it == Partitions.end()) {
Expand Down
6 changes: 6 additions & 0 deletions ydb/services/persqueue_v1/actors/read_session_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ class TReadSessionActor
void CloseSession(PersQueue::ErrorCode::ErrorCode code, const TString& reason, const TActorContext& ctx);
void SendLockPartitionToSelf(ui32 partitionId, TString topicName, TTopicHolder topic, const TActorContext& ctx);

void SetupBytesReadByUserAgentCounter();
void SetupCounters();
void SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic);
void SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic,
Expand All @@ -343,6 +344,9 @@ class TReadSessionActor
const TString ClientDC;
const TInstant StartTimestamp;

TString SdkBuildInfo;
TString UserAgent = UseMigrationProtocol ? "pqv1 server" : "topic server";

TActorId SchemeCache;
TActorId NewSchemeCache;

Expand Down Expand Up @@ -425,6 +429,8 @@ class TReadSessionActor
::NMonitoring::TDynamicCounters::TCounterPtr Errors;
::NMonitoring::TDynamicCounters::TCounterPtr PipeReconnects;
::NMonitoring::TDynamicCounters::TCounterPtr BytesInflight;
::NMonitoring::TDynamicCounters::TCounterPtr BytesReadByUserAgent;

ui64 BytesInflight_;
ui64 RequestedBytes;
ui32 ReadsInfly;
Expand Down
Loading

0 comments on commit eebeb70

Please sign in to comment.