diff --git a/.github/workflows/docker-ingestion-smoke.yml b/.github/workflows/docker-ingestion-smoke.yml index 66762b78ccd902..a3116df793b70a 100644 --- a/.github/workflows/docker-ingestion-smoke.yml +++ b/.github/workflows/docker-ingestion-smoke.yml @@ -35,7 +35,7 @@ jobs: - name: Build and Push image uses: docker/build-push-action@v2 with: - context: ./docker/datahub-ingestion + context: . file: ./docker/datahub-ingestion/smoke.Dockerfile platforms: linux/amd64,linux/arm64 tags: acryldata/datahub-ingestion-base:smoke diff --git a/docker/datahub-ingestion/smoke.Dockerfile b/docker/datahub-ingestion/smoke.Dockerfile index 3bfdc9ccd0d770..e406720083b59f 100644 --- a/docker/datahub-ingestion/smoke.Dockerfile +++ b/docker/datahub-ingestion/smoke.Dockerfile @@ -15,4 +15,8 @@ RUN apt-get update && apt-get install -y \ xauth \ xvfb -RUN DEBIAN_FRONTEND=noninteractive apt-get install -y openjdk-11-jdk \ No newline at end of file +RUN DEBIAN_FRONTEND=noninteractive apt-get install -y openjdk-11-jdk + +COPY . /datahub-src +RUN cd /datahub-src && \ + ./gradlew :metadata-ingestion:installDev diff --git a/smoke-test/tests/managed-ingestion/managed_ingestion_test.py b/smoke-test/tests/managed-ingestion/managed_ingestion_test.py index 837e27f72cd630..4a3d30d2f4d0b0 100644 --- a/smoke-test/tests/managed-ingestion/managed_ingestion_test.py +++ b/smoke-test/tests/managed-ingestion/managed_ingestion_test.py @@ -1,7 +1,13 @@ +import json import time import pytest -from tests.utils import get_frontend_url, wait_for_healthcheck_util +import tenacity + +from tests.utils import (get_frontend_url, get_sleep_info, + wait_for_healthcheck_util) + +sleep_sec, sleep_times = get_sleep_info() @pytest.fixture(scope="session") @@ -16,60 +22,49 @@ def test_healthchecks(wait_for_healthchecks): pass -@pytest.mark.dependency(depends=["test_healthchecks"]) -def test_create_list_get_remove_secret(frontend_session): - - # Get count of existing secrets - json = { - "query": """query listSecrets($input: ListSecretsInput!) {\n - listSecrets(input: $input) {\n +def _get_ingestionSources(frontend_session): + json_q = { + "query": """query listIngestionSources($input: ListIngestionSourcesInput!) {\n + listIngestionSources(input: $input) {\n start\n count\n total\n - secrets {\n + ingestionSources {\n urn\n - name\n }\n }\n }""", "variables": {"input": {"start": "0", "count": "20"}}, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() assert res_data assert res_data["data"] - assert res_data["data"]["listSecrets"]["total"] is not None + assert res_data["data"]["listIngestionSources"]["total"] is not None assert "errors" not in res_data + return res_data - before_count = res_data["data"]["listSecrets"]["total"] - # Create new secret - json = { - "query": """mutation createSecret($input: CreateSecretInput!) {\n - createSecret(input: $input) - }""", - "variables": {"input": {"name": "SMOKE_TEST", "value": "mytestvalue"}}, - } - - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) - response.raise_for_status() - res_data = response.json() - - assert res_data - assert res_data["data"] - assert res_data["data"]["createSecret"] is not None - assert "errors" not in res_data - - secret_urn = res_data["data"]["createSecret"] +@tenacity.retry( + stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec) +) +def _ensure_ingestion_source_count(frontend_session, expected_count): + res_data = _get_ingestionSources(frontend_session) + after_count = res_data["data"]["listIngestionSources"]["total"] + assert after_count == expected_count + return after_count - # Sleep for eventual consistency (not ideal) - time.sleep(2) - # Get new count of secrets - json = { +@tenacity.retry( + stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec) +) +def _ensure_secret_increased(frontend_session, before_count): + json_q = { "query": """query listSecrets($input: ListSecretsInput!) {\n listSecrets(input: $input) {\n start\n @@ -84,7 +79,9 @@ def test_create_list_get_remove_secret(frontend_session): "variables": {"input": {"start": "0", "count": "20"}}, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() @@ -97,8 +94,13 @@ def test_create_list_get_remove_secret(frontend_session): after_count = res_data["data"]["listSecrets"]["total"] assert after_count == before_count + 1 + +@tenacity.retry( + stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec) +) +def _ensure_secret_not_present(frontend_session): # Get the secret value back - json = { + json_q = { "query": """query getSecretValues($input: GetSecretValuesInput!) {\n getSecretValues(input: $input) {\n name\n @@ -108,96 +110,216 @@ def test_create_list_get_remove_secret(frontend_session): "variables": {"input": {"secrets": ["SMOKE_TEST"]}}, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() - print(res_data) assert res_data assert res_data["data"] assert res_data["data"]["getSecretValues"] is not None assert "errors" not in res_data secret_values = res_data["data"]["getSecretValues"] - secret_value = [x for x in secret_values if x["name"] == "SMOKE_TEST"][0] - assert secret_value["value"] == "mytestvalue" + secret_value_arr = [x for x in secret_values if x["name"] == "SMOKE_TEST"] + assert len(secret_value_arr) == 0 - # Now cleanup and remove the secret - json = { - "query": """mutation deleteSecret($urn: String!) {\n - deleteSecret(urn: $urn) + +@tenacity.retry( + stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec) +) +def _ensure_ingestion_source_present( + frontend_session, ingestion_source_urn, num_execs=None +): + json_q = { + "query": """query ingestionSource($urn: String!) {\n + ingestionSource(urn: $urn) {\n + executions(start: 0, count: 1) {\n + start\n + count\n + total\n + executionRequests {\n + urn\n + }\n + }\n + }\n }""", - "variables": {"urn": secret_urn}, + "variables": {"urn": ingestion_source_urn}, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() assert res_data assert res_data["data"] - assert res_data["data"]["deleteSecret"] is not None + assert res_data["data"]["ingestionSource"] is not None assert "errors" not in res_data - # Re-fetch the secret values and see that they are not there. - time.sleep(2) + if num_execs is not None: + ingestion_source = res_data["data"]["ingestionSource"] + assert ingestion_source["executions"]["total"] == num_execs - # Get the secret value back - json = { - "query": """query getSecretValues($input: GetSecretValuesInput!) {\n - getSecretValues(input: $input) {\n - name\n - value\n + return res_data + + +@tenacity.retry( + stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec) +) +def _ensure_execution_request_present(frontend_session, execution_request_urn): + json_q = { + "query": """query executionRequest($urn: String!) {\n + executionRequest(urn: $urn) {\n + urn\n + input {\n + task\n + arguments {\n + key\n + value\n + }\n + }\n + result {\n + status\n + startTimeMs\n + durationMs\n + }\n }\n }""", - "variables": {"input": {"secrets": ["SMOKE_TEST"]}}, + "variables": {"urn": execution_request_urn}, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() assert res_data assert res_data["data"] - assert res_data["data"]["getSecretValues"] is not None + assert res_data["data"]["executionRequest"] is not None assert "errors" not in res_data - - secret_values = res_data["data"]["getSecretValues"] - secret_value_arr = [x for x in secret_values if x["name"] == "SMOKE_TEST"] - assert len(secret_value_arr) == 0 + return res_data @pytest.mark.dependency(depends=["test_healthchecks"]) -def test_create_list_get_remove_ingestion_source(frontend_session): +def test_create_list_get_remove_secret(frontend_session): - # Get count of existing ingestion sources - json = { - "query": """query listIngestionSources($input: ListIngestionSourcesInput!) {\n - listIngestionSources(input: $input) {\n + # Get count of existing secrets + json_q = { + "query": """query listSecrets($input: ListSecretsInput!) {\n + listSecrets(input: $input) {\n start\n count\n total\n - ingestionSources {\n + secrets {\n urn\n + name\n }\n }\n }""", "variables": {"input": {"start": "0", "count": "20"}}, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() assert res_data assert res_data["data"] - assert res_data["data"]["listIngestionSources"]["total"] is not None + assert res_data["data"]["listSecrets"]["total"] is not None + assert "errors" not in res_data + + before_count = res_data["data"]["listSecrets"]["total"] + + # Create new secret + json_q = { + "query": """mutation createSecret($input: CreateSecretInput!) {\n + createSecret(input: $input) + }""", + "variables": {"input": {"name": "SMOKE_TEST", "value": "mytestvalue"}}, + } + + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) + response.raise_for_status() + res_data = response.json() + + assert res_data + assert res_data["data"] + assert res_data["data"]["createSecret"] is not None + assert "errors" not in res_data + + secret_urn = res_data["data"]["createSecret"] + + # Get new count of secrets + _ensure_secret_increased(frontend_session, before_count) + + # Get the secret value back + json_q = { + "query": """query getSecretValues($input: GetSecretValuesInput!) {\n + getSecretValues(input: $input) {\n + name\n + value\n + }\n + }""", + "variables": {"input": {"secrets": ["SMOKE_TEST"]}}, + } + + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) + response.raise_for_status() + res_data = response.json() + + print(res_data) + assert res_data + assert res_data["data"] + assert res_data["data"]["getSecretValues"] is not None assert "errors" not in res_data + secret_values = res_data["data"]["getSecretValues"] + secret_value = [x for x in secret_values if x["name"] == "SMOKE_TEST"][0] + assert secret_value["value"] == "mytestvalue" + + # Now cleanup and remove the secret + json_q = { + "query": """mutation deleteSecret($urn: String!) {\n + deleteSecret(urn: $urn) + }""", + "variables": {"urn": secret_urn}, + } + + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) + response.raise_for_status() + res_data = response.json() + + assert res_data + assert res_data["data"] + assert res_data["data"]["deleteSecret"] is not None + assert "errors" not in res_data + + # Re-fetch the secret values and see that they are not there. + _ensure_secret_not_present(frontend_session) + + +@pytest.mark.dependency(depends=["test_healthchecks"]) +def test_create_list_get_remove_ingestion_source(frontend_session): + + # Get count of existing ingestion sources + res_data = _get_ingestionSources(frontend_session) + before_count = res_data["data"]["listIngestionSources"]["total"] # Create new ingestion source - json = { + json_q = { "query": """mutation createIngestionSource($input: UpdateIngestionSourceInput!) {\n createIngestionSource(input: $input) }""", @@ -206,9 +328,9 @@ def test_create_list_get_remove_ingestion_source(frontend_session): "name": "My Test Ingestion Source", "type": "mysql", "description": "My ingestion source description", - "schedule": {"interval": "* * * * *", "timezone": "UTC"}, + "schedule": {"interval": "*/5 * * * *", "timezone": "UTC"}, "config": { - "recipe": "MY_TEST_RECIPE", + "recipe": '{"source":{"type":"mysql","config":{"include_tables":true,"database":null,"password":"${MYSQL_PASSWORD}","profiling":{"enabled":false},"host_port":null,"include_views":true,"username":"${MYSQL_USERNAME}"}},"pipeline_name":"urn:li:dataHubIngestionSource:f38bd060-4ea8-459c-8f24-a773286a2927"}', "version": "0.8.18", "executorId": "mytestexecutor", }, @@ -216,7 +338,9 @@ def test_create_list_get_remove_ingestion_source(frontend_session): }, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() @@ -227,39 +351,11 @@ def test_create_list_get_remove_ingestion_source(frontend_session): ingestion_source_urn = res_data["data"]["createIngestionSource"] - # Sleep for eventual consistency (not ideal) - time.sleep(2) - - # Get new count of ingestion sources - json = { - "query": """query listIngestionSources($input: ListIngestionSourcesInput!) {\n - listIngestionSources(input: $input) {\n - start\n - count\n - total\n - ingestionSources {\n - urn\n - }\n - }\n - }""", - "variables": {"input": {"start": "0", "count": "20"}}, - } - - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) - response.raise_for_status() - res_data = response.json() - - assert res_data - assert res_data["data"] - assert res_data["data"]["listIngestionSources"]["total"] is not None - assert "errors" not in res_data - # Assert that there are more ingestion sources now. - after_count = res_data["data"]["listIngestionSources"]["total"] - assert after_count == before_count + 1 + after_count = _ensure_ingestion_source_count(frontend_session, before_count + 1) # Get the ingestion source back - json = { + json_q = { "query": """query ingestionSource($urn: String!) {\n ingestionSource(urn: $urn) {\n urn\n @@ -279,7 +375,9 @@ def test_create_list_get_remove_ingestion_source(frontend_session): "variables": {"urn": ingestion_source_urn}, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() @@ -292,21 +390,26 @@ def test_create_list_get_remove_ingestion_source(frontend_session): assert ingestion_source["urn"] == ingestion_source_urn assert ingestion_source["type"] == "mysql" assert ingestion_source["name"] == "My Test Ingestion Source" - assert ingestion_source["schedule"]["interval"] == "* * * * *" + assert ingestion_source["schedule"]["interval"] == "*/5 * * * *" assert ingestion_source["schedule"]["timezone"] == "UTC" - assert ingestion_source["config"]["recipe"] == "MY_TEST_RECIPE" + assert ( + ingestion_source["config"]["recipe"] + == '{"source":{"type":"mysql","config":{"include_tables":true,"database":null,"password":"${MYSQL_PASSWORD}","profiling":{"enabled":false},"host_port":null,"include_views":true,"username":"${MYSQL_USERNAME}"}},"pipeline_name":"urn:li:dataHubIngestionSource:f38bd060-4ea8-459c-8f24-a773286a2927"}' + ) assert ingestion_source["config"]["executorId"] == "mytestexecutor" assert ingestion_source["config"]["version"] == "0.8.18" # Now cleanup and remove the ingestion source - json = { + json_q = { "query": """mutation deleteIngestionSource($urn: String!) {\n deleteIngestionSource(urn: $urn) }""", "variables": {"urn": ingestion_source_urn}, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() @@ -316,35 +419,8 @@ def test_create_list_get_remove_ingestion_source(frontend_session): assert res_data["data"]["deleteIngestionSource"] is not None assert "errors" not in res_data - # Re-fetch the ingestion sources and see that they are not there. - time.sleep(2) - # Ensure the ingestion source has been removed. - json = { - "query": """query listIngestionSources($input: ListIngestionSourcesInput!) {\n - listIngestionSources(input: $input) {\n - start\n - count\n - total\n - ingestionSources {\n - urn\n - }\n - }\n - }""", - "variables": {"input": {"start": "0", "count": "20"}}, - } - - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) - response.raise_for_status() - res_data = response.json() - - assert res_data - assert res_data["data"] - assert res_data["data"]["listIngestionSources"]["total"] is not None - assert "errors" not in res_data - - final_count = res_data["data"]["listIngestionSources"]["total"] - assert final_count == after_count - 1 + _ensure_ingestion_source_count(frontend_session, after_count - 1) @pytest.mark.dependency( @@ -355,7 +431,7 @@ def test_create_list_get_remove_ingestion_source(frontend_session): ) def test_create_list_get_ingestion_execution_request(frontend_session): # Create new ingestion source - json = { + json_q = { "query": """mutation createIngestionSource($input: UpdateIngestionSourceInput!) {\n createIngestionSource(input: $input) }""", @@ -364,9 +440,9 @@ def test_create_list_get_ingestion_execution_request(frontend_session): "name": "My Test Ingestion Source", "type": "mysql", "description": "My ingestion source description", - "schedule": {"interval": "* * * * *", "timezone": "UTC"}, + "schedule": {"interval": "*/5 * * * *", "timezone": "UTC"}, "config": { - "recipe": "MY_TEST_RECIPE", + "recipe": '{"source":{"type":"mysql","config":{"include_tables":true,"database":null,"password":"${MYSQL_PASSWORD}","profiling":{"enabled":false},"host_port":null,"include_views":true,"username":"${MYSQL_USERNAME}"}},"pipeline_name":"urn:li:dataHubIngestionSource:f38bd060-4ea8-459c-8f24-a773286a2927"}', "version": "0.8.18", "executorId": "mytestexecutor", }, @@ -374,7 +450,9 @@ def test_create_list_get_ingestion_execution_request(frontend_session): }, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() @@ -386,90 +464,43 @@ def test_create_list_get_ingestion_execution_request(frontend_session): ingestion_source_urn = res_data["data"]["createIngestionSource"] # Create a request to execute the ingestion source - json = { + json_q = { "query": """mutation createIngestionExecutionRequest($input: CreateIngestionExecutionRequestInput!) {\n createIngestionExecutionRequest(input: $input) }""", "variables": {"input": {"ingestionSourceUrn": ingestion_source_urn}}, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() assert res_data assert res_data["data"] - assert res_data["data"]["createIngestionExecutionRequest"] is not None + assert ( + res_data["data"]["createIngestionExecutionRequest"] is not None + ), f"res_data was {res_data}" assert "errors" not in res_data execution_request_urn = res_data["data"]["createIngestionExecutionRequest"] - # Wait for eventual consistency. - time.sleep(2) - - # Get the ingestion source executions - json = { - "query": """query ingestionSource($urn: String!) {\n - ingestionSource(urn: $urn) {\n - executions(start: 0, count: 1) {\n - start\n - count\n - total\n - executionRequests {\n - urn\n - }\n - }\n - }\n - }""", - "variables": {"urn": ingestion_source_urn}, - } - - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) - response.raise_for_status() - res_data = response.json() - - assert res_data - assert res_data["data"] - assert res_data["data"]["ingestionSource"] is not None - assert "errors" not in res_data + res_data = _ensure_ingestion_source_present( + frontend_session, ingestion_source_urn, 1 + ) ingestion_source = res_data["data"]["ingestionSource"] - assert ingestion_source["executions"]["total"] == 1 + assert ( ingestion_source["executions"]["executionRequests"][0]["urn"] == execution_request_urn ) # Get the ingestion request back via direct lookup - json = { - "query": """query executionRequest($urn: String!) {\n - executionRequest(urn: $urn) {\n - urn\n - input {\n - task\n - arguments {\n - key\n - value\n - }\n - }\n - result {\n - status\n - startTimeMs\n - durationMs\n - }\n - }\n - }""", - "variables": {"urn": execution_request_urn}, - } - - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) - response.raise_for_status() - res_data = response.json() - - assert res_data - assert res_data["data"] - assert res_data["data"]["executionRequest"] is not None - assert "errors" not in res_data + res_data = _ensure_execution_request_present( + frontend_session, execution_request_urn + ) execution_request = res_data["data"]["executionRequest"] assert execution_request["urn"] == execution_request_urn @@ -478,7 +509,12 @@ def test_create_list_get_ingestion_execution_request(frontend_session): assert execution_request["input"]["task"] == "RUN_INGEST" assert len(execution_request["input"]["arguments"]) == 2 assert execution_request["input"]["arguments"][0]["key"] == "recipe" - assert execution_request["input"]["arguments"][0]["value"] == "MY_TEST_RECIPE" + assert ( + json.loads(execution_request["input"]["arguments"][0]["value"])["source"] + == json.loads( + '{"source":{"type":"mysql","config":{"include_tables":true,"database":null,"password":"${MYSQL_PASSWORD}","profiling":{"enabled":false},"host_port":null,"include_views":true,"username":"${MYSQL_USERNAME}"}},"pipeline_name":"urn:li:dataHubIngestionSource:f38bd060-4ea8-459c-8f24-a773286a2927"}' + )["source"] + ) assert execution_request["input"]["arguments"][1]["key"] == "version" assert execution_request["input"]["arguments"][1]["value"] == "0.8.18" @@ -486,14 +522,16 @@ def test_create_list_get_ingestion_execution_request(frontend_session): assert execution_request["result"] is None # Now cleanup and remove the ingestion source - json = { + json_q = { "query": """mutation deleteIngestionSource($urn: String!) {\n deleteIngestionSource(urn: $urn) }""", "variables": {"urn": ingestion_source_urn}, } - response = frontend_session.post(f"{get_frontend_url()}/api/v2/graphql", json=json) + response = frontend_session.post( + f"{get_frontend_url()}/api/v2/graphql", json=json_q + ) response.raise_for_status() res_data = response.json() diff --git a/smoke-test/tests/utils.py b/smoke-test/tests/utils.py index 71f5e325e722d6..b3a33b3dd85cff 100644 --- a/smoke-test/tests/utils.py +++ b/smoke-test/tests/utils.py @@ -40,8 +40,8 @@ def get_mysql_password(): def get_sleep_info() -> Tuple[int, int]: return ( - int(os.getenv("DATAHUB_TEST_SLEEP_BETWEEN", 60)), - int(os.getenv("DATAHUB_TEST_SLEEP_TIMES", 5)), + int(os.getenv("DATAHUB_TEST_SLEEP_BETWEEN", 20)), + int(os.getenv("DATAHUB_TEST_SLEEP_TIMES", 15)), )