
Commit

Add changes to support smoke test on DataHub deployed on Kubernetes cluster (datahub-project#5334)

Co-authored-by: Aseem Bansal <asmbansal2@gmail.com>
2 people authored and maggiehays committed Aug 1, 2022
1 parent c455189 commit 21c4e15
Showing 20 changed files with 248 additions and 160 deletions.
2 changes: 1 addition & 1 deletion docker/elasticsearch-setup/create-indices.sh
@@ -63,7 +63,7 @@ function create_datahub_usage_event_aws_elasticsearch() {
fi
if [ $(curl -o /dev/null -s -w "%{http_code}" --header "$ELASTICSEARCH_AUTH_HEADER" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST:$ELASTICSEARCH_PORT/_template/${PREFIX}datahub_usage_event_index_template") -eq 404 ]
then
echo -e "\ncreating datahub_usagAe_event_index_template"
echo -e "\ncreating datahub_usage_event_index_template"
sed -e "s/PREFIX/${PREFIX}/g" /index/usage-event/aws_es_index_template.json | tee -a /tmp/aws_es_index_template.json
curl -XPUT --header "$ELASTICSEARCH_AUTH_HEADER" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST:$ELASTICSEARCH_PORT/_template/${PREFIX}datahub_usage_event_index_template" -H 'Content-Type: application/json' --data @/tmp/aws_es_index_template.json
curl -XPUT --header "$ELASTICSEARCH_AUTH_HEADER" "$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST:$ELASTICSEARCH_PORT/${PREFIX}datahub_usage_event-000001" -H 'Content-Type: application/json' --data "{\"aliases\":{\"${PREFIX}datahub_usage_event\":{\"is_write_index\":true}}}"
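
Note: to spot-check the result of this script against a running cluster, a couple of GET requests mirroring its curl probes will do. A minimal sketch in Python, assuming the same ELASTICSEARCH_* environment variables are set and that no auth header is required:

import os

import requests

# Assumes the same environment variables the shell script reads; auth omitted.
base = "{}://{}:{}".format(
    os.environ.get("ELASTICSEARCH_PROTOCOL", "http"),
    os.environ.get("ELASTICSEARCH_HOST", "localhost"),
    os.environ.get("ELASTICSEARCH_PORT", "9200"),
)
prefix = os.environ.get("PREFIX", "")

# 404 means the template is missing and the script would create it.
template = requests.get(f"{base}/_template/{prefix}datahub_usage_event_index_template")
print("template status:", template.status_code)

# The bootstrap index should carry the write alias set by the script.
alias = requests.get(f"{base}/_alias/{prefix}datahub_usage_event")
print("alias mapping:", alias.json())
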
1 change: 1 addition & 0 deletions settings.gradle
@@ -46,3 +46,4 @@ include 'metadata-integration:java:datahub-client'
include 'metadata-integration:java:datahub-protobuf'
include 'metadata-ingestion-modules:airflow-plugin'
include 'ingestion-scheduler'
include 'smoke-test'
1 change: 1 addition & 0 deletions smoke-test/.gitignore
@@ -130,3 +130,4 @@ dmypy.json

# Pyre type checker
.pyre/
junit*
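
Note: the junit* pattern keeps generated reports such as the junit.smoke.xml written by smoke.sh below (via --junit-xml) out of version control.
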
40 changes: 40 additions & 0 deletions smoke-test/build.gradle
@@ -0,0 +1,40 @@
apply plugin: 'com.github.node-gradle.node'

node {

// If true, node will be downloaded using the parameters below.
// If false, the globally installed node will be used.
if (project.hasProperty('useSystemNode') && project.getProperty('useSystemNode').toBoolean()) {
download = false
} else {
download = true
}

// Version of node to use.
version = '16.8.0'

// Version of Yarn to use.
yarnVersion = '1.22.0'

// Base URL for fetching node distributions (set nodeDistBaseUrl if you have a mirror).
if (project.hasProperty('nodeDistBaseUrl')) {
distBaseUrl = project.getProperty('nodeDistBaseUrl')
} else {
distBaseUrl = 'https://nodejs.org/dist'
}

// Set the work directory for unpacking node
workDir = file("${project.projectDir}/.gradle/nodejs")

// Set the work directory for Yarn
yarnWorkDir = file("${project.projectDir}/.gradle/yarn")

// Set the work directory where node_modules should be located
nodeModulesDir = file("${project.projectDir}")

}

task yarnInstall(type: YarnTask) {
println "Root directory: ${project.rootDir}";
args = ['install', '--cwd', "${project.rootDir}/smoke-test/tests/cypress"]
}
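
Note: with 'smoke-test' registered in settings.gradle above, this task can be run from the repository root as ./gradlew :smoke-test:yarnInstall, which is exactly what the updated smoke.sh below switches to; pass -PuseSystemNode=true to use a preinstalled Node instead of a downloaded one.
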
1 change: 1 addition & 0 deletions smoke-test/requirements.txt
@@ -1,4 +1,5 @@
pytest>=6.2
pytest-dependency>=0.5.1
psutil
tenacity
-e ../metadata-ingestion[datahub-rest,datahub-kafka,mysql]
4 changes: 2 additions & 2 deletions smoke-test/smoke.sh
@@ -22,6 +22,6 @@ pip install -r requirements.txt
echo "DATAHUB_VERSION = $DATAHUB_VERSION"
DATAHUB_TELEMETRY_ENABLED=false datahub docker quickstart --quickstart-compose-file ../docker/quickstart/docker-compose-without-neo4j.quickstart.yml --dump-logs-on-failure

(cd tests/cypress ; yarn install)
(cd ..; ./gradlew :smoke-test:yarnInstall)

pytest -vv --continue-on-collection-errors --junit-xml=junit.smoke.xml
pytest -rP --durations=20 -vv --continue-on-collection-errors --junit-xml=junit.smoke.xml
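
Note: -rP adds the captured output of passing tests to pytest's summary and --durations=20 lists the twenty slowest test phases, both useful when the only artifact from an in-cluster run is the CI log.
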
135 changes: 54 additions & 81 deletions smoke-test/test_e2e.py
@@ -1,19 +1,20 @@
import time
import urllib
from contextlib import contextmanager
from typing import Optional
from typing import Any, Optional

import pytest
import requests
from datahub.cli.docker import check_local_docker_containers
import tenacity
from datahub.ingestion.run.pipeline import Pipeline

from tests.utils import (
get_frontend_url,
get_gms_url,
get_kafka_broker_url,
get_kafka_schema_registry,
get_sleep_info,
ingest_file_via_rest,
wait_for_healthcheck_util,
)

bootstrap_sample_data = "../metadata-ingestion/examples/mce_files/bootstrap_mce.json"
@@ -26,11 +27,12 @@
}
kafka_post_ingestion_wait_sec = 60

sleep_sec, sleep_times = get_sleep_info()


@pytest.fixture(scope="session")
def wait_for_healthchecks():
# Simply assert that everything is healthy, but don't wait.
assert not check_local_docker_containers()
wait_for_healthcheck_util()
yield


@@ -54,71 +56,52 @@ def frontend_session(wait_for_healthchecks):
yield session


@contextmanager
def with_sleep_times(
sleep_between: Optional[int] = None, sleep_times: Optional[int] = None
):
_sleep_between, _sleep_times = get_sleep_info()
if sleep_times is None:
sleep_times = _sleep_times
while True:
try:
yield
except Exception as e:
if sleep_times > 0:
sleep_time = sleep_between or _sleep_between
sleep_times -= 1
print(
f"Sleeping for {sleep_time}. Will sleep for {sleep_times} more if needed"
)
time.sleep(sleep_time)
else:
raise e
finally:
break


def _ensure_user_present(
urn: str, sleep_between: Optional[int] = None, sleep_times: Optional[int] = None
):
with with_sleep_times(sleep_between, sleep_times):
response = requests.get(
f"{get_gms_url()}/entities/{urllib.parse.quote(urn)}",
headers={
**restli_default_headers,
},
)
response.raise_for_status()
data = response.json()
@tenacity.retry(
stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_user_present(urn: str):
response = requests.get(
f"{get_gms_url()}/entities/{urllib.parse.quote(urn)}",
headers={
**restli_default_headers,
},
)
response.raise_for_status()
data = response.json()

user_key = "com.linkedin.metadata.snapshot.CorpUserSnapshot"
assert data["value"]
assert data["value"][user_key]
assert data["value"][user_key]["urn"] == urn
user_key = "com.linkedin.metadata.snapshot.CorpUserSnapshot"
assert data["value"]
assert data["value"][user_key]
assert data["value"][user_key]["urn"] == urn
return data


@tenacity.retry(
stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_dataset_present(
urn: str, sleep_between: Optional[int] = None, sleep_times: Optional[int] = None
):
with with_sleep_times(sleep_between, sleep_times):
response = requests.get(
f"{get_gms_url()}/entitiesV2?ids=List({urllib.parse.quote(urn)})&aspects=List(datasetProperties)",
headers={
**restli_default_headers,
"X-RestLi-Method": "batch_get",
},
)
response.raise_for_status()
res_data = response.json()
assert res_data["results"]
assert res_data["results"][urn]
assert res_data["results"][urn]["aspects"]["datasetProperties"]
urn: str,
aspects: Optional[str] = "datasetProperties",
) -> Any:
response = requests.get(
f"{get_gms_url()}/entitiesV2?ids=List({urllib.parse.quote(urn)})&aspects=List({aspects})",
headers={
**restli_default_headers,
"X-RestLi-Method": "batch_get",
},
)
response.raise_for_status()
res_data = response.json()
assert res_data["results"]
assert res_data["results"][urn]
assert res_data["results"][urn]["aspects"]["datasetProperties"]
return res_data


@pytest.mark.dependency(depends=["test_healthchecks"])
def test_ingestion_via_rest(wait_for_healthchecks):
ingest_file_via_rest(bootstrap_sample_data)
_ensure_user_present(urn="urn:li:corpuser:datahub", sleep_between=10, sleep_times=6)
_ensure_user_present(urn="urn:li:corpuser:datahub")


@pytest.mark.dependency(depends=["test_healthchecks"])
@@ -139,6 +122,7 @@ def test_ingestion_via_kafka(wait_for_healthchecks):
"config": {
"connection": {
"bootstrap": get_kafka_broker_url(),
"schema_registry_url": get_kafka_schema_registry(),
}
},
},
@@ -227,25 +211,12 @@ def test_gms_batch_get_v2():
urn1 = f"urn:li:dataset:({platform},{name_1},{env})"
urn2 = f"urn:li:dataset:({platform},{name_2},{env})"

response = requests.get(
f"{get_gms_url()}/entitiesV2?ids=List({urllib.parse.quote(urn1)},{urllib.parse.quote(urn2)})&aspects=List(datasetProperties,ownership)",
headers={
**restli_default_headers,
"X-RestLi-Method": "batch_get",
},
)
response.raise_for_status()
res_data = response.json()
resp1 = _ensure_dataset_present(urn1, aspects="datasetProperties,ownership")
assert resp1["results"][urn1]["aspects"]["ownership"]

# Verify both urns exist and have correct aspects
assert res_data["results"]
assert res_data["results"][urn1]
assert res_data["results"][urn1]["aspects"]["datasetProperties"]
assert res_data["results"][urn1]["aspects"]["ownership"]
assert res_data["results"][urn2]
assert res_data["results"][urn2]["aspects"]["datasetProperties"]
resp2 = _ensure_dataset_present(urn2, aspects="datasetProperties,ownership")
assert (
"ownership" not in res_data["results"][urn2]["aspects"]
"ownership" not in resp2["results"][urn2]["aspects"]
) # Aspect does not exist.


@@ -1171,8 +1142,8 @@ def test_update_corp_group_properties(frontend_session):

# Reset the editable properties
json = {
"query": """mutation updateCorpGroupProperties($urn: String!, $input: UpdateCorpGroupPropertiesInput!) {\n
updateCorpGroupProperties(urn: $urn, input: $input) }""",
"query": """mutation updateCorpGroupProperties($urn: String!, $input: CorpGroupUpdateInput!) {\n
updateCorpGroupProperties(urn: $urn, input: $input) { urn } }""",
"variables": {
"urn": group_urn,
"input": {"description": "", "slack": "", "email": ""},
@@ -1466,7 +1437,9 @@ def test_generate_personal_access_token(frontend_session):
# Test unauthenticated case
json = {
"query": """query getAccessToken($input: GetAccessTokenInput!) {\n
accessToken\n
getAccessToken(input: $input) {\n
accessToken\n
}\n
}""",
"variables": {
"input": {
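
Note: the core of this file's rewrite is swapping the hand-rolled with_sleep_times context manager for tenacity (newly added to requirements.txt): a helper now simply raises on failure and the decorator re-invokes it on a fixed schedule. A minimal sketch of the pattern, with illustrative names rather than code from the repo:

import requests
import tenacity

# Try up to 5 times, 10 seconds apart; if every attempt fails, tenacity
# raises a RetryError wrapping the last exception.
@tenacity.retry(stop=tenacity.stop_after_attempt(5), wait=tenacity.wait_fixed(10))
def ensure_entity_present(url: str) -> dict:
    response = requests.get(url)
    response.raise_for_status()  # any HTTP error triggers another attempt
    return response.json()

The same file also fixes two GraphQL payloads: updateCorpGroupProperties now uses the CorpGroupUpdateInput type and selects { urn } from the result, and getAccessToken gains a proper selection set around accessToken, which GraphQL requires for object-typed fields.
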
6 changes: 2 additions & 4 deletions smoke-test/test_rapid.py
@@ -2,18 +2,16 @@

import pytest
import requests
from datahub.cli.docker import check_local_docker_containers

from tests.utils import get_frontend_url, ingest_file_via_rest
from tests.utils import get_frontend_url, ingest_file_via_rest, wait_for_healthcheck_util

bootstrap_small = "test_resources/bootstrap_single.json"
bootstrap_small_2 = "test_resources/bootstrap_single2.json"


@pytest.fixture(scope="session")
def wait_for_healthchecks():
# Simply assert that everything is healthy, but don't wait.
assert not check_local_docker_containers()
wait_for_healthcheck_util()
yield


7 changes: 2 additions & 5 deletions smoke-test/tests/assertions/assertions_test.py
@@ -4,7 +4,6 @@

import pytest
import requests
from datahub.cli.docker import check_local_docker_containers
from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
@@ -24,7 +23,7 @@
PartitionSpecClass,
PartitionTypeClass,
)
from tests.utils import delete_urns_from_file, get_gms_url, ingest_file_via_rest
from tests.utils import delete_urns_from_file, get_gms_url, ingest_file_via_rest, wait_for_healthcheck_util

restli_default_headers = {
"X-RestLi-Protocol-Version": "2.0.0",
@@ -63,7 +62,6 @@ def create_test_data(test_file):
1643880726874,
1643880726875,
]
msg_ids = []
# The assertion run event attached to the dataset
mcp2 = MetadataChangeProposalWrapper(
entityType="assertion",
@@ -233,8 +231,7 @@ def generate_test_data(tmp_path_factory):

@pytest.fixture(scope="session")
def wait_for_healthchecks(generate_test_data):
# Simply assert that everything is healthy, but don't wait.
assert not check_local_docker_containers()
wait_for_healthcheck_util()
yield


5 changes: 3 additions & 2 deletions smoke-test/tests/cli/datahub_graph_test.py
@@ -1,7 +1,8 @@
import pytest
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import KafkaSchemaClass, SchemaMetadataClass
from tests.utils import delete_urns_from_file, ingest_file_via_rest
from tests.utils import delete_urns_from_file, ingest_file_via_rest, get_gms_url



@pytest.fixture(scope="module", autouse=False)
@@ -21,7 +22,7 @@ def test_healthchecks(wait_for_healthchecks):

@pytest.mark.dependency(depends=["test_healthchecks"])
def test_get_aspect_v2(frontend_session, ingest_cleanup_data):
graph: DataHubGraph = DataHubGraph(DatahubClientConfig())
graph: DataHubGraph = DataHubGraph(DatahubClientConfig(server=get_gms_url()))
urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,test-rollback,PROD)"
schema_metadata: SchemaMetadataClass = graph.get_aspect_v2(
urn, aspect="schemaMetadata", aspect_type=SchemaMetadataClass
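
Note: constructing DatahubClientConfig with server=get_gms_url() is what lets this test run against a remote (e.g. in-cluster) GMS instead of the localhost default. A minimal standalone sketch of the same client setup; the DATAHUB_GMS_URL variable here is an assumption, not necessarily what tests/utils reads:

import os

from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import SchemaMetadataClass

# Hypothetical env var; tests/utils presumably resolves the URL similarly.
gms_url = os.environ.get("DATAHUB_GMS_URL", "http://localhost:8080")
graph = DataHubGraph(DatahubClientConfig(server=gms_url))

schema = graph.get_aspect_v2(
    "urn:li:dataset:(urn:li:dataPlatform:kafka,test-rollback,PROD)",
    aspect="schemaMetadata",
    aspect_type=SchemaMetadataClass,
)
print(schema)
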
6 changes: 2 additions & 4 deletions smoke-test/tests/conftest.py
@@ -2,18 +2,16 @@

import pytest
import requests
from datahub.cli.docker import check_local_docker_containers

from tests.utils import get_frontend_url
from tests.utils import get_frontend_url, wait_for_healthcheck_util

# Disable telemetry
os.putenv("DATAHUB_TELEMETRY_ENABLED", "false")


@pytest.fixture(scope="session")
def wait_for_healthchecks():
# Simply assert that everything is healthy, but don't wait.
assert not check_local_docker_containers()
wait_for_healthcheck_util()
yield


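
Note: wait_for_healthcheck_util lives in tests/utils and is not shown in this diff; the point of the swap is that check_local_docker_containers only works when DataHub runs in local Docker, while an HTTP poll works against any deployment, Kubernetes included. A plausible sketch of such a helper, assuming health endpoints on GMS and the frontend (the paths here are assumptions, not code from the repo):

import requests
import tenacity

from tests.utils import get_frontend_url, get_gms_url

@tenacity.retry(stop=tenacity.stop_after_attempt(30), wait=tenacity.wait_fixed(5))
def wait_for_healthcheck_util():
    # Hypothetical endpoints: keep polling until both services answer 2xx.
    for url in (f"{get_gms_url()}/health", f"{get_frontend_url()}/admin"):
        requests.get(url, timeout=5).raise_for_status()
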
4 changes: 2 additions & 2 deletions smoke-test/tests/cypress/integration_test.py
@@ -24,10 +24,10 @@ def test_run_cypress(frontend_session, wait_for_healthchecks):
record_key = os.getenv("CYPRESS_RECORD_KEY")
if record_key:
print('Running Cypress tests with recording')
command = f"npx cypress run --record"
command = f"NO_COLOR=1 npx cypress run --record"
else:
print('Running Cypress tests without recording')
command = f"npx cypress run"
command = f"NO_COLOR=1 npx cypress run"
proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd="tests/cypress")
stdout = proc.stdout.read()
stderr = proc.stderr.read()
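
Note: the NO_COLOR=1 prefix makes Cypress emit plain output without ANSI color escapes, so the stdout/stderr captured through subprocess.Popen stays readable when replayed in CI logs.
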
(remaining changed files not shown)
