Skip to content

Commit

Permalink
Finsh grpc stream without extra empty message for query service. (#1415)
Browse files Browse the repository at this point in the history
  • Loading branch information
dcherednik authored Jan 30, 2024
1 parent 42c0a05 commit 02f3fa6
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 25 deletions.
8 changes: 4 additions & 4 deletions ydb/core/grpc_services/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,8 @@ class IRequestCtx
, public virtual IRequestCtxBase
{
friend class TProtoResponseHelper;

public:
using EStreamCtrl = NYdbGrpc::IRequestContextBase::EStreamCtrl;
virtual google::protobuf::Message* GetRequestMut() = 0;

virtual void SetRuHeader(ui64 ru) = 0;
Expand All @@ -418,7 +418,7 @@ class IRequestCtx
virtual void SetStreamingNotify(NYdbGrpc::IRequestContextBase::TOnNextReply&& cb) = 0;
virtual void FinishStream(ui32 status) = 0;

virtual void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) = 0;
virtual void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status, EStreamCtrl flag = EStreamCtrl::CONT) = 0;

virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0) = 0;

Expand Down Expand Up @@ -1193,7 +1193,7 @@ class TGRpcRequestWrapperImpl
return GetPeerMetaValues(NYdb::YDB_REQUEST_TYPE_HEADER);
}

void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) override {
void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status, IRequestCtx::EStreamCtrl flag = IRequestCtx::EStreamCtrl::CONT) override {
// res->data() pointer is used inside grpc code.
// So this object should be destroyed during grpc_slice destroying routine
auto res = new TString;
Expand All @@ -1208,7 +1208,7 @@ class TGRpcRequestWrapperImpl
(void*)(res->data()), res->size(), freeResult, res);
grpc::Slice sl = grpc::Slice(slice, grpc::Slice::STEAL_REF);
auto data = grpc::ByteBuffer(&sl, 1);
Ctx_->Reply(&data, status);
Ctx_->Reply(&data, status, flag);
}

void SetCostInfo(float consumed_units) override {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/local_rpc/local_rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ class TLocalRpcCtx : public NGRpcService::IRequestOpCtx {
Y_ABORT("Unimplemented for local rpc");
}

virtual void SendSerializedResult(TString&&, Ydb::StatusIds::StatusCode) override {
virtual void SendSerializedResult(TString&&, Ydb::StatusIds::StatusCode, EStreamCtrl) override {
Y_ABORT("Unimplemented for local rpc");
}

Expand Down
24 changes: 15 additions & 9 deletions ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,10 +378,10 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) {
auto& record = ev->Get()->Record.GetRef();

NYql::TIssues issues;
const auto& issueMessage = record.GetResponse().GetQueryIssues();
NYql::IssuesFromMessage(issueMessage, issues);

bool hasTrailingMessage = false;

if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
Request_->SetRuHeader(record.GetConsumedRu());

Expand All @@ -391,8 +391,6 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {

AuditContextAppend(Request_.get(), *Request_->GetProtoRequest(), response);

bool hasTrailingMessage = false;

if (kqpResponse.HasTxMeta()) {
hasTrailingMessage = true;
response.mutable_tx_meta()->set_id(kqpResponse.GetTxMeta().id());
Expand All @@ -408,13 +406,20 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {

if (hasTrailingMessage) {
response.set_status(Ydb::StatusIds::SUCCESS);
response.mutable_issues()->CopyFrom(issueMessage);
TString out;
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
Request_->SendSerializedResult(std::move(out), record.GetYdbStatus());
const auto finishStreamFlag = NYdbGrpc::IRequestContextBase::EStreamCtrl::FINISH;
Request_->SendSerializedResult(std::move(out), record.GetYdbStatus(), finishStreamFlag);
this->PassAway();
}
}

ReplyFinishStream(record.GetYdbStatus(), issues);
if (!hasTrailingMessage) {
NYql::TIssues issues;
NYql::IssuesFromMessage(issueMessage, issues);
ReplyFinishStream(record.GetYdbStatus(), issueMessage);
}
}

private:
Expand Down Expand Up @@ -459,10 +464,11 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
response.set_status(status);
response.mutable_issues()->CopyFrom(message);
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
Request_->SendSerializedResult(std::move(out), status);
const auto finishStreamFlag = NYdbGrpc::IRequestContextBase::EStreamCtrl::FINISH;
Request_->SendSerializedResult(std::move(out), status, finishStreamFlag);
} else {
Request_->FinishStream(status);
}

Request_->FinishStream(status);
this->PassAway();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt
Y_UNUSED(status);
};

void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) override {
void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status, EStreamCtrl) override {
Y_UNUSED(in);
Y_UNUSED(status);
};
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,10 @@ class TKafkaCreateTopicRequest : public NKikimr::NGRpcService::IRequestOpCtx {
Y_UNUSED(status);
};

void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) override {
void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status, EStreamCtrl flag) override {
Y_UNUSED(in);
Y_UNUSED(status);
Y_UNUSED(flag);
};

void Reply(NProtoBuf::Message* resp, ui32 status = 0) override {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/public_http/grpc_request_context_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ namespace NKikimr::NPublicHttp {
ReplySender(RequestContext, JsonSettings, resp, status);
}

void TGrpcRequestContextWrapper::Reply(grpc::ByteBuffer* resp, ui32 status) {
void TGrpcRequestContextWrapper::Reply(grpc::ByteBuffer* resp, ui32 status, EStreamCtrl ctrl) {
Y_UNUSED(resp);
Y_UNUSED(status);
Y_UNUSED(ctrl);
Y_ABORT_UNLESS(false, "TGrpcRequestContextWrapper::Reply");
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/public_http/grpc_request_context_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class TGrpcRequestContextWrapper : public NYdbGrpc::IRequestContextBase {
virtual NProtoBuf::Message* GetRequestMut();
virtual NYdbGrpc::TAuthState& GetAuthState();
virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0);
virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0);
virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0, EStreamCtrl ctrl = EStreamCtrl::CONT);
virtual void ReplyUnauthenticated(const TString& in);
virtual void ReplyError(grpc::StatusCode code, const TString& msg, const TString& details);
virtual TInstant Deadline() const;
Expand Down
21 changes: 15 additions & 6 deletions ydb/library/grpc/server/grpc_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ class TGRpcRequestImpl
WriteDataOk(resp, status);
}

void Reply(grpc::ByteBuffer* resp, ui32 status) override {
WriteByteDataOk(resp, status);
void Reply(grpc::ByteBuffer* resp, ui32 status, EStreamCtrl ctrl) override {
WriteByteDataOk(resp, status, ctrl);
}

void ReplyError(grpc::StatusCode code, const TString& msg, const TString& details) override {
Expand Down Expand Up @@ -314,7 +314,7 @@ class TGRpcRequestImpl
}
}

void WriteByteDataOk(grpc::ByteBuffer* resp, ui32 status) {
void WriteByteDataOk(grpc::ByteBuffer* resp, ui32 status, EStreamCtrl ctrl) {
auto sz = resp->Length();
if (Writer_) {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s", this, Name_,
Expand All @@ -332,14 +332,23 @@ class TGRpcRequestImpl
// because of std::function cannot hold move-only captured object
// we allocate shared object on heap to avoid buffer copy
auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);
auto cb = [this, uResp = std::move(uResp), sz, status]() {
const bool finish = ctrl == EStreamCtrl::FINISH;
auto cb = [this, uResp = std::move(uResp), sz, status, finish]() {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (pushed to grpc)",
this, Name_, this->Context.peer().c_str());
StateFunc_ = &TThis::NextReply;

StateFunc_ = finish ? &TThis::SetFinishDone : &TThis::NextReply;

ResponseSize += sz;
ResponseStatus = status;
OnBeforeCall();
StreamWriter_->Write(*uResp, GetGRpcTag());
if (finish) {
Finished_ = true;
const auto option = grpc::WriteOptions().set_last_message();
StreamWriter_->WriteAndFinish(*uResp, option, grpc::Status::OK, GetGRpcTag());
} else {
StreamWriter_->Write(*uResp, GetGRpcTag());
}
};
StreamAdaptor_->Enqueue(std::move(cb), false);
}
Expand Down
9 changes: 8 additions & 1 deletion ydb/library/grpc/server/grpc_request_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ class IRequestContextBase: public TThrRefBase {
ERROR,
CANCEL
};
enum class EStreamCtrl {
CONT = 0, // Continue stream
FINISH = 1, // Finish stream just after this reply
};

using TAsyncFinishResult = NThreading::TFuture<EFinishStatus>;

using TOnNextReply = std::function<void (size_t left)>;
Expand All @@ -65,7 +70,9 @@ class IRequestContextBase: public TThrRefBase {

//! Send serialised response (The request shoult be created for bytes response type)
//! Implementation can swap ByteBuffer
virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0) = 0;

//! ctrl - controll stream behaviour. Ignored in case of unary call
virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0, EStreamCtrl ctrl = EStreamCtrl::CONT) = 0;

//! Send grpc UNAUTHENTICATED status
virtual void ReplyUnauthenticated(const TString& in) = 0;
Expand Down

0 comments on commit 02f3fa6

Please sign in to comment.