Skip to content

Commit

Permalink
YQL: Fix lineage for flatten columns (#8908)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxkovalev authored Sep 13, 2024
1 parent b91f3b1 commit 13965ec
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 28 deletions.
80 changes: 52 additions & 28 deletions ydb/library/yql/core/services/yql_lineage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,8 @@ class TLineageScanner {
}

TMaybe<TFieldsLineage> ScanExprLineage(const TExprNode& node, const TExprNode* arg, const TLineage* src,
TNodeMap<TMaybe<TFieldsLineage>>& visited) {
TNodeMap<TMaybe<TFieldsLineage>>& visited,
const THashMap<const TExprNode*, TString>& flattenColumns) {
if (&node == arg) {
return Nothing();
}
Expand All @@ -308,6 +309,10 @@ class TLineageScanner {
return it->second;
}

if (auto itFlatten = flattenColumns.find(&node); itFlatten != flattenColumns.end()) {
return it->second = *(*src->Fields).FindPtr(itFlatten->second);
}

if (node.IsCallable("Member")) {
if (&node.Head() == arg && src) {
return it->second = *(*src->Fields).FindPtr(node.Tail().Content());
Expand All @@ -325,7 +330,7 @@ class TLineageScanner {
}
}

auto inner = ScanExprLineage(node.Head(), arg, src, visited);
auto inner = ScanExprLineage(node.Head(), arg, src, visited, {});
if (!inner) {
return Nothing();
}
Expand Down Expand Up @@ -365,7 +370,7 @@ class TLineageScanner {
continue;
}

auto inner = ScanExprLineage(*child, arg, src, visited);
auto inner = ScanExprLineage(*child, arg, src, visited, {});
if (!inner) {
return Nothing();
}
Expand All @@ -392,10 +397,11 @@ class TLineageScanner {
}

void MergeLineageFromUsedFields(const TExprNode& expr, const TExprNode& arg, const TLineage& src,
TFieldLineageSet& dst, const TString& newTransforms = "") {
TFieldLineageSet& dst, const THashMap<const TExprNode*, TString>& flattenColumns,
const TString& newTransforms = "") {

TNodeMap<TMaybe<TFieldsLineage>> visited;
auto res = ScanExprLineage(expr, &arg, &src, visited);
auto res = ScanExprLineage(expr, &arg, &src, visited, flattenColumns);
if (!res) {
for (const auto& f : *src.Fields) {
for (const auto& i: f.second.Items) {
Expand All @@ -410,7 +416,8 @@ class TLineageScanner {
}

void MergeLineageFromUsedFields(const TExprNode& expr, const TExprNode& arg, const TLineage& src,
TFieldsLineage& dst, bool produceStruct, const TString& newTransforms = "") {
TFieldsLineage& dst, bool produceStruct, const THashMap<const TExprNode*, TString>& flattenColumns,
const TString& newTransforms = "") {
if (produceStruct) {
auto root = &expr;
while (root->IsCallable("Just")) {
Expand All @@ -427,7 +434,7 @@ class TLineageScanner {
for (const auto& x : root->Children()) {
auto fieldName = x->Head().Content();
auto& s = (*dst.StructItems)[fieldName];
MergeLineageFromUsedFields(x->Tail(), arg, src, s, newTransforms);
MergeLineageFromUsedFields(x->Tail(), arg, src, s, flattenColumns, newTransforms);
}
} else if (root->IsCallable("Member") && &root->Head() == &arg) {
auto fieldName = root->Tail().Content();
Expand All @@ -436,11 +443,11 @@ class TLineageScanner {
}
}

MergeLineageFromUsedFields(expr, arg, src, dst.Items, newTransforms);
MergeLineageFromUsedFields(expr, arg, src, dst.Items, flattenColumns, newTransforms);
}

void FillStructLineage(TLineage& lineage, const TExprNode* value, const TExprNode& arg, const TLineage& innerLineage,
const TTypeAnnotationNode* extType) {
const TTypeAnnotationNode* extType, const THashMap<const TExprNode*, TString>& flattenColumns) {
TMaybe<TString> oneField;
if (value && value->IsCallable("Member") && &value->Head() == &arg) {
TString field(value->Tail().Content());
Expand All @@ -462,8 +469,8 @@ class TLineageScanner {
TLineage left, right;
left.Fields.ConstructInPlace();
right.Fields.ConstructInPlace();
FillStructLineage(left, value->Child(1), arg, innerLineage, extType);
FillStructLineage(right, value->Child(2), arg, innerLineage, extType);
FillStructLineage(left, value->Child(1), arg, innerLineage, extType, {});
FillStructLineage(right, value->Child(2), arg, innerLineage, extType, {});
for (const auto& f : *left.Fields) {
auto& res = (*lineage.Fields)[f.first];
res.Items.insert(f.second.Items.begin(), f.second.Items.end());
Expand All @@ -483,7 +490,7 @@ class TLineageScanner {
auto& res = (*lineage.Fields)[field];
const auto& expr = child->Tail();
TString newTransforms;
auto root = &expr;
const TExprNode* root = &expr;
while (root->IsCallable("Just")) {
root = &root->Head();
}
Expand All @@ -492,7 +499,7 @@ class TLineageScanner {
newTransforms = "Copy";
}

MergeLineageFromUsedFields(expr, arg, innerLineage, res, true, newTransforms);
MergeLineageFromUsedFields(expr, arg, innerLineage, res, true, flattenColumns, newTransforms);
}

return;
Expand Down Expand Up @@ -526,13 +533,30 @@ class TLineageScanner {
const auto& lambda = node.Tail();
const auto& arg = lambda.Head().Head();
const auto& body = lambda.Tail();
const TExprNode* value;
THashMap<const TExprNode*, TString> flattenColumns;
const TExprNode* value = &body.Tail();
if (body.IsCallable({"OptionalIf", "FlatListIf"})) {
value = &body.Tail();
} else if (body.IsCallable("Just")) {
value = &body.Head();
} else if (body.IsCallable({"FlatMap", "OrderedFlatMap"})) {
value = &body.Head();
if (lambda.GetTypeAnn()->GetKind() == ETypeAnnotationKind::List) {
value = &body;
while(value->IsCallable({"FlatMap", "OrderedFlatMap"})) {
if (value->Head().IsCallable("Member") && &value->Head().Head() == &arg) {
TString field(value->Head().Tail().Content());
flattenColumns.emplace(value->Tail().Head().HeadPtr().Get(), field);
}
value = &value->Tail().Tail();
}
if (value->IsCallable("Just")) {
value = &value->Head();
} else if (value->IsCallable({"OptionalIf", "FlatListIf"})) {
value = &value->Tail();
}
} else {
value = &body.Head();
}
} else {
Warning(body);
return;
Expand All @@ -544,7 +568,7 @@ class TLineageScanner {
}

lineage.Fields.ConstructInPlace();
FillStructLineage(lineage, value, arg, innerLineage, GetSeqItemType(body.GetTypeAnn()));
FillStructLineage(lineage, value, arg, innerLineage, GetSeqItemType(body.GetTypeAnn()), flattenColumns);
}

void HandleAggregate(TLineage& lineage, const TExprNode& node) {
Expand Down Expand Up @@ -578,12 +602,12 @@ class TLineageScanner {
// merge all used fields from init/update handlers
auto initHandler = payload->Child(1)->Child(1);
auto updateHandler = payload->Child(1)->Child(2);
MergeLineageFromUsedFields(initHandler->Tail(), initHandler->Head().Head(), innerLineage, source, false);
MergeLineageFromUsedFields(updateHandler->Tail(), updateHandler->Head().Head(), innerLineage, source, false);
MergeLineageFromUsedFields(initHandler->Tail(), initHandler->Head().Head(), innerLineage, source, false, {});
MergeLineageFromUsedFields(updateHandler->Tail(), updateHandler->Head().Head(), innerLineage, source, false, {});
} else if (payload->Child(1)->IsCallable("AggApply")) {
auto extractHandler = payload->Child(1)->Child(2);
bool produceStruct = payload->Child(1)->Head().Content() == "some";
MergeLineageFromUsedFields(extractHandler->Tail(), extractHandler->Head().Head(), innerLineage, source, produceStruct);
MergeLineageFromUsedFields(extractHandler->Tail(), extractHandler->Head().Head(), innerLineage, source, produceStruct, {});
} else {
Warning(*payload->Child(1));
lineage.Fields.Clear();
Expand Down Expand Up @@ -612,7 +636,7 @@ class TLineageScanner {
}

lineage.Fields.ConstructInPlace();
FillStructLineage(lineage, nullptr, arg, innerLineage, GetSeqItemType(body.GetTypeAnn()));
FillStructLineage(lineage, nullptr, arg, innerLineage, GetSeqItemType(body.GetTypeAnn()), {});
}

void HandlePartitionByKeys(TLineage& lineage, const TExprNode& node) {
Expand All @@ -630,7 +654,7 @@ class TLineageScanner {
}

lineage.Fields.ConstructInPlace();
FillStructLineage(lineage, nullptr, arg, innerLineage, GetSeqItemType(body.GetTypeAnn()));
FillStructLineage(lineage, nullptr, arg, innerLineage, GetSeqItemType(body.GetTypeAnn()), {});
}

void HandleExtend(TLineage& lineage, const TExprNode& node) {
Expand Down Expand Up @@ -709,8 +733,8 @@ class TLineageScanner {
auto& res = (*lineage.Fields)[sessionColumn->Content()];
const auto& initHandler = node.Child(4)->Child(2);
const auto& updateHandler = node.Child(4)->Child(2);
MergeLineageFromUsedFields(initHandler->Tail(), initHandler->Head().Head(), innerLineage, res, false);
MergeLineageFromUsedFields(updateHandler->Tail(), updateHandler->Head().Head(), innerLineage, res, false);
MergeLineageFromUsedFields(initHandler->Tail(), initHandler->Head().Head(), innerLineage, res, false, {});
MergeLineageFromUsedFields(updateHandler->Tail(), updateHandler->Head().Head(), innerLineage, res, false, {});
}
}

Expand All @@ -730,12 +754,12 @@ class TLineageScanner {
} else if (list->Tail().IsCallable({"Lag","Lead","Rank","DenseRank","PercentRank"})) {
const auto& lambda = list->Tail().Child(1);
bool produceStruct = list->Tail().IsCallable({"Lag","Lead"});
MergeLineageFromUsedFields(lambda->Tail(), lambda->Head().Head(), innerLineage, res, produceStruct);
MergeLineageFromUsedFields(lambda->Tail(), lambda->Head().Head(), innerLineage, res, produceStruct, {});
} else if (list->Tail().IsCallable("WindowTraits")) {
const auto& initHandler = list->Tail().Child(1);
const auto& updateHandler = list->Tail().Child(2);
MergeLineageFromUsedFields(initHandler->Tail(), initHandler->Head().Head(), innerLineage, res, false);
MergeLineageFromUsedFields(updateHandler->Tail(), updateHandler->Head().Head(), innerLineage, res, false);
MergeLineageFromUsedFields(initHandler->Tail(), initHandler->Head().Head(), innerLineage, res, false, {});
MergeLineageFromUsedFields(updateHandler->Tail(), updateHandler->Head().Head(), innerLineage, res, false, {});
} else {
lineage.Fields.Clear();
return;
Expand Down Expand Up @@ -850,15 +874,15 @@ class TLineageScanner {
if (child->IsCallable("AsStruct")) {
for (const auto& f : child->Children()) {
TNodeMap<TMaybe<TFieldsLineage>> visited;
auto res = ScanExprLineage(f->Tail(), nullptr, nullptr, visited);
auto res = ScanExprLineage(f->Tail(), nullptr, nullptr, visited, {});
if (res) {
auto name = f->Head().Content();
(*lineage.Fields)[name].MergeFrom(*res);
}
}
} else {
TNodeMap<TMaybe<TFieldsLineage>> visited;
auto res = ScanExprLineage(*child, nullptr, nullptr, visited);
auto res = ScanExprLineage(*child, nullptr, nullptr, visited, {});
if (res) {
for (const auto& i : structType->GetItems()) {
if (i->GetName().StartsWith("_yql_sys_")) {
Expand Down
22 changes: 22 additions & 0 deletions ydb/library/yql/tests/sql/dq_file/part15/canondata/result.json
Original file line number Diff line number Diff line change
Expand Up @@ -1576,6 +1576,28 @@
}
],
"test.test[limit-limit_over_sort_desc_in_subquery--Results]": [],
"test.test[lineage-flatten_by--Analyze]": [
{
"checksum": "de38a224e0104e35a4d3f64f505d9158",
"size": 7782,
"uri": "https://{canondata_backend}/1773845/4743168c84575c5ee74764d6369a8a7b6f309d6e/resource.tar.gz#test.test_lineage-flatten_by--Analyze_/plan.txt"
}
],
"test.test[lineage-flatten_by--Debug]": [
{
"checksum": "df47e3dc178c04a8c1c6fc0a83120230",
"size": 3759,
"uri": "https://{canondata_backend}/1773845/4743168c84575c5ee74764d6369a8a7b6f309d6e/resource.tar.gz#test.test_lineage-flatten_by--Debug_/opt.yql_patched"
}
],
"test.test[lineage-flatten_by--Plan]": [
{
"checksum": "de38a224e0104e35a4d3f64f505d9158",
"size": 7782,
"uri": "https://{canondata_backend}/1773845/4743168c84575c5ee74764d6369a8a7b6f309d6e/resource.tar.gz#test.test_lineage-flatten_by--Plan_/plan.txt"
}
],
"test.test[lineage-flatten_by--Results]": [],
"test.test[lineage-grouping_sets--Analyze]": [
{
"checksum": "7cd08ec1563a4f59f7b1dd446b2dd421",
Expand Down
14 changes: 14 additions & 0 deletions ydb/library/yql/tests/sql/hybrid_file/part9/canondata/result.json
Original file line number Diff line number Diff line change
Expand Up @@ -1525,6 +1525,20 @@
"uri": "https://{canondata_backend}/1809005/2a59475dc877549ac4197a291aacd77d92f24ab4/resource.tar.gz#test.test_limit-empty_input_after_limit-default.txt-Plan_/plan.txt"
}
],
"test.test[lineage-flatten_by--Debug]": [
{
"checksum": "ca67fb8416e26fcc6941474954bc8efa",
"size": 3102,
"uri": "https://{canondata_backend}/1900335/8db5941a4ed2bc94d6ae42d0eae7b6c741fa5a59/resource.tar.gz#test.test_lineage-flatten_by--Debug_/opt.yql_patched"
}
],
"test.test[lineage-flatten_by--Plan]": [
{
"checksum": "d8e99e1cc64bfe7d765d01c4f3c575e8",
"size": 8880,
"uri": "https://{canondata_backend}/1900335/8db5941a4ed2bc94d6ae42d0eae7b6c741fa5a59/resource.tar.gz#test.test_lineage-flatten_by--Plan_/plan.txt"
}
],
"test.test[match_recognize-alerts_without_order-default.txt-Debug]": [
{
"checksum": "acba759d95a9b70640e6418dc1febb2d",
Expand Down
14 changes: 14 additions & 0 deletions ydb/library/yql/tests/sql/sql2yql/canondata/result.json
Original file line number Diff line number Diff line change
Expand Up @@ -10597,6 +10597,13 @@
"uri": "https://{canondata_backend}/1784826/8212a6594777651314d94a2e2f95179c0016604c/resource.tar.gz#test_sql2yql.test_lineage-error_type_/sql.yql"
}
],
"test_sql2yql.test[lineage-flatten_by]": [
{
"checksum": "a761432fec83da9adc9a7828296bda6f",
"size": 4072,
"uri": "https://{canondata_backend}/1937367/b35833bd1950efa4b6fa264900a396b8f3f198a8/resource.tar.gz#test_sql2yql.test_lineage-flatten_by_/sql.yql"
}
],
"test_sql2yql.test[lineage-flatten_list_nested_lambda]": [
{
"checksum": "1405a87aecd4676d7955fff219819b5f",
Expand Down Expand Up @@ -30253,6 +30260,13 @@
"uri": "https://{canondata_backend}/1784826/8212a6594777651314d94a2e2f95179c0016604c/resource.tar.gz#test_sql_format.test_lineage-error_type_/formatted.sql"
}
],
"test_sql_format.test[lineage-flatten_by]": [
{
"checksum": "3f32f309ac009b3158e11e36cc0a92b7",
"size": 451,
"uri": "https://{canondata_backend}/1937367/b35833bd1950efa4b6fa264900a396b8f3f198a8/resource.tar.gz#test_sql_format.test_lineage-flatten_by_/formatted.sql"
}
],
"test_sql_format.test[lineage-flatten_list_nested_lambda]": [
{
"checksum": "3fdec3c3ffc5993a6088aa56eac4fcea",
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/tests/sql/suites/lineage/flatten_by.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
in Input input_list_2.txt
21 changes: 21 additions & 0 deletions ydb/library/yql/tests/sql/suites/lineage/flatten_by.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use plato;

$subquery1 =
SELECT
key, subkey, z
FROM Input
FLATTEN LIST BY value as z;

$subquery2 =
SELECT
key, subkey, value as z, value2
FROM Input
FLATTEN LIST BY (value, value2);

INSERT INTO @tmp1 WITH TRUNCATE
SELECT *
FROM $subquery1;

INSERT INTO @tmp2 WITH TRUNCATE
SELECT *
FROM $subquery2;
3 changes: 3 additions & 0 deletions ydb/library/yql/tests/sql/suites/lineage/input_list_2.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"key"="075";"subkey"="1";"value"=["abc";"cde"];"value2"=["efg"; "ghj"]};
{"key"="020";"subkey"="3";"value"=["qqq";"ttt"];"value2"=["ppp";"rrr"]};

10 changes: 10 additions & 0 deletions ydb/library/yql/tests/sql/suites/lineage/input_list_2.txt.attr
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"_yql_row_spec"={
"Type"=["StructType";[
["key";["DataType";"String";];];
["subkey";["DataType";"String";];];
["value";["ListType";["DataType";"String";];];];
["value2";["ListType";["DataType";"String";];];];
];];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1432,6 +1432,28 @@
"uri": "https://{canondata_backend}/1903280/4c77300cd3fef018d27d7f75b6ff956e63258b21/resource.tar.gz#test.test_limit-limit_over_sort_desc_in_subquery--Results_/results.txt"
}
],
"test.test[lineage-flatten_by--Debug]": [
{
"checksum": "b9673e0336e9f6e9bfa2f44fc8b88803",
"size": 3035,
"uri": "https://{canondata_backend}/1775319/264e08443d41b710cd528563fbaa24c32c366555/resource.tar.gz#test.test_lineage-flatten_by--Debug_/opt.yql"
}
],
"test.test[lineage-flatten_by--Lineage]": [
{
"checksum": "f4929578f1fe2fce5f566df8020cc0ad",
"size": 3001,
"uri": "https://{canondata_backend}/1775319/264e08443d41b710cd528563fbaa24c32c366555/resource.tar.gz#test.test_lineage-flatten_by--Lineage_/results.txt"
}
],
"test.test[lineage-flatten_by--Plan]": [
{
"checksum": "093e3952e33d4d1b806cce1781e9e189",
"size": 8880,
"uri": "https://{canondata_backend}/1775319/264e08443d41b710cd528563fbaa24c32c366555/resource.tar.gz#test.test_lineage-flatten_by--Plan_/plan.txt"
}
],
"test.test[lineage-flatten_by--Results]": [],
"test.test[lineage-grouping_sets--Debug]": [
{
"checksum": "9d2798e2536159bea2cb8dc1a8089078",
Expand Down

0 comments on commit 13965ec

Please sign in to comment.