Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streamlookupjoin: fix joining from multi-partition stream (backport #8622) #8737

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/tasks/dq_connection_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ void BuildStreamLookupChannels(TGraph& graph, const NNodes::TDqPhyStage& stage,
auto& originStageInfo = graph.GetStageInfo(cnStreamLookup.Output().Stage());
auto outputIndex = FromString<ui32>(cnStreamLookup.Output().Index().Value());

BuildMapChannels(graph, stageInfo, inputIndex, originStageInfo, outputIndex, false /*spilling*/, logFunc);
BuildUnionAllChannels(graph, stageInfo, inputIndex, originStageInfo, outputIndex, false /*spilling*/, logFunc);
}

template <typename TGraph>
Expand Down
205 changes: 124 additions & 81 deletions ydb/tests/fq/generic/test_streaming_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import os
import json
import sys
from collections import Counter
from operator import itemgetter

import ydb.public.api.protos.draft.fq_pb2 as fq
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
Expand All @@ -11,6 +13,31 @@
from ydb.tests.fq.generic.utils.settings import Settings

DEBUG = 0


def ResequenceId(messages):
    """Renumber the ``"id"`` field of every message group sequentially from 1.

    Each element of *messages* is an iterable of JSON-encoded objects (in the
    test cases of this file: an ``(input, expected_output)`` pair).  Every
    object within the same element receives the same ``"id"``: the 1-based
    position of that element in *messages*.  This keeps ids unique when a
    short list of message pairs has been repeated with ``* N``.

    Args:
        messages: iterable of iterables of JSON object strings.

    Returns:
        A new list of tuples of JSON strings, re-serialized with
        ``json.dumps`` defaults after ``"id"`` was overwritten.  The input
        is not modified.
    """
    res = []
    # enumerate() replaces the hand-maintained counter; ids start at 1.
    for seq, group in enumerate(messages, start=1):
        renumbered = []
        for raw in group:
            doc = json.loads(raw)
            doc["id"] = seq
            renumbered.append(json.dumps(doc))
        res.append(tuple(renumbered))
    return res


def freeze(json):
    """Recursively convert a decoded JSON value into a hashable equivalent.

    Mappings become a ``frozenset`` of ``(key, frozen_value)`` pairs (so key
    order is irrelevant), lists become tuples of frozen items, and any other
    value is returned unchanged.  The result can be used as a ``Counter`` or
    ``set`` element for order-insensitive comparison of message collections.

    Args:
        json: a value as produced by ``json.loads`` (dict, list, str, number,
            bool or None).  NOTE: the parameter name shadows the ``json``
            module inside this function; it is kept for interface
            compatibility and the module is not needed here.

    Returns:
        A hashable structure mirroring the input.
    """
    # isinstance (rather than an exact type comparison) so that dict/list
    # subclasses such as OrderedDict are frozen too instead of being
    # returned as-is and failing later when hashed.
    if isinstance(json, dict):
        return frozenset((k, freeze(v)) for k, v in json.items())
    if isinstance(json, list):
        return tuple(map(freeze, json))
    return json


TESTCASES = [
# 0
(
Expand Down Expand Up @@ -96,17 +123,19 @@
insert into myyds.`{output_topic}`
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
''',
[
('{"id":3,"user":5}', '{"id":3,"user_id":5,"lookup":null}'),
('{"id":9,"user":3}', '{"id":9,"user_id":3,"lookup":"ydb30"}'),
('{"id":2,"user":2}', '{"id":2,"user_id":2,"lookup":"ydb20"}'),
('{"id":1,"user":1}', '{"id":1,"user_id":1,"lookup":"ydb10"}'),
('{"id":4,"user":3}', '{"id":4,"user_id":3,"lookup":"ydb30"}'),
('{"id":5,"user":3}', '{"id":5,"user_id":3,"lookup":"ydb30"}'),
('{"id":6,"user":1}', '{"id":6,"user_id":1,"lookup":"ydb10"}'),
('{"id":7,"user":2}', '{"id":7,"user_id":2,"lookup":"ydb20"}'),
]
* 20,
ResequenceId(
[
('{"id":3,"user":5}', '{"id":3,"user_id":5,"lookup":null}'),
('{"id":9,"user":3}', '{"id":9,"user_id":3,"lookup":"ydb30"}'),
('{"id":2,"user":2}', '{"id":2,"user_id":2,"lookup":"ydb20"}'),
('{"id":1,"user":1}', '{"id":1,"user_id":1,"lookup":"ydb10"}'),
('{"id":4,"user":3}', '{"id":4,"user_id":3,"lookup":"ydb30"}'),
('{"id":5,"user":3}', '{"id":5,"user_id":3,"lookup":"ydb30"}'),
('{"id":6,"user":1}', '{"id":6,"user_id":1,"lookup":"ydb10"}'),
('{"id":7,"user":2}', '{"id":7,"user_id":2,"lookup":"ydb20"}'),
]
* 20
),
),
# 3
(
Expand Down Expand Up @@ -137,37 +166,39 @@
insert into myyds.`{output_topic}`
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
''',
[
(
'{"id":2,"ts":"20240701T113344","ev_type":"foo1","user":2}',
'{"id":2,"ts":"11:33:44","user_id":2,"lookup":"ydb20"}',
),
(
'{"id":1,"ts":"20240701T112233","ev_type":"foo2","user":1}',
'{"id":1,"ts":"11:22:33","user_id":1,"lookup":"ydb10"}',
),
(
'{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":5}',
'{"id":3,"ts":"11:33:55","user_id":5,"lookup":null}',
),
(
'{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}',
'{"id":4,"ts":"11:33:56","user_id":3,"lookup":"ydb30"}',
),
(
'{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}',
'{"id":5,"ts":"11:33:57","user_id":3,"lookup":"ydb30"}',
),
(
'{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}',
'{"id":6,"ts":"11:22:38","user_id":1,"lookup":"ydb10"}',
),
(
'{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}',
'{"id":7,"ts":"11:33:49","user_id":2,"lookup":"ydb20"}',
),
]
* 10,
ResequenceId(
[
(
'{"id":2,"ts":"20240701T113344","ev_type":"foo1","user":2}',
'{"id":2,"ts":"11:33:44","user_id":2,"lookup":"ydb20"}',
),
(
'{"id":1,"ts":"20240701T112233","ev_type":"foo2","user":1}',
'{"id":1,"ts":"11:22:33","user_id":1,"lookup":"ydb10"}',
),
(
'{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":5}',
'{"id":3,"ts":"11:33:55","user_id":5,"lookup":null}',
),
(
'{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}',
'{"id":4,"ts":"11:33:56","user_id":3,"lookup":"ydb30"}',
),
(
'{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}',
'{"id":5,"ts":"11:33:57","user_id":3,"lookup":"ydb30"}',
),
(
'{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}',
'{"id":6,"ts":"11:22:38","user_id":1,"lookup":"ydb10"}',
),
(
'{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}',
'{"id":7,"ts":"11:33:49","user_id":2,"lookup":"ydb20"}',
),
]
* 10
),
),
# 4
(
Expand Down Expand Up @@ -200,37 +231,39 @@
insert into myyds.`{output_topic}`
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
''',
[
(
'{"id":1,"ts":"20240701T113344","ev_type":"foo1","user":2}',
'{"id":1,"ts":"11:33:44","uid":2,"user_id":2,"name":"Petr","age":25}',
),
(
'{"id":2,"ts":"20240701T112233","ev_type":"foo2","user":1}',
'{"id":2,"ts":"11:22:33","uid":1,"user_id":1,"name":"Anya","age":15}',
),
(
'{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":100}',
'{"id":3,"ts":"11:33:55","uid":null,"user_id":100,"name":null,"age":null}',
),
(
'{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}',
'{"id":4,"ts":"11:33:56","uid":3,"user_id":3,"name":"Masha","age":17}',
),
(
'{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}',
'{"id":5,"ts":"11:33:57","uid":3,"user_id":3,"name":"Masha","age":17}',
),
(
'{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}',
'{"id":6,"ts":"11:22:38","uid":1,"user_id":1,"name":"Anya","age":15}',
),
(
'{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}',
'{"id":7,"ts":"11:33:49","uid":2,"user_id":2,"name":"Petr","age":25}',
),
]
* 1000,
ResequenceId(
[
(
'{"id":1,"ts":"20240701T113344","ev_type":"foo1","user":2}',
'{"id":1,"ts":"11:33:44","uid":2,"user_id":2,"name":"Petr","age":25}',
),
(
'{"id":2,"ts":"20240701T112233","ev_type":"foo2","user":1}',
'{"id":2,"ts":"11:22:33","uid":1,"user_id":1,"name":"Anya","age":15}',
),
(
'{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":100}',
'{"id":3,"ts":"11:33:55","uid":null,"user_id":100,"name":null,"age":null}',
),
(
'{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}',
'{"id":4,"ts":"11:33:56","uid":3,"user_id":3,"name":"Masha","age":17}',
),
(
'{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}',
'{"id":5,"ts":"11:33:57","uid":3,"user_id":3,"name":"Masha","age":17}',
),
(
'{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}',
'{"id":6,"ts":"11:22:38","uid":1,"user_id":1,"name":"Anya","age":15}',
),
(
'{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}',
'{"id":7,"ts":"11:33:49","uid":2,"user_id":2,"name":"Petr","age":25}',
),
]
* 1000
),
),
# 5
(
Expand Down Expand Up @@ -334,12 +367,23 @@ def test_simple(self, kikimr, fq_client: FederatedQueryClient, settings: Setting
@yq_v1
@pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": "tests-fq-generic-ydb:2136"}], indirect=True)
@pytest.mark.parametrize("fq_client", [{"folder_id": "my_folder_slj"}], indirect=True)
@pytest.mark.parametrize("streamlookup", [False, True])
@pytest.mark.parametrize("partitions_count", [1, 3])
@pytest.mark.parametrize("streamlookup", [False, True] if DEBUG else [True])
@pytest.mark.parametrize("testcase", [*range(len(TESTCASES))])
def test_streamlookup(
self, kikimr, testcase, streamlookup, fq_client: FederatedQueryClient, settings: Settings, yq_version
self,
kikimr,
testcase,
streamlookup,
partitions_count,
fq_client: FederatedQueryClient,
settings: Settings,
yq_version,
):
self.init_topics(f"pq_yq_streaming_test_lookup_{streamlookup}{testcase}_{yq_version}")
self.init_topics(
f"pq_yq_str_lookup_{partitions_count}{streamlookup}{testcase}_{yq_version}",
partitions_count=partitions_count,
)
fq_client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))

table_name = 'join_table'
Expand All @@ -359,7 +403,7 @@ def test_streamlookup(
)

query_id = fq_client.create_query(
f"streamlookup_{streamlookup}{testcase}", sql, type=fq.QueryContent.QueryType.STREAMING
f"streamlookup_{partitions_count}{streamlookup}{testcase}", sql, type=fq.QueryContent.QueryType.STREAMING
).result.query_id
fq_client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
kikimr.compute_plane.wait_zero_checkpoint(query_id)
Expand All @@ -375,10 +419,9 @@ def test_streamlookup(
print(streamlookup, testcase, file=sys.stderr)
print(sql, file=sys.stderr)
print(*zip(messages, read_data), file=sys.stderr, sep="\n")
for r, exp in zip(read_data, messages):
r = json.loads(r)
exp = json.loads(exp[1])
assert r == exp
read_data_ctr = Counter(map(freeze, map(json.loads, read_data)))
messages_ctr = Counter(map(freeze, map(json.loads, map(itemgetter(1), messages))))
assert read_data_ctr == messages_ctr

fq_client.abort_query(query_id)
fq_client.wait_query(query_id)
Expand Down
Loading