Skip to content

Commit

Permalink
Merge 95999a9 into 213bbe7
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit authored Jan 24, 2024
2 parents 213bbe7 + 95999a9 commit 5b97537
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 63 deletions.
2 changes: 1 addition & 1 deletion ydb/core/driver_lib/run/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {

if (hasQueryService) {
server.AddService(new NGRpcService::TGRpcYdbQueryService(ActorSystem.Get(), Counters,
grpcRequestProxies[0], hasDataStreams.IsRlAllowed()));
grpcRequestProxies, hasDataStreams.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue()));
}

if (hasLogStore) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/testlib/test_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ namespace Tests {
GRpcServer->AddService(new NQuoter::TRateLimiterGRpcService(system, counters, grpcRequestProxies[0]));
GRpcServer->AddService(new NGRpcService::TGRpcDataStreamsService(system, counters, grpcRequestProxies[0], true));
GRpcServer->AddService(new NGRpcService::TGRpcMonitoringService(system, counters, grpcRequestProxies[0], true));
GRpcServer->AddService(new NGRpcService::TGRpcYdbQueryService(system, counters, grpcRequestProxies[0], true));
GRpcServer->AddService(new NGRpcService::TGRpcYdbQueryService(system, counters, grpcRequestProxies, true, 1));
if (Settings->EnableYq) {
GRpcServer->AddService(new NGRpcService::TGRpcFederatedQueryService(system, counters, grpcRequestProxies[0]));
GRpcServer->AddService(new NGRpcService::TGRpcFqPrivateTaskService(system, counters, grpcRequestProxies[0]));
Expand Down
106 changes: 45 additions & 61 deletions ydb/services/ydb/ydb_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,76 +7,60 @@

namespace NKikimr::NGRpcService {

TGRpcYdbQueryService::TGRpcYdbQueryService(NActors::TActorSystem *system,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
const NActors::TActorId& proxyId,
bool rlAllowed,
size_t handlersPerCompletionQueue)
: TGrpcServiceBase(system, counters, proxyId, rlAllowed)
, HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue))
{
}

TGRpcYdbQueryService::TGRpcYdbQueryService(NActors::TActorSystem *system,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
const TVector<NActors::TActorId>& proxies,
bool rlAllowed,
size_t handlersPerCompletionQueue)
: TGrpcServiceBase(system, counters, proxies, rlAllowed)
, HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue))
{
}

void TGRpcYdbQueryService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
using namespace Ydb::Query;
using namespace NQuery;

auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
size_t proxyCounter = 0;

#ifdef ADD_REQUEST
#error ADD_REQUEST macro already defined
#endif
#define ADD_REQUEST(NAME, IN, OUT, ACTION) \
MakeIntrusive<TGRpcRequest<IN, OUT, TGRpcYdbQueryService>>(this, &Service_, CQ_, \
[this](NYdbGrpc::IRequestContextBase* ctx) { \
NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
ACTION; \
}, &Ydb::Query::V1::QueryService::AsyncService::Request ## NAME, \
#NAME, logger, getCounterBlock("query", #NAME))->Run();

ADD_REQUEST(ExecuteQuery, ExecuteQueryRequest, ExecuteQueryResponsePart, {
ActorSystem_->Send(GRpcRequestProxyId_,
new TGrpcRequestNoOperationCall<ExecuteQueryRequest, ExecuteQueryResponsePart>
(ctx, &DoExecuteQuery, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Auditable}));
})

ADD_REQUEST(ExecuteScript, ExecuteScriptRequest, Ydb::Operations::Operation, {
ActorSystem_->Send(GRpcRequestProxyId_,
new TGrpcRequestNoOperationCall<ExecuteScriptRequest, Ydb::Operations::Operation>
(ctx, &DoExecuteScript, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Auditable}));
})

ADD_REQUEST(FetchScriptResults, FetchScriptResultsRequest, FetchScriptResultsResponse, {
ActorSystem_->Send(GRpcRequestProxyId_,
new TGrpcRequestNoOperationCall<FetchScriptResultsRequest, FetchScriptResultsResponse>
(ctx, &DoFetchScriptResults, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr}));
})

ADD_REQUEST(CreateSession, CreateSessionRequest, CreateSessionResponse, {
ActorSystem_->Send(GRpcRequestProxyId_,
new TGrpcRequestNoOperationCall<CreateSessionRequest, CreateSessionResponse>
(ctx, &DoCreateSession, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr}));
})

ADD_REQUEST(DeleteSession, DeleteSessionRequest, DeleteSessionResponse, {
ActorSystem_->Send(GRpcRequestProxyId_,
new TGrpcRequestNoOperationCall<DeleteSessionRequest, DeleteSessionResponse>
(ctx, &DoDeleteSession, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr}));
})

ADD_REQUEST(AttachSession, AttachSessionRequest, SessionState, {
ActorSystem_->Send(GRpcRequestProxyId_,
new TGrpcRequestNoOperationCall<AttachSessionRequest, SessionState>
(ctx, &DoAttachSession, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr}));
})

ADD_REQUEST(BeginTransaction, BeginTransactionRequest, BeginTransactionResponse, {
ActorSystem_->Send(GRpcRequestProxyId_,
new TGrpcRequestNoOperationCall<BeginTransactionRequest, BeginTransactionResponse>
(ctx, &DoBeginTransaction, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr}));
})

ADD_REQUEST(CommitTransaction, CommitTransactionRequest, CommitTransactionResponse, {
ActorSystem_->Send(GRpcRequestProxyId_,
new TGrpcRequestNoOperationCall<CommitTransactionRequest, CommitTransactionResponse>
(ctx, &DoCommitTransaction, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr}));
})

ADD_REQUEST(RollbackTransaction, RollbackTransactionRequest, RollbackTransactionResponse, {
ActorSystem_->Send(GRpcRequestProxyId_,
new TGrpcRequestNoOperationCall<RollbackTransactionRequest, RollbackTransactionResponse>
(ctx, &DoRollbackTransaction, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr}));
})
#define ADD_REQUEST(NAME, IN, OUT, CB, ...) \
for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \
for (auto* cq: CQS) { \
MakeIntrusive<TGRpcRequest<IN, OUT, TGRpcYdbQueryService>>(this, &Service_, cq, \
[this, proxyCounter](NYdbGrpc::IRequestContextBase* ctx) { \
NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
ActorSystem_->Send(GRpcProxies_[proxyCounter % GRpcProxies_.size()], \
new TGrpcRequestNoOperationCall<IN, OUT> \
(ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr __VA_OPT__(, TAuditMode::__VA_ARGS__)})); \
}, &Ydb::Query::V1::QueryService::AsyncService::Request ## NAME, \
#NAME, logger, getCounterBlock("query", #NAME))->Run(); \
++proxyCounter; \
} \
}

ADD_REQUEST(ExecuteQuery, ExecuteQueryRequest, ExecuteQueryResponsePart, DoExecuteQuery, Auditable);
ADD_REQUEST(ExecuteScript, ExecuteScriptRequest, Ydb::Operations::Operation, DoExecuteScript, Auditable);
ADD_REQUEST(FetchScriptResults, FetchScriptResultsRequest, FetchScriptResultsResponse, DoFetchScriptResults);
ADD_REQUEST(CreateSession, CreateSessionRequest, CreateSessionResponse, DoCreateSession);
ADD_REQUEST(DeleteSession, DeleteSessionRequest, DeleteSessionResponse, DoDeleteSession);
ADD_REQUEST(AttachSession, AttachSessionRequest, SessionState, DoAttachSession);
ADD_REQUEST(BeginTransaction, BeginTransactionRequest, BeginTransactionResponse, DoBeginTransaction);
ADD_REQUEST(CommitTransaction, CommitTransactionRequest, CommitTransactionResponse, DoCommitTransaction);
ADD_REQUEST(RollbackTransaction, RollbackTransactionRequest, RollbackTransactionResponse, DoRollbackTransaction);

#undef ADD_REQUEST
}
Expand Down
17 changes: 17 additions & 0 deletions ydb/services/ydb/ydb_query.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,25 @@ class TGRpcYdbQueryService
public:
using TGrpcServiceBase<Ydb::Query::V1::QueryService>::TGrpcServiceBase;

TGRpcYdbQueryService(
NActors::TActorSystem *system,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
const NActors::TActorId& proxyId,
bool rlAllowed,
size_t handlersPerCompletionQueue = 1);

TGRpcYdbQueryService(
NActors::TActorSystem *system,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
const TVector<NActors::TActorId>& proxies,
bool rlAllowed,
size_t handlersPerCompletionQueue);

private:
void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger);

private:
const size_t HandlersPerCompletionQueue;
};

} // namespace NKikimr::NGRpcService

0 comments on commit 5b97537

Please sign in to comment.