From 02f3fa6d5c758cd5a2addd97a9c51276676c55a6 Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Tue, 30 Jan 2024 16:31:47 +0100 Subject: [PATCH] Finsh grpc stream without extra empty message for query service. (#1415) https://github.com/ydb-platform/ydb/issues/1314 --- ydb/core/grpc_services/base/base.h | 8 +++---- ydb/core/grpc_services/local_rpc/local_rpc.h | 2 +- .../grpc_services/query/rpc_execute_query.cpp | 24 ++++++++++++------- .../actors/kafka_create_partitions_actor.cpp | 2 +- .../actors/kafka_create_topics_actor.cpp | 3 ++- .../grpc_request_context_wrapper.cpp | 3 ++- .../grpc_request_context_wrapper.h | 2 +- ydb/library/grpc/server/grpc_request.h | 21 +++++++++++----- ydb/library/grpc/server/grpc_request_base.h | 9 ++++++- 9 files changed, 49 insertions(+), 25 deletions(-) diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index 344dbac4f85f..ccf75a8c5091 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -407,8 +407,8 @@ class IRequestCtx , public virtual IRequestCtxBase { friend class TProtoResponseHelper; - public: + using EStreamCtrl = NYdbGrpc::IRequestContextBase::EStreamCtrl; virtual google::protobuf::Message* GetRequestMut() = 0; virtual void SetRuHeader(ui64 ru) = 0; @@ -418,7 +418,7 @@ class IRequestCtx virtual void SetStreamingNotify(NYdbGrpc::IRequestContextBase::TOnNextReply&& cb) = 0; virtual void FinishStream(ui32 status) = 0; - virtual void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) = 0; + virtual void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status, EStreamCtrl flag = EStreamCtrl::CONT) = 0; virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0) = 0; @@ -1193,7 +1193,7 @@ class TGRpcRequestWrapperImpl return GetPeerMetaValues(NYdb::YDB_REQUEST_TYPE_HEADER); } - void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) override { + void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status, IRequestCtx::EStreamCtrl flag = IRequestCtx::EStreamCtrl::CONT) override { // res->data() pointer is used inside grpc code. // So this object should be destroyed during grpc_slice destroying routine auto res = new TString; @@ -1208,7 +1208,7 @@ class TGRpcRequestWrapperImpl (void*)(res->data()), res->size(), freeResult, res); grpc::Slice sl = grpc::Slice(slice, grpc::Slice::STEAL_REF); auto data = grpc::ByteBuffer(&sl, 1); - Ctx_->Reply(&data, status); + Ctx_->Reply(&data, status, flag); } void SetCostInfo(float consumed_units) override { diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.h b/ydb/core/grpc_services/local_rpc/local_rpc.h index 85a98ddd1593..362ae2be87b3 100644 --- a/ydb/core/grpc_services/local_rpc/local_rpc.h +++ b/ydb/core/grpc_services/local_rpc/local_rpc.h @@ -183,7 +183,7 @@ class TLocalRpcCtx : public NGRpcService::IRequestOpCtx { Y_ABORT("Unimplemented for local rpc"); } - virtual void SendSerializedResult(TString&&, Ydb::StatusIds::StatusCode) override { + virtual void SendSerializedResult(TString&&, Ydb::StatusIds::StatusCode, EStreamCtrl) override { Y_ABORT("Unimplemented for local rpc"); } diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index 88c3b86e5685..738c5ea52a41 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -378,10 +378,10 @@ class TExecuteQueryRPC : public TActorBootstrapped { void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) { auto& record = ev->Get()->Record.GetRef(); - NYql::TIssues issues; const auto& issueMessage = record.GetResponse().GetQueryIssues(); - NYql::IssuesFromMessage(issueMessage, issues); + bool hasTrailingMessage = false; + if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { Request_->SetRuHeader(record.GetConsumedRu()); @@ -391,8 +391,6 @@ class TExecuteQueryRPC : public TActorBootstrapped { AuditContextAppend(Request_.get(), *Request_->GetProtoRequest(), response); - bool hasTrailingMessage = false; - if (kqpResponse.HasTxMeta()) { hasTrailingMessage = true; response.mutable_tx_meta()->set_id(kqpResponse.GetTxMeta().id()); @@ -408,13 +406,20 @@ class TExecuteQueryRPC : public TActorBootstrapped { if (hasTrailingMessage) { response.set_status(Ydb::StatusIds::SUCCESS); + response.mutable_issues()->CopyFrom(issueMessage); TString out; Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out); - Request_->SendSerializedResult(std::move(out), record.GetYdbStatus()); + const auto finishStreamFlag = NYdbGrpc::IRequestContextBase::EStreamCtrl::FINISH; + Request_->SendSerializedResult(std::move(out), record.GetYdbStatus(), finishStreamFlag); + this->PassAway(); } } - ReplyFinishStream(record.GetYdbStatus(), issues); + if (!hasTrailingMessage) { + NYql::TIssues issues; + NYql::IssuesFromMessage(issueMessage, issues); + ReplyFinishStream(record.GetYdbStatus(), issueMessage); + } } private: @@ -459,10 +464,11 @@ class TExecuteQueryRPC : public TActorBootstrapped { response.set_status(status); response.mutable_issues()->CopyFrom(message); Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out); - Request_->SendSerializedResult(std::move(out), status); + const auto finishStreamFlag = NYdbGrpc::IRequestContextBase::EStreamCtrl::FINISH; + Request_->SendSerializedResult(std::move(out), status, finishStreamFlag); + } else { + Request_->FinishStream(status); } - - Request_->FinishStream(status); this->PassAway(); } diff --git a/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp index 5bdd06dce583..f864f7d06c05 100644 --- a/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp @@ -153,7 +153,7 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt Y_UNUSED(status); }; - void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) override { + void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status, EStreamCtrl) override { Y_UNUSED(in); Y_UNUSED(status); }; diff --git a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp index f70d6ec20d25..09df52046547 100644 --- a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp @@ -151,9 +151,10 @@ class TKafkaCreateTopicRequest : public NKikimr::NGRpcService::IRequestOpCtx { Y_UNUSED(status); }; - void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) override { + void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status, EStreamCtrl flag) override { Y_UNUSED(in); Y_UNUSED(status); + Y_UNUSED(flag); }; void Reply(NProtoBuf::Message* resp, ui32 status = 0) override { diff --git a/ydb/core/public_http/grpc_request_context_wrapper.cpp b/ydb/core/public_http/grpc_request_context_wrapper.cpp index f8ee6543eed4..cd9d157c2e54 100644 --- a/ydb/core/public_http/grpc_request_context_wrapper.cpp +++ b/ydb/core/public_http/grpc_request_context_wrapper.cpp @@ -34,9 +34,10 @@ namespace NKikimr::NPublicHttp { ReplySender(RequestContext, JsonSettings, resp, status); } - void TGrpcRequestContextWrapper::Reply(grpc::ByteBuffer* resp, ui32 status) { + void TGrpcRequestContextWrapper::Reply(grpc::ByteBuffer* resp, ui32 status, EStreamCtrl ctrl) { Y_UNUSED(resp); Y_UNUSED(status); + Y_UNUSED(ctrl); Y_ABORT_UNLESS(false, "TGrpcRequestContextWrapper::Reply"); } diff --git a/ydb/core/public_http/grpc_request_context_wrapper.h b/ydb/core/public_http/grpc_request_context_wrapper.h index e287f63d58b4..e1bcf4d5f0f4 100644 --- a/ydb/core/public_http/grpc_request_context_wrapper.h +++ b/ydb/core/public_http/grpc_request_context_wrapper.h @@ -27,7 +27,7 @@ class TGrpcRequestContextWrapper : public NYdbGrpc::IRequestContextBase { virtual NProtoBuf::Message* GetRequestMut(); virtual NYdbGrpc::TAuthState& GetAuthState(); virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0); - virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0); + virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0, EStreamCtrl ctrl = EStreamCtrl::CONT); virtual void ReplyUnauthenticated(const TString& in); virtual void ReplyError(grpc::StatusCode code, const TString& msg, const TString& details); virtual TInstant Deadline() const; diff --git a/ydb/library/grpc/server/grpc_request.h b/ydb/library/grpc/server/grpc_request.h index cced4a51e07a..c1f249f36db0 100644 --- a/ydb/library/grpc/server/grpc_request.h +++ b/ydb/library/grpc/server/grpc_request.h @@ -204,8 +204,8 @@ class TGRpcRequestImpl WriteDataOk(resp, status); } - void Reply(grpc::ByteBuffer* resp, ui32 status) override { - WriteByteDataOk(resp, status); + void Reply(grpc::ByteBuffer* resp, ui32 status, EStreamCtrl ctrl) override { + WriteByteDataOk(resp, status, ctrl); } void ReplyError(grpc::StatusCode code, const TString& msg, const TString& details) override { @@ -314,7 +314,7 @@ class TGRpcRequestImpl } } - void WriteByteDataOk(grpc::ByteBuffer* resp, ui32 status) { + void WriteByteDataOk(grpc::ByteBuffer* resp, ui32 status, EStreamCtrl ctrl) { auto sz = resp->Length(); if (Writer_) { GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s", this, Name_, @@ -332,14 +332,23 @@ class TGRpcRequestImpl // because of std::function cannot hold move-only captured object // we allocate shared object on heap to avoid buffer copy auto uResp = MakeIntrusive>(resp); - auto cb = [this, uResp = std::move(uResp), sz, status]() { + const bool finish = ctrl == EStreamCtrl::FINISH; + auto cb = [this, uResp = std::move(uResp), sz, status, finish]() { GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (pushed to grpc)", this, Name_, this->Context.peer().c_str()); - StateFunc_ = &TThis::NextReply; + + StateFunc_ = finish ? &TThis::SetFinishDone : &TThis::NextReply; + ResponseSize += sz; ResponseStatus = status; OnBeforeCall(); - StreamWriter_->Write(*uResp, GetGRpcTag()); + if (finish) { + Finished_ = true; + const auto option = grpc::WriteOptions().set_last_message(); + StreamWriter_->WriteAndFinish(*uResp, option, grpc::Status::OK, GetGRpcTag()); + } else { + StreamWriter_->Write(*uResp, GetGRpcTag()); + } }; StreamAdaptor_->Enqueue(std::move(cb), false); } diff --git a/ydb/library/grpc/server/grpc_request_base.h b/ydb/library/grpc/server/grpc_request_base.h index 8e565fa43b51..e03fb57c356b 100644 --- a/ydb/library/grpc/server/grpc_request_base.h +++ b/ydb/library/grpc/server/grpc_request_base.h @@ -46,6 +46,11 @@ class IRequestContextBase: public TThrRefBase { ERROR, CANCEL }; + enum class EStreamCtrl { + CONT = 0, // Continue stream + FINISH = 1, // Finish stream just after this reply + }; + using TAsyncFinishResult = NThreading::TFuture; using TOnNextReply = std::function; @@ -65,7 +70,9 @@ class IRequestContextBase: public TThrRefBase { //! Send serialised response (The request shoult be created for bytes response type) //! Implementation can swap ByteBuffer - virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0) = 0; + + //! ctrl - controll stream behaviour. Ignored in case of unary call + virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0, EStreamCtrl ctrl = EStreamCtrl::CONT) = 0; //! Send grpc UNAUTHENTICATED status virtual void ReplyUnauthenticated(const TString& in) = 0;