Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ci): fix mysql and kafka-connect ingestion test #5352

Merged
merged 1 commit into from
Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import subprocess
import time

import pytest
Expand All @@ -11,6 +12,17 @@
FROZEN_TIME = "2021-10-25 13:00:00"


def is_mysql_up(container_name: str, port: int) -> bool:
    """A cheap way to figure out if mysql is responsive on a container.

    Reads the combined stdout/stderr of ``docker logs <container_name>`` and
    looks for a single line that contains both the path
    ``/var/run/mysqld/mysqld.sock`` and the decimal text of ``port``.

    Args:
        container_name: Name of the docker container whose logs are scanned.
        port: Port number that must appear on the same log line.

    Returns:
        True if a matching log line exists; False otherwise, including when
        the ``docker`` executable cannot be launched.
    """
    marker = "/var/run/mysqld/mysqld.sock"
    port_text = str(port)

    try:
        # Pass an argument list instead of a shell string so that
        # container_name is never interpreted as shell syntax.
        completed = subprocess.run(
            ["docker", "logs", container_name],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # same effect as the shell's 2>&1
            encoding="utf-8",
            errors="replace",
        )
    except OSError:
        # docker missing or not executable: treat as "not up yet".
        return False

    # The exit status of `docker logs` is deliberately ignored; only the
    # presence of a matching line decides the result.
    return any(
        marker in line and port_text in line
        for line in completed.stdout.splitlines()
    )


@freeze_time(FROZEN_TIME)
@pytest.mark.integration_batch_1
def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time):
Expand All @@ -24,6 +36,13 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, moc
str(test_resources_dir / "docker-compose.override.yml"),
]
with docker_compose_runner(docker_compose_file, "kafka-connect") as docker_services:
wait_for_port(
docker_services,
"test_mysql",
3306,
timeout=120,
checker=lambda: is_mysql_up("test_mysql", 3306),
)
wait_for_port(docker_services, "test_broker", 59092, timeout=120)
wait_for_port(docker_services, "test_connect", 58083, timeout=120)
docker_services.wait_until_responsive(
Expand Down
104 changes: 63 additions & 41 deletions metadata-ingestion/tests/integration/mysql/test_mysql.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import subprocess

import pytest
from freezegun import freeze_time

Expand All @@ -6,60 +8,80 @@
from tests.test_helpers.docker_helpers import wait_for_port

FROZEN_TIME = "2020-04-14 07:00:00"
MYSQL_PORT = 3306


@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_mysql_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time):
test_resources_dir = pytestconfig.rootpath / "tests/integration/mysql"
@pytest.fixture(scope="module")
def test_resources_dir(pytestconfig):
    """Module-scoped fixture returning the MySQL integration test resource path."""
    root_dir = pytestconfig.rootpath
    return root_dir / "tests/integration/mysql"


def is_mysql_up(container_name: str, port: int) -> bool:
    """A cheap way to figure out if mysql is responsive on a container.

    Reads the combined stdout/stderr of ``docker logs <container_name>`` and
    looks for a single line that contains both the text
    ``/usr/sbin/mysqld: ready for connections.`` and the decimal text of
    ``port``.

    Args:
        container_name: Name of the docker container whose logs are scanned.
        port: Port number that must appear on the same log line.

    Returns:
        True if a matching log line exists; False otherwise, including when
        the ``docker`` executable cannot be launched.
    """
    marker = "/usr/sbin/mysqld: ready for connections."
    port_text = str(port)

    try:
        # Pass an argument list instead of a shell string so that
        # container_name is never interpreted as shell syntax.
        completed = subprocess.run(
            ["docker", "logs", container_name],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # same effect as the shell's 2>&1
            encoding="utf-8",
            errors="replace",
        )
    except OSError:
        # docker missing or not executable: treat as "not up yet".
        return False

    # The exit status of `docker logs` is deliberately ignored; only the
    # presence of a matching line decides the result.
    return any(
        marker in line and port_text in line
        for line in completed.stdout.splitlines()
    )


@pytest.fixture(scope="module")
def mysql_runner(docker_compose_runner, pytestconfig, test_resources_dir):
with docker_compose_runner(
test_resources_dir / "docker-compose.yml", "mysql"
) as docker_services:
wait_for_port(docker_services, "testmysql", 3306)

# Run the metadata ingestion pipeline.
config_file = (test_resources_dir / "mysql_to_file.yml").resolve()
run_datahub_cmd(
["ingest", "--strict-warnings", "-c", f"{config_file}"], tmp_path=tmp_path
)

# Verify the output.
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "mysql_mces.json",
golden_path=test_resources_dir / "mysql_mces_golden.json",
wait_for_port(
docker_services,
"testmysql",
MYSQL_PORT,
timeout=120,
checker=lambda: is_mysql_up("testmysql", MYSQL_PORT),
)
yield docker_services


@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_mysql_ingest_with_db_alias(
docker_compose_runner, pytestconfig, tmp_path, mock_time
def test_mysql_ingest(
mysql_runner, pytestconfig, test_resources_dir, tmp_path, mock_time
):
test_resources_dir = pytestconfig.rootpath / "tests/integration/mysql"
# Run the metadata ingestion pipeline.
config_file = (test_resources_dir / "mysql_to_file.yml").resolve()
run_datahub_cmd(
["ingest", "--strict-warnings", "-c", f"{config_file}"], tmp_path=tmp_path
)

with docker_compose_runner(
test_resources_dir / "docker-compose.yml", "mysql"
) as docker_services:
wait_for_port(docker_services, "testmysql", 3306)
# Verify the output.
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "mysql_mces.json",
golden_path=test_resources_dir / "mysql_mces_golden.json",
)

# Run the metadata ingestion pipeline.
config_file = (test_resources_dir / "mysql_to_file_dbalias.yml").resolve()
run_datahub_cmd(
["ingest", "--strict-warnings", "-c", f"{config_file}"], tmp_path=tmp_path
)

# Verify the output.
# Assert that all events generated have instance specific urns
import re
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_mysql_ingest_with_db_alias(
mysql_runner, pytestconfig, test_resources_dir, tmp_path, mock_time
):
# Run the metadata ingestion pipeline.
config_file = (test_resources_dir / "mysql_to_file_dbalias.yml").resolve()
run_datahub_cmd(
["ingest", "--strict-warnings", "-c", f"{config_file}"], tmp_path=tmp_path
)

urn_pattern = "^" + re.escape(
"urn:li:dataset:(urn:li:dataPlatform:mysql,foogalaxy."
)
mce_helpers.assert_mcp_entity_urn(
filter="ALL",
entity_type="dataset",
regex_pattern=urn_pattern,
file=tmp_path / "mysql_mces_dbalias.json",
)
# Verify the output.
# Assert that all events generated have instance specific urns
import re

urn_pattern = "^" + re.escape(
"urn:li:dataset:(urn:li:dataPlatform:mysql,foogalaxy."
)
mce_helpers.assert_mcp_entity_urn(
filter="ALL",
entity_type="dataset",
regex_pattern=urn_pattern,
file=tmp_path / "mysql_mces_dbalias.json",
)
11 changes: 5 additions & 6 deletions metadata-ingestion/tests/test_helpers/docker_helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import contextlib
import subprocess
from typing import Optional, Union
from typing import Callable, Optional, Union

import pytest
import pytest_docker.plugin
Expand All @@ -27,16 +27,15 @@ def wait_for_port(
hostname: str = None,
timeout: float = 30.0,
pause: float = 0.5,
checker: Optional[Callable[[], bool]] = None,
) -> None:
# import pdb

# breakpoint()
try:
# port = docker_services.port_for(container_name, container_port)
docker_services.wait_until_responsive(
timeout=timeout,
pause=pause,
check=lambda: is_responsive(container_name, container_port, hostname),
check=checker
if checker
else lambda: is_responsive(container_name, container_port, hostname),
)
finally:
# use check=True to raise an error if command gave bad exit code
Expand Down