Skip to content

Commit

Permalink
Merge 5d87743 into 2355ac3
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit authored Oct 2, 2024
2 parents 2355ac3 + 5d87743 commit 4e8bdca
Show file tree
Hide file tree
Showing 18 changed files with 87 additions and 130 deletions.
4 changes: 2 additions & 2 deletions ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ namespace {
{}

NKqp::TEvKqp::TEvDataQueryStreamPart::TPtr Handle;
google::protobuf::RepeatedPtrField<NKikimrMiniKQL::TResult>::const_iterator ResultIterator;
google::protobuf::RepeatedPtrField<Ydb::ResultSet>::const_iterator ResultIterator;
};

enum EStreamRpcWakeupTag : ui64 {
Expand Down Expand Up @@ -220,7 +220,7 @@ class TStreamExecuteYqlScriptRPC
auto result = response.mutable_result();

try {
NKqp::ConvertKqpQueryResultToDbResult(kqpResult, result->mutable_result_set());
result->mutable_result_set()->CopyFrom(kqpResult);
} catch (std::exception ex) {
ReplyFinishStream(ex.what());
}
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/common/events/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
const TQueryRequestSettings& querySettings = TQueryRequestSettings(),
const TString& poolId = "");

TEvQueryRequest() = default;
TEvQueryRequest() {
Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
}

bool IsSerializable() const override {
return true;
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kqp/common/kqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ bool ScriptExecutionRunnerActorIdFromString(const TString& executionId, TActorId

template<typename TFrom, typename TTo>
inline void ConvertKqpQueryResultsToDbResult(const TFrom& from, TTo* to) {
const auto& results = from.GetResults();
const auto& results = from.GetYdbResults();
for (const auto& result : results) {
ConvertKqpQueryResultToDbResult(result, to->add_result_sets());
to->add_result_sets()->CopyFrom(result);
}
}

Expand Down Expand Up @@ -80,7 +80,7 @@ class IQueryReplayBackend : public TNonCopyable {
/// Accepts query text
virtual void Collect(const TString& queryData) = 0;

virtual bool IsNull() { return false; }
virtual bool IsNull() { return false; }

virtual ~IQueryReplayBackend() {};

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/common/kqp_event_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
Record.MutableRequest()->SetDatabaseId(DatabaseId);
}

Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
Record.MutableRequest()->SetSessionId(SessionId);
Record.MutableRequest()->SetAction(QueryAction);
Record.MutableRequest()->SetType(QueryType);
Expand Down
28 changes: 13 additions & 15 deletions ydb/core/kqp/gateway/kqp_ic_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ class TKqpScanQueryRequestHandler : public TRequestHandlerBase<
void HandleResponse(typename TResponse::TPtr &ev, const TActorContext &ctx) {
auto& response = *ev->Get()->Record.GetRef().MutableResponse();

NKikimr::ConvertYdbResultToKqpResult(ResultSet,*response.AddResults());
response.AddYdbResults()->CopyFrom(ResultSet);
for (auto& execStats : Executions) {
response.MutableQueryStats()->AddExecutions()->Swap(&execStats);
}
Expand Down Expand Up @@ -286,20 +286,18 @@ class TKqpStreamRequestHandler : public TRequestHandlerBase<
virtual void HandleResponse(typename TResponse::TPtr &ev, const TActorContext &ctx) {
auto& record = ev->Get()->Record.GetRef();
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
if (record.MutableResponse()->GetResults().size()) {
if (record.MutableResponse()->GetYdbResults().size()) {
// Send result sets to RPC actor TStreamExecuteYqlScriptRPC
auto evStreamPart = MakeHolder<NKqp::TEvKqp::TEvDataQueryStreamPart>();
ActorIdToProto(this->SelfId(), evStreamPart->Record.MutableGatewayActorId());

for (int i = 0; i < record.MutableResponse()->MutableResults()->size(); ++i) {
for (int i = 0; i < record.MutableResponse()->MutableYdbResults()->size(); ++i) {
// Workaround to avoid errors on Pull execution stage which would expect some results
Ydb::ResultSet resultSet;
NKikimr::ConvertYdbResultToKqpResult(resultSet, *evStreamPart->Record.AddResults());
evStreamPart->Record.AddResults();
}

evStreamPart->Record.MutableResults()->Swap(record.MutableResponse()->MutableResults());
evStreamPart->Record.MutableResults()->Swap(record.MutableResponse()->MutableYdbResults());
this->Send(TargetActorId, evStreamPart.Release());

// Save response without data to send it later
ResponseHandle = ev.Release();
} else {
Expand Down Expand Up @@ -405,7 +403,7 @@ class TKqpForwardStreamRequestHandler : public TRequestHandlerBase<
auto& response = *ev->Get()->Record.GetRef().MutableResponse();

Ydb::ResultSet resultSet;
NKikimr::ConvertYdbResultToKqpResult(resultSet, *response.AddResults());
response.AddYdbResults()->CopyFrom(resultSet);
for (auto& execStats : Executions) {
response.MutableQueryStats()->AddExecutions()->Swap(&execStats);
}
Expand Down Expand Up @@ -511,7 +509,7 @@ class TKqpGenericQueryRequestHandler: public TRequestHandlerBase<
auto& response = *ev->Get()->Record.GetRef().MutableResponse();

for (auto& resultSet : ResultSets) {
ConvertYdbResultToKqpResult(std::move(resultSet.ResultSet), *response.AddResults());
response.AddYdbResults()->Swap(&resultSet.ResultSet);
}

TBase::HandleResponse(ev, ctx);
Expand Down Expand Up @@ -672,8 +670,8 @@ void KqpResponseToQueryResult(const NKikimrKqp::TEvQueryResponse& response, IKqp
queryResult.AddIssue(NYql::IssueFromMessage(issue));
}

for (auto& result : queryResponse.GetResults()) {
auto arenaResult = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(
for (auto& result : queryResponse.GetYdbResults()) {
auto arenaResult = google::protobuf::Arena::CreateMessage<Ydb::ResultSet>(
queryResult.ProtobufArenaPtr.get());

arenaResult->CopyFrom(result);
Expand Down Expand Up @@ -1419,11 +1417,11 @@ class TKikimrIcGateway : public IKqpGateway {
if (!CheckCluster(cluster)) {
return InvalidCluster<TGenericResult>(cluster);
}

auto analyzePromise = NewPromise<TGenericResult>();
IActor* analyzeActor = new TAnalyzeActor(settings.TablePath, settings.Columns, analyzePromise);
RegisterActor(analyzeActor);

return analyzePromise.GetFuture();
} catch (yexception& e) {
return MakeFuture(ResultFromException<TGenericResult>(e));
Expand Down Expand Up @@ -1995,7 +1993,7 @@ class TKikimrIcGateway : public IKqpGateway {
}

TFuture<TQueryResult> ExecDataQueryAst(const TString& cluster, const TString& query, TQueryData::TPtr params,
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
const TMaybe<TString>& traceId) override
{
YQL_ENSURE(cluster == Cluster);
Expand Down Expand Up @@ -2075,7 +2073,7 @@ class TKikimrIcGateway : public IKqpGateway {
}

TFuture<TQueryResult> ExecGenericQuery(const TString& cluster, const TString& query, TQueryData::TPtr params,
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
const TMaybe<TString>& traceId) override
{
YQL_ENSURE(cluster == Cluster);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,8 @@ class TAsyncExecuteYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult

for (auto& resultStr : ResultProviderConfig.CommittedResults) {
queryResult.Results.emplace_back(
google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(queryResult.ProtobufArenaPtr.get()));
NKikimrMiniKQL::TResult* result = queryResult.Results.back();
google::protobuf::Arena::CreateMessage<Ydb::ResultSet>(queryResult.ProtobufArenaPtr.get()));
Ydb::ResultSet* result = queryResult.Results.back();

if (!result->ParseFromArray(resultStr.data(), resultStr.size())) {
queryResult = ResultFromError<TResult>("Failed to parse run result.");
Expand Down
16 changes: 3 additions & 13 deletions ydb/core/kqp/host/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,7 @@ class TPrepareQueryAsyncResult : public TKqpAsyncResultBase<IKikimrQueryExecutor
}

void FillResult(TResult& queryResult) const override {
TVector<NKikimrMiniKQL::TResult*> results;
for (auto& phyResult : TransformCtx.PhysicalQueryResults) {
auto result = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(
queryResult.ProtobufArenaPtr.get());

result->CopyFrom(phyResult);
results.push_back(result);
}

queryResult.QueryStats.CopyFrom(TransformCtx.QueryStats);
queryResult.Results = std::move(results);
}

private:
Expand Down Expand Up @@ -328,7 +318,7 @@ class TKqpRunner : public IKqpRunner {
Config),
"BuildPhysicalTxs")
.Build(false));

auto physicalBuildQueryTransformer = TTransformationPipeline(typesCtx)
.AddServiceTransformers()
.Add(Log("PhysicalBuildQuery"), "LogPhysicalBuildQuery")
Expand Down Expand Up @@ -403,8 +393,8 @@ class TKqpRunner : public IKqpRunner {
TKqpProviderContext Pctx;

TAutoPtr<IGraphTransformer> Transformer;
TActorSystem* ActorSystem;

TActorSystem* ActorSystem;
};

} // namespace
Expand Down
5 changes: 0 additions & 5 deletions ydb/core/kqp/host/kqp_transform.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,13 @@ struct TKqlTransformContext : TThrRefBase {
NKqpProto::TKqpStatsQuery QueryStats;
std::shared_ptr<const NKqpProto::TKqpPhyQuery> PhysicalQuery;

TVector<TSimpleSharedPtr<NKikimrMiniKQL::TResult>> MkqlResults;
TVector<NKikimrMiniKQL::TResult> PhysicalQueryResults;

NYql::TExprNode::TPtr ExplainTransformerInput; // Explain transformer must work after other transformers, but use input before peephole
TMaybe<NYql::NNodes::TKiDataQueryBlocks> DataQueryBlocks;

void Reset() {
ReplyTarget = {};
MkqlResults.clear();
QueryStats = {};
PhysicalQuery = nullptr;
PhysicalQueryResults.clear();
ExplainTransformerInput = nullptr;
DataQueryBlocks = Nothing();
}
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -997,15 +997,15 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T

private:
static TExprNode::TPtr GetResOrPullResult(const TExprNode& node, const IDataProvider::TFillSettings& fillSettings,
const NKikimrMiniKQL::TResult& resultValue, TExprContext& ctx)
const Ydb::ResultSet& resultValue, TExprContext& ctx)
{
TColumnOrder columnHints(NCommon::GetResOrPullColumnHints(node));

auto protoValue = &resultValue;
YQL_ENSURE(resultValue.GetArena());
if (IsRawKikimrResult(resultValue)) {
protoValue = KikimrResultToProto(resultValue, columnHints, fillSettings, resultValue.GetArena());
}
//if (IsRawKikimrResult(resultValue)) {
// protoValue = KikimrResultToProto(resultValue, columnHints, fillSettings, resultValue.GetArena());
//}

YQL_ENSURE(fillSettings.Format == IDataProvider::EResultFormat::Custom);
YQL_ENSURE(fillSettings.FormatDetails == KikimrMkqlProtoFormat);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/provider/yql_kikimr_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ class IKikimrGateway : public TThrRefBase {

struct TQueryResult : public TGenericResult {
TString SessionId;
TVector<NKikimrMiniKQL::TResult*> Results;
TVector<Ydb::ResultSet*> Results;
NKqpProto::TKqpStatsQuery QueryStats;
std::unique_ptr<NKikimrKqp::TPreparedQuery> PreparingQuery;
std::shared_ptr<const NKikimrKqp::TPreparedQuery> PreparedQuery;
Expand Down
19 changes: 10 additions & 9 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1788,7 +1788,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
// Result for scan query is sent directly to target actor.
Y_ABORT_UNLESS(response->GetArena());
if (QueryState->PreparedQuery) {
bool useYdbResponseFormat = QueryState->GetUsePublicResponseDataFormat();
// bool useYdbResponseFormat = QueryState->GetUsePublicResponseDataFormat();
auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
size_t trailingResultsCount = 0;
for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) {
Expand All @@ -1805,14 +1805,15 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
continue;
}

if (useYdbResponseFormat) {
//if (useYdbResponseFormat) {
TMaybe<ui64> effectiveRowsLimit = FillSettings.RowsLimitPerWrite;
if (QueryState->PreparedQuery->GetResults(i).GetRowsLimit()) {
effectiveRowsLimit = QueryState->PreparedQuery->GetResults(i).GetRowsLimit();
}
auto* ydbResult = QueryState->QueryData->GetYdbTxResult(phyQuery.GetResultBindings(i), response->GetArena(), effectiveRowsLimit);
response->AddYdbResults()->Swap(ydbResult);
} else {
//}
/*else {
auto* protoRes = QueryState->QueryData->GetMkqlTxResult(phyQuery.GetResultBindings(i), response->GetArena());
std::optional<IDataProvider::TFillSettings> fillSettings;
if (QueryState->PreparedQuery->ResultsSize()) {
Expand All @@ -1826,7 +1827,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}
auto* finalResult = KikimrResultToProto(*protoRes, {}, fillSettings.value_or(FillSettings), response->GetArena());
response->AddResults()->Swap(finalResult);
}
}*/
}
}

Expand Down Expand Up @@ -1892,10 +1893,10 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
AddTrailingInfo(response->Record.GetRef());

NDataIntegrity::LogIntegrityTrails(
request->Get()->GetTraceId(),
request->Get()->GetAction(),
request->Get()->GetType(),
response,
request->Get()->GetTraceId(),
request->Get()->GetAction(),
request->Get()->GetType(),
response,
TlsActivationContext->AsActorContext()
);

Expand Down Expand Up @@ -1955,7 +1956,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
QueryState->UserRequestContext->TraceId,
QueryState->GetAction(),
QueryState->GetType(),
QueryResponse,
QueryResponse,
TlsActivationContext->AsActorContext()
);

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/session_actor/kqp_worker_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class TKqpWorkerActor : public TActorBootstrapped<TKqpWorkerActor> {
Config->FeatureFlags = AppData(ctx)->FeatureFlags;

KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver, FederatedQuerySetup,
QueryState->RequestEv->GetUserToken(), GUCSettings, QueryServiceConfig, Settings.ApplicationName, AppData(ctx)->FunctionRegistry,
QueryState->RequestEv->GetUserToken(), GUCSettings, QueryServiceConfig, Settings.ApplicationName, AppData(ctx)->FunctionRegistry,
!Settings.LongSession, false, nullptr, nullptr, nullptr, QueryState->RequestEv->GetUserRequestContext());

auto& queryRequest = QueryState->RequestEv;
Expand Down Expand Up @@ -959,7 +959,7 @@ class TKqpWorkerActor : public TActorBootstrapped<TKqpWorkerActor> {
// If we have result it must be allocated on protobuf arena
Y_ASSERT(result->GetArena());
Y_ASSERT(resp->GetArena() == result->GetArena());
resp->AddResults()->Swap(result);
resp->AddYdbResults()->Swap(result);
}
} else {
auto resp = ev.MutableResponse();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/session_actor/kqp_worker_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ void SlowLogQuery(const TActorContext &ctx, const TKikimrConfiguration* config,
<< 'b';

ui64 resultsSize = 0;
for (auto& result : record->GetResponse().GetResults()) {
for (auto& result : record->GetResponse().GetYdbResults()) {
resultsSize += result.ByteSize();
}

Expand Down
Loading

0 comments on commit 4e8bdca

Please sign in to comment.