Skip to content

Commit

Permalink
YQ-3763 Fix flaky fq/yds tests (#10428)
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds authored Oct 16, 2024
1 parent fff7685 commit a4966da
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ void TDqPqReadActorBase::SaveState(const NDqProto::TCheckpoint& /*checkpoint*/,
partitionState->SetCluster(cluster);
partitionState->SetPartition(partition);
partitionState->SetOffset(offset);
SRC_LOG_D("SessionId: " << GetSessionId() << " SaveState: partition " << partition << ", offset: " << offset);
}

stateProto.SetStartingMessageTimestampMs(StartingMessageTimestamp.MilliSeconds());
Expand Down
79 changes: 38 additions & 41 deletions ydb/tests/fq/yds/test_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import ydb.tests.library.common.yatest_common as yatest_common
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
from ydb.tests.tools.fq_runner.kikimr_runner import TenantConfig
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
from ydb.tests.tools.datastreams_helpers.control_plane import create_stream
Expand All @@ -33,7 +34,7 @@ def run_with_sleep(args):

@pytest.fixture
def kikimr():
kikimr_conf = StreamingOverKikimrConfig(node_count=8, cloud_mode=True)
kikimr_conf = StreamingOverKikimrConfig(node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(2)}, cloud_mode=True)
kikimr = StreamingOverKikimr(kikimr_conf)
kikimr.start_mvp_mock_server()
kikimr.start()
Expand All @@ -50,8 +51,8 @@ def setup_class(cls):

@retry.retry_intrusive
def get_graph_master_node_id(self, query_id):
for node_index in self.kikimr.control_plane.kikimr_cluster.nodes:
if self.kikimr.control_plane.get_task_count(node_index, query_id) > 0:
for node_index in self.kikimr.compute_plane.kikimr_cluster.nodes:
if self.kikimr.compute_plane.get_task_count(node_index, query_id) > 0:
return node_index
assert False, "No active graphs found"

Expand All @@ -61,9 +62,9 @@ def dump_workers(self, worker_count, ca_count, wait_time=yatest_common.plain_or_
wcs = 0
ccs = 0
list = []
for node_index in self.kikimr.control_plane.kikimr_cluster.nodes:
wc = self.kikimr.control_plane.get_worker_count(node_index)
cc = self.kikimr.control_plane.get_ca_count(node_index)
for node_index in self.kikimr.compute_plane.kikimr_cluster.nodes:
wc = self.kikimr.compute_plane.get_worker_count(node_index)
cc = self.kikimr.compute_plane.get_ca_count(node_index)
wcs += wc
ccs += cc
list.append([node_index, wc, cc])
Expand All @@ -81,8 +82,8 @@ def dump_workers(self, worker_count, ca_count, wait_time=yatest_common.plain_or_

@yq_v1
def test_delete(self, client, kikimr):
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_bootstrap(node_index)
kikimr.control_plane.wait_bootstrap()
kikimr.compute_plane.wait_bootstrap()

self.kikimr = kikimr
self.init_topics("recovery", partitions_count=2)
Expand Down Expand Up @@ -117,10 +118,9 @@ def test_program_state_recovery(self, client, kikimr):
# [ Bucket2 ) |(emited)
# .<------------------------------------- restart
# [ Bucket3 ) |(emited)
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_bootstrap(node_index)
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_discovery(node_index)
kikimr.control_plane.wait_bootstrap()
kikimr.compute_plane.wait_bootstrap()
kikimr.compute_plane.wait_discovery()

self.kikimr = kikimr
self.init_topics("program_state_recovery", partitions_count=1)
Expand Down Expand Up @@ -162,18 +162,18 @@ def test_program_state_recovery(self, client, kikimr):

# restart node with CA
node_to_restart = None
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
wc = kikimr.control_plane.get_worker_count(node_index)
for node_index in kikimr.compute_plane.kikimr_cluster.nodes:
wc = kikimr.compute_plane.get_worker_count(node_index)
if wc is not None:
if wc > 0 and node_index != master_node_index and node_to_restart is None:
node_to_restart = node_index
assert node_to_restart is not None, "Can't find any task on non master node"

logging.debug("Restart non-master node {}".format(node_to_restart))

kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].stop()
kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].start()
kikimr.control_plane.wait_bootstrap(node_to_restart)
kikimr.compute_plane.kikimr_cluster.nodes[node_to_restart].stop()
kikimr.compute_plane.kikimr_cluster.nodes[node_to_restart].start()
kikimr.compute_plane.wait_bootstrap(node_to_restart)

self.write_stream([f'{{"time" = {i};}}' for i in range(116, 144, 2)])

Expand All @@ -198,10 +198,9 @@ def test_program_state_recovery(self, client, kikimr):
# ids=["not_master", "master"]
# )
def test_recovery(self, client, kikimr):
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_bootstrap(node_index)
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_discovery(node_index)
kikimr.control_plane.wait_bootstrap()
kikimr.compute_plane.wait_bootstrap()
kikimr.compute_plane.wait_discovery()

self.init_topics("recovery", partitions_count=2)

Expand Down Expand Up @@ -242,18 +241,18 @@ def test_recovery(self, client, kikimr):
self.dump_workers(2, 4)

node_to_restart = None
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
wc = kikimr.control_plane.get_worker_count(node_index)
for node_index in kikimr.compute_plane.kikimr_cluster.nodes:
wc = kikimr.compute_plane.get_worker_count(node_index)
if wc is not None:
if wc > 0 and node_index != master_node_index and node_to_restart is None:
node_to_restart = node_index
assert node_to_restart is not None, "Can't find any task on non master node"

logging.debug("Restart non-master node {}".format(node_to_restart))

kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].stop()
kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].start()
kikimr.control_plane.wait_bootstrap(node_to_restart)
kikimr.compute_plane.kikimr_cluster.nodes[node_to_restart].stop()
kikimr.compute_plane.kikimr_cluster.nodes[node_to_restart].start()
kikimr.compute_plane.wait_bootstrap(node_to_restart)

self.dump_workers(2, 4)

Expand All @@ -273,9 +272,9 @@ def test_recovery(self, client, kikimr):

logging.debug("Restart Master node {}".format(master_node_index))

kikimr.control_plane.kikimr_cluster.nodes[master_node_index].stop()
kikimr.control_plane.kikimr_cluster.nodes[master_node_index].start()
kikimr.control_plane.wait_bootstrap(master_node_index)
kikimr.compute_plane.kikimr_cluster.nodes[master_node_index].stop()
kikimr.compute_plane.kikimr_cluster.nodes[master_node_index].start()
kikimr.compute_plane.wait_bootstrap(master_node_index)
master_node_index = self.get_graph_master_node_id(query_id)

logging.debug("New master node {}".format(master_node_index))
Expand All @@ -295,10 +294,10 @@ def test_recovery(self, client, kikimr):
d[n] = 1
assert len(d) == 30

zero_checkpoints_metric = kikimr.control_plane.get_checkpoint_coordinator_metric(
zero_checkpoints_metric = kikimr.compute_plane.get_checkpoint_coordinator_metric(
query_id, "StartedFromEmptyCheckpoint"
)
restored_metric = kikimr.control_plane.get_checkpoint_coordinator_metric(
restored_metric = kikimr.compute_plane.get_checkpoint_coordinator_metric(
query_id, "RestoredFromSavedCheckpoint"
)
assert restored_metric >= 1, "RestoredFromSavedCheckpoint: {}, StartedFromEmptyCheckpoint: {}".format(
Expand Down Expand Up @@ -420,10 +419,9 @@ def test_ic_disconnection(self, client):

@yq_v1
def test_program_state_recovery_error_if_no_states(self, client, kikimr):
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_bootstrap(node_index)
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_discovery(node_index)
kikimr.control_plane.wait_bootstrap()
kikimr.compute_plane.wait_bootstrap()
kikimr.compute_plane.wait_discovery()
self.init_topics("error_if_no_states", partitions_count=1)

sql = R'''
Expand All @@ -443,17 +441,16 @@ def test_program_state_recovery_error_if_no_states(self, client, kikimr):
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 1
)

for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.kikimr_cluster.nodes[node_index].stop()
for node_index in kikimr.compute_plane.kikimr_cluster.nodes:
kikimr.compute_plane.kikimr_cluster.nodes[node_index].stop()

session = kikimr.driver.table_client.session().create()
checkpoint_table_prefix = "/local/CheckpointCoordinatorStorage_" + kikimr.uuid + '/states'
session.transaction().execute(f"DELETE FROM `{checkpoint_table_prefix}`", commit_tx=True)

for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.kikimr_cluster.nodes[node_index].start()
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_bootstrap(node_index)
for node_index in kikimr.compute_plane.kikimr_cluster.nodes:
kikimr.compute_plane.kikimr_cluster.nodes[node_index].start()
kikimr.compute_plane.wait_bootstrap()

client.wait_query_status(query_id, fq.QueryMeta.FAILED)
describe_result = client.describe_query(query_id).result
Expand Down
2 changes: 1 addition & 1 deletion ydb/tests/fq/yds/test_recovery_mz.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
@pytest.fixture
def kikimr():
kikimr_conf = StreamingOverKikimrConfig(
cloud_mode=True, node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(8)}
cloud_mode=True, node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(2)}
)
kikimr = StreamingOverKikimr(kikimr_conf)
# control
Expand Down
18 changes: 9 additions & 9 deletions ydb/tests/fq/yds/test_row_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ def test_read_raw_format_with_row_dispatcher(self, kikimr, client):
)
connections = client.list_connections(fq.Acl.Visibility.PRIVATE).result.connection
assert len(connections) == 1
assert connections[0].content.setting.data_streams.shared_reading is True
assert connections[0].content.setting.data_streams.shared_reading

self.init_topics("test_read_raw_format_without_row_dispatcher", create_output=False)
self.init_topics("test_read_raw_format_with_row_dispatcher", create_output=False)
output_topic = "pq_test_pq_read_write_output"
create_stream(output_topic, partitions_count=1)
create_read_rule(output_topic, self.consumer_name)
Expand Down Expand Up @@ -292,7 +292,7 @@ def test_filter_missing_fields(self, kikimr, client):
client.create_yds_connection(
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
)
self.init_topics("test_filter")
self.init_topics("test_filter_missing_fields")

sql = Rf'''
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
Expand Down Expand Up @@ -503,7 +503,7 @@ def test_stop_start(self, kikimr, client):
assert self.read_stream(len(expected), topic_path=output_topic) == expected

kikimr.compute_plane.wait_completed_checkpoints(
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 1
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2
)
stop_yds_query(client, query_id)
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)
Expand Down Expand Up @@ -551,7 +551,7 @@ def test_stop_start_with_filter(self, kikimr, client):
self.write_stream(data)

kikimr.compute_plane.wait_completed_checkpoints(
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 10
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 10 # long sleep to send status from topic_session to read_actor
)
stop_yds_query(client, query_id)
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)
Expand Down Expand Up @@ -601,7 +601,7 @@ def test_restart_compute_node(self, kikimr, client):
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected

kikimr.compute_plane.wait_completed_checkpoints(
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 1
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2
)

wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
Expand All @@ -618,7 +618,7 @@ def test_restart_compute_node(self, kikimr, client):
expected = ['103', '104']
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
kikimr.compute_plane.wait_completed_checkpoints(
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 1
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2
)

node_index = 1
Expand Down Expand Up @@ -681,7 +681,7 @@ def test_3_sessions(self, kikimr, client):
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)

kikimr.compute_plane.wait_completed_checkpoints(
query_id1, kikimr.compute_plane.get_completed_checkpoints(query_id1) + 1
query_id1, kikimr.compute_plane.get_completed_checkpoints(query_id1) + 2
)
stop_yds_query(client, query_id1)

Expand Down Expand Up @@ -724,7 +724,7 @@ def test_many_partitions(self, kikimr, client):
client.create_yds_connection(
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
)
self.init_topics("test_simple_not_null", partitions_count=4)
self.init_topics("test_many_partitions", partitions_count=4)

sql = Rf'''
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
Expand Down
2 changes: 1 addition & 1 deletion ydb/tests/tools/datastreams_helpers/data_plane.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from ydb.public.api.protos.ydb_status_codes_pb2 import StatusIds


READ_TOOL_TIMEOUT = yatest_common.plain_or_under_sanitizer(20, 300)
READ_TOOL_TIMEOUT = yatest_common.plain_or_under_sanitizer(30, 300)


def write_stream(path, data, partition_key=None):
Expand Down
2 changes: 1 addition & 1 deletion ydb/tests/tools/fq_runner/kikimr_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ def fill_config(self, control_plane):

fq_config['row_dispatcher'] = {
'enabled': True,
'timeout_before_start_session_sec': 2,
'timeout_before_start_session_sec': 5,
'send_status_period_sec': 2,
'max_session_used_memory': 1000000,
'without_consumer': True}
Expand Down

0 comments on commit a4966da

Please sign in to comment.