Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-3763 Fix flaky fq/yds tests #10428

Merged
merged 4 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ void TDqPqReadActorBase::SaveState(const NDqProto::TCheckpoint& /*checkpoint*/,
partitionState->SetCluster(cluster);
partitionState->SetPartition(partition);
partitionState->SetOffset(offset);
SRC_LOG_D("SessionId: " << GetSessionId() << " SaveState: partition " << partition << ", offset: " << offset);
}

stateProto.SetStartingMessageTimestampMs(StartingMessageTimestamp.MilliSeconds());
Expand Down
79 changes: 38 additions & 41 deletions ydb/tests/fq/yds/test_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import ydb.tests.library.common.yatest_common as yatest_common
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimr
from ydb.tests.tools.fq_runner.kikimr_runner import StreamingOverKikimrConfig
from ydb.tests.tools.fq_runner.kikimr_runner import TenantConfig
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
from ydb.tests.tools.datastreams_helpers.control_plane import create_stream
Expand All @@ -33,7 +34,7 @@ def run_with_sleep(args):

@pytest.fixture
def kikimr():
kikimr_conf = StreamingOverKikimrConfig(node_count=8, cloud_mode=True)
kikimr_conf = StreamingOverKikimrConfig(node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(2)}, cloud_mode=True)
kikimr = StreamingOverKikimr(kikimr_conf)
kikimr.start_mvp_mock_server()
kikimr.start()
Expand All @@ -50,8 +51,8 @@ def setup_class(cls):

@retry.retry_intrusive
def get_graph_master_node_id(self, query_id):
for node_index in self.kikimr.control_plane.kikimr_cluster.nodes:
if self.kikimr.control_plane.get_task_count(node_index, query_id) > 0:
for node_index in self.kikimr.compute_plane.kikimr_cluster.nodes:
if self.kikimr.compute_plane.get_task_count(node_index, query_id) > 0:
return node_index
assert False, "No active graphs found"

Expand All @@ -61,9 +62,9 @@ def dump_workers(self, worker_count, ca_count, wait_time=yatest_common.plain_or_
wcs = 0
ccs = 0
list = []
for node_index in self.kikimr.control_plane.kikimr_cluster.nodes:
wc = self.kikimr.control_plane.get_worker_count(node_index)
cc = self.kikimr.control_plane.get_ca_count(node_index)
for node_index in self.kikimr.compute_plane.kikimr_cluster.nodes:
wc = self.kikimr.compute_plane.get_worker_count(node_index)
cc = self.kikimr.compute_plane.get_ca_count(node_index)
wcs += wc
ccs += cc
list.append([node_index, wc, cc])
Expand All @@ -81,8 +82,8 @@ def dump_workers(self, worker_count, ca_count, wait_time=yatest_common.plain_or_

@yq_v1
def test_delete(self, client, kikimr):
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_bootstrap(node_index)
kikimr.control_plane.wait_bootstrap()
kikimr.compute_plane.wait_bootstrap()

self.kikimr = kikimr
self.init_topics("recovery", partitions_count=2)
Expand Down Expand Up @@ -117,10 +118,9 @@ def test_program_state_recovery(self, client, kikimr):
# [ Bucket2 ) |(emited)
# .<------------------------------------- restart
# [ Bucket3 ) |(emited)
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_bootstrap(node_index)
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_discovery(node_index)
kikimr.control_plane.wait_bootstrap()
kikimr.compute_plane.wait_bootstrap()
kikimr.compute_plane.wait_discovery()

self.kikimr = kikimr
self.init_topics("program_state_recovery", partitions_count=1)
Expand Down Expand Up @@ -162,18 +162,18 @@ def test_program_state_recovery(self, client, kikimr):

# restart node with CA
node_to_restart = None
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
wc = kikimr.control_plane.get_worker_count(node_index)
for node_index in kikimr.compute_plane.kikimr_cluster.nodes:
wc = kikimr.compute_plane.get_worker_count(node_index)
if wc is not None:
if wc > 0 and node_index != master_node_index and node_to_restart is None:
node_to_restart = node_index
assert node_to_restart is not None, "Can't find any task on non master node"

logging.debug("Restart non-master node {}".format(node_to_restart))

kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].stop()
kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].start()
kikimr.control_plane.wait_bootstrap(node_to_restart)
kikimr.compute_plane.kikimr_cluster.nodes[node_to_restart].stop()
kikimr.compute_plane.kikimr_cluster.nodes[node_to_restart].start()
kikimr.compute_plane.wait_bootstrap(node_to_restart)

self.write_stream([f'{{"time" = {i};}}' for i in range(116, 144, 2)])

Expand All @@ -198,10 +198,9 @@ def test_program_state_recovery(self, client, kikimr):
# ids=["not_master", "master"]
# )
def test_recovery(self, client, kikimr):
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_bootstrap(node_index)
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_discovery(node_index)
kikimr.control_plane.wait_bootstrap()
kikimr.compute_plane.wait_bootstrap()
kikimr.compute_plane.wait_discovery()

self.init_topics("recovery", partitions_count=2)

Expand Down Expand Up @@ -242,18 +241,18 @@ def test_recovery(self, client, kikimr):
self.dump_workers(2, 4)

node_to_restart = None
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
wc = kikimr.control_plane.get_worker_count(node_index)
for node_index in kikimr.compute_plane.kikimr_cluster.nodes:
wc = kikimr.compute_plane.get_worker_count(node_index)
if wc is not None:
if wc > 0 and node_index != master_node_index and node_to_restart is None:
node_to_restart = node_index
assert node_to_restart is not None, "Can't find any task on non master node"

logging.debug("Restart non-master node {}".format(node_to_restart))

kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].stop()
kikimr.control_plane.kikimr_cluster.nodes[node_to_restart].start()
kikimr.control_plane.wait_bootstrap(node_to_restart)
kikimr.compute_plane.kikimr_cluster.nodes[node_to_restart].stop()
kikimr.compute_plane.kikimr_cluster.nodes[node_to_restart].start()
kikimr.compute_plane.wait_bootstrap(node_to_restart)

self.dump_workers(2, 4)

Expand All @@ -273,9 +272,9 @@ def test_recovery(self, client, kikimr):

logging.debug("Restart Master node {}".format(master_node_index))

kikimr.control_plane.kikimr_cluster.nodes[master_node_index].stop()
kikimr.control_plane.kikimr_cluster.nodes[master_node_index].start()
kikimr.control_plane.wait_bootstrap(master_node_index)
kikimr.compute_plane.kikimr_cluster.nodes[master_node_index].stop()
kikimr.compute_plane.kikimr_cluster.nodes[master_node_index].start()
kikimr.compute_plane.wait_bootstrap(master_node_index)
master_node_index = self.get_graph_master_node_id(query_id)

logging.debug("New master node {}".format(master_node_index))
Expand All @@ -295,10 +294,10 @@ def test_recovery(self, client, kikimr):
d[n] = 1
assert len(d) == 30

zero_checkpoints_metric = kikimr.control_plane.get_checkpoint_coordinator_metric(
zero_checkpoints_metric = kikimr.compute_plane.get_checkpoint_coordinator_metric(
query_id, "StartedFromEmptyCheckpoint"
)
restored_metric = kikimr.control_plane.get_checkpoint_coordinator_metric(
restored_metric = kikimr.compute_plane.get_checkpoint_coordinator_metric(
query_id, "RestoredFromSavedCheckpoint"
)
assert restored_metric >= 1, "RestoredFromSavedCheckpoint: {}, StartedFromEmptyCheckpoint: {}".format(
Expand Down Expand Up @@ -420,10 +419,9 @@ def test_ic_disconnection(self, client):

@yq_v1
def test_program_state_recovery_error_if_no_states(self, client, kikimr):
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_bootstrap(node_index)
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_discovery(node_index)
kikimr.control_plane.wait_bootstrap()
kikimr.compute_plane.wait_bootstrap()
kikimr.compute_plane.wait_discovery()
self.init_topics("error_if_no_states", partitions_count=1)

sql = R'''
Expand All @@ -443,17 +441,16 @@ def test_program_state_recovery_error_if_no_states(self, client, kikimr):
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 1
)

for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.kikimr_cluster.nodes[node_index].stop()
for node_index in kikimr.compute_plane.kikimr_cluster.nodes:
kikimr.compute_plane.kikimr_cluster.nodes[node_index].stop()

session = kikimr.driver.table_client.session().create()
checkpoint_table_prefix = "/local/CheckpointCoordinatorStorage_" + kikimr.uuid + '/states'
session.transaction().execute(f"DELETE FROM `{checkpoint_table_prefix}`", commit_tx=True)

for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.kikimr_cluster.nodes[node_index].start()
for node_index in kikimr.control_plane.kikimr_cluster.nodes:
kikimr.control_plane.wait_bootstrap(node_index)
for node_index in kikimr.compute_plane.kikimr_cluster.nodes:
kikimr.compute_plane.kikimr_cluster.nodes[node_index].start()
kikimr.compute_plane.wait_bootstrap()

client.wait_query_status(query_id, fq.QueryMeta.FAILED)
describe_result = client.describe_query(query_id).result
Expand Down
2 changes: 1 addition & 1 deletion ydb/tests/fq/yds/test_recovery_mz.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
@pytest.fixture
def kikimr():
kikimr_conf = StreamingOverKikimrConfig(
cloud_mode=True, node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(8)}
cloud_mode=True, node_count={"/cp": TenantConfig(1), "/compute": TenantConfig(2)}
)
kikimr = StreamingOverKikimr(kikimr_conf)
# control
Expand Down
18 changes: 9 additions & 9 deletions ydb/tests/fq/yds/test_row_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ def test_read_raw_format_with_row_dispatcher(self, kikimr, client):
)
connections = client.list_connections(fq.Acl.Visibility.PRIVATE).result.connection
assert len(connections) == 1
assert connections[0].content.setting.data_streams.shared_reading is True
assert connections[0].content.setting.data_streams.shared_reading

self.init_topics("test_read_raw_format_without_row_dispatcher", create_output=False)
self.init_topics("test_read_raw_format_with_row_dispatcher", create_output=False)
output_topic = "pq_test_pq_read_write_output"
create_stream(output_topic, partitions_count=1)
create_read_rule(output_topic, self.consumer_name)
Expand Down Expand Up @@ -292,7 +292,7 @@ def test_filter_missing_fields(self, kikimr, client):
client.create_yds_connection(
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
)
self.init_topics("test_filter")
self.init_topics("test_filter_missing_fields")

sql = Rf'''
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
Expand Down Expand Up @@ -503,7 +503,7 @@ def test_stop_start(self, kikimr, client):
assert self.read_stream(len(expected), topic_path=output_topic) == expected

kikimr.compute_plane.wait_completed_checkpoints(
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 1
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2
)
stop_yds_query(client, query_id)
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)
Expand Down Expand Up @@ -551,7 +551,7 @@ def test_stop_start_with_filter(self, kikimr, client):
self.write_stream(data)

kikimr.compute_plane.wait_completed_checkpoints(
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 10
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 10 # long sleep to send status from topic_session to read_actor
)
stop_yds_query(client, query_id)
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)
Expand Down Expand Up @@ -601,7 +601,7 @@ def test_restart_compute_node(self, kikimr, client):
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected

kikimr.compute_plane.wait_completed_checkpoints(
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 1
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2
)

wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
Expand All @@ -618,7 +618,7 @@ def test_restart_compute_node(self, kikimr, client):
expected = ['103', '104']
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
kikimr.compute_plane.wait_completed_checkpoints(
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 1
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2
)

node_index = 1
Expand Down Expand Up @@ -681,7 +681,7 @@ def test_3_sessions(self, kikimr, client):
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)

kikimr.compute_plane.wait_completed_checkpoints(
query_id1, kikimr.compute_plane.get_completed_checkpoints(query_id1) + 1
query_id1, kikimr.compute_plane.get_completed_checkpoints(query_id1) + 2
)
stop_yds_query(client, query_id1)

Expand Down Expand Up @@ -724,7 +724,7 @@ def test_many_partitions(self, kikimr, client):
client.create_yds_connection(
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
)
self.init_topics("test_simple_not_null", partitions_count=4)
self.init_topics("test_many_partitions", partitions_count=4)

sql = Rf'''
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
Expand Down
2 changes: 1 addition & 1 deletion ydb/tests/tools/datastreams_helpers/data_plane.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from ydb.public.api.protos.ydb_status_codes_pb2 import StatusIds


READ_TOOL_TIMEOUT = yatest_common.plain_or_under_sanitizer(20, 300)
READ_TOOL_TIMEOUT = yatest_common.plain_or_under_sanitizer(30, 300)


def write_stream(path, data, partition_key=None):
Expand Down
2 changes: 1 addition & 1 deletion ydb/tests/tools/fq_runner/kikimr_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ def fill_config(self, control_plane):

fq_config['row_dispatcher'] = {
'enabled': True,
'timeout_before_start_session_sec': 2,
'timeout_before_start_session_sec': 5,
'send_status_period_sec': 2,
'max_session_used_memory': 1000000,
'without_consumer': True}
Expand Down
Loading