Skip to content

Commit

Permalink
Add grpc request proxy wilson span and trace id on the request contex…
Browse files Browse the repository at this point in the history
  • Loading branch information
dcherednik authored Dec 20, 2023
1 parent d7d4cd2 commit 530fde1
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 8 deletions.
40 changes: 35 additions & 5 deletions ydb/core/grpc_services/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <ydb/core/tx/scheme_board/events.h>
#include <ydb/core/base/events.h>

#include <ydb/library/actors/wilson/wilson_span.h>

#include <util/stream/str.h>

namespace NKikimr {
Expand Down Expand Up @@ -361,6 +363,10 @@ class IRequestProxyCtx : public virtual IRequestCtxBase {
virtual void ReplyUnauthenticated(const TString& msg = "") = 0;
virtual void ReplyUnavaliable() = 0;

//tracing
virtual void StartTracing(NWilson::TSpan&& span) = 0;
virtual void LegacyFinishSpan() = 0;

// validation
virtual bool Validate(TString& error) = 0;

Expand Down Expand Up @@ -477,6 +483,9 @@ class TRefreshTokenImpl
return Token_;
}

void StartTracing(NWilson::TSpan&& /*span*/) override {}
void LegacyFinishSpan() override {}

void UpdateAuthState(NYdbGrpc::TAuthState::EAuthState state) override {
State_.State = state;
}
Expand Down Expand Up @@ -598,7 +607,7 @@ class TRefreshTokenImpl
return {};
}

TMaybe<TString> GetOpenTelemetryTraceParent() const override {
NWilson::TTraceId GetWilsonTraceId() const override {
return {};
}

Expand Down Expand Up @@ -821,8 +830,8 @@ class TGRpcRequestBiStreamWrapper
return GetPeerMetaValues(NYdb::YDB_TRACE_ID_HEADER);
}

TMaybe<TString> GetOpenTelemetryTraceParent() const override {
return GetPeerMetaValues(NYdb::OTEL_TRACE_HEADER);
NWilson::TTraceId GetWilsonTraceId() const override {
return Span_.GetTraceId();
}

const TMaybe<TString> GetSdkBuildInfo() const {
Expand Down Expand Up @@ -872,6 +881,16 @@ class TGRpcRequestBiStreamWrapper
Y_ABORT("unimplemented for TGRpcRequestBiStreamWrapper");
}

// IRequestProxyCtx
//
void StartTracing(NWilson::TSpan&& span) override {
Span_ = std::move(span);
}

void LegacyFinishSpan() override {
Span_.End();
}

// IRequestCtxBase
//
void AddAuditLogPart(const TStringBuf&, const TString&) override {
Expand All @@ -889,6 +908,7 @@ class TGRpcRequestBiStreamWrapper
TMaybe<NRpcService::TRlPath> RlPath_;
bool RlAllowed_;
IGRpcProxyCounters::TPtr Counters_;
NWilson::TSpan Span_;
};

template <typename TDerived>
Expand Down Expand Up @@ -1147,8 +1167,8 @@ class TGRpcRequestWrapperImpl
return GetPeerMetaValues(NYdb::YDB_TRACE_ID_HEADER);
}

TMaybe<TString> GetOpenTelemetryTraceParent() const override {
return GetPeerMetaValues(NYdb::OTEL_TRACE_HEADER);
NWilson::TTraceId GetWilsonTraceId() const override {
return Span_.GetTraceId();
}

const TMaybe<TString> GetSdkBuildInfo() const {
Expand Down Expand Up @@ -1277,6 +1297,12 @@ class TGRpcRequestWrapperImpl
return AuditLogParts;
}

void StartTracing(NWilson::TSpan&& span) override {
Span_ = std::move(span);
}

void LegacyFinishSpan() override {}

void ReplyGrpcError(grpc::StatusCode code, const TString& msg, const TString& details = "") {
Ctx_->ReplyError(code, msg, details);
}
Expand Down Expand Up @@ -1316,6 +1342,8 @@ class TGRpcRequestWrapperImpl
};
}

protected:
NWilson::TSpan Span_;
private:
TIntrusivePtr<NYdbGrpc::IRequestContextBase> Ctx_;
TIntrusiveConstPtr<NACLib::TUserToken> InternalToken_;
Expand Down Expand Up @@ -1393,6 +1421,8 @@ class TGrpcRequestCall
{ }

void Pass(const IFacilityProvider& facility) override {
this->Span_.End();

PassMethod(std::move(std::unique_ptr<TRequestIface>(this)), facility);
}

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/grpc_services/base/iface.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <util/generic/fwd.h>
#include <ydb/library/actors/wilson/wilson_span.h>

namespace google::protobuf {
class Message;
Expand All @@ -21,7 +22,7 @@ using TAuditLogHook = std::function<void (ui32 status, const TAuditLogParts&)>;
class IRequestCtxBaseMtSafe {
public:
virtual TMaybe<TString> GetTraceId() const = 0;
virtual TMaybe<TString> GetOpenTelemetryTraceParent() const = 0;
virtual NWilson::TTraceId GetWilsonTraceId() const = 0;
// Returns client provided database name
virtual const TMaybe<TString> GetDatabaseName() const = 0;
// Returns "internal" token (result of ticket parser authentication)
Expand Down
1 change: 1 addition & 0 deletions ydb/core/grpc_services/grpc_request_check_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ class TGrpcRequestCheckActor

template <typename T>
void HandleAndDie(T& event) {
GrpcRequestBaseCtx_->LegacyFinishSpan();
TGRpcRequestProxyHandleMethods::Handle(event, TlsActivationContext->AsActorContext());
TBase::PassAway();
}
Expand Down
14 changes: 14 additions & 0 deletions ydb/core/grpc_services/grpc_request_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <ydb/core/grpc_services/counters/proxy_counters.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/scheme_board/scheme_board.h>
#include <ydb/library/wilson_ids/wilson.h>

#include <shared_mutex>

Expand Down Expand Up @@ -79,19 +80,23 @@ class TGRpcRequestProxyImpl
void HandleSchemeBoard(TSchemeBoardEvents::TEvNotifyDelete::TPtr& ev);
void ReplayEvents(const TString& databaseName, const TActorContext& ctx);

void StartTracing(IRequestProxyCtx& ctx);

static bool IsAuthStateOK(const IRequestProxyCtx& ctx);

template <typename TEvent>
void Handle(TAutoPtr<TEventHandle<TEvent>>& event, const TActorContext& ctx) {
IRequestProxyCtx* requestBaseCtx = event->Get();
if (ValidateAndReplyOnError(requestBaseCtx)) {
requestBaseCtx->LegacyFinishSpan();
TGRpcRequestProxyHandleMethods::Handle(event, ctx);
}
}

void Handle(TEvListEndpointsRequest::TPtr& event, const TActorContext& ctx) {
IRequestProxyCtx* requestBaseCtx = event->Get();
if (ValidateAndReplyOnError(requestBaseCtx)) {
requestBaseCtx->LegacyFinishSpan();
TGRpcRequestProxy::Handle(event, ctx);
}
}
Expand Down Expand Up @@ -145,6 +150,9 @@ class TGRpcRequestProxyImpl
return;
}


//StartTracing(*requestBaseCtx);

if (IsAuthStateOK(*requestBaseCtx)) {
Handle(event, ctx);
return;
Expand Down Expand Up @@ -401,6 +409,12 @@ bool TGRpcRequestProxyImpl::IsAuthStateOK(const IRequestProxyCtx& ctx) {
state.NeedAuth == false && !ctx.GetYdbToken();
}

void TGRpcRequestProxyImpl::StartTracing(IRequestProxyCtx& ctx) {
auto traceId = NWilson::TTraceId::NewTraceId(15, Max<ui32>());
NWilson::TSpan grpcRequestProxySpan(TWilsonGrpc::RequestProxy, std::move(traceId), "GrpcRequestProxy");
ctx.StartTracing(std::move(grpcRequestProxySpan));
}

void TGRpcRequestProxyImpl::HandleSchemeBoard(TSchemeBoardEvents::TEvNotifyUpdate::TPtr& ev, const TActorContext& ctx) {
TString databaseName = ev->Get()->Path;
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::GRPC_SERVER, "SchemeBoardUpdate " << databaseName);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/grpc_services/local_rpc/local_rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ class TLocalRpcCtx : public NGRpcService::IRequestOpCtx {
return Nothing();
}

TMaybe<TString> GetOpenTelemetryTraceParent() const override {
return Nothing();
NWilson::TTraceId GetWilsonTraceId() const override {
return {};
}

TInstant GetDeadline() const override {
Expand Down
6 changes: 6 additions & 0 deletions ydb/library/wilson_ids/wilson.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,10 @@ namespace NKikimr {
};
};

struct TWilsonGrpc {
enum {
RequestProxy = 9,
};
};

} // NKikimr

0 comments on commit 530fde1

Please sign in to comment.