From 5d8774333e1371d939063bc48ea3a0dd9d559b2c Mon Sep 17 00:00:00 2001 From: Vitalii Gridnev Date: Tue, 1 Oct 2024 18:49:57 +0300 Subject: [PATCH] get rid of mkql results in scripting --- .../rpc_stream_execute_yql_script.cpp | 4 +- ydb/core/kqp/common/events/query.h | 4 +- ydb/core/kqp/common/kqp.h | 6 +- ydb/core/kqp/common/kqp_event_impl.cpp | 1 + ydb/core/kqp/gateway/kqp_ic_gateway.cpp | 28 +++--- ydb/core/kqp/host/kqp_host.cpp | 4 +- ydb/core/kqp/host/kqp_runner.cpp | 16 +--- ydb/core/kqp/host/kqp_transform.h | 5 -- ydb/core/kqp/provider/yql_kikimr_exec.cpp | 8 +- ydb/core/kqp/provider/yql_kikimr_gateway.h | 2 +- .../kqp/session_actor/kqp_session_actor.cpp | 19 ++-- .../kqp/session_actor/kqp_worker_actor.cpp | 4 +- .../kqp/session_actor/kqp_worker_common.cpp | 2 +- ydb/core/load_test/aggregated_result.cpp | 87 ++++++++----------- ydb/core/protos/kqp.proto | 4 +- ydb/core/viewer/viewer_query.h | 8 +- ydb/core/viewer/viewer_query_old.h | 8 +- ydb/tests/tools/kqprun/src/ydb_setup.cpp | 7 +- 18 files changed, 87 insertions(+), 130 deletions(-) diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp index 2b80ebc5d9ef..0a5c5b260a75 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp @@ -50,7 +50,7 @@ namespace { {} NKqp::TEvKqp::TEvDataQueryStreamPart::TPtr Handle; - google::protobuf::RepeatedPtrField::const_iterator ResultIterator; + google::protobuf::RepeatedPtrField::const_iterator ResultIterator; }; enum EStreamRpcWakeupTag : ui64 { @@ -220,7 +220,7 @@ class TStreamExecuteYqlScriptRPC auto result = response.mutable_result(); try { - NKqp::ConvertKqpQueryResultToDbResult(kqpResult, result->mutable_result_set()); + result->mutable_result_set()->CopyFrom(kqpResult); } catch (std::exception ex) { ReplyFinishStream(ex.what()); } diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h index 960927332005..b180f665492d 100644 --- a/ydb/core/kqp/common/events/query.h +++ b/ydb/core/kqp/common/events/query.h @@ -70,7 +70,9 @@ struct TEvQueryRequest: public NActors::TEventLocalSetUsePublicResponseDataFormat(true); + } bool IsSerializable() const override { return true; diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h index 14785394faa1..dc28e5635d3a 100644 --- a/ydb/core/kqp/common/kqp.h +++ b/ydb/core/kqp/common/kqp.h @@ -40,9 +40,9 @@ bool ScriptExecutionRunnerActorIdFromString(const TString& executionId, TActorId template inline void ConvertKqpQueryResultsToDbResult(const TFrom& from, TTo* to) { - const auto& results = from.GetResults(); + const auto& results = from.GetYdbResults(); for (const auto& result : results) { - ConvertKqpQueryResultToDbResult(result, to->add_result_sets()); + to->add_result_sets()->CopyFrom(result); } } @@ -80,7 +80,7 @@ class IQueryReplayBackend : public TNonCopyable { /// Accepts query text virtual void Collect(const TString& queryData) = 0; - virtual bool IsNull() { return false; } + virtual bool IsNull() { return false; } virtual ~IQueryReplayBackend() {}; diff --git a/ydb/core/kqp/common/kqp_event_impl.cpp b/ydb/core/kqp/common/kqp_event_impl.cpp index ee4e834e6c88..a64a05c19404 100644 --- a/ydb/core/kqp/common/kqp_event_impl.cpp +++ b/ydb/core/kqp/common/kqp_event_impl.cpp @@ -94,6 +94,7 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const { Record.MutableRequest()->SetDatabaseId(DatabaseId); } + Record.MutableRequest()->SetUsePublicResponseDataFormat(true); Record.MutableRequest()->SetSessionId(SessionId); Record.MutableRequest()->SetAction(QueryAction); Record.MutableRequest()->SetType(QueryType); diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index ee8698df4acf..fe40b75c446f 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -227,7 +227,7 @@ class TKqpScanQueryRequestHandler : public TRequestHandlerBase< void HandleResponse(typename TResponse::TPtr &ev, const TActorContext &ctx) { auto& response = *ev->Get()->Record.GetRef().MutableResponse(); - NKikimr::ConvertYdbResultToKqpResult(ResultSet,*response.AddResults()); + response.AddYdbResults()->CopyFrom(ResultSet); for (auto& execStats : Executions) { response.MutableQueryStats()->AddExecutions()->Swap(&execStats); } @@ -286,20 +286,18 @@ class TKqpStreamRequestHandler : public TRequestHandlerBase< virtual void HandleResponse(typename TResponse::TPtr &ev, const TActorContext &ctx) { auto& record = ev->Get()->Record.GetRef(); if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { - if (record.MutableResponse()->GetResults().size()) { + if (record.MutableResponse()->GetYdbResults().size()) { // Send result sets to RPC actor TStreamExecuteYqlScriptRPC auto evStreamPart = MakeHolder(); ActorIdToProto(this->SelfId(), evStreamPart->Record.MutableGatewayActorId()); - for (int i = 0; i < record.MutableResponse()->MutableResults()->size(); ++i) { + for (int i = 0; i < record.MutableResponse()->MutableYdbResults()->size(); ++i) { // Workaround to avoid errors on Pull execution stage which would expect some results - Ydb::ResultSet resultSet; - NKikimr::ConvertYdbResultToKqpResult(resultSet, *evStreamPart->Record.AddResults()); + evStreamPart->Record.AddResults(); } - evStreamPart->Record.MutableResults()->Swap(record.MutableResponse()->MutableResults()); + evStreamPart->Record.MutableResults()->Swap(record.MutableResponse()->MutableYdbResults()); this->Send(TargetActorId, evStreamPart.Release()); - // Save response without data to send it later ResponseHandle = ev.Release(); } else { @@ -405,7 +403,7 @@ class TKqpForwardStreamRequestHandler : public TRequestHandlerBase< auto& response = *ev->Get()->Record.GetRef().MutableResponse(); Ydb::ResultSet resultSet; - NKikimr::ConvertYdbResultToKqpResult(resultSet, *response.AddResults()); + response.AddYdbResults()->CopyFrom(resultSet); for (auto& execStats : Executions) { response.MutableQueryStats()->AddExecutions()->Swap(&execStats); } @@ -511,7 +509,7 @@ class TKqpGenericQueryRequestHandler: public TRequestHandlerBase< auto& response = *ev->Get()->Record.GetRef().MutableResponse(); for (auto& resultSet : ResultSets) { - ConvertYdbResultToKqpResult(std::move(resultSet.ResultSet), *response.AddResults()); + response.AddYdbResults()->Swap(&resultSet.ResultSet); } TBase::HandleResponse(ev, ctx); @@ -672,8 +670,8 @@ void KqpResponseToQueryResult(const NKikimrKqp::TEvQueryResponse& response, IKqp queryResult.AddIssue(NYql::IssueFromMessage(issue)); } - for (auto& result : queryResponse.GetResults()) { - auto arenaResult = google::protobuf::Arena::CreateMessage( + for (auto& result : queryResponse.GetYdbResults()) { + auto arenaResult = google::protobuf::Arena::CreateMessage( queryResult.ProtobufArenaPtr.get()); arenaResult->CopyFrom(result); @@ -1419,11 +1417,11 @@ class TKikimrIcGateway : public IKqpGateway { if (!CheckCluster(cluster)) { return InvalidCluster(cluster); } - + auto analyzePromise = NewPromise(); IActor* analyzeActor = new TAnalyzeActor(settings.TablePath, settings.Columns, analyzePromise); RegisterActor(analyzeActor); - + return analyzePromise.GetFuture(); } catch (yexception& e) { return MakeFuture(ResultFromException(e)); @@ -1995,7 +1993,7 @@ class TKikimrIcGateway : public IKqpGateway { } TFuture ExecDataQueryAst(const TString& cluster, const TString& query, TQueryData::TPtr params, - const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings, + const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings, const TMaybe& traceId) override { YQL_ENSURE(cluster == Cluster); @@ -2075,7 +2073,7 @@ class TKikimrIcGateway : public IKqpGateway { } TFuture ExecGenericQuery(const TString& cluster, const TString& query, TQueryData::TPtr params, - const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings, + const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings, const TMaybe& traceId) override { YQL_ENSURE(cluster == Cluster); diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index c98dc7453a20..5a16801c5040 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -288,8 +288,8 @@ class TAsyncExecuteYqlResult : public TKqpAsyncResultBase(queryResult.ProtobufArenaPtr.get())); - NKikimrMiniKQL::TResult* result = queryResult.Results.back(); + google::protobuf::Arena::CreateMessage(queryResult.ProtobufArenaPtr.get())); + Ydb::ResultSet* result = queryResult.Results.back(); if (!result->ParseFromArray(resultStr.data(), resultStr.size())) { queryResult = ResultFromError("Failed to parse run result."); diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp index c774e42b96d5..034b3cbe2b86 100644 --- a/ydb/core/kqp/host/kqp_runner.cpp +++ b/ydb/core/kqp/host/kqp_runner.cpp @@ -117,17 +117,7 @@ class TPrepareQueryAsyncResult : public TKqpAsyncResultBase results; - for (auto& phyResult : TransformCtx.PhysicalQueryResults) { - auto result = google::protobuf::Arena::CreateMessage( - queryResult.ProtobufArenaPtr.get()); - - result->CopyFrom(phyResult); - results.push_back(result); - } - queryResult.QueryStats.CopyFrom(TransformCtx.QueryStats); - queryResult.Results = std::move(results); } private: @@ -328,7 +318,7 @@ class TKqpRunner : public IKqpRunner { Config), "BuildPhysicalTxs") .Build(false)); - + auto physicalBuildQueryTransformer = TTransformationPipeline(typesCtx) .AddServiceTransformers() .Add(Log("PhysicalBuildQuery"), "LogPhysicalBuildQuery") @@ -403,8 +393,8 @@ class TKqpRunner : public IKqpRunner { TKqpProviderContext Pctx; TAutoPtr Transformer; - - TActorSystem* ActorSystem; + + TActorSystem* ActorSystem; }; } // namespace diff --git a/ydb/core/kqp/host/kqp_transform.h b/ydb/core/kqp/host/kqp_transform.h index bdf95715ba9d..0004340de583 100644 --- a/ydb/core/kqp/host/kqp_transform.h +++ b/ydb/core/kqp/host/kqp_transform.h @@ -26,18 +26,13 @@ struct TKqlTransformContext : TThrRefBase { NKqpProto::TKqpStatsQuery QueryStats; std::shared_ptr PhysicalQuery; - TVector> MkqlResults; - TVector PhysicalQueryResults; - NYql::TExprNode::TPtr ExplainTransformerInput; // Explain transformer must work after other transformers, but use input before peephole TMaybe DataQueryBlocks; void Reset() { ReplyTarget = {}; - MkqlResults.clear(); QueryStats = {}; PhysicalQuery = nullptr; - PhysicalQueryResults.clear(); ExplainTransformerInput = nullptr; DataQueryBlocks = Nothing(); } diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index 90b5b5494037..6005386a3410 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -997,15 +997,15 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer Results; + TVector Results; NKqpProto::TKqpStatsQuery QueryStats; std::unique_ptr PreparingQuery; std::shared_ptr PreparedQuery; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index c97381ba82e4..2406c6e0c707 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1788,7 +1788,7 @@ class TKqpSessionActor : public TActorBootstrapped { // Result for scan query is sent directly to target actor. Y_ABORT_UNLESS(response->GetArena()); if (QueryState->PreparedQuery) { - bool useYdbResponseFormat = QueryState->GetUsePublicResponseDataFormat(); + // bool useYdbResponseFormat = QueryState->GetUsePublicResponseDataFormat(); auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); size_t trailingResultsCount = 0; for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) { @@ -1805,14 +1805,15 @@ class TKqpSessionActor : public TActorBootstrapped { continue; } - if (useYdbResponseFormat) { + //if (useYdbResponseFormat) { TMaybe effectiveRowsLimit = FillSettings.RowsLimitPerWrite; if (QueryState->PreparedQuery->GetResults(i).GetRowsLimit()) { effectiveRowsLimit = QueryState->PreparedQuery->GetResults(i).GetRowsLimit(); } auto* ydbResult = QueryState->QueryData->GetYdbTxResult(phyQuery.GetResultBindings(i), response->GetArena(), effectiveRowsLimit); response->AddYdbResults()->Swap(ydbResult); - } else { + //} + /*else { auto* protoRes = QueryState->QueryData->GetMkqlTxResult(phyQuery.GetResultBindings(i), response->GetArena()); std::optional fillSettings; if (QueryState->PreparedQuery->ResultsSize()) { @@ -1826,7 +1827,7 @@ class TKqpSessionActor : public TActorBootstrapped { } auto* finalResult = KikimrResultToProto(*protoRes, {}, fillSettings.value_or(FillSettings), response->GetArena()); response->AddResults()->Swap(finalResult); - } + }*/ } } @@ -1892,10 +1893,10 @@ class TKqpSessionActor : public TActorBootstrapped { AddTrailingInfo(response->Record.GetRef()); NDataIntegrity::LogIntegrityTrails( - request->Get()->GetTraceId(), - request->Get()->GetAction(), - request->Get()->GetType(), - response, + request->Get()->GetTraceId(), + request->Get()->GetAction(), + request->Get()->GetType(), + response, TlsActivationContext->AsActorContext() ); @@ -1955,7 +1956,7 @@ class TKqpSessionActor : public TActorBootstrapped { QueryState->UserRequestContext->TraceId, QueryState->GetAction(), QueryState->GetType(), - QueryResponse, + QueryResponse, TlsActivationContext->AsActorContext() ); diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp index 587bf6d010d4..272d0534a1c7 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp @@ -194,7 +194,7 @@ class TKqpWorkerActor : public TActorBootstrapped { Config->FeatureFlags = AppData(ctx)->FeatureFlags; KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver, FederatedQuerySetup, - QueryState->RequestEv->GetUserToken(), GUCSettings, QueryServiceConfig, Settings.ApplicationName, AppData(ctx)->FunctionRegistry, + QueryState->RequestEv->GetUserToken(), GUCSettings, QueryServiceConfig, Settings.ApplicationName, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false, nullptr, nullptr, nullptr, QueryState->RequestEv->GetUserRequestContext()); auto& queryRequest = QueryState->RequestEv; @@ -959,7 +959,7 @@ class TKqpWorkerActor : public TActorBootstrapped { // If we have result it must be allocated on protobuf arena Y_ASSERT(result->GetArena()); Y_ASSERT(resp->GetArena() == result->GetArena()); - resp->AddResults()->Swap(result); + resp->AddYdbResults()->Swap(result); } } else { auto resp = ev.MutableResponse(); diff --git a/ydb/core/kqp/session_actor/kqp_worker_common.cpp b/ydb/core/kqp/session_actor/kqp_worker_common.cpp index 63db80b02d88..e7cf7113b049 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_common.cpp +++ b/ydb/core/kqp/session_actor/kqp_worker_common.cpp @@ -119,7 +119,7 @@ void SlowLogQuery(const TActorContext &ctx, const TKikimrConfiguration* config, << 'b'; ui64 resultsSize = 0; - for (auto& result : record->GetResponse().GetResults()) { + for (auto& result : record->GetResponse().GetYdbResults()) { resultsSize += result.ByteSize(); } diff --git a/ydb/core/load_test/aggregated_result.cpp b/ydb/core/load_test/aggregated_result.cpp index 82ae3dee3e68..548a5628b442 100644 --- a/ydb/core/load_test/aggregated_result.cpp +++ b/ydb/core/load_test/aggregated_result.cpp @@ -6,6 +6,7 @@ #include #include +#include namespace NKikimr { @@ -77,55 +78,35 @@ IOutputStream& operator<<(IOutputStream& output, const TAggregatedResult& result return output; } -using TColumnPositions = THashMap; - -TColumnPositions GetColumnPositionsInResponse(const NKikimrMiniKQL::TType& ttype) { - TColumnPositions columnPositions; - for (const NKikimrMiniKQL::TMember& member : ttype.GetStruct().GetMember()) { - if (member.GetName() == "Data") { - const auto& listStruct = member.GetType().GetList().GetItem().GetStruct(); - for (const NKikimrMiniKQL::TMember& listMember : listStruct.GetMember()) { - columnPositions.emplace(listMember.GetName(), columnPositions.size()); - } - break; - } - } - return columnPositions; -} - -NKikimrMiniKQL::TValue GetOptional(const NKikimrMiniKQL::TValue& listItem, ui32 pos) { - return listItem.GetStruct(pos).GetOptional(); -} - template -T ExtractValue(const NKikimrMiniKQL::TValue& listItem, ui32 pos) { - Y_UNUSED(listItem, pos); +T ExtractValue(NYdb::TResultSetParser& parser, const TString& column) { + Y_UNUSED(parser, column); Y_ABORT("unimplemented"); } template<> -ui32 ExtractValue(const NKikimrMiniKQL::TValue& listItem, ui32 pos) { - return GetOptional(listItem, pos).GetUint32(); +ui32 ExtractValue(NYdb::TResultSetParser& parser, const TString& column) { + return parser.ColumnParser(column).GetOptionalUint32().GetOrElse(0); } template<> -ui64 ExtractValue(const NKikimrMiniKQL::TValue& listItem, ui32 pos) { - return GetOptional(listItem, pos).GetUint64(); +ui64 ExtractValue(NYdb::TResultSetParser& parser, const TString& column) { + return parser.ColumnParser(column).GetOptionalUint64().GetOrElse(0); } template<> -double ExtractValue(const NKikimrMiniKQL::TValue& listItem, ui32 pos) { - return GetOptional(listItem, pos).GetDouble(); +double ExtractValue(NYdb::TResultSetParser& parser, const TString& column) { + return parser.ColumnParser(column).GetOptionalDouble().GetOrElse(static_cast(0)); } template<> -TString ExtractValue(const NKikimrMiniKQL::TValue& listItem, ui32 pos) { - return GetOptional(listItem, pos).GetBytes(); +TString ExtractValue(NYdb::TResultSetParser& parser, const TString& column) { + return parser.ColumnParser(column).GetOptionalString().GetOrElse(""); } template<> -TInstant ExtractValue(const NKikimrMiniKQL::TValue& listItem, ui32 pos) { - return TInstant::Seconds(GetOptional(listItem, pos).GetUint32()); +TInstant ExtractValue(NYdb::TResultSetParser& parser, const TString& column) { + return TInstant::Seconds(parser.ColumnParser(column).GetOptionalUint32().GetOrElse(0)); } bool GetStatName(TStringBuf columnName, TStringBuf& statName, TStringBuf& suffix) { @@ -161,38 +142,40 @@ void SetInAggregatedField(TStringBuf suffix, T value, TAggregatedField& dst) } } -TAggregatedResult GetResultFromValueListItem(const NKikimrMiniKQL::TValue& listItem, const TColumnPositions& columnPositions) { +TAggregatedResult GetResultFromValueListItem(NYdb::TResultSetParser& parser, const NYdb::TResultSet& rs) { TAggregatedResult result; TStringBuf statName; TStringBuf suffix; TStringBuf levelSb; - for (const auto& [column, pos] : columnPositions) { + for (const auto& columnMeta : rs.GetColumnsMeta()) { + TString column = columnMeta.Name; + if (column == "id") { - result.Uuid = ExtractValue(listItem, pos); + result.Uuid = ExtractValue(parser, column); } else if (column == "start") { - result.Start = ExtractValue(listItem, pos); + result.Start = ExtractValue(parser, column); } else if (column == "finish") { - result.Finish = ExtractValue(listItem, pos); + result.Finish = ExtractValue(parser, column); } else if (column == "total_nodes") { - result.Stats.TotalNodes = ExtractValue(listItem, pos); + result.Stats.TotalNodes = ExtractValue(parser, column); } else if (column == "success_nodes") { - result.Stats.SuccessNodes = ExtractValue(listItem, pos); + result.Stats.SuccessNodes = ExtractValue(parser, column); } else if (column == "config") { - result.Config = ExtractValue(listItem, pos); + result.Config = ExtractValue(parser, column); } else if (GetStatName(column, statName, suffix)) { if (statName == "transactions") { if (suffix == "_avg") { - SetInAggregatedField(suffix, ExtractValue(listItem, pos), result.Stats.Transactions); + SetInAggregatedField(suffix, ExtractValue(parser, column), result.Stats.Transactions); } else { - SetInAggregatedField(suffix, ExtractValue(listItem, pos), result.Stats.Transactions); + SetInAggregatedField(suffix, ExtractValue(parser, column), result.Stats.Transactions); } } else if (statName == "transactions_per_sec") { - SetInAggregatedField(suffix, ExtractValue(listItem, pos), result.Stats.TransactionsPerSecond); + SetInAggregatedField(suffix, ExtractValue(parser, column), result.Stats.TransactionsPerSecond); } else if (statName == "errors_per_sec") { - SetInAggregatedField(suffix, ExtractValue(listItem, pos), result.Stats.ErrorsPerSecond); + SetInAggregatedField(suffix, ExtractValue(parser, column), result.Stats.ErrorsPerSecond); } else if (GetPercentileLevel(statName, levelSb)) { auto level = FromString(levelSb); - SetInAggregatedField(suffix, ExtractValue(listItem, pos), result.Stats.Percentiles[level]); + SetInAggregatedField(suffix, ExtractValue(parser, column), result.Stats.Percentiles[level]); } } } @@ -200,16 +183,16 @@ TAggregatedResult GetResultFromValueListItem(const NKikimrMiniKQL::TValue& listI } bool LoadResultFromResponseProto(const NKikimrKqp::TQueryResponse& response, TVector& results) { - const auto& ttype = response.GetResults(0).GetType(); - auto columnPositions = GetColumnPositionsInResponse(ttype); - if (columnPositions.empty()) { - return false; - } + Y_ABORT_UNLESS(response.GetYdbResults().size() > 0); + + NYdb::TResultSet rs(response.GetYdbResults(0)); + NYdb::TResultSetParser parser(response.GetYdbResults(0)); results.clear(); - for (const NKikimrMiniKQL::TValue& listItem : response.GetResults(0).GetValue().GetStruct().Get(0).GetList()) { - results.push_back(GetResultFromValueListItem(listItem, columnPositions)); + while(parser.TryNextRow()) { + results.push_back(GetResultFromValueListItem(parser, rs)); } + return true; } diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index a4c6906783b5..f4dcf7457d57 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -251,7 +251,7 @@ message TQueryResponseExtraInfo { message TQueryResponse { optional bytes SessionId = 1; reserved 2; // (deprecated) QueryErrors - repeated NKikimrMiniKQL.TResult Results = 3; + // repeated NKikimrMiniKQL.TResult Results = 3; // optional TQueryProfile Profile = 4; // TODO: Deprecate, use QueryStats reserved 4; optional bytes PreparedQuery = 5; @@ -407,7 +407,7 @@ message TEvPingSessionResponse { message TEvDataQueryStreamPart { optional NActorsProto.TActorId GatewayActorId = 1; - repeated NKikimrMiniKQL.TResult Results = 2; + repeated Ydb.ResultSet Results = 2; }; message TCancelQueryRequest { diff --git a/ydb/core/viewer/viewer_query.h b/ydb/core/viewer/viewer_query.h index c875dc0ba15b..75292362865c 100644 --- a/ydb/core/viewer/viewer_query.h +++ b/ydb/core/viewer/viewer_query.h @@ -546,14 +546,8 @@ class TJsonQuery : public TViewerPipeClient { void MakeOkReply(NJson::TJsonValue& jsonResponse, NKikimrKqp::TEvQueryResponse& record) { const auto& response = record.GetResponse(); - if (response.ResultsSize() > 0 || response.YdbResultsSize() > 0) { + if (response.YdbResultsSize() > 0) { try { - for (const auto& result : response.GetResults()) { - Ydb::ResultSet resultSet; - NKqp::ConvertKqpQueryResultToDbResult(result, &resultSet); - ResultSets.emplace_back().emplace_back(std::move(resultSet)); - } - for (const auto& result : response.GetYdbResults()) { ResultSets.emplace_back().emplace_back(result); } diff --git a/ydb/core/viewer/viewer_query_old.h b/ydb/core/viewer/viewer_query_old.h index 9b130e9be793..a9bc0783431a 100644 --- a/ydb/core/viewer/viewer_query_old.h +++ b/ydb/core/viewer/viewer_query_old.h @@ -493,14 +493,8 @@ class TJsonQueryOld : public TViewerPipeClient { void MakeOkReply(NJson::TJsonValue& jsonResponse, NKikimrKqp::TEvQueryResponse& record) { const auto& response = record.GetResponse(); - if (response.ResultsSize() > 0 || response.YdbResultsSize() > 0) { + if (response.YdbResultsSize() > 0) { try { - for (const auto& result : response.GetResults()) { - Ydb::ResultSet resultSet; - NKqp::ConvertKqpQueryResultToDbResult(result, &resultSet); - ResultSets.emplace_back(std::move(resultSet)); - } - for (const auto& result : response.GetYdbResults()) { ResultSets.emplace_back(result); } diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index fb200de3fed4..5e330ecbc9c9 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -649,10 +649,9 @@ TRequestResult TYdbSetup::YqlScriptRequest(const TRequestOptions& query, TQueryM FillQueryMeta(meta, responseRecord); - resultSets.reserve(responseRecord.results_size()); - for (const auto& result : responseRecord.results()) { - resultSets.emplace_back(); - NKikimr::NKqp::ConvertKqpQueryResultToDbResult(result, &resultSets.back()); + resultSets.reserve(responseRecord.ydbresults_size()); + for (const auto& result : responseRecord.ydbresults()) { + resultSets.emplace_back(result); } return TRequestResult(yqlQueryOperationResponse.GetYdbStatus(), responseRecord.GetQueryIssues());