diff --git a/ydb/library/yql/dq/tasks/dq_connection_builder.h b/ydb/library/yql/dq/tasks/dq_connection_builder.h index 99448deb3660..0670eba446bd 100644 --- a/ydb/library/yql/dq/tasks/dq_connection_builder.h +++ b/ydb/library/yql/dq/tasks/dq_connection_builder.h @@ -216,7 +216,7 @@ void BuildStreamLookupChannels(TGraph& graph, const NNodes::TDqPhyStage& stage, auto& originStageInfo = graph.GetStageInfo(cnStreamLookup.Output().Stage()); auto outputIndex = FromString(cnStreamLookup.Output().Index().Value()); - BuildMapChannels(graph, stageInfo, inputIndex, originStageInfo, outputIndex, false /*spilling*/, logFunc); + BuildUnionAllChannels(graph, stageInfo, inputIndex, originStageInfo, outputIndex, false /*spilling*/, logFunc); } template diff --git a/ydb/tests/fq/generic/test_streaming_join.py b/ydb/tests/fq/generic/test_streaming_join.py index dc5e54d85a46..16269e2129cf 100644 --- a/ydb/tests/fq/generic/test_streaming_join.py +++ b/ydb/tests/fq/generic/test_streaming_join.py @@ -2,6 +2,8 @@ import os import json import sys +from collections import Counter +from operator import itemgetter import ydb.public.api.protos.draft.fq_pb2 as fq from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1 @@ -11,6 +13,31 @@ from ydb.tests.fq.generic.utils.settings import Settings DEBUG = 0 + + +def ResequenceId(messages): + res = [] + i = 1 + for pair in messages: + rpair = [] + for it in pair: + src = json.loads(it) + src["id"] = i + rpair += [json.dumps(src)] + res += [tuple(rpair)] + i += 1 + return res + + +def freeze(json): + t = type(json) + if t == dict: + return frozenset((k, freeze(v)) for k, v in json.items()) + if t == list: + return tuple(map(freeze, json)) + return json + + TESTCASES = [ # 0 ( @@ -96,17 +123,19 @@ insert into myyds.`{output_topic}` select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched; ''', - [ - ('{"id":3,"user":5}', '{"id":3,"user_id":5,"lookup":null}'), - ('{"id":9,"user":3}', '{"id":9,"user_id":3,"lookup":"ydb30"}'), - ('{"id":2,"user":2}', '{"id":2,"user_id":2,"lookup":"ydb20"}'), - ('{"id":1,"user":1}', '{"id":1,"user_id":1,"lookup":"ydb10"}'), - ('{"id":4,"user":3}', '{"id":4,"user_id":3,"lookup":"ydb30"}'), - ('{"id":5,"user":3}', '{"id":5,"user_id":3,"lookup":"ydb30"}'), - ('{"id":6,"user":1}', '{"id":6,"user_id":1,"lookup":"ydb10"}'), - ('{"id":7,"user":2}', '{"id":7,"user_id":2,"lookup":"ydb20"}'), - ] - * 20, + ResequenceId( + [ + ('{"id":3,"user":5}', '{"id":3,"user_id":5,"lookup":null}'), + ('{"id":9,"user":3}', '{"id":9,"user_id":3,"lookup":"ydb30"}'), + ('{"id":2,"user":2}', '{"id":2,"user_id":2,"lookup":"ydb20"}'), + ('{"id":1,"user":1}', '{"id":1,"user_id":1,"lookup":"ydb10"}'), + ('{"id":4,"user":3}', '{"id":4,"user_id":3,"lookup":"ydb30"}'), + ('{"id":5,"user":3}', '{"id":5,"user_id":3,"lookup":"ydb30"}'), + ('{"id":6,"user":1}', '{"id":6,"user_id":1,"lookup":"ydb10"}'), + ('{"id":7,"user":2}', '{"id":7,"user_id":2,"lookup":"ydb20"}'), + ] + * 20 + ), ), # 3 ( @@ -137,37 +166,39 @@ insert into myyds.`{output_topic}` select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched; ''', - [ - ( - '{"id":2,"ts":"20240701T113344","ev_type":"foo1","user":2}', - '{"id":2,"ts":"11:33:44","user_id":2,"lookup":"ydb20"}', - ), - ( - '{"id":1,"ts":"20240701T112233","ev_type":"foo2","user":1}', - '{"id":1,"ts":"11:22:33","user_id":1,"lookup":"ydb10"}', - ), - ( - '{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":5}', - '{"id":3,"ts":"11:33:55","user_id":5,"lookup":null}', - ), - ( - '{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}', - '{"id":4,"ts":"11:33:56","user_id":3,"lookup":"ydb30"}', - ), - ( - '{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}', - '{"id":5,"ts":"11:33:57","user_id":3,"lookup":"ydb30"}', - ), - ( - '{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}', - '{"id":6,"ts":"11:22:38","user_id":1,"lookup":"ydb10"}', - ), - ( - '{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}', - '{"id":7,"ts":"11:33:49","user_id":2,"lookup":"ydb20"}', - ), - ] - * 10, + ResequenceId( + [ + ( + '{"id":2,"ts":"20240701T113344","ev_type":"foo1","user":2}', + '{"id":2,"ts":"11:33:44","user_id":2,"lookup":"ydb20"}', + ), + ( + '{"id":1,"ts":"20240701T112233","ev_type":"foo2","user":1}', + '{"id":1,"ts":"11:22:33","user_id":1,"lookup":"ydb10"}', + ), + ( + '{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":5}', + '{"id":3,"ts":"11:33:55","user_id":5,"lookup":null}', + ), + ( + '{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}', + '{"id":4,"ts":"11:33:56","user_id":3,"lookup":"ydb30"}', + ), + ( + '{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}', + '{"id":5,"ts":"11:33:57","user_id":3,"lookup":"ydb30"}', + ), + ( + '{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}', + '{"id":6,"ts":"11:22:38","user_id":1,"lookup":"ydb10"}', + ), + ( + '{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}', + '{"id":7,"ts":"11:33:49","user_id":2,"lookup":"ydb20"}', + ), + ] + * 10 + ), ), # 4 ( @@ -200,37 +231,39 @@ insert into myyds.`{output_topic}` select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched; ''', - [ - ( - '{"id":1,"ts":"20240701T113344","ev_type":"foo1","user":2}', - '{"id":1,"ts":"11:33:44","uid":2,"user_id":2,"name":"Petr","age":25}', - ), - ( - '{"id":2,"ts":"20240701T112233","ev_type":"foo2","user":1}', - '{"id":2,"ts":"11:22:33","uid":1,"user_id":1,"name":"Anya","age":15}', - ), - ( - '{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":100}', - '{"id":3,"ts":"11:33:55","uid":null,"user_id":100,"name":null,"age":null}', - ), - ( - '{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}', - '{"id":4,"ts":"11:33:56","uid":3,"user_id":3,"name":"Masha","age":17}', - ), - ( - '{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}', - '{"id":5,"ts":"11:33:57","uid":3,"user_id":3,"name":"Masha","age":17}', - ), - ( - '{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}', - '{"id":6,"ts":"11:22:38","uid":1,"user_id":1,"name":"Anya","age":15}', - ), - ( - '{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}', - '{"id":7,"ts":"11:33:49","uid":2,"user_id":2,"name":"Petr","age":25}', - ), - ] - * 1000, + ResequenceId( + [ + ( + '{"id":1,"ts":"20240701T113344","ev_type":"foo1","user":2}', + '{"id":1,"ts":"11:33:44","uid":2,"user_id":2,"name":"Petr","age":25}', + ), + ( + '{"id":2,"ts":"20240701T112233","ev_type":"foo2","user":1}', + '{"id":2,"ts":"11:22:33","uid":1,"user_id":1,"name":"Anya","age":15}', + ), + ( + '{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":100}', + '{"id":3,"ts":"11:33:55","uid":null,"user_id":100,"name":null,"age":null}', + ), + ( + '{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}', + '{"id":4,"ts":"11:33:56","uid":3,"user_id":3,"name":"Masha","age":17}', + ), + ( + '{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}', + '{"id":5,"ts":"11:33:57","uid":3,"user_id":3,"name":"Masha","age":17}', + ), + ( + '{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}', + '{"id":6,"ts":"11:22:38","uid":1,"user_id":1,"name":"Anya","age":15}', + ), + ( + '{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}', + '{"id":7,"ts":"11:33:49","uid":2,"user_id":2,"name":"Petr","age":25}', + ), + ] + * 1000 + ), ), # 5 ( @@ -334,12 +367,23 @@ def test_simple(self, kikimr, fq_client: FederatedQueryClient, settings: Setting @yq_v1 @pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": "tests-fq-generic-ydb:2136"}], indirect=True) @pytest.mark.parametrize("fq_client", [{"folder_id": "my_folder_slj"}], indirect=True) - @pytest.mark.parametrize("streamlookup", [False, True]) + @pytest.mark.parametrize("partitions_count", [1, 3]) + @pytest.mark.parametrize("streamlookup", [False, True] if DEBUG else [True]) @pytest.mark.parametrize("testcase", [*range(len(TESTCASES))]) def test_streamlookup( - self, kikimr, testcase, streamlookup, fq_client: FederatedQueryClient, settings: Settings, yq_version + self, + kikimr, + testcase, + streamlookup, + partitions_count, + fq_client: FederatedQueryClient, + settings: Settings, + yq_version, ): - self.init_topics(f"pq_yq_streaming_test_lookup_{streamlookup}{testcase}_{yq_version}") + self.init_topics( + f"pq_yq_str_lookup_{partitions_count}{streamlookup}{testcase}_{yq_version}", + partitions_count=partitions_count, + ) fq_client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) table_name = 'join_table' @@ -359,7 +403,7 @@ def test_streamlookup( ) query_id = fq_client.create_query( - f"streamlookup_{streamlookup}{testcase}", sql, type=fq.QueryContent.QueryType.STREAMING + f"streamlookup_{partitions_count}{streamlookup}{testcase}", sql, type=fq.QueryContent.QueryType.STREAMING ).result.query_id fq_client.wait_query_status(query_id, fq.QueryMeta.RUNNING) kikimr.compute_plane.wait_zero_checkpoint(query_id) @@ -375,10 +419,9 @@ def test_streamlookup( print(streamlookup, testcase, file=sys.stderr) print(sql, file=sys.stderr) print(*zip(messages, read_data), file=sys.stderr, sep="\n") - for r, exp in zip(read_data, messages): - r = json.loads(r) - exp = json.loads(exp[1]) - assert r == exp + read_data_ctr = Counter(map(freeze, map(json.loads, read_data))) + messages_ctr = Counter(map(freeze, map(json.loads, map(itemgetter(1), messages)))) + assert read_data_ctr == messages_ctr fq_client.abort_query(query_id) fq_client.wait_query(query_id)