diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp index 646f49130507..9f2f52f17bb8 100644 --- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp +++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp @@ -111,7 +111,7 @@ class TInputTransformStreamLookupBase outputRowItems[i] = wideInputRow[index]; break; case EOutputRowItemSource::LookupKey: - outputRowItems[i] = lookupKey.GetElement(index); + outputRowItems[i] = lookupPayload && *lookupPayload ? lookupKey.GetElement(index) : NUdf::TUnboxedValue {}; break; case EOutputRowItemSource::LookupOther: if (lookupPayload && *lookupPayload) { diff --git a/ydb/tests/fq/generic/test_streaming_join.py b/ydb/tests/fq/generic/test_streaming_join.py index 3840ceec3b96..dc5e54d85a46 100644 --- a/ydb/tests/fq/generic/test_streaming_join.py +++ b/ydb/tests/fq/generic/test_streaming_join.py @@ -10,6 +10,7 @@ from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase from ydb.tests.fq.generic.utils.settings import Settings +DEBUG = 0 TESTCASES = [ # 0 ( @@ -187,6 +188,7 @@ $enriched = select e.id as id, $formatTime(DateTime::ParseIso8601(e.ts)) as ts, e.user as user_id, + u.id as uid, u.name as name, u.age as age from @@ -201,35 +203,80 @@ [ ( '{"id":1,"ts":"20240701T113344","ev_type":"foo1","user":2}', - '{"id":1,"ts":"11:33:44","user_id":2,"name":"Petr","age":25}', + '{"id":1,"ts":"11:33:44","uid":2,"user_id":2,"name":"Petr","age":25}', ), ( '{"id":2,"ts":"20240701T112233","ev_type":"foo2","user":1}', - '{"id":2,"ts":"11:22:33","user_id":1,"name":"Anya","age":15}', + '{"id":2,"ts":"11:22:33","uid":1,"user_id":1,"name":"Anya","age":15}', ), ( '{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":100}', - '{"id":3,"ts":"11:33:55","user_id":100,"name":null,"age":null}', + '{"id":3,"ts":"11:33:55","uid":null,"user_id":100,"name":null,"age":null}', ), ( '{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}', - '{"id":4,"ts":"11:33:56","user_id":3,"name":"Masha","age":17}', + '{"id":4,"ts":"11:33:56","uid":3,"user_id":3,"name":"Masha","age":17}', ), ( '{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}', - '{"id":5,"ts":"11:33:57","user_id":3,"name":"Masha","age":17}', + '{"id":5,"ts":"11:33:57","uid":3,"user_id":3,"name":"Masha","age":17}', ), ( '{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}', - '{"id":6,"ts":"11:22:38","user_id":1,"name":"Anya","age":15}', + '{"id":6,"ts":"11:22:38","uid":1,"user_id":1,"name":"Anya","age":15}', ), ( '{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}', - '{"id":7,"ts":"11:33:49","user_id":2,"name":"Petr","age":25}', + '{"id":7,"ts":"11:33:49","uid":2,"user_id":2,"name":"Petr","age":25}', ), ] * 1000, ), + # 5 + ( + R''' + $input = SELECT * FROM myyds.`{input_topic}` + WITH ( + FORMAT=json_each_row, + SCHEMA ( + id Int32, + ts String, + ev_type String, + user Int32, + ) + ) ; + + $enriched = select e.id as id, + e.user as user_id, + u.id as uid + from + $input as e + left join {streamlookup} ydb_conn_{table_name}.`users` as u + on(e.user = u.id) + ; + + insert into myyds.`{output_topic}` + select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched; + ''', + [ + ( + '{"id":1,"ts":"20240701T113344","ev_type":"foo1","user":2}', + '{"id":1,"uid":2,"user_id":2}', + ), + ( + '{"id":2,"ts":"20240701T112233","ev_type":"foo2","user":1}', + '{"id":2,"uid":1,"user_id":1}', + ), + ( + '{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":100}', + '{"id":3,"uid":null,"user_id":100}', + ), + ( + '{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}', + '{"id":4,"uid":3,"user_id":3}', + ), + ], + ), ] @@ -324,9 +371,10 @@ def test_streamlookup( offset += 500 read_data = self.read_stream(len(messages)) - print(streamlookup, testcase, file=sys.stderr) - print(sql, file=sys.stderr) - print(*zip(messages, read_data), file=sys.stderr, sep="\n") + if DEBUG: + print(streamlookup, testcase, file=sys.stderr) + print(sql, file=sys.stderr) + print(*zip(messages, read_data), file=sys.stderr, sep="\n") for r, exp in zip(read_data, messages): r = json.loads(r) exp = json.loads(exp[1])