diff --git a/ydb/library/yql/providers/dq/provider/exec/ya.make b/ydb/library/yql/providers/dq/provider/exec/ya.make index 6a0d432b7f3b..202c297a5140 100644 --- a/ydb/library/yql/providers/dq/provider/exec/ya.make +++ b/ydb/library/yql/providers/dq/provider/exec/ya.make @@ -9,6 +9,7 @@ PEERDIR( library/cpp/yson/node library/cpp/svnversion library/cpp/digest/md5 + library/cpp/threading/future ydb/public/lib/yson_value ydb/public/sdk/cpp/client/ydb_driver ydb/library/yql/core diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index f73661f690ec..707948c13428 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -48,6 +48,7 @@ #include #include #include +#include #include #include @@ -55,6 +56,7 @@ #include #include +#include namespace NYql { @@ -125,8 +127,8 @@ class TLocalExecutor: public TCounters : State->RandomProvider; auto alloc = std::make_shared( - __LOCATION__, - NKikimr::TAlignedPagePoolCounters(), + __LOCATION__, + NKikimr::TAlignedPagePoolCounters(), State->FunctionRegistry->SupportsSizedAllocators(), false); NDq::TDqTaskRunnerContext executionContext; @@ -432,8 +434,9 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters public: TDqExecTransformer(const TDqStatePtr& state, const ISkiffConverterPtr& skiffConverter) : State(state) - , ExecState(MakeIntrusive()) , SkiffConverter(skiffConverter) + , ExecPrecomputeState_(MakeIntrusive()) + , UploadCache_(std::make_shared()) { AddHandler({TStringBuf("Result")}, RequireNone(), Hndl(&TDqExecTransformer::HandleResult)); AddHandler({TStringBuf("Pull")}, RequireNone(), Hndl(&TDqExecTransformer::HandlePull)); @@ -442,15 +445,14 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters } void Rewind() override { - ExecState = MakeIntrusive(); - FileLinks.clear(); - ModulesMapping.clear(); + ExecPrecomputeState_ = MakeIntrusive(); + UploadCache_ = std::make_shared(); TExecTransformerBase::Rewind(); } private: - struct TExecState : public TThrRefBase { + struct TExecPrecomputeState : public TThrRefBase { TAdaptiveLock Lock; struct TItem : public TIntrusiveListItem { @@ -458,13 +460,12 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters TAsyncTransformCallback Callback; }; - using TQueueType = TIntrusiveListWithAutoDelete; + using TQueueType = TIntrusiveListWithAutoDelete; TQueueType Completed; - NThreading::TPromise Promise = NThreading::NewPromise(); - bool HasResult = false; + TNodeMap> PrecomputeFutures; // Precompute node -> future }; - using TExecStatePtr = TIntrusivePtr; + using TExecPrecomputeStatePtr = TIntrusivePtr; void GetResultType(TString* type, TVector* columns, const TExprNode& resOrPull, const TExprNode& resOrPullInput) const { @@ -492,16 +493,16 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters if (path.StartsWith(NKikimr::NMiniKQL::StaticModulePrefix) || !State->Settings->EnableStrip.Get() || !State->Settings->EnableStrip.Get().GetOrElse(false)) { - ModulesMapping.emplace(objectId, path); + UploadCache_->ModulesMapping.emplace(objectId, path); return std::make_tuple(path, objectId); } - TFileLinkPtr& fileLink = FileLinks[objectId]; + TFileLinkPtr& fileLink = UploadCache_->FileLinks[objectId]; if (!fileLink) { fileLink = State->FileStorage->PutFileStripped(path, md5); } - ModulesMapping.emplace(objectId + DqStrippedSuffied, path); + UploadCache_->ModulesMapping.emplace(objectId + DqStrippedSuffied, path); return std::make_tuple(fileLink->GetPath(), objectId + DqStrippedSuffied); } @@ -869,7 +870,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters ui64 executionTimeout = State->Settings->_LiteralTimeout.Get().GetOrElse(TDqSettings::TDefault::LiteralTimeout); try { - auto result = TMaybeNode(input).Cast(); + auto result = TResult(input); THashMap resSettings; for (auto s: result.Settings()) { @@ -878,19 +879,9 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters } } - auto precomputes = FindIndependentPrecomputes(result.Input().Ptr()); - if (!precomputes.empty()) { - auto status = HandlePrecomputes(precomputes, ctx, resSettings, executionTimeout); - if (status.Level != TStatus::Ok) { - if (status == TStatus::Async) { - return std::make_pair(status, ExecState->Promise.GetFuture().Apply([execState = ExecState](const TFuture& completedFuture) { - completedFuture.GetValue(); - return HandlePrecomputeAsyncComplete(execState); - })); - } else { - return SyncStatus(status); - } - } + auto statusPair = HandlePrecomputes(result, resSettings, ctx, executionTimeout); + if (statusPair.first.Level != TStatus::Ok) { + return statusPair; } IDataProvider::TFillSettings fillSettings = NCommon::GetFillSettings(result.Ref()); @@ -1009,7 +1000,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters graphParams["Evaluation"] = ToString(!ctx.Step.IsDone(TExprStep::ExprEval)); future = State->ExecutePlan( State->SessionId, executionPlanner->GetPlan(), columns, secureParams, graphParams, - settings, progressWriter, ModulesMapping, fillSettings.Discard, executionTimeout); + settings, progressWriter, UploadCache_->ModulesMapping, fillSettings.Discard, executionTimeout); } } @@ -1273,19 +1264,9 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters auto publicIds = GetPublicIds(pull.Ptr()); YQL_ENSURE(!oneGraphPerQuery || publicIds->GraphsCount == 1, "Internal error: only one graph per query is allowed"); - auto precomputes = FindIndependentPrecomputes(pull.Input().Ptr()); - if (!precomputes.empty()) { - auto status = HandlePrecomputes(precomputes, ctx, pullSettings, executionTimeout); - if (status.Level != TStatus::Ok) { - if (status == TStatus::Async) { - return std::make_pair(status, ExecState->Promise.GetFuture().Apply([execState = ExecState](const TFuture& completedFuture) { - completedFuture.GetValue(); - return HandlePrecomputeAsyncComplete(execState); - })); - } else { - return SyncStatus(status); - } - } + auto statusPair = HandlePrecomputes(pull, pullSettings, ctx, executionTimeout); + if (statusPair.first.Level != TStatus::Ok) { + return statusPair; } TString type; @@ -1456,7 +1437,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(publicIds); auto future = State->ExecutePlan(State->SessionId, executionPlanner->GetPlan(), columns, secureParams, graphParams, - settings, progressWriter, ModulesMapping, fillSettings.Discard, executionTimeout); + settings, progressWriter, UploadCache_->ModulesMapping, fillSettings.Discard, executionTimeout); future.Subscribe([publicIds, progressWriter = State->ProgressWriter](const NThreading::TFuture& completedFuture) { YQL_ENSURE(!completedFuture.HasException()); @@ -1756,41 +1737,29 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters return hasPrecompute; } - static void CompleteNode(const TExecStatePtr& execState, TExprNode* node, const TAsyncTransformCallback& callback) { - auto item = MakeHolder(); + static void CompleteNode(const TExecPrecomputeStatePtr& execState, TExprNode* node, const TAsyncTransformCallback& callback) { + auto item = MakeHolder(); item->Node = node; item->Callback = callback; - NThreading::TPromise promiseToSet; with_lock(execState->Lock) { execState->Completed.PushBack(item.Release()); - if (!execState->HasResult) { - execState->HasResult = true; - promiseToSet = execState->Promise; - } - } - - if (promiseToSet.Initialized()) { - promiseToSet.SetValue(); } } - static TAsyncTransformCallback HandlePrecomputeAsyncComplete(TExecStatePtr execState) { + static TAsyncTransformCallback HandlePrecomputeAsyncComplete(TExecPrecomputeStatePtr execState) { return TAsyncTransformCallback([execState](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { output = input; input->SetState(TExprNode::EState::ExecutionRequired); TStatus combinedStatus = TStatus::Repeat; - TExecState::TQueueType completed; - auto newPromise = NThreading::NewPromise(); - { - TGuard guard(execState->Lock); + TExecPrecomputeState::TQueueType completed; + with_lock(execState->Lock) { completed.Swap(execState->Completed); - execState->Promise.Swap(newPromise); - execState->HasResult = false; } for (auto& item : completed) { TExprNode::TPtr callableOutput; + execState->PrecomputeFutures.erase(item.Node); auto status = item.Callback(item.Node, callableOutput, ctx); if (status.Level != TStatus::Error) { YQL_ENSURE(callableOutput == item.Node, "Unsupported node rewrite"); @@ -1802,8 +1771,13 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters }); } - IGraphTransformer::TStatus HandlePrecomputes(const TNodeOnNodeOwnedMap& precomputes, TExprContext& ctx, const THashMap& providerParams, ui64 executionTimeout) { - + IGraphTransformer::TStatus RunPrecomputes( + const TNodeOnNodeOwnedMap& precomputes, + TExprContext& ctx, + const THashMap& providerParams, + ui64 executionTimeout, + std::vector>& futures + ) { IDataProvider::TFillSettings fillSettings; fillSettings.AllResultsBytesLimit.Clear(); fillSettings.RowsLimitPerWrite.Clear(); @@ -1811,13 +1785,17 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters commonSettings->EnableFullResultWrite = false; IGraphTransformer::TStatus combinedStatus = TStatus::Ok; + futures.clear(); for (auto [_, input]: precomputes) { TString uniqId = TStringBuilder() << input->Content() << "(#" << input->UniqueId() << ')'; YQL_LOG_CTX_SCOPE(uniqId); + NThreading::TFuture& precomputeFuture = ExecPrecomputeState_->PrecomputeFutures[input.Get()]; if (input->GetState() > TExprNode::EState::ExecutionRequired) { YQL_CLOG(DEBUG, ProviderDq) << "Continue async execution"; combinedStatus = combinedStatus.Combine(TStatus::Async); + YQL_ENSURE(precomputeFuture.Initialized()); + futures.push_back(precomputeFuture); continue; } @@ -1852,12 +1830,13 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters } YQL_CLOG(DEBUG, ProviderDq) << "Freezing files for " << input->Content(); if (filesRes.first.Level == TStatus::Async) { - filesRes.second.Subscribe([execState = ExecState, node = input.Get(), logCtx](const TAsyncTransformCallbackFuture& future) { + precomputeFuture = filesRes.second.Apply([execState = ExecPrecomputeState_, node = input.Get(), logCtx](const TAsyncTransformCallbackFuture& future) { YQL_LOG_CTX_ROOT_SESSION_SCOPE(logCtx); YQL_ENSURE(!future.HasException()); YQL_CLOG(DEBUG, ProviderDq) << "Finishing freezing files"; CompleteNode(execState, node, future.GetValue()); }); + futures.push_back(precomputeFuture); } continue; } @@ -1970,12 +1949,12 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(publicIds); auto future = State->ExecutePlan(State->SessionId, executionPlanner->GetPlan(), {}, secureParams, graphParams, - settings, progressWriter, ModulesMapping, false, executionTimeout); + settings, progressWriter, UploadCache_->ModulesMapping, false, executionTimeout); executionPlanner.Destroy(); bool neverFallback = settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default) == EFallbackPolicy::Never; - future.Subscribe([publicIds, state = State, startTime, execState = ExecState, node = input.Get(), neverFallback, logCtx](const NThreading::TFuture& completedFuture) { + precomputeFuture = future.Apply([publicIds, state = State, startTime, execState = ExecPrecomputeState_, node = input.Get(), neverFallback, logCtx](const NThreading::TFuture& completedFuture) { YQL_LOG_CTX_ROOT_SESSION_SCOPE(logCtx); YQL_ENSURE(!completedFuture.HasException()); const IDqGateway::TResult& res = completedFuture.GetValueSync(); @@ -2045,10 +2024,28 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters } }); combinedStatus = combinedStatus.Combine(IGraphTransformer::TStatus::Async); + futures.push_back(precomputeFuture); } return combinedStatus; } + TStatusCallbackPair HandlePrecomputes(const TResOrPullBase& resOrPull, const THashMap& settings, TExprContext& ctx, ui64 executionTimeout) { + TStatus status = TStatus::Ok; + auto precomputes = FindIndependentPrecomputes(resOrPull.Input().Ptr()); + if (!precomputes.empty()) { + std::vector> futures; + status = RunPrecomputes(precomputes, ctx, settings, executionTimeout, futures); + YQL_CLOG(TRACE, ProviderDq) << "RunPrecomputes returns status " << status << ", with " << futures.size() << " futures"; + if (status == TStatus::Async) { + return std::make_pair(status, NThreading::WaitAny(futures).Apply([execState = ExecPrecomputeState_](const TFuture& completedFuture) { + completedFuture.TryRethrow(); + return HandlePrecomputeAsyncComplete(execState); + })); + } + } + return SyncStatus(status); + } + IGraphTransformer::TStatus PeepHole(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx, const THashMap& providerParams, const TPublicIds::TPtr& publicIds) const { @@ -2069,10 +2066,9 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters private: TDqStatePtr State; - TExecStatePtr ExecState; ISkiffConverterPtr SkiffConverter; - mutable THashMap FileLinks; - mutable THashMap ModulesMapping; + TExecPrecomputeStatePtr ExecPrecomputeState_; + TUploadCache::TPtr UploadCache_; const ui64 MaxFileReadSize = 1_MB; }; diff --git a/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json index 3d92de3a0f7f..4745406f16aa 100644 --- a/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json +++ b/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json @@ -653,6 +653,39 @@ } ], "test.test[csee-same_l1-default.txt-Results]": [], + "test.test[dq-precompute_parallel_indep--Analyze]": [ + { + "checksum": "4f6978cd58bf614cf032ab65c9ccd92e", + "size": 16119, + "uri": "https://{canondata_backend}/1942525/68adc93267fab0086b1faf825d05122058d5f469/resource.tar.gz#test.test_dq-precompute_parallel_indep--Analyze_/plan.txt" + } + ], + "test.test[dq-precompute_parallel_indep--Debug]": [ + { + "checksum": "60e05e19e13cece105b65405a56027f7", + "size": 5127, + "uri": "https://{canondata_backend}/1942525/68adc93267fab0086b1faf825d05122058d5f469/resource.tar.gz#test.test_dq-precompute_parallel_indep--Debug_/opt.yql_patched" + } + ], + "test.test[dq-precompute_parallel_indep--Plan]": [ + { + "checksum": "f0c8afc03280179f72fde524e4bbeaf1", + "size": 16363, + "uri": "https://{canondata_backend}/1942525/68adc93267fab0086b1faf825d05122058d5f469/resource.tar.gz#test.test_dq-precompute_parallel_indep--Plan_/plan.txt" + } + ], + "test.test[dq-precompute_parallel_indep--Results]": [ + { + "checksum": "0c4292db52668dfefd7453df371175ee", + "size": 78, + "uri": "https://{canondata_backend}/1942525/68adc93267fab0086b1faf825d05122058d5f469/resource.tar.gz#test.test_dq-precompute_parallel_indep--Results_/Output.txt" + }, + { + "checksum": "778fa10c4a890f9bfdaf2e98a8f09462", + "size": 377, + "uri": "https://{canondata_backend}/1942525/68adc93267fab0086b1faf825d05122058d5f469/resource.tar.gz#test.test_dq-precompute_parallel_indep--Results_/Output.yqlrun.txt.attr" + } + ], "test.test[expr-expr_add_literal_nulls-default.txt-Analyze]": [ { "checksum": "b4dd508a329723c74293d80f0278c705", diff --git a/ydb/library/yql/tests/sql/dq_file/part2/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part2/canondata/result.json index 013f211038c1..9ce26611a5b3 100644 --- a/ydb/library/yql/tests/sql/dq_file/part2/canondata/result.json +++ b/ydb/library/yql/tests/sql/dq_file/part2/canondata/result.json @@ -772,6 +772,49 @@ } ], "test.test[distinct-distinct_count_and_full_count-default.txt-Results]": [], + "test.test[dq-precompute_parallel_mix--Analyze]": [ + { + "checksum": "191f1cfae918330a0d2b2b7c5880565e", + "size": 10762, + "uri": "https://{canondata_backend}/1937027/06ef7ef6dee3ee697013fd133a8e8a843e5f5de9/resource.tar.gz#test.test_dq-precompute_parallel_mix--Analyze_/plan.txt" + } + ], + "test.test[dq-precompute_parallel_mix--Debug]": [ + { + "checksum": "d3976ed532b22b1acd4ebdf94a6a2676", + "size": 3920, + "uri": "https://{canondata_backend}/1937027/06ef7ef6dee3ee697013fd133a8e8a843e5f5de9/resource.tar.gz#test.test_dq-precompute_parallel_mix--Debug_/opt.yql_patched" + } + ], + "test.test[dq-precompute_parallel_mix--Plan]": [ + { + "checksum": "1e3854603cf82672f83af3c5bf63caf8", + "size": 10884, + "uri": "https://{canondata_backend}/1937027/06ef7ef6dee3ee697013fd133a8e8a843e5f5de9/resource.tar.gz#test.test_dq-precompute_parallel_mix--Plan_/plan.txt" + } + ], + "test.test[dq-precompute_parallel_mix--Results]": [ + { + "checksum": "2c10838daac18c57801f0c309c433310", + "size": 62, + "uri": "https://{canondata_backend}/1937027/06ef7ef6dee3ee697013fd133a8e8a843e5f5de9/resource.tar.gz#test.test_dq-precompute_parallel_mix--Results_/Output1.txt" + }, + { + "checksum": "99a23a80241cdd0057583910f9155f61", + "size": 600, + "uri": "https://{canondata_backend}/1937027/06ef7ef6dee3ee697013fd133a8e8a843e5f5de9/resource.tar.gz#test.test_dq-precompute_parallel_mix--Results_/Output1.yqlrun.txt.attr" + }, + { + "checksum": "cb0e24eef504f21bcf257bd15077a828", + "size": 190, + "uri": "https://{canondata_backend}/1937027/06ef7ef6dee3ee697013fd133a8e8a843e5f5de9/resource.tar.gz#test.test_dq-precompute_parallel_mix--Results_/Output2.txt" + }, + { + "checksum": "99a23a80241cdd0057583910f9155f61", + "size": 600, + "uri": "https://{canondata_backend}/1937027/06ef7ef6dee3ee697013fd133a8e8a843e5f5de9/resource.tar.gz#test.test_dq-precompute_parallel_mix--Results_/Output2.yqlrun.txt.attr" + } + ], "test.test[expr-as_table_emptylist2-default.txt-Analyze]": [ { "checksum": "55515ae638f317612d048052be489bfd", diff --git a/ydb/library/yql/tests/sql/sql2yql/canondata/result.json b/ydb/library/yql/tests/sql/sql2yql/canondata/result.json index d86ffd7189c1..60f5df629ba7 100644 --- a/ydb/library/yql/tests/sql/sql2yql/canondata/result.json +++ b/ydb/library/yql/tests/sql/sql2yql/canondata/result.json @@ -5046,6 +5046,20 @@ "uri": "https://{canondata_backend}/1937027/973c239492ba32946806ddc66cf0af4b38c06ae8/resource.tar.gz#test_sql2yql.test_dq-precompute_parallel_/sql.yql" } ], + "test_sql2yql.test[dq-precompute_parallel_indep]": [ + { + "checksum": "decf219a1091da410ee315ad657f3fd4", + "size": 3402, + "uri": "https://{canondata_backend}/1903885/e05277820455e95dc3443a43a34b1956971f88cf/resource.tar.gz#test_sql2yql.test_dq-precompute_parallel_indep_/sql.yql" + } + ], + "test_sql2yql.test[dq-precompute_parallel_mix]": [ + { + "checksum": "c557f52fe0852d86a73c69134c80e332", + "size": 4230, + "uri": "https://{canondata_backend}/1937027/08c3c9c6866171c02a77702fcc07f26f028f4001/resource.tar.gz#test_sql2yql.test_dq-precompute_parallel_mix_/sql.yql" + } + ], "test_sql2yql.test[dq-precompute_result]": [ { "checksum": "043145e8db2d44a158f122c3a724e034", @@ -23673,6 +23687,20 @@ "uri": "https://{canondata_backend}/1880306/64654158d6bfb1289c66c626a8162239289559d0/resource.tar.gz#test_sql_format.test_dq-precompute_parallel_/formatted.sql" } ], + "test_sql_format.test[dq-precompute_parallel_indep]": [ + { + "checksum": "5ee71b61a11efee1059774aa4478b4fc", + "size": 215, + "uri": "https://{canondata_backend}/1903885/e05277820455e95dc3443a43a34b1956971f88cf/resource.tar.gz#test_sql_format.test_dq-precompute_parallel_indep_/formatted.sql" + } + ], + "test_sql_format.test[dq-precompute_parallel_mix]": [ + { + "checksum": "ef7b7571431f5c4e198b4df24fab006a", + "size": 322, + "uri": "https://{canondata_backend}/1937027/08c3c9c6866171c02a77702fcc07f26f028f4001/resource.tar.gz#test_sql_format.test_dq-precompute_parallel_mix_/formatted.sql" + } + ], "test_sql_format.test[dq-precompute_result]": [ { "checksum": "e9aad7ef908ef2e5d99e7fd838fdc4e9", diff --git a/ydb/library/yql/tests/sql/suites/dq/precompute_parallel_indep.cfg b/ydb/library/yql/tests/sql/suites/dq/precompute_parallel_indep.cfg new file mode 100644 index 000000000000..1ff2e85554e1 --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/dq/precompute_parallel_indep.cfg @@ -0,0 +1,5 @@ +providers dq +in Input1 input.txt +in Input2 input.txt +in Input3 input.txt +out Output output.txt diff --git a/ydb/library/yql/tests/sql/suites/dq/precompute_parallel_indep.sql b/ydb/library/yql/tests/sql/suites/dq/precompute_parallel_indep.sql new file mode 100644 index 000000000000..e8b17b2a125c --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/dq/precompute_parallel_indep.sql @@ -0,0 +1,5 @@ +use plato; + +insert into Output select sum(cast(key as int32)) from Input1; +insert into Output select sum(cast(key as int32)) from Input2; +insert into Output select sum(cast(key as int32)) from Input3; diff --git a/ydb/library/yql/tests/sql/suites/dq/precompute_parallel_mix.cfg b/ydb/library/yql/tests/sql/suites/dq/precompute_parallel_mix.cfg new file mode 100644 index 000000000000..cf64efcf8e96 --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/dq/precompute_parallel_mix.cfg @@ -0,0 +1,4 @@ +providers dq +in Input input.txt +out Output1 output1.txt +out Output2 output2.txt diff --git a/ydb/library/yql/tests/sql/suites/dq/precompute_parallel_mix.sql b/ydb/library/yql/tests/sql/suites/dq/precompute_parallel_mix.sql new file mode 100644 index 000000000000..87a8e60eb7cf --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/dq/precompute_parallel_mix.sql @@ -0,0 +1,7 @@ +use plato; + +$a = select key from Input order by key limit 1; +$b = select key from Input order by key limit 1 offset 1; + +insert into Output1 select * from Input where key <= $a; +insert into Output2 select * from Input where key >= $a and key != $b;