From 27d87203f75a7b5a4911238355f87360599cb5e1 Mon Sep 17 00:00:00 2001 From: Arseny <90683415+what-the-fawk@users.noreply.github.com> Date: Wed, 25 Dec 2024 18:11:19 +0300 Subject: [PATCH] Http start query (#11759) Co-authored-by: Arseny Bolotnikov --- .../fq/libs/http_api_client/http_client.py | 22 +++++- ydb/core/public_http/fq_handlers.h | 76 ++++++++++++++++++- ydb/core/public_http/http_service.cpp | 1 + ydb/core/public_http/openapi/openapi.yaml | 25 ++++++ ydb/tests/fq/http_api/test_http_api.py | 32 +++++++- 5 files changed, 150 insertions(+), 6 deletions(-) diff --git a/ydb/core/fq/libs/http_api_client/http_client.py b/ydb/core/fq/libs/http_api_client/http_client.py index 88b9e781c52e..e5770ce9adde 100644 --- a/ydb/core/fq/libs/http_api_client/http_client.py +++ b/ydb/core/fq/libs/http_api_client/http_client.py @@ -11,7 +11,7 @@ from .query_results import YQResults -MAX_RETRY_FOR_SESSION = 4 +MAX_RETRY_FOR_SESSION = 100 BACK_OFF_FACTOR = 0.3 TIME_BETWEEN_RETRIES = 1000 ERROR_CODES = (500, 502, 504) @@ -150,6 +150,22 @@ def create_query( self._validate_http_error(response, expected_code=expected_code) return response.json()["id"] + def start_query( + self, + query_id: str, + request_id=None, + idempotency_key: str | None = None, + expected_code: int = 204, + ): + response = self.session.post( + self._compose_api_url(f"/api/fq/v1/queries/{query_id}/start"), + headers=self._build_headers(idempotency_key=idempotency_key, request_id=request_id), + params=self._build_params(), + ) + + self._validate_http_error(response, expected_code) + return response + def get_query_status(self, query_id, request_id=None, expected_code=200) -> Any: response = self.session.get( self._compose_api_url(f"/api/fq/v1/queries/{query_id}/status"), @@ -272,9 +288,7 @@ def get_query_result_set(self, query_id: str, result_set_index: int, raw_format: return YQResults(result).results - def get_query_all_result_sets( - self, query_id: str, result_set_count: int, raw_format: bool = False - ) -> Any: + def get_query_all_result_sets(self, query_id: str, result_set_count: int, raw_format: bool = False) -> Any: result = [] for i in range(0, result_set_count): r = self.get_query_result_set(query_id, result_set_index=i, raw_format=raw_format) diff --git a/ydb/core/public_http/fq_handlers.h b/ydb/core/public_http/fq_handlers.h index 6fd53b9701d3..1a30ba6b409f 100644 --- a/ydb/core/public_http/fq_handlers.h +++ b/ydb/core/public_http/fq_handlers.h @@ -10,6 +10,8 @@ #include #include +#include + namespace NKikimr::NPublicHttp { using namespace NActors; @@ -249,6 +251,7 @@ void SetIdempotencyKey(T& dst, const TString& key) { template class TGrpcCallWrapper : public TActorBootstrapped> { +protected: THttpRequestContext RequestContext; typedef std::function(TIntrusivePtr ctx)> TGrpcProxyEventFactory; @@ -278,6 +281,10 @@ class TGrpcCallWrapper : public TActorBootstrapped(); if (Parse(*grpcRequest)) { TIntrusivePtr requestContext = new TGrpcRequestContextWrapper(RequestContext, std::move(grpcRequest), &SendReply); @@ -354,7 +361,6 @@ class TGrpcCallWrapper : public TActorBootstrapped(resp->GetArena()); FqConvert(typedResponse->operation(), *httpResult); FqPackToJson(json, *httpResult, jsonSettings); - requestContext.ResponseBadRequestJson(typedResponse->operation().status(), json.Str()); return; } @@ -396,4 +402,72 @@ DECLARE_YQ_GRPC_ACTOR(GetQueryStatus, GetQueryStatus); DECLARE_YQ_GRPC_ACTOR_WIHT_EMPTY_RESULT(StopQuery, ControlQuery); DECLARE_YQ_GRPC_ACTOR(GetResultData, GetResultData); +class TJsonStartQuery : public TGrpcCallWrapper { +public: + typedef TGrpcCallWrapper TGrpcCallWrapperBase; + + TJsonStartQuery(const THttpRequestContext& ctx) + : TGrpcCallWrapperBase(ctx, &NGRpcService::CreateFederatedQueryDescribeQueryRequestOperationCall) + {} + + void BootstrapWrapper(const TActorContext& ctx) override { + + auto describeRequest = std::make_unique(); + if (!Parse(*describeRequest)) { + this->Die(ctx); + return; + } + + TProtoStringType queryId = describeRequest->Getquery_id(); + TIntrusivePtr requestContext = MakeIntrusive( + RequestContext, + std::move(describeRequest), + [query_id = std::move(queryId), actorSystem = TActivationContext::ActorSystem()](const THttpRequestContext& requestContext, const TJsonSettings& jsonSettings, NProtoBuf::Message* resp, ui32 status) { + + Y_ABORT_UNLESS(resp); + Y_ABORT_UNLESS(resp->GetArena()); + + auto* typedResponse = static_cast(resp); + if (!typedResponse->operation().result().template Is()) { + TStringStream json; + auto httpResult = std::unique_ptr(new FQHttp::Error()); + FqConvert(typedResponse->operation(), *httpResult); + FqPackToJson(json, *httpResult, jsonSettings); + requestContext.ResponseBadRequestJson(typedResponse->operation().status(), json.Str()); + return; + } + + std::unique_ptr describeResult = std::unique_ptr(new FederatedQuery::DescribeQueryResult()); + if (!typedResponse->operation().result().UnpackTo(&*describeResult)) { + requestContext.ResponseBadRequest(Ydb::StatusIds::INTERNAL_ERROR, "Error in response unpack"); + return; + } + + // modify + auto modifyRequest = std::unique_ptr(new FederatedQuery::ModifyQueryRequest()); + + modifyRequest->set_query_id(query_id); + *modifyRequest->mutable_content() = describeResult->query().content(); + modifyRequest->set_execute_mode(::FederatedQuery::ExecuteMode::RUN); + modifyRequest->set_state_load_mode(::FederatedQuery::StateLoadMode::STATE_LOAD_MODE_UNSPECIFIED); + modifyRequest->set_previous_revision(describeResult->query().meta().Getlast_job_query_revision()); + modifyRequest->set_idempotency_key(requestContext.GetIdempotencyKey()); + + TIntrusivePtr requestContextModify = new TGrpcRequestContextWrapper( + requestContext, + std::move(modifyRequest), + TGrpcCallWrapper::SendReply + ); + + // new event -> new EventFactory + actorSystem->Send(NGRpcService::CreateGRpcRequestProxyId(), NGRpcService::CreateFederatedQueryModifyQueryRequestOperationCall(std::move(requestContextModify)).release()); + }); + + ctx.Send(NGRpcService::CreateGRpcRequestProxyId(), EventFactory(std::move(requestContext)).release()); + this->Die(ctx); + } +}; + +#undef TGrpcCallWrapperBase + } // namespace NKikimr::NPublicHttp diff --git a/ydb/core/public_http/http_service.cpp b/ydb/core/public_http/http_service.cpp index ae6d72209d46..81fa9c66ebfb 100644 --- a/ydb/core/public_http/http_service.cpp +++ b/ydb/core/public_http/http_service.cpp @@ -57,6 +57,7 @@ namespace { Router.RegisterHandler(HTTP_METHOD_GET, "/api/fq/v1/queries/{query_id}/status", CreateHttpHandler()); Router.RegisterHandler(HTTP_METHOD_GET, "/api/fq/v1/queries/{query_id}/results/{result_set_index}", CreateHttpHandler()); Router.RegisterHandler(HTTP_METHOD_POST, "/api/fq/v1/queries/{query_id}/stop", CreateHttpHandler()); + Router.RegisterHandler(HTTP_METHOD_POST, "/api/fq/v1/queries/{query_id}/start", CreateHttpHandler()); } void Bootstrap(const TActorContext& ctx) { diff --git a/ydb/core/public_http/openapi/openapi.yaml b/ydb/core/public_http/openapi/openapi.yaml index 6a490bebd3d4..358dfc1bbd17 100644 --- a/ydb/core/public_http/openapi/openapi.yaml +++ b/ydb/core/public_http/openapi/openapi.yaml @@ -185,6 +185,31 @@ paths: required: true schema: type: string + '/queries/{query_id}/start': + post: + responses: + '204': + description: No Content + '400': + description: Bad Request + content: + application/json: + schema: + $ref: '#/components/schemas/GenericError' + parameters: + - $ref: '#/components/parameters/Idempotency-Key' + - $ref: '#/components/parameters/Authorization' + - $ref: '#/components/parameters/x-request-id' + - $ref: '#/components/parameters/db' + - $ref: '#/components/parameters/project' + summary: start stopped query + operationId: start-query + parameters: + - name: query_id + in: path + required: true + schema: + type: string '/queries/{query_id}/results/{result_set_index}': parameters: - name: query_id diff --git a/ydb/tests/fq/http_api/test_http_api.py b/ydb/tests/fq/http_api/test_http_api.py index 74e454afe53e..08130fb11c18 100644 --- a/ydb/tests/fq/http_api/test_http_api.py +++ b/ydb/tests/fq/http_api/test_http_api.py @@ -76,7 +76,7 @@ def test_simple_analytics_query(self): assert len(query_id) == 20 status = client.get_query_status(query_id) - assert status in ["FAILED", "RUNNING", "COMPLETED"] + assert status in ["STARTING", "RUNNING", "COMPLETED", "COMPLETING"] wait_for_query_status(client, query_id, ["COMPLETED"]) query_json = client.get_query(query_id) @@ -98,6 +98,14 @@ def test_simple_analytics_query(self): response = client.stop_query(query_id) assert response.status_code == 204 + response = client.start_query(query_id) + assert response.status_code == 204 + + assert client.get_query_status(query_id) in ["STARTING", "RUNNING", "COMPLETED", "COMPLETING"] + + response = client.stop_query(query_id) + assert response.status_code == 204 + def test_empty_query(self): with self.create_client() as client: with pytest.raises( @@ -228,6 +236,28 @@ def test_stop_idempotency(self): self.streaming_over_kikimr.compute_plane.start() c.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER) + def test_restart_idempotency(self): + c = FederatedQueryClient("my_folder", streaming_over_kikimr=self.streaming_over_kikimr) + self.streaming_over_kikimr.compute_plane.stop() + query_id = c.create_query("select1", "select 1").result.query_id + c.wait_query_status(query_id, fq.QueryMeta.STARTING) + + with self.create_client() as client: + response1 = client.stop_query(query_id, idempotency_key="Z") + assert response1.status_code == 204 + + response2 = client.start_query(query_id, idempotency_key="Z") + assert response2.status_code == 204 + + response2 = client.start_query(query_id, idempotency_key="Z") + assert response2.status_code == 204 + + response1 = client.stop_query(query_id, idempotency_key="Z") + assert response1.status_code == 204 + + self.streaming_over_kikimr.compute_plane.start() + c.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + def test_simple_streaming_query(self): self.init_topics("simple_streaming_query", create_output=False) c = FederatedQueryClient("my_folder", streaming_over_kikimr=self.streaming_over_kikimr)