Skip to content

Commit

Permalink
Use runtime grpc event dispatching instead of legacy one for PQ events
Browse files Browse the repository at this point in the history
  • Loading branch information
dcherednik committed Jan 25, 2024
1 parent 9d0a376 commit d12e0d5
Show file tree
Hide file tree
Showing 16 changed files with 269 additions and 242 deletions.
7 changes: 6 additions & 1 deletion ydb/core/grpc_services/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -1422,7 +1422,12 @@ class TGrpcRequestCall
void Pass(const IFacilityProvider& facility) override {
this->Span_.End();

PassMethod(std::move(std::unique_ptr<TRequestIface>(this)), facility);
try {
PassMethod(std::move(std::unique_ptr<TRequestIface>(this)), facility);
} catch (const std::exception& ex) {
this->RaiseIssue(NYql::TIssue{TStringBuilder() << "unexpected exception: " << ex.what()});
this->ReplyWithYdbStatus(Ydb::StatusIds::INTERNAL_ERROR);
}
}

TRateLimiterMode GetRlMode() const override {
Expand Down
12 changes: 0 additions & 12 deletions ydb/core/grpc_services/grpc_request_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,20 +560,8 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev) {
HFunc(TEvStreamTopicDirectReadRequest, PreHandle);
HFunc(TEvCommitOffsetRequest, PreHandle);
HFunc(TEvPQReadInfoRequest, PreHandle);
HFunc(TEvPQDropTopicRequest, PreHandle);
HFunc(TEvPQCreateTopicRequest, PreHandle);
HFunc(TEvPQAlterTopicRequest, PreHandle);
HFunc(TEvPQAddReadRuleRequest, PreHandle);
HFunc(TEvPQRemoveReadRuleRequest, PreHandle);
HFunc(TEvPQDescribeTopicRequest, PreHandle);
HFunc(TEvDiscoverPQClustersRequest, PreHandle);
HFunc(TEvCoordinationSessionRequest, PreHandle);
HFunc(TEvDropTopicRequest, PreHandle);
HFunc(TEvCreateTopicRequest, PreHandle);
HFunc(TEvAlterTopicRequest, PreHandle);
HFunc(TEvDescribeTopicRequest, PreHandle);
HFunc(TEvDescribeConsumerRequest, PreHandle);
HFunc(TEvDescribePartitionRequest, PreHandle);
HFunc(TEvNodeCheckRequest, PreHandle);
HFunc(TEvProxyRuntimeEvent, PreHandle);

Expand Down
12 changes: 0 additions & 12 deletions ydb/core/grpc_services/grpc_request_proxy_handle_methods.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,10 @@ class TGRpcRequestProxyHandleMethods {
static void Handle(TEvStreamTopicDirectReadRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvCommitOffsetRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvPQReadInfoRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvPQDropTopicRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvPQCreateTopicRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvPQAlterTopicRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvPQAddReadRuleRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvPQRemoveReadRuleRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvPQDescribeTopicRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvDiscoverPQClustersRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvLoginRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvNodeCheckRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvCoordinationSessionRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvDropTopicRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvCreateTopicRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvAlterTopicRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvDescribeTopicRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvDescribePartitionRequest::TPtr& ev, const TActorContext& ctx);
};

}
Expand Down
16 changes: 0 additions & 16 deletions ydb/core/grpc_services/rpc_calls.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@
#include <ydb/public/api/protos/ydb_discovery.pb.h>
#include <ydb/public/api/protos/ydb_monitoring.pb.h>
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
#include <ydb/public/api/protos/ydb_table.pb.h>
#include <ydb/public/api/protos/ydb_persqueue_cluster_discovery.pb.h>
#include <ydb/public/api/protos/ydb_persqueue_v1.pb.h>
#include <ydb/public/api/protos/ydb_topic.pb.h>
#include <ydb/public/api/protos/ydb_federation_discovery.pb.h>

#include <ydb/public/api/grpc/draft/dummy.pb.h>
Expand Down Expand Up @@ -69,21 +67,7 @@ using TEvStreamTopicReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvSt
using TEvStreamTopicDirectReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicDirectRead, Ydb::Topic::StreamDirectReadMessage::FromClient, Ydb::Topic::StreamDirectReadMessage::FromServer, TRateLimiterMode::RuManual>;
using TEvCommitOffsetRequest = TGRpcRequestWrapper<TRpcServices::EvTopicCommitOffset, Ydb::Topic::CommitOffsetRequest, Ydb::Topic::CommitOffsetResponse, true>;
using TEvPQReadInfoRequest = TGRpcRequestWrapper<TRpcServices::EvPQReadInfo, Ydb::PersQueue::V1::ReadInfoRequest, Ydb::PersQueue::V1::ReadInfoResponse, true>;
using TEvPQDropTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQDropTopic, Ydb::PersQueue::V1::DropTopicRequest, Ydb::PersQueue::V1::DropTopicResponse, true>;
using TEvPQCreateTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQCreateTopic, Ydb::PersQueue::V1::CreateTopicRequest, Ydb::PersQueue::V1::CreateTopicResponse, true>;
using TEvPQAlterTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQAlterTopic, Ydb::PersQueue::V1::AlterTopicRequest, Ydb::PersQueue::V1::AlterTopicResponse, true>;
using TEvPQDescribeTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQDescribeTopic, Ydb::PersQueue::V1::DescribeTopicRequest, Ydb::PersQueue::V1::DescribeTopicResponse, true>;
using TEvPQAddReadRuleRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQAddReadRule, Ydb::PersQueue::V1::AddReadRuleRequest, Ydb::PersQueue::V1::AddReadRuleResponse, true>;
using TEvPQRemoveReadRuleRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQRemoveReadRule, Ydb::PersQueue::V1::RemoveReadRuleRequest, Ydb::PersQueue::V1::RemoveReadRuleResponse, true>;

//TODO: Change this to runtime dispatching!
using TEvDropTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDropTopic, Ydb::Topic::DropTopicRequest, Ydb::Topic::DropTopicResponse, true, TRateLimiterMode::Rps>;
using TEvCreateTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvCreateTopic, Ydb::Topic::CreateTopicRequest, Ydb::Topic::CreateTopicResponse, true, TRateLimiterMode::Rps>;
using TEvAlterTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvAlterTopic, Ydb::Topic::AlterTopicRequest, Ydb::Topic::AlterTopicResponse, true, TRateLimiterMode::Rps>;
using TEvDescribeTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDescribeTopic, Ydb::Topic::DescribeTopicRequest, Ydb::Topic::DescribeTopicResponse, true, TRateLimiterMode::Rps>;
using TEvDescribeConsumerRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDescribeConsumer, Ydb::Topic::DescribeConsumerRequest, Ydb::Topic::DescribeConsumerResponse, true, TRateLimiterMode::Rps>;
using TEvDescribePartitionRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDescribePartition, Ydb::Topic::DescribePartitionRequest, Ydb::Topic::DescribePartitionResponse, true, TRateLimiterMode::Rps>;

using TEvDiscoverPQClustersRequest = TGRpcRequestWrapper<TRpcServices::EvDiscoverPQClusters, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResponse, true>;
using TEvListFederationDatabasesRequest = TGRpcRequestWrapper<TRpcServices::EvListFederationDatabases, Ydb::FederationDiscovery::ListFederationDatabasesRequest, Ydb::FederationDiscovery::ListFederationDatabasesResponse, true>;

Expand Down
24 changes: 24 additions & 0 deletions ydb/core/grpc_services/rpc_calls_topic.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#pragma once

#include <ydb/public/api/protos/ydb_topic.pb.h>
#include <ydb/public/api/protos/ydb_persqueue_v1.pb.h>

#include "rpc_calls.h"

namespace NKikimr::NGRpcService {

using TEvDropTopicRequest = TGrpcRequestOperationCall<Ydb::Topic::DropTopicRequest, Ydb::Topic::DropTopicResponse>;
using TEvCreateTopicRequest = TGrpcRequestOperationCall<Ydb::Topic::CreateTopicRequest, Ydb::Topic::CreateTopicResponse>;
using TEvAlterTopicRequest = TGrpcRequestOperationCall<Ydb::Topic::AlterTopicRequest, Ydb::Topic::AlterTopicResponse>;
using TEvDescribeTopicRequest = TGrpcRequestOperationCall<Ydb::Topic::DescribeTopicRequest, Ydb::Topic::DescribeTopicResponse>;
using TEvDescribeConsumerRequest = TGrpcRequestOperationCall<Ydb::Topic::DescribeConsumerRequest, Ydb::Topic::DescribeConsumerResponse>;
using TEvDescribePartitionRequest = TGrpcRequestOperationCall<Ydb::Topic::DescribePartitionRequest, Ydb::Topic::DescribePartitionResponse>;

using TEvPQDropTopicRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::DropTopicRequest, Ydb::PersQueue::V1::DropTopicResponse>;
using TEvPQCreateTopicRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::CreateTopicRequest, Ydb::PersQueue::V1::CreateTopicResponse>;
using TEvPQAlterTopicRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::AlterTopicRequest, Ydb::PersQueue::V1::AlterTopicResponse>;
using TEvPQDescribeTopicRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::DescribeTopicRequest, Ydb::PersQueue::V1::DescribeTopicResponse>;
using TEvPQAddReadRuleRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::AddReadRuleRequest, Ydb::PersQueue::V1::AddReadRuleResponse>;
using TEvPQRemoveReadRuleRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::RemoveReadRuleRequest, Ydb::PersQueue::V1::RemoveReadRuleResponse>;

}
32 changes: 32 additions & 0 deletions ydb/core/grpc_services/service_topic.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once

#include <memory>

namespace NKikimr {

namespace NGRpcProxy::V1 {
class IClustersCfgProvider;
struct TClustersCfg;
}

namespace NGRpcService {

class IRequestOpCtx;
class IFacilityProvider;

void DoDropTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoCreateTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f, TIntrusiveConstPtr<NGRpcProxy::V1::TClustersCfg>);
void DoAlterTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoDescribeTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoDescribeConsumerRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoDescribePartitionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);

void DoPQDropTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoPQCreateTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f, TIntrusiveConstPtr<NGRpcProxy::V1::TClustersCfg>);
void DoPQAlterTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f, TIntrusiveConstPtr<NGRpcProxy::V1::TClustersCfg>);
void DoPQDescribeTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoPQAddReadRuleRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoPQRemoveReadRuleRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);

}
}
2 changes: 1 addition & 1 deletion ydb/core/viewer/json_local_rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include "json_pipe_req.h"

#include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h>
#include <ydb/core/grpc_services/rpc_calls.h>
#include <ydb/core/grpc_services/rpc_calls_topic.h>
#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h>

Expand Down
3 changes: 1 addition & 2 deletions ydb/services/persqueue_v1/actors/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include "partition_id.h"

#include <ydb/core/base/events.h>
#include <ydb/core/grpc_services/rpc_calls.h>
#include <ydb/core/grpc_services/rpc_calls_topic.h>
#include <ydb/core/protos/pqconfig.pb.h>
#include <ydb/core/persqueue/key.h>
#include <ydb/core/persqueue/percentile_counter.h>
Expand All @@ -15,7 +15,6 @@

#include <util/generic/guid.h>


namespace NKikimr::NGRpcProxy::V1 {

using namespace Ydb;
Expand Down
Loading

0 comments on commit d12e0d5

Please sign in to comment.