From 3d87f93048e200c9dca5cea44653a8c0c3e7d0db Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Fri, 5 Apr 2024 16:58:01 +0000 Subject: [PATCH] Fix stream finishing in case of grpc proxy emmited YDB error --- ydb/core/grpc_services/base/base.h | 20 ++++--------------- ydb/core/grpc_services/grpc_request_proxy.cpp | 6 +++--- .../grpc_request_proxy_simple.cpp | 2 +- .../grpc_services/local_grpc/local_grpc.h | 1 + ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp | 4 +--- .../grpc_request_context_wrapper.h | 1 + ydb/library/grpc/server/grpc_request.h | 4 ++++ ydb/library/grpc/server/grpc_request_base.h | 2 ++ 8 files changed, 17 insertions(+), 23 deletions(-) diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index 66155cce6bdf..510a55061b1e 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -359,7 +359,6 @@ class IRequestProxyCtx friend class TGRpcRequestProxySimple; friend class TGRpcRequestProxyHandleMethods; private: - virtual void ReplyUnavaliable() = 0; virtual void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) = 0; public: virtual ~IRequestProxyCtx() = default; @@ -574,7 +573,7 @@ class TRefreshTokenImpl } void ReplyUnauthenticated(const TString&) override; - void ReplyUnavaliable() override; + void ReplyUnavaliable(); void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override { switch (status) { case Ydb::StatusIds::UNAVAILABLE: @@ -719,13 +718,6 @@ class TGRpcRequestBiStreamWrapper , public TEventLocal, TRpcId> { private: - void ReplyUnavaliable() override { - Ctx_->Attach(TActorId()); - TResponse resp; - FillYdbStatus(resp, IssueManager_.GetIssues(), Ydb::StatusIds::UNAVAILABLE); - Ctx_->WriteAndFinish(std::move(resp), grpc::Status::OK); - } - void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override { Ctx_->Attach(TActorId()); TResponse resp; @@ -1114,18 +1106,14 @@ class TGRpcRequestWrapperImpl Ctx_->UseDatabase(database); } - void ReplyUnavaliable() override { - TResponse* resp = CreateResponseMessage(); - TCommonResponseFiller::Fill(*resp, IssueManager.GetIssues(), CostInfo, Ydb::StatusIds::UNAVAILABLE); - FinishRequest(); - Reply(resp, Ydb::StatusIds::UNAVAILABLE); - } - void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override { TResponse* resp = CreateResponseMessage(); TCommonResponseFiller::Fill(*resp, IssueManager.GetIssues(), CostInfo, status); FinishRequest(); Reply(resp, status); + if (Ctx_->IsStreamCall()) { + Ctx_->FinishStreamingOk(); + } } TString GetPeerName() const override { diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp index 6f5675c05950..18c99a16d953 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.cpp +++ b/ydb/core/grpc_services/grpc_request_proxy.cpp @@ -169,7 +169,7 @@ class TGRpcRequestProxyImpl const TString error = "Unable to resolve token"; const auto issue = MakeIssue(NKikimrIssues::TIssuesIds::YDB_AUTH_UNAVAILABLE, error); requestBaseCtx->RaiseIssue(issue); - requestBaseCtx->ReplyUnavaliable(); + requestBaseCtx->ReplyWithYdbStatus(Ydb::StatusIds::UNAVAILABLE); requestBaseCtx->FinishSpan(); return; } @@ -213,7 +213,7 @@ class TGRpcRequestProxyImpl LOG_ERROR(ctx, NKikimrServices::GRPC_SERVER, "Limit for deferred events per database %s reached", databaseName.c_str()); const auto issue = MakeIssue(NKikimrIssues::TIssuesIds::YDB_DB_NOT_READY, error); requestBaseCtx->RaiseIssue(issue); - requestBaseCtx->ReplyUnavaliable(); + requestBaseCtx->ReplyWithYdbStatus(Ydb::StatusIds::UNAVAILABLE); requestBaseCtx->FinishSpan(); return; } @@ -295,7 +295,7 @@ class TGRpcRequestProxyImpl virtual void PassAway() override { for (auto& [_, queue] : DeferredEvents) { for (TEventReqHolder& req : queue) { - req.Ctx->ReplyUnavaliable(); + req.Ctx->ReplyWithYdbStatus(Ydb::StatusIds::UNAVAILABLE); req.Ctx->FinishSpan(); } } diff --git a/ydb/core/grpc_services/grpc_request_proxy_simple.cpp b/ydb/core/grpc_services/grpc_request_proxy_simple.cpp index b78fb7790861..3ad80fbbf542 100644 --- a/ydb/core/grpc_services/grpc_request_proxy_simple.cpp +++ b/ydb/core/grpc_services/grpc_request_proxy_simple.cpp @@ -121,7 +121,7 @@ class TGRpcRequestProxySimple const TString error = "Unable to resolve token"; const auto issue = MakeIssue(NKikimrIssues::TIssuesIds::YDB_AUTH_UNAVAILABLE, error); requestBaseCtx->RaiseIssue(issue); - requestBaseCtx->ReplyUnavaliable(); + requestBaseCtx->ReplyWithYdbStatus(Ydb::StatusIds::UNAVAILABLE); return; } diff --git a/ydb/core/grpc_services/local_grpc/local_grpc.h b/ydb/core/grpc_services/local_grpc/local_grpc.h index 074c6fdbfc79..0e61dbf871db 100644 --- a/ydb/core/grpc_services/local_grpc/local_grpc.h +++ b/ydb/core/grpc_services/local_grpc/local_grpc.h @@ -76,6 +76,7 @@ class TContextBase : public NYdbGrpc::IRequestContextBase { TString GetPeer() const override { return {}; } bool SslServer() const override { return false; } bool IsClientLost() const override { return false; } + bool IsStreamCall() const override { return false; } public: NYql::TIssues GetIssues() { diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 8912fb1e8c17..fd3d8d31dfc8 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -261,8 +261,6 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { const TString query = "SELECT * FROM AS_TABLE(ListReplicate(AsStruct(\"12345678\" AS Key), 100000))"; -//TODO: it looks like this check triggers grpc request proxy request leak -/* { // Check range for chunk size settings auto settings = TExecuteQuerySettings().OutputChunkMaxSize(48_MB); @@ -271,7 +269,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto streamPart = it.ReadNext().GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(streamPart.GetStatus(), EStatus::BAD_REQUEST, streamPart.GetIssues().ToString()); } -*/ + auto settings = TExecuteQuerySettings().OutputChunkMaxSize(10000); auto it = db.StreamExecuteQuery(query, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); diff --git a/ydb/core/public_http/grpc_request_context_wrapper.h b/ydb/core/public_http/grpc_request_context_wrapper.h index e1bcf4d5f0f4..e264582c3e98 100644 --- a/ydb/core/public_http/grpc_request_context_wrapper.h +++ b/ydb/core/public_http/grpc_request_context_wrapper.h @@ -48,6 +48,7 @@ class TGrpcRequestContextWrapper : public NYdbGrpc::IRequestContextBase { virtual bool IsClientLost() const { return false; } virtual TString GetPeer() const { return {}; } virtual bool SslServer() const { return false; } + virtual bool IsStreamCall() const { return false; } }; } // namespace NKikimr::NPublicHttp diff --git a/ydb/library/grpc/server/grpc_request.h b/ydb/library/grpc/server/grpc_request.h index c1f249f36db0..8a4774fcc2f6 100644 --- a/ydb/library/grpc/server/grpc_request.h +++ b/ydb/library/grpc/server/grpc_request.h @@ -118,6 +118,10 @@ class TGRpcRequestImpl return ClientLost_.load(); } + bool IsStreamCall() const override { + return bool(StreamAdaptor_); + } + TString GetPeer() const override { // Decode URL-encoded square brackets auto ip = TString(this->Context.peer()); diff --git a/ydb/library/grpc/server/grpc_request_base.h b/ydb/library/grpc/server/grpc_request_base.h index e03fb57c356b..2ad89993c850 100644 --- a/ydb/library/grpc/server/grpc_request_base.h +++ b/ydb/library/grpc/server/grpc_request_base.h @@ -112,6 +112,8 @@ class IRequestContextBase: public TThrRefBase { //! reply in flight virtual void SetNextReplyCallback(TOnNextReply&& cb) = 0; + virtual bool IsStreamCall() const = 0; + //! Finish streaming reply virtual void FinishStreamingOk() = 0;