diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp index d82796cd8655..1eb5d4d25a12 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp @@ -55,8 +55,8 @@ bool NeedFallback(const TIssues& issues) { return false; } -TIssue WrapIssuesOnHybridFallback(TPosition pos, const TIssues& issues) { - TIssue result(pos, "Hybrid execution fallback on YT"); +TIssue WrapIssuesOnHybridFallback(TPosition pos, const TIssues& issues, TString fallbackOpName) { + TIssue result(pos, "Hybrid execution fallback on YT: " + fallbackOpName); result.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_INFO); const std::function toInfo = [&](TIssue& issue) { @@ -800,13 +800,21 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase { if (auto status = dqProvider->GetCallableExecutionTransformer().Transform(delegatedNode, delegatedNodeOutput, ctx); status.Level != TStatus::Async) { YQL_ENSURE(status.Level != TStatus::Ok, "Asynchronous execution is expected in a happy path."); if (const auto flags = op.Flags()) { + TString fallbackOpName; + for (const auto& atom : flags.Cast()) { + TStringBuf flagName = atom.Value(); + if (flagName.SkipPrefix("FallbackOp")) { + fallbackOpName = flagName; + break; + } + } for (const auto& atom : flags.Cast()) { if (atom.Value() == "FallbackOnError") { input->SetResult(atom.Ptr()); input->SetState(TExprNode::EState::Error); if (const auto issies = ctx.AssociativeIssues.extract(delegatedNode.Get())) { if (NeedFallback(issies.mapped())) { - ctx.IssueManager.RaiseIssue(WrapIssuesOnHybridFallback(ctx.GetPosition(input->Pos()), issies.mapped())); + ctx.IssueManager.RaiseIssue(WrapIssuesOnHybridFallback(ctx.GetPosition(input->Pos()), issies.mapped(), fallbackOpName)); return SyncStatus(IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, true)); } else { ctx.IssueManager.RaiseIssues(issies.mapped()); @@ -858,13 +866,21 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase { if (dqWriteStatus != TStatus::Ok) { output = input; if (const auto flags = TYtDqProcessWrite(input).Flags()) { + TString fallbackOpName; + for (const auto& atom : flags.Cast()) { + TStringBuf flagName = atom.Value(); + if (flagName.SkipPrefix("FallbackOp")) { + fallbackOpName = flagName; + break; + } + } for (const auto& atom : flags.Cast()) { if (atom.Value() == "FallbackOnError") { output->SetResult(atom.Ptr()); output->SetState(TExprNode::EState::Error); if (const auto issies = ctx.AssociativeIssues.extract(delegatedNode.Get())) { if (NeedFallback(issies.mapped())) { - ctx.IssueManager.RaiseIssue(WrapIssuesOnHybridFallback(ctx.GetPosition(input->Pos()), issies.mapped())); + ctx.IssueManager.RaiseIssue(WrapIssuesOnHybridFallback(ctx.GetPosition(input->Pos()), issies.mapped(), fallbackOpName)); return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, true); } else { ctx.IssueManager.RaiseIssues(issies.mapped()); diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp index c6762e7b1ddd..a97cdb901800 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp @@ -129,6 +129,10 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { flow = node->IsCallable(TCoToFlow::CallableName()) && node->Head().IsCallable(TYtTableContent::CallableName()); return false; })) { + TExprNode::TListType flags; + flags.emplace_back(ctx.NewAtom(fill.Pos(), "FallbackOnError", TNodeFlags::Default)); + flags.emplace_back(ctx.NewAtom(fill.Pos(), "FallbackOpYtFill", TNodeFlags::Default)); + return Build(ctx, fill.Pos()) .First() .World(fill.World()) @@ -152,7 +156,7 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { .Build() .ColumnHints().Build() .Build() - .Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build() + .Flags().Add(std::move(flags)).Build() .Build() .Second() .InitFrom(fill) @@ -230,6 +234,10 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { auto settings = NYql::AddSetting(sort.Settings().Ref(), EYtSettingType::NoDq, {}, ctx); auto operation = ctx.ChangeChild(sort.Ref(), TYtTransientOpBase::idx_Settings, std::move(settings)); + TExprNode::TListType flags; + flags.emplace_back(ctx.NewAtom(sort.Pos(), "FallbackOnError", TNodeFlags::Default)); + flags.emplace_back(ctx.NewAtom(sort.Pos(), "FallbackOpYtSort", TNodeFlags::Default)); + return Build(ctx, sort.Pos()) .First() .World(std::move(newWorld)) @@ -253,7 +261,7 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { .Build() .ColumnHints().Build() .Build() - .Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build() + .Flags().Add(std::move(flags)).Build() .Build() .Second(std::move(operation)) .Done(); @@ -402,6 +410,10 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { .Done(); } + TExprNode::TListType flags; + flags.emplace_back(ctx.NewAtom(map.Pos(), "FallbackOnError", TNodeFlags::Default)); + flags.emplace_back(ctx.NewAtom(map.Pos(), "FallbackOpYtMap", TNodeFlags::Default)); + return Build(ctx, map.Pos()) .First() .World(std::move(newWorld)) @@ -414,7 +426,7 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { .Build() .ColumnHints().Build() .Build() - .Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build() + .Flags().Add(std::move(flags)).Build() .Build() .Second() .InitFrom(map) @@ -569,6 +581,10 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { auto reducer = ctx.NewLambda(reduce.Pos(), ctx.NewArguments(reduce.Pos(), {std::move(arg)}), std::move(body)); + TExprNode::TListType flags; + flags.emplace_back(ctx.NewAtom(reduce.Pos(), "FallbackOnError", TNodeFlags::Default)); + flags.emplace_back(ctx.NewAtom(reduce.Pos(), "FallbackOpYtReduce", TNodeFlags::Default)); + return Build(ctx, reduce.Pos()) .template First() .World(std::move(newWorld)) @@ -598,7 +614,7 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { .Build() .ColumnHints().Build() .Build() - .Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build() + .Flags().Add(std::move(flags)).Build() .Build() .template Second() .InitFrom(reduce)