diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h index cc7672d8015e..5823f213e526 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h @@ -59,6 +59,7 @@ struct TDqSettings { static constexpr ui32 CostBasedOptimizationLevel = 0; static constexpr ui32 MaxDPccpDPTableSize = 16400U; static constexpr ui64 MaxAttachmentsSize = 2_GB; + static constexpr bool SplitStageOnDqReplicate = true; }; using TPtr = std::shared_ptr; diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index defa4a1ae8b6..73fff17b955f 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -125,8 +125,8 @@ class TLocalExecutor: public TCounters : State->RandomProvider; TScopedAlloc alloc( - __LOCATION__, - NKikimr::TAlignedPagePoolCounters(), + __LOCATION__, + NKikimr::TAlignedPagePoolCounters(), State->FunctionRegistry->SupportsSizedAllocators(), false); NDq::TDqTaskRunnerContext executionContext; @@ -287,7 +287,7 @@ struct TDqsPipelineConfigurator : public IPipelineConfigurator { } NDq::EChannelMode mode = GetConfiguredChannelMode(State_, typesCtx); pipeline->Add( - NDq::CreateDqBuildPhyStagesTransformer(!State_->Settings->SplitStageOnDqReplicate.Get().GetOrElse(true), typesCtx, mode), + NDq::CreateDqBuildPhyStagesTransformer(!State_->Settings->SplitStageOnDqReplicate.Get().GetOrElse(TDqSettings::TDefault::SplitStageOnDqReplicate), typesCtx, mode), "BuildPhy"); pipeline->Add(NDqs::CreateDqsRewritePhyCallablesTransformer(*pipeline->GetTypeAnnotationContext()), "RewritePhyCallables"); } diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp index 8109656191cf..26c0d63b5224 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp @@ -25,7 +25,10 @@ class TDqExecutionValidator { ctx.AddError(YqlIssue(ctx.GetPosition(where.Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, err)); } - bool ValidateDqStage(const TExprNode& node) { + bool ValidateDqStage(const TExprNode& node, TNodeSet* visitedStages) { + if (visitedStages) { + visitedStages->insert(&node); + } if (!Visited_.insert(&node).second) { return true; } @@ -36,11 +39,16 @@ class TDqExecutionValidator { ReportError(Ctx_, *bad, TStringBuilder() << "Cannot execute " << bad->Content() << " over stream/flow inside DQ stage"); } + + bool hasMapJoin = false; VisitExpr(TDqStageBase(&node).Program().Body().Ptr(), [](const TExprNode::TPtr& n) { return !TDqConnection::Match(n.Get()) && !TDqPhyPrecompute::Match(n.Get()) && !TDqReadWrapBase::Match(n.Get()); }, - [&readPerProvider_ = ReadsPerProvider_, &hasErrors, &ctx = Ctx_, &typeCtx = TypeCtx_](const TExprNode::TPtr& n) { + [&readPerProvider_ = ReadsPerProvider_, &hasErrors, &hasMapJoin, &ctx = Ctx_, &typeCtx = TypeCtx_](const TExprNode::TPtr& n) { + if (TDqPhyMapJoin::Match(n.Get())) { + hasMapJoin = true; + } if (TCoScriptUdf::Match(n.Get()) && NKikimr::NMiniKQL::IsSystemPython(NKikimr::NMiniKQL::ScriptTypeFromStr(n->Head().Content()))) { ReportError(ctx, *n, TStringBuilder() << "Cannot execute system python udf " << n->Content() << " in DQ"); hasErrors = true; @@ -60,13 +68,33 @@ class TDqExecutionValidator { } ); - for (auto n: TDqStageBase(&node).Inputs()) { - hasErrors |= !ValidateDqNode(n.Ref()); + HasMapJoin_ |= hasMapJoin; + if (hasMapJoin && CheckSelfMapJoin_) { + TNodeSet unitedVisitedStages; + bool nonUniqStages = false; + for (auto n: TDqStageBase(&node).Inputs()) { + TNodeSet inputVisitedStages; + hasErrors |= !ValidateDqNode(n.Ref(), &inputVisitedStages); + const size_t expectedSize = unitedVisitedStages.size() + inputVisitedStages.size(); + unitedVisitedStages.insert(inputVisitedStages.begin(), inputVisitedStages.end()); + nonUniqStages |= (expectedSize != unitedVisitedStages.size()); // Found duplicates - some stage was visited twice from different inputs + } + if (nonUniqStages) { + ReportError(Ctx_, node, TStringBuilder() << "Cannot execute self join using mapjoin strategy in DQ"); + hasErrors = true; + } + if (visitedStages) { + visitedStages->insert(unitedVisitedStages.begin(), unitedVisitedStages.end()); + } + } else { + for (auto n: TDqStageBase(&node).Inputs()) { + hasErrors |= !ValidateDqNode(n.Ref(), visitedStages); + } } if (auto outs = TDqStageBase(&node).Outputs()) { for (auto n: outs.Cast()) { - hasErrors |= !ValidateDqNode(n.Ref()); + hasErrors |= !ValidateDqNode(n.Ref(), nullptr); } } @@ -74,14 +102,14 @@ class TDqExecutionValidator { } - bool ValidateDqNode(const TExprNode& node) { + bool ValidateDqNode(const TExprNode& node, TNodeSet* visitedStages) { if (node.GetState() == TExprNode::EState::ExecutionComplete) { return true; } if (TDqStageBase::Match(&node)) { // visited will be updated inside ValidateDqStage - return ValidateDqStage(node); + return ValidateDqStage(node, visitedStages); } if (!Visited_.insert(&node).second) { @@ -94,10 +122,10 @@ class TDqExecutionValidator { } if (TDqConnection::Match(&node)) { - return ValidateDqStage(TDqConnection(&node).Output().Stage().Ref()); + return ValidateDqStage(TDqConnection(&node).Output().Stage().Ref(), TDqCnValue::Match(&node) ? nullptr : visitedStages); } if (TDqPhyPrecompute::Match(&node)) { - return ValidateDqNode(TDqPhyPrecompute(&node).Connection().Ref()); + return ValidateDqNode(TDqPhyPrecompute(&node).Connection().Ref(), nullptr); } if (TDqSource::Match(&node) || TDqTransform::Match(&node) || TDqSink::Match(&node)) { @@ -113,6 +141,9 @@ class TDqExecutionValidator { : TypeCtx_(typeCtx) , Ctx_(ctx) , State_(state) + , CheckSelfMapJoin_(!TypeCtx_.ForceDq + && !State_->Settings->SplitStageOnDqReplicate.Get().GetOrElse(TDqSettings::TDefault::SplitStageOnDqReplicate) + && !State_->Settings->IsSpillingEnabled()) {} bool ValidateDqExecution(const TExprNode& node) { @@ -120,7 +151,6 @@ class TDqExecutionValidator { TNodeSet dqNodes; - bool hasJoin = false; if (TDqCnResult::Match(&node)) { dqNodes.insert(TDqCnResult(&node).Output().Stage().Raw()); } else if (TDqQuery::Match(&node)) { @@ -142,35 +172,31 @@ class TDqExecutionValidator { }); } - VisitExpr(node, [&hasJoin](const TExprNode& n) { - if (TMaybeNode(&n)) { - hasJoin = true; - } - return true; - }); - bool hasError = false; for (const auto n: dqNodes) { - hasError |= !ValidateDqNode(*n); + hasError |= !ValidateDqNode(*n, nullptr); if (hasError) { break; } } - for (auto& [integration, nodes]: ReadsPerProvider_) { - TMaybe size; - hasError |= !(size = integration->EstimateReadSize(State_->Settings->DataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::DataSizePerJob), - State_->Settings->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage), nodes, Ctx_)); - if (hasError) { - break; + if (!hasError && HasMapJoin_ && !TypeCtx_.ForceDq) { + size_t dataSize = 0; + for (auto& [integration, nodes]: ReadsPerProvider_) { + TMaybe size; + hasError |= !(size = integration->EstimateReadSize(State_->Settings->DataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::DataSizePerJob), + State_->Settings->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage), nodes, Ctx_)); + if (hasError) { + break; + } + dataSize += *size; } - DataSize_ += *size; - } - if (!hasError && hasJoin && DataSize_ > State_->Settings->MaxDataSizePerQuery.Get().GetOrElse(10_GB)) { - ReportError(Ctx_, node, TStringBuilder() << "too big join input: " << DataSize_); - return false; + if (dataSize > State_->Settings->MaxDataSizePerQuery.Get().GetOrElse(10_GB)) { + ReportError(Ctx_, node, TStringBuilder() << "too big join input: " << dataSize); + return false; + } } return !hasError; } @@ -178,11 +204,12 @@ class TDqExecutionValidator { const TTypeAnnotationContext& TypeCtx_; TExprContext& Ctx_; - TNodeSet Visited_; - THashMap> ReadsPerProvider_; - size_t DataSize_ = 0; const TDqState::TPtr State_; + const bool CheckSelfMapJoin_; + TNodeSet Visited_; + THashMap> ReadsPerProvider_; + bool HasMapJoin_ = false; }; } diff --git a/ydb/library/yql/tests/sql/dq_file/part3/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part3/canondata/result.json index 8d7d642afa4e..b2d84ca0ef02 100644 --- a/ydb/library/yql/tests/sql/dq_file/part3/canondata/result.json +++ b/ydb/library/yql/tests/sql/dq_file/part3/canondata/result.json @@ -1347,6 +1347,37 @@ "uri": "https://{canondata_backend}/1871102/8fb53a3a81ad5d5949727846153c9f6f58a0845e/resource.tar.gz#test.test_join-selfjoin_on_sorted_with_filter-off-Results_/results.txt" } ], + "test.test[join-selfjoin_on_sorted_with_filter-replicate-Analyze]": [ + { + "checksum": "b59f1264995b5b377a58a392fc1d87c8", + "size": 3938, + "uri": "https://{canondata_backend}/1917492/064a3289ad6eaf99ba9f2a34e99fb15ca8194278/resource.tar.gz#test.test_join-selfjoin_on_sorted_with_filter-replicate-Analyze_/plan.txt" + }, + { + "uri": "file://test.test_join-selfjoin_on_sorted_with_filter-replicate-Analyze_/extracted" + } + ], + "test.test[join-selfjoin_on_sorted_with_filter-replicate-Debug]": [ + { + "checksum": "5d13bd670d234e8cc6261784c84e9012", + "size": 2195, + "uri": "https://{canondata_backend}/1784826/3baf99fc0c22227fef7f1b91df73370c2e22f014/resource.tar.gz#test.test_join-selfjoin_on_sorted_with_filter-replicate-Debug_/opt.yql_patched" + } + ], + "test.test[join-selfjoin_on_sorted_with_filter-replicate-Plan]": [ + { + "checksum": "db2d64e1503f3bfa45bc79d2d1655935", + "size": 5087, + "uri": "https://{canondata_backend}/1917492/064a3289ad6eaf99ba9f2a34e99fb15ca8194278/resource.tar.gz#test.test_join-selfjoin_on_sorted_with_filter-replicate-Plan_/plan.txt" + } + ], + "test.test[join-selfjoin_on_sorted_with_filter-replicate-Results]": [ + { + "checksum": "568f3e7e0db9008acecc09f8942dd3c2", + "size": 3076, + "uri": "https://{canondata_backend}/1917492/064a3289ad6eaf99ba9f2a34e99fb15ca8194278/resource.tar.gz#test.test_join-selfjoin_on_sorted_with_filter-replicate-Results_/results.txt" + } + ], "test.test[join-three_equalities_paren--Analyze]": [ { "checksum": "da428dcd6823eacaf44ce47e2c9951b9", diff --git a/ydb/library/yql/tests/sql/dq_file/part3/canondata/test.test_join-selfjoin_on_sorted_with_filter-replicate-Analyze_/extracted b/ydb/library/yql/tests/sql/dq_file/part3/canondata/test.test_join-selfjoin_on_sorted_with_filter-replicate-Analyze_/extracted new file mode 100644 index 000000000000..43a3b8c515f5 --- /dev/null +++ b/ydb/library/yql/tests/sql/dq_file/part3/canondata/test.test_join-selfjoin_on_sorted_with_filter-replicate-Analyze_/extracted @@ -0,0 +1,7 @@ +/program.sql:
: Info: DQ cannot execute the query + + /program.sql:
: Info: Optimization + + /program.sql:
:7:22: Info: Cannot execute self join using mapjoin strategy in DQ + select * from $in as a inner join $in as b on a.key = b.key; + ^ \ No newline at end of file diff --git a/ydb/library/yql/tests/sql/suites/join/selfjoin_on_sorted_with_filter-replicate.cfg b/ydb/library/yql/tests/sql/suites/join/selfjoin_on_sorted_with_filter-replicate.cfg new file mode 100644 index 000000000000..1aa0bdd86e07 --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/join/selfjoin_on_sorted_with_filter-replicate.cfg @@ -0,0 +1,6 @@ +in Input sorted_uniq.txt + +providers dq +pragma dq.HashJoinMode="off"; +pragma dq.SplitStageOnDqReplicate="false"; +pragma dq.SpillingEngine="disable";