
Crawler for RunSubmit API usages from External Orchestrators (ADF/Airflow) #366

Closed
wants to merge 25 commits

Conversation


@zpappa zpappa commented Oct 3, 2023

Resolves #266

Added JobRunsCrawler
Added a crawler that lists job runs via the SDK and determines which of them were created through the RunSubmit API.

Added Unit Tests
Added tests to cover the basic logic; further tests are pending.
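
For context, a minimal sketch of the kind of check such a crawler can perform, assuming the databricks-sdk Jobs API and its `RunType` enum; the real crawler in this PR also inspects task-level cluster configuration:

```
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import RunType

ws = WorkspaceClient()

# Runs created through the RunSubmit API (e.g. by ADF or Airflow) carry
# run_type == SUBMIT_RUN, unlike runs triggered from a persisted job.
submit_runs = [
    run
    for run in ws.jobs.list_runs(expand_tasks=True)
    if run.run_type == RunType.SUBMIT_RUN
]
print(f"Found {len(submit_runs)} run-submit job runs")
```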

@codecov

codecov bot commented Oct 3, 2023

Codecov Report

Merging #366 (bb3fbd2) into main (47123a9) will decrease coverage by 1.13%.
Report is 7 commits behind head on main.
The diff coverage is 69.78%.

```
@@            Coverage Diff             @@
##             main     #366      +/-   ##
==========================================
- Coverage   83.73%   82.61%   -1.13%
==========================================
  Files          30       30
  Lines        2337     2490     +153
  Branches      410      445      +35
==========================================
+ Hits         1957     2057     +100
- Misses        293      326      +33
- Partials       87      107      +20
```
| Files | Coverage | Δ |
| --- | --- | --- |
| src/databricks/labs/ucx/framework/crawlers.py | 86.79% <100.00%> | (ø) |
| src/databricks/labs/ucx/framework/parallel.py | 98.36% <100.00%> | (+0.48%) ⬆️ |
| src/databricks/labs/ucx/install.py | 83.66% <100.00%> | (+0.09%) ⬆️ |
| src/databricks/labs/ucx/runtime.py | 55.55% <ø> | (+0.76%) ⬆️ |
| src/databricks/labs/ucx/workspace_access/scim.py | 100.00% <100.00%> | (ø) |
| ...rc/databricks/labs/ucx/workspace_access/generic.py | 97.36% <80.00%> | (-1.73%) ⬇️ |
| src/databricks/labs/ucx/workspace_access/groups.py | 92.45% <93.33%> | (+2.17%) ⬆️ |
| src/databricks/labs/ucx/assessment/crawlers.py | 66.33% <55.17%> | (-7.33%) ⬇️ |

... and 1 file with indirect coverage changes

zpappa added 2 commits October 3, 2023 21:07
…it on tasks and can vary inside a job. Added a hashing algorithm to assess uniqueness across task submissions, as external systems do not consistently provide identifiers. Added additional tests.
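
A minimal sketch of the field-based hashing discussed throughout this review; the field selection and helper name are illustrative, not the exact code in the PR:

```
import hashlib

from databricks.sdk.service.jobs import RunTask


def _hash_task_fields(task: RunTask) -> str:
    """Hash relatively stable task fields so repeated external submissions
    of the same workload collapse onto a single identifier."""
    hash_values = []
    if task.spark_python_task is not None:
        hash_values.append(task.spark_python_task.python_file)
    if task.notebook_task is not None:
        hash_values.append(task.notebook_task.notebook_path)
    if task.dbt_task is not None:
        hash_values.append(",".join(sorted(task.dbt_task.commands)))
    payload = "|".join(v for v in hash_values if v is not None)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```
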
Contributor

@larsgeorge-db larsgeorge-db left a comment

lgtm

Collaborator

@nfx nfx left a comment

Missing logic for hashing/deduping similar run-submit jobs, and there are some logic errors.

FastLee and others added 12 commits October 4, 2023 10:23
Added a table migration doc.
Let's discuss the migration process.
This PR aims to fix #297 and #346.

It adds a utility method to filter rows that have a column containing
None; this helps crawlers avoid throwing an error when a column is None.

It also checks whether the column in the class is nullable; if it is
nullable and the value is None, the None is ignored.
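
A minimal sketch of that kind of nullability-aware filter, assuming dataclass rows with Optional type hints; the row type and function names are illustrative:

```
from dataclasses import dataclass, fields
from typing import Optional, Union, get_args, get_origin


@dataclass
class Row:  # illustrative row type
    name: str
    failures: Optional[str] = None


def _is_nullable(field_type) -> bool:
    # Optional[X] is Union[X, None], so a nullable column has NoneType among its args.
    return get_origin(field_type) is Union and type(None) in get_args(field_type)


def filter_none_rows(rows: list[Row]) -> list[Row]:
    """Drop rows that have None in a non-nullable column; keep None in nullable ones."""
    return [
        row
        for row in rows
        if all(
            getattr(row, f.name) is not None or _is_nullable(f.type)
            for f in fields(Row)
        )
    ]
```
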
…e been synchronised to Unity Catalog already or not (#306)

Closes #303
Check that database name is a word, loop until correct.
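
A minimal sketch of such a prompt loop, assuming a simple `\w+` check; the prompt text is illustrative:

```
import re


def ask_for_inventory_database() -> str:
    # Keep asking until the name is a single word (letters, digits, underscores).
    while True:
        name = input("Inventory database name: ")
        if re.fullmatch(r"\w+", name):
            return name
        print(f"{name} is not a valid database name, try again")
```
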
* Added `inventory_database` name check during installation
([#275](#275)).
* Added a column to `$inventory.tables` to specify if a table might have
been synchronised to Unity Catalog already or not
([#306](#306)).
* Added a migration state to skip already migrated tables
([#325](#325)).
* Fixed appending to tables by adding filtering of `None` rows
([#356](#356)).
* Fixed handling of missing but linked cluster policies.
([#361](#361)).
* Ignore errors for Redash widgets and queries redeployment during
installation ([#367](#367)).
* Remove exception and added proper logging for groups in the list that…
([#357](#357)).
* Skip group migration when no groups are available after preparation
step. ([#363](#363)).
* Update databricks-sdk requirement from ~=0.9.0 to ~=0.10.0
([#362](#362)).
add test for `_configure_inventory_database()`
add test for `run_for_config()`
Changed days to 90

Co-authored-by: Serge Smertin <259697+nfx@users.noreply.github.com>
Collaborator

@nfx nfx left a comment

This starts to take shape. Minor comments remaining; please also add the integration test for this, so that we have a happy path calling the real APIs.


```
class ExternallyOrchestratedJobTaskCrawler(CrawlerBase):
    def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
        super().__init__(sbe, "hive_metastore", schema, "job_runs", ExternallyOrchestratedJobTask)
```

Collaborator

job_runs may be too generic here for a table name

Author

will rename it

```
if task.spark_python_task is not None:
    hash_values.append(task.spark_python_task.python_file)
if task.spark_submit_task is not None:
    hash_values.append(task.spark_submit_task.parameters)
```

Collaborator

wouldn't these be different for every run in airflow?

Author

🤦 parameters shouldn't be in there.

But the python file should largely not change if it's the same DAG, unless they're doing some clever metadata-driven selection.

Author

Will remove it, it shouldn't be there

```
hash_values.append(task.sql_task.dashboard.dashboard_id)
hash_values.append(task.sql_task.query.query_id)
if task.dbt_task is not None:
    task.dbt_task.commands.sort()
```

Collaborator

is it really needed here?

Author

Defensive posture :)

```
hash_values.append(task.dbt_task.catalog)
hash_values.append(task.dbt_task.warehouse_id)
hash_values.append(task.dbt_task.project_directory)
hash_values.append(",".join(task.dbt_task.commands))
```

Collaborator

Suggested change:

```
-hash_values.append(",".join(task.dbt_task.commands))
+hash_values.append(",".join(sorted(task.dbt_task.commands)))
```

this might be a bit more deterministic

Author

okay, will opt for this instead

Comment on lines +337 to +339:

```
hash_values.append(task.git_source.git_tag)
hash_values.append(task.git_source.git_branch)
hash_values.append(task.git_source.git_commit)
```

Collaborator

Suggested change:

```
-hash_values.append(task.git_source.git_tag)
-hash_values.append(task.git_source.git_branch)
-hash_values.append(task.git_source.git_commit)
```

these might vary a lot, no need to have them in hash

Author

Agreed

```
hash_values.append(task.git_source.git_branch)
hash_values.append(task.git_source.git_commit)
hash_values.append(task.git_source.git_provider)
hash_values.append(task.git_source.git_snapshot.used_commit)
```

Collaborator

Suggested change:

```
-hash_values.append(task.git_source.git_snapshot.used_commit)
```

```
for task in job_run.tasks:
    spark_version = self._get_spark_version_from_task(task, job_run, all_clusters)
    data_security_mode = self._get_data_security_mode_from_task(task, job_run, all_clusters)
    hashed_id = self._create_hash_from_job_run_task(task)
```

Collaborator

One more thing I forgot: you're not checking whether hashed_id was seen already. Add it.

Also: why do we hash on the job task and NOT on the job_run? Airflow creates job runs, not tasks.

Author

I figured we could do the dedupe in the actual report, based on the hash.

Airflow creates job runs, yes, but each task is triggered in a workflow, so a job run might exist where only one task out of 5 ran successfully. Each task can also have its own cluster configuration, and that's what we're checking.

Collaborator

Two things:

  • this table has to get as little noise as possible, so we have to dedupe before we write to the table.
  • we have to dedupe based on the job_run, not the task, because that's how multi-task jobs are designed.
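
A minimal sketch of that pre-write dedupe, keyed on a run-level hash; the record type and function names here are illustrative, not the code in this PR:

```
from dataclasses import dataclass


@dataclass
class ExternallyOrchestratedJobRunInfo:
    """Illustrative record: one row per unique run-submit workload."""
    hashed_id: str
    run_id: int
    spark_version: str


def dedupe_by_hash(records: list[ExternallyOrchestratedJobRunInfo]) -> list[ExternallyOrchestratedJobRunInfo]:
    """Keep only the first record seen for each run-level hash, so repeated
    external submissions of the same workload produce a single table row."""
    seen: dict[str, ExternallyOrchestratedJobRunInfo] = {}
    for record in records:
        if record.hashed_id not in seen:
            seen[record.hashed_id] = record
    return list(seen.values())
```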

Author

Yes, but cluster definitions live on the task, as mentioned. Trying to imagine a user running this..

We potentially run the risk, particularly in large multi-task workflows, of directing the user to the wrong task to address.

If there are multiple tasks that would fail, then a user potentially has to change multiple cluster configurations. If we roll this up into one result, we run the risk of telling the user to fix "1" thing.

Collaborator

For a job with 50 tasks we have to have only one record in this table.

Author

The challenge is around usability: if this shows up in the failure JSON, returning a useful report from it is hard.

Author

Especially if there are 50 tasks ☝️

Author

I'll change the grain for now so we can move this along.

Author

@zpappa zpappa Oct 6, 2023

But we'll have to make a decision about which cluster version info we surface.

A multi-task job run can use multiple cluster definitions.

For now I'll just choose the lowest DBR, but this can be misleading.
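
A minimal sketch of how the lowest DBR could be picked across a run's task clusters; the parsing rule is an assumption (Databricks runtime strings such as "13.3.x-scala2.12" start with major.minor):

```
def _version_key(spark_version: str) -> tuple[int, int]:
    # Sort key for runtime strings like "10.4.x-scala2.12": (major, minor).
    # Anything that does not start with "major.minor" sorts first.
    try:
        major, minor = spark_version.split(".")[:2]
        return int(major), int(minor)
    except ValueError:
        return (0, 0)


def lowest_spark_version(versions: list[str]) -> str:
    """Return the lowest runtime version used by any task cluster in the run."""
    return min(versions, key=_version_key)


# Example: a run mixing DBR 10.4 and 13.3 clusters is reported as 10.4.
print(lowest_spark_version(["13.3.x-scala2.12", "10.4.x-scala2.12"]))
```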

Co-authored-by: Serge Smertin <259697+nfx@users.noreply.github.com>

Collaborator

@nfx nfx left a comment

this also needs an integration test


@zpappa
Author

zpappa commented Oct 6, 2023

> this also needs an integration test

Haven't even bothered trying to set one up yet; the amount of data required to run this is high. Will work on it...

@nfx
Collaborator

nfx commented Oct 6, 2023

@zpappa just create a few job runs with fixtures - e.g. by calling ws.jobs.submit_run(...)
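
A minimal sketch of such a fixture using the databricks-sdk (where the call is `ws.jobs.submit`); the run name, node type and file path are placeholders:

```
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import compute, jobs

ws = WorkspaceClient()

# Submit a one-off run the way an external orchestrator would, so the
# crawler has a run-submit job run to discover.
waiter = ws.jobs.submit(
    run_name="ucx-integration-run-submit",  # placeholder name
    tasks=[
        jobs.SubmitTask(
            task_key="external_task",
            new_cluster=compute.ClusterSpec(
                spark_version="13.3.x-scala2.12",
                node_type_id="i3.xlarge",  # assumption: AWS node type
                num_workers=1,
            ),
            spark_python_task=jobs.SparkPythonTask(
                python_file="dbfs:/tmp/placeholder_job.py",  # placeholder path
            ),
        )
    ],
)
waiter.result()  # wait for the run to finish before crawling
```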

@zpappa zpappa requested review from a team October 6, 2023 16:12
larsgeorge-db and others added 8 commits October 6, 2023 12:14
…n errors (#375)

Fixed deletion of backup groups [issue #374].
Added rate limits and retries to group operations [issue #353].
Temp fix for issue #359
Added log messages for better visibility.
Added useful troubleshooting snippets to the docs.
…tion (#385)

Fixes: #382 

```
Select PRO or SERVERLESS SQL warehouse to run assessment dashboards on
[0] Shared Endpoint (475b...cd5211, Serverless, RUNNING)
[1] [Create new PRO SQL warehouse]
...
Enter a number between 0 and 7: 
```
Fixed the external location test. Modified crawler to reference
dataclass instead of type.
Added asserts to e2e tests to verify that the backup groups get deleted
@zpappa
Author

zpappa commented Oct 6, 2023

Closing in favor of #395 after a rebase gone wrong.

@zpappa zpappa closed this Oct 6, 2023
@zpappa zpappa self-assigned this Dec 13, 2023
@nfx nfx deleted the feature/external-orchestrator-run-submit-crawler branch December 15, 2023 00:32
@nfx nfx restored the feature/external-orchestrator-run-submit-crawler branch December 15, 2023 00:32
@nfx nfx deleted the feature/external-orchestrator-run-submit-crawler branch April 4, 2024 22:31
Development

Successfully merging this pull request may close these issues.

Assessment for RunSubmit API usages
7 participants