Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved installation integration test flakiness #998

Merged
merged 2 commits into from
Mar 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 33 additions & 19 deletions src/databricks/labs/ucx/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
AlreadyExists,
BadRequest,
Cancelled,
DatabricksError,
DataLoss,
DeadlineExceeded,
InternalError,
Expand Down Expand Up @@ -436,28 +435,20 @@
except OperationFailed as err:
# currently we don't have any good message from API, so we have to work around it.
job_run = self._ws.jobs.get_run(job_run_waiter.run_id)
raise self._infer_nested_error(job_run) from err
raise self._infer_error_from_job_run(job_run) from err

def _infer_nested_error(self, job_run) -> Exception:
errors: list[DatabricksError] = []
def _infer_error_from_job_run(self, job_run) -> Exception:
errors: list[Exception] = []
timeouts: list[DeadlineExceeded] = []
assert job_run.tasks is not None
for run_task in job_run.tasks:
if not run_task.state:
error = self._infer_error_from_task_run(run_task)
if not error:
continue
if run_task.state.result_state == jobs.RunResultState.TIMEDOUT:
msg = f"{run_task.task_key}: The run was stopped after reaching the timeout"
timeouts.append(DeadlineExceeded(msg))
if isinstance(error, DeadlineExceeded):
timeouts.append(error)
continue
if run_task.state.result_state != jobs.RunResultState.FAILED:
continue
assert run_task.run_id is not None
run_output = self._ws.jobs.get_run_output(run_task.run_id)
if logger.isEnabledFor(logging.DEBUG):
if run_output and run_output.error_trace:
sys.stderr.write(run_output.error_trace)
if run_output and run_output.error:
errors.append(self._infer_task_exception(f"{run_task.task_key}: {run_output.error}"))
errors.append(error)
assert job_run.state is not None
assert job_run.state.state_message is not None
if len(errors) == 1:
Expand All @@ -467,8 +458,29 @@
return Unknown(job_run.state.state_message)
return ManyError(all_errors)

def _infer_error_from_task_run(self, run_task: jobs.RunTask) -> Exception | None:
if not run_task.state:
return None

Check warning on line 463 in src/databricks/labs/ucx/install.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/ucx/install.py#L463

Added line #L463 was not covered by tests
if run_task.state.result_state == jobs.RunResultState.TIMEDOUT:
msg = f"{run_task.task_key}: The run was stopped after reaching the timeout"
return DeadlineExceeded(msg)
if run_task.state.result_state != jobs.RunResultState.FAILED:
return None

Check warning on line 468 in src/databricks/labs/ucx/install.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/ucx/install.py#L468

Added line #L468 was not covered by tests
assert run_task.run_id is not None
run_output = self._ws.jobs.get_run_output(run_task.run_id)
if not run_output:
msg = f'No run output. {run_task.state.state_message}'
return InternalError(msg)

Check warning on line 473 in src/databricks/labs/ucx/install.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/ucx/install.py#L472-L473

Added lines #L472 - L473 were not covered by tests
if logger.isEnabledFor(logging.DEBUG):
if run_output.error_trace:
sys.stderr.write(run_output.error_trace)

Check warning on line 476 in src/databricks/labs/ucx/install.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/ucx/install.py#L476

Added line #L476 was not covered by tests
if not run_output.error:
msg = f'No error in run output. {run_task.state.state_message}'
return InternalError(msg)

Check warning on line 479 in src/databricks/labs/ucx/install.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/ucx/install.py#L478-L479

Added lines #L478 - L479 were not covered by tests
return self._infer_task_exception(f"{run_task.task_key}: {run_output.error}")

@staticmethod
def _infer_task_exception(haystack: str) -> DatabricksError:
def _infer_task_exception(haystack: str) -> Exception:
needles = [
BadRequest,
Unauthenticated,
Expand All @@ -490,8 +502,10 @@
RequestLimitExceeded,
Unknown,
DataLoss,
ValueError,
KeyError,
]
constructors: dict[re.Pattern, type[DatabricksError]] = {
constructors: dict[re.Pattern, type[Exception]] = {
re.compile(r".*\[TABLE_OR_VIEW_NOT_FOUND] (.*)"): NotFound,
re.compile(r".*\[SCHEMA_NOT_FOUND] (.*)"): NotFound,
}
Expand Down
1 change: 1 addition & 0 deletions src/databricks/labs/ucx/workspace_access/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ def inner() -> list[GenericPermissionsInfo]:
result = ws.api_client.do(
"GET", "/api/2.0/feature-store/feature-tables/search", query={"page_token": token, "max_results": 200}
)
assert isinstance(result, dict)
for table in result.get("feature_tables", []):
feature_tables.append(GenericPermissionsInfo(table["id"], "feature-tables"))

Expand Down
16 changes: 8 additions & 8 deletions tests/integration/test_installation.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from databricks.labs.blueprint.parallel import Threads
from databricks.labs.blueprint.tui import MockPrompts
from databricks.labs.blueprint.wheels import WheelsV2
from databricks.sdk.errors import InvalidParameterValue, NotFound, Unknown
from databricks.sdk.errors import InvalidParameterValue, NotFound
from databricks.sdk.retries import retried
from databricks.sdk.service import compute, sql
from databricks.sdk.service.iam import PermissionLevel
Expand All @@ -36,7 +36,7 @@


@pytest.fixture
def new_installation(ws, sql_backend, env_or_skip, inventory_schema, make_random, make_cluster_policy):
def new_installation(ws, sql_backend, env_or_skip, inventory_schema, make_random):
cleanup = []

def factory(config_transform: Callable[[WorkspaceConfig], WorkspaceConfig] | None = None):
Expand Down Expand Up @@ -98,7 +98,7 @@ def factory(config_transform: Callable[[WorkspaceConfig], WorkspaceConfig] | Non
pending.uninstall()


@retried(on=[NotFound, Unknown, TimeoutError], timeout=timedelta(minutes=5))
@retried(on=[NotFound, TimeoutError], timeout=timedelta(minutes=5))
def test_job_failure_propagates_correct_error_message_and_logs(ws, sql_backend, new_installation):
install = new_installation()

Expand All @@ -113,7 +113,7 @@ def test_job_failure_propagates_correct_error_message_and_logs(ws, sql_backend,
assert len(workflow_run_logs) == 1


@retried(on=[NotFound, Unknown, InvalidParameterValue], timeout=timedelta(minutes=3))
@retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=3))
def test_job_cluster_policy(ws, new_installation):
install = new_installation(lambda wc: replace(wc, override_clusters=None))
user_name = ws.current_user.me().user_name
Expand Down Expand Up @@ -154,7 +154,7 @@ def test_new_job_cluster_with_policy_assessment(
assert before[ws_group_a.display_name] == PermissionLevel.CAN_USE


@retried(on=[NotFound, Unknown, InvalidParameterValue], timeout=timedelta(minutes=10))
@retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=10))
def test_running_real_assessment_job(
ws, new_installation, make_ucx_group, make_cluster_policy, make_cluster_policy_permissions
):
Expand All @@ -175,7 +175,7 @@ def test_running_real_assessment_job(
assert before[ws_group_a.display_name] == PermissionLevel.CAN_USE


@retried(on=[NotFound, Unknown, InvalidParameterValue], timeout=timedelta(minutes=5))
@retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=5))
def test_running_real_migrate_groups_job(
ws, sql_backend, new_installation, make_ucx_group, make_cluster_policy, make_cluster_policy_permissions
):
Expand Down Expand Up @@ -208,7 +208,7 @@ def test_running_real_migrate_groups_job(
assert found[f"{install.config.renamed_group_prefix}{ws_group_a.display_name}"] == PermissionLevel.CAN_USE


@retried(on=[NotFound, Unknown, InvalidParameterValue], timeout=timedelta(minutes=5))
@retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=5))
def test_running_real_validate_groups_permissions_job(
ws, sql_backend, new_installation, make_group, make_query, make_query_permissions
):
Expand Down Expand Up @@ -264,7 +264,7 @@ def test_running_real_validate_groups_permissions_job_fails(
request_object_type="cluster-policies", request_object_id=cluster_policy.policy_id, access_control_list=[]
)

with pytest.raises(Unknown, match=r"Detected \d+ failures: ValueError"):
with pytest.raises(ValueError):
install.run_workflow("validate-groups-permissions")


Expand Down
Loading