Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CVAT][Exchange Oracle] Use Alembic in tests #2620

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/examples/cvat/exchange-oracle/alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def run_migrations_offline() -> None:
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
output_buffer=config.output_buffer,
)

with context.begin_transaction():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@

"""

import os

from sqlalchemy import Column, String, delete, func, select
from sqlalchemy.orm import declarative_base

from alembic import op

# revision identifiers, used by Alembic.
Expand All @@ -14,6 +19,14 @@
branch_labels = None
depends_on = None

Base = declarative_base()


class Project(Base):
__tablename__ = "projects"
id = Column(String, primary_key=True, index=True)
escrow_address = Column(String(42), unique=False, nullable=False)


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
Expand All @@ -22,6 +35,22 @@ def upgrade() -> None:


def downgrade() -> None:
offline_mode = op.get_context().environment_context.is_offline_mode()
assert offline_mode or "TESTING" in os.environ or "test" in op.get_bind().engine.url, (
"This downgrade deletes data and should only run in a test environment."
"If you are sure you want to run it, set the TESTING environment variable."
)

op.execute(
delete(Project).where(
Project.escrow_address.in_(
select(Project.escrow_address)
.group_by(Project.escrow_address)
.having(func.count(Project.escrow_address) > 1)
)
)
)

# ### commands auto generated by Alembic - please adjust! ###
op.create_unique_constraint("projects_escrow_address_key", "projects", ["escrow_address"])
# ### end Alembic commands ###
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import enum

import sqlalchemy as sa
from sqlalchemy import Column, DateTime, Enum, String
from sqlalchemy.orm import Session, declarative_base
from sqlalchemy import Column, DateTime, Enum, String, update
from sqlalchemy.orm import declarative_base
from sqlalchemy.sql import func

from alembic import op
Expand Down Expand Up @@ -51,25 +51,36 @@ class Assignment(Base):


def define_initial_updated_at():
bind = op.get_bind()
session = Session(bind=bind)

session.query(Assignment).filter(
Assignment.updated_at == None,
Assignment.status.in_(
[AssignmentStatuses.expired, AssignmentStatuses.rejected, AssignmentStatuses.canceled]
),
).update({Assignment.updated_at: Assignment.expires_at})

session.query(Assignment).filter(
Assignment.updated_at == None,
Assignment.status == AssignmentStatuses.completed,
).update({Assignment.updated_at: Assignment.completed_at})

session.query(Assignment).filter(
Assignment.updated_at == None,
# fallback for invalid entries above + handling of status == "created"
).update({Assignment.updated_at: Assignment.created_at})
# First update: expired, rejected, and canceled assignments
# using op.execute instead of session.execute to support offline migrations
op.execute(
update(Assignment)
.where(
Assignment.updated_at == None,
Assignment.status.in_(
[
AssignmentStatuses.expired,
AssignmentStatuses.rejected,
AssignmentStatuses.canceled,
]
),
)
.values(updated_at=Assignment.expires_at)
)

# Second update: completed assignments
op.execute(
update(Assignment)
.where(Assignment.updated_at == None, Assignment.status == AssignmentStatuses.completed)
.values(updated_at=Assignment.completed_at)
)

# Third update: fallback for invalid entries and handling status == "created"
op.execute(
update(Assignment)
.where(Assignment.updated_at == None)
.values(updated_at=Assignment.created_at)
)


def upgrade() -> None:
Expand Down
75 changes: 71 additions & 4 deletions packages/examples/cvat/exchange-oracle/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,84 @@
from collections.abc import Generator
from dataclasses import dataclass
from io import StringIO
from pathlib import Path

import pytest
from fastapi.testclient import TestClient
from sqlalchemy import TextClause, text
from sqlalchemy.orm import Session
from sqlalchemy_utils import create_database, database_exists, drop_database

from alembic import command as alembic_command
from alembic.config import Config
from src import app
from src.db import Base, SessionLocal, engine
from src.db import SessionLocal, engine

alembic_config = Config(Path(__file__).parent.parent / "alembic.ini")


@dataclass
class AlembicSQL:
upgrade: TextClause
downgrade: TextClause


@pytest.fixture(scope="session")
def alembic() -> AlembicSQL:
"""
Captures the SQL generated by Alembic for upgrade/downgrade operations.
Doesn't actually run migrations.
"""
alembic_config.output_buffer = StringIO()
alembic_command.upgrade(alembic_config, "head", sql=True)
upgrade_sql = alembic_config.output_buffer.getvalue()

alembic_config.output_buffer = StringIO()
alembic_command.downgrade(alembic_config, "head:base", sql=True)
downgrade_sql = alembic_config.output_buffer.getvalue()

return AlembicSQL(text(upgrade_sql), text(downgrade_sql))


@pytest.fixture(scope="session", autouse=True)
def setup_db(alembic) -> None:
assert "test" in engine.url.database, "The test database must be used for testing."
if database_exists(engine.url):
drop_database(engine.url)
create_database(engine.url)
yield # Run the test cases

# Upgrade to the latest version after all tests are done,
# this helps with inspection of the latest schema.
with engine.connect() as connection:
connection.execute(alembic.upgrade)


@pytest.fixture(autouse=True)
def db():
Base.metadata.drop_all(bind=engine)
Base.metadata.create_all(bind=engine)
def init_db(alembic) -> None:
"""
Runs the recorded Alembic upgrade and downgrade SQL for each test.
This ensures correctness of alembic migrations.
"""
try:
with engine.connect() as connection:
connection.execute(alembic.upgrade)
except Exception as e:
raise RuntimeError(
"Failed to upgrade migrations, `alembic upgrade head` would fail."
" inspect the cause error and change migrations accordingly."
) from e

yield # Run the test case

try:
with engine.connect() as connection:
connection.execute(alembic.downgrade)
except Exception as e:
raise RuntimeError(
"Failed to downgrade migrations, `alembic downgrade head:base` would fail."
" inspect the cause error and change migrations accordingly."
) from e


@pytest.fixture(scope="module")
Expand Down
Loading