Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finsh grpc stream without extra empty message for query service. #1415

Merged
merged 1 commit into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ydb/core/grpc_services/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,8 @@ class IRequestCtx
, public virtual IRequestCtxBase
{
friend class TProtoResponseHelper;

public:
using EStreamCtrl = NYdbGrpc::IRequestContextBase::EStreamCtrl;
virtual google::protobuf::Message* GetRequestMut() = 0;

virtual void SetRuHeader(ui64 ru) = 0;
Expand All @@ -418,7 +418,7 @@ class IRequestCtx
virtual void SetStreamingNotify(NYdbGrpc::IRequestContextBase::TOnNextReply&& cb) = 0;
virtual void FinishStream(ui32 status) = 0;

virtual void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) = 0;
virtual void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status, EStreamCtrl flag = EStreamCtrl::CONT) = 0;

virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0) = 0;

Expand Down Expand Up @@ -1193,7 +1193,7 @@ class TGRpcRequestWrapperImpl
return GetPeerMetaValues(NYdb::YDB_REQUEST_TYPE_HEADER);
}

void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) override {
void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status, IRequestCtx::EStreamCtrl flag = IRequestCtx::EStreamCtrl::CONT) override {
// res->data() pointer is used inside grpc code.
// So this object should be destroyed during grpc_slice destroying routine
auto res = new TString;
Expand All @@ -1208,7 +1208,7 @@ class TGRpcRequestWrapperImpl
(void*)(res->data()), res->size(), freeResult, res);
grpc::Slice sl = grpc::Slice(slice, grpc::Slice::STEAL_REF);
auto data = grpc::ByteBuffer(&sl, 1);
Ctx_->Reply(&data, status);
Ctx_->Reply(&data, status, flag);
}

void SetCostInfo(float consumed_units) override {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/local_rpc/local_rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ class TLocalRpcCtx : public NGRpcService::IRequestOpCtx {
Y_ABORT("Unimplemented for local rpc");
}

virtual void SendSerializedResult(TString&&, Ydb::StatusIds::StatusCode) override {
virtual void SendSerializedResult(TString&&, Ydb::StatusIds::StatusCode, EStreamCtrl) override {
Y_ABORT("Unimplemented for local rpc");
}

Expand Down
24 changes: 15 additions & 9 deletions ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,10 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) {
auto& record = ev->Get()->Record.GetRef();

NYql::TIssues issues;
const auto& issueMessage = record.GetResponse().GetQueryIssues();
NYql::IssuesFromMessage(issueMessage, issues);

bool hasTrailingMessage = false;

if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
Request_->SetRuHeader(record.GetConsumedRu());

Expand All @@ -372,8 +372,6 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {

AuditContextAppend(Request_.get(), *Request_->GetProtoRequest(), response);

bool hasTrailingMessage = false;

if (kqpResponse.HasTxMeta()) {
hasTrailingMessage = true;
response.mutable_tx_meta()->set_id(kqpResponse.GetTxMeta().id());
Expand All @@ -386,13 +384,20 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {

if (hasTrailingMessage) {
response.set_status(Ydb::StatusIds::SUCCESS);
response.mutable_issues()->CopyFrom(issueMessage);
TString out;
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
Request_->SendSerializedResult(std::move(out), record.GetYdbStatus());
const auto finishStreamFlag = NYdbGrpc::IRequestContextBase::EStreamCtrl::FINISH;
Request_->SendSerializedResult(std::move(out), record.GetYdbStatus(), finishStreamFlag);
this->PassAway();
}
}

ReplyFinishStream(record.GetYdbStatus(), issues);
if (!hasTrailingMessage) {
NYql::TIssues issues;
NYql::IssuesFromMessage(issueMessage, issues);
ReplyFinishStream(record.GetYdbStatus(), issueMessage);
}
}

private:
Expand Down Expand Up @@ -437,10 +442,11 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
response.set_status(status);
response.mutable_issues()->CopyFrom(message);
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
Request_->SendSerializedResult(std::move(out), status);
const auto finishStreamFlag = NYdbGrpc::IRequestContextBase::EStreamCtrl::FINISH;
Request_->SendSerializedResult(std::move(out), status, finishStreamFlag);
} else {
Request_->FinishStream(status);
}

Request_->FinishStream(status);
this->PassAway();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt
Y_UNUSED(status);
};

void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) override {
void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status, EStreamCtrl) override {
Y_UNUSED(in);
Y_UNUSED(status);
};
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,10 @@ class TKafkaCreateTopicRequest : public NKikimr::NGRpcService::IRequestOpCtx {
Y_UNUSED(status);
};

void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) override {
void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status, EStreamCtrl flag) override {
Y_UNUSED(in);
Y_UNUSED(status);
Y_UNUSED(flag);
};

void Reply(NProtoBuf::Message* resp, ui32 status = 0) override {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/public_http/grpc_request_context_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ namespace NKikimr::NPublicHttp {
ReplySender(RequestContext, JsonSettings, resp, status);
}

void TGrpcRequestContextWrapper::Reply(grpc::ByteBuffer* resp, ui32 status) {
void TGrpcRequestContextWrapper::Reply(grpc::ByteBuffer* resp, ui32 status, EStreamCtrl ctrl) {
Y_UNUSED(resp);
Y_UNUSED(status);
Y_UNUSED(ctrl);
Y_ABORT_UNLESS(false, "TGrpcRequestContextWrapper::Reply");
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/public_http/grpc_request_context_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class TGrpcRequestContextWrapper : public NYdbGrpc::IRequestContextBase {
virtual NProtoBuf::Message* GetRequestMut();
virtual NYdbGrpc::TAuthState& GetAuthState();
virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0);
virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0);
virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0, EStreamCtrl ctrl = EStreamCtrl::CONT);
virtual void ReplyUnauthenticated(const TString& in);
virtual void ReplyError(grpc::StatusCode code, const TString& msg, const TString& details);
virtual TInstant Deadline() const;
Expand Down
21 changes: 15 additions & 6 deletions ydb/library/grpc/server/grpc_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ class TGRpcRequestImpl
WriteDataOk(resp, status);
}

void Reply(grpc::ByteBuffer* resp, ui32 status) override {
WriteByteDataOk(resp, status);
void Reply(grpc::ByteBuffer* resp, ui32 status, EStreamCtrl ctrl) override {
WriteByteDataOk(resp, status, ctrl);
}

void ReplyError(grpc::StatusCode code, const TString& msg, const TString& details) override {
Expand Down Expand Up @@ -314,7 +314,7 @@ class TGRpcRequestImpl
}
}

void WriteByteDataOk(grpc::ByteBuffer* resp, ui32 status) {
void WriteByteDataOk(grpc::ByteBuffer* resp, ui32 status, EStreamCtrl ctrl) {
auto sz = resp->Length();
if (Writer_) {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s", this, Name_,
Expand All @@ -332,14 +332,23 @@ class TGRpcRequestImpl
// because of std::function cannot hold move-only captured object
// we allocate shared object on heap to avoid buffer copy
auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);
auto cb = [this, uResp = std::move(uResp), sz, status]() {
const bool finish = ctrl == EStreamCtrl::FINISH;
auto cb = [this, uResp = std::move(uResp), sz, status, finish]() {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (pushed to grpc)",
this, Name_, this->Context.peer().c_str());
StateFunc_ = &TThis::NextReply;

StateFunc_ = finish ? &TThis::SetFinishDone : &TThis::NextReply;

ResponseSize += sz;
ResponseStatus = status;
OnBeforeCall();
StreamWriter_->Write(*uResp, GetGRpcTag());
if (finish) {
Finished_ = true;
const auto option = grpc::WriteOptions().set_last_message();
StreamWriter_->WriteAndFinish(*uResp, option, grpc::Status::OK, GetGRpcTag());
} else {
StreamWriter_->Write(*uResp, GetGRpcTag());
}
};
StreamAdaptor_->Enqueue(std::move(cb), false);
}
Expand Down
9 changes: 8 additions & 1 deletion ydb/library/grpc/server/grpc_request_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ class IRequestContextBase: public TThrRefBase {
ERROR,
CANCEL
};
enum class EStreamCtrl {
CONT = 0, // Continue stream
FINISH = 1, // Finish stream just after this reply
};

using TAsyncFinishResult = NThreading::TFuture<EFinishStatus>;

using TOnNextReply = std::function<void (size_t left)>;
Expand All @@ -65,7 +70,9 @@ class IRequestContextBase: public TThrRefBase {

//! Send serialised response (The request shoult be created for bytes response type)
//! Implementation can swap ByteBuffer
virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0) = 0;

//! ctrl - controll stream behaviour. Ignored in case of unary call
virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0, EStreamCtrl ctrl = EStreamCtrl::CONT) = 0;

//! Send grpc UNAUTHENTICATED status
virtual void ReplyUnauthenticated(const TString& in) = 0;
Expand Down
Loading