Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/apache/airflow
Browse files Browse the repository at this point in the history
  • Loading branch information
rawwar committed Sep 26, 2024
2 parents 136efc6 + 291048a commit be7187f
Show file tree
Hide file tree
Showing 149 changed files with 3,080 additions and 2,205 deletions.
6 changes: 3 additions & 3 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
/airflow/ui/ @bbovenzi @pierrejeambrun @ryanahamilton @jscheffl

# Security/Permissions
/airflow/api_connexion/security.py @jhtimmins
/airflow/security/permissions.py @jhtimmins
/airflow/www/security.py @jhtimmins
/airflow/api_connexion/security.py @vincbeck
/airflow/security/permissions.py @vincbeck
/airflow/www/security.py @vincbeck

# Calendar/Timetables
/airflow/timetables/ @uranusjr
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ airflow.db
airflow/git_version
airflow/www/static/coverage/
airflow/www/*.log
airflow/ui/coverage/
logs/
airflow-webserver.pid
standalone_admin_password.txt
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1174,7 +1174,7 @@ repos:
description: TS types generation / ESLint / Prettier new UI files
language: node
types_or: [javascript, ts, tsx, yaml, css, json]
files: ^airflow/ui/|^airflow/api_connexion/openapi/v1\.yaml$
files: ^airflow/ui/|^airflow/api_fastapi/openapi/v1-generated\.yaml$
entry: ./scripts/ci/pre_commit/lint_ui.py
additional_dependencies: ['pnpm@9.7.1']
pass_filenames: false
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ ARG AIRFLOW_VERSION="2.10.1"
ARG PYTHON_BASE_IMAGE="python:3.8-slim-bookworm"

ARG AIRFLOW_PIP_VERSION=24.2
ARG AIRFLOW_UV_VERSION=0.4.1
ARG AIRFLOW_UV_VERSION=0.4.7
ARG AIRFLOW_USE_UV="false"
ARG UV_HTTP_TIMEOUT="300"
ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow"
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -1262,7 +1262,7 @@ ARG DEFAULT_CONSTRAINTS_BRANCH="constraints-main"
ARG AIRFLOW_CI_BUILD_EPOCH="10"
ARG AIRFLOW_PRE_CACHED_PIP_PACKAGES="true"
ARG AIRFLOW_PIP_VERSION=24.2
ARG AIRFLOW_UV_VERSION=0.4.1
ARG AIRFLOW_UV_VERSION=0.4.7
ARG AIRFLOW_USE_UV="true"
# Setup PIP
# By default PIP install run without cache to make image smaller
Expand All @@ -1286,7 +1286,7 @@ ARG AIRFLOW_VERSION=""
ARG ADDITIONAL_PIP_INSTALL_FLAGS=""

ARG AIRFLOW_PIP_VERSION=24.2
ARG AIRFLOW_UV_VERSION=0.4.1
ARG AIRFLOW_UV_VERSION=0.4.7
ARG AIRFLOW_USE_UV="true"

ENV AIRFLOW_REPO=${AIRFLOW_REPO}\
Expand Down
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/dataset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,5 +352,6 @@ def create_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
)
if not dataset_event:
raise NotFound(title="Dataset not found", detail=f"Dataset with uri: '{uri}' not found")
session.flush() # So we can dump the timestamp.
event = dataset_event_schema.dump(dataset_event)
return event
2 changes: 1 addition & 1 deletion airflow/api_connexion/endpoints/xcom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def get_xcom_entry(
stub.value = XCom.deserialize_value(stub)
item = stub

if stringify:
if stringify or conf.getboolean("core", "enable_xcom_pickling"):
return xcom_schema_string.dump(item)

return xcom_schema_native.dump(item)
8 changes: 6 additions & 2 deletions airflow/api_connexion/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ def __init__(
)


class AlreadyExists(ProblemException):
"""Raise when the object already exists."""
class Conflict(ProblemException):
"""Raise when there is some conflict."""

def __init__(
self,
Expand All @@ -173,6 +173,10 @@ def __init__(
)


class AlreadyExists(Conflict):
"""Raise when the object already exists."""


class Unknown(ProblemException):
"""Returns a response body and status code for HTTP 500 exception."""

Expand Down
2 changes: 2 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2040,6 +2040,8 @@ paths:
If set to true (default) the Any value will be returned as string, e.g. a Python representation
of a dict. If set to false it will return the raw data as dict, list, string or whatever was stored.
This parameter is not meaningful when using XCom pickling, then it is always returned as string.
*New in version 2.10.0*
responses:
"200":
Expand Down
13 changes: 6 additions & 7 deletions airflow/api_connexion/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,12 @@ def callback():
if dag_id or access or access_entity:
return access

# No DAG id is provided, the user is not authorized to access all DAGs and authorization is done
# on DAG level
# If method is "GET", return whether the user has read access to any DAGs
# If method is "PUT", return whether the user has edit access to any DAGs
return (method == "GET" and any(get_auth_manager().get_permitted_dag_ids(methods=["GET"]))) or (
method == "PUT" and any(get_auth_manager().get_permitted_dag_ids(methods=["PUT"]))
)
# dag_id is not provided, and the user is not authorized to access *all* DAGs
# so we check that the user can access at least *one* dag
# but we leave it to the endpoint function to properly restrict access beyond that
if method not in ("GET", "PUT"):
return False
return any(get_auth_manager().get_permitted_dag_ids(methods=[method]))

return callback

Expand Down
11 changes: 11 additions & 0 deletions airflow/api_fastapi/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@

if TYPE_CHECKING:
from sqlalchemy.orm import Session
from sqlalchemy.sql import Select

from airflow.api_fastapi.parameters import BaseParam


async def get_session() -> Session:
Expand All @@ -41,3 +44,11 @@ def your_route(session: Annotated[Session, Depends(get_session)]):
"""
with create_session() as session:
yield session


def apply_filters_to_select(base_select: Select, filters: list[BaseParam]) -> Select:
select = base_select
for filter in filters:
select = filter.to_orm(select)

return select
24 changes: 19 additions & 5 deletions airflow/api_fastapi/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,18 @@ paths:
in: query
required: false
schema:
anyOf:
- type: array
items:
type: string
- type: 'null'
type: array
items:
type: string
title: Tags
- name: owners
in: query
required: false
schema:
type: array
items:
type: string
title: Owners
- name: dag_id_pattern
in: query
required: false
Expand All @@ -74,6 +80,14 @@ paths:
- type: string
- type: 'null'
title: Dag Id Pattern
- name: dag_display_name_pattern
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Dag Display Name Pattern
- name: only_active
in: query
required: false
Expand Down
209 changes: 209 additions & 0 deletions airflow/api_fastapi/parameters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Generic, List, TypeVar, Union

from fastapi import Depends, HTTPException, Query
from sqlalchemy import case, or_
from typing_extensions import Annotated, Self

from airflow.models.dag import DagModel, DagTag

if TYPE_CHECKING:
from sqlalchemy.sql import ColumnElement, Select

T = TypeVar("T")


class BaseParam(Generic[T], ABC):
"""Base class for filters."""

def __init__(self) -> None:
self.value: T | None = None
self.attribute: ColumnElement | None = None

@abstractmethod
def to_orm(self, select: Select) -> Select:
pass

@abstractmethod
def __call__(self, *args: Any, **kwarg: Any) -> BaseParam:
pass

def set_value(self, value: T) -> Self:
self.value = value
return self


class _LimitFilter(BaseParam[int]):
"""Filter on the limit."""

def to_orm(self, select: Select) -> Select:
if self.value is None:
return select

return select.limit(self.value)

def __call__(self, limit: int = 100) -> _LimitFilter:
return self.set_value(limit)


class _OffsetFilter(BaseParam[int]):
"""Filter on offset."""

def to_orm(self, select: Select) -> Select:
if self.value is None:
return select
return select.offset(self.value)

def __call__(self, offset: int = 0) -> _OffsetFilter:
return self.set_value(offset)


class _PausedFilter(BaseParam[Union[bool, None]]):
"""Filter on is_paused."""

def to_orm(self, select: Select) -> Select:
if self.value is None:
return select
return select.where(DagModel.is_paused == self.value)

def __call__(self, paused: bool | None = Query(default=None)) -> _PausedFilter:
return self.set_value(paused)


class _OnlyActiveFilter(BaseParam[bool]):
"""Filter on is_active."""

def to_orm(self, select: Select) -> Select:
if self.value:
return select.where(DagModel.is_active == self.value)
return select

def __call__(self, only_active: bool = Query(default=True)) -> _OnlyActiveFilter:
return self.set_value(only_active)


class _SearchParam(BaseParam[Union[str, None]]):
"""Search on attribute."""

def __init__(self, attribute: ColumnElement) -> None:
super().__init__()
self.attribute: ColumnElement = attribute

def to_orm(self, select: Select) -> Select:
if self.value is None:
return select
return select.where(self.attribute.ilike(f"%{self.value}"))


class _DagIdPatternSearch(_SearchParam):
"""Search on dag_id."""

def __init__(self) -> None:
super().__init__(DagModel.dag_id)

def __call__(self, dag_id_pattern: str | None = Query(default=None)) -> _DagIdPatternSearch:
return self.set_value(dag_id_pattern)


class _DagDisplayNamePatternSearch(_SearchParam):
"""Search on dag_display_name."""

def __init__(self) -> None:
super().__init__(DagModel.dag_display_name)

def __call__(
self, dag_display_name_pattern: str | None = Query(default=None)
) -> _DagDisplayNamePatternSearch:
return self.set_value(dag_display_name_pattern)


class SortParam(BaseParam[Union[str]]):
"""Order result by the attribute."""

def __init__(self, allowed_attrs: list[str]) -> None:
super().__init__()
self.allowed_attrs = allowed_attrs

def to_orm(self, select: Select) -> Select:
if self.value is None:
return select

lstriped_orderby = self.value.lstrip("-")
if self.allowed_attrs and lstriped_orderby not in self.allowed_attrs:
raise HTTPException(
400,
f"Ordering with '{lstriped_orderby}' is disallowed or "
f"the attribute does not exist on the model",
)

column = getattr(DagModel, lstriped_orderby)

# MySQL does not support `nullslast`, and True/False ordering depends on the
# database implementation
nullscheck = case((column.isnot(None), 0), else_=1)
if self.value[0] == "-":
return select.order_by(nullscheck, column.desc(), DagModel.dag_id)
else:
return select.order_by(nullscheck, column.asc(), DagModel.dag_id)

def __call__(self, order_by: str = Query(default="dag_id")) -> SortParam:
return self.set_value(order_by)


class _TagsFilter(BaseParam[List[str]]):
"""Filter on tags."""

def to_orm(self, select: Select) -> Select:
if not self.value:
return select

conditions = [DagModel.tags.any(DagTag.name == tag) for tag in self.value]
return select.where(or_(*conditions))

def __call__(self, tags: list[str] = Query(default_factory=list)) -> _TagsFilter:
return self.set_value(tags)


class _OwnersFilter(BaseParam[List[str]]):
"""Filter on owners."""

def to_orm(self, select: Select) -> Select:
if not self.value:
return select

conditions = [DagModel.owners.ilike(f"%{owner}%") for owner in self.value]
return select.where(or_(*conditions))

def __call__(self, owners: list[str] = Query(default_factory=list)) -> _OwnersFilter:
return self.set_value(owners)


QueryLimit = Annotated[_LimitFilter, Depends(_LimitFilter())]
QueryOffset = Annotated[_OffsetFilter, Depends(_OffsetFilter())]
QueryPausedFilter = Annotated[_PausedFilter, Depends(_PausedFilter())]
QueryOnlyActiveFilter = Annotated[_OnlyActiveFilter, Depends(_OnlyActiveFilter())]
QueryDagIdPatternSearch = Annotated[_DagIdPatternSearch, Depends(_DagIdPatternSearch())]
QueryDagDisplayNamePatternSearch = Annotated[
_DagDisplayNamePatternSearch, Depends(_DagDisplayNamePatternSearch())
]
QueryTagsFilter = Annotated[_TagsFilter, Depends(_TagsFilter())]
QueryOwnersFilter = Annotated[_OwnersFilter, Depends(_OwnersFilter())]
Loading

0 comments on commit be7187f

Please sign in to comment.