Skip to content

Commit

Permalink
get rid of mkql results in scripting (ydb-platform#9997)
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit committed Oct 7, 2024
1 parent 462f216 commit 24eb06b
Show file tree
Hide file tree
Showing 31 changed files with 28,173 additions and 174 deletions.
3 changes: 1 addition & 2 deletions ydb/core/grpc_services/rpc_execute_data_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,8 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TE
// https://protobuf.dev/reference/cpp/arenas/#swap
// Actualy will be copy in case pf remote execution
queryResult->mutable_result_sets()->Swap(record.MutableResponse()->MutableYdbResults());
} else {
NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult);
}

ConvertQueryStats(kqpResponse, queryResult);
if (kqpResponse.HasTxMeta()) {
queryResult->mutable_tx_meta()->CopyFrom(kqpResponse.GetTxMeta());
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/grpc_services/rpc_execute_yql_script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ class TExecuteYqlScriptRPC : public TRpcKqpRequestActor<TExecuteYqlScriptRPC, TE
auto queryResult = TEvExecuteYqlScriptRequest::AllocateResult<TResult>(Request_);

try {
NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult);
const auto& results = kqpResponse.GetYdbResults();
for (const auto& result : results) {
queryResult->add_result_sets()->CopyFrom(result);
}

} catch (const std::exception& ex) {
NYql::TIssues issues;
issues.AddIssue(NYql::ExceptionToIssue(ex));
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace {
{}

NKqp::TEvKqp::TEvDataQueryStreamPart::TPtr Handle;
google::protobuf::RepeatedPtrField<NKikimrMiniKQL::TResult>::const_iterator ResultIterator;
google::protobuf::RepeatedPtrField<Ydb::ResultSet>::const_iterator ResultIterator;
};

enum EStreamRpcWakeupTag : ui64 {
Expand Down Expand Up @@ -218,7 +218,7 @@ class TStreamExecuteYqlScriptRPC
auto result = response.mutable_result();

try {
NKqp::ConvertKqpQueryResultToDbResult(kqpResult, result->mutable_result_set());
result->mutable_result_set()->CopyFrom(kqpResult);
} catch (std::exception ex) {
ReplyFinishStream(ex.what());
}
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/common/events/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
const TQueryRequestSettings& querySettings = TQueryRequestSettings(),
const TString& poolId = "");

TEvQueryRequest() = default;
TEvQueryRequest() {
Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
}

bool IsSerializable() const override {
return true;
Expand Down
11 changes: 1 addition & 10 deletions ydb/core/kqp/common/kqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,9 @@

namespace NKikimr::NKqp {

void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to);

TString ScriptExecutionRunnerActorIdString(const NActors::TActorId& actorId);
bool ScriptExecutionRunnerActorIdFromString(const TString& executionId, TActorId& actorId);

template<typename TFrom, typename TTo>
inline void ConvertKqpQueryResultsToDbResult(const TFrom& from, TTo* to) {
const auto& results = from.GetResults();
for (const auto& result : results) {
ConvertKqpQueryResultToDbResult(result, to->add_result_sets());
}
}

class TKqpRequestInfo {
public:
Expand Down Expand Up @@ -80,7 +71,7 @@ class IQueryReplayBackend : public TNonCopyable {
/// Accepts query text
virtual void Collect(const TString& queryData) = 0;

virtual bool IsNull() { return false; }
virtual bool IsNull() { return false; }

virtual ~IQueryReplayBackend() {};

Expand Down
8 changes: 8 additions & 0 deletions ydb/core/kqp/common/kqp_event_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
Record.MutableRequest()->SetPoolId(PoolId);
}

<<<<<<< HEAD
=======
if (!DatabaseId.empty()) {
Record.MutableRequest()->SetDatabaseId(DatabaseId);
}

Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
>>>>>>> a491b73df3 (get rid of mkql results in scripting (#9997))
Record.MutableRequest()->SetSessionId(SessionId);
Record.MutableRequest()->SetAction(QueryAction);
Record.MutableRequest()->SetType(QueryType);
Expand Down
20 changes: 9 additions & 11 deletions ydb/core/kqp/gateway/kqp_ic_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ class TKqpScanQueryRequestHandler : public TRequestHandlerBase<
void HandleResponse(typename TResponse::TPtr &ev, const TActorContext &ctx) {
auto& response = *ev->Get()->Record.GetRef().MutableResponse();

NKikimr::ConvertYdbResultToKqpResult(ResultSet,*response.AddResults());
response.AddYdbResults()->CopyFrom(ResultSet);
for (auto& execStats : Executions) {
response.MutableQueryStats()->AddExecutions()->Swap(&execStats);
}
Expand Down Expand Up @@ -285,20 +285,18 @@ class TKqpStreamRequestHandler : public TRequestHandlerBase<
virtual void HandleResponse(typename TResponse::TPtr &ev, const TActorContext &ctx) {
auto& record = ev->Get()->Record.GetRef();
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
if (record.MutableResponse()->GetResults().size()) {
if (record.MutableResponse()->GetYdbResults().size()) {
// Send result sets to RPC actor TStreamExecuteYqlScriptRPC
auto evStreamPart = MakeHolder<NKqp::TEvKqp::TEvDataQueryStreamPart>();
ActorIdToProto(this->SelfId(), evStreamPart->Record.MutableGatewayActorId());

for (int i = 0; i < record.MutableResponse()->MutableResults()->size(); ++i) {
for (int i = 0; i < record.MutableResponse()->MutableYdbResults()->size(); ++i) {
// Workaround to avoid errors on Pull execution stage which would expect some results
Ydb::ResultSet resultSet;
NKikimr::ConvertYdbResultToKqpResult(resultSet, *evStreamPart->Record.AddResults());
evStreamPart->Record.AddResults();
}

evStreamPart->Record.MutableResults()->Swap(record.MutableResponse()->MutableResults());
evStreamPart->Record.MutableResults()->Swap(record.MutableResponse()->MutableYdbResults());
this->Send(TargetActorId, evStreamPart.Release());

// Save response without data to send it later
ResponseHandle = ev.Release();
} else {
Expand Down Expand Up @@ -404,7 +402,7 @@ class TKqpForwardStreamRequestHandler : public TRequestHandlerBase<
auto& response = *ev->Get()->Record.GetRef().MutableResponse();

Ydb::ResultSet resultSet;
NKikimr::ConvertYdbResultToKqpResult(resultSet, *response.AddResults());
response.AddYdbResults()->CopyFrom(resultSet);
for (auto& execStats : Executions) {
response.MutableQueryStats()->AddExecutions()->Swap(&execStats);
}
Expand Down Expand Up @@ -510,7 +508,7 @@ class TKqpGenericQueryRequestHandler: public TRequestHandlerBase<
auto& response = *ev->Get()->Record.GetRef().MutableResponse();

for (auto& resultSet : ResultSets) {
ConvertYdbResultToKqpResult(std::move(resultSet.ResultSet), *response.AddResults());
response.AddYdbResults()->Swap(&resultSet.ResultSet);
}

TBase::HandleResponse(ev, ctx);
Expand Down Expand Up @@ -671,8 +669,8 @@ void KqpResponseToQueryResult(const NKikimrKqp::TEvQueryResponse& response, IKqp
queryResult.AddIssue(NYql::IssueFromMessage(issue));
}

for (auto& result : queryResponse.GetResults()) {
auto arenaResult = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(
for (auto& result : queryResponse.GetYdbResults()) {
auto arenaResult = google::protobuf::Arena::CreateMessage<Ydb::ResultSet>(
queryResult.ProtobufArenaPtr.get());

arenaResult->CopyFrom(result);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,8 @@ class TAsyncExecuteYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult

for (auto& resultStr : ResultProviderConfig.CommittedResults) {
queryResult.Results.emplace_back(
google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(queryResult.ProtobufArenaPtr.get()));
NKikimrMiniKQL::TResult* result = queryResult.Results.back();
google::protobuf::Arena::CreateMessage<Ydb::ResultSet>(queryResult.ProtobufArenaPtr.get()));
Ydb::ResultSet* result = queryResult.Results.back();

if (!result->ParseFromArray(resultStr.data(), resultStr.size())) {
queryResult = ResultFromError<TResult>("Failed to parse run result.");
Expand Down
10 changes: 0 additions & 10 deletions ydb/core/kqp/host/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,7 @@ class TPrepareQueryAsyncResult : public TKqpAsyncResultBase<IKikimrQueryExecutor
}

void FillResult(TResult& queryResult) const override {
TVector<NKikimrMiniKQL::TResult*> results;
for (auto& phyResult : TransformCtx.PhysicalQueryResults) {
auto result = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(
queryResult.ProtobufArenaPtr.get());

result->CopyFrom(phyResult);
results.push_back(result);
}

queryResult.QueryStats.CopyFrom(TransformCtx.QueryStats);
queryResult.Results = std::move(results);
}

private:
Expand Down
5 changes: 0 additions & 5 deletions ydb/core/kqp/host/kqp_transform.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,13 @@ struct TKqlTransformContext : TThrRefBase {
NKqpProto::TKqpStatsQuery QueryStats;
std::shared_ptr<const NKqpProto::TKqpPhyQuery> PhysicalQuery;

TVector<TSimpleSharedPtr<NKikimrMiniKQL::TResult>> MkqlResults;
TVector<NKikimrMiniKQL::TResult> PhysicalQueryResults;

NYql::TExprNode::TPtr ExplainTransformerInput; // Explain transformer must work after other transformers, but use input before peephole
TMaybe<NYql::NNodes::TKiDataQueryBlocks> DataQueryBlocks;

void Reset() {
ReplyTarget = {};
MkqlResults.clear();
QueryStats = {};
PhysicalQuery = nullptr;
PhysicalQueryResults.clear();
ExplainTransformerInput = nullptr;
DataQueryBlocks = Nothing();
}
Expand Down
6 changes: 1 addition & 5 deletions ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -980,16 +980,12 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T

private:
static TExprNode::TPtr GetResOrPullResult(const TExprNode& node, const IDataProvider::TFillSettings& fillSettings,
const NKikimrMiniKQL::TResult& resultValue, TExprContext& ctx)
const Ydb::ResultSet& resultValue, TExprContext& ctx)
{
TVector<TString> columnHints(NCommon::GetResOrPullColumnHints(node));

auto protoValue = &resultValue;
YQL_ENSURE(resultValue.GetArena());
if (IsRawKikimrResult(resultValue)) {
protoValue = KikimrResultToProto(resultValue, columnHints, fillSettings, resultValue.GetArena());
}

YQL_ENSURE(fillSettings.Format == IDataProvider::EResultFormat::Custom);
YQL_ENSURE(fillSettings.FormatDetails == KikimrMkqlProtoFormat);

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/provider/yql_kikimr_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -856,8 +856,9 @@ class IKikimrGateway : public TThrRefBase {

struct TQueryResult : public TGenericResult {
TString SessionId;
TVector<NKikimrMiniKQL::TResult*> Results;
TVector<Ydb::ResultSet*> Results;
TMaybe<NKikimrKqp::TQueryProfile> Profile; // TODO: Deprecate.
a491b73df3 (get rid of mkql results in scripting (#9997))
NKqpProto::TKqpStatsQuery QueryStats;
std::unique_ptr<NKikimrKqp::TPreparedQuery> PreparingQuery;
std::shared_ptr<const NKikimrKqp::TPreparedQuery> PreparedQuery;
Expand Down
30 changes: 0 additions & 30 deletions ydb/core/kqp/session_actor/kqp_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,36 +53,6 @@ bool HasSchemeOrFatalIssues(const TIssue& issue) {

} // namespace

void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to) {
const auto& type = from.GetType();
TStackVec<NKikimrMiniKQL::TType> columnTypes;
Y_ENSURE(type.GetKind() == NKikimrMiniKQL::ETypeKind::Struct);
for (const auto& member : type.GetStruct().GetMember()) {
if (member.GetType().GetKind() == NKikimrMiniKQL::ETypeKind::List) {
for (const auto& column : member.GetType().GetList().GetItem().GetStruct().GetMember()) {
auto columnMeta = to->add_columns();
columnMeta->set_name(column.GetName());
columnTypes.push_back(column.GetType());
ConvertMiniKQLTypeToYdbType(column.GetType(), *columnMeta->mutable_type());
}
}
}
for (const auto& responseStruct : from.GetValue().GetStruct()) {
for (const auto& row : responseStruct.GetList()) {
auto newRow = to->add_rows();
ui32 columnCount = static_cast<ui32>(row.StructSize());
Y_ENSURE(columnCount == columnTypes.size());
for (ui32 i = 0; i < columnCount; i++) {
const auto& column = row.GetStruct(i);
ConvertMiniKQLValueToYdbValue(columnTypes[i], column, *newRow->add_items());
}
}
if (responseStruct.Getvalue_valueCase() == NKikimrMiniKQL::TValue::kBool) {
to->set_truncated(responseStruct.GetBool());
}
}
}

TMaybe<Ydb::StatusIds::StatusCode> GetYdbStatus(const TIssue& issue) {
if (issue.GetSeverity() == TSeverityIds::S_FATAL) {
return Ydb::StatusIds::INTERNAL_ERROR;
Expand Down
27 changes: 5 additions & 22 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1771,7 +1771,6 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
// Result for scan query is sent directly to target actor.
Y_ABORT_UNLESS(response->GetArena());
if (QueryState->PreparedQuery) {
bool useYdbResponseFormat = QueryState->GetUsePublicResponseDataFormat();
auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
size_t trailingResultsCount = 0;
for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) {
Expand All @@ -1788,28 +1787,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
continue;
}

if (useYdbResponseFormat) {
TMaybe<ui64> effectiveRowsLimit = FillSettings.RowsLimitPerWrite;
if (QueryState->PreparedQuery->GetResults(i).GetRowsLimit()) {
effectiveRowsLimit = QueryState->PreparedQuery->GetResults(i).GetRowsLimit();
}
auto* ydbResult = QueryState->QueryData->GetYdbTxResult(phyQuery.GetResultBindings(i), response->GetArena(), effectiveRowsLimit);
response->AddYdbResults()->Swap(ydbResult);
} else {
auto* protoRes = QueryState->QueryData->GetMkqlTxResult(phyQuery.GetResultBindings(i), response->GetArena());
std::optional<IDataProvider::TFillSettings> fillSettings;
if (QueryState->PreparedQuery->ResultsSize()) {
YQL_ENSURE(phyQuery.ResultBindingsSize() == QueryState->PreparedQuery->ResultsSize(), ""
<< phyQuery.ResultBindingsSize() << " != " << QueryState->PreparedQuery->ResultsSize());
const auto& result = QueryState->PreparedQuery->GetResults(i);
if (result.GetRowsLimit()) {
fillSettings = FillSettings;
fillSettings->RowsLimitPerWrite = result.GetRowsLimit();
}
}
auto* finalResult = KikimrResultToProto(*protoRes, {}, fillSettings.value_or(FillSettings), response->GetArena());
response->AddResults()->Swap(finalResult);
TMaybe<ui64> effectiveRowsLimit = FillSettings.RowsLimitPerWrite;
if (QueryState->PreparedQuery->GetResults(i).GetRowsLimit()) {
effectiveRowsLimit = QueryState->PreparedQuery->GetResults(i).GetRowsLimit();
}
auto* ydbResult = QueryState->QueryData->GetYdbTxResult(phyQuery.GetResultBindings(i), response->GetArena(), effectiveRowsLimit);
response->AddYdbResults()->Swap(ydbResult);
}
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/session_actor/kqp_worker_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,7 @@ class TKqpWorkerActor : public TActorBootstrapped<TKqpWorkerActor> {
// If we have result it must be allocated on protobuf arena
Y_ASSERT(result->GetArena());
Y_ASSERT(resp->GetArena() == result->GetArena());
resp->AddResults()->Swap(result);
resp->AddYdbResults()->Swap(result);
}
} else {
auto resp = ev.MutableResponse();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/session_actor/kqp_worker_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ void SlowLogQuery(const TActorContext &ctx, const TKikimrConfiguration* config,
<< 'b';

ui64 resultsSize = 0;
for (auto& result : record->GetResponse().GetResults()) {
for (auto& result : record->GetResponse().GetYdbResults()) {
resultsSize += result.ByteSize();
}

Expand Down
37 changes: 33 additions & 4 deletions ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) {
ALTER TABLE `/Root/ScriptingCreateAndAlterTableTest` SET (AUTO_PARTITIONING_BY_SIZE = ENABLED);
)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

result = client.ExecuteYqlScript(R"(
ALTER TABLE `/Root/ScriptingCreateAndAlterTableTest` SET (AUTO_PARTITIONING_BY_SIZE = ENABLED);
COMMIT;
Expand All @@ -88,7 +88,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) {
ALTER TABLE `/Root/ScriptingCreateAndAlterTableTest` SET (AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 4);
)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

result = client.ExecuteYqlScript(R"(
ALTER TABLE `/Root/ScriptingCreateAndAlterTableTest` SET (AUTO_PARTITIONING_BY_SIZE = ENABLED);
COMMIT;
Expand Down Expand Up @@ -989,7 +989,36 @@ Y_UNIT_TEST_SUITE(KqpScripting) {
[[[101u]];[[201u]];[[301u]];[[401u]];[[501u]];[[601u]];[[701u]];[[801u]]];
[[8u]];
[[8u]];
[[8u]]])", StreamResultToYson(it));
[[8u]]
])", StreamResultToYson(it));
}

Y_UNIT_TEST(SelectNullType) {
TKikimrRunner kikimr;
TScriptingClient client(kikimr.GetDriver());
{
auto result = client.ExecuteYqlScript(R"(
CREATE TABLE demo1(id Text, PRIMARY KEY(id));
)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

{
auto result = client.ExecuteYqlScript(R"(
UPSERT INTO demo1(id) VALUES("a"),("b"),("c");
)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

{
auto result = client.ExecuteYqlScript(R"(
SELECT NULL auto_proc_ FROM demo1 LIMIT 10;
)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([
[#];[#];[#]
])", FormatResultSetYson(result.GetResultSet(0)));
}
}

Y_UNIT_TEST(StreamExecuteYqlScriptLeadingEmptyScan) {
Expand Down Expand Up @@ -1206,7 +1235,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) {

Y_UNIT_TEST(StreamExecuteYqlScriptPg) {
TKikimrRunner kikimr;

auto settings = TExecuteYqlRequestSettings()
.Syntax(Ydb::Query::SYNTAX_PG);

Expand Down
Loading

0 comments on commit 24eb06b

Please sign in to comment.