Skip to content

Commit

Permalink
Rename task_concurrency to max_active_tis_per_dag (#17708)
Browse files Browse the repository at this point in the history
Follow-up of apache/airflow#16267

Renames `task_concurrency` to `max_active_tis_per_dag`

Some of Airflow's concurrency settings have been a source of confusion for a lot of users (including me), for example:

https://stackoverflow.com/questions/56370720/how-to-control-the-parallelism-or-concurrency-of-an-airflow-installation
https://stackoverflow.com/questions/38200666/airflow-parallelism
This PR is an attempt to make the settings easier to understand

GitOrigin-RevId: 35a6c302772bf5d0bb74646b2da04856c7d7aaac
  • Loading branch information
kaxil authored and Cloud Composer Team committed Aug 27, 2022
1 parent 37a2d26 commit 7ffe27d
Show file tree
Hide file tree
Showing 15 changed files with 68 additions and 37 deletions.
22 changes: 22 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,28 @@ dag = DAG(
If you are using DAGs Details API endpoint, use `max_active_tasks` instead of `concurrency`.
### Task concurrency parameter has been renamed
`BaseOperator.task_concurrency` has been deprecated and renamed to `max_active_tis_per_dag` to
make its meaning clearer.
This parameter controls the number of concurrent running task instances across ``dag_runs``
per task.
**Before**:
```python
with DAG(dag_id="task_concurrency_example"):
BashOperator(task_id="t1", task_concurrency=2, bash_command="echo Hi")
```
**After**:
```python
with DAG(dag_id="task_concurrency_example"):
BashOperator(task_id="t1", max_active_tis_per_dag=2, bash_command="echo Hi")
```
### Marking success/failed automatically clears failed downstream tasks
When marking a task success/failed in Graph View, its downstream tasks that are in failed/upstream_failed state are automatically cleared.
Expand Down
4 changes: 2 additions & 2 deletions airflow/jobs/backfill_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,14 +587,14 @@ def _per_task_process(key, ti, session=None):
"Not scheduling since DAG max_active_tasks limit is reached."
)

if task.task_concurrency:
if task.max_active_tis_per_dag:
num_running_task_instances_in_task = DAG.get_num_task_instances(
dag_id=self.dag_id,
task_ids=[task.task_id],
states=self.STATES_COUNT_AS_RUNNING,
)

if num_running_task_instances_in_task >= task.task_concurrency:
if num_running_task_instances_in_task >= task.max_active_tis_per_dag:
raise TaskConcurrencyLimitReached(
"Not scheduling since Task concurrency limit is reached."
)
Expand Down
2 changes: 1 addition & 1 deletion airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
if serialized_dag.has_task(task_instance.task_id):
task_concurrency_limit = serialized_dag.get_task(
task_instance.task_id
).task_concurrency
).max_active_tis_per_dag

if task_concurrency_limit is not None:
current_task_concurrency = task_concurrency_map[
Expand Down
17 changes: 13 additions & 4 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,9 @@ class derived from this one results in the creation of a task object,
:type resources: dict
:param run_as_user: unix username to impersonate while running the task
:type run_as_user: str
:param task_concurrency: When set, a task will be able to limit the concurrent
runs across execution_dates
:type task_concurrency: int
:param max_active_tis_per_dag: When set, a task will be able to limit the concurrent
runs across execution_dates.
:type max_active_tis_per_dag: int
:param executor_config: Additional task-level configuration parameters that are
interpreted by a specific executor. Parameters are namespaced by the name of
executor.
Expand Down Expand Up @@ -492,6 +492,7 @@ def __init__(
resources: Optional[Dict] = None,
run_as_user: Optional[str] = None,
task_concurrency: Optional[int] = None,
max_active_tis_per_dag: Optional[int] = None,
executor_config: Optional[Dict] = None,
do_xcom_push: bool = True,
inlets: Optional[Any] = None,
Expand Down Expand Up @@ -624,7 +625,15 @@ def __init__(
self.weight_rule = weight_rule
self.resources: Optional[Resources] = Resources(**resources) if resources else None
self.run_as_user = run_as_user
self.task_concurrency = task_concurrency
if task_concurrency and not max_active_tis_per_dag:
# TODO: Remove in Airflow 3.0
warnings.warn(
"The 'task_concurrency' parameter is deprecated. Please use 'max_active_tis_per_dag'.",
DeprecationWarning,
stacklevel=2,
)
max_active_tis_per_dag = task_concurrency
self.max_active_tis_per_dag = max_active_tis_per_dag
self.executor_config = executor_config or {}
self.do_xcom_push = do_xcom_push

Expand Down
2 changes: 1 addition & 1 deletion airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2233,7 +2233,7 @@ def bulk_write_to_db(cls, dags: Collection["DAG"], session=None):
orm_dag.description = dag.description
orm_dag.schedule_interval = dag.schedule_interval
orm_dag.max_active_tasks = dag.max_active_tasks
orm_dag.has_task_concurrency_limits = any(t.task_concurrency is not None for t in dag.tasks)
orm_dag.has_task_concurrency_limits = any(t.max_active_tis_per_dag is not None for t in dag.tasks)

orm_dag.calculate_dagrun_date_fields(
dag,
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ def update_state(
unfinished_tasks = info.unfinished_tasks

none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks)
none_task_concurrency = all(t.task.task_concurrency is None for t in unfinished_tasks)
none_task_concurrency = all(t.task.max_active_tis_per_dag is None for t in unfinished_tasks)
none_deferred = all(t.state != State.DEFERRED for t in unfinished_tasks)

if unfinished_tasks and none_depends_on_past and none_task_concurrency and none_deferred:
Expand Down
4 changes: 2 additions & 2 deletions airflow/ti_deps/deps/task_concurrency_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ class TaskConcurrencyDep(BaseTIDep):

@provide_session
def _get_dep_statuses(self, ti, session, dep_context):
if ti.task.task_concurrency is None:
if ti.task.max_active_tis_per_dag is None:
yield self._passing_status(reason="Task concurrency is not set.")
return

if ti.get_num_running_task_instances(session) >= ti.task.task_concurrency:
if ti.get_num_running_task_instances(session) >= ti.task.max_active_tis_per_dag:
yield self._failing_status(reason="The max task concurrency has been reached.")
return
else:
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/faq.rst
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ DAGs have configurations that improve efficiency:

Operators or tasks also have configurations that improve efficiency and scheduling priority:

- ``task_concurrency``: This parameter controls the number of concurrent running task instances across ``dag_runs``
- ``max_active_tis_per_dag``: This parameter controls the number of concurrent running task instances across ``dag_runs``
per task.
- ``pool``: See :ref:`concepts:pool`.
- ``priority_weight``: See :ref:`concepts:priority-weight`.
Expand Down
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ def create_dummy_dag(dag_maker):
def create_dag(
dag_id='dag',
task_id='op1',
task_concurrency=16,
max_active_tis_per_dag=16,
pool='default_pool',
executor_config={},
trigger_rule='all_done',
Expand All @@ -626,7 +626,7 @@ def create_dag(
with dag_maker(dag_id, **kwargs) as dag:
op = DummyOperator(
task_id=task_id,
task_concurrency=task_concurrency,
max_active_tis_per_dag=max_active_tis_per_dag,
executor_config=executor_config,
on_success_callback=on_success_callback,
on_execute_callback=on_execute_callback,
Expand Down
16 changes: 8 additions & 8 deletions tests/jobs/test_backfill_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ def _get_dummy_dag(
dag_maker_fixture,
dag_id='test_dag',
pool=Pool.DEFAULT_POOL_NAME,
task_concurrency=None,
max_active_tis_per_dag=None,
task_id='op',
**kwargs,
):
with dag_maker_fixture(dag_id=dag_id, schedule_interval='@daily', **kwargs) as dag:
DummyOperator(task_id=task_id, pool=pool, task_concurrency=task_concurrency)
DummyOperator(task_id=task_id, pool=pool, max_active_tis_per_dag=max_active_tis_per_dag)

return dag

Expand Down Expand Up @@ -295,12 +295,12 @@ def test_backfill_conf(self, dag_maker):
assert conf_ == dr[0].conf

@patch('airflow.jobs.backfill_job.BackfillJob.log')
def test_backfill_respect_task_concurrency_limit(self, mock_log, dag_maker):
task_concurrency = 2
def test_backfill_respect_max_active_tis_per_dag_limit(self, mock_log, dag_maker):
max_active_tis_per_dag = 2
dag = self._get_dummy_dag(
dag_maker,
dag_id='test_backfill_respect_task_concurrency_limit',
task_concurrency=task_concurrency,
dag_id='test_backfill_respect_max_active_tis_per_dag_limit',
max_active_tis_per_dag=max_active_tis_per_dag,
)
dag_maker.create_dagrun()

Expand All @@ -321,9 +321,9 @@ def test_backfill_respect_task_concurrency_limit(self, mock_log, dag_maker):

num_running_task_instances = 0
for running_task_instances in executor.history:
assert len(running_task_instances) <= task_concurrency
assert len(running_task_instances) <= max_active_tis_per_dag
num_running_task_instances += len(running_task_instances)
if len(running_task_instances) == task_concurrency:
if len(running_task_instances) == max_active_tis_per_dag:
task_concurrency_limit_reached_at_least_once = True

assert 8 == num_running_task_instances
Expand Down
12 changes: 6 additions & 6 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,12 +692,12 @@ def test_find_executable_task_instances_concurrency_queued(self, dag_maker):
session.rollback()

# TODO: This is a hack, I think I need to just remove the setting and have it on always
def test_find_executable_task_instances_task_concurrency(self, dag_maker):
dag_id = 'SchedulerJobTest.test_find_executable_task_instances_task_concurrency'
def test_find_executable_task_instances_max_active_tis_per_dag(self, dag_maker):
dag_id = 'SchedulerJobTest.test_find_executable_task_instances_max_active_tis_per_dag'
task_id_1 = 'dummy'
task_id_2 = 'dummy2'
with dag_maker(dag_id=dag_id, max_active_tasks=16) as dag:
task1 = DummyOperator(task_id=task_id_1, task_concurrency=2)
task1 = DummyOperator(task_id=task_id_1, max_active_tis_per_dag=2)
task2 = DummyOperator(task_id=task_id_2)

executor = MockExecutor(do_update=True)
Expand Down Expand Up @@ -2911,15 +2911,15 @@ def test_dag_file_processor_process_task_instances(self, state, start_date, end_
],
],
)
def test_dag_file_processor_process_task_instances_with_task_concurrency(
def test_dag_file_processor_process_task_instances_with_max_active_tis_per_dag(
self, state, start_date, end_date, dag_maker
):
"""
Test if _process_task_instances puts the right task instances into the
mock_list.
"""
with dag_maker(dag_id='test_scheduler_process_execute_task_with_task_concurrency'):
BashOperator(task_id='dummy', task_concurrency=2, bash_command='echo Hi')
with dag_maker(dag_id='test_scheduler_process_execute_task_with_max_active_tis_per_dag'):
BashOperator(task_id='dummy', max_active_tis_per_dag=2, bash_command='echo Hi')

self.scheduler_job = SchedulerJob(subdir=os.devnull)
self.scheduler_job.processor_agent = mock.MagicMock()
Expand Down
2 changes: 1 addition & 1 deletion tests/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def test_dagrun_no_deadlock_with_depends_on_past(self):
dag = DAG('test_dagrun_no_deadlock', start_date=DEFAULT_DATE)
with dag:
DummyOperator(task_id='dop', depends_on_past=True)
DummyOperator(task_id='tc', task_concurrency=1)
DummyOperator(task_id='tc', max_active_tis_per_dag=1)

dag.clear()
dr = dag.create_dagrun(
Expand Down
10 changes: 5 additions & 5 deletions tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,11 @@ def test_requeue_over_dag_concurrency(self, mock_concurrency_reached, create_dum
ti.run()
assert ti.state == State.NONE

def test_requeue_over_task_concurrency(self, create_dummy_dag):
def test_requeue_over_max_active_tis_per_dag(self, create_dummy_dag):
_, task = create_dummy_dag(
dag_id='test_requeue_over_task_concurrency',
task_id='test_requeue_over_task_concurrency_op',
task_concurrency=0,
dag_id='test_requeue_over_max_active_tis_per_dag',
task_id='test_requeue_over_max_active_tis_per_dag_op',
max_active_tis_per_dag=0,
max_active_runs=1,
max_active_tasks=2,
)
Expand All @@ -310,7 +310,7 @@ def test_requeue_over_pool_concurrency(self, create_dummy_dag):
_, task = create_dummy_dag(
dag_id='test_requeue_over_pool_concurrency',
task_id='test_requeue_over_pool_concurrency_op',
task_concurrency=0,
max_active_tis_per_dag=0,
max_active_runs=1,
max_active_tasks=2,
)
Expand Down
2 changes: 1 addition & 1 deletion tests/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,7 @@ def test_no_new_fields_added_to_base_operator(self):
'executor_config': {},
'inlets': [],
'label': '10',
'max_active_tis_per_dag': None,
'max_retry_delay': None,
'on_execute_callback': None,
'on_failure_callback': None,
Expand All @@ -907,7 +908,6 @@ def test_no_new_fields_added_to_base_operator(self):
'sla': None,
'start_date': None,
'subdag': None,
'task_concurrency': None,
'task_id': '10',
'trigger_rule': 'all_success',
'wait_for_downstream': False,
Expand Down
4 changes: 2 additions & 2 deletions tests/ti_deps/deps/test_task_concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ def test_not_task_concurrency(self):
assert TaskConcurrencyDep().is_met(ti=ti, dep_context=dep_context)

def test_not_reached_concurrency(self):
task = self._get_task(start_date=datetime(2016, 1, 1), task_concurrency=1)
task = self._get_task(start_date=datetime(2016, 1, 1), max_active_tis_per_dag=1)
dep_context = DepContext()
ti = Mock(task=task, execution_date=datetime(2016, 1, 1))
ti.get_num_running_task_instances = lambda x: 0
assert TaskConcurrencyDep().is_met(ti=ti, dep_context=dep_context)

def test_reached_concurrency(self):
task = self._get_task(start_date=datetime(2016, 1, 1), task_concurrency=2)
task = self._get_task(start_date=datetime(2016, 1, 1), max_active_tis_per_dag=2)
dep_context = DepContext()
ti = Mock(task=task, execution_date=datetime(2016, 1, 1))
ti.get_num_running_task_instances = lambda x: 1
Expand Down

0 comments on commit 7ffe27d

Please sign in to comment.