diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index dfb2ed24949e..d60d704873e5 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -394,12 +394,18 @@ class TExecuteQueryRPC : public TActorBootstrapped { return; } - if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { - Request_->SetRuHeader(record.GetConsumedRu()); + Ydb::Query::ExecuteQueryResponsePart response; - auto& kqpResponse = record.GetResponse(); + if (NeedReportStats(*Request_->GetProtoRequest())) { + hasTrailingMessage = true; + FillQueryStats(*response.mutable_exec_stats(), kqpResponse); + if (NeedReportAst(*Request_->GetProtoRequest())) { + response.mutable_exec_stats()->set_query_ast(kqpResponse.GetQueryAst()); + } + } - Ydb::Query::ExecuteQueryResponsePart response; + if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { + Request_->SetRuHeader(record.GetConsumedRu()); if (QueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE) { for(int i = 0; i < kqpResponse.GetYdbResults().size(); i++) { @@ -415,25 +421,15 @@ class TExecuteQueryRPC : public TActorBootstrapped { hasTrailingMessage = true; response.mutable_tx_meta()->set_id(kqpResponse.GetTxMeta().id()); } - - if (NeedReportStats(*Request_->GetProtoRequest())) { - hasTrailingMessage = true; - FillQueryStats(*response.mutable_exec_stats(), kqpResponse); - if (NeedReportAst(*Request_->GetProtoRequest())) { - response.mutable_exec_stats()->set_query_ast(kqpResponse.GetQueryAst()); - } - } - - if (hasTrailingMessage) { - response.set_status(Ydb::StatusIds::SUCCESS); - response.mutable_issues()->CopyFrom(issueMessage); - TString out; - Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out); - ReplySerializedAndFinishStream(record.GetYdbStatus(), std::move(out)); - } } - if (!hasTrailingMessage) { + if (hasTrailingMessage) { + response.set_status(record.GetYdbStatus()); + response.mutable_issues()->CopyFrom(issueMessage); + TString out; + Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out); + ReplySerializedAndFinishStream(record.GetYdbStatus(), std::move(out)); + } else { NYql::TIssues issues; NYql::IssuesFromMessage(issueMessage, issues); ReplyFinishStream(record.GetYdbStatus(), issueMessage); diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 283c6a2cde88..29fa1de3bf9f 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -372,6 +372,18 @@ class TKqpCompileActor : public TActorBootstrapped { PassAway(); } + void FillCompileResult(std::unique_ptr preparingQuery, NKikimrKqp::EQueryType queryType) { + auto preparedQueryHolder = std::make_shared( + preparingQuery.release(), AppData()->FunctionRegistry); + preparedQueryHolder->MutableLlvmSettings().Fill(Config, queryType); + KqpCompileResult->PreparedQuery = preparedQueryHolder; + KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery()); + + if (AstResult) { + KqpCompileResult->Ast = AstResult->Ast; + } + } + void Handle(TEvKqp::TEvContinueProcess::TPtr &ev, const TActorContext &ctx) { Y_ENSURE(!ev->Get()->QueryId); @@ -403,17 +415,7 @@ class TKqpCompileActor : public TActorBootstrapped { if (status == Ydb::StatusIds::SUCCESS) { YQL_ENSURE(kqpResult.PreparingQuery); - { - auto preparedQueryHolder = std::make_shared( - kqpResult.PreparingQuery.release(), AppData()->FunctionRegistry); - preparedQueryHolder->MutableLlvmSettings().Fill(Config, queryType); - KqpCompileResult->PreparedQuery = preparedQueryHolder; - KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery()); - - if (AstResult) { - KqpCompileResult->Ast = AstResult->Ast; - } - } + FillCompileResult(std::move(kqpResult.PreparingQuery), queryType); auto now = TInstant::Now(); auto duration = now - StartTime; @@ -423,6 +425,10 @@ class TKqpCompileActor : public TActorBootstrapped { << ", self: " << ctx.SelfID << ", duration: " << duration); } else { + if (kqpResult.PreparingQuery) { + FillCompileResult(std::move(kqpResult.PreparingQuery), queryType); + } + LOG_ERROR_S(ctx, NKikimrServices::KQP_COMPILE_ACTOR, "Compilation failed" << ", self: " << ctx.SelfID << ", status: " << Ydb::StatusIds_StatusCode_Name(status) diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 750fe8120dbc..dec005879dec 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -182,6 +182,10 @@ class TAsyncValidateYqlResult : public TKqpAsyncResultBaseQuery().PrepareOnly); validateResult.PreparedQuery.reset(SessionCtx->Query().PreparingQuery.release()); validateResult.SqlVersion = SqlVersion; @@ -211,6 +215,10 @@ class TAsyncExplainYqlResult : public TKqpAsyncResultBase plans; for (auto id : SessionCtx->Query().ExecutionOrder) { @@ -253,6 +261,10 @@ class TAsyncExecuteYqlResult : public TKqpAsyncResultBase(queryResult.ProtobufArenaPtr.get())); @@ -300,6 +312,10 @@ class TAsyncExecuteKqlResult : public TKqpAsyncResultBaseGetPhysicalQuery().GetQueryPlan(); @@ -320,13 +336,28 @@ class TAsyncPrepareYqlResult : public TKqpAsyncResultBase queryCtx, const TKqpQueryRef& query, TMaybe sqlVersion) + TIntrusivePtr queryCtx, const TKqpQueryRef& query, TMaybe sqlVersion, + TIntrusivePtr transformCtx) : TKqpAsyncResultBase(queryRoot, exprCtx, transformer) , QueryCtx(queryCtx) + , ExprCtx(exprCtx) + , TransformCtx(transformCtx) , QueryText(query.Text) , SqlVersion(sqlVersion) {} void FillResult(TResult& prepareResult) const override { + if (!prepareResult.Success()) { + auto exprRoot = GetExprRoot(); + if (TransformCtx && TransformCtx->ExplainTransformerInput) { + exprRoot = TransformCtx->ExplainTransformerInput; + } + if (exprRoot) { + prepareResult.PreparingQuery = std::move(QueryCtx->PreparingQuery); + prepareResult.PreparingQuery->MutablePhysicalQuery()->SetQueryAst(KqpExprToPrettyString(*exprRoot, ExprCtx)); + } + return; + } + YQL_ENSURE(QueryCtx->PrepareOnly); YQL_ENSURE(QueryCtx->PreparingQuery); @@ -344,6 +375,8 @@ class TAsyncPrepareYqlResult : public TKqpAsyncResultBase QueryCtx; + NYql::TExprContext& ExprCtx; + TIntrusivePtr TransformCtx; TString QueryText; TMaybe SqlVersion; }; @@ -933,6 +966,7 @@ class TKqpHost : public IKqpHost { , IsInternalCall(isInternalCall) , FederatedQuerySetup(federatedQuerySetup) , SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider, userToken)) + , Config(config) , TypesCtx(MakeIntrusive()) , PlanBuilder(CreatePlanBuilder(*TypesCtx)) , FakeWorld(ExprCtx->NewWorld(TPosition())) @@ -1265,7 +1299,7 @@ class TKqpHost : public IKqpHost { } return MakeIntrusive(queryExpr.Get(), ctx, *YqlTransformer, SessionCtx->QueryPtr(), - query.Text, sqlVersion); + query.Text, sqlVersion, TransformCtx); } IAsyncQueryResultPtr PrepareDataQueryAstInternal(const TKqpQueryRef& queryAst, const TPrepareSettings& settings, @@ -1327,7 +1361,7 @@ class TKqpHost : public IKqpHost { } return MakeIntrusive(queryExpr.Get(), ctx, *YqlTransformer, SessionCtx->QueryPtr(), - query.Text, sqlVersion); + query.Text, sqlVersion, TransformCtx); } IAsyncQueryResultPtr PrepareScanQueryInternal(const TKqpQueryRef& query, bool isSql, TExprContext& ctx, @@ -1354,7 +1388,7 @@ class TKqpHost : public IKqpHost { } return MakeIntrusive(queryExpr.Get(), ctx, *YqlTransformer, SessionCtx->QueryPtr(), - query.Text, sqlVersion); + query.Text, sqlVersion, TransformCtx); } IAsyncQueryResultPtr PrepareScanQueryAstInternal(const TKqpQueryRef& queryAst, TExprContext& ctx) { @@ -1502,7 +1536,8 @@ class TKqpHost : public IKqpHost { } void Init(EKikimrQueryType queryType) { - KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, *FuncRegistry); + TransformCtx = MakeIntrusive(Config, SessionCtx->QueryPtr(), SessionCtx->TablesPtr()); + KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, TransformCtx, *FuncRegistry); ExprCtx->NodesAllocationLimit = SessionCtx->Config()._KqpExprNodesAllocationLimit.Get().GetRef(); ExprCtx->StringsAllocationLimit = SessionCtx->Config()._KqpExprStringsAllocationLimit.Get().GetRef(); @@ -1635,6 +1670,7 @@ class TKqpHost : public IKqpHost { std::optional FederatedQuerySetup; TIntrusivePtr SessionCtx; + TKikimrConfiguration::TPtr Config; TIntrusivePtr FuncRegistryHolder; const NKikimr::NMiniKQL::IFunctionRegistry* FuncRegistry; @@ -1648,6 +1684,7 @@ class TKqpHost : public IKqpHost { TExprNode::TPtr FakeWorld; TIntrusivePtr ExecuteCtx; + TIntrusivePtr TransformCtx; TIntrusivePtr KqpRunner; NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory{NExternalSource::CreateExternalSourceFactory({})}; diff --git a/ydb/core/kqp/host/kqp_host_impl.h b/ydb/core/kqp/host/kqp_host_impl.h index 550f9e2776d3..17110a986926 100644 --- a/ydb/core/kqp/host/kqp_host_impl.h +++ b/ydb/core/kqp/host/kqp_host_impl.h @@ -34,7 +34,9 @@ class TKqpAsyncResultBase : public NYql::IKikimrAsyncResult { YQL_ENSURE(HasResult()); if (Status.GetValue() == NYql::IGraphTransformer::TStatus::Error) { - return NYql::NCommon::ResultFromErrors(ExprCtx.IssueManager.GetIssues()); + TResult result = NYql::NCommon::ResultFromErrors(ExprCtx.IssueManager.GetIssues()); + FillResult(result); + return result; } YQL_ENSURE(Status.GetValue() == NYql::IGraphTransformer::TStatus::Ok); @@ -244,7 +246,7 @@ class IKqpRunner : public TThrRefBase { TIntrusivePtr CreateKqpRunner(TIntrusivePtr gateway, const TString& cluster, const TIntrusivePtr& typesCtx, const TIntrusivePtr& sessionCtx, - const NMiniKQL::IFunctionRegistry& funcRegistry); + const TIntrusivePtr& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry); TAutoPtr CreateKqpExplainPreparedTransformer(TIntrusivePtr gateway, const TString& cluster, TIntrusivePtr transformCtx, const NMiniKQL::IFunctionRegistry* funcRegistry, diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp index 6e0d9b7f98bc..8e113670b9ca 100644 --- a/ydb/core/kqp/host/kqp_runner.cpp +++ b/ydb/core/kqp/host/kqp_runner.cpp @@ -137,14 +137,14 @@ class TKqpRunner : public IKqpRunner { public: TKqpRunner(TIntrusivePtr gateway, const TString& cluster, const TIntrusivePtr& typesCtx, const TIntrusivePtr& sessionCtx, - const NMiniKQL::IFunctionRegistry& funcRegistry) + const TIntrusivePtr& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry) : Gateway(gateway) , Cluster(cluster) , TypesCtx(*typesCtx) , SessionCtx(sessionCtx) , FunctionRegistry(funcRegistry) , Config(sessionCtx->ConfigPtr()) - , TransformCtx(MakeIntrusive(Config, sessionCtx->QueryPtr(), sessionCtx->TablesPtr())) + , TransformCtx(transformCtx) , OptimizeCtx(MakeIntrusive(cluster, Config, sessionCtx->QueryPtr(), sessionCtx->TablesPtr())) , BuildQueryCtx(MakeIntrusive()) @@ -377,9 +377,9 @@ class TKqpRunner : public IKqpRunner { TIntrusivePtr CreateKqpRunner(TIntrusivePtr gateway, const TString& cluster, const TIntrusivePtr& typesCtx, const TIntrusivePtr& sessionCtx, - const NMiniKQL::IFunctionRegistry& funcRegistry) + const TIntrusivePtr& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry) { - return new TKqpRunner(gateway, cluster, typesCtx, sessionCtx, funcRegistry); + return new TKqpRunner(gateway, cluster, typesCtx, sessionCtx, transformCtx, funcRegistry); } } // namespace NKqp diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 00bb19061486..91f2e1be4fe1 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1720,10 +1720,17 @@ class TKqpSessionActor : public TActorBootstrapped { const auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); FillColumnsMeta(phyQuery, response); - } else if (compileResult->Status == Ydb::StatusIds::TIMEOUT && QueryState->QueryDeadlines.CancelAt) { - // The compile timeout cause cancelation execution of request. - // So in case of cancel after we can reply with canceled status - ev.SetYdbStatus(Ydb::StatusIds::CANCELLED); + } else { + if (compileResult->Status == Ydb::StatusIds::TIMEOUT && QueryState->QueryDeadlines.CancelAt) { + // The compile timeout cause cancelation execution of request. + // So in case of cancel after we can reply with canceled status + ev.SetYdbStatus(Ydb::StatusIds::CANCELLED); + } + + auto& preparedQuery = compileResult->PreparedQuery; + if (preparedQuery && QueryState->ReportStats() && QueryState->GetStatsMode() >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL) { + response.SetQueryAst(preparedQuery->GetPhysicalQuery().GetQueryAst()); + } } } diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 32181f5da43c..1ed5d74b7d32 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -553,6 +553,29 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UNIT_ASSERT_VALUES_EQUAL(totalTasks, 2); } + Y_UNIT_TEST(ExecStatsAst) { + auto kikimr = DefaultKikimrRunner(); + auto db = kikimr.GetQueryClient(); + + auto settings = TExecuteQuerySettings() + .StatsMode(EStatsMode::Full); + + std::vector> cases = { + { "SELECT 42 AS test_ast_column", EStatus::SUCCESS }, + { "SELECT test_ast_column FROM TwoShard", EStatus::GENERIC_ERROR }, + { "SELECT UNWRAP(42 / 0) AS test_ast_column", EStatus::PRECONDITION_FAILED }, + }; + + for (const auto& [sql, status] : cases) { + auto result = db.ExecuteQuery(sql, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), status, result.GetIssues().ToString()); + + UNIT_ASSERT(result.GetStats().Defined()); + UNIT_ASSERT(result.GetStats()->GetAst().Defined()); + UNIT_ASSERT_STRING_CONTAINS(*result.GetStats()->GetAst(), "test_ast_column"); + } + } + Y_UNIT_TEST(Ddl) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp index ceeaf25ab919..ef5854b983c2 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp @@ -136,16 +136,21 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable { Iterator_.ReadNext().Subscribe([self](TAsyncExecuteQueryPart partFuture) mutable { auto part = partFuture.ExtractValue(); + if (const auto& st = part.GetStats()) { + self->Stats_ = st; + } + if (!part.IsSuccess()) { + TMaybe stats; + std::swap(self->Stats_, stats); + if (part.EOS()) { TVector issues; TVector resultProtos; - TMaybe stats; TMaybe tx; std::swap(self->Issues_, issues); std::swap(self->ResultSets_, resultProtos); - std::swap(self->Stats_, stats); std::swap(self->Tx_, tx); TVector resultSets; @@ -160,7 +165,7 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable { std::move(tx) )); } else { - self->Promise_.SetValue(TExecuteQueryResult(std::move(part), {}, {}, {})); + self->Promise_.SetValue(TExecuteQueryResult(std::move(part), {}, std::move(stats), {})); } return; @@ -185,10 +190,6 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable { resultSet.mutable_rows()->Add(inRsProto.rows().begin(), inRsProto.rows().end()); } - if (const auto& st = part.GetStats()) { - self->Stats_ = st; - } - if (const auto& tx = part.GetTransaction()) { self->Tx_ = tx; } diff --git a/ydb/public/sdk/cpp/client/ydb_query/stats.cpp b/ydb/public/sdk/cpp/client/ydb_query/stats.cpp index f5fbc9d6c02e..c007547d4e84 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/stats.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/stats.cpp @@ -46,6 +46,16 @@ TMaybe TExecStats::GetPlan() const { return proto.query_plan(); } +TMaybe TExecStats::GetAst() const { + auto proto = Impl_->Proto; + + if (proto.query_ast().empty()) { + return {}; + } + + return proto.query_ast(); +} + TDuration TExecStats::GetTotalDuration() const { return TDuration::MicroSeconds(Impl_->Proto.total_duration_us()); } diff --git a/ydb/public/sdk/cpp/client/ydb_query/stats.h b/ydb/public/sdk/cpp/client/ydb_query/stats.h index 1fed19f6e353..3a62045a72f9 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/stats.h +++ b/ydb/public/sdk/cpp/client/ydb_query/stats.h @@ -28,6 +28,7 @@ class TExecStats { TString ToString(bool withPlan = false) const; TMaybe GetPlan() const; + TMaybe GetAst() const; TDuration GetTotalDuration() const; TDuration GetTotalCpuTime() const; diff --git a/ydb/tests/fq/s3/test_bindings.py b/ydb/tests/fq/s3/test_bindings.py index 9c7ce151bdde..f3e7f21b5fbb 100644 --- a/ydb/tests/fq/s3/test_bindings.py +++ b/ydb/tests/fq/s3/test_bindings.py @@ -586,3 +586,36 @@ def test_count_for_pg_binding(self, kikimr, s3, client, pg_syntax): else: assert result_set.columns[0].type.type_id == ydb.Type.UINT64 assert result_set.rows[0].items[0].uint64_value == 1 + + @yq_all + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + def test_ast_in_failed_query_compilation(self, kikimr, s3, client): + resource = boto3.resource( + "s3", + endpoint_url=s3.s3_url, + aws_access_key_id="key", + aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("bindbucket") + bucket.create(ACL='public-read') + bucket.objects.all().delete() + + connection_id = client.create_storage_connection("bb", "bindbucket").result.connection_id + + data_column = ydb.Column(name="data", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING)) + client.create_object_storage_binding(name="s3binding", + path="/", + format="raw", + connection_id=connection_id, + columns=[data_column]) + + sql = R''' + SELECT some_unknown_column FROM bindings.`s3binding`; + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.FAILED) + + ast = client.describe_query(query_id).result.query.ast.data + assert "(\'columns \'(\'\"some_unknown_column\"))" in ast, "Invalid query ast" diff --git a/ydb/tests/fq/yds/test_select_1.py b/ydb/tests/fq/yds/test_select_1.py index 6c43b72ffae4..116c37dd5ee7 100644 --- a/ydb/tests/fq/yds/test_select_1.py +++ b/ydb/tests/fq/yds/test_select_1.py @@ -120,11 +120,11 @@ def test_compile_error(self, client, yq_version): assert "Failed to parse query" in describe_string, describe_string @yq_all - def test_ast_in_failed_query(self, client): - sql = "SELECT unwrap(1 / 0)" + def test_ast_in_failed_query_runtime(self, client): + sql = "SELECT unwrap(42 / 0) AS error_column" query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.FAILED) - ast = str(client.describe_query(query_id).result.query.ast) - assert ast != "", "Query ast not found" + ast = client.describe_query(query_id).result.query.ast.data + assert "(\'\"error_column\" (Unwrap (/ (Int32 \'\"42\")" in ast, "Invalid query ast"