Skip to content

Commit

Permalink
Remove legacy method (ydb-platform#3501)
Browse files Browse the repository at this point in the history
  • Loading branch information
dcherednik authored Apr 5, 2024
1 parent 8594ffc commit d6ef09c
Show file tree
Hide file tree
Showing 11 changed files with 25 additions and 72 deletions.
19 changes: 0 additions & 19 deletions ydb/core/grpc_services/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -450,9 +450,6 @@ class IRequestOpCtx : public IRequestCtx {
// Legacy, do not use for modern code
virtual void SendResult(const google::protobuf::Message& result, Ydb::StatusIds::StatusCode status,
const google::protobuf::RepeatedPtrField<TYdbIssueMessageType>& message) = 0;
// Legacy, do not use for modern code
virtual void SendResult(Ydb::StatusIds::StatusCode status,
const google::protobuf::RepeatedPtrField<TYdbIssueMessageType>& message) = 0;
};

class IRequestNoOpCtx : public IRequestCtx {
Expand Down Expand Up @@ -936,22 +933,6 @@ class TGrpcResponseSenderImpl : public IRequestOpCtx {
self->Reply(resp, operation.status());
}

void SendResult(Ydb::StatusIds::StatusCode status,
const google::protobuf::RepeatedPtrField<TYdbIssueMessageType>& message) override
{
auto self = Derived();
self->FinishRequest();
auto resp = self->CreateResponseMessage();
auto deferred = resp->mutable_operation();
deferred->set_ready(true);
deferred->set_status(status);
deferred->mutable_issues()->MergeFrom(message);
if (self->CostInfo) {
deferred->mutable_cost_info()->Swap(self->CostInfo);
}
self->Reply(resp, status);
}

void SendResult(const google::protobuf::Message& result,
Ydb::StatusIds::StatusCode status,
const google::protobuf::RepeatedPtrField<TYdbIssueMessageType>& message) override
Expand Down
14 changes: 0 additions & 14 deletions ydb/core/grpc_services/local_rpc/local_rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,20 +121,6 @@ class TLocalRpcCtx : public NGRpcService::IRequestOpCtx {
CbWrapper(resp);
}

void SendResult(Ydb::StatusIds::StatusCode status,
const google::protobuf::RepeatedPtrField<NGRpcService::TYdbIssueMessageType>& message) override
{
TResp resp;
auto deferred = resp.mutable_operation();
deferred->set_ready(true);
deferred->set_status(status);
deferred->mutable_issues()->MergeFrom(message);
if (CostInfo) {
deferred->mutable_cost_info()->CopyFrom(*CostInfo);
}
CbWrapper(resp);
}

void SendOperation(const Ydb::Operations::Operation& operation) override {
TResp resp;
resp.mutable_operation()->CopyFrom(operation);
Expand Down
13 changes: 4 additions & 9 deletions ydb/core/grpc_services/rpc_deferrable.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,10 @@ class TRpcOperationRequestActor : public TRpcRequestWithOperationParamsActor<TDe
void Reply(Ydb::StatusIds::StatusCode status,
const google::protobuf::RepeatedPtrField<TYdbIssueMessageType>& message, const TActorContext& ctx)
{
Request_->SendResult(status, message);
NYql::TIssues issues;
IssuesFromMessage(message, issues);
Request_->RaiseIssues(issues);
Request_->ReplyWithYdbStatus(status);
NWilson::EndSpanWithStatus(Span_, status);
this->Die(ctx);
}
Expand All @@ -226,14 +229,6 @@ class TRpcOperationRequestActor : public TRpcRequestWithOperationParamsActor<TDe
this->Die(ctx);
}

void ReplyWithResult(Ydb::StatusIds::StatusCode status,
const google::protobuf::RepeatedPtrField<TYdbIssueMessageType>& message, const TActorContext &ctx)
{
Request_->SendResult(status, message);
NWilson::EndSpanWithStatus(Span_, status);
this->Die(ctx);
}

template<typename TResult>
void ReplyWithResult(Ydb::StatusIds::StatusCode status,
const google::protobuf::RepeatedPtrField<TYdbIssueMessageType>& message,
Expand Down
10 changes: 4 additions & 6 deletions ydb/core/grpc_services/rpc_discovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,8 @@ class TListEndpointsRPC : public TActorBootstrapped<TListEndpointsRPC> {
Discoverer = {};

auto issue = MakeIssue(ErrorToIssueCode(ev->Get()->Status), ev->Get()->Error);
google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issueMessages;
NYql::IssueToMessage(issue, issueMessages.Add());
Reply(ErrorToStatusCode(ev->Get()->Status), issueMessages);
Request->RaiseIssue(issue);
Reply(ErrorToStatusCode(ev->Get()->Status));
}

static NKikimrIssues::TIssuesIds::EIssueCode ErrorToIssueCode(TEvDiscovery::TEvError::EStatus status) {
Expand Down Expand Up @@ -159,9 +158,8 @@ class TListEndpointsRPC : public TActorBootstrapped<TListEndpointsRPC> {
PassAway();
}

template <typename... Args>
void Reply(Args&&... args) {
Request->SendResult(std::forward<Args>(args)...);
void Reply(Ydb::StatusIds::StatusCode status) {
Request->ReplyWithYdbStatus(status);
PassAway();
}
};
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_execute_scheme_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class TExecuteSchemeQueryRPC : public TRpcKqpRequestActor<TExecuteSchemeQueryRPC
const auto& kqpResponse = record.GetResponse();
const auto& issueMessage = kqpResponse.GetQueryIssues();

ReplyWithResult(Ydb::StatusIds::SUCCESS, issueMessage, ctx);
Reply(Ydb::StatusIds::SUCCESS, issueMessage, ctx);
} else {
return OnGenericQueryResponseError(record, ctx);
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/grpc_services/rpc_load_rows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class TUploadRowsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::T

void OnBeforePoison(const TActorContext&) override {
// Client is gone, but we need to "reply" anyway?
Request->SendResult(Ydb::StatusIds::CANCELLED, {});
Request->ReplyWithYdbStatus(Ydb::StatusIds::CANCELLED);
}

bool ReportCostInfoEnabled() const {
Expand Down Expand Up @@ -299,7 +299,7 @@ class TUploadColumnsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices

void OnBeforePoison(const TActorContext&) override {
// Client is gone, but we need to "reply" anyway?
Request->SendResult(Ydb::StatusIds::CANCELLED, {});
Request->ReplyWithYdbStatus(Ydb::StatusIds::CANCELLED);
}

bool ReportCostInfoEnabled() const {
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/grpc_services/rpc_rollback_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ class TRollbackTransactionRPC : public TRpcKqpRequestActor<TRollbackTransactionR
void ReplyWithResult(StatusIds::StatusCode status,
const google::protobuf::RepeatedPtrField<TYdbIssueMessageType>& message,
const TActorContext& ctx) {
Request_->SendResult(status, message);
NYql::TIssues issues;
IssuesFromMessage(message, issues);
Request_->RaiseIssues(issues);
Request_->ReplyWithYdbStatus(status);
Die(ctx);
}
};
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/grpc_services/rpc_scheme_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ class TRpcSchemeRequestActor : public TRpcOperationRequestActor<TDerived, TReque

Ydb::StatusIds::StatusCode ydbStatus = NKikimr::YdbStatusFromProxyStatus(msg);
if (!NKikimr::IsTxProxyInProgress(ydbStatus)) {
return this->ReplyWithResult(ydbStatus, issueMessage, ctx);
return this->Reply(ydbStatus, issueMessage, ctx);
}

ui64 schemeShardTabletId = msg->Record.GetSchemeShardTabletId();
auto request = std::make_unique<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>(msg->Record.GetTxId());
SetSchemeShardId(schemeShardTabletId);
Expand Down
8 changes: 0 additions & 8 deletions ydb/core/kafka_proxy/actors/control_plane_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,14 +351,6 @@ class TKafkaTopicModificationRequest : public NKikimr::NGRpcService::IRequestOpC
ProcessYdbStatusCode(status);
};

void SendResult(
Ydb::StatusIds::StatusCode status,
const google::protobuf::RepeatedPtrField<NKikimr::NGRpcService::TYdbIssueMessageType>& message) override {

Y_UNUSED(message);
ProcessYdbStatusCode(status);
};

const Ydb::Operations::OperationParams& operation_params() const {
return DummyParams;
}
Expand Down
8 changes: 0 additions & 8 deletions ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,6 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt
ProcessYdbStatusCode(status);
};

void SendResult(
Ydb::StatusIds::StatusCode status,
const google::protobuf::RepeatedPtrField<NKikimr::NGRpcService::TYdbIssueMessageType>& message) override {

Y_UNUSED(message);
ProcessYdbStatusCode(status);
};

const Ydb::Operations::OperationParams& operation_params() const {
return DummyParams;
}
Expand Down
10 changes: 8 additions & 2 deletions ydb/services/persqueue_v1/grpc_pq_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,10 @@ void TPQReadService::Handle(NGRpcService::TEvCommitOffsetRequest::TPtr& ev, cons
if (HaveClusters && (Clusters.empty() || LocalCluster.empty())) {
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, "new commit offset request failed - cluster is not known yet");

ev->Get()->SendResult(ConvertPersQueueInternalCodeToStatus(PersQueue::ErrorCode::INITIALIZING), FillInfoResponse("cluster initializing", PersQueue::ErrorCode::INITIALIZING)); //CANCELLED
NYql::TIssues issues;
IssuesFromMessage(FillInfoResponse("cluster initializing", PersQueue::ErrorCode::INITIALIZING), issues); //CANCELLED
ev->Get()->RaiseIssues(issues);
ev->Get()->ReplyWithYdbStatus(ConvertPersQueueInternalCodeToStatus(PersQueue::ErrorCode::INITIALIZING));
return;
} else {
ctx.Register(new TCommitOffsetActor(ev->Release().Release(), *TopicsHandler, SchemeCache, NewSchemeCache, Counters));
Expand All @@ -189,7 +192,10 @@ void TPQReadService::Handle(NGRpcService::TEvPQReadInfoRequest::TPtr& ev, const
if (HaveClusters && (Clusters.empty() || LocalCluster.empty())) {
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, "new read info request failed - cluster is not known yet");

ev->Get()->SendResult(ConvertPersQueueInternalCodeToStatus(PersQueue::ErrorCode::INITIALIZING), FillInfoResponse("cluster initializing", PersQueue::ErrorCode::INITIALIZING)); //CANCELLED
NYql::TIssues issues;
IssuesFromMessage(FillInfoResponse("cluster initializing", PersQueue::ErrorCode::INITIALIZING), issues); //CANCELLED
ev->Get()->RaiseIssues(issues);
ev->Get()->ReplyWithYdbStatus(ConvertPersQueueInternalCodeToStatus(PersQueue::ErrorCode::INITIALIZING));
return;
} else {
//ctx.Register(new TReadInfoActor(ev->Release().Release(), Clusters, LocalCluster, SchemeCache, NewSchemeCache, Counters));
Expand Down

0 comments on commit d6ef09c

Please sign in to comment.