Skip to content

Commit

Permalink
Merge 04336eb into b9e1b38
Browse files Browse the repository at this point in the history
  • Loading branch information
StekPerepolnen authored Sep 26, 2024
2 parents b9e1b38 + 04336eb commit 573ed04
Show file tree
Hide file tree
Showing 10 changed files with 401 additions and 28 deletions.
3 changes: 1 addition & 2 deletions ydb/core/grpc_services/local_rpc/local_rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,7 @@ void SetRequestSyncOperationMode(TRequest&) {
template<typename TRpc>
NThreading::TFuture<typename TRpc::TResponse> DoLocalRpc(typename TRpc::TRequest&& proto, const TString& database,
const TMaybe<TString>& token, const TMaybe<TString>& requestType,
TActorSystem* actorSystem, bool internalCall = false)
{
TActorSystem* actorSystem, bool internalCall = false) {
auto promise = NThreading::NewPromise<typename TRpc::TResponse>();

SetRequestSyncOperationMode(proto);
Expand Down
44 changes: 23 additions & 21 deletions ydb/core/grpc_services/query/rpc_execute_script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/core/base/appdata.h>
#include <ydb/library/ydb_issue/issue_helpers.h>
#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/grpc_services/rpc_request_base.h>
#include <ydb/core/grpc_services/rpc_kqp_base.h>
#include <ydb/core/grpc_services/audit_dml_operations.h>
#include <ydb/core/grpc_services/grpc_integrity_trails.h>
Expand Down Expand Up @@ -72,27 +73,29 @@ std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest(
return {Ydb::StatusIds::SUCCESS, {}};
}

class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
class TExecuteScriptRPC : public TRpcRequestActor<TExecuteScriptRPC, TEvExecuteScriptRequest, false> {
public:
using TRpcRequestActorBase = TRpcRequestActor<TExecuteScriptRPC, TEvExecuteScriptRequest, false>;

static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::GRPC_REQ;
}

TExecuteScriptRPC(TEvExecuteScriptRequest* request)
: Request_(request)
TExecuteScriptRPC(IRequestNoOpCtx* request)
: TRpcRequestActorBase(request)
{}

void Bootstrap() {
NYql::TIssues issues;
const auto& request = *Request_->GetProtoRequest();
const auto& request = GetProtoRequest();

if (request.operation_params().operation_mode() == Ydb::Operations::OperationParams::SYNC) {
if (request->operation_params().operation_mode() == Ydb::Operations::OperationParams::SYNC) {
issues.AddIssue("ExecuteScript must be asyncronous operation");
return Reply(Ydb::StatusIds::BAD_REQUEST, issues);
}

AuditContextAppend(Request_.get(), request);
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), request, TlsActivationContext->AsActorContext());
AuditContextAppend(Request.Get(), request);
NDataIntegrity::LogIntegrityTrails(Request->GetTraceId(), *request, TlsActivationContext->AsActorContext());

Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS;
if (auto scriptRequest = MakeScriptRequest(issues, status)) {
Expand All @@ -113,7 +116,7 @@ class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
)

void Handle(NKqp::TEvKqp::TEvScriptResponse::TPtr& ev, const TActorContext& ctx) {
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *Request_->GetProtoRequest(), ev, ctx);
NDataIntegrity::LogIntegrityTrails(Request->GetTraceId(), *GetProtoRequest(), ev, ctx);

Ydb::Operations::Operation operation;
operation.set_id(ev->Get()->OperationId);
Expand All @@ -126,14 +129,14 @@ class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
}

THolder<NKqp::TEvKqp::TEvScriptRequest> MakeScriptRequest(NYql::TIssues& issues, Ydb::StatusIds::StatusCode& status) const {
const auto* req = Request_->GetProtoRequest();
const auto traceId = Request_->GetTraceId();
const auto* req = GetProtoRequest();
const auto traceId = Request->GetTraceId();

auto ev = MakeHolder<NKqp::TEvKqp::TEvScriptRequest>();

SetAuthToken(ev, *Request_);
SetDatabase(ev, *Request_);
SetRlPath(ev, *Request_);
SetAuthToken(ev, *Request);
SetDatabase(ev, *Request);
SetRlPath(ev, *Request);

if (traceId) {
ev->Record.SetTraceId(traceId.GetRef());
Expand Down Expand Up @@ -166,12 +169,9 @@ class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {

result.set_status(status);

AuditContextAppend(Request_.get(), *Request_->GetProtoRequest(), result);

TString serializedResult;
Y_PROTOBUF_SUPPRESS_NODISCARD result.SerializeToString(&serializedResult);
AuditContextAppend(Request.Get(), GetProtoRequest(), result);

Request_->SendSerializedResult(std::move(serializedResult), status);
TProtoResponseHelper::SendProtoResponse(result, status, Request);

PassAway();
}
Expand All @@ -181,9 +181,6 @@ class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
result.set_ready(true);
Reply(status, std::move(result), issues);
}

private:
std::unique_ptr<TEvExecuteScriptRequest> Request_;
};

} // namespace
Expand All @@ -197,6 +194,11 @@ void DoExecuteScript(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider
f.RegisterActor(new TExecuteScriptRPC(req));
}

} // namespace NQuery

template<>
IActor* TEvExecuteScriptRequest::CreateRpcActor(IRequestNoOpCtx* msg) {
return new TExecuteScriptRPC(msg);
}

} // namespace NKikimr::NGRpcService
12 changes: 7 additions & 5 deletions ydb/core/grpc_services/query/rpc_fetch_script_results.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, T
return NKikimrServices::TActivity::GRPC_REQ;
}

TFetchScriptResultsRPC(TEvFetchScriptResultsRequest* request)
TFetchScriptResultsRPC(IRequestNoOpCtx* request)
: TRpcRequestActorBase(request)
{}

Expand Down Expand Up @@ -108,10 +108,7 @@ class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, T

result.set_status(status);

TString serializedResult;
Y_PROTOBUF_SUPPRESS_NODISCARD result.SerializeToString(&serializedResult);

Request->SendSerializedResult(std::move(serializedResult), status);
TProtoResponseHelper::SendProtoResponse(result, status, Request);

PassAway();
}
Expand Down Expand Up @@ -154,4 +151,9 @@ void DoFetchScriptResults(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityPro

}

template<>
IActor* TEvFetchScriptResultsRequest::CreateRpcActor(IRequestNoOpCtx* msg) {
return new TFetchScriptResultsRPC(msg);
}

} // namespace NKikimr::NGRpcService
64 changes: 64 additions & 0 deletions ydb/core/viewer/json.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{
"metadata": {
"execution_id": "4578ea89-93aa2e63-c0ef36df-b5d4c3c8",
"exec_status": "EXEC_STATUS_COMPLETED",
"script_content": {
"syntax": "SYNTAX_YQL_V1",
"text": "SELECT * FROM `/Root/Test`;"
},
"result_sets_meta": [
{
"columns": [
{
"name": "Key",
"type": {
"optional_type": {
"item": {
"type_id": "UINT64"
}
}
}
},
{
"name": "Value",
"type": {
"optional_type": {
"item": {
"type_id": "STRING"
}
}
}
}
]
}
],
"exec_mode": "EXEC_MODE_EXECUTE",
"exec_stats": {
"query_phases": [
{
"duration_us": 1421,
"table_access": [
{
"name": "/Root/Test",
"reads": {
"rows": 15,
"bytes": 255
},
"partitions_count": 1
}
],
"cpu_time_us": 942,
"affected_shards": 1
}
],
"compilation": {
"duration_us": 7351,
"cpu_time_us": 6955
},
"process_cpu_time_us": 72,
"query_plan": "{}",
"total_duration_us": 9039,
"total_cpu_time_us": 7969
}
}
}
20 changes: 20 additions & 0 deletions ydb/core/viewer/json_handlers_query.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include "json_handlers.h"
#include "query_execute_script.h"
#include "query_fetch_script.h"

namespace NKikimr::NViewer {

void InitQueryExecuteScriptJsonHandler(TJsonHandlers& handlers) {
handlers.AddHandler("/query/script/execute", new TJsonHandler<TQueryExecuteScript>(TQueryExecuteScript::GetSwagger()));
}

void InitQueryFetchScriptJsonHandler(TJsonHandlers& handlers) {
handlers.AddHandler("/query/script/fetch", new TJsonHandler<TQueryFetchScript>(TQueryFetchScript::GetSwagger()));
}

void InitQueryJsonHandlers(TJsonHandlers& jsonHandlers) {
InitQueryExecuteScriptJsonHandler(jsonHandlers);
InitQueryFetchScriptJsonHandler(jsonHandlers);
}

} // namespace NKikimr::NViewer
96 changes: 96 additions & 0 deletions ydb/core/viewer/query_execute_script.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#pragma once
#include "json_local_rpc.h"
#include <ydb/core/grpc_services/rpc_calls.h>
#include <ydb/core/viewer/yaml/yaml.h>
#include <ydb/public/api/grpc/ydb_query_v1.grpc.pb.h>
#include <ydb/public/api/grpc/ydb_operation_v1.grpc.pb.h>

namespace NKikimr {

namespace NRpcService {

template<>
void SetRequestSyncOperationMode<Ydb::Query::ExecuteScriptRequest>(Ydb::Query::ExecuteScriptRequest& request) {
request.mutable_operation_params()->set_operation_mode(Ydb::Operations::OperationParams::ASYNC);
}

}

namespace NViewer {

using TQueryExecuteScriptRpc = TJsonLocalRpc<Ydb::Query::ExecuteScriptRequest,
Ydb::Operations::Operation,
Ydb::Operations::Operation,
Ydb::Query::V1::QueryService,
NKikimr::NGRpcService::TGrpcRequestNoOperationCall<Ydb::Query::ExecuteScriptRequest, Ydb::Operations::Operation>>;

class TQueryExecuteScript : public TQueryExecuteScriptRpc {
public:
using TBase = TQueryExecuteScriptRpc;

TQueryExecuteScript(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev)
: TBase(viewer, ev)
{
AllowedMethods = {HTTP_METHOD_POST};
}

static YAML::Node GetSwagger() {
YAML::Node node = YAML::Load(R"___(
post:
tags:
- script query
summary: Execute script
description: Execute script
requestBody:
required: true
content:
application/json:
schema:
type: object
properties:
database:
type: string
required: true
script_content:
type: object
properties:
text:
type: string
description: query text
required: true
syntax:
type: string
description: |
syntax:
* `SYNTAX_YQL_V1`
* `SYNTAX_PG`
required: false
exec_mode:
type: string
description: |
exec_mode:
* `EXEC_MODE_PARSE`
* `EXEC_MODE_VALIDATE`
* `EXEC_MODE_EXPLAIN`
* `EXEC_MODE_EXECUTE`
required: true
responses:
200:
description: OK
content:
application/json:
schema: {}
400:
description: Bad Request
403:
description: Forbidden
504:
description: Gateway Timeout
)___");
node["get"]["responses"]["200"]["content"]["application/json"]["schema"] = TProtoToYaml::ProtoToYamlSchema<Ydb::Operations::Operation>();
return node;
}
};

}
}
Loading

0 comments on commit 573ed04

Please sign in to comment.