Skip to content

Commit

Permalink
fix(ingest): airflow: update subdag check for compatibility with olde…
Browse files Browse the repository at this point in the history
…r Airflow versions (datahub-project#5523)

* use getattr to default None if no subdag

* add None check

* add other None check

* Apply suggestions from code review- double quotes

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>

* minor tweak to fix lint

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
  • Loading branch information
2 people authored and maggiehays committed Aug 1, 2022
1 parent 3d0d648 commit a5926a5
Showing 1 changed file with 3 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,11 @@ def _get_dependencies(
upstream_task = dag.task_dict[upstream_task_id]

# if upstream task is not a subdag, then skip it
if upstream_task.subdag is None:
upstream_subdag = getattr(upstream_task, "subdag", None)
if upstream_subdag is None:
continue

# else, link the leaf tasks of the upstream subdag as upstream tasks
upstream_subdag = upstream_task.subdag

for upstream_subdag_task_id in upstream_subdag.task_dict:
upstream_subdag_task = upstream_subdag.task_dict[
upstream_subdag_task_id
Expand Down Expand Up @@ -113,7 +112,7 @@ def _get_dependencies(
[
DataJobUrn.create_from_ids(job_id=task_id, data_flow_urn=str(flow_urn))
for task_id in task.upstream_task_ids
if dag.task_dict[task_id].subdag is None
if getattr(dag.task_dict[task_id], "subdag", None) is None
]
+ upstream_subdag_task_urns
+ upstream_subdag_triggers
Expand Down

0 comments on commit a5926a5

Please sign in to comment.