Skip to content

Commit

Permalink
[YQ-3761] Sync stable (#10543)
Browse files — browse the repository at this point in the history
  • Loading branch information
APozdniakov authored Oct 18, 2024
1 parent 6fcb3bd commit dddb51e
Show file tree
Hide file tree
Showing 26 changed files with 114 additions and 96 deletions.
38 changes: 12 additions & 26 deletions ydb/library/yql/dq/opt/dq_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,32 +182,18 @@ static void CollectSinkStages(const NNodes::TDqQuery& dqQuery, THashSet<TExprNod
}

NNodes::TExprBase DqMergeQueriesWithSinks(NNodes::TExprBase dqQueryNode, TExprContext& ctx) {
NNodes::TDqQuery dqQuery = dqQueryNode.Cast<NNodes::TDqQuery>();

THashSet<TExprNode::TPtr, TExprNode::TPtrHash> sinkStages;
CollectSinkStages(dqQuery, sinkStages);
TOptimizeExprSettings settings{nullptr};
settings.VisitLambdas = false;
bool deletedDqQueryChild = false;
TExprNode::TPtr newDqQueryNode;
auto status = OptimizeExpr(dqQueryNode.Ptr(), newDqQueryNode, [&sinkStages, &deletedDqQueryChild](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
for (ui32 childIndex = 0; childIndex < node->ChildrenSize(); ++childIndex) {
TExprNode* child = node->Child(childIndex);
if (child->IsCallable(NNodes::TDqQuery::CallableName())) {
NNodes::TDqQuery dqQueryChild(child);
CollectSinkStages(dqQueryChild, sinkStages);
deletedDqQueryChild = true;
return ctx.ChangeChild(*node, childIndex, dqQueryChild.World().Ptr());
}
}
return node;
}, ctx, settings);
YQL_ENSURE(status != IGraphTransformer::TStatus::Error, "Failed to merge DqQuery nodes: " << status);

if (deletedDqQueryChild) {
auto dqQueryBuilder = Build<TDqQuery>(ctx, dqQuery.Pos());
dqQueryBuilder.World(newDqQueryNode->ChildPtr(TDqQuery::idx_World));

auto maybeDqQuery = dqQueryNode.Maybe<NNodes::TDqQuery>();
YQL_ENSURE(maybeDqQuery, "Expected DqQuery!");
auto dqQuery = maybeDqQuery.Cast();

if (auto maybeDqQueryChild = dqQuery.World().Maybe<NNodes::TDqQuery>()) {
auto dqQueryChild = maybeDqQueryChild.Cast();
auto dqQueryBuilder = Build<TDqQuery>(ctx, dqQuery.Pos())
.World(dqQueryChild.World());

THashSet<TExprNode::TPtr, TExprNode::TPtrHash> sinkStages;
CollectSinkStages(dqQuery, sinkStages);
CollectSinkStages(maybeDqQueryChild.Cast(), sinkStages);
auto sinkStagesBuilder = dqQueryBuilder.SinkStages();
for (const TExprNode::TPtr& stage : sinkStages) {
sinkStagesBuilder.Add(stage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "ClSourceSettings"},
"Children": [
{"Index": 0, "Name": "Table", "Type": "TCoAtom"},
{"Index": 1, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 2, "Name": "Columns", "Type": "TCoAtomList"}
{"Index": 0, "Name": "World", "Type": "TExprBase"},
{"Index": 1, "Name": "Table", "Type": "TCoAtom"},
{"Index": 2, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 3, "Name": "Columns", "Type": "TCoAtomList"}
]
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ class TClickHouseDataSourceTypeAnnotationTransformer : public TVisitorTransforme
}

TStatus HandleSourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
if (!EnsureArgsCount(*input, 3U, ctx)) {
if (!EnsureArgsCount(*input, 4, ctx)) {
return TStatus::Error;
}

if (!EnsureWorldType(*input->Child(TClSourceSettings::idx_World), ctx)) {
return TStatus::Error;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class TClickHouseDqIntegration: public TDqIntegrationBase {

return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TClSourceSettings>()
.World(clReadTable.World())
.Table(clReadTable.Table())
.Token<TCoSecureParam>()
.Name().Build(token)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "GenSourceSettings"},
"Children": [
{"Index": 0, "Name": "Cluster", "Type": "TCoAtom"},
{"Index": 1, "Name": "Table", "Type": "TCoAtom"},
{"Index": 2, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 3, "Name": "Columns", "Type": "TCoAtomList"},
{"Index": 4, "Name": "FilterPredicate", "Type": "TCoLambda"}
{"Index": 0, "Name": "World", "Type": "TExprBase"},
{"Index": 1, "Name": "Cluster", "Type": "TCoAtom"},
{"Index": 2, "Name": "Table", "Type": "TCoAtom"},
{"Index": 3, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 4, "Name": "Columns", "Type": "TCoAtomList"},
{"Index": 5, "Name": "FilterPredicate", "Type": "TCoLambda"}
]
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ namespace NYql {
}

TStatus HandleSourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
if (!EnsureArgsCount(*input, 5, ctx)) {
if (!EnsureArgsCount(*input, 6, ctx)) {
return TStatus::Error;
}

if (!EnsureWorldType(*input->Child(TGenSourceSettings::idx_World), ctx)) {
return TStatus::Error;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ namespace NYql {
// clang-format off
return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TGenSourceSettings>()
.World(genReadTable.World())
.Cluster(genReadTable.DataSource().Cluster())
.Table(genReadTable.Table())
.Token<TCoSecureParam>()
Expand Down
11 changes: 6 additions & 5 deletions ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "DqPqTopicSource"},
"Children": [
{"Index": 0, "Name": "Topic", "Type": "TPqTopic"},
{"Index": 1, "Name": "Columns", "Type": "TExprBase"},
{"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList"},
{"Index": 3, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 4, "Name": "FilterPredicate", "Type": "TCoLambda"}
{"Index": 0, "Name": "World", "Type": "TExprBase"},
{"Index": 1, "Name": "Topic", "Type": "TPqTopic"},
{"Index": 2, "Name": "Columns", "Type": "TExprBase"},
{"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList"},
{"Index": 4, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 5, "Name": "FilterPredicate", "Type": "TCoLambda"}
]
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,16 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
}

TStatus HandleDqTopicSource(TExprBase input, TExprContext& ctx) {
if (!EnsureArgsCount(input.Ref(), 5, ctx)) {
if (!EnsureArgsCount(input.Ref(), 6, ctx)) {
return TStatus::Error;
}

TDqPqTopicSource topicSource = input.Cast<TDqPqTopicSource>();

if (!EnsureWorldType(topicSource.World().Ref(), ctx)) {
return TStatus::Error;
}

TPqTopic topic = topicSource.Topic();

if (!EnsureCallable(topic.Ref(), ctx)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ class TPqDqIntegration: public TDqIntegrationBase {

return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TDqPqTopicSource>()
.World(pqReadTopic.World())
.Topic(pqReadTopic.Topic())
.Columns(std::move(columnNames))
.Settings(BuildTopicReadSettings(clusterName, dqSettings, read->Pos(), format, ctx))
Expand Down
23 changes: 12 additions & 11 deletions ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,31 +45,32 @@
"Base": "TCallable",
"Match": {"Type": "CallableBase"},
"Children": [
{"Index": 0, "Name": "Paths", "Type": "TS3Paths"},
{"Index": 1, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 2, "Name": "RowsLimitHint", "Type": "TCoAtom"},
{"Index": 3, "Name": "Path", "Type": "TCoAtom"}
{"Index": 0, "Name": "World", "Type": "TExprBase"},
{"Index": 1, "Name": "Paths", "Type": "TS3Paths"},
{"Index": 2, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 3, "Name": "RowsLimitHint", "Type": "TCoAtom"},
{"Index": 4, "Name": "Path", "Type": "TCoAtom"}
]
},
{
"Name": "TS3SourceSettings",
"Base": "TS3SourceSettingsBase",
"Match": {"Type": "Callable", "Name": "S3SourceSettings"},
"Children": [
{"Index": 4, "Name": "SizeLimit", "Type": "TExprBase", "Optional": true},
{"Index": 5, "Name": "PathPattern", "Type": "TExprBase", "Optional": true},
{"Index": 6, "Name": "PathPatternVariant", "Type": "TExprBase", "Optional": true}
{"Index": 5, "Name": "SizeLimit", "Type": "TExprBase", "Optional": true},
{"Index": 6, "Name": "PathPattern", "Type": "TExprBase", "Optional": true},
{"Index": 7, "Name": "PathPatternVariant", "Type": "TExprBase", "Optional": true}
]
},
{
"Name": "TS3ParseSettings",
"Base": "TS3SourceSettingsBase",
"Match": {"Type": "Callable", "Name": "S3ParseSettings"},
"Children": [
{"Index": 4, "Name": "Format", "Type": "TCoAtom"},
{"Index": 5, "Name": "RowType", "Type": "TExprBase"},
{"Index": 6, "Name": "FilterPredicate", "Type": "TCoLambda"},
{"Index": 7, "Name": "Settings", "Type": "TExprBase", "Optional": true}
{"Index": 5, "Name": "Format", "Type": "TCoAtom"},
{"Index": 6, "Name": "RowType", "Type": "TExprBase"},
{"Index": 7, "Name": "FilterPredicate", "Type": "TCoLambda"},
{"Index": 8, "Name": "Settings", "Type": "TExprBase", "Optional": true}
]
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,11 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
}

TStatus HandleS3SourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
if (!EnsureMinArgsCount(*input, 4U, ctx)) {
if (!EnsureMinMaxArgsCount(*input, 5, 8, ctx)) {
return TStatus::Error;
}

if (!EnsureWorldType(*input->Child(TS3SourceSettings::idx_World), ctx)) {
return TStatus::Error;
}

Expand Down Expand Up @@ -335,7 +339,11 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
}

TStatus HandleS3ParseSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
if (!EnsureMinMaxArgsCount(*input, 7U, 8U, ctx)) {
if (!EnsureMinMaxArgsCount(*input, 8, 9, ctx)) {
return TStatus::Error;
}

if (!EnsureWorldType(*input->Child(TS3ParseSettings::idx_World), ctx)) {
return TStatus::Error;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ class TS3DqIntegration: public TDqIntegrationBase {
if (const auto useCoro = State_->Configuration->SourceCoroActor.Get(); (!useCoro || *useCoro) && format != "raw" && format != "json_list") {
return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TS3ParseSettings>()
.World(s3ReadObject.World())
.Paths(s3ReadObject.Object().Paths())
.Token<TCoSecureParam>()
.Name().Build(token)
Expand Down Expand Up @@ -331,6 +332,7 @@ class TS3DqIntegration: public TDqIntegrationBase {
auto emptyNode = Build<TCoVoid>(ctx, read->Pos()).Done().Ptr();
return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TS3SourceSettings>()
.World(s3ReadObject.World())
.Paths(s3ReadObject.Object().Paths())
.Token<TCoSecureParam>()
.Name().Build(token)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,19 @@
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "SoSourceSettings"},
"Children": [
{"Index": 0, "Name": "Project", "Type": "TCoAtom"},
{"Index": 1, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 2, "Name": "RowType", "Type": "TExprBase"},
{"Index": 3, "Name": "SystemColumns", "Type": "TCoAtomList"},
{"Index": 4, "Name": "LabelNames", "Type": "TCoAtomList"},
{"Index": 5, "Name": "From", "Type": "TCoAtom"},
{"Index": 6, "Name": "To", "Type": "TCoAtom"},
{"Index": 7, "Name": "Program", "Type": "TCoAtom"},
{"Index": 8, "Name": "DownsamplingDisabled", "Type": "TCoBool"},
{"Index": 9, "Name": "DownsamplingAggregation", "Type": "TCoAtom"},
{"Index": 10, "Name": "DownsamplingFill", "Type": "TCoAtom"},
{"Index": 11, "Name": "DownsamplingGridSec", "Type": "TCoUint32"}
{"Index": 0, "Name": "World", "Type": "TExprBase"},
{"Index": 1, "Name": "Project", "Type": "TCoAtom"},
{"Index": 2, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 3, "Name": "RowType", "Type": "TExprBase"},
{"Index": 4, "Name": "SystemColumns", "Type": "TCoAtomList"},
{"Index": 5, "Name": "LabelNames", "Type": "TCoAtomList"},
{"Index": 6, "Name": "From", "Type": "TCoAtom"},
{"Index": 7, "Name": "To", "Type": "TCoAtom"},
{"Index": 8, "Name": "Program", "Type": "TCoAtom"},
{"Index": 9, "Name": "DownsamplingDisabled", "Type": "TCoBool"},
{"Index": 10, "Name": "DownsamplingAggregation", "Type": "TCoAtom"},
{"Index": 11, "Name": "DownsamplingFill", "Type": "TCoAtom"},
{"Index": 12, "Name": "DownsamplingGridSec", "Type": "TCoUint32"}
]
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ class TSolomonDataSourceTypeAnnotationTransformer : public TVisitorTransformerBa
}

TStatus HandleSoSourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
if (!EnsureArgsCount(*input, 12U, ctx)) {
if (!EnsureArgsCount(*input, 13, ctx)) {
return TStatus::Error;
}

if (!EnsureWorldType(*input->Child(TSoSourceSettings::idx_World), ctx)) {
return TStatus::Error;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ class TSolomonDqIntegration: public TDqIntegrationBase {

return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TSoSourceSettings>()
.World(soReadObject.World())
.Project(soReadObject.Object().Project())
.Token<TCoSecureParam>()
.Name().Build(token)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "YdbSourceSettings"},
"Children": [
{"Index": 0, "Name": "Table", "Type": "TCoAtom"},
{"Index": 1, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 2, "Name": "Columns", "Type": "TCoAtomList"}
{"Index": 0, "Name": "World", "Type": "TExprBase"},
{"Index": 1, "Name": "Table", "Type": "TCoAtom"},
{"Index": 2, "Name": "Token", "Type": "TCoSecureParam"},
{"Index": 3, "Name": "Columns", "Type": "TCoAtomList"}
]
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ class TYdbDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {

TStatus HandleYdbSourceSettings(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
Y_UNUSED(output);
if (!EnsureArgsCount(*input, 3U, ctx)) {
if (!EnsureArgsCount(*input, 4, ctx)) {
return TStatus::Error;
}

if (!EnsureWorldType(*input->Child(TYdbSourceSettings::idx_World), ctx)) {
return TStatus::Error;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class TYdbDqIntegration: public TDqIntegrationBase {

return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TYdbSourceSettings>()
.World(ydbReadTable.World())
.Table(ydbReadTable.Table())
.Token<TCoSecureParam>()
.Name().Build(token)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2871,9 +2871,9 @@
],
"test.test[window-full/session--Debug]": [
{
"checksum": "b06da41f9a9ea38646c43487f4b8b96a",
"size": 13340,
"uri": "https://{canondata_backend}/1775319/8ac8c87858e0db34f5a3c99b3f4ca1084cccbace/resource.tar.gz#test.test_window-full_session--Debug_/opt.yql_patched"
"checksum": "497487e14c39e6f6eeacce4046bfb2f5",
"size": 14172,
"uri": "https://{canondata_backend}/1777230/34342462b74dcf33b334172059f34ab28ba50ba6/resource.tar.gz#test.test_window-full_session--Debug_/opt.yql_patched"
}
],
"test.test[window-full/session--Plan]": [
Expand All @@ -2885,9 +2885,9 @@
],
"test.test[window-full/session_aliases--Debug]": [
{
"checksum": "e021555a47e83d0b792765a8ee82be94",
"size": 14124,
"uri": "https://{canondata_backend}/1775319/8ac8c87858e0db34f5a3c99b3f4ca1084cccbace/resource.tar.gz#test.test_window-full_session_aliases--Debug_/opt.yql_patched"
"checksum": "cf18b79ffda288cb9cf374749f268298",
"size": 14969,
"uri": "https://{canondata_backend}/1777230/34342462b74dcf33b334172059f34ab28ba50ba6/resource.tar.gz#test.test_window-full_session_aliases--Debug_/opt.yql_patched"
}
],
"test.test[window-full/session_aliases--Plan]": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
(let $3 (DataType 'String))
(let $4 (StructType '('"kind" $3) '('"labels" (DictType $3 $3)) '('"ts" (DataType 'Datetime)) '('type $3) '('"value" (DataType 'Double))))
(let $5 '('"kind" '"labels" '"value" '"ts" 'type))
(let $6 (SoSourceSettings '"my_project" (SecureParam '"cluster:default_local_solomon") $4 $5 '() '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"{}" (Bool '"false") '"AVG" '"PREVIOUS" (Uint32 '"15")))
(let $6 (SoSourceSettings world '"my_project" (SecureParam '"cluster:default_local_solomon") $4 $5 '() '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"{}" (Bool '"false") '"AVG" '"PREVIOUS" (Uint32 '"15")))
(let $7 (DqStage '((DqSource (DataSource '"solomon" '"local_solomon") $6)) (lambda '($10) $10) '('('"_logical_id" '199321))))
(let $8 (DqStage '((DqCnUnionAll (TDqOutput $7 '"0"))) (lambda '($11) $11) '('('"_logical_id" '199342))))
(let $8 (DqStage '((DqCnUnionAll (TDqOutput $7 '"0"))) (lambda '($11) $11) '('('"_logical_id" '199348))))
(let $9 (ResPull! $1 $2 (Key) (DqCnResult (TDqOutput $8 '"0") '()) '('('type) '('autoref)) '"dq"))
(return (Commit! (Commit! $9 $2) (DataSink '"solomon" '"local_solomon")))
)
Loading

0 comments on commit dddb51e

Please sign in to comment.