Skip to content

Commit

Permalink
Merge 2334a5f into b399caf
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Feb 7, 2024
2 parents b399caf + 2334a5f commit 9201ed7
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 58 deletions.
38 changes: 17 additions & 21 deletions ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,12 +394,18 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
return;
}

if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
Request_->SetRuHeader(record.GetConsumedRu());
Ydb::Query::ExecuteQueryResponsePart response;

auto& kqpResponse = record.GetResponse();
if (NeedReportStats(*Request_->GetProtoRequest())) {
hasTrailingMessage = true;
FillQueryStats(*response.mutable_exec_stats(), kqpResponse);
if (NeedReportAst(*Request_->GetProtoRequest())) {
response.mutable_exec_stats()->set_query_ast(kqpResponse.GetQueryAst());
}
}

Ydb::Query::ExecuteQueryResponsePart response;
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
Request_->SetRuHeader(record.GetConsumedRu());

if (QueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE) {
for(int i = 0; i < kqpResponse.GetYdbResults().size(); i++) {
Expand All @@ -415,25 +421,15 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
hasTrailingMessage = true;
response.mutable_tx_meta()->set_id(kqpResponse.GetTxMeta().id());
}

if (NeedReportStats(*Request_->GetProtoRequest())) {
hasTrailingMessage = true;
FillQueryStats(*response.mutable_exec_stats(), kqpResponse);
if (NeedReportAst(*Request_->GetProtoRequest())) {
response.mutable_exec_stats()->set_query_ast(kqpResponse.GetQueryAst());
}
}

if (hasTrailingMessage) {
response.set_status(Ydb::StatusIds::SUCCESS);
response.mutable_issues()->CopyFrom(issueMessage);
TString out;
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
ReplySerializedAndFinishStream(record.GetYdbStatus(), std::move(out));
}
}

if (!hasTrailingMessage) {
if (hasTrailingMessage) {
response.set_status(record.GetYdbStatus());
response.mutable_issues()->CopyFrom(issueMessage);
TString out;
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
ReplySerializedAndFinishStream(record.GetYdbStatus(), std::move(out));
} else {
NYql::TIssues issues;
NYql::IssuesFromMessage(issueMessage, issues);
ReplyFinishStream(record.GetYdbStatus(), issueMessage);
Expand Down
28 changes: 17 additions & 11 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,18 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
PassAway();
}

void FillCompileResult(std::unique_ptr<NKikimrKqp::TPreparedQuery> preparingQuery, NKikimrKqp::EQueryType queryType) {
auto preparedQueryHolder = std::make_shared<TPreparedQueryHolder>(
preparingQuery.release(), AppData()->FunctionRegistry);
preparedQueryHolder->MutableLlvmSettings().Fill(Config, queryType);
KqpCompileResult->PreparedQuery = preparedQueryHolder;
KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery());

if (AstResult) {
KqpCompileResult->Ast = AstResult->Ast;
}
}

void Handle(TEvKqp::TEvContinueProcess::TPtr &ev, const TActorContext &ctx) {
Y_ENSURE(!ev->Get()->QueryId);

Expand Down Expand Up @@ -403,17 +415,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {

if (status == Ydb::StatusIds::SUCCESS) {
YQL_ENSURE(kqpResult.PreparingQuery);
{
auto preparedQueryHolder = std::make_shared<TPreparedQueryHolder>(
kqpResult.PreparingQuery.release(), AppData()->FunctionRegistry);
preparedQueryHolder->MutableLlvmSettings().Fill(Config, queryType);
KqpCompileResult->PreparedQuery = preparedQueryHolder;
KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery());

if (AstResult) {
KqpCompileResult->Ast = AstResult->Ast;
}
}
FillCompileResult(std::move(kqpResult.PreparingQuery), queryType);

auto now = TInstant::Now();
auto duration = now - StartTime;
Expand All @@ -423,6 +425,10 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
<< ", self: " << ctx.SelfID
<< ", duration: " << duration);
} else {
if (kqpResult.PreparingQuery) {
FillCompileResult(std::move(kqpResult.PreparingQuery), queryType);
}

LOG_ERROR_S(ctx, NKikimrServices::KQP_COMPILE_ACTOR, "Compilation failed"
<< ", self: " << ctx.SelfID
<< ", status: " << Ydb::StatusIds_StatusCode_Name(status)
Expand Down
47 changes: 42 additions & 5 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ class TAsyncValidateYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResul
, SqlVersion(sqlVersion) {}

void FillResult(TResult& validateResult) const override {
if (!validateResult.Success()) {
return;
}

YQL_ENSURE(SessionCtx->Query().PrepareOnly);
validateResult.PreparedQuery.reset(SessionCtx->Query().PreparingQuery.release());
validateResult.SqlVersion = SqlVersion;
Expand Down Expand Up @@ -211,6 +215,10 @@ class TAsyncExplainYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
, UseDqExplain(useDqExplain) {}

void FillResult(TResult& queryResult) const override {
if (!queryResult.Success()) {
return;
}

if (UseDqExplain) {
TVector<const TString> plans;
for (auto id : SessionCtx->Query().ExecutionOrder) {
Expand Down Expand Up @@ -253,6 +261,10 @@ class TAsyncExecuteYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
, SqlVersion(sqlVersion) {}

void FillResult(TResult& queryResult) const override {
if (!queryResult.Success()) {
return;
}

for (auto& resultStr : ResultProviderConfig.CommittedResults) {
queryResult.Results.emplace_back(
google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(queryResult.ProtobufArenaPtr.get()));
Expand Down Expand Up @@ -300,6 +312,10 @@ class TAsyncExecuteKqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
, ExecuteCtx(executeCtx) {}

void FillResult(TResult& queryResult) const override {
if (!queryResult.Success()) {
return;
}

YQL_ENSURE(ExecuteCtx.QueryResults.size() == 1);
queryResult = std::move(ExecuteCtx.QueryResults[0]);
queryResult.QueryPlan = queryResult.PreparingQuery->GetPhysicalQuery().GetQueryPlan();
Expand All @@ -320,13 +336,28 @@ class TAsyncPrepareYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
using TResult = IKqpHost::TQueryResult;

TAsyncPrepareYqlResult(TExprNode* queryRoot, TExprContext& exprCtx, IGraphTransformer& transformer,
TIntrusivePtr<TKikimrQueryContext> queryCtx, const TKqpQueryRef& query, TMaybe<TSqlVersion> sqlVersion)
TIntrusivePtr<TKikimrQueryContext> queryCtx, const TKqpQueryRef& query, TMaybe<TSqlVersion> sqlVersion,
TIntrusivePtr<TKqlTransformContext> transformCtx)
: TKqpAsyncResultBase(queryRoot, exprCtx, transformer)
, QueryCtx(queryCtx)
, ExprCtx(exprCtx)
, TransformCtx(transformCtx)
, QueryText(query.Text)
, SqlVersion(sqlVersion) {}

void FillResult(TResult& prepareResult) const override {
if (!prepareResult.Success()) {
auto exprRoot = GetExprRoot();
if (TransformCtx && TransformCtx->ExplainTransformerInput) {
exprRoot = TransformCtx->ExplainTransformerInput;
}
if (exprRoot) {
prepareResult.PreparingQuery = std::move(QueryCtx->PreparingQuery);
prepareResult.PreparingQuery->MutablePhysicalQuery()->SetQueryAst(KqpExprToPrettyString(*exprRoot, ExprCtx));
}
return;
}

YQL_ENSURE(QueryCtx->PrepareOnly);
YQL_ENSURE(QueryCtx->PreparingQuery);

Expand All @@ -344,6 +375,8 @@ class TAsyncPrepareYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult

private:
TIntrusivePtr<TKikimrQueryContext> QueryCtx;
NYql::TExprContext& ExprCtx;
TIntrusivePtr<TKqlTransformContext> TransformCtx;
TString QueryText;
TMaybe<TSqlVersion> SqlVersion;
};
Expand Down Expand Up @@ -933,6 +966,7 @@ class TKqpHost : public IKqpHost {
, IsInternalCall(isInternalCall)
, FederatedQuerySetup(federatedQuerySetup)
, SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider, userToken))
, Config(config)
, TypesCtx(MakeIntrusive<TTypeAnnotationContext>())
, PlanBuilder(CreatePlanBuilder(*TypesCtx))
, FakeWorld(ExprCtx->NewWorld(TPosition()))
Expand Down Expand Up @@ -1265,7 +1299,7 @@ class TKqpHost : public IKqpHost {
}

return MakeIntrusive<TAsyncPrepareYqlResult>(queryExpr.Get(), ctx, *YqlTransformer, SessionCtx->QueryPtr(),
query.Text, sqlVersion);
query.Text, sqlVersion, TransformCtx);
}

IAsyncQueryResultPtr PrepareDataQueryAstInternal(const TKqpQueryRef& queryAst, const TPrepareSettings& settings,
Expand Down Expand Up @@ -1327,7 +1361,7 @@ class TKqpHost : public IKqpHost {
}

return MakeIntrusive<TAsyncPrepareYqlResult>(queryExpr.Get(), ctx, *YqlTransformer, SessionCtx->QueryPtr(),
query.Text, sqlVersion);
query.Text, sqlVersion, TransformCtx);
}

IAsyncQueryResultPtr PrepareScanQueryInternal(const TKqpQueryRef& query, bool isSql, TExprContext& ctx,
Expand All @@ -1354,7 +1388,7 @@ class TKqpHost : public IKqpHost {
}

return MakeIntrusive<TAsyncPrepareYqlResult>(queryExpr.Get(), ctx, *YqlTransformer, SessionCtx->QueryPtr(),
query.Text, sqlVersion);
query.Text, sqlVersion, TransformCtx);
}

IAsyncQueryResultPtr PrepareScanQueryAstInternal(const TKqpQueryRef& queryAst, TExprContext& ctx) {
Expand Down Expand Up @@ -1502,7 +1536,8 @@ class TKqpHost : public IKqpHost {
}

void Init(EKikimrQueryType queryType) {
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, *FuncRegistry);
TransformCtx = MakeIntrusive<TKqlTransformContext>(Config, SessionCtx->QueryPtr(), SessionCtx->TablesPtr());
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, TransformCtx, *FuncRegistry);

ExprCtx->NodesAllocationLimit = SessionCtx->Config()._KqpExprNodesAllocationLimit.Get().GetRef();
ExprCtx->StringsAllocationLimit = SessionCtx->Config()._KqpExprStringsAllocationLimit.Get().GetRef();
Expand Down Expand Up @@ -1635,6 +1670,7 @@ class TKqpHost : public IKqpHost {
std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;

TIntrusivePtr<TKikimrSessionContext> SessionCtx;
TKikimrConfiguration::TPtr Config;

TIntrusivePtr<NKikimr::NMiniKQL::IFunctionRegistry> FuncRegistryHolder;
const NKikimr::NMiniKQL::IFunctionRegistry* FuncRegistry;
Expand All @@ -1648,6 +1684,7 @@ class TKqpHost : public IKqpHost {
TExprNode::TPtr FakeWorld;

TIntrusivePtr<TExecuteContext> ExecuteCtx;
TIntrusivePtr<TKqlTransformContext> TransformCtx;
TIntrusivePtr<IKqpRunner> KqpRunner;
NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory{NExternalSource::CreateExternalSourceFactory({})};

Expand Down
6 changes: 4 additions & 2 deletions ydb/core/kqp/host/kqp_host_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ class TKqpAsyncResultBase : public NYql::IKikimrAsyncResult<TResult> {
YQL_ENSURE(HasResult());

if (Status.GetValue() == NYql::IGraphTransformer::TStatus::Error) {
return NYql::NCommon::ResultFromErrors<TResult>(ExprCtx.IssueManager.GetIssues());
TResult result = NYql::NCommon::ResultFromErrors<TResult>(ExprCtx.IssueManager.GetIssues());
FillResult(result);
return result;
}

YQL_ENSURE(Status.GetValue() == NYql::IGraphTransformer::TStatus::Ok);
Expand Down Expand Up @@ -244,7 +246,7 @@ class IKqpRunner : public TThrRefBase {

TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
const TIntrusivePtr<NYql::TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<NYql::TKikimrSessionContext>& sessionCtx,
const NMiniKQL::IFunctionRegistry& funcRegistry);
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry);

TAutoPtr<NYql::IGraphTransformer> CreateKqpExplainPreparedTransformer(TIntrusivePtr<IKqpGateway> gateway,
const TString& cluster, TIntrusivePtr<TKqlTransformContext> transformCtx, const NMiniKQL::IFunctionRegistry* funcRegistry,
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kqp/host/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,14 @@ class TKqpRunner : public IKqpRunner {
public:
TKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
const TIntrusivePtr<TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<TKikimrSessionContext>& sessionCtx,
const NMiniKQL::IFunctionRegistry& funcRegistry)
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry)
: Gateway(gateway)
, Cluster(cluster)
, TypesCtx(*typesCtx)
, SessionCtx(sessionCtx)
, FunctionRegistry(funcRegistry)
, Config(sessionCtx->ConfigPtr())
, TransformCtx(MakeIntrusive<TKqlTransformContext>(Config, sessionCtx->QueryPtr(), sessionCtx->TablesPtr()))
, TransformCtx(transformCtx)
, OptimizeCtx(MakeIntrusive<TKqpOptimizeContext>(cluster, Config, sessionCtx->QueryPtr(),
sessionCtx->TablesPtr()))
, BuildQueryCtx(MakeIntrusive<TKqpBuildQueryContext>())
Expand Down Expand Up @@ -377,9 +377,9 @@ class TKqpRunner : public IKqpRunner {

TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
const TIntrusivePtr<TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<TKikimrSessionContext>& sessionCtx,
const NMiniKQL::IFunctionRegistry& funcRegistry)
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry)
{
return new TKqpRunner(gateway, cluster, typesCtx, sessionCtx, funcRegistry);
return new TKqpRunner(gateway, cluster, typesCtx, sessionCtx, transformCtx, funcRegistry);
}

} // namespace NKqp
Expand Down
15 changes: 11 additions & 4 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1720,10 +1720,17 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

const auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
FillColumnsMeta(phyQuery, response);
} else if (compileResult->Status == Ydb::StatusIds::TIMEOUT && QueryState->QueryDeadlines.CancelAt) {
// The compile timeout cause cancelation execution of request.
// So in case of cancel after we can reply with canceled status
ev.SetYdbStatus(Ydb::StatusIds::CANCELLED);
} else {
if (compileResult->Status == Ydb::StatusIds::TIMEOUT && QueryState->QueryDeadlines.CancelAt) {
// The compile timeout cause cancelation execution of request.
// So in case of cancel after we can reply with canceled status
ev.SetYdbStatus(Ydb::StatusIds::CANCELLED);
}

auto& preparedQuery = compileResult->PreparedQuery;
if (preparedQuery && QueryState->ReportStats() && QueryState->GetStatsMode() >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL) {
response.SetQueryAst(preparedQuery->GetPhysicalQuery().GetQueryAst());
}
}
}

Expand Down
23 changes: 23 additions & 0 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,29 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
UNIT_ASSERT_VALUES_EQUAL(totalTasks, 2);
}

Y_UNIT_TEST(ExecStatsAst) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();

auto settings = TExecuteQuerySettings()
.StatsMode(EStatsMode::Full);

std::vector<std::pair<TString, EStatus>> cases = {
{ "SELECT 42 AS test_ast_column", EStatus::SUCCESS },
{ "SELECT test_ast_column FROM TwoShard", EStatus::GENERIC_ERROR },
{ "SELECT UNWRAP(42 / 0) AS test_ast_column", EStatus::PRECONDITION_FAILED },
};

for (const auto& [sql, status] : cases) {
auto result = db.ExecuteQuery(sql, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), status, result.GetIssues().ToString());

UNIT_ASSERT(result.GetStats().Defined());
UNIT_ASSERT(result.GetStats()->GetAst().Defined());
UNIT_ASSERT_STRING_CONTAINS(*result.GetStats()->GetAst(), "test_ast_column");
}
}

Y_UNIT_TEST(Ddl) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
Expand Down
15 changes: 8 additions & 7 deletions ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,21 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable {
Iterator_.ReadNext().Subscribe([self](TAsyncExecuteQueryPart partFuture) mutable {
auto part = partFuture.ExtractValue();

if (const auto& st = part.GetStats()) {
self->Stats_ = st;
}

if (!part.IsSuccess()) {
TMaybe<TExecStats> stats;
std::swap(self->Stats_, stats);

if (part.EOS()) {
TVector<NYql::TIssue> issues;
TVector<Ydb::ResultSet> resultProtos;
TMaybe<TExecStats> stats;
TMaybe<TTransaction> tx;

std::swap(self->Issues_, issues);
std::swap(self->ResultSets_, resultProtos);
std::swap(self->Stats_, stats);
std::swap(self->Tx_, tx);

TVector<TResultSet> resultSets;
Expand All @@ -160,7 +165,7 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable {
std::move(tx)
));
} else {
self->Promise_.SetValue(TExecuteQueryResult(std::move(part), {}, {}, {}));
self->Promise_.SetValue(TExecuteQueryResult(std::move(part), {}, std::move(stats), {}));
}

return;
Expand All @@ -185,10 +190,6 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable {
resultSet.mutable_rows()->Add(inRsProto.rows().begin(), inRsProto.rows().end());
}

if (const auto& st = part.GetStats()) {
self->Stats_ = st;
}

if (const auto& tx = part.GetTransaction()) {
self->Tx_ = tx;
}
Expand Down
Loading

0 comments on commit 9201ed7

Please sign in to comment.