Skip to content

Commit

Permalink
fix streamlookup keys join on unmatched rows (ydb-platform#8422)
Browse files Browse the repository at this point in the history
  • Loading branch information
yumkam authored Aug 30, 2024
1 parent 1b2baab commit e923efd
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class TInputTransformStreamLookupBase
outputRowItems[i] = wideInputRow[index];
break;
case EOutputRowItemSource::LookupKey:
outputRowItems[i] = lookupKey.GetElement(index);
outputRowItems[i] = lookupPayload && *lookupPayload ? lookupKey.GetElement(index) : NUdf::TUnboxedValue {};
break;
case EOutputRowItemSource::LookupOther:
if (lookupPayload && *lookupPayload) {
Expand Down
68 changes: 58 additions & 10 deletions ydb/tests/fq/generic/test_streaming_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
from ydb.tests.fq.generic.utils.settings import Settings

DEBUG = 0
TESTCASES = [
# 0
(
Expand Down Expand Up @@ -187,6 +188,7 @@
$enriched = select e.id as id,
$formatTime(DateTime::ParseIso8601(e.ts)) as ts,
e.user as user_id,
u.id as uid,
u.name as name,
u.age as age
from
Expand All @@ -201,35 +203,80 @@
[
(
'{"id":1,"ts":"20240701T113344","ev_type":"foo1","user":2}',
'{"id":1,"ts":"11:33:44","user_id":2,"name":"Petr","age":25}',
'{"id":1,"ts":"11:33:44","uid":2,"user_id":2,"name":"Petr","age":25}',
),
(
'{"id":2,"ts":"20240701T112233","ev_type":"foo2","user":1}',
'{"id":2,"ts":"11:22:33","user_id":1,"name":"Anya","age":15}',
'{"id":2,"ts":"11:22:33","uid":1,"user_id":1,"name":"Anya","age":15}',
),
(
'{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":100}',
'{"id":3,"ts":"11:33:55","user_id":100,"name":null,"age":null}',
'{"id":3,"ts":"11:33:55","uid":null,"user_id":100,"name":null,"age":null}',
),
(
'{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}',
'{"id":4,"ts":"11:33:56","user_id":3,"name":"Masha","age":17}',
'{"id":4,"ts":"11:33:56","uid":3,"user_id":3,"name":"Masha","age":17}',
),
(
'{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}',
'{"id":5,"ts":"11:33:57","user_id":3,"name":"Masha","age":17}',
'{"id":5,"ts":"11:33:57","uid":3,"user_id":3,"name":"Masha","age":17}',
),
(
'{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}',
'{"id":6,"ts":"11:22:38","user_id":1,"name":"Anya","age":15}',
'{"id":6,"ts":"11:22:38","uid":1,"user_id":1,"name":"Anya","age":15}',
),
(
'{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}',
'{"id":7,"ts":"11:33:49","user_id":2,"name":"Petr","age":25}',
'{"id":7,"ts":"11:33:49","uid":2,"user_id":2,"name":"Petr","age":25}',
),
]
* 1000,
),
# 5
(
R'''
$input = SELECT * FROM myyds.`{input_topic}`
WITH (
FORMAT=json_each_row,
SCHEMA (
id Int32,
ts String,
ev_type String,
user Int32,
)
) ;
$enriched = select e.id as id,
e.user as user_id,
u.id as uid
from
$input as e
left join {streamlookup} ydb_conn_{table_name}.`users` as u
on(e.user = u.id)
;
insert into myyds.`{output_topic}`
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
''',
[
(
'{"id":1,"ts":"20240701T113344","ev_type":"foo1","user":2}',
'{"id":1,"uid":2,"user_id":2}',
),
(
'{"id":2,"ts":"20240701T112233","ev_type":"foo2","user":1}',
'{"id":2,"uid":1,"user_id":1}',
),
(
'{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":100}',
'{"id":3,"uid":null,"user_id":100}',
),
(
'{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}',
'{"id":4,"uid":3,"user_id":3}',
),
],
),
]


Expand Down Expand Up @@ -324,9 +371,10 @@ def test_streamlookup(
offset += 500

read_data = self.read_stream(len(messages))
print(streamlookup, testcase, file=sys.stderr)
print(sql, file=sys.stderr)
print(*zip(messages, read_data), file=sys.stderr, sep="\n")
if DEBUG:
print(streamlookup, testcase, file=sys.stderr)
print(sql, file=sys.stderr)
print(*zip(messages, read_data), file=sys.stderr, sep="\n")
for r, exp in zip(read_data, messages):
r = json.loads(r)
exp = json.loads(exp[1])
Expand Down

0 comments on commit e923efd

Please sign in to comment.