Skip to content

Commit

Permalink
improve handling of query cancelling (ydb-platform#14169)
Browse files Browse the repository at this point in the history
  • Loading branch information
adameat authored Feb 4, 2025
1 parent 0e5fbe3 commit 647a02f
Showing 1 changed file with 22 additions and 6 deletions.
28 changes: 22 additions & 6 deletions ydb/core/viewer/viewer_query.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,30 @@ class TJsonQuery : public TViewerPipeClient {
Become(&TThis::StateWork, TDuration::MilliSeconds(Timeout), new TEvents::TEvWakeup());
}

void Cancelled() {
void CancelQuery() {
if (SessionId) {
auto event = std::make_unique<NKqp::TEvKqp::TEvCancelQueryRequest>();
event->Record.MutableRequest()->SetSessionId(SessionId);
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.release());
if (QueryResponse && !QueryResponse.IsDone()) {
QueryResponse.Error("QueryCancelled");
}
}
}

void CloseSession() {
if (SessionId) {
if (QueryResponse && !QueryResponse.IsDone()) {
CancelQuery();
}
auto event = std::make_unique<NKqp::TEvKqp::TEvCloseSessionRequest>();
event->Record.MutableRequest()->SetSessionId(SessionId);
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.release());
}
}

void Cancelled() {
CancelQuery();
PassAway();
}

Expand All @@ -160,11 +178,7 @@ class TJsonQuery : public TViewerPipeClient {
if (QueryId) {
Viewer->EndRunningQuery(QueryId, SelfId());
}
if (SessionId) {
auto event = std::make_unique<NKqp::TEvKqp::TEvCloseSessionRequest>();
event->Record.MutableRequest()->SetSessionId(SessionId);
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.release());
}
CloseSession();
TBase::PassAway();
}

Expand Down Expand Up @@ -353,6 +367,7 @@ class TJsonQuery : public TViewerPipeClient {
}
ActorIdToProto(SelfId(), event->Record.MutableRequestActorId());
QueryResponse = MakeRequest<NKqp::TEvKqp::TEvQueryResponse>(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release());

}

private:
Expand Down Expand Up @@ -565,6 +580,7 @@ class TJsonQuery : public TViewerPipeClient {
NYql::IssuesFromMessage(record.GetIssues(), issues);
MakeErrorReply(jsonResponse, NYdb::TStatus(NYdb::EStatus(record.GetStatusCode()), NYdb::NAdapters::ToSdkIssues(std::move(issues))));
}
CancelQuery();
ReplyWithJsonAndPassAway(jsonResponse);
}

Expand Down

0 comments on commit 647a02f

Please sign in to comment.