Crawler for RunSubmit API usages from External Orchestrators (ADF/Airflow) #366
Conversation
Codecov Report
@@            Coverage Diff             @@
##             main     #366      +/-   ##
==========================================
- Coverage   83.73%   82.61%    -1.13%
==========================================
  Files          30       30
  Lines        2337     2490     +153
  Branches      410      445      +35
==========================================
+ Hits         1957     2057     +100
- Misses        293      326      +33
- Partials       87      107      +20
…it on tasks and can vary inside a job. Added a hashing algorithm to assess uniqueness across task submissions, as external systems do not consistently provide identifiers. Added additional tests.
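For context, a minimal sketch of the kind of hashing described here - deriving a stable identifier from a task's stable attributes, since external orchestrators don't supply one consistently. The exact field selection and helper name are illustrative, not the PR's code:

```python
import hashlib

def _create_hash_from_job_run_task(task) -> str:
    """Illustrative only: hash the stable parts of a run-submit task."""
    hash_values: list[str] = []
    if task.notebook_task is not None:  # assumed field, mirrors the PR's pattern
        hash_values.append(task.notebook_task.notebook_path)
    if task.spark_python_task is not None:
        hash_values.append(task.spark_python_task.python_file)
    if task.dbt_task is not None:
        hash_values.append(",".join(sorted(task.dbt_task.commands)))
    joined = "|".join(value for value in hash_values if value is not None)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()
```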
lgtm
Missing logic for hashing/deduping the similar run-submit jobs, and it has some logic errors.
Added a table migration doc. Let's discuss the migration process.
Check that database name is a word, loop until correct.
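A rough sketch of that check, assuming a simple prompt loop; the `prompt` callable and the messages are hypothetical, not the installer's actual code:

```python
import re

def _configure_inventory_database(prompt) -> str:
    """Illustrative only: keep asking until the name is a single word."""
    while True:
        name = prompt("Inventory database name")
        if re.fullmatch(r"\w+", name):
            return name
        print(f"{name} is not a valid name: use only letters, digits and underscores")
```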
* Added `inventory_database` name check during installation ([#275](#275)).
* Added a column to `$inventory.tables` to specify if a table might have been synchronised to Unity Catalog already or not ([#306](#306)).
* Added a migration state to skip already migrated tables ([#325](#325)).
* Fixed appending to tables by adding filtering of `None` rows ([#356](#356)).
* Fixed handling of missing but linked cluster policies ([#361](#361)).
* Ignore errors for Redash widgets and queries redeployment during installation ([#367](#367)).
* Remove exception and added proper logging for groups in the list that… ([#357](#357)).
* Skip group migration when no groups are available after preparation step ([#363](#363)).
* Update databricks-sdk requirement from ~=0.9.0 to ~=0.10.0 ([#362](#362)).
add test for `_configure_inventory_database()`
add test for `run_for_config()`
Changed days to 90
Co-authored-by: Serge Smertin <259697+nfx@users.noreply.github.com>
This starts to take shape. Minor comments remaining - please also add the integration test for this, so that we have some happy path calling the real APIs.
class ExternallyOrchestratedJobTaskCrawler(CrawlerBase):
    def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
        super().__init__(sbe, "hive_metastore", schema, "job_runs", ExternallyOrchestratedJobTask)
`job_runs` may be too generic here for a table name.
will rename it
if task.spark_python_task is not None:
    hash_values.append(task.spark_python_task.python_file)
if task.spark_submit_task is not None:
    hash_values.append(task.spark_submit_task.parameters)
wouldn't these be different for every run in airflow?
🤦 parameters shouldn't be in there
But the Python file should largely not change if it's the same DAG, unless they're doing some clever metadata-driven selection.
Will remove it, it shouldn't be there
hash_values.append(task.sql_task.dashboard.dashboard_id)
hash_values.append(task.sql_task.query.query_id)
if task.dbt_task is not None:
    task.dbt_task.commands.sort()
is it really needed here?
Defensive posture :)
hash_values.append(task.dbt_task.catalog)
hash_values.append(task.dbt_task.warehouse_id)
hash_values.append(task.dbt_task.project_directory)
hash_values.append(",".join(task.dbt_task.commands))
hash_values.append(",".join(task.dbt_task.commands)) | |
hash_values.append(",".join(sorted(task.dbt_task.commands))) |
this might be a bit more deterministic
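For illustration, sorting removes any dependence on the order the commands happen to arrive in, so two otherwise identical submissions produce the same hash input:

```python
a = ["dbt test", "dbt run"]
b = ["dbt run", "dbt test"]
assert ",".join(a) != ",".join(b)                  # order-sensitive, hashes would differ
assert ",".join(sorted(a)) == ",".join(sorted(b))  # stable input regardless of order
```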
okay, will opt for this instead
hash_values.append(task.git_source.git_tag)
hash_values.append(task.git_source.git_branch)
hash_values.append(task.git_source.git_commit)
Suggested change:
- hash_values.append(task.git_source.git_tag)
- hash_values.append(task.git_source.git_branch)
- hash_values.append(task.git_source.git_commit)

these might vary a lot, no need to have them in hash
Agreed
hash_values.append(task.git_source.git_branch)
hash_values.append(task.git_source.git_commit)
hash_values.append(task.git_source.git_provider)
hash_values.append(task.git_source.git_snapshot.used_commit)
Suggested change:
- hash_values.append(task.git_source.git_snapshot.used_commit)
for task in job_run.tasks:
    spark_version = self._get_spark_version_from_task(task, job_run, all_clusters)
    data_security_mode = self._get_data_security_mode_from_task(task, job_run, all_clusters)
    hashed_id = self._create_hash_from_job_run_task(task)
One more thing I forgot: you're not checking if `hashed_id` was seen already or not. Add it.
Also: why do we hash on the job task and NOT on the job_run? Airflow creates job runs, not tasks.
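A minimal sketch of the check being asked for - skip a hash that was already seen before emitting another row. The loop mirrors the snippet above; the `seen` set and the trailing comment are assumptions, not the PR's code:

```python
seen: set[str] = set()
for task in job_run.tasks:
    hashed_id = self._create_hash_from_job_run_task(task)
    if hashed_id in seen:
        continue  # duplicate submission, don't write another record
    seen.add(hashed_id)
    spark_version = self._get_spark_version_from_task(task, job_run, all_clusters)
    data_security_mode = self._get_data_security_mode_from_task(task, job_run, all_clusters)
    # ... build and collect the record for this hashed_id
```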
I figured we can do the dedupe in the actual report, based on the hash.
Airflow creates job runs, yes, but each task is triggered in a workflow, so a job run might exist where only one task out of 5 ran successfully. Also, each task can have its own cluster configuration, and that's what we're checking.
Two things:
- this table has to get as little noise as possible, so we have to dedupe before we write to the table.
- we have to dedupe based on the job_run, not the task, because that's how multi-task jobs are designed.
Yes, but cluster definitions live on the task, as mentioned. Trying to imagine a user running this...
We potentially run the risk, in particularly large multi-task workflows, of directing the user to the wrong task to address.
If there are multiple tasks that would fail, then a user potentially has to change multiple cluster configurations. If we roll this up to one result, we run the risk of telling the user to fix "1" thing.
For a job with 50 tasks we have to have only one record in this table.
The challenge is around usability: if this shows up in the failure JSON, returning a useful report from it is hard.
Especially if there are 50 tasks ☝️
I'll change the grain for now so we can move this along.
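A sketch of what the coarser grain could look like - one hash, and therefore one record, per job run, built from the per-task hashes. `_create_hash_from_job_run` is a hypothetical helper, not code from this PR:

```python
import hashlib

def _create_hash_from_job_run(self, job_run) -> str:
    """Illustrative only: one identifier per externally orchestrated job run."""
    task_hashes = sorted(self._create_hash_from_job_run_task(task) for task in (job_run.tasks or []))
    return hashlib.sha256("|".join(task_hashes).encode("utf-8")).hexdigest()
```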
But we'll have to make a decision about which cluster version info we surface.
A multi-task job run can use multiple cluster definitions.
For now I'll just choose the lowest DBR, but this can be misleading.
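For example, picking the lowest runtime across a run's task clusters might look like the sketch below. It assumes `spark_version` strings such as "13.3.x-scala2.12" and would need extra handling for custom images, so treat it as an approximation rather than the PR's logic:

```python
def _lowest_spark_version(versions: list[str]) -> str:
    """Illustrative only: pick the smallest major.minor runtime string."""
    def key(version: str) -> tuple[int, int]:
        major, minor, *_ = version.split(".")
        return int(major), int(minor)
    return min(versions, key=key)
```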
Co-authored-by: Serge Smertin <259697+nfx@users.noreply.github.com>
this also needs an integration test
Haven't even bothered trying to set one up yet; the amount of data required to run this is high. Will work on it...
@zpappa just create a few job runs with fixtures - e.g. calling the …
Fixed the external location test. Modified crawler to reference dataclass instead of type.
Added asserts to e2e tests to verify that the backup groups get deleted
Closing in favor of #395 after a rebase gone wrong.
Resolves #266
Added JobRunsCrawler
Added a crawler to look at JobRuns from the SDK and determine which of the job runs are from the RunsSubmit API
Added Unit Tests
Added tests to cover basic logic, further tests pending.
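As a rough sketch of the detection described above (not the crawler's actual implementation): the Jobs API marks one-time submissions with a SUBMIT_RUN run type, so filtering listed runs on that field is one way to find them; the exact field and enum shape may vary between SDK versions:

```python
from databricks.sdk import WorkspaceClient

ws = WorkspaceClient()

# Keep only runs created through the one-time RunSubmit API
submit_runs = [
    run
    for run in ws.jobs.list_runs(expand_tasks=True)
    if run.run_type is not None and run.run_type.value == "SUBMIT_RUN"
]
```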