Skip to content

Commit

Permalink
Fix stream finishing in case of grpc proxy emmited YDB error
Browse files Browse the repository at this point in the history
  • Loading branch information
dcherednik committed Apr 8, 2024
1 parent edcbe4f commit 3d87f93
Show file tree
Hide file tree
Showing 8 changed files with 17 additions and 23 deletions.
20 changes: 4 additions & 16 deletions ydb/core/grpc_services/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,6 @@ class IRequestProxyCtx
friend class TGRpcRequestProxySimple;
friend class TGRpcRequestProxyHandleMethods;
private:
virtual void ReplyUnavaliable() = 0;
virtual void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) = 0;
public:
virtual ~IRequestProxyCtx() = default;
Expand Down Expand Up @@ -574,7 +573,7 @@ class TRefreshTokenImpl
}

void ReplyUnauthenticated(const TString&) override;
void ReplyUnavaliable() override;
void ReplyUnavaliable();
void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override {
switch (status) {
case Ydb::StatusIds::UNAVAILABLE:
Expand Down Expand Up @@ -719,13 +718,6 @@ class TGRpcRequestBiStreamWrapper
, public TEventLocal<TGRpcRequestBiStreamWrapper<TRpcId, TReq, TResp, RlMode>, TRpcId>
{
private:
void ReplyUnavaliable() override {
Ctx_->Attach(TActorId());
TResponse resp;
FillYdbStatus(resp, IssueManager_.GetIssues(), Ydb::StatusIds::UNAVAILABLE);
Ctx_->WriteAndFinish(std::move(resp), grpc::Status::OK);
}

void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override {
Ctx_->Attach(TActorId());
TResponse resp;
Expand Down Expand Up @@ -1114,18 +1106,14 @@ class TGRpcRequestWrapperImpl
Ctx_->UseDatabase(database);
}

void ReplyUnavaliable() override {
TResponse* resp = CreateResponseMessage();
TCommonResponseFiller<TResp, TDerived::IsOp>::Fill(*resp, IssueManager.GetIssues(), CostInfo, Ydb::StatusIds::UNAVAILABLE);
FinishRequest();
Reply(resp, Ydb::StatusIds::UNAVAILABLE);
}

void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override {
TResponse* resp = CreateResponseMessage();
TCommonResponseFiller<TResponse, TDerived::IsOp>::Fill(*resp, IssueManager.GetIssues(), CostInfo, status);
FinishRequest();
Reply(resp, status);
if (Ctx_->IsStreamCall()) {
Ctx_->FinishStreamingOk();
}
}

TString GetPeerName() const override {
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/grpc_services/grpc_request_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class TGRpcRequestProxyImpl
const TString error = "Unable to resolve token";
const auto issue = MakeIssue(NKikimrIssues::TIssuesIds::YDB_AUTH_UNAVAILABLE, error);
requestBaseCtx->RaiseIssue(issue);
requestBaseCtx->ReplyUnavaliable();
requestBaseCtx->ReplyWithYdbStatus(Ydb::StatusIds::UNAVAILABLE);
requestBaseCtx->FinishSpan();
return;
}
Expand Down Expand Up @@ -213,7 +213,7 @@ class TGRpcRequestProxyImpl
LOG_ERROR(ctx, NKikimrServices::GRPC_SERVER, "Limit for deferred events per database %s reached", databaseName.c_str());
const auto issue = MakeIssue(NKikimrIssues::TIssuesIds::YDB_DB_NOT_READY, error);
requestBaseCtx->RaiseIssue(issue);
requestBaseCtx->ReplyUnavaliable();
requestBaseCtx->ReplyWithYdbStatus(Ydb::StatusIds::UNAVAILABLE);
requestBaseCtx->FinishSpan();
return;
}
Expand Down Expand Up @@ -295,7 +295,7 @@ class TGRpcRequestProxyImpl
virtual void PassAway() override {
for (auto& [_, queue] : DeferredEvents) {
for (TEventReqHolder& req : queue) {
req.Ctx->ReplyUnavaliable();
req.Ctx->ReplyWithYdbStatus(Ydb::StatusIds::UNAVAILABLE);
req.Ctx->FinishSpan();
}
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/grpc_request_proxy_simple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class TGRpcRequestProxySimple
const TString error = "Unable to resolve token";
const auto issue = MakeIssue(NKikimrIssues::TIssuesIds::YDB_AUTH_UNAVAILABLE, error);
requestBaseCtx->RaiseIssue(issue);
requestBaseCtx->ReplyUnavaliable();
requestBaseCtx->ReplyWithYdbStatus(Ydb::StatusIds::UNAVAILABLE);
return;
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/grpc_services/local_grpc/local_grpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class TContextBase : public NYdbGrpc::IRequestContextBase {
TString GetPeer() const override { return {}; }
bool SslServer() const override { return false; }
bool IsClientLost() const override { return false; }
bool IsStreamCall() const override { return false; }

public:
NYql::TIssues GetIssues() {
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,6 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {

const TString query = "SELECT * FROM AS_TABLE(ListReplicate(AsStruct(\"12345678\" AS Key), 100000))";

//TODO: it looks like this check triggers grpc request proxy request leak
/*
{
// Check range for chunk size settings
auto settings = TExecuteQuerySettings().OutputChunkMaxSize(48_MB);
Expand All @@ -271,7 +269,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto streamPart = it.ReadNext().GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(streamPart.GetStatus(), EStatus::BAD_REQUEST, streamPart.GetIssues().ToString());
}
*/

auto settings = TExecuteQuerySettings().OutputChunkMaxSize(10000);
auto it = db.StreamExecuteQuery(query, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
Expand Down
1 change: 1 addition & 0 deletions ydb/core/public_http/grpc_request_context_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class TGrpcRequestContextWrapper : public NYdbGrpc::IRequestContextBase {
virtual bool IsClientLost() const { return false; }
virtual TString GetPeer() const { return {}; }
virtual bool SslServer() const { return false; }
virtual bool IsStreamCall() const { return false; }
};

} // namespace NKikimr::NPublicHttp
Expand Down
4 changes: 4 additions & 0 deletions ydb/library/grpc/server/grpc_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ class TGRpcRequestImpl
return ClientLost_.load();
}

bool IsStreamCall() const override {
return bool(StreamAdaptor_);
}

TString GetPeer() const override {
// Decode URL-encoded square brackets
auto ip = TString(this->Context.peer());
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/grpc/server/grpc_request_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ class IRequestContextBase: public TThrRefBase {
//! reply in flight
virtual void SetNextReplyCallback(TOnNextReply&& cb) = 0;

virtual bool IsStreamCall() const = 0;

//! Finish streaming reply
virtual void FinishStreamingOk() = 0;

Expand Down

0 comments on commit 3d87f93

Please sign in to comment.