From ea8c0a1f0fef11da86f26109b98c88b26a296ecd Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Tue, 16 May 2023 10:33:35 +0200 Subject: [PATCH] Use keyword paramaters for migration methods for mssql (#31309) The new alembic (1.11.0) requires parameters to be keyword parameters in methods used in migrations. Part of the problem has been fixed in #31306 and #31302 but still some migrations that are specifically run in case of mssql need to use migration parameters. (cherry picked from commit 54790274c2edf39c06f494a900bc85523349a77f) --- ...043_1_10_4_make_taskinstance_pool_not_nullable.py | 12 ++++++++---- .../versions/0054_1_10_10_add_dag_code_table.py | 6 ++++-- .../0060_2_0_0_remove_id_column_from_xcom.py | 2 +- ...0089_2_2_0_make_xcom_pkey_columns_non_nullable.py | 4 +++- .../0102_2_3_0_switch_xcom_table_to_use_run_id.py | 6 +++++- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- setup.py | 7 +++---- 7 files changed, 25 insertions(+), 14 deletions(-) diff --git a/airflow/migrations/versions/0043_1_10_4_make_taskinstance_pool_not_nullable.py b/airflow/migrations/versions/0043_1_10_4_make_taskinstance_pool_not_nullable.py index 52069c56daba3..89bf8f6e6046d 100644 --- a/airflow/migrations/versions/0043_1_10_4_make_taskinstance_pool_not_nullable.py +++ b/airflow/migrations/versions/0043_1_10_4_make_taskinstance_pool_not_nullable.py @@ -64,7 +64,7 @@ def upgrade(): conn = op.get_bind() if conn.dialect.name == "mssql": - op.drop_index("ti_pool", table_name="task_instance") + op.drop_index(index_name="ti_pool", table_name="task_instance") # use batch_alter_table to support SQLite workaround with op.batch_alter_table("task_instance") as batch_op: @@ -75,14 +75,16 @@ def upgrade(): ) if conn.dialect.name == "mssql": - op.create_index("ti_pool", "task_instance", ["pool", "state", "priority_weight"]) + op.create_index( + index_name="ti_pool", table_name="task_instance", columns=["pool", "state", "priority_weight"] + ) def downgrade(): """Make TaskInstance.pool field nullable.""" conn = op.get_bind() if conn.dialect.name == "mssql": - op.drop_index("ti_pool", table_name="task_instance") + op.drop_index(index_name="ti_pool", table_name="task_instance") # use batch_alter_table to support SQLite workaround with op.batch_alter_table("task_instance") as batch_op: @@ -93,7 +95,9 @@ def downgrade(): ) if conn.dialect.name == "mssql": - op.create_index("ti_pool", "task_instance", ["pool", "state", "priority_weight"]) + op.create_index( + index_name="ti_pool", table_name="task_instance", columns=["pool", "state", "priority_weight"] + ) with create_session() as session: session.query(TaskInstance).filter(TaskInstance.pool == "default_pool").update( diff --git a/airflow/migrations/versions/0054_1_10_10_add_dag_code_table.py b/airflow/migrations/versions/0054_1_10_10_add_dag_code_table.py index e0628d142d9bb..4c25b867d22c2 100644 --- a/airflow/migrations/versions/0054_1_10_10_add_dag_code_table.py +++ b/airflow/migrations/versions/0054_1_10_10_add_dag_code_table.py @@ -63,13 +63,15 @@ class SerializedDagModel(Base): conn = op.get_bind() if conn.dialect.name != "sqlite": if conn.dialect.name == "mssql": - op.drop_index("idx_fileloc_hash", "serialized_dag") + op.drop_index(index_name="idx_fileloc_hash", table_name="serialized_dag") op.alter_column( table_name="serialized_dag", column_name="fileloc_hash", type_=sa.BigInteger(), nullable=False ) if conn.dialect.name == "mssql": - op.create_index("idx_fileloc_hash", "serialized_dag", ["fileloc_hash"]) + op.create_index( + index_name="idx_fileloc_hash", table_name="serialized_dag", columns=["fileloc_hash"] + ) sessionmaker = sa.orm.sessionmaker() session = sessionmaker(bind=conn) diff --git a/airflow/migrations/versions/0060_2_0_0_remove_id_column_from_xcom.py b/airflow/migrations/versions/0060_2_0_0_remove_id_column_from_xcom.py index 64ccc753a5bf0..9a22410d0d18e 100644 --- a/airflow/migrations/versions/0060_2_0_0_remove_id_column_from_xcom.py +++ b/airflow/migrations/versions/0060_2_0_0_remove_id_column_from_xcom.py @@ -102,7 +102,7 @@ def upgrade(): if "id" in xcom_columns: if conn.dialect.name == "mssql": constraint_dict = get_table_constraints(conn, "xcom") - drop_column_constraints(bop, "id", constraint_dict) + drop_column_constraints(operator=bop, column_name="id", constraint_dict=constraint_dict) bop.drop_column("id") bop.drop_index("idx_xcom_dag_task_date") # mssql doesn't allow primary keys with nullable columns diff --git a/airflow/migrations/versions/0089_2_2_0_make_xcom_pkey_columns_non_nullable.py b/airflow/migrations/versions/0089_2_2_0_make_xcom_pkey_columns_non_nullable.py index 68ef900a2db98..4be616c6a70aa 100644 --- a/airflow/migrations/versions/0089_2_2_0_make_xcom_pkey_columns_non_nullable.py +++ b/airflow/migrations/versions/0089_2_2_0_make_xcom_pkey_columns_non_nullable.py @@ -43,7 +43,9 @@ def upgrade(): bop.alter_column("key", type_=StringID(length=512), nullable=False) bop.alter_column("execution_date", type_=TIMESTAMP, nullable=False) if conn.dialect.name == "mssql": - bop.create_primary_key("pk_xcom", ["dag_id", "task_id", "key", "execution_date"]) + bop.create_primary_key( + constraint_name="pk_xcom", columns=["dag_id", "task_id", "key", "execution_date"] + ) def downgrade(): diff --git a/airflow/migrations/versions/0102_2_3_0_switch_xcom_table_to_use_run_id.py b/airflow/migrations/versions/0102_2_3_0_switch_xcom_table_to_use_run_id.py index b8b06e802c931..5a0702a682ac6 100644 --- a/airflow/migrations/versions/0102_2_3_0_switch_xcom_table_to_use_run_id.py +++ b/airflow/migrations/versions/0102_2_3_0_switch_xcom_table_to_use_run_id.py @@ -172,4 +172,8 @@ def downgrade(): constraints = get_mssql_table_constraints(conn, "xcom") pk, _ = constraints["PRIMARY KEY"].popitem() op.drop_constraint(pk, "xcom", type_="primary") - op.create_primary_key("pk_xcom", "xcom", ["dag_id", "task_id", "execution_date", "key"]) + op.create_primary_key( + constraint_name="pk_xcom", + table_name="xcom", + columns=["dag_id", "task_id", "execution_date", "key"], + ) diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index ded035ba22f23..1f2c2c1f3419a 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -b409a0826f10dc5ef934616fe2d5e72fff322bb1aebe321aad475e4d72c25a4f \ No newline at end of file +4987842fd67d29e194f1117e127d3291ba60d3fbc3e81cba75ce93884c263321 \ No newline at end of file diff --git a/setup.py b/setup.py index 0d387de46104d..77506af9e3dfa 100644 --- a/setup.py +++ b/setup.py @@ -16,10 +16,9 @@ # specific language governing permissions and limitations # under the License. """Setup.py for the Airflow project.""" -# To make sure the CI build is using "upgrade to newer dependencies", which is useful when you want to check -# if the dependencies are still compatible with the latest versions as they seem to break some unrelated -# tests in main, you can modify this file. The modification can be simply modifying this particular comment. -# e.g. you can modify the following number "00001" to something else to trigger it. +# This file can be modified if you want to make sure the CI build is using "upgrade to newer dependencies" +# Which is useful when you want to check if the dependencies are still compatible with the latest versions +# And they seem to break some unrelated tests in main. You can modify this number = 00001 to trigger it. from __future__ import annotations import glob