Support for the experimental Databricks CLI launcher #517

Merged · 5 commits · Nov 7, 2023
.gitignore: 2 additions, 1 deletion
@@ -148,4 +148,5 @@ dev/cleanup.py
 .databricks
 .vscode

-.python-version
+.python-version
+.databricks-login.json
labs.yml: 19 additions (new file)
@@ -0,0 +1,19 @@
---
name: ucx
description: Unity Catalog Migration Toolkit (UCX)
install:
warehouse_types:
- PRO
script: src/databricks/labs/ucx/install.py
entrypoint: src/databricks/labs/ucx/cli.py
min_python: 3.10
commands:
- name: open-remote-config
description: Opens remote configuration in the browser

- name: workflows
description: Show deployed workflows and their state
table_template: |-
Step\tState\tStarted
{{range .}}{{.step}}\t{{.state}}\t{{.started}}
{{end}}
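
The table_template above is a Go text/template that the launcher presumably applies to the JSON array printed by the workflows command, rendering a header plus one tab-separated row per workflow. A minimal Python sketch of that rendering, under the assumption that the launcher consumes the command's stdout (step names and timestamps are hypothetical):

import json

# JSON in the shape the `workflows` command prints; values are hypothetical.
raw = '[{"step": "assessment", "state": "SUCCESS", "started": "2023-11-07 12:00"}]'

# Plain-Python equivalent of the Go template: a header row, then one
# step/state/started row per entry.
print("Step\tState\tStarted")
for row in json.loads(raw):
    print(f'{row["step"]}\t{row["state"]}\t{row["started"]}')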
src/databricks/labs/ucx/cli.py: 51 additions (new file)
@@ -0,0 +1,51 @@
import json
import logging
import sys
import webbrowser

from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.install import WorkspaceInstaller

logger = logging.getLogger("databricks.labs.ucx")


def workflows():
ws = WorkspaceClient()
installer = WorkspaceInstaller(ws)
logger.info("Fetching deployed jobs...")
print(json.dumps(installer.latest_job_status()))


def open_remote_config():
ws = WorkspaceClient()
installer = WorkspaceInstaller(ws)

ws_file_url = installer.notebook_link(installer.config_file)
webbrowser.open(ws_file_url)


MAPPING = {
"open-remote-config": open_remote_config,
"workflows": workflows,
}


def main(raw):
payload = json.loads(raw)
command = payload["command"]
if command not in MAPPING:
msg = f"cannot find command: {command}"
raise KeyError(msg)
flags = payload["flags"]
log_level = flags.pop("log_level")
if log_level != "disabled":
databricks_logger = logging.getLogger("databricks")
databricks_logger.setLevel(log_level.upper())

kwargs = {k.replace("-", "_"): v for k, v in flags.items()}
MAPPING[command](**kwargs)


if __name__ == "__main__":
main(*sys.argv[1:])
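
The launcher is assumed to invoke this entrypoint with a single JSON argument; from main() above, the payload carries a "command" key plus a "flags" mapping that always includes "log_level". A hypothetical invocation, assuming workspace authentication is already configured for WorkspaceClient():

import json

from databricks.labs.ucx.cli import main

# Mirrors what the launcher would pass as sys.argv[1].
payload = json.dumps({"command": "workflows", "flags": {"log_level": "disabled"}})
main(payload)  # prints a JSON array with the latest state of each deployed step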
src/databricks/labs/ucx/install.py: 32 additions, 21 deletions
@@ -108,7 +108,7 @@ def _run_configured(self):
         self._install_spark_config_for_hms_lineage()
         self._create_dashboards()
         self._create_jobs()
-        readme = f'{self._notebook_link(f"{self._install_folder}/README.py")}'
+        readme = f'{self.notebook_link(f"{self._install_folder}/README.py")}'
         msg = f"Installation completed successfully! Please refer to the {readme} notebook for next steps."
         logger.info(msg)

@@ -249,14 +249,14 @@ def _install_folder(self):
         return f"/Users/{self._my_username}/.{self._prefix}"

     @property
-    def _config_file(self):
+    def config_file(self):
         return f"{self._install_folder}/config.yml"

     @property
     def _current_config(self):
         if hasattr(self, "_config"):
             return self._config
-        with self._ws.workspace.download(self._config_file) as f:
+        with self._ws.workspace.download(self.config_file) as f:
             self._config = WorkspaceConfig.from_bytes(f.read())
         return self._config

@@ -279,12 +279,10 @@ def _configure_inventory_database(self):
         return inventory_database

     def _configure(self):
-        ws_file_url = self._notebook_link(self._config_file)
+        ws_file_url = self.notebook_link(self.config_file)
         try:
-            self._ws.workspace.get_status(self._config_file)
+            self._ws.workspace.get_status(self.config_file)
             logger.info(f"UCX is already configured. See {ws_file_url}")
-            if self._prompts and self._question("Open config file in the browser", default="yes") == "yes":
-                webbrowser.open(ws_file_url)
             return
         except DatabricksError as err:
             if err.error_code != "RESOURCE_DOES_NOT_EXIST":
@@ -377,13 +375,13 @@ def _write_config(self):
         self._ws.workspace.mkdirs(self._install_folder)

         config_bytes = yaml.dump(self._config.as_dict()).encode("utf8")
-        logger.info(f"Creating configuration file: {self._config_file}")
-        self._ws.workspace.upload(self._config_file, config_bytes, format=ImportFormat.AUTO)
+        logger.info(f"Creating configuration file: {self.config_file}")
+        self._ws.workspace.upload(self.config_file, config_bytes, format=ImportFormat.AUTO)

     def _create_jobs(self):
         logger.debug(f"Creating jobs from tasks in {main.__name__}")
         remote_wheel = self._upload_wheel()
-        self._deployed_steps = self._deployed_steps()
+        self._deployed_steps = self.deployed_steps()
         desired_steps = {t.workflow for t in _TASKS.values()}
         wheel_runner = None

@@ -424,7 +422,7 @@ def _step_list(cls) -> list[str]:
     def _create_readme(self):
         md = [
             "# UCX - The Unity Catalog Migration Assistant",
-            f'To troubleshoot, see [debug notebook]({self._notebook_link(f"{self._install_folder}/DEBUG.py")}).\n',
+            f'To troubleshoot, see [debug notebook]({self.notebook_link(f"{self._install_folder}/DEBUG.py")}).\n',
             "Here are the URLs and descriptions of workflows that trigger various stages of migration.",
             "All jobs are defined with necessary cluster configurations and DBR versions.\n",
         ]
@@ -457,7 +455,7 @@ def _create_readme(self):
         intro = "\n".join(preamble + [f"# MAGIC {line}" for line in md])
         path = f"{self._install_folder}/README.py"
         self._ws.workspace.upload(path, intro.encode("utf8"), overwrite=True)
-        url = self._notebook_link(path)
+        url = self.notebook_link(path)
         logger.info(f"Created README notebook with job overview: {url}")
         msg = "Open job overview in README notebook in your home directory ?"
         if self._prompts and self._question(msg, default="yes") == "yes":
@@ -467,22 +465,22 @@ def _replace_inventory_variable(self, text: str) -> str:
         return text.replace("$inventory", f"hive_metastore.{self._current_config.inventory_database}")

     def _create_debug(self, remote_wheel: str):
-        readme_link = self._notebook_link(f"{self._install_folder}/README.py")
+        readme_link = self.notebook_link(f"{self._install_folder}/README.py")
         job_links = ", ".join(
             f"[{self._name(step_name)}]({self._ws.config.host}#job/{job_id})"
             for step_name, job_id in self._deployed_steps.items()
         )
         path = f"{self._install_folder}/DEBUG.py"
-        logger.debug(f"Created debug notebook: {self._notebook_link(path)}")
+        logger.debug(f"Created debug notebook: {self.notebook_link(path)}")
         self._ws.workspace.upload(
             path,
             DEBUG_NOTEBOOK.format(
-                remote_wheel=remote_wheel, readme_link=readme_link, job_links=job_links, config_file=self._config_file
+                remote_wheel=remote_wheel, readme_link=readme_link, job_links=job_links, config_file=self.config_file
             ).encode("utf8"),
             overwrite=True,
         )

-    def _notebook_link(self, path: str) -> str:
+    def notebook_link(self, path: str) -> str:
         return f"{self._ws.config.host}/#workspace{path}"

     def _choice_from_dict(self, text: str, choices: dict[str, Any]) -> Any:
@@ -569,8 +567,8 @@ def _apply_cluster_overrides(settings: dict[str, any], overrides: dict[str, str]
     def _upload_wheel_runner(self, remote_wheel: str):
         # TODO: we have to be doing this workaround until ES-897453 is solved in the platform
         path = f"{self._install_folder}/wheels/wheel-test-runner-{self._version}.py"
-        logger.debug(f"Created runner notebook: {self._notebook_link(path)}")
-        py = TEST_RUNNER_NOTEBOOK.format(remote_wheel=remote_wheel, config_file=self._config_file).encode("utf8")
+        logger.debug(f"Created runner notebook: {self.notebook_link(path)}")
+        py = TEST_RUNNER_NOTEBOOK.format(remote_wheel=remote_wheel, config_file=self.config_file).encode("utf8")
         self._ws.workspace.upload(path, py, overwrite=True)
         return path

@@ -624,7 +622,7 @@ def _job_notebook_task(self, jobs_task: jobs.Task, task: Task) -> jobs.Task:
                 base_parameters={
                     "inventory_database": self._current_config.inventory_database,
                     "task": task.name,
-                    "config": f"/Workspace{self._config_file}",
+                    "config": f"/Workspace{self.config_file}",
                 }
                 | EXTRA_TASK_PARAMS,
             ),
@@ -637,7 +635,7 @@ def _job_wheel_task(self, jobs_task: jobs.Task, task: Task, dbfs_path: str) -> j
             python_wheel_task=jobs.PythonWheelTask(
                 package_name="databricks_labs_ucx",
                 entry_point="runtime",  # [project.entry-points.databricks] in pyproject.toml
-                named_parameters={"task": task.name, "config": f"/Workspace{self._config_file}"} | EXTRA_TASK_PARAMS,
+                named_parameters={"task": task.name, "config": f"/Workspace{self.config_file}"} | EXTRA_TASK_PARAMS,
             ),
         )

@@ -774,7 +772,7 @@ def _cluster_node_type(self, spec: compute.ClusterSpec) -> compute.ClusterSpec:
             )
         return replace(spec, gcp_attributes=compute.GcpAttributes(availability=compute.GcpAvailability.ON_DEMAND_GCP))

-    def _deployed_steps(self):
+    def deployed_steps(self):
         deployed_steps = {}
         logger.debug(f"Fetching all jobs to determine already deployed steps for app={self._app}")
         for j in self._ws.jobs.list():
@@ -820,6 +818,19 @@ def _get_ext_hms_conf_from_policy(cluster_policy):
                 spark_conf_dict[key[11:]] = cluster_policy[key]["value"]
         return instance_profile, spark_conf_dict

+    def latest_job_status(self) -> list[dict]:
+        latest_status = []
+        for step, job_id in self.deployed_steps().items():
+            job_runs = list(self._ws.jobs.list_runs(job_id=job_id, limit=1))
+            latest_status.append(
+                {
+                    "step": step,
+                    "state": "UNKNOWN" if not job_runs else str(job_runs[0].state.result_state),
+                    "started": "" if not job_runs else job_runs[0].start_time,
+                }
+            )
+        return latest_status
+

 if __name__ == "__main__":
     ws = WorkspaceClient(product="ucx", product_version=__version__)
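
latest_job_status() is what backs the workflows command: for each deployed step it fetches at most one (the most recent) run and reports its result state. An illustrative return value with hypothetical step names; "started" passes through the SDK's raw start_time (epoch milliseconds), and str() on the result-state enum yields the RunResultState.* spelling:

# Hypothetical shape of latest_job_status() output; a step with no runs
# yet falls back to an "UNKNOWN" state and an empty "started".
[
    {"step": "assessment", "state": "RunResultState.SUCCESS", "started": 1699358400000},
    {"step": "destroy-schema", "state": "UNKNOWN", "started": ""},
]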