Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

get rid of mkql results in scripting #9997

Merged
merged 1 commit into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions ydb/core/grpc_services/rpc_execute_data_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,8 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TE
// https://protobuf.dev/reference/cpp/arenas/#swap
// Actualy will be copy in case pf remote execution
queryResult->mutable_result_sets()->Swap(record.MutableResponse()->MutableYdbResults());
} else {
NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult);
}

ConvertQueryStats(kqpResponse, queryResult);
if (kqpResponse.HasTxMeta()) {
queryResult->mutable_tx_meta()->CopyFrom(kqpResponse.GetTxMeta());
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/grpc_services/rpc_execute_yql_script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ class TExecuteYqlScriptRPC : public TRpcKqpRequestActor<TExecuteYqlScriptRPC, TE
auto queryResult = TEvExecuteYqlScriptRequest::AllocateResult<TResult>(Request_);

try {
NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult);
const auto& results = kqpResponse.GetYdbResults();
for (const auto& result : results) {
queryResult->add_result_sets()->CopyFrom(result);
}

} catch (const std::exception& ex) {
NYql::TIssues issues;
issues.AddIssue(NYql::ExceptionToIssue(ex));
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ namespace {
{}

NKqp::TEvKqp::TEvDataQueryStreamPart::TPtr Handle;
google::protobuf::RepeatedPtrField<NKikimrMiniKQL::TResult>::const_iterator ResultIterator;
google::protobuf::RepeatedPtrField<Ydb::ResultSet>::const_iterator ResultIterator;
};

enum EStreamRpcWakeupTag : ui64 {
Expand Down Expand Up @@ -220,7 +220,7 @@ class TStreamExecuteYqlScriptRPC
auto result = response.mutable_result();

try {
NKqp::ConvertKqpQueryResultToDbResult(kqpResult, result->mutable_result_set());
result->mutable_result_set()->CopyFrom(kqpResult);
} catch (std::exception ex) {
ReplyFinishStream(ex.what());
}
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/common/events/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
const TQueryRequestSettings& querySettings = TQueryRequestSettings(),
const TString& poolId = "");

TEvQueryRequest() = default;
TEvQueryRequest() {
Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
}

bool IsSerializable() const override {
return true;
Expand Down
11 changes: 1 addition & 10 deletions ydb/core/kqp/common/kqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,9 @@

namespace NKikimr::NKqp {

void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to);

TString ScriptExecutionRunnerActorIdString(const NActors::TActorId& actorId);
bool ScriptExecutionRunnerActorIdFromString(const TString& executionId, TActorId& actorId);

template<typename TFrom, typename TTo>
inline void ConvertKqpQueryResultsToDbResult(const TFrom& from, TTo* to) {
const auto& results = from.GetResults();
for (const auto& result : results) {
ConvertKqpQueryResultToDbResult(result, to->add_result_sets());
}
}

class TKqpRequestInfo {
public:
Expand Down Expand Up @@ -80,7 +71,7 @@ class IQueryReplayBackend : public TNonCopyable {
/// Accepts query text
virtual void Collect(const TString& queryData) = 0;

virtual bool IsNull() { return false; }
virtual bool IsNull() { return false; }

virtual ~IQueryReplayBackend() {};

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/common/kqp_event_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
Record.MutableRequest()->SetDatabaseId(DatabaseId);
}

Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
Record.MutableRequest()->SetSessionId(SessionId);
Record.MutableRequest()->SetAction(QueryAction);
Record.MutableRequest()->SetType(QueryType);
Expand Down
28 changes: 13 additions & 15 deletions ydb/core/kqp/gateway/kqp_ic_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ class TKqpScanQueryRequestHandler : public TRequestHandlerBase<
void HandleResponse(typename TResponse::TPtr &ev, const TActorContext &ctx) {
auto& response = *ev->Get()->Record.GetRef().MutableResponse();

NKikimr::ConvertYdbResultToKqpResult(ResultSet,*response.AddResults());
response.AddYdbResults()->CopyFrom(ResultSet);
for (auto& execStats : Executions) {
response.MutableQueryStats()->AddExecutions()->Swap(&execStats);
}
Expand Down Expand Up @@ -286,20 +286,18 @@ class TKqpStreamRequestHandler : public TRequestHandlerBase<
virtual void HandleResponse(typename TResponse::TPtr &ev, const TActorContext &ctx) {
auto& record = ev->Get()->Record.GetRef();
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
if (record.MutableResponse()->GetResults().size()) {
if (record.MutableResponse()->GetYdbResults().size()) {
// Send result sets to RPC actor TStreamExecuteYqlScriptRPC
auto evStreamPart = MakeHolder<NKqp::TEvKqp::TEvDataQueryStreamPart>();
ActorIdToProto(this->SelfId(), evStreamPart->Record.MutableGatewayActorId());

for (int i = 0; i < record.MutableResponse()->MutableResults()->size(); ++i) {
for (int i = 0; i < record.MutableResponse()->MutableYdbResults()->size(); ++i) {
// Workaround to avoid errors on Pull execution stage which would expect some results
Ydb::ResultSet resultSet;
NKikimr::ConvertYdbResultToKqpResult(resultSet, *evStreamPart->Record.AddResults());
evStreamPart->Record.AddResults();
}

evStreamPart->Record.MutableResults()->Swap(record.MutableResponse()->MutableResults());
evStreamPart->Record.MutableResults()->Swap(record.MutableResponse()->MutableYdbResults());
this->Send(TargetActorId, evStreamPart.Release());

// Save response without data to send it later
ResponseHandle = ev.Release();
} else {
Expand Down Expand Up @@ -405,7 +403,7 @@ class TKqpForwardStreamRequestHandler : public TRequestHandlerBase<
auto& response = *ev->Get()->Record.GetRef().MutableResponse();

Ydb::ResultSet resultSet;
NKikimr::ConvertYdbResultToKqpResult(resultSet, *response.AddResults());
response.AddYdbResults()->CopyFrom(resultSet);
for (auto& execStats : Executions) {
response.MutableQueryStats()->AddExecutions()->Swap(&execStats);
}
Expand Down Expand Up @@ -511,7 +509,7 @@ class TKqpGenericQueryRequestHandler: public TRequestHandlerBase<
auto& response = *ev->Get()->Record.GetRef().MutableResponse();

for (auto& resultSet : ResultSets) {
ConvertYdbResultToKqpResult(std::move(resultSet.ResultSet), *response.AddResults());
response.AddYdbResults()->Swap(&resultSet.ResultSet);
}

TBase::HandleResponse(ev, ctx);
Expand Down Expand Up @@ -672,8 +670,8 @@ void KqpResponseToQueryResult(const NKikimrKqp::TEvQueryResponse& response, IKqp
queryResult.AddIssue(NYql::IssueFromMessage(issue));
}

for (auto& result : queryResponse.GetResults()) {
auto arenaResult = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(
for (auto& result : queryResponse.GetYdbResults()) {
auto arenaResult = google::protobuf::Arena::CreateMessage<Ydb::ResultSet>(
queryResult.ProtobufArenaPtr.get());

arenaResult->CopyFrom(result);
Expand Down Expand Up @@ -1419,11 +1417,11 @@ class TKikimrIcGateway : public IKqpGateway {
if (!CheckCluster(cluster)) {
return InvalidCluster<TGenericResult>(cluster);
}

auto analyzePromise = NewPromise<TGenericResult>();
IActor* analyzeActor = new TAnalyzeActor(settings.TablePath, settings.Columns, analyzePromise);
RegisterActor(analyzeActor);

return analyzePromise.GetFuture();
} catch (yexception& e) {
return MakeFuture(ResultFromException<TGenericResult>(e));
Expand Down Expand Up @@ -1995,7 +1993,7 @@ class TKikimrIcGateway : public IKqpGateway {
}

TFuture<TQueryResult> ExecDataQueryAst(const TString& cluster, const TString& query, TQueryData::TPtr params,
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
const TMaybe<TString>& traceId) override
{
YQL_ENSURE(cluster == Cluster);
Expand Down Expand Up @@ -2075,7 +2073,7 @@ class TKikimrIcGateway : public IKqpGateway {
}

TFuture<TQueryResult> ExecGenericQuery(const TString& cluster, const TString& query, TQueryData::TPtr params,
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
const TMaybe<TString>& traceId) override
{
YQL_ENSURE(cluster == Cluster);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,8 @@ class TAsyncExecuteYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult

for (auto& resultStr : ResultProviderConfig.CommittedResults) {
queryResult.Results.emplace_back(
google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(queryResult.ProtobufArenaPtr.get()));
NKikimrMiniKQL::TResult* result = queryResult.Results.back();
google::protobuf::Arena::CreateMessage<Ydb::ResultSet>(queryResult.ProtobufArenaPtr.get()));
Ydb::ResultSet* result = queryResult.Results.back();

if (!result->ParseFromArray(resultStr.data(), resultStr.size())) {
queryResult = ResultFromError<TResult>("Failed to parse run result.");
Expand Down
16 changes: 3 additions & 13 deletions ydb/core/kqp/host/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,7 @@ class TPrepareQueryAsyncResult : public TKqpAsyncResultBase<IKikimrQueryExecutor
}

void FillResult(TResult& queryResult) const override {
TVector<NKikimrMiniKQL::TResult*> results;
for (auto& phyResult : TransformCtx.PhysicalQueryResults) {
auto result = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(
queryResult.ProtobufArenaPtr.get());

result->CopyFrom(phyResult);
results.push_back(result);
}

queryResult.QueryStats.CopyFrom(TransformCtx.QueryStats);
queryResult.Results = std::move(results);
}

private:
Expand Down Expand Up @@ -328,7 +318,7 @@ class TKqpRunner : public IKqpRunner {
Config),
"BuildPhysicalTxs")
.Build(false));

auto physicalBuildQueryTransformer = TTransformationPipeline(typesCtx)
.AddServiceTransformers()
.Add(Log("PhysicalBuildQuery"), "LogPhysicalBuildQuery")
Expand Down Expand Up @@ -403,8 +393,8 @@ class TKqpRunner : public IKqpRunner {
TKqpProviderContext Pctx;

TAutoPtr<IGraphTransformer> Transformer;
TActorSystem* ActorSystem;

TActorSystem* ActorSystem;
};

} // namespace
Expand Down
5 changes: 0 additions & 5 deletions ydb/core/kqp/host/kqp_transform.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,13 @@ struct TKqlTransformContext : TThrRefBase {
NKqpProto::TKqpStatsQuery QueryStats;
std::shared_ptr<const NKqpProto::TKqpPhyQuery> PhysicalQuery;

TVector<TSimpleSharedPtr<NKikimrMiniKQL::TResult>> MkqlResults;
TVector<NKikimrMiniKQL::TResult> PhysicalQueryResults;

NYql::TExprNode::TPtr ExplainTransformerInput; // Explain transformer must work after other transformers, but use input before peephole
TMaybe<NYql::NNodes::TKiDataQueryBlocks> DataQueryBlocks;

void Reset() {
ReplyTarget = {};
MkqlResults.clear();
QueryStats = {};
PhysicalQuery = nullptr;
PhysicalQueryResults.clear();
ExplainTransformerInput = nullptr;
DataQueryBlocks = Nothing();
}
Expand Down
6 changes: 1 addition & 5 deletions ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -997,16 +997,12 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T

private:
static TExprNode::TPtr GetResOrPullResult(const TExprNode& node, const IDataProvider::TFillSettings& fillSettings,
const NKikimrMiniKQL::TResult& resultValue, TExprContext& ctx)
const Ydb::ResultSet& resultValue, TExprContext& ctx)
{
TColumnOrder columnHints(NCommon::GetResOrPullColumnHints(node));

auto protoValue = &resultValue;
YQL_ENSURE(resultValue.GetArena());
if (IsRawKikimrResult(resultValue)) {
protoValue = KikimrResultToProto(resultValue, columnHints, fillSettings, resultValue.GetArena());
}

YQL_ENSURE(fillSettings.Format == IDataProvider::EResultFormat::Custom);
YQL_ENSURE(fillSettings.FormatDetails == KikimrMkqlProtoFormat);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/provider/yql_kikimr_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ class IKikimrGateway : public TThrRefBase {

struct TQueryResult : public TGenericResult {
TString SessionId;
TVector<NKikimrMiniKQL::TResult*> Results;
TVector<Ydb::ResultSet*> Results;
NKqpProto::TKqpStatsQuery QueryStats;
std::unique_ptr<NKikimrKqp::TPreparedQuery> PreparingQuery;
std::shared_ptr<const NKikimrKqp::TPreparedQuery> PreparedQuery;
Expand Down
88 changes: 0 additions & 88 deletions ydb/core/kqp/provider/yql_kikimr_results.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,94 +338,6 @@ void KikimrResultToYson(const TStringStream& stream, NYson::TYsonWriter& writer,
fillSettings, truncated, true);
}

bool IsRawKikimrResult(const NKikimrMiniKQL::TResult& result) {
auto& type = result.GetType();
if (type.GetKind() != NKikimrMiniKQL::ETypeKind::Struct) {
return true;
}

auto& structType = type.GetStruct();
if (structType.MemberSize() != 2) {
return true;
}

return structType.GetMember(0).GetName() != "Data" || structType.GetMember(1).GetName() != "Truncated";
}

NKikimrMiniKQL::TResult* KikimrResultToProto(const NKikimrMiniKQL::TResult& result, const TColumnOrder& columnHints,
const IDataProvider::TFillSettings& fillSettings, google::protobuf::Arena* arena)
{
NKikimrMiniKQL::TResult* packedResult = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(arena);
auto* packedType = packedResult->MutableType();
packedType->SetKind(NKikimrMiniKQL::ETypeKind::Struct);
auto* dataMember = packedType->MutableStruct()->AddMember();
dataMember->SetName("Data");
auto* truncatedMember = packedType->MutableStruct()->AddMember();
truncatedMember->SetName("Truncated");
truncatedMember->MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data);
truncatedMember->MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<bool>::Id);

auto* packedValue = packedResult->MutableValue();
auto* dataValue = packedValue->AddStruct();
auto* dataType = dataMember->MutableType();
auto* truncatedValue = packedValue->AddStruct();

bool truncated = false;
if (result.GetType().GetKind() == NKikimrMiniKQL::ETypeKind::List) {
const auto& itemType = result.GetType().GetList().GetItem();

TMap<TString, size_t> memberIndices;
if (itemType.GetKind() == NKikimrMiniKQL::ETypeKind::Struct && columnHints.Size() != 0) {
const auto& structType = itemType.GetStruct();

for (size_t i = 0; i < structType.MemberSize(); ++i) {
memberIndices[structType.GetMember(i).GetName()] = i;
}

dataType->SetKind(NKikimrMiniKQL::ETypeKind::List);
auto* newItem = dataType->MutableList()->MutableItem();
newItem->SetKind(NKikimrMiniKQL::ETypeKind::Struct);
auto* newStructType = newItem->MutableStruct();
for (auto& [column, gen_col] : columnHints) {
auto* memberIndex = memberIndices.FindPtr(gen_col);
YQL_ENSURE(memberIndex);

(*newStructType->AddMember() = structType.GetMember(*memberIndex)).SetName(column);
}
} else {
*dataType = result.GetType();
}

ui64 rowsWritten = 0;
ui64 bytesWritten = 0;
for (auto& item : result.GetValue().GetList()) {
if (ResultsOverflow(rowsWritten, bytesWritten, fillSettings)) {
truncated = true;
break;
}
if (!memberIndices.empty()) {
auto* newStruct = dataValue->AddList();
for (auto& [column, gen_column] : columnHints) {
auto* memberIndex = memberIndices.FindPtr(gen_column);
YQL_ENSURE(memberIndex);

*newStruct->AddStruct() = item.GetStruct(*memberIndex);
}
} else {
*dataValue->AddList() = item;
}

bytesWritten += item.ByteSize();
++rowsWritten;
}
} else {
dataType->CopyFrom(result.GetType());
dataValue->CopyFrom(result.GetValue());
}

truncatedValue->SetBool(truncated);
return packedResult;
}

const TTypeAnnotationNode* ParseTypeFromKikimrProto(const NKikimrMiniKQL::TType& type, TExprContext& ctx) {
switch (type.GetKind()) {
Expand Down
5 changes: 0 additions & 5 deletions ydb/core/kqp/provider/yql_kikimr_results.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ namespace NYql {
void KikimrResultToYson(const TStringStream& stream, NYson::TYsonWriter& writer, const NKikimrMiniKQL::TResult& result,
const TColumnOrder& columnHints, const IDataProvider::TFillSettings& fillSettings, bool& truncated);

NKikimrMiniKQL::TResult* KikimrResultToProto(const NKikimrMiniKQL::TResult& result, const TColumnOrder& columnHints,
const IDataProvider::TFillSettings& fillSettings, google::protobuf::Arena* arena);

bool IsRawKikimrResult(const NKikimrMiniKQL::TResult& result);

const TTypeAnnotationNode* ParseTypeFromKikimrProto(const NKikimrMiniKQL::TType& type, TExprContext& ctx);
bool ExportTypeToKikimrProto(const TTypeAnnotationNode& type, NKikimrMiniKQL::TType& protoType, TExprContext& ctx);
TExprNode::TPtr ParseKikimrProtoValue(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value,
Expand Down
Loading
Loading