Skip to content

Commit

Permalink
Merge pull request #303 from OP-TED/feature/TED-803
Browse files Browse the repository at this point in the history
Feature/ted 803 + bug with selectors tasks skip
  • Loading branch information
CaptainOfHacks authored Oct 12, 2022
2 parents 746fca3 + fc89bb6 commit 8cf23bc
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 1 deletion.
3 changes: 2 additions & 1 deletion dags/notice_fetch_by_date_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.timetables.trigger import CronTriggerTimetable

from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import get_dag_param, push_dag_downstream, pull_dag_upstream
Expand All @@ -23,7 +24,7 @@

@dag(default_args=DEFAULT_DAG_ARGUMENTS,
catchup=False,
schedule_interval="0 3 * * *",
timetable=CronTriggerTimetable('0 3 * * *', timezone='UTC'),
tags=['selector', 'daily-fetch'])
def notice_fetch_by_date_workflow():
@task
Expand Down
4 changes: 4 additions & 0 deletions dags/notice_process_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,25 @@ def _stop_processing():
selector_branch_before_transformation = BranchPythonOperator(
task_id=SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID,
python_callable=_selector_branch_before_transformation,
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)

selector_branch_before_validation = BranchPythonOperator(
task_id=SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID,
python_callable=_selector_branch_before_validation,
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)

selector_branch_before_package = BranchPythonOperator(
task_id=SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID,
python_callable=_selector_branch_before_package,
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)

selector_branch_before_publish = BranchPythonOperator(
task_id=SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID,
python_callable=_selector_branch_before_publish,
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)

stop_processing = PythonOperator(
Expand Down

0 comments on commit 8cf23bc

Please sign in to comment.