Skip to content

Commit

Permalink
get rid of mkql results in scripting (#9997)
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit authored Oct 3, 2024
1 parent 91f8a44 commit a491b73
Show file tree
Hide file tree
Showing 24 changed files with 121 additions and 286 deletions.
3 changes: 1 addition & 2 deletions ydb/core/grpc_services/rpc_execute_data_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,8 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TE
// https://protobuf.dev/reference/cpp/arenas/#swap
// Actualy will be copy in case pf remote execution
queryResult->mutable_result_sets()->Swap(record.MutableResponse()->MutableYdbResults());
} else {
NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult);
}

ConvertQueryStats(kqpResponse, queryResult);
if (kqpResponse.HasTxMeta()) {
queryResult->mutable_tx_meta()->CopyFrom(kqpResponse.GetTxMeta());
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/grpc_services/rpc_execute_yql_script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ class TExecuteYqlScriptRPC : public TRpcKqpRequestActor<TExecuteYqlScriptRPC, TE
auto queryResult = TEvExecuteYqlScriptRequest::AllocateResult<TResult>(Request_);

try {
NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult);
const auto& results = kqpResponse.GetYdbResults();
for (const auto& result : results) {
queryResult->add_result_sets()->CopyFrom(result);
}

} catch (const std::exception& ex) {
NYql::TIssues issues;
issues.AddIssue(NYql::ExceptionToIssue(ex));
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ namespace {
{}

NKqp::TEvKqp::TEvDataQueryStreamPart::TPtr Handle;
google::protobuf::RepeatedPtrField<NKikimrMiniKQL::TResult>::const_iterator ResultIterator;
google::protobuf::RepeatedPtrField<Ydb::ResultSet>::const_iterator ResultIterator;
};

enum EStreamRpcWakeupTag : ui64 {
Expand Down Expand Up @@ -220,7 +220,7 @@ class TStreamExecuteYqlScriptRPC
auto result = response.mutable_result();

try {
NKqp::ConvertKqpQueryResultToDbResult(kqpResult, result->mutable_result_set());
result->mutable_result_set()->CopyFrom(kqpResult);
} catch (std::exception ex) {
ReplyFinishStream(ex.what());
}
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/common/events/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
const TQueryRequestSettings& querySettings = TQueryRequestSettings(),
const TString& poolId = "");

TEvQueryRequest() = default;
TEvQueryRequest() {
Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
}

bool IsSerializable() const override {
return true;
Expand Down
11 changes: 1 addition & 10 deletions ydb/core/kqp/common/kqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,9 @@

namespace NKikimr::NKqp {

void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to);

TString ScriptExecutionRunnerActorIdString(const NActors::TActorId& actorId);
bool ScriptExecutionRunnerActorIdFromString(const TString& executionId, TActorId& actorId);

template<typename TFrom, typename TTo>
inline void ConvertKqpQueryResultsToDbResult(const TFrom& from, TTo* to) {
const auto& results = from.GetResults();
for (const auto& result : results) {
ConvertKqpQueryResultToDbResult(result, to->add_result_sets());
}
}

class TKqpRequestInfo {
public:
Expand Down Expand Up @@ -80,7 +71,7 @@ class IQueryReplayBackend : public TNonCopyable {
/// Accepts query text
virtual void Collect(const TString& queryData) = 0;

virtual bool IsNull() { return false; }
virtual bool IsNull() { return false; }

virtual ~IQueryReplayBackend() {};

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/common/kqp_event_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
Record.MutableRequest()->SetDatabaseId(DatabaseId);
}

Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
Record.MutableRequest()->SetSessionId(SessionId);
Record.MutableRequest()->SetAction(QueryAction);
Record.MutableRequest()->SetType(QueryType);
Expand Down
28 changes: 13 additions & 15 deletions ydb/core/kqp/gateway/kqp_ic_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ class TKqpScanQueryRequestHandler : public TRequestHandlerBase<
void HandleResponse(typename TResponse::TPtr &ev, const TActorContext &ctx) {
auto& response = *ev->Get()->Record.GetRef().MutableResponse();

NKikimr::ConvertYdbResultToKqpResult(ResultSet,*response.AddResults());
response.AddYdbResults()->CopyFrom(ResultSet);
for (auto& execStats : Executions) {
response.MutableQueryStats()->AddExecutions()->Swap(&execStats);
}
Expand Down Expand Up @@ -286,20 +286,18 @@ class TKqpStreamRequestHandler : public TRequestHandlerBase<
virtual void HandleResponse(typename TResponse::TPtr &ev, const TActorContext &ctx) {
auto& record = ev->Get()->Record.GetRef();
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
if (record.MutableResponse()->GetResults().size()) {
if (record.MutableResponse()->GetYdbResults().size()) {
// Send result sets to RPC actor TStreamExecuteYqlScriptRPC
auto evStreamPart = MakeHolder<NKqp::TEvKqp::TEvDataQueryStreamPart>();
ActorIdToProto(this->SelfId(), evStreamPart->Record.MutableGatewayActorId());

for (int i = 0; i < record.MutableResponse()->MutableResults()->size(); ++i) {
for (int i = 0; i < record.MutableResponse()->MutableYdbResults()->size(); ++i) {
// Workaround to avoid errors on Pull execution stage which would expect some results
Ydb::ResultSet resultSet;
NKikimr::ConvertYdbResultToKqpResult(resultSet, *evStreamPart->Record.AddResults());
evStreamPart->Record.AddResults();
}

evStreamPart->Record.MutableResults()->Swap(record.MutableResponse()->MutableResults());
evStreamPart->Record.MutableResults()->Swap(record.MutableResponse()->MutableYdbResults());
this->Send(TargetActorId, evStreamPart.Release());

// Save response without data to send it later
ResponseHandle = ev.Release();
} else {
Expand Down Expand Up @@ -405,7 +403,7 @@ class TKqpForwardStreamRequestHandler : public TRequestHandlerBase<
auto& response = *ev->Get()->Record.GetRef().MutableResponse();

Ydb::ResultSet resultSet;
NKikimr::ConvertYdbResultToKqpResult(resultSet, *response.AddResults());
response.AddYdbResults()->CopyFrom(resultSet);
for (auto& execStats : Executions) {
response.MutableQueryStats()->AddExecutions()->Swap(&execStats);
}
Expand Down Expand Up @@ -511,7 +509,7 @@ class TKqpGenericQueryRequestHandler: public TRequestHandlerBase<
auto& response = *ev->Get()->Record.GetRef().MutableResponse();

for (auto& resultSet : ResultSets) {
ConvertYdbResultToKqpResult(std::move(resultSet.ResultSet), *response.AddResults());
response.AddYdbResults()->Swap(&resultSet.ResultSet);
}

TBase::HandleResponse(ev, ctx);
Expand Down Expand Up @@ -672,8 +670,8 @@ void KqpResponseToQueryResult(const NKikimrKqp::TEvQueryResponse& response, IKqp
queryResult.AddIssue(NYql::IssueFromMessage(issue));
}

for (auto& result : queryResponse.GetResults()) {
auto arenaResult = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(
for (auto& result : queryResponse.GetYdbResults()) {
auto arenaResult = google::protobuf::Arena::CreateMessage<Ydb::ResultSet>(
queryResult.ProtobufArenaPtr.get());

arenaResult->CopyFrom(result);
Expand Down Expand Up @@ -1419,11 +1417,11 @@ class TKikimrIcGateway : public IKqpGateway {
if (!CheckCluster(cluster)) {
return InvalidCluster<TGenericResult>(cluster);
}

auto analyzePromise = NewPromise<TGenericResult>();
IActor* analyzeActor = new TAnalyzeActor(settings.TablePath, settings.Columns, analyzePromise);
RegisterActor(analyzeActor);

return analyzePromise.GetFuture();
} catch (yexception& e) {
return MakeFuture(ResultFromException<TGenericResult>(e));
Expand Down Expand Up @@ -1995,7 +1993,7 @@ class TKikimrIcGateway : public IKqpGateway {
}

TFuture<TQueryResult> ExecDataQueryAst(const TString& cluster, const TString& query, TQueryData::TPtr params,
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
const TMaybe<TString>& traceId) override
{
YQL_ENSURE(cluster == Cluster);
Expand Down Expand Up @@ -2075,7 +2073,7 @@ class TKikimrIcGateway : public IKqpGateway {
}

TFuture<TQueryResult> ExecGenericQuery(const TString& cluster, const TString& query, TQueryData::TPtr params,
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
const TMaybe<TString>& traceId) override
{
YQL_ENSURE(cluster == Cluster);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,8 @@ class TAsyncExecuteYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult

for (auto& resultStr : ResultProviderConfig.CommittedResults) {
queryResult.Results.emplace_back(
google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(queryResult.ProtobufArenaPtr.get()));
NKikimrMiniKQL::TResult* result = queryResult.Results.back();
google::protobuf::Arena::CreateMessage<Ydb::ResultSet>(queryResult.ProtobufArenaPtr.get()));
Ydb::ResultSet* result = queryResult.Results.back();

if (!result->ParseFromArray(resultStr.data(), resultStr.size())) {
queryResult = ResultFromError<TResult>("Failed to parse run result.");
Expand Down
16 changes: 3 additions & 13 deletions ydb/core/kqp/host/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,7 @@ class TPrepareQueryAsyncResult : public TKqpAsyncResultBase<IKikimrQueryExecutor
}

void FillResult(TResult& queryResult) const override {
TVector<NKikimrMiniKQL::TResult*> results;
for (auto& phyResult : TransformCtx.PhysicalQueryResults) {
auto result = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(
queryResult.ProtobufArenaPtr.get());

result->CopyFrom(phyResult);
results.push_back(result);
}

queryResult.QueryStats.CopyFrom(TransformCtx.QueryStats);
queryResult.Results = std::move(results);
}

private:
Expand Down Expand Up @@ -328,7 +318,7 @@ class TKqpRunner : public IKqpRunner {
Config),
"BuildPhysicalTxs")
.Build(false));

auto physicalBuildQueryTransformer = TTransformationPipeline(typesCtx)
.AddServiceTransformers()
.Add(Log("PhysicalBuildQuery"), "LogPhysicalBuildQuery")
Expand Down Expand Up @@ -403,8 +393,8 @@ class TKqpRunner : public IKqpRunner {
TKqpProviderContext Pctx;

TAutoPtr<IGraphTransformer> Transformer;
TActorSystem* ActorSystem;

TActorSystem* ActorSystem;
};

} // namespace
Expand Down
5 changes: 0 additions & 5 deletions ydb/core/kqp/host/kqp_transform.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,13 @@ struct TKqlTransformContext : TThrRefBase {
NKqpProto::TKqpStatsQuery QueryStats;
std::shared_ptr<const NKqpProto::TKqpPhyQuery> PhysicalQuery;

TVector<TSimpleSharedPtr<NKikimrMiniKQL::TResult>> MkqlResults;
TVector<NKikimrMiniKQL::TResult> PhysicalQueryResults;

NYql::TExprNode::TPtr ExplainTransformerInput; // Explain transformer must work after other transformers, but use input before peephole
TMaybe<NYql::NNodes::TKiDataQueryBlocks> DataQueryBlocks;

void Reset() {
ReplyTarget = {};
MkqlResults.clear();
QueryStats = {};
PhysicalQuery = nullptr;
PhysicalQueryResults.clear();
ExplainTransformerInput = nullptr;
DataQueryBlocks = Nothing();
}
Expand Down
6 changes: 1 addition & 5 deletions ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -997,16 +997,12 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T

private:
static TExprNode::TPtr GetResOrPullResult(const TExprNode& node, const IDataProvider::TFillSettings& fillSettings,
const NKikimrMiniKQL::TResult& resultValue, TExprContext& ctx)
const Ydb::ResultSet& resultValue, TExprContext& ctx)
{
TColumnOrder columnHints(NCommon::GetResOrPullColumnHints(node));

auto protoValue = &resultValue;
YQL_ENSURE(resultValue.GetArena());
if (IsRawKikimrResult(resultValue)) {
protoValue = KikimrResultToProto(resultValue, columnHints, fillSettings, resultValue.GetArena());
}

YQL_ENSURE(fillSettings.Format == IDataProvider::EResultFormat::Custom);
YQL_ENSURE(fillSettings.FormatDetails == KikimrMkqlProtoFormat);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/provider/yql_kikimr_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ class IKikimrGateway : public TThrRefBase {

struct TQueryResult : public TGenericResult {
TString SessionId;
TVector<NKikimrMiniKQL::TResult*> Results;
TVector<Ydb::ResultSet*> Results;
NKqpProto::TKqpStatsQuery QueryStats;
std::unique_ptr<NKikimrKqp::TPreparedQuery> PreparingQuery;
std::shared_ptr<const NKikimrKqp::TPreparedQuery> PreparedQuery;
Expand Down
88 changes: 0 additions & 88 deletions ydb/core/kqp/provider/yql_kikimr_results.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,94 +338,6 @@ void KikimrResultToYson(const TStringStream& stream, NYson::TYsonWriter& writer,
fillSettings, truncated, true);
}

bool IsRawKikimrResult(const NKikimrMiniKQL::TResult& result) {
auto& type = result.GetType();
if (type.GetKind() != NKikimrMiniKQL::ETypeKind::Struct) {
return true;
}

auto& structType = type.GetStruct();
if (structType.MemberSize() != 2) {
return true;
}

return structType.GetMember(0).GetName() != "Data" || structType.GetMember(1).GetName() != "Truncated";
}

NKikimrMiniKQL::TResult* KikimrResultToProto(const NKikimrMiniKQL::TResult& result, const TColumnOrder& columnHints,
const IDataProvider::TFillSettings& fillSettings, google::protobuf::Arena* arena)
{
NKikimrMiniKQL::TResult* packedResult = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(arena);
auto* packedType = packedResult->MutableType();
packedType->SetKind(NKikimrMiniKQL::ETypeKind::Struct);
auto* dataMember = packedType->MutableStruct()->AddMember();
dataMember->SetName("Data");
auto* truncatedMember = packedType->MutableStruct()->AddMember();
truncatedMember->SetName("Truncated");
truncatedMember->MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data);
truncatedMember->MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<bool>::Id);

auto* packedValue = packedResult->MutableValue();
auto* dataValue = packedValue->AddStruct();
auto* dataType = dataMember->MutableType();
auto* truncatedValue = packedValue->AddStruct();

bool truncated = false;
if (result.GetType().GetKind() == NKikimrMiniKQL::ETypeKind::List) {
const auto& itemType = result.GetType().GetList().GetItem();

TMap<TString, size_t> memberIndices;
if (itemType.GetKind() == NKikimrMiniKQL::ETypeKind::Struct && columnHints.Size() != 0) {
const auto& structType = itemType.GetStruct();

for (size_t i = 0; i < structType.MemberSize(); ++i) {
memberIndices[structType.GetMember(i).GetName()] = i;
}

dataType->SetKind(NKikimrMiniKQL::ETypeKind::List);
auto* newItem = dataType->MutableList()->MutableItem();
newItem->SetKind(NKikimrMiniKQL::ETypeKind::Struct);
auto* newStructType = newItem->MutableStruct();
for (auto& [column, gen_col] : columnHints) {
auto* memberIndex = memberIndices.FindPtr(gen_col);
YQL_ENSURE(memberIndex);

(*newStructType->AddMember() = structType.GetMember(*memberIndex)).SetName(column);
}
} else {
*dataType = result.GetType();
}

ui64 rowsWritten = 0;
ui64 bytesWritten = 0;
for (auto& item : result.GetValue().GetList()) {
if (ResultsOverflow(rowsWritten, bytesWritten, fillSettings)) {
truncated = true;
break;
}
if (!memberIndices.empty()) {
auto* newStruct = dataValue->AddList();
for (auto& [column, gen_column] : columnHints) {
auto* memberIndex = memberIndices.FindPtr(gen_column);
YQL_ENSURE(memberIndex);

*newStruct->AddStruct() = item.GetStruct(*memberIndex);
}
} else {
*dataValue->AddList() = item;
}

bytesWritten += item.ByteSize();
++rowsWritten;
}
} else {
dataType->CopyFrom(result.GetType());
dataValue->CopyFrom(result.GetValue());
}

truncatedValue->SetBool(truncated);
return packedResult;
}

const TTypeAnnotationNode* ParseTypeFromKikimrProto(const NKikimrMiniKQL::TType& type, TExprContext& ctx) {
switch (type.GetKind()) {
Expand Down
5 changes: 0 additions & 5 deletions ydb/core/kqp/provider/yql_kikimr_results.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ namespace NYql {
void KikimrResultToYson(const TStringStream& stream, NYson::TYsonWriter& writer, const NKikimrMiniKQL::TResult& result,
const TColumnOrder& columnHints, const IDataProvider::TFillSettings& fillSettings, bool& truncated);

NKikimrMiniKQL::TResult* KikimrResultToProto(const NKikimrMiniKQL::TResult& result, const TColumnOrder& columnHints,
const IDataProvider::TFillSettings& fillSettings, google::protobuf::Arena* arena);

bool IsRawKikimrResult(const NKikimrMiniKQL::TResult& result);

const TTypeAnnotationNode* ParseTypeFromKikimrProto(const NKikimrMiniKQL::TType& type, TExprContext& ctx);
bool ExportTypeToKikimrProto(const TTypeAnnotationNode& type, NKikimrMiniKQL::TType& protoType, TExprContext& ctx);
TExprNode::TPtr ParseKikimrProtoValue(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value,
Expand Down
Loading

0 comments on commit a491b73

Please sign in to comment.