diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.h b/ydb/core/grpc_services/local_rpc/local_rpc.h index eb4b4a8deee2..17d67babb7d3 100644 --- a/ydb/core/grpc_services/local_rpc/local_rpc.h +++ b/ydb/core/grpc_services/local_rpc/local_rpc.h @@ -305,8 +305,7 @@ void SetRequestSyncOperationMode(TRequest&) { template NThreading::TFuture DoLocalRpc(typename TRpc::TRequest&& proto, const TString& database, const TMaybe& token, const TMaybe& requestType, - TActorSystem* actorSystem, bool internalCall = false) -{ + TActorSystem* actorSystem, bool internalCall = false) { auto promise = NThreading::NewPromise(); SetRequestSyncOperationMode(proto); diff --git a/ydb/core/grpc_services/query/rpc_execute_script.cpp b/ydb/core/grpc_services/query/rpc_execute_script.cpp index 3c573606a726..97eeb4ef7294 100644 --- a/ydb/core/grpc_services/query/rpc_execute_script.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_script.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -72,27 +73,29 @@ std::tuple FillKqpRequest( return {Ydb::StatusIds::SUCCESS, {}}; } -class TExecuteScriptRPC : public TActorBootstrapped { +class TExecuteScriptRPC : public TRpcRequestActor { public: + using TRpcRequestActorBase = TRpcRequestActor; + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::GRPC_REQ; } - TExecuteScriptRPC(TEvExecuteScriptRequest* request) - : Request_(request) + TExecuteScriptRPC(IRequestNoOpCtx* request) + : TRpcRequestActorBase(request) {} void Bootstrap() { NYql::TIssues issues; - const auto& request = *Request_->GetProtoRequest(); + const auto& request = GetProtoRequest(); - if (request.operation_params().operation_mode() == Ydb::Operations::OperationParams::SYNC) { + if (request->operation_params().operation_mode() == Ydb::Operations::OperationParams::SYNC) { issues.AddIssue("ExecuteScript must be asyncronous operation"); return Reply(Ydb::StatusIds::BAD_REQUEST, issues); } - AuditContextAppend(Request_.get(), request); - NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), request, TlsActivationContext->AsActorContext()); + AuditContextAppend(Request.Get(), request); + NDataIntegrity::LogIntegrityTrails(Request->GetTraceId(), *request, TlsActivationContext->AsActorContext()); Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS; if (auto scriptRequest = MakeScriptRequest(issues, status)) { @@ -113,7 +116,7 @@ class TExecuteScriptRPC : public TActorBootstrapped { ) void Handle(NKqp::TEvKqp::TEvScriptResponse::TPtr& ev, const TActorContext& ctx) { - NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *Request_->GetProtoRequest(), ev, ctx); + NDataIntegrity::LogIntegrityTrails(Request->GetTraceId(), *GetProtoRequest(), ev, ctx); Ydb::Operations::Operation operation; operation.set_id(ev->Get()->OperationId); @@ -126,14 +129,14 @@ class TExecuteScriptRPC : public TActorBootstrapped { } THolder MakeScriptRequest(NYql::TIssues& issues, Ydb::StatusIds::StatusCode& status) const { - const auto* req = Request_->GetProtoRequest(); - const auto traceId = Request_->GetTraceId(); + const auto* req = GetProtoRequest(); + const auto traceId = Request->GetTraceId(); auto ev = MakeHolder(); - SetAuthToken(ev, *Request_); - SetDatabase(ev, *Request_); - SetRlPath(ev, *Request_); + SetAuthToken(ev, *Request); + SetDatabase(ev, *Request); + SetRlPath(ev, *Request); if (traceId) { ev->Record.SetTraceId(traceId.GetRef()); @@ -166,12 +169,9 @@ class TExecuteScriptRPC : public TActorBootstrapped { result.set_status(status); - AuditContextAppend(Request_.get(), *Request_->GetProtoRequest(), result); - - TString serializedResult; - Y_PROTOBUF_SUPPRESS_NODISCARD result.SerializeToString(&serializedResult); + AuditContextAppend(Request.Get(), GetProtoRequest(), result); - Request_->SendSerializedResult(std::move(serializedResult), status); + TProtoResponseHelper::SendProtoResponse(result, status, Request); PassAway(); } @@ -181,9 +181,6 @@ class TExecuteScriptRPC : public TActorBootstrapped { result.set_ready(true); Reply(status, std::move(result), issues); } - -private: - std::unique_ptr Request_; }; } // namespace @@ -197,6 +194,11 @@ void DoExecuteScript(std::unique_ptr p, const IFacilityProvider f.RegisterActor(new TExecuteScriptRPC(req)); } +} // namespace NQuery + +template<> +IActor* TEvExecuteScriptRequest::CreateRpcActor(IRequestNoOpCtx* msg) { + return new TExecuteScriptRPC(msg); } } // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp index 5f729d990731..0ccc0094e520 100644 --- a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp +++ b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp @@ -35,7 +35,7 @@ class TFetchScriptResultsRPC : public TRpcRequestActorSendSerializedResult(std::move(serializedResult), status); + TProtoResponseHelper::SendProtoResponse(result, status, Request); PassAway(); } @@ -154,4 +151,9 @@ void DoFetchScriptResults(std::unique_ptr p, const IFacilityPro } +template<> +IActor* TEvFetchScriptResultsRequest::CreateRpcActor(IRequestNoOpCtx* msg) { + return new TFetchScriptResultsRPC(msg); +} + } // namespace NKikimr::NGRpcService diff --git a/ydb/core/viewer/json.json b/ydb/core/viewer/json.json new file mode 100644 index 000000000000..31720eb29b79 --- /dev/null +++ b/ydb/core/viewer/json.json @@ -0,0 +1,64 @@ +{ + "metadata": { + "execution_id": "4578ea89-93aa2e63-c0ef36df-b5d4c3c8", + "exec_status": "EXEC_STATUS_COMPLETED", + "script_content": { + "syntax": "SYNTAX_YQL_V1", + "text": "SELECT * FROM `/Root/Test`;" + }, + "result_sets_meta": [ + { + "columns": [ + { + "name": "Key", + "type": { + "optional_type": { + "item": { + "type_id": "UINT64" + } + } + } + }, + { + "name": "Value", + "type": { + "optional_type": { + "item": { + "type_id": "STRING" + } + } + } + } + ] + } + ], + "exec_mode": "EXEC_MODE_EXECUTE", + "exec_stats": { + "query_phases": [ + { + "duration_us": 1421, + "table_access": [ + { + "name": "/Root/Test", + "reads": { + "rows": 15, + "bytes": 255 + }, + "partitions_count": 1 + } + ], + "cpu_time_us": 942, + "affected_shards": 1 + } + ], + "compilation": { + "duration_us": 7351, + "cpu_time_us": 6955 + }, + "process_cpu_time_us": 72, + "query_plan": "{}", + "total_duration_us": 9039, + "total_cpu_time_us": 7969 + } + } +} \ No newline at end of file diff --git a/ydb/core/viewer/json_handlers_query.cpp b/ydb/core/viewer/json_handlers_query.cpp new file mode 100644 index 000000000000..a79c05b54c1f --- /dev/null +++ b/ydb/core/viewer/json_handlers_query.cpp @@ -0,0 +1,20 @@ +#include "json_handlers.h" +#include "query_execute_script.h" +#include "query_fetch_script.h" + +namespace NKikimr::NViewer { + +void InitQueryExecuteScriptJsonHandler(TJsonHandlers& handlers) { + handlers.AddHandler("/query/script/execute", new TJsonHandler(TQueryExecuteScript::GetSwagger())); +} + +void InitQueryFetchScriptJsonHandler(TJsonHandlers& handlers) { + handlers.AddHandler("/query/script/fetch", new TJsonHandler(TQueryFetchScript::GetSwagger())); +} + +void InitQueryJsonHandlers(TJsonHandlers& jsonHandlers) { + InitQueryExecuteScriptJsonHandler(jsonHandlers); + InitQueryFetchScriptJsonHandler(jsonHandlers); +} + +} // namespace NKikimr::NViewer diff --git a/ydb/core/viewer/json_local_rpc.h b/ydb/core/viewer/json_local_rpc.h index 9ea337fcefe8..42d40fb37c91 100644 --- a/ydb/core/viewer/json_local_rpc.h +++ b/ydb/core/viewer/json_local_rpc.h @@ -33,6 +33,7 @@ class TJsonLocalRpc : public TViewerPipeClient { std::vector AllowedMethods = {}; TAutoPtr> Result; NThreading::TFuture RpcFuture; + Ydb::Operations::OperationParams::OperationMode OperationMode = Ydb::Operations::OperationParams::SYNC; public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { @@ -56,6 +57,7 @@ class TJsonLocalRpc : public TViewerPipeClient { TString name; name = field->name(); TString value = params.Get(name); + Cerr << "jjjjjjj name " << name << Endl; if (!value.empty()) { FieldDescriptor::CppType type = field->cpp_type(); switch (type) { @@ -119,6 +121,7 @@ class TJsonLocalRpc : public TViewerPipeClient { bool Params2Proto(TProtoRequest& request) { auto postData = Event->Get()->Request.GetPostContent(); if (!postData.empty()) { + Cerr << "jjjjjjjjj GET" << Endl; try { NProtobufJson::Json2Proto(postData, request); } @@ -127,6 +130,7 @@ class TJsonLocalRpc : public TViewerPipeClient { return false; } } else { + Cerr << "jjjjjjjjj POST" << Endl; const auto& params(Event->Get()->Request.GetParams()); Params2Proto(params, request); } @@ -139,6 +143,7 @@ class TJsonLocalRpc : public TViewerPipeClient { void SendGrpcRequest(TProtoRequest&& request) { // TODO(xenoxeno): pass trace id RpcFuture = NRpcService::DoLocalRpc(std::move(request), Database, Event->Get()->UserToken, TlsActivationContext->ActorSystem()); + RpcFuture.Subscribe([actorId = TBase::SelfId(), actorSystem = TlsActivationContext->ActorSystem()] (const NThreading::TFuture& future) { auto& response = future.GetValueSync(); @@ -168,6 +173,7 @@ class TJsonLocalRpc : public TViewerPipeClient { return ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "Method is not allowed")); } if (Database.empty()) { + Cerr << "!!!! field 'database' is required" << Endl; return ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "field 'database' is required")); } if (TBase::NeedToRedirect()) { diff --git a/ydb/core/viewer/json_pipe_req.cpp b/ydb/core/viewer/json_pipe_req.cpp index ecd896f8494c..0d35fc25722c 100644 --- a/ydb/core/viewer/json_pipe_req.cpp +++ b/ydb/core/viewer/json_pipe_req.cpp @@ -25,7 +25,7 @@ TViewerPipeClient::TViewerPipeClient(IViewer* viewer, NMon::TEvHttpInfo::TPtr& e : Viewer(viewer) , Event(ev) { - InitConfig(Event->Get()->Request.GetParams()); + InitConfig(); NWilson::TTraceId traceId; TStringBuf traceparent = Event->Get()->Request.GetHeader("traceparent"); if (traceparent) { @@ -589,7 +589,8 @@ std::vector TViewerPipeClient::GetNodesFromBoardReply(TEvStateStorage:: return GetNodesFromBoardReply(*ev->Get()); } -void TViewerPipeClient::InitConfig(const TCgiParameters& params) { +void TViewerPipeClient::InitConfig() { + const TCgiParameters& params = Event->Get()->Request.GetParams(); Followers = FromStringWithDefault(params.Get("followers"), Followers); Metrics = FromStringWithDefault(params.Get("metrics"), Metrics); WithRetry = FromStringWithDefault(params.Get("with_retry"), WithRetry); @@ -598,6 +599,18 @@ void TViewerPipeClient::InitConfig(const TCgiParameters& params) { if (!Database) { Database = params.Get("tenant"); } + if (!Database) { + auto postData = Event->Get()->Request.GetPostContent(); + Cerr << "jjjjjj postData: " << postData << Endl; + static NJson::TJsonReaderConfig JsonConfig; + NJson::TJsonValue requestData; + bool success = NJson::ReadJsonTree(postData, &JsonConfig, &requestData); + Cerr << "jjjjjj success: " << success << Endl; + if (success) { + Database = requestData["database"].GetStringRobust(); + Cerr << "jjjjjj Database from post data: " << Database << Endl; + } + } Direct = FromStringWithDefault(params.Get("direct"), Direct); JsonSettings.EnumAsNumbers = !FromStringWithDefault(params.Get("enums"), true); JsonSettings.UI64AsString = !FromStringWithDefault(params.Get("ui64"), true); diff --git a/ydb/core/viewer/json_pipe_req.h b/ydb/core/viewer/json_pipe_req.h index e5f7665f74f2..8e1192a6c8b0 100644 --- a/ydb/core/viewer/json_pipe_req.h +++ b/ydb/core/viewer/json_pipe_req.h @@ -268,7 +268,7 @@ class TViewerPipeClient : public TActorBootstrapped { TRequestResponse MakeRequestStateStorageEndpointsLookup(const TString& path, ui64 cookie = 0); std::vector GetNodesFromBoardReply(TEvStateStorage::TEvBoardInfo::TPtr& ev); std::vector GetNodesFromBoardReply(const TEvStateStorage::TEvBoardInfo& ev); - void InitConfig(const TCgiParameters& params); + void InitConfig(); void InitConfig(const TRequestSettings& settings); void ClosePipes(); ui32 FailPipeConnect(TTabletId tabletId); diff --git a/ydb/core/viewer/json_storage_base.h b/ydb/core/viewer/json_storage_base.h index 1f8616ce6bbc..01c7e635cb20 100644 --- a/ydb/core/viewer/json_storage_base.h +++ b/ydb/core/viewer/json_storage_base.h @@ -38,9 +38,7 @@ class TJsonStorageBase : public TViewerPipeClient { using TThis = TJsonStorageBase; using TNodeId = ui32; - IViewer* Viewer; TActorId Initiator; - NMon::TEvHttpInfo::TPtr Event; THolder NodesInfo; TMap VDiskInfo; TMap PDiskInfo; @@ -112,14 +110,13 @@ class TJsonStorageBase : public TViewerPipeClient { THashMap GroupRowsByGroupId; TJsonStorageBase(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev) - : Viewer(viewer) - , Initiator(ev->Sender) - , Event(std::move(ev)) + : TBase(viewer, ev) + , Initiator(Event->Sender) { const auto& params(Event->Get()->Request.GetParams()); JsonSettings.EnumAsNumbers = !FromStringWithDefault(params.Get("enums"), true); JsonSettings.UI64AsString = !FromStringWithDefault(params.Get("ui64"), false); - InitConfig(params); + InitConfig(); Timeout = FromStringWithDefault(params.Get("timeout"), 10000); FilterTenant = params.Get("tenant"); TString filterStoragePool = params.Get("pool"); diff --git a/ydb/core/viewer/json_vdisk_req.h b/ydb/core/viewer/json_vdisk_req.h index 71ccc636ca7f..0fa40f7a3339 100644 --- a/ydb/core/viewer/json_vdisk_req.h +++ b/ydb/core/viewer/json_vdisk_req.h @@ -37,9 +37,7 @@ class TJsonVDiskRequest : public TViewerPipeClient { using TThis = TJsonVDiskRequest; using TBase = TViewerPipeClient; using THelper = TJsonVDiskRequestHelper; - IViewer* Viewer; TActorId Initiator; - NMon::TEvHttpInfo::TPtr Event; TJsonSettings JsonSettings; bool AllEnums = false; ui32 Timeout = 0; @@ -57,9 +55,8 @@ class TJsonVDiskRequest : public TViewerPipeClient { public: TJsonVDiskRequest(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev) - : Viewer(viewer) + : TBase(viewer, ev) , Initiator(ev->Sender) - , Event(ev) {} void Bootstrap() override { @@ -81,7 +78,7 @@ class TJsonVDiskRequest : public TViewerPipeClient { if (!NodeId) { NodeId = TlsActivationContext->ActorSystem()->NodeId; } - TBase::InitConfig(params); + TBase::InitConfig(); JsonSettings.EnumAsNumbers = !FromStringWithDefault(params.Get("enums"), false); diff --git a/ydb/core/viewer/pdisk_info.h b/ydb/core/viewer/pdisk_info.h index 78b105f6b891..aa36f242ece6 100644 --- a/ydb/core/viewer/pdisk_info.h +++ b/ydb/core/viewer/pdisk_info.h @@ -56,7 +56,7 @@ class TPDiskInfo : public TViewerPipeClient { if (!NodeId) { NodeId = TlsActivationContext->ActorSystem()->NodeId; } - TBase::InitConfig(params); + TBase::InitConfig(); Timeout = FromStringWithDefault(params.Get("timeout"), 10000); Retries = FromStringWithDefault(params.Get("retries"), 3); diff --git a/ydb/core/viewer/pdisk_restart.h b/ydb/core/viewer/pdisk_restart.h index 440a4429a082..e318c0dc2c57 100644 --- a/ydb/core/viewer/pdisk_restart.h +++ b/ydb/core/viewer/pdisk_restart.h @@ -24,8 +24,6 @@ class TJsonPDiskRestart : public TViewerPipeClient { protected: using TThis = TJsonPDiskRestart; using TBase = TViewerPipeClient; - IViewer* Viewer; - NMon::TEvHttpInfo::TPtr Event; ui32 Timeout = 0; ui32 ActualRetries = 0; ui32 Retries = 0; @@ -39,8 +37,7 @@ class TJsonPDiskRestart : public TViewerPipeClient { public: TJsonPDiskRestart(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev) - : Viewer(viewer) - , Event(ev) + : TBase(viewer, ev) {} void Bootstrap() override { @@ -69,7 +66,7 @@ class TJsonPDiskRestart : public TViewerPipeClient { if (!NodeId) { NodeId = TlsActivationContext->ActorSystem()->NodeId; } - TBase::InitConfig(params); + TBase::InitConfig(); Timeout = FromStringWithDefault(params.Get("timeout"), 10000); Retries = FromStringWithDefault(params.Get("retries"), 0); diff --git a/ydb/core/viewer/pdisk_status.h b/ydb/core/viewer/pdisk_status.h index a8330716ef46..fb7e1b3820de 100644 --- a/ydb/core/viewer/pdisk_status.h +++ b/ydb/core/viewer/pdisk_status.h @@ -13,8 +13,6 @@ class TPDiskStatus : public TViewerPipeClient { protected: using TThis = TPDiskStatus; using TBase = TViewerPipeClient; - IViewer* Viewer; - NMon::TEvHttpInfo::TPtr Event; ui32 Timeout = 0; std::unique_ptr Response; @@ -24,8 +22,7 @@ class TPDiskStatus : public TViewerPipeClient { public: TPDiskStatus(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev) - : Viewer(viewer) - , Event(ev) + : TBase(viewer, ev) {} void Bootstrap() override { @@ -89,7 +86,7 @@ class TPDiskStatus : public TViewerPipeClient { return PassAway(); } - TBase::InitConfig(params); + TBase::InitConfig(); Timeout = FromStringWithDefault(params.Get("timeout"), 10000); diff --git a/ydb/core/viewer/query_execute_script.h b/ydb/core/viewer/query_execute_script.h new file mode 100644 index 000000000000..96f0fb09907a --- /dev/null +++ b/ydb/core/viewer/query_execute_script.h @@ -0,0 +1,96 @@ +#pragma once +#include "json_local_rpc.h" +#include +#include +#include +#include + +namespace NKikimr { + +namespace NRpcService { + +template<> +void SetRequestSyncOperationMode(Ydb::Query::ExecuteScriptRequest& request) { + request.mutable_operation_params()->set_operation_mode(Ydb::Operations::OperationParams::ASYNC); +} + +} + +namespace NViewer { + +using TQueryExecuteScriptRpc = TJsonLocalRpc>; + +class TQueryExecuteScript : public TQueryExecuteScriptRpc { +public: + using TBase = TQueryExecuteScriptRpc; + + TQueryExecuteScript(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev) + : TBase(viewer, ev) + { + AllowedMethods = {HTTP_METHOD_POST}; + } + + static YAML::Node GetSwagger() { + YAML::Node node = YAML::Load(R"___( + post: + tags: + - script query + summary: Execute script + description: Execute script + requestBody: + required: true + content: + application/json: + schema: + type: object + properties: + database: + type: string + required: true + script_content: + type: object + properties: + text: + type: string + description: query text + required: true + syntax: + type: string + description: | + syntax: + * `SYNTAX_YQL_V1` + * `SYNTAX_PG` + required: false + exec_mode: + type: string + description: | + exec_mode: + * `EXEC_MODE_PARSE` + * `EXEC_MODE_VALIDATE` + * `EXEC_MODE_EXPLAIN` + * `EXEC_MODE_EXECUTE` + required: true + responses: + 200: + description: OK + content: + application/json: + schema: {} + 400: + description: Bad Request + 403: + description: Forbidden + 504: + description: Gateway Timeout + )___"); + node["get"]["responses"]["200"]["content"]["application/json"]["schema"] = TProtoToYaml::ProtoToYamlSchema(); + return node; + } +}; + +} +} diff --git a/ydb/core/viewer/query_fetch_script.h b/ydb/core/viewer/query_fetch_script.h new file mode 100644 index 000000000000..6a64d230b00f --- /dev/null +++ b/ydb/core/viewer/query_fetch_script.h @@ -0,0 +1,76 @@ +#pragma once +#include "json_local_rpc.h" +#include +#include +#include + +namespace NKikimr::NViewer { + +using TQueryFetchScriptRpc = TJsonLocalRpc>; + +class TQueryFetchScript : public TQueryFetchScriptRpc { +public: + using TBase = TQueryFetchScriptRpc; + + TQueryFetchScript(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev) + : TBase(viewer, ev) + { + AllowedMethods = {HTTP_METHOD_GET}; + } + + static YAML::Node GetSwagger() { + YAML::Node node = YAML::Load(R"___( + get: + tags: + - script query + summary: Get operation + description: Check status for a given operation + parameters: + - name: database + in: query + description: database name + required: true + type: string + - name: operation_id + in: query + description: operation id + required: true + type: string + - name: result_set_index + in: query + description: result set index + required: false + type: string + - name: fetch_token + in: query + description: fetch token + required: false + type: string + - name: rows_limit + in: query + description: rows limit (less than 1000 allowed) + required: false + type: string + responses: + 200: + description: OK + content: + application/json: + schema: {} + 400: + description: Bad Request + 403: + description: Forbidden + 504: + description: Gateway Timeout + )___"); + node["get"]["responses"]["200"]["content"]["application/json"]["schema"] = TProtoToYaml::ProtoToYamlSchema(); + return node; + } +}; + +} diff --git a/ydb/core/viewer/vdisk_evict.h b/ydb/core/viewer/vdisk_evict.h index 624762654ad1..7c563cb0910b 100644 --- a/ydb/core/viewer/vdisk_evict.h +++ b/ydb/core/viewer/vdisk_evict.h @@ -24,8 +24,6 @@ class TJsonVDiskEvict : public TViewerPipeClient { protected: using TThis = TJsonVDiskEvict; using TBase = TViewerPipeClient; - IViewer* Viewer; - NMon::TEvHttpInfo::TPtr Event; ui32 Timeout = 0; ui32 ActualRetries = 0; ui32 Retries = 0; @@ -42,8 +40,7 @@ class TJsonVDiskEvict : public TViewerPipeClient { public: TJsonVDiskEvict(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev) - : Viewer(viewer) - , Event(ev) + : TBase(viewer, ev) {} inline ui32 GetRequiredParam(const TCgiParameters& params, const std::string& name, ui32& obj) { @@ -90,7 +87,7 @@ class TJsonVDiskEvict : public TViewerPipeClient { 0, NMon::IEvHttpInfoRes::EContentType::Custom)); return PassAway(); } - TBase::InitConfig(params); + TBase::InitConfig(); Force = FromStringWithDefault(params.Get("force"), false); Timeout = FromStringWithDefault(params.Get("timeout"), 10000); diff --git a/ydb/core/viewer/viewer.cpp b/ydb/core/viewer/viewer.cpp index 51f382b2c430..b32cc5c0eaad 100644 --- a/ydb/core/viewer/viewer.cpp +++ b/ydb/core/viewer/viewer.cpp @@ -32,6 +32,7 @@ extern void InitViewerBrowseJsonHandlers(TJsonHandlers& jsonHandlers); extern void InitPDiskJsonHandlers(TJsonHandlers& jsonHandlers); extern void InitVDiskJsonHandlers(TJsonHandlers& jsonHandlers); extern void InitOperationJsonHandlers(TJsonHandlers& jsonHandlers); +extern void InitQueryJsonHandlers(TJsonHandlers& jsonHandlers); extern void InitSchemeJsonHandlers(TJsonHandlers& jsonHandlers); extern void InitStorageJsonHandlers(TJsonHandlers& jsonHandlers); @@ -119,6 +120,13 @@ class TViewer : public TActorBootstrapped, public IViewer { .UseAuth = true, .AllowedSIDs = monitoringAllowedSIDs, }); + mon->RegisterActorPage({ + .RelPath = "query", + .ActorSystem = ctx.ExecutorThread.ActorSystem, + .ActorId = ctx.SelfID, + .UseAuth = true, + .AllowedSIDs = monitoringAllowedSIDs, + }); mon->RegisterActorPage({ .RelPath = "scheme", .ActorSystem = ctx.ExecutorThread.ActorSystem, @@ -144,6 +152,7 @@ class TViewer : public TActorBootstrapped, public IViewer { InitVDiskJsonHandlers(JsonHandlers); InitStorageJsonHandlers(JsonHandlers); InitOperationJsonHandlers(JsonHandlers); + InitQueryJsonHandlers(JsonHandlers); InitSchemeJsonHandlers(JsonHandlers); InitViewerBrowseJsonHandlers(JsonHandlers); diff --git a/ydb/core/viewer/viewer_autocomplete.h b/ydb/core/viewer/viewer_autocomplete.h index 4d7c8a63295a..93967c9c5448 100644 --- a/ydb/core/viewer/viewer_autocomplete.h +++ b/ydb/core/viewer/viewer_autocomplete.h @@ -50,7 +50,7 @@ class TJsonAutocomplete : public TViewerPipeClient { : TBase(viewer, ev) { const auto& params(Event->Get()->Request.GetParams()); - InitConfig(params); + InitConfig(); ParseCgiParameters(params); if (IsPostContent()) { TStringBuf content = Event->Get()->Request.GetPostContent(); diff --git a/ydb/core/viewer/viewer_bscontrollerinfo.h b/ydb/core/viewer/viewer_bscontrollerinfo.h index 95b7c51be01a..48a9162b6ac3 100644 --- a/ydb/core/viewer/viewer_bscontrollerinfo.h +++ b/ydb/core/viewer/viewer_bscontrollerinfo.h @@ -25,7 +25,7 @@ class TJsonBSControllerInfo : public TViewerPipeClient { JsonSettings.EnumAsNumbers = !FromStringWithDefault(params.Get("enums"), false); JsonSettings.UI64AsString = !FromStringWithDefault(params.Get("ui64"), false); Timeout = FromStringWithDefault(params.Get("timeout"), 10000); - InitConfig(params); + InitConfig(); RequestBSControllerInfo(); Become(&TThis::StateRequestedInfo, TDuration::MilliSeconds(Timeout), new TEvents::TEvWakeup()); } diff --git a/ydb/core/viewer/viewer_cluster.h b/ydb/core/viewer/viewer_cluster.h index 681041d1d8be..28d82fb11baf 100644 --- a/ydb/core/viewer/viewer_cluster.h +++ b/ydb/core/viewer/viewer_cluster.h @@ -98,7 +98,7 @@ class TJsonCluster : public TViewerPipeClient { const auto& params(Event->Get()->Request.GetParams()); JsonSettings.EnumAsNumbers = !FromStringWithDefault(params.Get("enums"), true); JsonSettings.UI64AsString = !FromStringWithDefault(params.Get("ui64"), false); - InitConfig(params); + InitConfig(); Tablets = FromStringWithDefault(params.Get("tablets"), false); Timeout = FromStringWithDefault(params.Get("timeout"), 10000); OffloadMerge = FromStringWithDefault(params.Get("offload_merge"), OffloadMerge); diff --git a/ydb/core/viewer/viewer_compute.h b/ydb/core/viewer/viewer_compute.h index 19ff4a470cbe..198b1681d158 100644 --- a/ydb/core/viewer/viewer_compute.h +++ b/ydb/core/viewer/viewer_compute.h @@ -14,7 +14,6 @@ using namespace NActors; class TJsonCompute : public TViewerPipeClient { using TThis = TJsonCompute; using TBase = TViewerPipeClient; - IViewer* Viewer; THashMap TenantByPath; THashMap TenantBySubDomainKey; THashMap HiveBySubDomainKey; @@ -25,7 +24,6 @@ class TJsonCompute : public TViewerPipeClient { THashMap> TabletInfoIndex; THashMap HiveNodeStatsIndex; THashMap TenantPathByNodeId; - NMon::TEvHttpInfo::TPtr Event; TVector NodeIds; THashSet PassedNodeIds; THashSet FoundNodeIds; @@ -70,8 +68,7 @@ class TJsonCompute : public TViewerPipeClient { public: TJsonCompute(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev) - : Viewer(viewer) - , Event(ev) + : TBase(viewer, ev) {} TString GetDomainId(TPathId pathId) { @@ -98,7 +95,7 @@ class TJsonCompute : public TViewerPipeClient { const auto& params(Event->Get()->Request.GetParams()); JsonSettings.EnumAsNumbers = !FromStringWithDefault(params.Get("enums"), true); JsonSettings.UI64AsString = !FromStringWithDefault(params.Get("ui64"), false); - InitConfig(params); + InitConfig(); Timeout = FromStringWithDefault(params.Get("timeout"), 10000); Tablets = FromStringWithDefault(params.Get("tablets"), Tablets); Path = params.Get("path"); diff --git a/ydb/core/viewer/viewer_describe.h b/ydb/core/viewer/viewer_describe.h index f42213dbcbfa..eeeee5702fe8 100644 --- a/ydb/core/viewer/viewer_describe.h +++ b/ydb/core/viewer/viewer_describe.h @@ -53,7 +53,7 @@ class TJsonDescribe : public TViewerPipeClient { JsonSettings.UI64AsString = !FromStringWithDefault(params.Get("ui64"), false); Timeout = FromStringWithDefault(params.Get("timeout"), 10000); ExpandSubElements = FromStringWithDefault(params.Get("subs"), ExpandSubElements); - InitConfig(params); + InitConfig(); if (params.Has("schemeshard_id")) { THolder request = MakeHolder(); diff --git a/ydb/core/viewer/viewer_hiveinfo.h b/ydb/core/viewer/viewer_hiveinfo.h index 197024042701..24bd19cb0d63 100644 --- a/ydb/core/viewer/viewer_hiveinfo.h +++ b/ydb/core/viewer/viewer_hiveinfo.h @@ -28,7 +28,7 @@ class TJsonHiveInfo : public TViewerPipeClient { JsonSettings.UI64AsString = !FromStringWithDefault(params.Get("ui64"), false); Timeout = FromStringWithDefault(params.Get("timeout"), 10000); NodeId = FromStringWithDefault(params.Get("node"), 0); - InitConfig(params); + InitConfig(); if (hiveId != 0 ) { TAutoPtr request = new TEvHive::TEvRequestHiveInfo(); if (params.Has("tablet_id")) { diff --git a/ydb/core/viewer/viewer_hivestats.h b/ydb/core/viewer/viewer_hivestats.h index 80cee5c512a1..04396e6c8254 100644 --- a/ydb/core/viewer/viewer_hivestats.h +++ b/ydb/core/viewer/viewer_hivestats.h @@ -9,16 +9,13 @@ using namespace NActors; class TJsonHiveStats : public TViewerPipeClient { using TThis = TJsonHiveStats; using TBase = TViewerPipeClient; - IViewer* Viewer; - NMon::TEvHttpInfo::TPtr Event; TAutoPtr HiveStats; TJsonSettings JsonSettings; ui32 Timeout = 0; public: TJsonHiveStats(IViewer* viewer, NMon::TEvHttpInfo::TPtr &ev) - : Viewer(viewer) - , Event(ev) + : TBase(viewer, ev) {} void Bootstrap() override { @@ -27,7 +24,7 @@ class TJsonHiveStats : public TViewerPipeClient { JsonSettings.EnumAsNumbers = !FromStringWithDefault(params.Get("enums"), true); JsonSettings.UI64AsString = !FromStringWithDefault(params.Get("ui64"), false); Timeout = FromStringWithDefault(params.Get("timeout"), 10000); - InitConfig(params); + InitConfig(); if (hiveId != 0 ) { THolder request = MakeHolder(); request->Record.SetReturnFollowers(FromStringWithDefault(params.Get("followers"), false)); diff --git a/ydb/core/viewer/viewer_hotkeys.h b/ydb/core/viewer/viewer_hotkeys.h index db16ce1fc014..43dfc2fcfc41 100644 --- a/ydb/core/viewer/viewer_hotkeys.h +++ b/ydb/core/viewer/viewer_hotkeys.h @@ -45,7 +45,7 @@ class TJsonHotkeys : public TViewerPipeClient { Limit = FromStringWithDefault(params.Get("limit"), 10); PollingFactor = std::max(0.0f, std::min(FromStringWithDefault(params.Get("polling_factor"), 0.2), 1.0f)); EnableSampling = FromStringWithDefault(params.Get("enable_sampling"), false); - InitConfig(params); + InitConfig(); THolder request = MakeHolder(); FillParams(request->Record.MutableDescribePath(), params); diff --git a/ydb/core/viewer/viewer_netinfo.h b/ydb/core/viewer/viewer_netinfo.h index edddfa3010b8..fe1223a1d255 100644 --- a/ydb/core/viewer/viewer_netinfo.h +++ b/ydb/core/viewer/viewer_netinfo.h @@ -12,12 +12,10 @@ using namespace NActors; class TJsonNetInfo : public TViewerPipeClient { using TThis = TJsonNetInfo; using TBase = TViewerPipeClient; - IViewer* Viewer; std::unordered_map TenantByPath; std::unordered_map TenantBySubDomainKey; std::unordered_map> NavigateResult; std::unique_ptr HiveStats; - NMon::TEvHttpInfo::TPtr Event; std::vector NodeIds; std::unordered_map> NodeSysInfo; std::unordered_map> NodeNetInfo; @@ -29,15 +27,14 @@ class TJsonNetInfo : public TViewerPipeClient { public: TJsonNetInfo(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev) - : Viewer(viewer) - , Event(ev) + : TBase(viewer, ev) {} void Bootstrap() override { const auto& params(Event->Get()->Request.GetParams()); JsonSettings.EnumAsNumbers = !FromStringWithDefault(params.Get("enums"), true); JsonSettings.UI64AsString = !FromStringWithDefault(params.Get("ui64"), false); - InitConfig(params); + InitConfig(); Timeout = FromStringWithDefault(params.Get("timeout"), 10000); Path = params.Get("path"); diff --git a/ydb/core/viewer/viewer_nodes.h b/ydb/core/viewer/viewer_nodes.h index 48d608f479bb..8dc27de56cc8 100644 --- a/ydb/core/viewer/viewer_nodes.h +++ b/ydb/core/viewer/viewer_nodes.h @@ -608,7 +608,7 @@ class TJsonNodes : public TViewerPipeClient { const auto& params(Event->Get()->Request.GetParams()); JsonSettings.EnumAsNumbers = !FromStringWithDefault(params.Get("enums"), true); JsonSettings.UI64AsString = !FromStringWithDefault(params.Get("ui64"), false); - InitConfig(params); + InitConfig(); Timeout = FromStringWithDefault(params.Get("timeout"), 10000); FieldsRequired.set(+ENodeFields::NodeId); UptimeSeconds = FromStringWithDefault(params.Get("uptime"), 0); diff --git a/ydb/core/viewer/viewer_query_old.h b/ydb/core/viewer/viewer_query_old.h index 9b130e9be793..80d75d533d2d 100644 --- a/ydb/core/viewer/viewer_query_old.h +++ b/ydb/core/viewer/viewer_query_old.h @@ -94,11 +94,10 @@ class TJsonQueryOld : public TViewerPipeClient { } TJsonQueryOld(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev) - : Viewer(viewer) - , Event(ev) + : TBase(viewer, ev) { const auto& params(Event->Get()->Request.GetParams()); - InitConfig(params); + InitConfig(); ParseCgiParameters(params); if (IsPostContent()) { TStringBuf content = Event->Get()->Request.GetPostContent(); @@ -113,7 +112,7 @@ class TJsonQueryOld : public TViewerPipeClient { auto& request = ViewerRequest->Get()->Record.GetQueryRequest(); TCgiParameters params(request.GetUri()); - InitConfig(params); + InitConfig(); ParseCgiParameters(params); TStringBuf content = request.GetContent(); diff --git a/ydb/core/viewer/viewer_render.h b/ydb/core/viewer/viewer_render.h index 44aea71caff0..ae90873607d8 100644 --- a/ydb/core/viewer/viewer_render.h +++ b/ydb/core/viewer/viewer_render.h @@ -25,7 +25,7 @@ class TJsonRender : public TViewerPipeClient { { const auto& params(Event->Get()->Request.GetParams()); - InitConfig(params); + InitConfig(); Timeout = FromStringWithDefault(params.Get("timeout"), 30000); } @@ -35,7 +35,7 @@ class TJsonRender : public TViewerPipeClient { auto& request = ViewerRequest->Get()->Record.GetRenderRequest(); TCgiParameters params(request.GetUri()); - InitConfig(params); + InitConfig(); Direct = true; Timeout = ViewerRequest->Get()->Record.GetTimeout(); } diff --git a/ydb/core/viewer/viewer_tenants.h b/ydb/core/viewer/viewer_tenants.h index d0c2baa1c637..bd5086abef46 100644 --- a/ydb/core/viewer/viewer_tenants.h +++ b/ydb/core/viewer/viewer_tenants.h @@ -10,9 +10,7 @@ using namespace NActors; class TJsonTenants : public TViewerPipeClient { using TThis = TJsonTenants; using TBase = TViewerPipeClient; - IViewer* Viewer; NKikimrViewer::TTenants Result; - NMon::TEvHttpInfo::TPtr Event; TJsonSettings JsonSettings; ui32 Timeout = 0; bool State = true; @@ -20,15 +18,14 @@ class TJsonTenants : public TViewerPipeClient { public: TJsonTenants(IViewer* viewer, NMon::TEvHttpInfo::TPtr &ev) - : Viewer(viewer) - , Event(ev) + : TBase(viewer, ev) {} void Bootstrap() override { const auto& params(Event->Get()->Request.GetParams()); JsonSettings.EnumAsNumbers = !FromStringWithDefault(params.Get("enums"), true); JsonSettings.UI64AsString = !FromStringWithDefault(params.Get("ui64"), false); - InitConfig(params); + InitConfig(); Timeout = FromStringWithDefault(params.Get("timeout"), 10000); State = FromStringWithDefault(params.Get("state"), true); TIntrusivePtr domains = AppData()->DomainsInfo; diff --git a/ydb/core/viewer/viewer_ut.cpp b/ydb/core/viewer/viewer_ut.cpp index 696ad9918465..da4f1849929d 100644 --- a/ydb/core/viewer/viewer_ut.cpp +++ b/ydb/core/viewer/viewer_ut.cpp @@ -26,6 +26,7 @@ #include #include +#include using namespace NKikimr; using namespace NViewer; @@ -1831,4 +1832,115 @@ Y_UNIT_TEST_SUITE(Viewer) { auto resultSets = json["Databases"].GetArray(); UNIT_ASSERT_EQUAL_C(1, resultSets.size(), response); } + + static const ui32 ROWS_N = 15; + static const ui32 ROWS_LIMIT = 5; + + TString PostExecuteScript(TKeepAliveHttpClient& httpClient, TString query) { + TStringStream requestBody; + requestBody + << "{ \"database\": \"/Root\"," + << " \"script_content\": {" + << " \"text\": \"" << query << "\"}," + << " \"exec_mode\": \"EXEC_MODE_EXECUTE\" }"; + TStringStream responseStream; + TKeepAliveHttpClient::THeaders headers; + headers["Content-Type"] = "application/json"; + headers["Authorization"] = "test_ydb_token"; + const TKeepAliveHttpClient::THttpCode statusCode = httpClient.DoPost("/query/script/execute?timeout=600000", requestBody.Str(), &responseStream, headers); + const TString response = responseStream.ReadAll(); + UNIT_ASSERT_EQUAL_C(statusCode, HTTP_OK, statusCode << ": " << response); + return response; + } + + TString GetOperation(TKeepAliveHttpClient& httpClient, TString id) { + TStringStream requestBody; + TStringStream responseStream; + TKeepAliveHttpClient::THeaders headers; + headers["Content-Type"] = "application/json"; + headers["Authorization"] = "test_ydb_token"; + id = std::regex_replace(id.c_str(), std::regex("/"), "%2F"); + const TKeepAliveHttpClient::THttpCode statusCode = httpClient.DoGet(TStringBuilder() + << "/operation/get?timeout=600000&id=" << id + << "&database=%2FRoot", &responseStream, headers); + const TString response = responseStream.ReadAll(); + UNIT_ASSERT_EQUAL_C(statusCode, HTTP_OK, statusCode << ": " << response); + return response; + } + + TString GetFetchScript(TKeepAliveHttpClient& httpClient, TString id) { + TStringStream requestBody; + TStringStream responseStream; + TKeepAliveHttpClient::THeaders headers; + headers["Content-Type"] = "application/json"; + headers["Authorization"] = "test_ydb_token"; + id = std::regex_replace(id.c_str(), std::regex("/"), "%2F"); + const TKeepAliveHttpClient::THttpCode statusCode = httpClient.DoGet(TStringBuilder() + << "/query/script/fetch?timeout=600000&operation_id=" << id + << "&database=%2FRoot" + << "&rows_limit=" << ROWS_LIMIT, &responseStream, headers); + const TString response = responseStream.ReadAll(); + UNIT_ASSERT_EQUAL_C(statusCode, HTTP_OK, statusCode << ": " << response); + return response; + } + + Y_UNIT_TEST(QueryExecuteScript) { + TPortManager tp; + ui16 port = tp.GetPort(2134); + ui16 grpcPort = tp.GetPort(2135); + ui16 monPort = tp.GetPort(8765); + auto settings = TServerSettings(port); + settings.InitKikimrRunConfig() + .SetNodeCount(1) + .SetUseRealThreads(true) + .SetDomainName("Root") + .SetUseSectorMap(true) + .SetMonitoringPortOffset(monPort, true); + + TServer server(settings); + server.EnableGRpc(grpcPort); + TClient client(settings); + client.InitRootScheme(); + + TTestActorRuntime& runtime = *server.GetRuntime(); + runtime.SetLogPriority(NKikimrServices::TICKET_PARSER, NLog::PRI_TRACE); + + TKeepAliveHttpClient httpClient("localhost", monPort); + + Cerr << "iiiiiii 0-1 " << Endl; + PostQuery(httpClient, "CREATE TABLE `/Root/Test` (Key Uint64, Value String, PRIMARY KEY (Key));", "execute-query"); + Cerr << "iiiiiii 0-2 " << Endl; + for (ui32 i = 1; i <= ROWS_N; ++i) { + PostQuery(httpClient, TStringBuilder() << "INSERT INTO `/Root/Test` (Key, Value) VALUES (" << i << ", 'testvalue');", "execute-query"); + } + Cerr << "iiiiiii 0-3 " << Endl; + + NJson::TJsonReaderConfig jsonCfg; + NJson::TJsonValue json; + + Cerr << "iiiiiii 1 " << Endl; + TString response = PostExecuteScript(httpClient, "SELECT * FROM `/Root/Test`;"); + Cerr << "iiiiiii 1 response: " << response << Endl; + NJson::ReadJsonTree(response, &jsonCfg, &json, /* throwOnError = */ true); + UNIT_ASSERT_EQUAL_C(json["status"].GetString(), "SUCCESS", response); + TString id = json["id"].GetString(); + + Sleep(TDuration::MilliSeconds(1000)); + + Cerr << "iiiiiii 2 " << Endl; + response = GetOperation(httpClient, id); + Cerr << "iiiiiii 2 response: " << response << Endl; + NJson::ReadJsonTree(response, &jsonCfg, &json, /* throwOnError = */ true); + UNIT_ASSERT_EQUAL_C(json["issues"].GetArray().size(), 0, response); + + Cerr << "iiiiiii 3 " << Endl; + response = GetFetchScript(httpClient, id); + Cerr << "iiiiiii 3 response: " << response << Endl; + NJson::ReadJsonTree(response, &jsonCfg, &json, /* throwOnError = */ true); + UNIT_ASSERT_EQUAL_C(json["status"].GetString(), "SUCCESS", response); + auto rows = json["result_set"].GetMap().at("rows").GetArray(); + UNIT_ASSERT_EQUAL_C(rows.size(), ROWS_LIMIT, response); + Cerr << "iiiiiii 4 " << Endl; + } + } diff --git a/ydb/core/viewer/ya.make b/ydb/core/viewer/ya.make index 69056b3bfbd7..5f65fb6c7d4c 100644 --- a/ydb/core/viewer/ya.make +++ b/ydb/core/viewer/ya.make @@ -24,6 +24,7 @@ SRCS( json_handlers_pdisk.cpp json_handlers_scheme.cpp json_handlers_storage.cpp + json_handlers_query.cpp json_handlers_vdisk.cpp json_handlers_viewer.cpp json_handlers_pq.cpp @@ -43,6 +44,8 @@ SRCS( pdisk_restart.h pdisk_status.h query_autocomplete_helper.h + query_execute_script.h + query_fetch_script.h scheme_directory.h storage_groups.h vdisk_blobindexstat.h