Skip to content

Commit

Permalink
Forbid query execution on explicit session without attach (ydb-platfo…
Browse files Browse the repository at this point in the history
  • Loading branch information
dcherednik authored Feb 13, 2024
1 parent ec18660 commit 57cf0e9
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
13 changes: 12 additions & 1 deletion ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
const auto queryAction = ev->Get()->GetAction();
TKqpRequestInfo requestInfo(traceId);
ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, traceId, TKqpEvents::EvQueryRequest);
bool explicitSession = true;
if (ev->Get()->GetSessionId().empty()) {
TProcessResult<TKqpSessionInfo*> result;
if (!CreateNewSessionWorker(requestInfo, TString(DefaultKikimrPublicClusterName), false,
Expand All @@ -634,7 +635,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
ReplyProcessError(result.YdbStatus, result.Error, requestId);
return;
}

explicitSession = false;
ev->Get()->SetSessionId(result.Value->SessionId);
}

Expand All @@ -650,6 +651,16 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
dbCounters = Counters->GetDbCounters(database);
}

if (queryType == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY ||
queryType == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY) {

if (explicitSession && sessionInfo && !sessionInfo->AttachedRpcId) {
TString error = "Attempt to execute query on explicit session without attach";
ReplyProcessError(Ydb::StatusIds::BAD_REQUEST, error, requestId);
return;
}
}

PendingRequests.SetSessionId(requestId, sessionId, dbCounters);
Counters->ReportQueryRequest(dbCounters, ev->Get()->GetRequestSize(), ev->Get()->GetParametersSize(), ev->Get()->GetQuerySize());
Counters->ReportQueryAction(dbCounters, queryAction);
Expand Down
39 changes: 39 additions & 0 deletions ydb/services/ydb/ydb_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,45 @@ Y_UNIT_TEST_SUITE(YdbQueryService) {
UNIT_ASSERT(allDoneOk);
}

Y_UNIT_TEST(TestForbidExecuteWithoutAttach) {
TKikimrWithGrpcAndRootSchema server;

ui16 grpc = server.GetPort();
TString location = TStringBuilder() << "localhost:" << grpc;

auto clientConfig = NGRpcProxy::TGRpcClientConfig(location);

TString sessionId = CreateQuerySession(clientConfig);

UNIT_ASSERT(sessionId);

NYdbGrpc::TGRpcClientLow clientLow;

std::shared_ptr<grpc::Channel> channel;
channel = grpc::CreateChannel("localhost:" + ToString(grpc), grpc::InsecureChannelCredentials());

{
std::unique_ptr<Ydb::Query::V1::QueryService::Stub> stub;
stub = Ydb::Query::V1::QueryService::NewStub(channel);
grpc::ClientContext context;
Ydb::Query::ExecuteQueryRequest request;
request.set_session_id(sessionId);
request.set_exec_mode(Ydb::Query::EXEC_MODE_EXECUTE);
request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write();
request.mutable_tx_control()->set_commit_tx(true);
request.mutable_query_content()->set_text("SELECT 42");
Ydb::Query::ExecuteQueryResponsePart response;
auto reader = stub->ExecuteQuery(&context, request);
bool res = true;
while (res) {
res = reader->Read(&response);
if (res) {
UNIT_ASSERT_VALUES_EQUAL(response.status(), Ydb::StatusIds::BAD_REQUEST);
}
}
}
}

Y_UNIT_TEST(TestCreateDropAttachSession) {
TKikimrWithGrpcAndRootSchema server;

Expand Down

0 comments on commit 57cf0e9

Please sign in to comment.