Skip to content

Commit

Permalink
Http start query (#11759)
Browse files Browse the repository at this point in the history
Co-authored-by: Arseny Bolotnikov <abolotnikov12@yandex-team.ru>
  • Loading branch information
what-the-fawk and Arseny Bolotnikov authored Dec 25, 2024
1 parent dfda963 commit 27d8720
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 6 deletions.
22 changes: 18 additions & 4 deletions ydb/core/fq/libs/http_api_client/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from .query_results import YQResults

MAX_RETRY_FOR_SESSION = 4
MAX_RETRY_FOR_SESSION = 100
BACK_OFF_FACTOR = 0.3
TIME_BETWEEN_RETRIES = 1000
ERROR_CODES = (500, 502, 504)
Expand Down Expand Up @@ -150,6 +150,22 @@ def create_query(
self._validate_http_error(response, expected_code=expected_code)
return response.json()["id"]

def start_query(
self,
query_id: str,
request_id=None,
idempotency_key: str | None = None,
expected_code: int = 204,
):
response = self.session.post(
self._compose_api_url(f"/api/fq/v1/queries/{query_id}/start"),
headers=self._build_headers(idempotency_key=idempotency_key, request_id=request_id),
params=self._build_params(),
)

self._validate_http_error(response, expected_code)
return response

def get_query_status(self, query_id, request_id=None, expected_code=200) -> Any:
response = self.session.get(
self._compose_api_url(f"/api/fq/v1/queries/{query_id}/status"),
Expand Down Expand Up @@ -272,9 +288,7 @@ def get_query_result_set(self, query_id: str, result_set_index: int, raw_format:

return YQResults(result).results

def get_query_all_result_sets(
self, query_id: str, result_set_count: int, raw_format: bool = False
) -> Any:
def get_query_all_result_sets(self, query_id: str, result_set_count: int, raw_format: bool = False) -> Any:
result = []
for i in range(0, result_set_count):
r = self.get_query_result_set(query_id, result_set_index=i, raw_format=raw_format)
Expand Down
76 changes: 75 additions & 1 deletion ydb/core/public_http/fq_handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include <ydb/library/services/services.pb.h>
#include <ydb/core/public_http/protos/fq.pb.h>

#include <type_traits>

namespace NKikimr::NPublicHttp {

using namespace NActors;
Expand Down Expand Up @@ -249,6 +251,7 @@ void SetIdempotencyKey(T& dst, const TString& key) {

template <typename GrpcProtoRequestType, typename HttpProtoRequestType, typename GrpcProtoResultType, typename HttpProtoResultType, typename GrpcProtoResponseType>
class TGrpcCallWrapper : public TActorBootstrapped<TGrpcCallWrapper<GrpcProtoRequestType, HttpProtoRequestType, GrpcProtoResultType, HttpProtoResultType, GrpcProtoResponseType>> {
protected:
THttpRequestContext RequestContext;

typedef std::function<std::unique_ptr<NGRpcService::TEvProxyRuntimeEvent>(TIntrusivePtr<NYdbGrpc::IRequestContextBase> ctx)> TGrpcProxyEventFactory;
Expand Down Expand Up @@ -278,6 +281,10 @@ class TGrpcCallWrapper : public TActorBootstrapped<TGrpcCallWrapper<GrpcProtoReq
}

// Delegates to the virtual BootstrapWrapper() so that subclasses can replace the bootstrap logic.
void Bootstrap(const TActorContext& ctx) {
BootstrapWrapper(ctx);
}

virtual void BootstrapWrapper(const TActorContext& ctx) {
auto grpcRequest = std::make_unique<TGrpcProtoRequestType>();
if (Parse(*grpcRequest)) {
TIntrusivePtr<TGrpcRequestContextWrapper> requestContext = new TGrpcRequestContextWrapper(RequestContext, std::move(grpcRequest), &SendReply);
Expand Down Expand Up @@ -354,7 +361,6 @@ class TGrpcCallWrapper : public TActorBootstrapped<TGrpcCallWrapper<GrpcProtoReq
auto* httpResult = google::protobuf::Arena::CreateMessage<FQHttp::Error>(resp->GetArena());
FqConvert(typedResponse->operation(), *httpResult);
FqPackToJson(json, *httpResult, jsonSettings);

requestContext.ResponseBadRequestJson(typedResponse->operation().status(), json.Str());
return;
}
Expand Down Expand Up @@ -396,4 +402,72 @@ DECLARE_YQ_GRPC_ACTOR(GetQueryStatus, GetQueryStatus);
DECLARE_YQ_GRPC_ACTOR_WIHT_EMPTY_RESULT(StopQuery, ControlQuery);
DECLARE_YQ_GRPC_ACTOR(GetResultData, GetResultData);

// HTTP handler that starts an existing query.
//
// The start is performed as two chained gRPC calls:
//   1. DescribeQuery - issued from BootstrapWrapper() to fetch the query content and revision;
//   2. ModifyQuery   - issued from the describe reply callback, re-submitting that content
//                      with ExecuteMode::RUN.
// The final HTTP reply for the successful path is produced by the SendReply of the
// ModifyQuery flavour of TGrpcCallWrapper; describe failures are reported directly
// from the callback.
class TJsonStartQuery : public TGrpcCallWrapper<FederatedQuery::DescribeQueryRequest, FQHttp::GetQueryRequest, FederatedQuery::ModifyQueryResult, google::protobuf::Empty, FederatedQuery::ModifyQueryResponse> {
public:
    typedef TGrpcCallWrapper<FederatedQuery::DescribeQueryRequest, FQHttp::GetQueryRequest, FederatedQuery::ModifyQueryResult, google::protobuf::Empty, FederatedQuery::ModifyQueryResponse> TGrpcCallWrapperBase;

    TJsonStartQuery(const THttpRequestContext& ctx)
        : TGrpcCallWrapperBase(ctx, &NGRpcService::CreateFederatedQueryDescribeQueryRequestOperationCall)
    {}

    // Parses the HTTP request into a DescribeQuery request, sends it, and installs a reply
    // callback that turns the describe result into a ModifyQuery(RUN) request.
    void BootstrapWrapper(const TActorContext& ctx) override {
        auto describeRequest = std::make_unique<FederatedQuery::DescribeQueryRequest>();
        if (!Parse(*describeRequest)) {
            this->Die(ctx);
            return;
        }

        // Copy the id before the request is moved into the context wrapper below.
        TProtoStringType queryId = describeRequest->Getquery_id();
        TIntrusivePtr<TGrpcRequestContextWrapper> requestContext = MakeIntrusive<TGrpcRequestContextWrapper>(
            RequestContext,
            std::move(describeRequest),
            [query_id = std::move(queryId), actorSystem = TActivationContext::ActorSystem()](const THttpRequestContext& httpContext, const TJsonSettings& jsonSettings, NProtoBuf::Message* resp, ui32 /*status*/) {
                Y_ABORT_UNLESS(resp);
                Y_ABORT_UNLESS(resp->GetArena());

                auto* typedResponse = static_cast<FederatedQuery::DescribeQueryResponse*>(resp);
                const auto& operation = typedResponse->operation();

                // Describe did not yield a result: convert the operation into an HTTP error body.
                if (!operation.result().Is<FederatedQuery::DescribeQueryResult>()) {
                    TStringStream json;
                    FQHttp::Error httpError;
                    FqConvert(operation, httpError);
                    FqPackToJson(json, httpError, jsonSettings);
                    httpContext.ResponseBadRequestJson(operation.status(), json.Str());
                    return;
                }

                FederatedQuery::DescribeQueryResult describeResult;
                if (!operation.result().UnpackTo(&describeResult)) {
                    httpContext.ResponseBadRequest(Ydb::StatusIds::INTERNAL_ERROR, "Error in response unpack");
                    return;
                }

                // Build the ModifyQuery request from the described query.
                auto modifyRequest = std::make_unique<FederatedQuery::ModifyQueryRequest>();
                modifyRequest->set_query_id(query_id);
                *modifyRequest->mutable_content() = describeResult.query().content();
                modifyRequest->set_execute_mode(::FederatedQuery::ExecuteMode::RUN);
                modifyRequest->set_state_load_mode(::FederatedQuery::StateLoadMode::STATE_LOAD_MODE_UNSPECIFIED);
                modifyRequest->set_previous_revision(describeResult.query().meta().Getlast_job_query_revision());
                modifyRequest->set_idempotency_key(httpContext.GetIdempotencyKey());

                TIntrusivePtr<TGrpcRequestContextWrapper> requestContextModify = new TGrpcRequestContextWrapper(
                    httpContext,
                    std::move(modifyRequest),
                    TGrpcCallWrapper<FederatedQuery::ModifyQueryRequest, int, FederatedQuery::ModifyQueryResult, google::protobuf::Empty, FederatedQuery::ModifyQueryResponse>::SendReply
                );

                // This is a different request type, so EventFactory (bound to DescribeQuery)
                // cannot be reused; create the ModifyQuery event explicitly.
                actorSystem->Send(NGRpcService::CreateGRpcRequestProxyId(), NGRpcService::CreateFederatedQueryModifyQueryRequestOperationCall(std::move(requestContextModify)).release());
            });

        ctx.Send(NGRpcService::CreateGRpcRequestProxyId(), EventFactory(std::move(requestContext)).release());
        // The actor does not wait for the reply: the callback was handed to the request context above.
        this->Die(ctx);
    }
};

#undef TGrpcCallWrapperBase

} // namespace NKikimr::NPublicHttp
1 change: 1 addition & 0 deletions ydb/core/public_http/http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ namespace {
Router.RegisterHandler(HTTP_METHOD_GET, "/api/fq/v1/queries/{query_id}/status", CreateHttpHandler<TJsonGetQueryStatus>());
Router.RegisterHandler(HTTP_METHOD_GET, "/api/fq/v1/queries/{query_id}/results/{result_set_index}", CreateHttpHandler<TJsonGetResultData>());
Router.RegisterHandler(HTTP_METHOD_POST, "/api/fq/v1/queries/{query_id}/stop", CreateHttpHandler<TJsonStopQuery>());
Router.RegisterHandler(HTTP_METHOD_POST, "/api/fq/v1/queries/{query_id}/start", CreateHttpHandler<TJsonStartQuery>());
}

void Bootstrap(const TActorContext& ctx) {
Expand Down
25 changes: 25 additions & 0 deletions ydb/core/public_http/openapi/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,31 @@ paths:
required: true
schema:
type: string
# Start endpoint: POST addressed by the query_id path parameter.
# Documents responses 204 (No Content) and 400 (Bad Request with a GenericError JSON body).
'/queries/{query_id}/start':
post:
responses:
'204':
description: No Content
'400':
description: Bad Request
content:
application/json:
schema:
$ref: '#/components/schemas/GenericError'
parameters:
- $ref: '#/components/parameters/Idempotency-Key'
- $ref: '#/components/parameters/Authorization'
- $ref: '#/components/parameters/x-request-id'
- $ref: '#/components/parameters/db'
- $ref: '#/components/parameters/project'
summary: start stopped query
operationId: start-query
parameters:
- name: query_id
in: path
required: true
schema:
type: string
'/queries/{query_id}/results/{result_set_index}':
parameters:
- name: query_id
Expand Down
32 changes: 31 additions & 1 deletion ydb/tests/fq/http_api/test_http_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def test_simple_analytics_query(self):
assert len(query_id) == 20

status = client.get_query_status(query_id)
assert status in ["FAILED", "RUNNING", "COMPLETED"]
assert status in ["STARTING", "RUNNING", "COMPLETED", "COMPLETING"]

wait_for_query_status(client, query_id, ["COMPLETED"])
query_json = client.get_query(query_id)
Expand All @@ -98,6 +98,14 @@ def test_simple_analytics_query(self):
response = client.stop_query(query_id)
assert response.status_code == 204

response = client.start_query(query_id)
assert response.status_code == 204

assert client.get_query_status(query_id) in ["STARTING", "RUNNING", "COMPLETED", "COMPLETING"]

response = client.stop_query(query_id)
assert response.status_code == 204

def test_empty_query(self):
with self.create_client() as client:
with pytest.raises(
Expand Down Expand Up @@ -228,6 +236,28 @@ def test_stop_idempotency(self):
self.streaming_over_kikimr.compute_plane.start()
c.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER)

def test_restart_idempotency(self):
    """Stop and start a query over HTTP with one idempotency key while compute is down.

    Each HTTP call must answer 204; after the compute plane is started again
    the query is expected to reach COMPLETED.
    """
    fq_client = FederatedQueryClient("my_folder", streaming_over_kikimr=self.streaming_over_kikimr)
    self.streaming_over_kikimr.compute_plane.stop()
    created = fq_client.create_query("select1", "select 1")
    query_id = created.result.query_id
    fq_client.wait_query_status(query_id, fq.QueryMeta.STARTING)

    with self.create_client() as http_client:
        # stop -> start -> start -> stop, all sharing the same idempotency key
        for method_name in ("stop_query", "start_query", "start_query", "stop_query"):
            reply = getattr(http_client, method_name)(query_id, idempotency_key="Z")
            assert reply.status_code == 204

    self.streaming_over_kikimr.compute_plane.start()
    fq_client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

def test_simple_streaming_query(self):
self.init_topics("simple_streaming_query", create_output=False)
c = FederatedQueryClient("my_folder", streaming_over_kikimr=self.streaming_over_kikimr)
Expand Down

0 comments on commit 27d8720

Please sign in to comment.