Skip to content

Commit

Permalink
Use keyword paramaters for migration methods for mssql (#31309)
Browse files Browse the repository at this point in the history
The new alembic (1.11.0) requires parameters to be keyword
parameters in methods used in migrations. Part of the problem has
been fixed in #31306 and #31302 but still some migrations that are
specifically run in case of mssql need to use migration parameters.

(cherry picked from commit 5479027)
  • Loading branch information
potiuk committed Jun 9, 2023
1 parent 3bbf6fd commit ea8c0a1
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def upgrade():

conn = op.get_bind()
if conn.dialect.name == "mssql":
op.drop_index("ti_pool", table_name="task_instance")
op.drop_index(index_name="ti_pool", table_name="task_instance")

# use batch_alter_table to support SQLite workaround
with op.batch_alter_table("task_instance") as batch_op:
Expand All @@ -75,14 +75,16 @@ def upgrade():
)

if conn.dialect.name == "mssql":
op.create_index("ti_pool", "task_instance", ["pool", "state", "priority_weight"])
op.create_index(
index_name="ti_pool", table_name="task_instance", columns=["pool", "state", "priority_weight"]
)


def downgrade():
"""Make TaskInstance.pool field nullable."""
conn = op.get_bind()
if conn.dialect.name == "mssql":
op.drop_index("ti_pool", table_name="task_instance")
op.drop_index(index_name="ti_pool", table_name="task_instance")

# use batch_alter_table to support SQLite workaround
with op.batch_alter_table("task_instance") as batch_op:
Expand All @@ -93,7 +95,9 @@ def downgrade():
)

if conn.dialect.name == "mssql":
op.create_index("ti_pool", "task_instance", ["pool", "state", "priority_weight"])
op.create_index(
index_name="ti_pool", table_name="task_instance", columns=["pool", "state", "priority_weight"]
)

with create_session() as session:
session.query(TaskInstance).filter(TaskInstance.pool == "default_pool").update(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,15 @@ class SerializedDagModel(Base):
conn = op.get_bind()
if conn.dialect.name != "sqlite":
if conn.dialect.name == "mssql":
op.drop_index("idx_fileloc_hash", "serialized_dag")
op.drop_index(index_name="idx_fileloc_hash", table_name="serialized_dag")

op.alter_column(
table_name="serialized_dag", column_name="fileloc_hash", type_=sa.BigInteger(), nullable=False
)
if conn.dialect.name == "mssql":
op.create_index("idx_fileloc_hash", "serialized_dag", ["fileloc_hash"])
op.create_index(
index_name="idx_fileloc_hash", table_name="serialized_dag", columns=["fileloc_hash"]
)

sessionmaker = sa.orm.sessionmaker()
session = sessionmaker(bind=conn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def upgrade():
if "id" in xcom_columns:
if conn.dialect.name == "mssql":
constraint_dict = get_table_constraints(conn, "xcom")
drop_column_constraints(bop, "id", constraint_dict)
drop_column_constraints(operator=bop, column_name="id", constraint_dict=constraint_dict)
bop.drop_column("id")
bop.drop_index("idx_xcom_dag_task_date")
# mssql doesn't allow primary keys with nullable columns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ def upgrade():
bop.alter_column("key", type_=StringID(length=512), nullable=False)
bop.alter_column("execution_date", type_=TIMESTAMP, nullable=False)
if conn.dialect.name == "mssql":
bop.create_primary_key("pk_xcom", ["dag_id", "task_id", "key", "execution_date"])
bop.create_primary_key(
constraint_name="pk_xcom", columns=["dag_id", "task_id", "key", "execution_date"]
)


def downgrade():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,8 @@ def downgrade():
constraints = get_mssql_table_constraints(conn, "xcom")
pk, _ = constraints["PRIMARY KEY"].popitem()
op.drop_constraint(pk, "xcom", type_="primary")
op.create_primary_key("pk_xcom", "xcom", ["dag_id", "task_id", "execution_date", "key"])
op.create_primary_key(
constraint_name="pk_xcom",
table_name="xcom",
columns=["dag_id", "task_id", "execution_date", "key"],
)
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
b409a0826f10dc5ef934616fe2d5e72fff322bb1aebe321aad475e4d72c25a4f
4987842fd67d29e194f1117e127d3291ba60d3fbc3e81cba75ce93884c263321
7 changes: 3 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
# specific language governing permissions and limitations
# under the License.
"""Setup.py for the Airflow project."""
# To make sure the CI build is using "upgrade to newer dependencies", which is useful when you want to check
# if the dependencies are still compatible with the latest versions as they seem to break some unrelated
# tests in main, you can modify this file. The modification can be simply modifying this particular comment.
# e.g. you can modify the following number "00001" to something else to trigger it.
# This file can be modified if you want to make sure the CI build is using "upgrade to newer dependencies"
# Which is useful when you want to check if the dependencies are still compatible with the latest versions
# And they seem to break some unrelated tests in main. You can modify this number = 00001 to trigger it.
from __future__ import annotations

import glob
Expand Down

0 comments on commit ea8c0a1

Please sign in to comment.