diff --git a/ydb/tests/fq/yds/test_recovery.py b/ydb/tests/fq/yds/test_recovery.py
new file mode 100644
index 000000000000..c9f0a4649136
--- /dev/null
+++ b/ydb/tests/fq/yds/test_recovery.py
@@ -0,0 +1,393 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import time
+import multiprocessing
+import pytest
+import os
+import random
+
+import yatest
+
+from ydb.tests.library.harness import param_constants
+import ydb.tests.library.common.yatest_common as yatest_common
+from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
+from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+from ydb.tests.tools.datastreams_helpers.control_plane import create_stream
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+import library.python.retry as retry
+
+
+def run_with_sleep(args):
+    program_args, time_min, time_max, duration = args
+    deadline = time.time() + duration
+    while time.time() < deadline:
+        yatest.common.execute(program_args)
+        time.sleep(random.uniform(time_min, time_max))
+
+
+@pytest.fixture
+def kikimr():
+    kikimr_conf = StreamingOverKikimrConfig(node_count=8, cloud_mode=True)
+    kikimr = StreamingOverKikimr(kikimr_conf)
+    kikimr.start_mvp_mock_server()
+    kikimr.start()
+    yield kikimr
+    kikimr.stop()
+    kikimr.stop_mvp_mock_server()
+
+
+class TestRecovery(TestYdsBase):
+    @classmethod
+    def setup_class(cls):
+        # for retry
+        cls.retry_conf = retry.RetryConf().upto(seconds=30).waiting(0.1)
+
+    @retry.retry_intrusive
+    def get_graph_master_node_id(self, query_id):
+        for node_index in self.kikimr.control_plane.kikimr_cluster.nodes:
+            if self.kikimr.control_plane.get_task_count(node_index, query_id) > 0:
+                return node_index
+        assert False, "No active graphs found"
+
+    def get_ca_count(self, node_index):
+        result = self.kikimr.control_plane.get_sensors(node_index, "utils").find_sensor({"activity": "DQ_COMPUTE_ACTOR", "sensor": "ActorsAliveByActivity", "execpool": "User"})
+        return result if result is not None else 0
+
+    def dump_workers(self, worker_count, ca_count, wait_time=yatest_common.plain_or_under_sanitizer(30, 150)):
+        deadline = time.time() + wait_time
+        while True:
+            wcs = 0
+            ccs = 0
+            list = []
+            for node_index in self.kikimr.control_plane.kikimr_cluster.nodes:
+                wc = self.kikimr.control_plane.get_worker_count(node_index)
+                cc = self.get_ca_count(node_index)
+                wcs += wc
+                ccs += cc
+                list.append([node_index, wc, cc])
+            if wcs == worker_count and ccs == ca_count:
+                for [s, w, c] in list:
+                    if w * 2 != c:
+                        continue
+                for [s, w, c] in list:
+                    logging.debug("Node {}, workers {}, ca {}".format(s, w, c))
+                return
+            if time.time() > deadline:
+                for [s, w, c] in list:
+                    logging.debug("Node {}, workers {}, ca {}".format(s, w, c))
+                assert False, "Workers={} and CAs={}, but {} and {} expected".format(wcs, ccs, worker_count, ca_count)
+
+    @yq_v1
+    def test_delete(self, client, kikimr):
+        for node_index in kikimr.control_plane.kikimr_cluster.nodes:
+            kikimr.control_plane.wait_bootstrap(node_index)
+
+        self.kikimr = kikimr
+        self.init_topics("recovery", partitions_count=2)
+
+        # Consumer and topics to create are written in ya.make file.
+        sql = R'''
+            PRAGMA dq.MaxTasksPerStage="2";
+
+            INSERT INTO myyds.`{output_topic}`
+            SELECT STREAM
+                *
+            FROM myyds.`{input_topic}`;'''\
+            .format(
+                input_topic=self.input_topic,
+                output_topic=self.output_topic,
+            )
+        client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+        logging.debug("Uuid = {}".format(kikimr.uuid))
+
+        self.dump_workers(2, 4)
+
+        client.abort_query(query_id)
+        client.wait_query(query_id)
+
+        self.dump_workers(0, 0)
+
+    @yq_v1
+    def test_program_state_recovery(self, client, kikimr):
+        # 100  105  110  115  120  125  130  135  140  (ms)
+        # [ Bucket1 )                   |(emitted)
+        #      [ Bucket2 )                   |(emitted)
+        # .<------------------------------------- restart
+        #           [ Bucket3 )                   |(emitted)
+        for node_index in kikimr.control_plane.kikimr_cluster.nodes:
+            kikimr.control_plane.wait_bootstrap(node_index)
+        for node_index in kikimr.control_plane.kikimr_cluster.nodes:
+            kikimr.control_plane.wait_discovery(node_index)
+
+        self.kikimr = kikimr
+        self.init_topics("program_state_recovery", partitions_count=1)
+
+        # Consumer and topics to create are written in ya.make file.
+        sql = f'''
+            PRAGMA dq.MaxTasksPerStage="1";
+            INSERT INTO myyds.`{self.output_topic}`
+            SELECT STREAM
+                Yson::SerializeText(Yson::From(TableRow()))
+            FROM (
+                SELECT STREAM
+                    Sum(t) as sum
+                FROM (
+                    SELECT STREAM
+                        Yson::LookupUint64(ys, "time") as t
+                    FROM (
+                        SELECT STREAM
+                            Yson::Parse(Data) AS ys
+                        FROM myyds.`{self.input_topic}`))
+                GROUP BY
+                    HOP(DateTime::FromMilliseconds(CAST(Unwrap(t) as Uint32)), "PT0.01S", "PT0.01S", "PT0.01S"));'''
+        client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+
+        query_id = client.create_query("test_program_state_recovery", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+        logging.debug("Uuid = {}".format(kikimr.uuid))
+        master_node_index = self.get_graph_master_node_id(query_id)
+        logging.debug("Master node {}".format(master_node_index))
+        kikimr.compute_plane.wait_zero_checkpoint(query_id)
+
+        self.write_stream([f'{{"time" = {i};}}' for i in range(100, 115, 2)])
+
+        kikimr.compute_plane.wait_completed_checkpoints(query_id, self.kikimr.compute_plane.get_completed_checkpoints(query_id) + 1)
+
+        # restart node with CA
+        node_to_restart = None
+        for node_index in kikimr.control_plane.kikimr_cluster.nodes:
+            wc = kikimr.control_plane.get_worker_count(node_index)
+            if wc is not None:
+                if wc > 0 and node_index != master_node_index and node_to_restart is None:
+                    node_to_restart = node_index
+        assert node_to_restart is not None, "Can't find any task on non master node"
+
+        logging.debug("Restart non-master node {}".format(node_to_restart))
+
+        kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].stop()
+        kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].start()
+        kikimr.control_plane.wait_bootstrap(node_to_restart)
+
+        self.write_stream([f'{{"time" = {i};}}' for i in range(116, 144, 2)])
+
+        # wait aggregated
+        expected = [
+            '{"sum" = 520u}',
+            '{"sum" = 570u}',
+            '{"sum" = 620u}',
+        ]
+        received = self.read_stream(3)
+        assert received == expected
+
+        client.abort_query(query_id)
+        client.wait_query(query_id)
+
+        self.dump_workers(0, 0)
+
+    @yq_v1
+    # @pytest.mark.parametrize(
+    #     "restart_master",
+    #     [False, True],
+    #     ids=["not_master", "master"]
+    # )
+    def test_recovery(self, client, kikimr):
+        for node_index in kikimr.control_plane.kikimr_cluster.nodes:
+            kikimr.control_plane.wait_bootstrap(node_index)
+        for node_index in kikimr.control_plane.kikimr_cluster.nodes:
+            kikimr.control_plane.wait_discovery(node_index)
+
+        self.init_topics("recovery", partitions_count=2)
+
+        self.kikimr = kikimr
+
+        # Consumer and topics to create are written in ya.make file.
+        sql = R'''
+            PRAGMA dq.MaxTasksPerStage="2";
+
+            INSERT INTO myyds.`{output_topic}`
+            SELECT STREAM
+                *
+            FROM myyds.`{input_topic}`;'''\
+            .format(
+                input_topic=self.input_topic,
+                output_topic=self.output_topic,
+            )
+        client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+        kikimr.compute_plane.wait_zero_checkpoint(query_id)
+
+        logging.debug("Uuid = {}".format(kikimr.uuid))
+        master_node_index = self.get_graph_master_node_id(query_id)
+        logging.debug("Master node {}".format(master_node_index))
+
+        self.write_stream([str(i) for i in range(1, 11)])
+
+        d = {}
+
+        read_data = self.read_stream(10)
+        assert len(read_data) == 10
+        for m in read_data:
+            n = int(m)
+            assert n >= 1 and n <= 10
+            assert n not in d
+            d[n] = 1
+
+        self.dump_workers(2, 4)
+
+        node_to_restart = None
+        for node_index in kikimr.control_plane.kikimr_cluster.nodes:
+            wc = kikimr.control_plane.get_worker_count(node_index)
+            if wc is not None:
+                if wc > 0 and node_index != master_node_index and node_to_restart is None:
+                    node_to_restart = node_index
+        assert node_to_restart is not None, "Can't find any task on non master node"
+
+        logging.debug("Restart non-master node {}".format(node_to_restart))
+
+        kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].stop()
+        kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].start()
+        kikimr.control_plane.wait_bootstrap(node_to_restart)
+
+        self.dump_workers(2, 4)
+
+        self.write_stream([str(i) for i in range(11, 21)])
+
+        read_data = self.read_stream(10)
+        assert len(read_data) == 10
+        for m in read_data:
+            n = int(m)
+            assert n >= 1 and n <= 20
+            if n in d:
+                d[n] = d[n] + 1
+            else:
+                d[n] = 1
+
+        assert len(d) == 20
+
+        logging.debug("Restart Master node {}".format(master_node_index))
+
+        kikimr.control_plane.kikimr_cluster.nodes[master_node_index].stop()
+        kikimr.control_plane.kikimr_cluster.nodes[master_node_index].start()
+        kikimr.control_plane.wait_bootstrap(master_node_index)
+        master_node_index = self.get_graph_master_node_id(query_id)
+
+        logging.debug("New master node {}".format(master_node_index))
+
+        self.dump_workers(2, 4)
+
+        self.write_stream([str(i) for i in range(21, 31)])
+
+        read_data = self.read_stream(10)
+        assert len(read_data) == 10
+        for m in read_data:
+            n = int(m)
+            assert n >= 1 and n <= 30
+            if n in d:
+                d[n] = d[n] + 1
+            else:
+                d[n] = 1
+        assert len(d) == 30
+
+        zero_checkpoints_metric = kikimr.control_plane.get_checkpoint_coordinator_metric(query_id, "StartedFromEmptyCheckpoint")
+        restored_metric = kikimr.control_plane.get_checkpoint_coordinator_metric(query_id, "RestoredFromSavedCheckpoint")
+        assert restored_metric >= 1, "RestoredFromSavedCheckpoint: {}, StartedFromEmptyCheckpoint: {}".format(restored_metric, zero_checkpoints_metric)
+
+        client.abort_query(query_id)
+        client.wait_query(query_id)
+
+    def close_ic_session_args(self, node1, node2):
+        s1 = self.kikimr.control_plane.kikimr_cluster.nodes[node1]
+        s2 = self.kikimr.control_plane.kikimr_cluster.nodes[node2]
+        # action = "closepeersocket"
+ # action = "poisonsession" + action = "closeinputsession" + return [param_constants.kikimr_driver_path(), + "-s", "{}:{}".format(s1.host, s1.grpc_port), + "admin", "debug", "interconnect", action, + "--node", str(s2.node_id)] + + def slowpoke_args(self, node): + s = self.kikimr.control_plane.kikimr_cluster.nodes[node] + return [param_constants.kikimr_driver_path(), + "-s", "{}:{}".format(s.host, s.grpc_port), + "admin", "debug", "interconnect", "slowpoke", + "--pool-id", "4", + "--duration", "30s", + "--sleep-min", yatest_common.plain_or_under_sanitizer("10ms", "50ms"), + "--sleep-max", yatest_common.plain_or_under_sanitizer("100ms", "500ms"), + "--reschedule-min", "10ms", "--reschedule-max", "100ms", + "--num-actors", "2"] + + def start_close_ic_sessions_processes(self): + pool = multiprocessing.Pool() + args = [] + + for node1_index in self.kikimr.control_plane.kikimr_cluster.nodes: + yatest.common.execute(self.slowpoke_args(node1_index)) + for node2_index in self.kikimr.control_plane.kikimr_cluster.nodes: + if node2_index > node1_index: + args.append((self.close_ic_session_args(node1_index, node2_index), 0.1, 2, 30)) + return pool.map_async(run_with_sleep, args) + + @yq_v1 + @pytest.mark.skip(reason="Should be tuned") + def test_ic_disconnection(self, client): + for node_index in self.kikimr.control_plane.kikimr_cluster.nodes: + self.kikimr.control_plane.wait_bootstrap(node_index) + for node_index in self.kikimr.control_plane.kikimr_cluster.nodes: + self.kikimr.control_plane.wait_discovery(node_index) + + self.kikimr = kikimr + self.init_topics("disconnection", partitions_count=2) + input_topic_1 = "disconnection_i_1" + input_topic_2 = "disconnection_i_2" + create_stream(input_topic_1) + create_stream(input_topic_2) + + # Consumer and topics to create are written in ya.make file. 
+        sql = R'''
+            PRAGMA dq.MaxTasksPerStage="42";
+
+            INSERT INTO myyds.`{output_topic}`
+            SELECT (S1.Data || S2.Data) || ""
+            FROM myyds.`{input_topic_1}` AS S1
+            INNER JOIN (SELECT * FROM myyds.`{input_topic_2}`) AS S2
+            ON S1.Data = S2.Data
+            '''\
+            .format(
+                input_topic_1=input_topic_1,
+                input_topic_2=input_topic_2,
+                output_topic=self.output_topic,
+            )
+
+        close_ic_sessions_future = self.start_close_ic_sessions_processes()
+
+        folder_id = "my_folder"
+        # automatic query will not clean up metrics after failure
+        client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+        query_id = client.create_query("disconnected", sql, type=fq.QueryContent.QueryType.STREAMING, automatic=True).result.query_id
+        automatic_id = "automatic_" + folder_id
+
+        client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+
+        # Checkpointing must be finished
+        deadline = time.time() + yatest_common.plain_or_under_sanitizer(300, 900)
+        while True:
+            status = client.describe_query(query_id).result.query.meta.status
+            assert status == fq.QueryMeta.RUNNING, "Unexpected status " + fq.QueryMeta.ComputeStatus.Name(status)
+            completed = self.kikimr.control_plane.get_completed_checkpoints(automatic_id, False)
+            if completed >= 5:
+                break
+            assert time.time() < deadline, "Completed: {}".format(completed)
+            time.sleep(yatest_common.plain_or_under_sanitizer(0.5, 2))
+
+        close_ic_sessions_future.wait()
diff --git a/ydb/tests/fq/yds/test_recovery_match_recognize.py b/ydb/tests/fq/yds/test_recovery_match_recognize.py
new file mode 100644
index 000000000000..dcca66c09125
--- /dev/null
+++ b/ydb/tests/fq/yds/test_recovery_match_recognize.py
@@ -0,0 +1,160 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import pytest
+import logging
+import os
+import time
+
+import ydb.tests.library.common.yatest_common as yatest_common
+from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
+from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
+import library.python.retry as retry
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+
+@pytest.fixture
+def kikimr(request):
+    kikimr_conf = StreamingOverKikimrConfig(cloud_mode=True, node_count=2)
+    kikimr = StreamingOverKikimr(kikimr_conf)
+    kikimr.start_mvp_mock_server()
+    kikimr.start()
+    yield kikimr
+    kikimr.stop_mvp_mock_server()
+    kikimr.stop()
+
+
+class TestRecoveryMatchRecognize(TestYdsBase):
+
+    @classmethod
+    def setup_class(cls):
+        # for retry
+        cls.retry_conf = retry.RetryConf().upto(seconds=30).waiting(0.1)
+
+    @retry.retry_intrusive
+    def get_graph_master_node_id(self, kikimr, query_id):
+        for node_index in kikimr.control_plane.kikimr_cluster.nodes:
+            if kikimr.control_plane.get_task_count(node_index, query_id) > 0:
+                return node_index
+        assert False, "No active graphs found"
+
+    def get_ca_count(self, kikimr, node_index):
+        result = kikimr.control_plane.get_sensors(node_index, "utils").find_sensor(
+            {"activity": "DQ_COMPUTE_ACTOR", "sensor": "ActorsAliveByActivity", "execpool": "User"}
+        )
+        return result if result is not None else 0
+
+    def dump_workers(self, kikimr, worker_count, ca_count, wait_time=yatest_common.plain_or_under_sanitizer(30, 150)):
+        deadline = time.time() + wait_time
+        while True:
+            wcs = 0
+            ccs = 0
+            list = []
+            for node_index in kikimr.control_plane.kikimr_cluster.nodes:
+                wc = kikimr.control_plane.get_worker_count(node_index)
+                cc = self.get_ca_count(kikimr, node_index)
+                wcs += wc
+                ccs += cc
+                list.append([node_index, wc, cc])
+            if wcs == worker_count and ccs == ca_count:
+                for [s, w, c] in list:
+                    if w * 2 != c:
+                        continue
+                for [s, w, c] in list:
+                    logging.debug("Node {}, workers {}, ca {}".format(s, w, c))
+                return
+            if time.time() > deadline:
+                for [s, w, c] in list:
+                    logging.debug("Node {}, workers {}, ca {}".format(s, w, c))
+                assert False, "Workers={} and CAs={}, but {} and {} expected".format(wcs, ccs, worker_count, ca_count)
+
+    @yq_v1
+    @pytest.mark.parametrize("kikimr", [(None, None, None)], indirect=["kikimr"])
+    def test_program_state_recovery(self, kikimr, client, yq_version):
+
+        self.init_topics(f"pq_kikimr_streaming_{yq_version}")
+
+        sql = R'''
+            PRAGMA dq.MaxTasksPerStage="2";
+
+            pragma FeatureR010="prototype";
+            pragma config.flags("TimeOrderRecoverDelay", "-1000000");
+            pragma config.flags("TimeOrderRecoverAhead", "1000000");
+
+            INSERT INTO myyds.`{output_topic}`
+            SELECT ToBytes(Unwrap(Json::SerializeJson(Yson::From(TableRow()))))
+            FROM (SELECT * FROM myyds.`{input_topic}`
+                WITH (
+                    format=json_each_row,
+                    SCHEMA
+                    (
+                        dt UINT64
+                    )))
+            MATCH_RECOGNIZE(
+                ORDER BY CAST(dt as Timestamp)
+                MEASURES
+                    LAST(ALL_TRUE.dt) as dt
+                ONE ROW PER MATCH
+                PATTERN ( ALL_TRUE )
+                DEFINE
+                    ALL_TRUE as True)''' \
+            .format(
+                input_topic=self.input_topic,
+                output_topic=self.output_topic,
+            )
+
+        client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+        kikimr.compute_plane.wait_zero_checkpoint(query_id)
+
+        master_node_index = self.get_graph_master_node_id(kikimr, query_id)
+        logging.debug("Master node {}".format(master_node_index))
+
+        messages1 = ['{"dt": 1696849942400002}', '{"dt": 1696849942000001}']
+        self.write_stream(messages1)
+
+        logging.debug("get_completed_checkpoints {}".format(kikimr.compute_plane.get_completed_checkpoints(query_id)))
+        kikimr.compute_plane.wait_completed_checkpoints(
+            query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 1
+        )
+
+        # restart node with CA
+        node_to_restart = None
+        for node_index in kikimr.control_plane.kikimr_cluster.nodes:
+            wc = kikimr.control_plane.get_worker_count(node_index)
+            if wc is not None:
+                if wc > 0 and node_index != master_node_index and node_to_restart is None:
+                    node_to_restart = node_index
+        assert node_to_restart is not None, "Can't find any task on non master node"
+
+        logging.debug("Restart non-master node {}".format(node_to_restart))
+
+        kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].stop()
+        kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].start()
+        kikimr.control_plane.wait_bootstrap(node_to_restart)
+
+        messages2 = [
+            '{"dt": 1696849942800000}',
+            '{"dt": 1696849943200003}',
+            '{"dt": 1696849943300003}',
+            '{"dt": 1696849943600003}',
+            '{"dt": 1696849943900003}'
+        ]
+        self.write_stream(messages2)
+
+        assert client.get_query_status(query_id) == fq.QueryMeta.RUNNING
+
+        expected = ['{"dt":1696849942000001}', '{"dt":1696849942400002}', '{"dt":1696849942800000}']
+
+        read_data = self.read_stream(len(expected))
+        logging.info("Data was read: {}".format(read_data))
+
+        assert read_data == expected
+
+        client.abort_query(query_id)
+        client.wait_query(query_id)
+
+        self.dump_workers(kikimr, 0, 0)
diff --git a/ydb/tests/fq/yds/test_recovery_mz.py b/ydb/tests/fq/yds/test_recovery_mz.py
new file mode 100644
index 000000000000..a2f76db97f90
--- /dev/null
+++ b/ydb/tests/fq/yds/test_recovery_mz.py
@@ -0,0 +1,195 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import time
+import pytest
+import random
+import os
+import yatest
+
+import ydb.tests.library.common.yatest_common as yatest_common
+from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
+from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
+from ydb.tests.tools.fq_runner.kikimr_runner import TenantConfig
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
+from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
+
+import library.python.retry as retry
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+
+@pytest.fixture
+def kikimr():
+    kikimr_conf = StreamingOverKikimrConfig(
+        cloud_mode=True,
+        node_count={"/cp": TenantConfig(1),
+                    "/compute": TenantConfig(8)})
+    kikimr = StreamingOverKikimr(kikimr_conf)
+    # control
+    kikimr.control_plane.fq_config['control_plane_storage']['mapping'] = {"common_tenant_name": ["/compute"]}
+    kikimr.control_plane.fq_config['control_plane_storage']['task_lease_retry_policy'] = {}
+    kikimr.control_plane.fq_config['control_plane_storage']['task_lease_retry_policy']['retry_count'] = 5
+    kikimr.control_plane.fq_config['control_plane_storage']['task_lease_retry_policy']['retry_period'] = "30s"
+    kikimr.control_plane.fq_config['control_plane_storage']['task_lease_ttl'] = "3s"
+    # compute
+    kikimr.compute_plane.fq_config['pinger']['ping_period'] = "1s"
+    kikimr.start_mvp_mock_server()
+    kikimr.start()
+    yield kikimr
+    kikimr.stop()
+    kikimr.stop_mvp_mock_server()
+
+
+def run_with_sleep(args):
+    program_args, time_min, time_max, duration = args
+    deadline = time.time() + duration
+    while time.time() < deadline:
+        yatest.common.execute(program_args)
+        time.sleep(random.uniform(time_min, time_max))
+
+
+class TestRecovery(TestYdsBase):
+
+    @retry.retry_intrusive
+    def get_graph_master_node_id(self, query_id):
+        for node_index in self.kikimr.compute_plane.kikimr_cluster.nodes:
+            if self.kikimr.compute_plane.get_task_count(node_index, query_id) > 0:
+                return node_index
+        assert False, "No active graphs found"
+
+    def get_ca_count(self, node_index):
+        result = self.kikimr.compute_plane.get_sensors(node_index, "utils").find_sensor({"activity": "DQ_COMPUTE_ACTOR", "sensor": "ActorsAliveByActivity", "execpool": "User"})
+        return result if result is not None else 0
+
+    def dump_workers(self, worker_count, ca_count, wait_time=yatest_common.plain_or_under_sanitizer(30, 150)):
+        deadline = time.time() + wait_time
+        while True:
+            wcs = 0
+            ccs = 0
+            list = []
+            for node_index in self.kikimr.compute_plane.kikimr_cluster.nodes:
+                wc = self.kikimr.compute_plane.get_worker_count(node_index)
+                cc = self.get_ca_count(node_index)
+                wcs += wc
+                ccs += cc
+                list.append([node_index, wc, cc])
+            if wcs == worker_count and ccs == ca_count:
+                for [s, w, c] in list:
+                    if w * 2 != c:
+                        continue
+                for [s, w, c] in list:
+                    logging.debug("Node {}, workers {}, ca {}".format(s, w, c))
+                return
+            if time.time() > deadline:
+                for [s, w, c] in list:
+                    logging.debug("Node {}, workers {}, ca {}".format(s, w, c))
+                assert False, "Workers={} and CAs={}, but {} and {} expected".format(wcs, ccs, worker_count, ca_count)
+
+    @yq_v1
+    def test_recovery(self, kikimr, client, yq_version):
+        self.init_topics(f"pq_kikimr_streaming_{yq_version}", partitions_count=2)
+
+        self.retry_conf = retry.RetryConf().upto(seconds=30).waiting(0.1)
+        self.kikimr = kikimr
+        kikimr.compute_plane.wait_bootstrap()
+        kikimr.compute_plane.wait_discovery()
+
+        # Consumer and topics to create are written in ya.make file.
+        sql = R'''
+            PRAGMA dq.MaxTasksPerStage="2";
+
+            INSERT INTO myyds.`{output_topic}`
+            SELECT STREAM
+                *
+            FROM myyds.`{input_topic}`;'''\
+            .format(
+                input_topic=self.input_topic,
+                output_topic=self.output_topic,
+            )
+        client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
+        self.kikimr.compute_plane.wait_zero_checkpoint(query_id)
+
+        logging.debug("Uuid = {}".format(kikimr.uuid))
+        master_node_index = self.get_graph_master_node_id(query_id)
+        logging.debug("Master node {}".format(master_node_index))
+
+        self.write_stream([str(i) for i in range(1, 11)])
+
+        read_data = self.read_stream(10)
+
+        for message in read_data:
+            logging.info("Received message: {}".format(message))
+
+        assert len(read_data) == 10
+
+        d = {}
+        for m in read_data:
+            n = int(m)
+            assert n >= 1 and n <= 10
+            assert n not in d
+            d[n] = 1
+
+        self.dump_workers(2, 4)
+
+        node_to_restart = None
+        for node_index in kikimr.compute_plane.kikimr_cluster.nodes:
+            wc = kikimr.compute_plane.get_worker_count(node_index)
+            if wc is not None:
+                if wc > 0 and node_index != master_node_index and node_to_restart is None:
+                    node_to_restart = node_index
+        assert node_to_restart is not None, "Can't find any task on non master node"
+
+        logging.debug("Restart non-master node {}".format(node_to_restart))
+
+        kikimr.compute_plane.kikimr_cluster.nodes[node_to_restart].stop()
+        kikimr.compute_plane.kikimr_cluster.nodes[node_to_restart].start()
+        kikimr.compute_plane.wait_bootstrap(node_to_restart)
+
+        self.dump_workers(2, 4)
+
+        self.write_stream([str(i) for i in range(11, 21)])
+
+        read_data = self.read_stream(10)
+        assert len(read_data) == 10
+
+        for m in read_data:
+            n = int(m)
+            assert n >= 1 and n <= 20
+            if n in d:
+                d[n] = d[n] + 1
+            else:
+                d[n] = 1
+
+        logging.debug("Restart Master node {}".format(master_node_index))
+
+        kikimr.compute_plane.kikimr_cluster.nodes[master_node_index].stop()
+        kikimr.compute_plane.kikimr_cluster.nodes[master_node_index].start()
+        kikimr.compute_plane.wait_bootstrap(master_node_index)
+        master_node_index = self.get_graph_master_node_id(query_id)
+
+        logging.debug("New master node {}".format(master_node_index))
+
+        self.dump_workers(2, 4)
+
+        self.write_stream([str(i) for i in range(21, 31)])
+
+        read_data = self.read_stream(10)
+        assert len(read_data) == 10
+
+        for m in read_data:
+            n = int(m)
+            assert n >= 1 and n <= 30
+            if n in d:
+                d[n] = d[n] + 1
+            else:
+                d[n] = 1
+
+        zero_checkpoints_metric = kikimr.compute_plane.get_checkpoint_coordinator_metric(query_id, "StartedFromEmptyCheckpoint")
+        restored_metric = kikimr.compute_plane.get_checkpoint_coordinator_metric(query_id, "RestoredFromSavedCheckpoint")
+        assert restored_metric >= 1, "RestoredFromSavedCheckpoint: {}, StartedFromEmptyCheckpoint: {}".format(restored_metric, zero_checkpoints_metric)
+
+        client.abort_query(query_id)
+        client.wait_query(query_id)
diff --git a/ydb/tests/fq/yds/ya.make b/ydb/tests/fq/yds/ya.make
index da9713bceaf9..4b372abde83d 100644
--- a/ydb/tests/fq/yds/ya.make
+++ b/ydb/tests/fq/yds/ya.make
@@ -33,6 +33,9 @@ TEST_SRCS(
     test_pq_read_write.py
     test_public_metrics.py
     test_read_rules_deletion.py
+    test_recovery.py
+    test_recovery_match_recognize.py
+    test_recovery_mz.py
     test_restart_query.py
     test_select_1.py
     test_select_limit_db_id.py