Convert unittest to pytest #3622

Merged · 12 commits · Jan 11, 2024
@@ -1,5 +1,4 @@
import logging
import unittest

import pytest
from airflow.exceptions import AirflowException
@@ -16,40 +15,35 @@
TEST_TASK_ID = "wait_task"
DEV_NULL = "/dev/null"

# unittest.TestCase only allows auto-use fixtures, which can't retrieve the fixtures declared in conftest.py.
# TODO: The TEST_POOL/DAG_PREFIX constants can be removed after the unittest.TestCase tests are converted to pytest.
TEST_POOL = (
    "catalog__tests__dags__common__sensors__test_single_run_external_dags_sensor_pool"
)
DAG_PREFIX = "catalog__tests__dags__common__sensors__test_single_run_external_dags_sensor_dag"  # single_run_external_dags_sensor


def run_sensor(sensor):
    sensor.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)


def create_task(dag, task_id, external_dag_ids):
def create_task(dag, task_id, sample_pool_fixture):
    return SingleRunExternalDAGsSensor(
        task_id=task_id,
        external_dag_ids=[],
        check_existence=True,
        dag=dag,
        pool=TEST_POOL,
        pool=sample_pool_fixture,
        poke_interval=5,
        mode="reschedule",
    )


def create_dag(dag_id, task_id=TEST_TASK_ID):
def create_dag(
    dag_id, sample_dag_id_fixture, sample_pool_fixture, task_id=TEST_TASK_ID
):
    with DAG(
        f"{DAG_PREFIX}_{dag_id}",
        f"{sample_dag_id_fixture}_{dag_id}",
        default_args={
            "owner": "airflow",
            "start_date": DEFAULT_DATE,
        },
    ) as dag:
        # Create a sensor task inside the DAG
        create_task(dag, task_id, [])
        create_task(dag, task_id, sample_pool_fixture)

    return dag

@@ -65,6 +59,16 @@ def create_dagrun(dag, dag_state):
    )


@pytest.fixture
def setup_pool(sample_pool_fixture):
    Pool.create_or_update_pool(
        sample_pool_fixture,
        slots=1,
        description="test pool",
        include_deferred=False,
    )


@pytest.mark.usefixtures("clean_db")
# This appears to be coming from Airflow internals during testing as a result of
# loading the example DAGs:
@@ -80,134 +84,56 @@ def create_dagrun(dag, dag_state):
"ignore:Param `schedule_interval` is deprecated and will be removed in a future release. "
"Please use `schedule` instead.:airflow.exceptions.RemovedInAirflow3Warning"
)
class TestExternalDAGsSensor(unittest.TestCase):
    def setUp(self):
        Pool.create_or_update_pool(
            TEST_POOL,
            slots=1,
            description="test pool",
            include_deferred=False,
        )

    def test_fails_if_external_dag_does_not_exist(self):
        with pytest.raises(
            AirflowException,
            match="The external DAG nonexistent_dag_id does not exist.",
        ):
            dag = DAG(
                "test_missing_dag_error",
                default_args={
                    "owner": "airflow",
                    "start_date": DEFAULT_DATE,
                },
            )
            sensor = SingleRunExternalDAGsSensor(
                task_id=TEST_TASK_ID,
                external_dag_ids=[
                    "nonexistent_dag_id",
                ],
                check_existence=True,
                poke_interval=5,
                mode="reschedule",
                dag=dag,
            )

            run_sensor(sensor)

    def test_fails_if_external_dag_missing_sensor_task(self):
        # Loads an example DAG which does not have a Sensor task.
        dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True)
        bash_dag = dagbag.dags["example_bash_operator"]
        bash_dag.sync_to_db()

        error_msg = (
            "The external DAG example_bash_operator does not have a task"
            f" with id {TEST_TASK_ID}"
        )
        with pytest.raises(AirflowException, match=error_msg):
            dag = DAG(
                "test_missing_task_error",
                default_args={
                    "owner": "airflow",
                    "start_date": DEFAULT_DATE,
                },
            )
            sensor = SingleRunExternalDAGsSensor(
                task_id=TEST_TASK_ID,
                external_dag_ids=[
                    "example_bash_operator",
                ],
                check_existence=True,
                poke_interval=5,
                mode="reschedule",
                dag=dag,
            )

            run_sensor(sensor)

    def test_succeeds_if_no_running_dags(self):
        # Create some DAGs that are not considered 'running'
        successful_dag = create_dag("successful_dag")
        create_dagrun(successful_dag, State.SUCCESS)
        failed_dag = create_dag("failed_dag")
        create_dagrun(failed_dag, State.FAILED)

        # DAG in the running state, but its wait task has not been started
        queued_dag = create_dag("queued_dag")
        create_dagrun(queued_dag, State.RUNNING)

        # Create the Test DAG and sensor with dependent dag Ids
def test_fails_if_external_dag_does_not_exist(clean_db, setup_pool):
    with pytest.raises(
        AirflowException,
        match="The external DAG nonexistent_dag_id does not exist.",
    ):
        dag = DAG(
            "test_dag_success",
            "test_missing_dag_error",
            default_args={
                "owner": "airflow",
                "start_date": DEFAULT_DATE,
            },
        )
        sensor = SingleRunExternalDAGsSensor(
            task_id=TEST_TASK_ID,
            external_dag_ids=["successful_dag", "failed_dag", "queued_dag"],
            external_dag_ids=[
                "nonexistent_dag_id",
            ],
            check_existence=True,
            poke_interval=5,
            mode="reschedule",
            dag=dag,
            pool=TEST_POOL,
        )

        with self.assertLogs(sensor.log, level=logging.INFO) as sensor_logs:
            run_sensor(sensor)
            assert (
                "INFO:airflow.task.operators.common.sensors.single_run_external"
                "_dags_sensor.SingleRunExternalDAGsSensor:Poking for DAGs "
                "['successful_dag', 'failed_dag', 'queued_dag'] ..."
                in sensor_logs.output
            )
            assert (
                "INFO:airflow.task.operators.common.sensors.single_run_external"
                "_dags_sensor.SingleRunExternalDAGsSensor:0 DAGs are in the "
                "running state" in sensor_logs.output
            )

    def test_retries_if_running_dags_with_completed_sensor_task(self):
        # Create a DAG in the 'running' state
        running_dag = create_dag("running_dag")
        running_dagrun = create_dagrun(running_dag, State.RUNNING)

        pool = Pool.get_pool(TEST_POOL)
        assert pool.open_slots() == 1

        # Run its sensor task and ensure that it succeeds
        ti = running_dagrun.get_task_instance(task_id=TEST_TASK_ID)
        ti.task = running_dag.get_task(task_id=TEST_TASK_ID)
        ti.run()
        assert ti.state == State.SUCCESS

        # Create a DAG that is not in the running state
        successful_dependent_dag = create_dag("success_dag")
        create_dagrun(successful_dependent_dag, State.SUCCESS)

        # Create the Test DAG and sensor and set up dependent dag Ids
        run_sensor(sensor)


@pytest.mark.filterwarnings(
"ignore:This class is deprecated. Please use "
"`airflow.utils.task_group.TaskGroup`.:airflow.exceptions.RemovedInAirflow3Warning"
)
# This also appears to be coming from Airflow internals during testing as a result of
# loading the example bash operator DAG:
# /home/airflow/.local/lib/python3.10/site-packages/airflow/models/dag.py:3492: RemovedInAirflow3Warning # noqa: E501
@pytest.mark.filterwarnings(
"ignore:Param `schedule_interval` is deprecated and will be removed in a future release. "
"Please use `schedule` instead.:airflow.exceptions.RemovedInAirflow3Warning"
)
def test_fails_if_external_dag_missing_sensor_task(clean_db, setup_pool):
    # Loads an example DAG which does not have a Sensor task.
    dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True)
    bash_dag = dagbag.dags["example_bash_operator"]
    bash_dag.sync_to_db()

    error_msg = (
        "The external DAG example_bash_operator does not have a task"
        f" with id {TEST_TASK_ID}"
    )
    with pytest.raises(AirflowException, match=error_msg):
        dag = DAG(
            "test_dag_failure",
            "test_missing_task_error",
            default_args={
                "owner": "airflow",
                "start_date": DEFAULT_DATE,
@@ -216,26 +142,127 @@ def test_retries_if_running_dags_with_completed_sensor_task(self):
        sensor = SingleRunExternalDAGsSensor(
            task_id=TEST_TASK_ID,
            external_dag_ids=[
                f"{DAG_PREFIX}_success_dag",
                f"{DAG_PREFIX}_running_dag",
                "example_bash_operator",
            ],
            check_existence=True,
            poke_interval=5,
            mode="reschedule",
            dag=dag,
            pool=TEST_POOL,
        )

        with self.assertLogs(sensor.log, level=logging.INFO) as sensor_logs:
            run_sensor(sensor)

            assert (
                "INFO:airflow.task.operators.common.sensors.single_run_external"
                "_dags_sensor.SingleRunExternalDAGsSensor:Poking for DAGs ['"
                f"{DAG_PREFIX}_success_dag', '{DAG_PREFIX}_running_dag'] ..."
                in sensor_logs.output
            )
            assert (
                "INFO:airflow.task.operators.common.sensors.single_run_external"
                "_dags_sensor.SingleRunExternalDAGsSensor:1 DAGs are in the "
                "running state" in sensor_logs.output
            )
        run_sensor(sensor)


@pytest.mark.filterwarnings(
"ignore:This class is deprecated. Please use "
"`airflow.utils.task_group.TaskGroup`.:airflow.exceptions.RemovedInAirflow3Warning"
)
# This also appears to be coming from Airflow internals during testing as a result of
# loading the example bash operator DAG:
# /home/airflow/.local/lib/python3.10/site-packages/airflow/models/dag.py:3492: RemovedInAirflow3Warning # noqa: E501
@pytest.mark.filterwarnings(
"ignore:Param `schedule_interval` is deprecated and will be removed in a future release. "
"Please use `schedule` instead.:airflow.exceptions.RemovedInAirflow3Warning"
)
def test_succeeds_if_no_running_dags(
    caplog, sample_dag_id_fixture, sample_pool_fixture, clean_db, setup_pool
):
    # Create some DAGs that are not considered 'running'
    successful_dag = create_dag(
        "successful_dag", sample_dag_id_fixture, sample_pool_fixture
    )
    create_dagrun(successful_dag, State.SUCCESS)
    failed_dag = create_dag("failed_dag", sample_dag_id_fixture, sample_pool_fixture)
    create_dagrun(failed_dag, State.FAILED)

    # DAG in the running state, but its wait task has not been started
    queued_dag = create_dag("queued_dag", sample_dag_id_fixture, sample_pool_fixture)
    create_dagrun(queued_dag, State.RUNNING)

    # Create the Test DAG and sensor with dependent dag Ids
    dag = DAG(
        "test_dag_success",
        default_args={
            "owner": "airflow",
            "start_date": DEFAULT_DATE,
        },
    )
    sensor = SingleRunExternalDAGsSensor(
        task_id=TEST_TASK_ID,
        external_dag_ids=["successful_dag", "failed_dag", "queued_dag"],
        poke_interval=5,
        mode="reschedule",
        dag=dag,
        pool=sample_pool_fixture,
    )
    with caplog.at_level(logging.INFO):
        run_sensor(sensor)
    assert (
        "Poking for DAGs ['successful_dag', 'failed_dag', 'queued_dag'] ..."
        in caplog.text
    )
    assert "0 DAGs are in the running state" in caplog.text


@pytest.mark.filterwarnings(
"ignore:This class is deprecated. Please use "
"`airflow.utils.task_group.TaskGroup`.:airflow.exceptions.RemovedInAirflow3Warning"
)
# This also appears to be coming from Airflow internals during testing as a result of
# loading the example bash operator DAG:
# /home/airflow/.local/lib/python3.10/site-packages/airflow/models/dag.py:3492: RemovedInAirflow3Warning # noqa: E501
@pytest.mark.filterwarnings(
"ignore:Param `schedule_interval` is deprecated and will be removed in a future release. "
"Please use `schedule` instead.:airflow.exceptions.RemovedInAirflow3Warning"
)
def test_retries_if_running_dags_with_completed_sensor_task(
    caplog, sample_dag_id_fixture, sample_pool_fixture, clean_db, setup_pool
):
    # Create a DAG in the 'running' state
    running_dag = create_dag("running_dag", sample_dag_id_fixture, sample_pool_fixture)
    running_dagrun = create_dagrun(running_dag, State.RUNNING)

    pool = Pool.get_pool(sample_pool_fixture)
    assert pool.open_slots() == 1

    # Run its sensor task and ensure that it succeeds
    ti = running_dagrun.get_task_instance(task_id=TEST_TASK_ID)
    ti.task = running_dag.get_task(task_id=TEST_TASK_ID)
    ti.run()
    assert ti.state == State.SUCCESS

    # Create a DAG that is not in the running state
    successful_dependent_dag = create_dag(
        "success_dag", sample_dag_id_fixture, sample_pool_fixture
    )
    create_dagrun(successful_dependent_dag, State.SUCCESS)

    # Create the Test DAG and sensor and set up dependent dag Ids
    dag = DAG(
        "test_dag_failure",
        default_args={
            "owner": "airflow",
            "start_date": DEFAULT_DATE,
        },
    )
    sensor = SingleRunExternalDAGsSensor(
        task_id=TEST_TASK_ID,
        external_dag_ids=[
            f"{sample_dag_id_fixture}_success_dag",
            f"{sample_dag_id_fixture}_running_dag",
        ],
        poke_interval=5,
        mode="reschedule",
        dag=dag,
        pool=sample_pool_fixture,
    )

    with caplog.at_level(logging.INFO):
        run_sensor(sensor)

    assert (
        f"Poking for DAGs ['"
        f"{sample_dag_id_fixture}_success_dag', '{sample_dag_id_fixture}_running_dag'] ..."
        in caplog.text
    )
    assert "1 DAGs are in the running state" in caplog.text