From 3a955eb1a209cf8ad8c97bb53fa4da1993fd8c5c Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Tue, 15 Oct 2024 09:18:26 +0000 Subject: [PATCH 1/2] fix style --- ydb/tests/fq/yds/test_row_dispatcher.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index 61b760f3d98b..c56a65fef48c 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -89,9 +89,9 @@ def test_read_raw_format_with_row_dispatcher(self, kikimr, client): ) connections = client.list_connections(fq.Acl.Visibility.PRIVATE).result.connection assert len(connections) == 1 - assert connections[0].content.setting.data_streams.shared_reading == True + assert connections[0].content.setting.data_streams.shared_reading - self.init_topics("test_read_raw_format_without_row_dispatcher", create_output=False) + self.init_topics("test_read_raw_format_with_row_dispatcher", create_output=False) output_topic = "pq_test_pq_read_write_output" create_stream(output_topic, partitions_count=1) create_read_rule(output_topic, self.consumer_name) @@ -106,7 +106,7 @@ def test_read_raw_format_with_row_dispatcher(self, kikimr, client): assert self.read_stream(len(data), topic_path=output_topic) == data wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1) stop_yds_query(client, query_id) - + sql2 = Rf'''INSERT INTO {YDS_CONNECTION}.`{output_topic}` SELECT * FROM {YDS_CONNECTION}.`{self.input_topic}` WITH (format=raw, SCHEMA (data String NOT NULL)) WHERE data != "romashka";''' @@ -290,7 +290,7 @@ def test_filter_missing_fields(self, kikimr, client): client.create_yds_connection( YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True ) - self.init_topics("test_filter") + self.init_topics("test_filter_missing_fields") sql = Rf''' INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` @@ -525,9 +525,9 @@ def test_stop_start_with_filter(self, kikimr, client): client.create_yds_connection( YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True ) - self.init_topics("test_stop_start", create_output=False) + self.init_topics("test_stop_start_with_filter", create_output=False) - output_topic = "test_stop_start" + output_topic = "test_stop_start_with_filter" create_stream(output_topic, partitions_count=1) create_read_rule(output_topic, self.consumer_name) @@ -717,7 +717,7 @@ def test_many_partitions(self, kikimr, client): client.create_yds_connection( YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True ) - self.init_topics("test_simple_not_null", partitions_count=4) + self.init_topics("test_many_partitions", partitions_count=4) sql = Rf''' INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` From 94f9b8f6455113d6af09ff8eb395d340ed128bf7 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Tue, 15 Oct 2024 10:41:42 +0000 Subject: [PATCH 2/2] try to fix flaky tests --- ydb/tests/fq/yds/test_row_dispatcher.py | 10 +++++----- ydb/tests/tools/fq_runner/kikimr_runner.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index c56a65fef48c..cd8a60e49feb 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -496,7 +496,7 @@ def test_stop_start(self, kikimr, client): assert self.read_stream(len(expected), topic_path=output_topic) == expected kikimr.compute_plane.wait_completed_checkpoints( - query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 1 + query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2 ) stop_yds_query(client, query_id) wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0) @@ -544,7 +544,7 @@ def test_stop_start_with_filter(self, kikimr, client): self.write_stream(data) kikimr.compute_plane.wait_completed_checkpoints( - query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 10 + query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2 ) stop_yds_query(client, query_id) wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0) @@ -594,7 +594,7 @@ def test_restart_compute_node(self, kikimr, client): assert self.read_stream(len(expected), topic_path=self.output_topic) == expected kikimr.compute_plane.wait_completed_checkpoints( - query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 1 + query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2 ) wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1) @@ -611,7 +611,7 @@ def test_restart_compute_node(self, kikimr, client): expected = ['103', '104'] assert self.read_stream(len(expected), topic_path=self.output_topic) == expected kikimr.compute_plane.wait_completed_checkpoints( - query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 1 + query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2 ) node_index = 1 @@ -674,7 +674,7 @@ def test_3_sessions(self, kikimr, client): wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1) kikimr.compute_plane.wait_completed_checkpoints( - query_id1, kikimr.compute_plane.get_completed_checkpoints(query_id1) + 1 + query_id1, kikimr.compute_plane.get_completed_checkpoints(query_id1) + 2 ) stop_yds_query(client, query_id1) diff --git a/ydb/tests/tools/fq_runner/kikimr_runner.py b/ydb/tests/tools/fq_runner/kikimr_runner.py index 3cbdd565d4a5..7d369236bde0 100644 --- a/ydb/tests/tools/fq_runner/kikimr_runner.py +++ b/ydb/tests/tools/fq_runner/kikimr_runner.py @@ -517,7 +517,7 @@ def fill_config(self, control_plane): fq_config['row_dispatcher'] = { 'enabled': True, - 'timeout_before_start_session_sec': 2, + 'timeout_before_start_session_sec': 5, 'send_status_period_sec': 2, 'max_session_used_memory': 1000000, 'without_consumer': True}