Skip to content

Commit

Permalink
Merge a53fceb into 8ebf595
Browse files Browse the repository at this point in the history
  • Loading branch information
StekPerepolnen authored Sep 26, 2024
2 parents 8ebf595 + a53fceb commit 038064e
Show file tree
Hide file tree
Showing 32 changed files with 465 additions and 91 deletions.
3 changes: 1 addition & 2 deletions ydb/core/grpc_services/local_rpc/local_rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,7 @@ void SetRequestSyncOperationMode(TRequest&) {
template<typename TRpc>
NThreading::TFuture<typename TRpc::TResponse> DoLocalRpc(typename TRpc::TRequest&& proto, const TString& database,
const TMaybe<TString>& token, const TMaybe<TString>& requestType,
TActorSystem* actorSystem, bool internalCall = false)
{
TActorSystem* actorSystem, bool internalCall = false) {
auto promise = NThreading::NewPromise<typename TRpc::TResponse>();

SetRequestSyncOperationMode(proto);
Expand Down
44 changes: 23 additions & 21 deletions ydb/core/grpc_services/query/rpc_execute_script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/core/base/appdata.h>
#include <ydb/library/ydb_issue/issue_helpers.h>
#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/grpc_services/rpc_request_base.h>
#include <ydb/core/grpc_services/rpc_kqp_base.h>
#include <ydb/core/grpc_services/audit_dml_operations.h>
#include <ydb/core/grpc_services/grpc_integrity_trails.h>
Expand Down Expand Up @@ -72,27 +73,29 @@ std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest(
return {Ydb::StatusIds::SUCCESS, {}};
}

class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
class TExecuteScriptRPC : public TRpcRequestActor<TExecuteScriptRPC, TEvExecuteScriptRequest, false> {
public:
using TRpcRequestActorBase = TRpcRequestActor<TExecuteScriptRPC, TEvExecuteScriptRequest, false>;

static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::GRPC_REQ;
}

TExecuteScriptRPC(TEvExecuteScriptRequest* request)
: Request_(request)
TExecuteScriptRPC(IRequestNoOpCtx* request)
: TRpcRequestActorBase(request)
{}

void Bootstrap() {
NYql::TIssues issues;
const auto& request = *Request_->GetProtoRequest();
const auto& request = GetProtoRequest();

if (request.operation_params().operation_mode() == Ydb::Operations::OperationParams::SYNC) {
if (request->operation_params().operation_mode() == Ydb::Operations::OperationParams::SYNC) {
issues.AddIssue("ExecuteScript must be asyncronous operation");
return Reply(Ydb::StatusIds::BAD_REQUEST, issues);
}

AuditContextAppend(Request_.get(), request);
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), request, TlsActivationContext->AsActorContext());
AuditContextAppend(Request.Get(), request);
NDataIntegrity::LogIntegrityTrails(Request->GetTraceId(), *request, TlsActivationContext->AsActorContext());

Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS;
if (auto scriptRequest = MakeScriptRequest(issues, status)) {
Expand All @@ -113,7 +116,7 @@ class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
)

void Handle(NKqp::TEvKqp::TEvScriptResponse::TPtr& ev, const TActorContext& ctx) {
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *Request_->GetProtoRequest(), ev, ctx);
NDataIntegrity::LogIntegrityTrails(Request->GetTraceId(), *GetProtoRequest(), ev, ctx);

Ydb::Operations::Operation operation;
operation.set_id(ev->Get()->OperationId);
Expand All @@ -126,14 +129,14 @@ class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
}

THolder<NKqp::TEvKqp::TEvScriptRequest> MakeScriptRequest(NYql::TIssues& issues, Ydb::StatusIds::StatusCode& status) const {
const auto* req = Request_->GetProtoRequest();
const auto traceId = Request_->GetTraceId();
const auto* req = GetProtoRequest();
const auto traceId = Request->GetTraceId();

auto ev = MakeHolder<NKqp::TEvKqp::TEvScriptRequest>();

SetAuthToken(ev, *Request_);
SetDatabase(ev, *Request_);
SetRlPath(ev, *Request_);
SetAuthToken(ev, *Request);
SetDatabase(ev, *Request);
SetRlPath(ev, *Request);

if (traceId) {
ev->Record.SetTraceId(traceId.GetRef());
Expand Down Expand Up @@ -166,12 +169,9 @@ class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {

result.set_status(status);

AuditContextAppend(Request_.get(), *Request_->GetProtoRequest(), result);

TString serializedResult;
Y_PROTOBUF_SUPPRESS_NODISCARD result.SerializeToString(&serializedResult);
AuditContextAppend(Request.Get(), GetProtoRequest(), result);

Request_->SendSerializedResult(std::move(serializedResult), status);
TProtoResponseHelper::SendProtoResponse(result, status, Request);

PassAway();
}
Expand All @@ -181,9 +181,6 @@ class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
result.set_ready(true);
Reply(status, std::move(result), issues);
}

private:
std::unique_ptr<TEvExecuteScriptRequest> Request_;
};

} // namespace
Expand All @@ -197,6 +194,11 @@ void DoExecuteScript(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider
f.RegisterActor(new TExecuteScriptRPC(req));
}

} // namespace NQuery

template<>
IActor* TEvExecuteScriptRequest::CreateRpcActor(IRequestNoOpCtx* msg) {
return new TExecuteScriptRPC(msg);
}

} // namespace NKikimr::NGRpcService
12 changes: 7 additions & 5 deletions ydb/core/grpc_services/query/rpc_fetch_script_results.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, T
return NKikimrServices::TActivity::GRPC_REQ;
}

TFetchScriptResultsRPC(TEvFetchScriptResultsRequest* request)
TFetchScriptResultsRPC(IRequestNoOpCtx* request)
: TRpcRequestActorBase(request)
{}

Expand Down Expand Up @@ -108,10 +108,7 @@ class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, T

result.set_status(status);

TString serializedResult;
Y_PROTOBUF_SUPPRESS_NODISCARD result.SerializeToString(&serializedResult);

Request->SendSerializedResult(std::move(serializedResult), status);
TProtoResponseHelper::SendProtoResponse(result, status, Request);

PassAway();
}
Expand Down Expand Up @@ -154,4 +151,9 @@ void DoFetchScriptResults(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityPro

}

template<>
IActor* TEvFetchScriptResultsRequest::CreateRpcActor(IRequestNoOpCtx* msg) {
return new TFetchScriptResultsRPC(msg);
}

} // namespace NKikimr::NGRpcService
64 changes: 64 additions & 0 deletions ydb/core/viewer/json.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{
"metadata": {
"execution_id": "4578ea89-93aa2e63-c0ef36df-b5d4c3c8",
"exec_status": "EXEC_STATUS_COMPLETED",
"script_content": {
"syntax": "SYNTAX_YQL_V1",
"text": "SELECT * FROM `/Root/Test`;"
},
"result_sets_meta": [
{
"columns": [
{
"name": "Key",
"type": {
"optional_type": {
"item": {
"type_id": "UINT64"
}
}
}
},
{
"name": "Value",
"type": {
"optional_type": {
"item": {
"type_id": "STRING"
}
}
}
}
]
}
],
"exec_mode": "EXEC_MODE_EXECUTE",
"exec_stats": {
"query_phases": [
{
"duration_us": 1421,
"table_access": [
{
"name": "/Root/Test",
"reads": {
"rows": 15,
"bytes": 255
},
"partitions_count": 1
}
],
"cpu_time_us": 942,
"affected_shards": 1
}
],
"compilation": {
"duration_us": 7351,
"cpu_time_us": 6955
},
"process_cpu_time_us": 72,
"query_plan": "{}",
"total_duration_us": 9039,
"total_cpu_time_us": 7969
}
}
}
20 changes: 20 additions & 0 deletions ydb/core/viewer/json_handlers_query.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include "json_handlers.h"
#include "query_execute_script.h"
#include "query_fetch_script.h"

namespace NKikimr::NViewer {

void InitQueryExecuteScriptJsonHandler(TJsonHandlers& handlers) {
handlers.AddHandler("/query/script/execute", new TJsonHandler<TQueryExecuteScript>(TQueryExecuteScript::GetSwagger()));
}

void InitQueryFetchScriptJsonHandler(TJsonHandlers& handlers) {
handlers.AddHandler("/query/script/fetch", new TJsonHandler<TQueryFetchScript>(TQueryFetchScript::GetSwagger()));
}

void InitQueryJsonHandlers(TJsonHandlers& jsonHandlers) {
InitQueryExecuteScriptJsonHandler(jsonHandlers);
InitQueryFetchScriptJsonHandler(jsonHandlers);
}

} // namespace NKikimr::NViewer
6 changes: 6 additions & 0 deletions ydb/core/viewer/json_local_rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class TJsonLocalRpc : public TViewerPipeClient {
std::vector<HTTP_METHOD> AllowedMethods = {};
TAutoPtr<TEvLocalRpcPrivate::TEvGrpcRequestResult<TProtoResult>> Result;
NThreading::TFuture<TProtoResponse> RpcFuture;
Ydb::Operations::OperationParams::OperationMode OperationMode = Ydb::Operations::OperationParams::SYNC;

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
Expand All @@ -56,6 +57,7 @@ class TJsonLocalRpc : public TViewerPipeClient {
TString name;
name = field->name();
TString value = params.Get(name);
Cerr << "jjjjjjj name " << name << Endl;
if (!value.empty()) {
FieldDescriptor::CppType type = field->cpp_type();
switch (type) {
Expand Down Expand Up @@ -119,6 +121,7 @@ class TJsonLocalRpc : public TViewerPipeClient {
bool Params2Proto(TProtoRequest& request) {
auto postData = Event->Get()->Request.GetPostContent();
if (!postData.empty()) {
Cerr << "jjjjjjjjj GET" << Endl;
try {
NProtobufJson::Json2Proto(postData, request);
}
Expand All @@ -127,6 +130,7 @@ class TJsonLocalRpc : public TViewerPipeClient {
return false;
}
} else {
Cerr << "jjjjjjjjj POST" << Endl;
const auto& params(Event->Get()->Request.GetParams());
Params2Proto(params, request);
}
Expand All @@ -139,6 +143,7 @@ class TJsonLocalRpc : public TViewerPipeClient {
void SendGrpcRequest(TProtoRequest&& request) {
// TODO(xenoxeno): pass trace id
RpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(request), Database, Event->Get()->UserToken, TlsActivationContext->ActorSystem());

RpcFuture.Subscribe([actorId = TBase::SelfId(), actorSystem = TlsActivationContext->ActorSystem()]
(const NThreading::TFuture<TProtoResponse>& future) {
auto& response = future.GetValueSync();
Expand Down Expand Up @@ -168,6 +173,7 @@ class TJsonLocalRpc : public TViewerPipeClient {
return ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "Method is not allowed"));
}
if (Database.empty()) {
Cerr << "!!!! field 'database' is required" << Endl;
return ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "field 'database' is required"));
}
if (TBase::NeedToRedirect()) {
Expand Down
17 changes: 15 additions & 2 deletions ydb/core/viewer/json_pipe_req.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ TViewerPipeClient::TViewerPipeClient(IViewer* viewer, NMon::TEvHttpInfo::TPtr& e
: Viewer(viewer)
, Event(ev)
{
InitConfig(Event->Get()->Request.GetParams());
InitConfig();
NWilson::TTraceId traceId;
TStringBuf traceparent = Event->Get()->Request.GetHeader("traceparent");
if (traceparent) {
Expand Down Expand Up @@ -589,7 +589,8 @@ std::vector<TNodeId> TViewerPipeClient::GetNodesFromBoardReply(TEvStateStorage::
return GetNodesFromBoardReply(*ev->Get());
}

void TViewerPipeClient::InitConfig(const TCgiParameters& params) {
void TViewerPipeClient::InitConfig() {
const TCgiParameters& params = Event->Get()->Request.GetParams();
Followers = FromStringWithDefault(params.Get("followers"), Followers);
Metrics = FromStringWithDefault(params.Get("metrics"), Metrics);
WithRetry = FromStringWithDefault(params.Get("with_retry"), WithRetry);
Expand All @@ -598,6 +599,18 @@ void TViewerPipeClient::InitConfig(const TCgiParameters& params) {
if (!Database) {
Database = params.Get("tenant");
}
if (!Database) {
auto postData = Event->Get()->Request.GetPostContent();
Cerr << "jjjjjj postData: " << postData << Endl;
static NJson::TJsonReaderConfig JsonConfig;
NJson::TJsonValue requestData;
bool success = NJson::ReadJsonTree(postData, &JsonConfig, &requestData);
Cerr << "jjjjjj success: " << success << Endl;
if (success) {
Database = requestData["database"].GetStringRobust();
Cerr << "jjjjjj Database from post data: " << Database << Endl;
}
}
Direct = FromStringWithDefault<bool>(params.Get("direct"), Direct);
JsonSettings.EnumAsNumbers = !FromStringWithDefault<bool>(params.Get("enums"), true);
JsonSettings.UI64AsString = !FromStringWithDefault<bool>(params.Get("ui64"), true);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/viewer/json_pipe_req.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
TRequestResponse<TEvStateStorage::TEvBoardInfo> MakeRequestStateStorageEndpointsLookup(const TString& path, ui64 cookie = 0);
std::vector<TNodeId> GetNodesFromBoardReply(TEvStateStorage::TEvBoardInfo::TPtr& ev);
std::vector<TNodeId> GetNodesFromBoardReply(const TEvStateStorage::TEvBoardInfo& ev);
void InitConfig(const TCgiParameters& params);
void InitConfig();
void InitConfig(const TRequestSettings& settings);
void ClosePipes();
ui32 FailPipeConnect(TTabletId tabletId);
Expand Down
9 changes: 3 additions & 6 deletions ydb/core/viewer/json_storage_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ class TJsonStorageBase : public TViewerPipeClient {
using TThis = TJsonStorageBase;

using TNodeId = ui32;
IViewer* Viewer;
TActorId Initiator;
NMon::TEvHttpInfo::TPtr Event;
THolder<TEvInterconnect::TEvNodesInfo> NodesInfo;
TMap<ui32, NKikimrWhiteboard::TEvVDiskStateResponse> VDiskInfo;
TMap<ui32, NKikimrWhiteboard::TEvPDiskStateResponse> PDiskInfo;
Expand Down Expand Up @@ -112,14 +110,13 @@ class TJsonStorageBase : public TViewerPipeClient {
THashMap<TString, TGroupRow> GroupRowsByGroupId;

TJsonStorageBase(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev)
: Viewer(viewer)
, Initiator(ev->Sender)
, Event(std::move(ev))
: TBase(viewer, ev)
, Initiator(Event->Sender)
{
const auto& params(Event->Get()->Request.GetParams());
JsonSettings.EnumAsNumbers = !FromStringWithDefault<bool>(params.Get("enums"), true);
JsonSettings.UI64AsString = !FromStringWithDefault<bool>(params.Get("ui64"), false);
InitConfig(params);
InitConfig();
Timeout = FromStringWithDefault<ui32>(params.Get("timeout"), 10000);
FilterTenant = params.Get("tenant");
TString filterStoragePool = params.Get("pool");
Expand Down
7 changes: 2 additions & 5 deletions ydb/core/viewer/json_vdisk_req.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ class TJsonVDiskRequest : public TViewerPipeClient {
using TThis = TJsonVDiskRequest<RequestType, ResponseType>;
using TBase = TViewerPipeClient;
using THelper = TJsonVDiskRequestHelper<RequestType, ResponseType>;
IViewer* Viewer;
TActorId Initiator;
NMon::TEvHttpInfo::TPtr Event;
TJsonSettings JsonSettings;
bool AllEnums = false;
ui32 Timeout = 0;
Expand All @@ -57,9 +55,8 @@ class TJsonVDiskRequest : public TViewerPipeClient {

public:
TJsonVDiskRequest(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev)
: Viewer(viewer)
: TBase(viewer, ev)
, Initiator(ev->Sender)
, Event(ev)
{}

void Bootstrap() override {
Expand All @@ -81,7 +78,7 @@ class TJsonVDiskRequest : public TViewerPipeClient {
if (!NodeId) {
NodeId = TlsActivationContext->ActorSystem()->NodeId;
}
TBase::InitConfig(params);
TBase::InitConfig();


JsonSettings.EnumAsNumbers = !FromStringWithDefault<bool>(params.Get("enums"), false);
Expand Down
Loading

0 comments on commit 038064e

Please sign in to comment.