
Commit ea103b4

Checking in changes for feature

zpappa committed Oct 16, 2023
1 parent bc4c1af commit ea103b4
Showing 2 changed files with 520 additions and 5 deletions.
201 changes: 198 additions & 3 deletions src/databricks/labs/ucx/assessment/crawlers.py
@@ -1,13 +1,16 @@
import datetime
import base64
import json
import logging
import re
from dataclasses import dataclass
from hashlib import sha256

from databricks.sdk import WorkspaceClient
from databricks.sdk.core import DatabricksError
from databricks.sdk.service import compute
from databricks.sdk.service.compute import ClusterDetails, ClusterSource
from databricks.sdk.service.jobs import BaseJob, BaseRun, JobCluster, RunTask, RunType

from databricks.labs.ucx.framework.crawlers import CrawlerBase, SqlBackend

@@ -26,9 +29,11 @@
"fs.azure.account.oauth2.client.secret",
"fs.azure.account.oauth2.client.endpoint",
]

LOWEST_COMPATIBLE_DBR = "11.3.x"
_SECRET_PATTERN = r"{{(secrets.*?)}}"
_STORAGE_ACCOUNT_EXTRACT_PATTERN = r"(?:id|endpoint)(.*?)dfs"
_AZURE_SP_CONF_FAILURE_MSG = "Uses azure service principal credentials config in"
_SECRET_LIST_LENGTH = 3
_CLIENT_ENDPOINT_LENGTH = 6
_INIT_SCRIPT_DBFS_PATH = 2
@@ -61,6 +66,14 @@ class PipelineInfo:
    failures: str


@dataclass
class ExternallyOrchestratedJobRunWithFailingConfiguration:
    run_id: int
    hashed_id: str
    spark_version: str
    num_tasks_with_failing_configuration: int


@dataclass
class AzureServicePrincipalInfo:
    # fs.azure.account.oauth2.client.id
@@ -120,6 +133,29 @@ def _azure_sp_conf_present_check(config: dict) -> bool:
    return False


def is_custom_image(version_string: str) -> bool:
    """
    Is this a custom Databricks runtime image?
    """
    return "custom" in version_string


def spark_version_greater_than(left_version: str | None, right_version: str | None) -> bool:
    """
    Is the left version greater than or equal to the right version?

    Custom images and missing versions rank lowest; only the major and
    minor components are compared.
    """
    if left_version is None or is_custom_image(left_version):
        return False
    if right_version is None or is_custom_image(right_version):
        return True
    pattern = r"^(?P<major>\d+)?\.(?P<minor>\d+)?\.(?P<patch>[\dx]+)?.*"
    lvg = re.match(pattern, left_version)
    rvg = re.match(pattern, right_version)
    if lvg is None or rvg is None:
        # unparsable version strings cannot be ranked; be conservative
        return False
    left = (int(lvg.group("major")), int(lvg.group("minor")))
    right = (int(rvg.group("major")), int(rvg.group("minor")))
    return left >= right

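# Sanity sketch of the comparison semantics above (illustrative values, not
# part of the committed module):
#
#   spark_version_greater_than("12.2.x-scala2.12", "11.3.x")    # True: 12.2 >= 11.3
#   spark_version_greater_than("11.3.x", "11.3.x")              # True: comparison is inclusive
#   spark_version_greater_than("custom:photon-test", "11.3.x")  # False: custom images rank lowest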

def spark_version_compatibility(spark_version: str) -> str:
    first_comp_custom_rt = 3
    first_comp_custom_x = 2
@@ -139,6 +175,26 @@ def spark_version_compatibility(spark_version: str) -> str:
    return "supported"


def get_job_cluster_from_task(
    task: RunTask, job_run: BaseRun, all_clusters: dict[str, ClusterDetails]
) -> compute.ClusterSpec | ClusterDetails | None:
    """
    Determine the cluster associated with the task:
    1) an inline new_cluster spec on the task itself
    2) a job cluster referenced by existing_cluster_id
    3) the concrete cluster instance the task ran on
    """
    if task.new_cluster is not None:
        return task.new_cluster
    if task.existing_cluster_id is not None:
        # per the API docs: if existing_cluster_id is set, it is the ID of an
        # existing cluster used for all runs of this job
        job_clusters: dict[str, JobCluster] = {x.job_cluster_key: x for x in (job_run.job_clusters or [])}
        job_cluster = job_clusters.get(task.existing_cluster_id)
        if job_cluster is not None:
            return job_cluster.new_cluster
    if task.cluster_instance is not None:
        return all_clusters.get(task.cluster_instance.cluster_id)
    return None
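
# Resolution sketch (illustrative, objects assumed): whichever cluster
# definition the helper finds, the task's DBR version can be read off it:
#
#   spec = get_job_cluster_from_task(task, job_run, all_clusters)
#   dbr = spec.spark_version if spec is not None else None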


class GlobalInitScriptCrawler(CrawlerBase):
    def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
        super().__init__(sbe, "hive_metastore", schema, "global_init_scripts", GlobalInitScriptInfo)
@@ -559,3 +615,142 @@ def snapshot(self) -> list[JobInfo]:
    def _try_fetch(self) -> list[JobInfo]:
        for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"):
            yield JobInfo(*row)


class ExternallyOrchestratedJobRunsWithFailingConfigCrawler(CrawlerBase):
    """
    Looks for job runs submitted by external orchestrators whose configuration
    will fail under Unity Catalog:
    - the run has no persisted job id, i.e. its job id is not in the jobs list
    - the data security mode is None and the DBR version is >= 11.3
    Returns one record per failing job run, with a count of affected tasks
    and the lowest DBR version across all of its tasks.
    """

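    # A record emitted by this crawler might look like (hypothetical values):
    #
    #   ExternallyOrchestratedJobRunWithFailingConfiguration(
    #       run_id=123456,
    #       hashed_id="3f5a...",  # sha256 over the run's stable task attributes
    #       spark_version="12.2.x-scala2.12",  # lowest failing DBR across tasks
    #       num_tasks_with_failing_configuration=2,
    #   )
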
    def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
        super().__init__(
            sbe,
            "hive_metastore",
            schema,
            "ext_orc_job_runs_with_failing_config",
            ExternallyOrchestratedJobRunWithFailingConfiguration,
        )
        self._ws = ws

    def _crawl(self) -> list[ExternallyOrchestratedJobRunWithFailingConfiguration]:
        no_of_days_back = datetime.timedelta(days=30)  # todo make configurable in yaml?
        start_time_from = datetime.datetime.now() - no_of_days_back
        # todo figure out if we need to specify a default timezone
        all_job_runs = list(
            self._ws.jobs.list_runs(
                expand_tasks=True,
                start_time_from=start_time_from,
                start_time_to=datetime.datetime.now(),
                run_type=RunType.SUBMIT_RUN,
            )
        )
        all_jobs = list(self._ws.jobs.list())
        all_clusters: dict[str, ClusterDetails] = {c.cluster_id: c for c in self._ws.clusters.list()}
        return self._assess_job_runs(all_clusters, all_job_runs, all_jobs)

    def _retrieve_hash_values_from_task(self, task: RunTask) -> list[str]:
        """
        Retrieve the stable, hashable attributes of a task into a list,
        dropping None values; parameters are deliberately ignored, as they
        change between runs.
        """
        hash_values = []
        if task.notebook_task is not None:
            hash_values.append(task.notebook_task.notebook_path)
        if task.spark_python_task is not None:
            hash_values.append(task.spark_python_task.python_file)
        if task.spark_submit_task is not None:
            hash_values.append(task.spark_submit_task.parameters)
        if task.spark_jar_task is not None:
            hash_values.append(task.spark_jar_task.jar_uri)
            hash_values.append(task.spark_jar_task.main_class_name)
        if task.pipeline_task is not None:
            hash_values.append(task.pipeline_task.pipeline_id)
        if task.python_wheel_task is not None:
            hash_values.append(task.python_wheel_task.package_name)
            hash_values.append(task.python_wheel_task.entry_point)
        if task.sql_task is not None:
            # only one of file/alert/dashboard/query is set; guard each access
            if task.sql_task.file is not None:
                hash_values.append(task.sql_task.file.path)
            if task.sql_task.alert is not None:
                hash_values.append(task.sql_task.alert.alert_id)
            if task.sql_task.dashboard is not None:
                hash_values.append(task.sql_task.dashboard.dashboard_id)
            if task.sql_task.query is not None:
                hash_values.append(task.sql_task.query.query_id)
        if task.dbt_task is not None:
            hash_values.append(task.dbt_task.schema)
            hash_values.append(task.dbt_task.catalog)
            hash_values.append(task.dbt_task.warehouse_id)
            hash_values.append(task.dbt_task.project_directory)
            if task.dbt_task.commands is not None:
                # sort so command order does not change the hash
                hash_values.append(",".join(sorted(task.dbt_task.commands)))
        if task.condition_task is not None:
            hash_values.append(task.condition_task.op.value)
            hash_values.append(task.condition_task.right)
            hash_values.append(task.condition_task.left)
            hash_values.append(task.condition_task.outcome)
        if task.run_job_task is not None:
            hash_values.append(task.run_job_task.job_id)
        if task.git_source is not None:
            hash_values.append(task.git_source.git_url)
            hash_values.append(task.git_source.git_tag)
            hash_values.append(task.git_source.git_branch)
            hash_values.append(task.git_source.git_commit)
            hash_values.append(task.git_source.git_provider)
            if task.git_source.git_snapshot is not None:
                hash_values.append(task.git_source.git_snapshot.used_commit)

        return [str(value) for value in hash_values if value is not None]
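
    # Hashing sketch (hypothetical values): a run whose notebook task points at
    # "/Repos/etl/main" and whose pipeline task id is "abc" collapses to
    #
    #   sha256("/Repos/etl/main|abc".encode("utf-8")).hexdigest()
    #
    # so retried submissions with different parameters map to the same record.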

    def _assess_job_runs(
        self, all_clusters: dict[str, ClusterDetails], all_job_runs: list[BaseRun], all_jobs: list[BaseJob]
    ) -> list[ExternallyOrchestratedJobRunWithFailingConfiguration]:
        """
        Returns a list of ExternallyOrchestratedJobRunWithFailingConfiguration,
        deduplicated by the hash of each run's stable task attributes.
        """
        all_persisted_job_ids = [x.job_id for x in all_jobs]
        not_persisted_job_runs = list(
            filter(lambda jr: jr.job_id is None or jr.job_id not in all_persisted_job_ids, all_job_runs)
        )
        ext_orc_job_runs_with_failing_configuration: dict[
            str, ExternallyOrchestratedJobRunWithFailingConfiguration
        ] = {}
        for job_run in not_persisted_job_runs:
            num_tasks_with_failing_configuration = 0
            lowest_dbr = None
            hashable_items = []
            all_tasks = job_run.tasks if job_run.tasks is not None else []
            for task in sorted(all_tasks, key=lambda x: x.task_key):
                task_cluster = get_job_cluster_from_task(task, job_run, all_clusters)
                if task_cluster is None:
                    continue
                if task_cluster.data_security_mode is None and spark_version_greater_than(
                    task_cluster.spark_version, LOWEST_COMPATIBLE_DBR
                ):
                    if lowest_dbr is None or spark_version_greater_than(
                        lowest_dbr,
                        task_cluster.spark_version,
                    ):
                        lowest_dbr = task_cluster.spark_version
                    num_tasks_with_failing_configuration += 1
                    hashable_items.extend(self._retrieve_hash_values_from_task(task))
            if num_tasks_with_failing_configuration > 0:
                hashed_id = sha256("|".join(hashable_items).encode("utf-8")).hexdigest()
                ext_orc_job_runs_with_failing_configuration[hashed_id] = (
                    ExternallyOrchestratedJobRunWithFailingConfiguration(
                        run_id=job_run.run_id,
                        hashed_id=hashed_id,
                        num_tasks_with_failing_configuration=num_tasks_with_failing_configuration,
                        spark_version=lowest_dbr,
                    )
                )

        return list(ext_orc_job_runs_with_failing_configuration.values())

    def snapshot(self) -> list[ExternallyOrchestratedJobRunWithFailingConfiguration]:
        return self._snapshot(self._try_fetch, self._crawl)

    def _try_fetch(self) -> list[ExternallyOrchestratedJobRunWithFailingConfiguration]:
        for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"):
            yield ExternallyOrchestratedJobRunWithFailingConfiguration(*row)
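

# Hypothetical usage sketch (the SQL backend wiring is assumed, not part of
# this commit; the constructor signature matches the class above):
#
#   ws = WorkspaceClient()
#   crawler = ExternallyOrchestratedJobRunsWithFailingConfigCrawler(ws, sql_backend, "ucx")
#   for rec in crawler.snapshot():
#       print(rec.run_id, rec.spark_version, rec.num_tasks_with_failing_configuration)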
