Commit
fix(backend): various fixes based on sentry error reports (#2053)
* fix(db): update connection handling for background task updates

* feat(db): implement batch processing for entity updates in the database

* fix(submission_crud): correct attachment key and await db commit

* fix(db): update connection handling in DbBackgroundTask for cursor usage

* fix(backend): also get a new db conn from pool when updating basemap model

---------

Co-authored-by: spwoodcock <sam.woodcock@protonmail.com>
Anuj-Gupta4 and spwoodcock authored Jan 9, 2025
1 parent 34c86d8 commit 19a7eed
Showing 2 changed files with 65 additions and 43 deletions.
104 changes: 63 additions & 41 deletions src/backend/app/db/models.py
@@ -39,6 +39,7 @@

from app.central.central_schemas import ODKCentralDecrypted
from app.config import settings
from app.db import database
from app.db.enums import (
BackgroundTaskStatus,
CommunityType,
@@ -1387,58 +1388,69 @@ async def upsert(
db: Connection,
project_id: int,
entities: list[Self],
batch_size: int = 10000,
) -> bool:
"""Update or insert Entity data, with statuses.
"""Update or insert Entity data in batches, with statuses.
Args:
db (Connection): The database connection.
project_id (int): The project ID.
entities (list[Self]): List of DbOdkEntities objects.
batch_size (int): Number of entities to process in each batch.
Returns:
bool: Success or failure.
"""
log.info(
f"Updating FMTM database Entities for project {project_id} "
f"with ({len(entities)}) features"
f"with ({len(entities)}) features in batches of {batch_size}"
)

sql = """
INSERT INTO public.odk_entities
(entity_id, status, project_id, task_id)
VALUES
"""
result = []

# Prepare data for bulk insert
values = []
data = {}
for index, entity in enumerate(entities):
entity_index = f"entity_{index}"
values.append(
f"(%({entity_index}_entity_id)s, "
f"%({entity_index}_status)s, "
f"%({entity_index}_project_id)s, "
f"%({entity_index}_task_id)s)"
)
data[f"{entity_index}_entity_id"] = entity["id"]
data[f"{entity_index}_status"] = EntityState(int(entity["status"])).name
data[f"{entity_index}_project_id"] = project_id
task_id = entity["task_id"]
data[f"{entity_index}_task_id"] = int(task_id) if task_id else None
for batch_start in range(0, len(entities), batch_size):
batch = entities[batch_start : batch_start + batch_size]
sql = """
INSERT INTO public.odk_entities
(entity_id, status, project_id, task_id)
VALUES
"""

sql += (
", ".join(values)
+ """
ON CONFLICT (entity_id) DO UPDATE SET
status = EXCLUDED.status,
task_id = EXCLUDED.task_id
RETURNING True;
"""
)
# Prepare data for batch insert
values = []
data = {}
for index, entity in enumerate(batch):
entity_index = f"entity_{batch_start + index}"
values.append(
f"(%({entity_index}_entity_id)s, "
f"%({entity_index}_status)s, "
f"%({entity_index}_project_id)s, "
f"%({entity_index}_task_id)s)"
)
data[f"{entity_index}_entity_id"] = entity["id"]
data[f"{entity_index}_status"] = EntityState(int(entity["status"])).name
data[f"{entity_index}_project_id"] = project_id
task_id = entity["task_id"]
data[f"{entity_index}_task_id"] = int(task_id) if task_id else None

sql += (
", ".join(values)
+ """
ON CONFLICT (entity_id) DO UPDATE SET
status = EXCLUDED.status,
task_id = EXCLUDED.task_id
RETURNING True;
"""
)

async with db.cursor() as cur:
await cur.execute(sql, data)
result = await cur.fetchall()
async with db.cursor() as cur:
await cur.execute(sql, data)
batch_result = await cur.fetchall()
if not batch_result:
log.warning(
f"Batch failed at batch {batch_start} for project {project_id}"
)
result.extend(batch_result)

return bool(result)
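
For context, a minimal standalone sketch of the chunked-upsert idea used above, assuming a psycopg 3 AsyncConnection and a hypothetical list of (entity_id, status, project_id, task_id) tuples; this is illustrative only, not the project code. Splitting the VALUES list into batches keeps any single INSERT statement, and its bound-parameter count, from growing unbounded when tens of thousands of entities are synced at once.

from psycopg import AsyncConnection

BATCH_SIZE = 10_000  # matches the default batch_size in the diff above

async def upsert_in_batches(conn: AsyncConnection, rows: list[tuple]) -> None:
    # rows: (entity_id, status, project_id, task_id) tuples -- hypothetical shape.
    for start in range(0, len(rows), BATCH_SIZE):
        batch = rows[start : start + BATCH_SIZE]
        placeholders = ", ".join(["(%s, %s, %s, %s)"] * len(batch))
        sql = (
            "INSERT INTO odk_entities (entity_id, status, project_id, task_id) "
            f"VALUES {placeholders} "
            "ON CONFLICT (entity_id) DO UPDATE SET "
            "status = EXCLUDED.status, task_id = EXCLUDED.task_id;"
        )
        # Flatten the batch so each %s placeholder gets one bound parameter.
        params = [value for row in batch for value in row]
        async with conn.cursor() as cur:
            await cur.execute(sql, params)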

@@ -1534,9 +1546,14 @@ async def update(
RETURNING *;
"""

async with db.cursor(row_factory=class_row(cls)) as cur:
await cur.execute(sql, {"task_id": task_id, **model_dump})
updated_task = await cur.fetchone()
# This is a workaround as the db connection can often timeout,
# before the background job is finished processing
pool = database.get_db_connection_pool()
async with pool as pool_instance:
async with pool_instance.connection() as conn:
async with conn.cursor(row_factory=class_row(cls)) as cur:
await cur.execute(sql, {"task_id": task_id, **model_dump})
updated_task = await cur.fetchone()

if updated_task is None:
msg = f"Failed to update background task with ID: {task_id}"
@@ -1690,9 +1707,14 @@ async def update(
RETURNING *;
"""

async with db.cursor(row_factory=class_row(cls)) as cur:
await cur.execute(sql, {"basemap_id": basemap_id, **model_dump})
updated_basemap = await cur.fetchone()
# This is a workaround as the db connection can often timeout,
# before the basemap is finished processing
pool = database.get_db_connection_pool()
async with pool as pool_instance:
async with pool_instance.connection() as conn:
async with conn.cursor(row_factory=class_row(cls)) as cur:
await cur.execute(sql, {"basemap_id": basemap_id, **model_dump})
updated_basemap = await cur.fetchone()

if updated_basemap is None:
msg = f"Failed to update basemap with ID: {basemap_id}"
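
A minimal sketch of the pool workaround used in both hunks above, in isolation. The exact shape of database.get_db_connection_pool() is inferred from the diff; here a plain psycopg_pool AsyncConnectionPool with a hypothetical DSN and table stands in for it. The point is that a long-running background job acquires a fresh connection from the pool right before writing its status, instead of reusing a request-scoped connection that may have timed out while the job was running.

from psycopg_pool import AsyncConnectionPool

# Hypothetical DSN; the real pool would be created and opened at app startup.
pool = AsyncConnectionPool("postgresql://fmtm:password@db:5432/fmtm", open=False)

async def mark_task_success(task_id: str) -> None:
    # Acquire a fresh connection at write time, so a connection obtained at the
    # start of a multi-minute job cannot have gone stale or timed out by now.
    async with pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(
                # Hypothetical table/columns, for illustration only.
                "UPDATE background_tasks SET status = 'SUCCESS' WHERE id = %s;",
                (task_id,),
            )
    # pool.connection() commits on clean exit and returns the connection
    # to the pool, so no explicit commit is needed here.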
4 changes: 2 additions & 2 deletions src/backend/app/submissions/submission_crud.py
@@ -294,7 +294,7 @@ async def upload_attachment_to_s3(
batch_insert_data = []
for instance_id in instance_ids:
submission_detail = await get_submission_detail(instance_id, project)
attachments = submission_detail["verification"]["image"]
attachments = submission_detail["image"]

if not isinstance(attachments, list):
attachments = [attachments]
@@ -358,7 +358,7 @@ async def upload_attachment_to_s3(
""",
batch_insert_data,
)
db.commit()
await db.commit()
return True

except Exception as e:
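
The db.commit() fix above matters because, on a psycopg 3 AsyncConnection, commit() is a coroutine: calling it without await only creates a never-awaited coroutine (surfacing later as a "coroutine was never awaited" warning) and the transaction is not committed. A tiny sketch of the corrected pattern, with hypothetical table and column names:

from psycopg import AsyncConnection

async def save_photo_record(conn: AsyncConnection, project_id: int, s3_path: str) -> None:
    async with conn.cursor() as cur:
        await cur.execute(
            # Hypothetical table/columns, for illustration only.
            "INSERT INTO submission_photos (project_id, s3_path) VALUES (%s, %s);",
            (project_id, s3_path),
        )
    # Must be awaited on an async connection, otherwise the INSERT is lost
    # when the connection is returned or closed.
    await conn.commit()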
