diff --git a/smoke-test/tests/managed-ingestion/managed_ingestion_test.py b/smoke-test/tests/managed-ingestion/managed_ingestion_test.py index 99a5b16ccfe0f5..4a3d30d2f4d0b0 100644 --- a/smoke-test/tests/managed-ingestion/managed_ingestion_test.py +++ b/smoke-test/tests/managed-ingestion/managed_ingestion_test.py @@ -1,9 +1,11 @@ +import json import time import pytest import tenacity -from tests.utils import get_frontend_url, wait_for_healthcheck_util, get_sleep_info -import json + +from tests.utils import (get_frontend_url, get_sleep_info, + wait_for_healthcheck_util) sleep_sec, sleep_times = get_sleep_info() @@ -35,7 +37,9 @@ def _get_ingestionSources(frontend_session): "variables": {"input": {"start": "0", "count": "20"}}, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json_q) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() @@ -75,7 +79,9 @@ def _ensure_secret_increased(frontend_session, before_count): "variables": {"input": {"start": "0", "count": "20"}}, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json_q) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() @@ -104,7 +110,9 @@ def _ensure_secret_not_present(frontend_session): "variables": {"input": {"secrets": ["SMOKE_TEST"]}}, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json_q) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() @@ -121,7 +129,9 @@ def _ensure_secret_not_present(frontend_session): @tenacity.retry( stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec) ) -def _ensure_ingestion_source_present(frontend_session, ingestion_source_urn, num_execs=None): +def _ensure_ingestion_source_present( + frontend_session, ingestion_source_urn, num_execs=None +): json_q = { "query": """query ingestionSource($urn: String!) {\n ingestionSource(urn: $urn) {\n @@ -138,7 +148,9 @@ def _ensure_ingestion_source_present(frontend_session, ingestion_source_urn, num "variables": {"urn": ingestion_source_urn}, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json_q) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() @@ -179,7 +191,9 @@ def _ensure_execution_request_present(frontend_session, execution_request_urn): "variables": {"urn": execution_request_urn}, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json_q) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() @@ -209,7 +223,9 @@ def test_create_list_get_remove_secret(frontend_session): "variables": {"input": {"start": "0", "count": "20"}}, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json_q) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() @@ -228,7 +244,9 @@ def test_create_list_get_remove_secret(frontend_session): "variables": {"input": {"name": "SMOKE_TEST", "value": "mytestvalue"}}, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json_q) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() @@ -253,7 +271,9 @@ def test_create_list_get_remove_secret(frontend_session): "variables": {"input": {"secrets": ["SMOKE_TEST"]}}, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json_q) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() @@ -275,7 +295,9 @@ def test_create_list_get_remove_secret(frontend_session): "variables": {"urn": secret_urn}, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json_q) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() @@ -308,7 +330,7 @@ def test_create_list_get_remove_ingestion_source(frontend_session): "description": "My ingestion source description", "schedule": {"interval": "*/5 * * * *", "timezone": "UTC"}, "config": { - "recipe": "{\"source\":{\"type\":\"mysql\",\"config\":{\"include_tables\":true,\"database\":null,\"password\":\"${MYSQL_PASSWORD}\",\"profiling\":{\"enabled\":false},\"host_port\":null,\"include_views\":true,\"username\":\"${MYSQL_USERNAME}\"}},\"pipeline_name\":\"urn:li:dataHubIngestionSource:f38bd060-4ea8-459c-8f24-a773286a2927\"}", + "recipe": '{"source":{"type":"mysql","config":{"include_tables":true,"database":null,"password":"${MYSQL_PASSWORD}","profiling":{"enabled":false},"host_port":null,"include_views":true,"username":"${MYSQL_USERNAME}"}},"pipeline_name":"urn:li:dataHubIngestionSource:f38bd060-4ea8-459c-8f24-a773286a2927"}', "version": "0.8.18", "executorId": "mytestexecutor", }, @@ -316,7 +338,9 @@ def test_create_list_get_remove_ingestion_source(frontend_session): }, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json_q) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() @@ -351,7 +375,9 @@ def test_create_list_get_remove_ingestion_source(frontend_session): "variables": {"urn": ingestion_source_urn}, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json_q) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() @@ -366,7 +392,10 @@ def test_create_list_get_remove_ingestion_source(frontend_session): assert ingestion_source["name"] == "My Test Ingestion Source" assert ingestion_source["schedule"]["interval"] == "*/5 * * * *" assert ingestion_source["schedule"]["timezone"] == "UTC" - assert ingestion_source["config"]["recipe"] == "{\"source\":{\"type\":\"mysql\",\"config\":{\"include_tables\":true,\"database\":null,\"password\":\"${MYSQL_PASSWORD}\",\"profiling\":{\"enabled\":false},\"host_port\":null,\"include_views\":true,\"username\":\"${MYSQL_USERNAME}\"}},\"pipeline_name\":\"urn:li:dataHubIngestionSource:f38bd060-4ea8-459c-8f24-a773286a2927\"}" + assert ( + ingestion_source["config"]["recipe"] + == '{"source":{"type":"mysql","config":{"include_tables":true,"database":null,"password":"${MYSQL_PASSWORD}","profiling":{"enabled":false},"host_port":null,"include_views":true,"username":"${MYSQL_USERNAME}"}},"pipeline_name":"urn:li:dataHubIngestionSource:f38bd060-4ea8-459c-8f24-a773286a2927"}' + ) assert ingestion_source["config"]["executorId"] == "mytestexecutor" assert ingestion_source["config"]["version"] == "0.8.18" @@ -378,7 +407,9 @@ def test_create_list_get_remove_ingestion_source(frontend_session): "variables": {"urn": ingestion_source_urn}, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json_q) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() @@ -411,7 +442,7 @@ def test_create_list_get_ingestion_execution_request(frontend_session): "description": "My ingestion source description", "schedule": {"interval": "*/5 * * * *", "timezone": "UTC"}, "config": { - "recipe": "{\"source\":{\"type\":\"mysql\",\"config\":{\"include_tables\":true,\"database\":null,\"password\":\"${MYSQL_PASSWORD}\",\"profiling\":{\"enabled\":false},\"host_port\":null,\"include_views\":true,\"username\":\"${MYSQL_USERNAME}\"}},\"pipeline_name\":\"urn:li:dataHubIngestionSource:f38bd060-4ea8-459c-8f24-a773286a2927\"}", + "recipe": '{"source":{"type":"mysql","config":{"include_tables":true,"database":null,"password":"${MYSQL_PASSWORD}","profiling":{"enabled":false},"host_port":null,"include_views":true,"username":"${MYSQL_USERNAME}"}},"pipeline_name":"urn:li:dataHubIngestionSource:f38bd060-4ea8-459c-8f24-a773286a2927"}', "version": "0.8.18", "executorId": "mytestexecutor", }, @@ -419,7 +450,9 @@ def test_create_list_get_ingestion_execution_request(frontend_session): }, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json_q) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() @@ -438,28 +471,36 @@ def test_create_list_get_ingestion_execution_request(frontend_session): "variables": {"input": {"ingestionSourceUrn": ingestion_source_urn}}, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json_q) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() assert res_data assert res_data["data"] - assert res_data["data"]["createIngestionExecutionRequest"] is not None, f"res_data was {res_data}" + assert ( + res_data["data"]["createIngestionExecutionRequest"] is not None + ), f"res_data was {res_data}" assert "errors" not in res_data execution_request_urn = res_data["data"]["createIngestionExecutionRequest"] - res_data = _ensure_ingestion_source_present(frontend_session, ingestion_source_urn, 1) + res_data = _ensure_ingestion_source_present( + frontend_session, ingestion_source_urn, 1 + ) ingestion_source = res_data["data"]["ingestionSource"] - + assert ( ingestion_source["executions"]["executionRequests"][0]["urn"] == execution_request_urn ) # Get the ingestion request back via direct lookup - res_data = _ensure_execution_request_present(frontend_session, execution_request_urn) + res_data = _ensure_execution_request_present( + frontend_session, execution_request_urn + ) execution_request = res_data["data"]["executionRequest"] assert execution_request["urn"] == execution_request_urn @@ -468,7 +509,12 @@ def test_create_list_get_ingestion_execution_request(frontend_session): assert execution_request["input"]["task"] == "RUN_INGEST" assert len(execution_request["input"]["arguments"]) == 2 assert execution_request["input"]["arguments"][0]["key"] == "recipe" - assert json.loads(execution_request["input"]["arguments"][0]["value"])["source"] == json.loads("{\"source\":{\"type\":\"mysql\",\"config\":{\"include_tables\":true,\"database\":null,\"password\":\"${MYSQL_PASSWORD}\",\"profiling\":{\"enabled\":false},\"host_port\":null,\"include_views\":true,\"username\":\"${MYSQL_USERNAME}\"}},\"pipeline_name\":\"urn:li:dataHubIngestionSource:f38bd060-4ea8-459c-8f24-a773286a2927\"}")["source"] + assert ( + json.loads(execution_request["input"]["arguments"][0]["value"])["source"] + == json.loads( + '{"source":{"type":"mysql","config":{"include_tables":true,"database":null,"password":"${MYSQL_PASSWORD}","profiling":{"enabled":false},"host_port":null,"include_views":true,"username":"${MYSQL_USERNAME}"}},"pipeline_name":"urn:li:dataHubIngestionSource:f38bd060-4ea8-459c-8f24-a773286a2927"}' + )["source"] + ) assert execution_request["input"]["arguments"][1]["key"] == "version" assert execution_request["input"]["arguments"][1]["value"] == "0.8.18" @@ -483,7 +529,9 @@ def test_create_list_get_ingestion_execution_request(frontend_session): "variables": {"urn": ingestion_source_urn}, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json_q) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json()