Skip to content

Commit

Permalink
YQL-18069: Fix incomplete DqProcessWrite execution
Browse files Browse the repository at this point in the history
  • Loading branch information
mxkovalev committed May 6, 2024
1 parent da52d32 commit 79ecc96
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ class TYtPhysicalFinalizingTransformer : public TSyncTransformerBase {
bool enableChunkCombining = IsChunkCombiningEnabled();

auto storeDep = [&opDeps, &opDepsOrder](const TYtOutput& out, const TExprNode* reader, const TExprNode* sec, const TExprNode* path) {
const auto realOp = GetRealOperation(out.Operation()).Raw();
auto& res = opDeps[realOp];
const auto op = out.Operation().Raw();
auto& res = opDeps[op];
if (res.empty()) {
opDepsOrder.push_back(realOp);
opDepsOrder.push_back(op);
}
res.emplace_back(reader, sec, out.Raw(), path);
};
Expand Down Expand Up @@ -935,7 +935,7 @@ class TYtPhysicalFinalizingTransformer : public TSyncTransformerBase {
if (!node.IsCallable(OPS_WITH_SORTED_OUTPUT)) {
return {};
}
auto op = TYtOutputOpBase(&node);
auto op = GetRealOperation(TExprBase(&node));

bool hasOtherSortedOuts = false;
bool changedOutSort = false;
Expand Down Expand Up @@ -1561,7 +1561,7 @@ class TYtPhysicalFinalizingTransformer : public TSyncTransformerBase {
TNodeOnNodeOwnedMap newOps;
for (auto& x: opDeps) {
auto writer = x.first;
if (const size_t outCount = TYtOutputOpBase(writer).Output().Size(); outCount > 1 && writer->GetState() != TExprNode::EState::ExecutionComplete
if (const size_t outCount = GetRealOperation(TExprBase(writer)).Output().Size(); outCount > 1 && writer->GetState() != TExprNode::EState::ExecutionComplete
&& writer->GetState() != TExprNode::EState::ExecutionInProgress
&& (!writer->HasResult() || writer->GetResult().Type() != TExprNode::World)
&& ProcessedUnusedOuts.find(writer->UniqueId()) == ProcessedUnusedOuts.end())
Expand Down Expand Up @@ -1634,7 +1634,7 @@ class TYtPhysicalFinalizingTransformer : public TSyncTransformerBase {
continue;
}

const TYtOutputOpBase operation(x.first);
const TYtOutputOpBase operation = GetRealOperation(TExprBase(x.first));
const bool canUpdateOp = !operation.Ref().StartsExecution() && !operation.Ref().HasResult() && !operation.Maybe<TYtCopy>();
const bool canChangeNativeTypeForOp = !operation.Maybe<TYtMerge>() && !operation.Maybe<TYtSort>();

Expand Down Expand Up @@ -1762,7 +1762,7 @@ class TYtPhysicalFinalizingTransformer : public TSyncTransformerBase {
}

static TExprNode::TPtr SuppressUnusedOuts(const TExprNode& node, const TDynBitMap& usedOuts, TExprContext& ctx) {
auto op = TYtOutputOpBase(&node);
auto op = GetRealOperation(TExprBase(&node));
size_t lambdaNdx = 0;
bool withArg = true;
bool ordered = false;
Expand Down Expand Up @@ -1954,7 +1954,7 @@ class TYtPhysicalFinalizingTransformer : public TSyncTransformerBase {
TNodeOnNodeOwnedMap newOps;
for (auto& x: opDeps) {
auto writer = x.first;
const TYtOutputOpBase op(writer);
const TYtOutputOpBase op = GetRealOperation(TExprBase(writer));
if (const size_t outCount = op.Output().Size(); outCount > 1 && !BeingExecuted(*writer)
&& (!op.Maybe<TYtMapReduce>() || GetMapDirectOutputsCount(op.Maybe<TYtMapReduce>().Cast()) == 0) // TODO: optimize this case
&& ProcessedMultiOuts.find(writer->UniqueId()) == ProcessedMultiOuts.end())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1793,9 +1793,9 @@
],
"test.test[optimizers-yql-2582_limit_for_join_input--Debug]": [
{
"checksum": "90d6c53cbe9d95ea10c10b17a8894d21",
"size": 5474,
"uri": "https://{canondata_backend}/1937492/7ae37c32b42bb57d4df171a62ced7ab76867a8ea/resource.tar.gz#test.test_optimizers-yql-2582_limit_for_join_input--Debug_/opt.yql_patched"
"checksum": "96f2eb2bdc2e2ec287e5bd43fb009aaa",
"size": 5489,
"uri": "https://{canondata_backend}/995452/9dc6bba399464476af3ac8cb3b80496fa9b678dc/resource.tar.gz#test.test_optimizers-yql-2582_limit_for_join_input--Debug_/opt.yql_patched"
}
],
"test.test[optimizers-yql-2582_limit_for_join_input--Plan]": [
Expand Down

0 comments on commit 79ecc96

Please sign in to comment.