Commit 2b85e78

Merge branch 'main' into feat/42592/patch-connections-fast-api

bugraoz93 authored Oct 30, 2024
2 parents bf18241 + 4fc16f1
Showing 125 changed files with 4,573 additions and 2,794 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/basic-tests.yml
@@ -206,6 +206,11 @@ jobs:
           breeze release-management prepare-provider-packages standard --package-format wheel --skip-tag-check
       - name: "Install Airflow with standard provider for webserver tests"
         run: pip install . dist/apache_airflow_providers_standard-*.whl
+      - name: "Prepare Task SDK package: wheel"
+        run: >
+          breeze release-management prepare-task-sdk-package --package-format wheel
+      - name: "Install Task SDK package"
+        run: pip install ./dist/apache_airflow_task_sdk-*.whl
       - name: "Install Python client"
         run: pip install ./dist/apache_airflow_client-*.whl
       - name: "Initialize Airflow DB and start webserver"
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
@@ -360,7 +360,7 @@ repos:
         types_or: [python, pyi]
         args: [--fix]
         require_serial: true
-        additional_dependencies: ["ruff==0.7.0"]
+        additional_dependencies: ["ruff==0.7.1"]
         exclude: ^.*/.*_vendor/|^tests/dags/test_imports.py|^performance/tests/test_.*.py
       - id: ruff-format
         name: Run 'ruff format'
@@ -370,7 +370,7 @@
         types_or: [python, pyi]
         args: []
         require_serial: true
-        additional_dependencies: ["ruff==0.7.0"]
+        additional_dependencies: ["ruff==0.7.1"]
         exclude: ^.*/.*_vendor/|^tests/dags/test_imports.py$
       - id: replace-bad-characters
         name: Replace bad characters
2 changes: 1 addition & 1 deletion Dockerfile
@@ -55,7 +55,7 @@ ARG PYTHON_BASE_IMAGE="python:3.9-slim-bookworm"
 # Also use `force pip` label on your PR to swap all places we use `uv` to `pip`
 ARG AIRFLOW_PIP_VERSION=24.3.1
 # ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main"
-ARG AIRFLOW_UV_VERSION=0.4.27
+ARG AIRFLOW_UV_VERSION=0.4.28
 ARG AIRFLOW_USE_UV="false"
 ARG UV_HTTP_TIMEOUT="300"
 ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow"
2 changes: 1 addition & 1 deletion Dockerfile.ci
@@ -1366,7 +1366,7 @@ RUN bash /scripts/docker/install_packaging_tools.sh; \
 # Also use `force pip` label on your PR to swap all places we use `uv` to `pip`
 ARG AIRFLOW_PIP_VERSION=24.3.1
 # ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main"
-ARG AIRFLOW_UV_VERSION=0.4.27
+ARG AIRFLOW_UV_VERSION=0.4.28

 ENV AIRFLOW_PIP_VERSION=${AIRFLOW_PIP_VERSION} \
     AIRFLOW_UV_VERSION=${AIRFLOW_UV_VERSION}
4 changes: 4 additions & 0 deletions INSTALL
@@ -93,6 +93,10 @@ or you can install all packages needed to run tests for core, providers, and all

 You can see the list of all available extras below.

+Additionally, when you want to develop providers, you need to install the provider code in editable mode:
+
+    pip install -e "./providers"
+
 # Using Hatch to manage your Python, virtualenvs, and build packages

 Airflow uses [hatch](https://hatch.pypa.io/) as a build and development tool. It is one of the popular
11 changes: 8 additions & 3 deletions airflow/api/common/mark_tasks.py
@@ -21,7 +21,7 @@

 from typing import TYPE_CHECKING, Collection, Iterable, Iterator, NamedTuple

-from sqlalchemy import or_, select
+from sqlalchemy import and_, or_, select
 from sqlalchemy.orm import lazyload

 from airflow.models.dagrun import DagRun
@@ -402,8 +402,13 @@ def set_dag_run_state_to_failed(
         select(TaskInstance).filter(
             TaskInstance.dag_id == dag.dag_id,
             TaskInstance.run_id == run_id,
-            TaskInstance.state.not_in(State.finished),
-            TaskInstance.state.not_in(running_states),
+            or_(
+                TaskInstance.state.is_(None),
+                and_(
+                    TaskInstance.state.not_in(State.finished),
+                    TaskInstance.state.not_in(running_states),
+                ),
+            ),
         )
     ).all()

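Why the new `or_(...)` wrapper: in SQL's three-valued logic, `state NOT IN (...)` evaluates to NULL rather than true when `state` itself is NULL, so task instances that had not yet been given a state were silently excluded by the old filter; the added `IS NULL` branch matches them explicitly. A minimal standalone sketch of the two predicates (hypothetical `state` column and illustrative state lists, not Airflow's real ones):

    from sqlalchemy import Column, String, and_, or_

    # Hypothetical column standing in for TaskInstance.state.
    state = Column("state", String)

    # Old filter: rows where state IS NULL make "state NOT IN (...)" evaluate
    # to NULL in SQL, so they never matched and were skipped.
    old_filter = and_(state.not_in(["success", "failed"]), state.not_in(["running"]))

    # New filter: NULL-state rows are matched explicitly via IS NULL.
    new_filter = or_(
        state.is_(None),
        and_(state.not_in(["success", "failed"]), state.not_in(["running"])),
    )

    print(new_filter)  # state IS NULL OR (state NOT IN (...) AND state NOT IN (...))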
3 changes: 3 additions & 0 deletions airflow/api_connexion/endpoints/backfill_endpoint.py
@@ -35,6 +35,7 @@

 from airflow.models.backfill import (
     AlreadyRunningBackfill,
     Backfill,
+    ReprocessBehavior,
     _cancel_backfill,
     _create_backfill,
 )
@@ -151,6 +152,7 @@ def create_backfill(
     max_active_runs: int = 10,
     reverse: bool = False,
     dag_run_conf: dict | None = None,
+    reprocess_behavior: ReprocessBehavior | None = None,
 ) -> APIResponse:
     try:
         backfill_obj = _create_backfill(
@@ -160,6 +162,7 @@
             max_active_runs=max_active_runs,
             reverse=reverse,
             dag_run_conf=dag_run_conf,
+            reprocess_behavior=reprocess_behavior,
         )
         return backfill_schema.dump(backfill_obj)
     except AlreadyRunningBackfill:
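End to end, the new parameter can be exercised through the backfill creation endpoint. A hypothetical call, where the /api/v1/backfills path, the basic-auth credentials, and the dag_id/from_date/to_date parameters are assumptions for illustration; only max_active_runs, reverse, dag_run_conf, and reprocess_behavior are confirmed by the hunks above:

    import requests

    # Hypothetical request; URL, auth, and the dag_id/from_date/to_date
    # parameters are assumed, not taken from this diff.
    resp = requests.post(
        "http://localhost:8080/api/v1/backfills",
        params={
            "dag_id": "example_dag",
            "from_date": "2024-10-01T00:00:00Z",
            "to_date": "2024-10-08T00:00:00Z",
            "reprocess_behavior": "failed",  # one of: none, failed, completed
        },
        auth=("admin", "admin"),
    )
    print(resp.status_code, resp.json())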
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -373,6 +373,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
         raise AlreadyExists(detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{run_id}' already exists")


+@mark_fastapi_migration_done
 @security.requires_access_dag("PUT", DagAccessEntity.RUN)
 @provide_session
 @action_logging
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/event_log_endpoint.py
@@ -31,6 +31,7 @@

 from airflow.auth.managers.models.resource_details import DagAccessEntity
 from airflow.models import Log
 from airflow.utils import timezone
+from airflow.utils.api_migration import mark_fastapi_migration_done
 from airflow.utils.db import get_query_count
 from airflow.utils.session import NEW_SESSION, provide_session

@@ -40,6 +41,7 @@
     from airflow.api_connexion.types import APIResponse


+@mark_fastapi_migration_done
 @security.requires_access_dag("GET", DagAccessEntity.AUDIT_LOG)
 @provide_session
 def get_event_log(*, event_log_id: int, session: Session = NEW_SESSION) -> APIResponse:
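Judging by its name and its usage in the two hunks above, mark_fastapi_migration_done looks like a bookkeeping decorator that records which Connexion endpoints already have a FastAPI counterpart, without changing runtime behavior. A hypothetical minimal implementation (the real one lives in airflow.utils.api_migration and may well differ):

    from typing import Callable, TypeVar

    T = TypeVar("T", bound=Callable)

    def mark_fastapi_migration_done(func: T) -> T:
        # Sketch only: tag the endpoint so tooling can track migration
        # progress; the wrapped endpoint itself is returned unchanged.
        func._fastapi_migration_done = True  # type: ignore[attr-defined]
        return func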
15 changes: 10 additions & 5 deletions airflow/api_connexion/endpoints/task_instance_endpoint.py
@@ -425,6 +425,7 @@ def get_task_instances_batch(session: Session = NEW_SESSION) -> APIResponse:
     except _UnsupportedOrderBy as e:
         raise BadRequest(detail=f"Ordering with {e.order_by!r} is not supported")

+    ti_query = ti_query.offset(data["page_offset"]).limit(data["page_limit"])
     task_instances = session.scalars(ti_query)

     return task_instance_collection_schema.dump(
@@ -533,9 +534,11 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
             detail=f"Task instance not found for task {task_id!r} on execution_date {execution_date}"
         )

-    if run_id and not session.get(
-        TI, {"task_id": task_id, "dag_id": dag_id, "run_id": run_id, "map_index": -1}
-    ):
+    select_stmt = select(TI).where(
+        TI.dag_id == dag_id, TI.task_id == task_id, TI.run_id == run_id, TI.map_index == -1
+    )
+
+    if run_id and not session.scalars(select_stmt).one_or_none():
         error_message = f"Task instance not found for task {task_id!r} on DAG run with ID {run_id!r}"
         raise NotFound(detail=error_message)

@@ -581,10 +584,12 @@ def patch_task_instance(
     if not dag.has_task(task_id):
         raise NotFound("Task not found", detail=f"Task {task_id!r} not found in DAG {dag_id!r}")

-    ti: TI | None = session.get(
-        TI, {"task_id": task_id, "dag_id": dag_id, "run_id": dag_run_id, "map_index": map_index}
+    select_stmt = select(TI).where(
+        TI.dag_id == dag_id, TI.task_id == task_id, TI.run_id == dag_run_id, TI.map_index == map_index
     )

+    ti: TI | None = session.scalars(select_stmt).one_or_none()
+
     if not ti:
         error_message = f"Task instance not found for task {task_id!r} on DAG run with ID {dag_run_id!r}"
         raise NotFound(detail=error_message)
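As background: Session.get looks a row up by primary key (consulting the session's identity map first), while the 2.0-style select() issues the criteria as an explicit WHERE clause and reads uniformly for multi-column keys. A minimal sketch of the lookup pattern the diff adopts, with a hypothetical model standing in for TI:

    from sqlalchemy import select
    from sqlalchemy.orm import Session

    # Hypothetical helper mirroring the pattern above; `model` stands in for
    # TaskInstance and `session` for an active SQLAlchemy Session.
    def find_one(session: Session, model, **criteria):
        stmt = select(model).where(
            *(getattr(model, name) == value for name, value in criteria.items())
        )
        # one_or_none() returns None when nothing matches and raises
        # MultipleResultsFound if more than one row matches.
        return session.scalars(stmt).one_or_none()

    # Usage mirroring patch_task_instance:
    #   find_one(session, TI, dag_id=dag_id, task_id=task_id,
    #            run_id=dag_run_id, map_index=map_index)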
27 changes: 19 additions & 8 deletions airflow/api_connexion/openapi/v1.yaml
@@ -2902,6 +2902,16 @@ components:
           type: boolean
           nullable: true
           description: is_paused
+        reprocess_behavior:
+          type: string
+          default: none
+          enum:
+            - none
+            - failed
+            - completed
+          description: |
+            Controls whether new runs will be created when there's an existing run
+            for a given logical date.
         max_active_runs:
           type: integer
           nullable: true
@@ -3069,14 +3079,6 @@
             Time when the DAG last received a refresh signal
             (e.g. the DAG's "refresh" button was clicked in the web UI)

            *New in version 2.3.0*
-        scheduler_lock:
-          type: boolean
-          readOnly: true
-          nullable: true
-          description: |
-            Whether (one of) the scheduler is scheduling this DAG at the moment
-
-            *New in version 2.3.0*
         pickle_id:
           type: string
@@ -5038,6 +5040,15 @@ components:
     ListTaskInstanceForm:
       type: object
       properties:
+        page_offset:
+          type: integer
+          minimum: 0
+          description: The number of items to skip before starting to collect the result set.
+        page_limit:
+          type: integer
+          minimum: 1
+          default: 100
+          description: The number of items to return.
         dag_ids:
           type: array
           items:
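With page_offset and page_limit now part of ListTaskInstanceForm, the batch task-instance listing can be paged. A hypothetical paged request; the path follows the stable REST API's batch-list endpoint, while the base URL and credentials are placeholders:

    import requests

    resp = requests.post(
        "http://localhost:8080/api/v1/dags/~/dagRuns/~/taskInstances/list",
        json={
            "dag_ids": ["example_dag"],
            "page_offset": 0,  # skip nothing; must be >= 0
            "page_limit": 50,  # at most 50 items per page (default 100)
        },
        auth=("admin", "admin"),
    )
    print(resp.json()["total_entries"])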
25 changes: 23 additions & 2 deletions airflow/api_connexion/schemas/backfill_schema.py
@@ -17,12 +17,32 @@
 # under the License.
 from __future__ import annotations

+import typing
 from typing import NamedTuple

-from marshmallow import Schema, fields
+from marshmallow import Schema, fields, utils, validate
 from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field

-from airflow.models.backfill import Backfill, BackfillDagRun
+from airflow.models.backfill import Backfill, BackfillDagRun, ReprocessBehavior
+
+
+class ReprocessBehaviorField(fields.String):
+    """Schema for ReprocessBehavior enum."""
+
+    def __init__(self, **metadata):
+        super().__init__(**metadata)
+        self.validators = [validate.OneOf(ReprocessBehavior), *self.validators]
+
+    def _serialize(self, value, attr, obj, **kwargs) -> str | None:
+        if value is None:
+            return None
+        return utils.ensure_text_type(ReprocessBehavior(value).value)
+
+    def _deserialize(self, value, attr, data, **kwargs) -> typing.Any:
+        deser = super()._deserialize(value, attr, data, **kwargs)
+        if not deser:
+            return None
+        return ReprocessBehavior(deser)


 class BackfillSchema(SQLAlchemySchema):
@@ -40,6 +60,7 @@ class Meta:
     dag_run_conf = fields.Dict(allow_none=True)
     reverse = fields.Boolean()
     is_paused = auto_field()
+    reprocess_behavior = ReprocessBehaviorField()
     max_active_runs = auto_field()
     created_at = auto_field()
     completed_at = auto_field()
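A quick round-trip through the new field, sketched under the assumption that ReprocessBehavior is a string-valued enum whose values match the OpenAPI enum added above (none, failed, completed) and that the imports from this schema module are in scope:

    from marshmallow import Schema

    class _DemoSchema(Schema):
        reprocess_behavior = ReprocessBehaviorField(allow_none=True)

    schema = _DemoSchema()

    # Dumping turns the enum member into its plain string value...
    assert schema.dump({"reprocess_behavior": ReprocessBehavior("failed")}) == {
        "reprocess_behavior": "failed"
    }
    # ...and loading turns the string back into the enum member.
    loaded = schema.load({"reprocess_behavior": "failed"})
    assert loaded["reprocess_behavior"] is ReprocessBehavior("failed")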
1 change: 0 additions & 1 deletion airflow/api_connexion/schemas/dag_schema.py
@@ -56,7 +56,6 @@ class Meta:
     last_parsed_time = auto_field(dump_only=True)
     last_pickled = auto_field(dump_only=True)
     last_expired = auto_field(dump_only=True)
-    scheduler_lock = auto_field(dump_only=True)
     pickle_id = auto_field(dump_only=True)
     default_view = auto_field(dump_only=True)
     fileloc = auto_field(dump_only=True)