diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index f19e2e74e82d..633d24ac02a3 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -626,6 +626,7 @@ class TKqpProxyService : public TActorBootstrapped { const auto queryAction = ev->Get()->GetAction(); TKqpRequestInfo requestInfo(traceId); ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, traceId, TKqpEvents::EvQueryRequest); + bool explicitSession = true; if (ev->Get()->GetSessionId().empty()) { TProcessResult result; if (!CreateNewSessionWorker(requestInfo, TString(DefaultKikimrPublicClusterName), false, @@ -634,7 +635,7 @@ class TKqpProxyService : public TActorBootstrapped { ReplyProcessError(result.YdbStatus, result.Error, requestId); return; } - + explicitSession = false; ev->Get()->SetSessionId(result.Value->SessionId); } @@ -650,6 +651,16 @@ class TKqpProxyService : public TActorBootstrapped { dbCounters = Counters->GetDbCounters(database); } + if (queryType == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY || + queryType == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY) { + + if (explicitSession && sessionInfo && !sessionInfo->AttachedRpcId) { + TString error = "Attempt to execute query on explicit session without attach"; + ReplyProcessError(Ydb::StatusIds::BAD_REQUEST, error, requestId); + return; + } + } + PendingRequests.SetSessionId(requestId, sessionId, dbCounters); Counters->ReportQueryRequest(dbCounters, ev->Get()->GetRequestSize(), ev->Get()->GetParametersSize(), ev->Get()->GetQuerySize()); Counters->ReportQueryAction(dbCounters, queryAction); diff --git a/ydb/services/ydb/ydb_query_ut.cpp b/ydb/services/ydb/ydb_query_ut.cpp index f26fe8c73fa9..9b67c4b9a541 100644 --- a/ydb/services/ydb/ydb_query_ut.cpp +++ b/ydb/services/ydb/ydb_query_ut.cpp @@ -68,6 +68,45 @@ Y_UNIT_TEST_SUITE(YdbQueryService) { UNIT_ASSERT(allDoneOk); } + Y_UNIT_TEST(TestForbidExecuteWithoutAttach) { + TKikimrWithGrpcAndRootSchema server; + + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + + auto clientConfig = NGRpcProxy::TGRpcClientConfig(location); + + TString sessionId = CreateQuerySession(clientConfig); + + UNIT_ASSERT(sessionId); + + NYdbGrpc::TGRpcClientLow clientLow; + + std::shared_ptr channel; + channel = grpc::CreateChannel("localhost:" + ToString(grpc), grpc::InsecureChannelCredentials()); + + { + std::unique_ptr stub; + stub = Ydb::Query::V1::QueryService::NewStub(channel); + grpc::ClientContext context; + Ydb::Query::ExecuteQueryRequest request; + request.set_session_id(sessionId); + request.set_exec_mode(Ydb::Query::EXEC_MODE_EXECUTE); + request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write(); + request.mutable_tx_control()->set_commit_tx(true); + request.mutable_query_content()->set_text("SELECT 42"); + Ydb::Query::ExecuteQueryResponsePart response; + auto reader = stub->ExecuteQuery(&context, request); + bool res = true; + while (res) { + res = reader->Read(&response); + if (res) { + UNIT_ASSERT_VALUES_EQUAL(response.status(), Ydb::StatusIds::BAD_REQUEST); + } + } + } + } + Y_UNIT_TEST(TestCreateDropAttachSession) { TKikimrWithGrpcAndRootSchema server;