diff --git a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py index 4dd54305ec8fbf..56ea3839435e73 100644 --- a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py +++ b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py @@ -1,3 +1,4 @@ +import subprocess import time import pytest @@ -11,6 +12,17 @@ FROZEN_TIME = "2021-10-25 13:00:00" +def is_mysql_up(container_name: str, port: int) -> bool: + """A cheap way to figure out if mysql is responsive on a container""" + + cmd = f"docker logs {container_name} 2>&1 | grep '/var/run/mysqld/mysqld.sock' | grep {port}" + ret = subprocess.run( + cmd, + shell=True, + ) + return ret.returncode == 0 + + @freeze_time(FROZEN_TIME) @pytest.mark.integration_batch_1 def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time): @@ -24,6 +36,13 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, moc str(test_resources_dir / "docker-compose.override.yml"), ] with docker_compose_runner(docker_compose_file, "kafka-connect") as docker_services: + wait_for_port( + docker_services, + "test_mysql", + 3306, + timeout=120, + checker=lambda: is_mysql_up("test_mysql", 3306), + ) wait_for_port(docker_services, "test_broker", 59092, timeout=120) wait_for_port(docker_services, "test_connect", 58083, timeout=120) docker_services.wait_until_responsive( diff --git a/metadata-ingestion/tests/integration/mysql/test_mysql.py b/metadata-ingestion/tests/integration/mysql/test_mysql.py index 29f12f677e703b..9608152d05f8b6 100644 --- a/metadata-ingestion/tests/integration/mysql/test_mysql.py +++ b/metadata-ingestion/tests/integration/mysql/test_mysql.py @@ -1,3 +1,5 @@ +import subprocess + import pytest from freezegun import freeze_time @@ -6,60 +8,80 @@ from tests.test_helpers.docker_helpers import wait_for_port FROZEN_TIME = "2020-04-14 07:00:00" +MYSQL_PORT = 3306 -@freeze_time(FROZEN_TIME) -@pytest.mark.integration -def test_mysql_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time): - test_resources_dir = pytestconfig.rootpath / "tests/integration/mysql" +@pytest.fixture(scope="module") +def test_resources_dir(pytestconfig): + return pytestconfig.rootpath / "tests/integration/mysql" + + +def is_mysql_up(container_name: str, port: int) -> bool: + """A cheap way to figure out if mysql is responsive on a container""" + + cmd = f"docker logs {container_name} 2>&1 | grep '/usr/sbin/mysqld: ready for connections.' | grep {port}" + ret = subprocess.run( + cmd, + shell=True, + ) + return ret.returncode == 0 + +@pytest.fixture(scope="module") +def mysql_runner(docker_compose_runner, pytestconfig, test_resources_dir): with docker_compose_runner( test_resources_dir / "docker-compose.yml", "mysql" ) as docker_services: - wait_for_port(docker_services, "testmysql", 3306) - - # Run the metadata ingestion pipeline. - config_file = (test_resources_dir / "mysql_to_file.yml").resolve() - run_datahub_cmd( - ["ingest", "--strict-warnings", "-c", f"{config_file}"], tmp_path=tmp_path - ) - - # Verify the output. - mce_helpers.check_golden_file( - pytestconfig, - output_path=tmp_path / "mysql_mces.json", - golden_path=test_resources_dir / "mysql_mces_golden.json", + wait_for_port( + docker_services, + "testmysql", + MYSQL_PORT, + timeout=120, + checker=lambda: is_mysql_up("testmysql", MYSQL_PORT), ) + yield docker_services @freeze_time(FROZEN_TIME) @pytest.mark.integration -def test_mysql_ingest_with_db_alias( - docker_compose_runner, pytestconfig, tmp_path, mock_time +def test_mysql_ingest( + mysql_runner, pytestconfig, test_resources_dir, tmp_path, mock_time ): - test_resources_dir = pytestconfig.rootpath / "tests/integration/mysql" + # Run the metadata ingestion pipeline. + config_file = (test_resources_dir / "mysql_to_file.yml").resolve() + run_datahub_cmd( + ["ingest", "--strict-warnings", "-c", f"{config_file}"], tmp_path=tmp_path + ) - with docker_compose_runner( - test_resources_dir / "docker-compose.yml", "mysql" - ) as docker_services: - wait_for_port(docker_services, "testmysql", 3306) + # Verify the output. + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / "mysql_mces.json", + golden_path=test_resources_dir / "mysql_mces_golden.json", + ) - # Run the metadata ingestion pipeline. - config_file = (test_resources_dir / "mysql_to_file_dbalias.yml").resolve() - run_datahub_cmd( - ["ingest", "--strict-warnings", "-c", f"{config_file}"], tmp_path=tmp_path - ) - # Verify the output. - # Assert that all events generated have instance specific urns - import re +@freeze_time(FROZEN_TIME) +@pytest.mark.integration +def test_mysql_ingest_with_db_alias( + mysql_runner, pytestconfig, test_resources_dir, tmp_path, mock_time +): + # Run the metadata ingestion pipeline. + config_file = (test_resources_dir / "mysql_to_file_dbalias.yml").resolve() + run_datahub_cmd( + ["ingest", "--strict-warnings", "-c", f"{config_file}"], tmp_path=tmp_path + ) - urn_pattern = "^" + re.escape( - "urn:li:dataset:(urn:li:dataPlatform:mysql,foogalaxy." - ) - mce_helpers.assert_mcp_entity_urn( - filter="ALL", - entity_type="dataset", - regex_pattern=urn_pattern, - file=tmp_path / "mysql_mces_dbalias.json", - ) + # Verify the output. + # Assert that all events generated have instance specific urns + import re + + urn_pattern = "^" + re.escape( + "urn:li:dataset:(urn:li:dataPlatform:mysql,foogalaxy." + ) + mce_helpers.assert_mcp_entity_urn( + filter="ALL", + entity_type="dataset", + regex_pattern=urn_pattern, + file=tmp_path / "mysql_mces_dbalias.json", + ) diff --git a/metadata-ingestion/tests/test_helpers/docker_helpers.py b/metadata-ingestion/tests/test_helpers/docker_helpers.py index 6ac46c54ab8a2d..329664ad14b128 100644 --- a/metadata-ingestion/tests/test_helpers/docker_helpers.py +++ b/metadata-ingestion/tests/test_helpers/docker_helpers.py @@ -1,6 +1,6 @@ import contextlib import subprocess -from typing import Optional, Union +from typing import Callable, Optional, Union import pytest import pytest_docker.plugin @@ -27,16 +27,15 @@ def wait_for_port( hostname: str = None, timeout: float = 30.0, pause: float = 0.5, + checker: Optional[Callable[[], bool]] = None, ) -> None: - # import pdb - - # breakpoint() try: - # port = docker_services.port_for(container_name, container_port) docker_services.wait_until_responsive( timeout=timeout, pause=pause, - check=lambda: is_responsive(container_name, container_port, hostname), + check=checker + if checker + else lambda: is_responsive(container_name, container_port, hostname), ) finally: # use check=True to raise an error if command gave bad exit code