Skip to content

Commit

Permalink
YQL-17250: Add operation name in fallback message (#627)
Browse files Browse the repository at this point in the history
* YQL-17250: Add operation name in fallback message

* Fix review comments
  • Loading branch information
mxkovalev authored Dec 22, 2023
1 parent d549ab9 commit cc85e0d
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 8 deletions.
24 changes: 20 additions & 4 deletions ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ bool NeedFallback(const TIssues& issues) {
return false;
}

TIssue WrapIssuesOnHybridFallback(TPosition pos, const TIssues& issues) {
TIssue result(pos, "Hybrid execution fallback on YT");
TIssue WrapIssuesOnHybridFallback(TPosition pos, const TIssues& issues, TString fallbackOpName) {
TIssue result(pos, "Hybrid execution fallback on YT: " + fallbackOpName);
result.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_INFO);

const std::function<void(TIssue& issue)> toInfo = [&](TIssue& issue) {
Expand Down Expand Up @@ -800,13 +800,21 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase {
if (auto status = dqProvider->GetCallableExecutionTransformer().Transform(delegatedNode, delegatedNodeOutput, ctx); status.Level != TStatus::Async) {
YQL_ENSURE(status.Level != TStatus::Ok, "Asynchronous execution is expected in a happy path.");
if (const auto flags = op.Flags()) {
TString fallbackOpName;
for (const auto& atom : flags.Cast()) {
TStringBuf flagName = atom.Value();
if (flagName.SkipPrefix("FallbackOp")) {
fallbackOpName = flagName;
break;
}
}
for (const auto& atom : flags.Cast()) {
if (atom.Value() == "FallbackOnError") {
input->SetResult(atom.Ptr());
input->SetState(TExprNode::EState::Error);
if (const auto issies = ctx.AssociativeIssues.extract(delegatedNode.Get())) {
if (NeedFallback(issies.mapped())) {
ctx.IssueManager.RaiseIssue(WrapIssuesOnHybridFallback(ctx.GetPosition(input->Pos()), issies.mapped()));
ctx.IssueManager.RaiseIssue(WrapIssuesOnHybridFallback(ctx.GetPosition(input->Pos()), issies.mapped(), fallbackOpName));
return SyncStatus(IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, true));
} else {
ctx.IssueManager.RaiseIssues(issies.mapped());
Expand Down Expand Up @@ -858,13 +866,21 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase {
if (dqWriteStatus != TStatus::Ok) {
output = input;
if (const auto flags = TYtDqProcessWrite(input).Flags()) {
TString fallbackOpName;
for (const auto& atom : flags.Cast()) {
TStringBuf flagName = atom.Value();
if (flagName.SkipPrefix("FallbackOp")) {
fallbackOpName = flagName;
break;
}
}
for (const auto& atom : flags.Cast()) {
if (atom.Value() == "FallbackOnError") {
output->SetResult(atom.Ptr());
output->SetState(TExprNode::EState::Error);
if (const auto issies = ctx.AssociativeIssues.extract(delegatedNode.Get())) {
if (NeedFallback(issies.mapped())) {
ctx.IssueManager.RaiseIssue(WrapIssuesOnHybridFallback(ctx.GetPosition(input->Pos()), issies.mapped()));
ctx.IssueManager.RaiseIssue(WrapIssuesOnHybridFallback(ctx.GetPosition(input->Pos()), issies.mapped(), fallbackOpName));
return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, true);
} else {
ctx.IssueManager.RaiseIssues(issies.mapped());
Expand Down
24 changes: 20 additions & 4 deletions ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
flow = node->IsCallable(TCoToFlow::CallableName()) && node->Head().IsCallable(TYtTableContent::CallableName());
return false;
})) {
TExprNode::TListType flags;
flags.emplace_back(ctx.NewAtom(fill.Pos(), "FallbackOnError", TNodeFlags::Default));
flags.emplace_back(ctx.NewAtom(fill.Pos(), "FallbackOpYtFill", TNodeFlags::Default));

return Build<TYtTryFirst>(ctx, fill.Pos())
.First<TYtDqProcessWrite>()
.World(fill.World())
Expand All @@ -152,7 +156,7 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
.Build()
.ColumnHints().Build()
.Build()
.Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build()
.Flags().Add(std::move(flags)).Build()
.Build()
.Second<TYtFill>()
.InitFrom(fill)
Expand Down Expand Up @@ -230,6 +234,10 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
auto settings = NYql::AddSetting(sort.Settings().Ref(), EYtSettingType::NoDq, {}, ctx);
auto operation = ctx.ChangeChild(sort.Ref(), TYtTransientOpBase::idx_Settings, std::move(settings));

TExprNode::TListType flags;
flags.emplace_back(ctx.NewAtom(sort.Pos(), "FallbackOnError", TNodeFlags::Default));
flags.emplace_back(ctx.NewAtom(sort.Pos(), "FallbackOpYtSort", TNodeFlags::Default));

return Build<TYtTryFirst>(ctx, sort.Pos())
.First<TYtDqProcessWrite>()
.World(std::move(newWorld))
Expand All @@ -253,7 +261,7 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
.Build()
.ColumnHints().Build()
.Build()
.Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build()
.Flags().Add(std::move(flags)).Build()
.Build()
.Second(std::move(operation))
.Done();
Expand Down Expand Up @@ -402,6 +410,10 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
.Done();
}

TExprNode::TListType flags;
flags.emplace_back(ctx.NewAtom(map.Pos(), "FallbackOnError", TNodeFlags::Default));
flags.emplace_back(ctx.NewAtom(map.Pos(), "FallbackOpYtMap", TNodeFlags::Default));

return Build<TYtTryFirst>(ctx, map.Pos())
.First<TYtDqProcessWrite>()
.World(std::move(newWorld))
Expand All @@ -414,7 +426,7 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
.Build()
.ColumnHints().Build()
.Build()
.Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build()
.Flags().Add(std::move(flags)).Build()
.Build()
.Second<TYtMap>()
.InitFrom(map)
Expand Down Expand Up @@ -569,6 +581,10 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {

auto reducer = ctx.NewLambda(reduce.Pos(), ctx.NewArguments(reduce.Pos(), {std::move(arg)}), std::move(body));

TExprNode::TListType flags;
flags.emplace_back(ctx.NewAtom(reduce.Pos(), "FallbackOnError", TNodeFlags::Default));
flags.emplace_back(ctx.NewAtom(reduce.Pos(), "FallbackOpYtReduce", TNodeFlags::Default));

return Build<TYtTryFirst>(ctx, reduce.Pos())
.template First<TYtDqProcessWrite>()
.World(std::move(newWorld))
Expand Down Expand Up @@ -598,7 +614,7 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
.Build()
.ColumnHints().Build()
.Build()
.Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build()
.Flags().Add(std::move(flags)).Build()
.Build()
.template Second<TYtOperation>()
.InitFrom(reduce)
Expand Down

0 comments on commit cc85e0d

Please sign in to comment.