Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add FSE Tests #2043

Merged
merged 2 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions backend/lcfs/tests/final_supply_equipment/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import pytest
from datetime import date
from lcfs.web.api.fuel_code.schema import EndUseTypeSchema, EndUserTypeSchema
from lcfs.web.api.final_supply_equipment.schema import (
FinalSupplyEquipmentSchema,
LevelOfEquipmentSchema,
PortsEnum,
FinalSupplyEquipmentCreateSchema,
)


@pytest.fixture
def valid_final_supply_equipment_schema() -> FinalSupplyEquipmentSchema:
return FinalSupplyEquipmentSchema(
final_supply_equipment_id=1,
compliance_report_id=123,
organization_name="Test Org",
supply_from_date=date(2022, 1, 1),
supply_to_date=date(2022, 12, 31),
registration_nbr="TESTORG-A1A1A1-001",
kwh_usage=100.0,
serial_nbr="SER123",
manufacturer="Manufacturer Inc",
model="ModelX",
level_of_equipment=LevelOfEquipmentSchema(
level_of_equipment_id=1,
name="Level1",
description="Test description",
display_order=1,
),
ports=PortsEnum.SINGLE,
intended_use_types=[EndUseTypeSchema(type="Public", end_use_type_id=1)],
intended_user_types=[
EndUserTypeSchema(type_name="General", end_user_type_id=1)
],
street_address="123 Test St",
city="Test City",
postal_code="A1A 1A1",
latitude=12.34,
longitude=56.78,
notes="Some notes",
)


@pytest.fixture
def valid_final_supply_equipment_create_schema() -> FinalSupplyEquipmentCreateSchema:
return FinalSupplyEquipmentCreateSchema(
final_supply_equipment_id=1,
compliance_report_id=123,
organization_name="Test Org",
supply_from_date=date(2022, 1, 1),
supply_to_date=date(2022, 12, 31),
kwh_usage=100.0,
serial_nbr="SER123",
manufacturer="Manufacturer Inc",
model="ModelX",
level_of_equipment="Level2",
intended_uses=[],
intended_users=[],
ports=PortsEnum.SINGLE,
street_address="123 Test St",
city="Test City",
postal_code="A1A 1A1",
latitude=12.34,
longitude=56.78,
notes="Some notes",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
import pytest
from unittest.mock import AsyncMock
from sqlalchemy.ext.asyncio import AsyncSession

from lcfs.web.api.final_supply_equipment.repo import FinalSupplyEquipmentRepository
from lcfs.web.api.base import PaginationRequestSchema


class FakeAsyncContextManager:
async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc, tb):
pass


class FakeResult:
def __init__(self, result):
self._result = result

def scalars(self):
return self

def all(self):
return self._result

def unique(self):
return self

def scalar_one_or_none(self):
return self._result[0] if self._result else None

def scalar_one(self):
return self._result[0] if self._result else None

def scalar(self):
return self._result[0] if self._result else None


# Fixture for a fake database session.
@pytest.fixture
def fake_db():
db = AsyncMock(spec=AsyncSession)
# Simulate the async nested transaction context manager.
db.begin_nested.return_value = FakeAsyncContextManager()
return db


# Fixture for the repository instance using the fake database.
@pytest.fixture
def repo(fake_db):
return FinalSupplyEquipmentRepository(db=fake_db)


@pytest.mark.anyio
async def test_get_intended_use_types(repo, fake_db):
# Simulate returning two intended use types.
fake_db.execute.return_value = FakeResult(["use_type1", "use_type2"])
result = await repo.get_intended_use_types()
assert result == ["use_type1", "use_type2"]
fake_db.execute.assert_called_once()


@pytest.mark.anyio
async def test_get_intended_use_by_name(repo, fake_db):
# Simulate a lookup returning one intended use type.
fake_db.execute.return_value = FakeResult(["use_typeA"])
result = await repo.get_intended_use_by_name("use_typeA")
assert result == "use_typeA"
fake_db.execute.assert_called_once()


@pytest.mark.anyio
async def test_get_intended_user_types(repo, fake_db):
fake_db.execute.return_value = FakeResult(["user_type1", "user_type2"])
result = await repo.get_intended_user_types()
assert result == ["user_type1", "user_type2"]


@pytest.mark.anyio
async def test_get_organization_names_valid(repo, fake_db):
# Create a dummy organization object with an organization_id.
organization = type("Org", (), {"organization_id": 1})()
# Simulate the query returning tuples with organization names.
fake_db.execute.return_value = FakeResult([("Org1",), ("Org2",)])
result = await repo.get_organization_names(organization)
assert result == ["Org1", "Org2"]


@pytest.mark.anyio
async def test_get_organization_names_invalid(repo, fake_db):
# When organization is None, it should return an empty list.
result = await repo.get_organization_names(None)
assert result == []

# When organization has no organization_id attribute.
organization = type("Org", (), {})()
result = await repo.get_organization_names(organization)
assert result == []


@pytest.mark.anyio
async def test_get_intended_user_by_name(repo, fake_db):
fake_db.execute.return_value = FakeResult(["user_typeA"])
result = await repo.get_intended_user_by_name("user_typeA")
assert result == "user_typeA"


@pytest.mark.anyio
async def test_get_levels_of_equipment(repo, fake_db):
fake_db.execute.return_value = FakeResult(["level1", "level2"])
result = await repo.get_levels_of_equipment()
assert result == ["level1", "level2"]


@pytest.mark.anyio
async def test_get_level_of_equipment_by_name(repo, fake_db):
fake_db.execute.return_value = FakeResult(["levelX"])
result = await repo.get_level_of_equipment_by_name("levelX")
assert result == "levelX"


@pytest.mark.anyio
async def test_get_fse_list(repo, fake_db):
fake_db.execute.return_value = FakeResult(["fse1", "fse2"])
result = await repo.get_fse_list(report_id=10)
assert result == ["fse1", "fse2"]


@pytest.mark.anyio
async def test_get_fse_paginated(repo, fake_db):
# Prepare two responses:
# 1. Count query returns 3 total records.
# 2. The actual paginated query returns 2 items.
count_result = FakeResult([3])
paginated_result = FakeResult(["fse_paginated1", "fse_paginated2"])
fake_db.execute.side_effect = [count_result, paginated_result]

pagination = PaginationRequestSchema(page=1, size=2)
result, total = await repo.get_fse_paginated(pagination, compliance_report_id=20)
assert total == 3
assert result == ["fse_paginated1", "fse_paginated2"]


@pytest.mark.anyio
async def test_get_final_supply_equipment_by_id(repo, fake_db):
fake_db.execute.return_value = FakeResult(["fse_item"])
result = await repo.get_final_supply_equipment_by_id(final_supply_equipment_id=5)
assert result == "fse_item"


@pytest.mark.anyio
async def test_update_final_supply_equipment(repo, fake_db):
fse = "fse_to_update"
# Simulate the merge returning the same object.
fake_db.merge.return_value = fse
updated = await repo.update_final_supply_equipment(fse)
assert updated == fse
fake_db.merge.assert_called_once_with(fse)
fake_db.flush.assert_called_once()
fake_db.refresh.assert_called_once_with(
fse, ["level_of_equipment", "intended_use_types", "intended_user_types"]
)


@pytest.mark.anyio
async def test_create_final_supply_equipment(repo, fake_db):
fse = "new_fse"
created = await repo.create_final_supply_equipment(fse)
assert created == fse
fake_db.add.assert_called_once_with(fse)
fake_db.flush.assert_called_once()
fake_db.refresh.assert_called_once_with(
fse, ["level_of_equipment", "intended_use_types"]
)


@pytest.mark.anyio
async def test_delete_final_supply_equipment(repo, fake_db):
await repo.delete_final_supply_equipment(final_supply_equipment_id=99)
fake_db.execute.assert_called_once() # delete statement was executed
fake_db.flush.assert_called_once()


@pytest.mark.anyio
async def test_get_current_seq_by_org_and_postal_code_existing(repo, fake_db):
fake_db.execute.return_value = FakeResult([5])
seq = await repo.get_current_seq_by_org_and_postal_code("orgCode", "postal123")
assert seq == 5


@pytest.mark.anyio
async def test_get_current_seq_by_org_and_postal_code_none(repo, fake_db):
fake_db.execute.return_value = FakeResult([None])
seq = await repo.get_current_seq_by_org_and_postal_code("orgCode", "postal123")
assert seq == 0


@pytest.mark.anyio
async def test_increment_seq_by_org_and_postal_code_update(repo, fake_db):
# Simulate update returning a new sequence number.
fake_db.execute.return_value = FakeResult([6])
seq = await repo.increment_seq_by_org_and_postal_code("orgCode", "postal123")
assert seq == 6


@pytest.mark.anyio
async def test_increment_seq_by_org_and_postal_code_insert(repo, fake_db):
# Simulate update returning None so that a new record is inserted.
fake_db.execute.return_value = FakeResult([None])
seq = await repo.increment_seq_by_org_and_postal_code("orgCode", "postal123")
assert seq == 1
fake_db.add.assert_called_once()
fake_db.flush.assert_called_once()


@pytest.mark.anyio
async def test_check_uniques_of_fse_row_exists(
repo, fake_db, valid_final_supply_equipment_create_schema
):
fake_db.execute.return_value = FakeResult([True])
exists_result = await repo.check_uniques_of_fse_row(
valid_final_supply_equipment_create_schema
)
assert exists_result is True


@pytest.mark.anyio
async def test_check_uniques_of_fse_row_not_exists(
repo, fake_db, valid_final_supply_equipment_create_schema
):
fake_db.execute.return_value = FakeResult([False])
exists_result = await repo.check_uniques_of_fse_row(
valid_final_supply_equipment_create_schema
)
assert exists_result is False


@pytest.mark.anyio
async def test_check_overlap_of_fse_row_exists(
repo, fake_db, valid_final_supply_equipment_create_schema
):
valid_final_supply_equipment_create_schema.serial_nbr = "OVERLAP1"
fake_db.execute.return_value = FakeResult([True])
overlap = await repo.check_overlap_of_fse_row(
valid_final_supply_equipment_create_schema
)
assert overlap is True


@pytest.mark.anyio
async def test_check_overlap_of_fse_row_not_exists(
repo, fake_db, valid_final_supply_equipment_create_schema
):
fake_db.execute.return_value = FakeResult([False])
overlap = await repo.check_overlap_of_fse_row(
valid_final_supply_equipment_create_schema
)
assert overlap is False


@pytest.mark.anyio
async def test_search_manufacturers(repo, fake_db):
fake_db.execute.return_value = FakeResult(["Manufacturer1", "Manufacturer2"])
results = await repo.search_manufacturers("manu")
assert results == ["Manufacturer1", "Manufacturer2"]
Loading