# [Issue #3322] Transform opportunity attachment files (#3486)
## Summary
Fixes #3322

### Time to review: __10 mins__

## Changes proposed
- A number of file utilities (used throughout this PR) for reading, writing, and naming files on s3
- A utility for constructing the s3 file paths for attachments (the resulting layout is noted just below)
- Logic to handle inserts/updates/deletes of attachments and the corresponding files that need to move around on s3
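
For orientation, the new path helper (`get_s3_attachment_path` in `attachment_util.py`, shown in the diff below) places each file at `s3://<bucket>/opportunities/<opportunity_id>/attachments/<attachment_id>/<file_name>`, where the bucket is the draft or public bucket depending on whether the opportunity is still a draft.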

## Context for reviewers
There are some scenarios I haven't accounted for yet when the opportunity itself is modified (deleted, or no longer a draft). I originally wanted to handle those in this PR, but I'll split them out since this one was already getting too big.

See the ticket for details on the scenarios we need to handle.

## Additional information
Testing this is a bit tedious - there is a lot that needs to be set up exactly right.

I'd recommend nuking anything you already have with `make volume-recreate`.

Set the env var that enables the job to run (add `TRANSFORM_ORACLE_DATA_ENABLE_OPPORTUNITY_ATTACHMENT=1` to override.env).

Run `make console` and in it run `f.StagingTsynopsisAttachmentFactory.create_batch(size=50)`, then `exit()`.

Finally, run the job with `make cmd args="data-migration load-transform --no-load --transform --no-set-current"`. The full sequence is sketched below.
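
A rough consolidation of those steps (assuming the standard local dev setup; the `echo` into override.env is just one way to set the variable):

```sh
# Start from clean local volumes / localstack state
make volume-recreate

# Enable the new opportunity attachment transform step
echo "TRANSFORM_ORACLE_DATA_ENABLE_OPPORTUNITY_ATTACHMENT=1" >> override.env

# Seed staging attachment records, then exit the console
make console
#   f.StagingTsynopsisAttachmentFactory.create_batch(size=50)
#   exit()

# Run the transform job
make cmd args="data-migration load-transform --no-load --transform --no-set-current"
```
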
chouinar authored Jan 17, 2025
1 parent 45834c5 commit b4ccc44
Showing 18 changed files with 716 additions and 75 deletions.
23 changes: 15 additions & 8 deletions api/bin/setup_localstack.py
@@ -6,6 +6,7 @@

import src.logging
from src.adapters.aws import S3Config, get_s3_client
from src.util import file_util
from src.util.local import error_if_not_local

logger = logging.getLogger(__name__)
@@ -26,6 +27,14 @@ def does_s3_bucket_exist(s3_client: botocore.client.BaseClient, bucket_name: str
return False


def create_bucket_if_not_exists(s3_client: botocore.client.BaseClient, bucket_name: str) -> None:
if not does_s3_bucket_exist(s3_client, bucket_name):
logger.info("Creating S3 bucket %s", bucket_name)
s3_client.create_bucket(Bucket=bucket_name)
else:
logger.info("S3 bucket %s already exists - skipping creation", bucket_name)


def setup_s3() -> None:
s3_config = S3Config()
# This is only used locally - to avoid any accidental running of commands here
@@ -35,14 +44,12 @@ def setup_s3() -> None:
s3_config, boto3.Session(aws_access_key_id="NO_CREDS", aws_secret_access_key="NO_CREDS")
)

if s3_config.s3_opportunity_bucket is None:
raise Exception("S3_OPPORTUNITY_BUCKET env var must be set")

if not does_s3_bucket_exist(s3_client, s3_config.s3_opportunity_bucket):
logger.info("Creating S3 bucket %s", s3_config.s3_opportunity_bucket)
s3_client.create_bucket(Bucket=s3_config.s3_opportunity_bucket)
else:
logger.info("S3 bucket %s already exists - skipping", s3_config.s3_opportunity_bucket)
create_bucket_if_not_exists(
s3_client, file_util.get_s3_bucket(s3_config.public_files_bucket_path)
)
create_bucket_if_not_exists(
s3_client, file_util.get_s3_bucket(s3_config.draft_files_bucket_path)
)


def main() -> None:
4 changes: 3 additions & 1 deletion api/local.env
@@ -99,7 +99,9 @@ S3_ENDPOINT_URL=http://localstack:4566
# S3
############################

S3_OPPORTUNITY_BUCKET=local-opportunities
# Our terraform sets these as s3 paths, so include s3:// on the bucket name
PUBLIC_FILES_BUCKET=s3://local-mock-public-bucket
DRAFT_FILES_BUCKET=s3://local-mock-draft-bucket

# This env var is used to set local AWS credentials
IS_LOCAL_AWS=1
7 changes: 4 additions & 3 deletions api/src/adapters/aws/s3_adapter.py
@@ -1,6 +1,7 @@
import boto3
import botocore.client
import botocore.config
from pydantic import Field

from src.util.env_config import PydanticBaseEnvConfig

@@ -16,9 +17,9 @@ class S3Config(PydanticBaseEnvConfig):
# so that we don't need to set all of these for every
# process that uses S3

# TODO - I'm not sure how we want to organize our
# s3 buckets so this will likely change in the future
s3_opportunity_bucket: str | None = None
# Note these env vars get set as "s3://..."
public_files_bucket_path: str = Field(alias="PUBLIC_FILES_BUCKET")
draft_files_bucket_path: str = Field(alias="DRAFT_FILES_BUCKET")


def get_s3_client(
@@ -1,18 +1,31 @@
import logging
from typing import Tuple
from typing import Tuple, cast

import src.data_migration.transformation.transform_constants as transform_constants
import src.data_migration.transformation.transform_util as transform_util
from src.adapters.aws import S3Config
from src.data_migration.transformation.subtask.abstract_transform_subtask import (
AbstractTransformSubTask,
)
from src.db.models.opportunity_models import Opportunity
from src.db.models.staging.opportunity import Topportunity
from src.services.opportunity_attachments import attachment_util
from src.task.task import Task
from src.util import file_util

logger = logging.getLogger(__name__)


class TransformOpportunity(AbstractTransformSubTask):

def __init__(self, task: Task, s3_config: S3Config | None = None):
super().__init__(task)

if s3_config is None:
s3_config = S3Config()

self.s3_config = s3_config

def transform_records(self) -> None:
# Fetch all opportunities that were modified
# Alongside that, grab the existing opportunity record
@@ -53,11 +66,18 @@ def process_opportunity(
extra,
)

# Cleanup the attachments from s3
if target_opportunity is not None:
for attachment in target_opportunity.opportunity_attachments:
file_util.delete_file(attachment.file_location)

else:
# To avoid incrementing metrics for records we fail to transform, record
# here whether it's an insert/update and we'll increment after transforming
is_insert = target_opportunity is None

was_draft = target_opportunity.is_draft if target_opportunity else None

logger.info("Transforming and upserting opportunity", extra=extra)
transformed_opportunity = transform_util.transform_opportunity(
source_opportunity, target_opportunity
Expand All @@ -76,5 +96,23 @@ def process_opportunity(
)
self.db_session.merge(transformed_opportunity)

# If an opportunity went from being a draft to not a draft (published)
# then we need to move all of its attachments to the public bucket
# from the draft s3 bucket.
if was_draft and transformed_opportunity.is_draft is False:
for attachment in cast(Opportunity, target_opportunity).opportunity_attachments:
# Determine the new path
file_name = attachment_util.adjust_legacy_file_name(attachment.file_name)
s3_path = attachment_util.get_s3_attachment_path(
file_name,
attachment.attachment_id,
transformed_opportunity,
self.s3_config,
)

# Move the file
file_util.move_file(attachment.file_location, s3_path)
attachment.file_location = s3_path

logger.info("Processed opportunity", extra=extra)
source_opportunity.transformed_at = self.transform_time
@@ -3,18 +3,30 @@

import src.data_migration.transformation.transform_constants as transform_constants
import src.data_migration.transformation.transform_util as transform_util
from src.adapters.aws import S3Config
from src.constants.lookup_constants import OpportunityAttachmentType
from src.data_migration.transformation.subtask.abstract_transform_subtask import (
AbstractTransformSubTask,
)
from src.db.models.opportunity_models import Opportunity, OpportunityAttachment
from src.db.models.staging.attachment import TsynopsisAttachment
from src.services.opportunity_attachments import attachment_util
from src.task.task import Task
from src.util import file_util

logger = logging.getLogger(__name__)


class TransformOpportunityAttachment(AbstractTransformSubTask):

def __init__(self, task: Task, s3_config: S3Config | None = None):
super().__init__(task)

if s3_config is None:
s3_config = S3Config()

self.s3_config = s3_config

def transform_records(self) -> None:

# Fetch staging attachment / our attachment / opportunity groups
@@ -63,16 +75,17 @@ def process_opportunity_attachment(
logger.info("Processing opportunity attachment", extra=extra)

if source_attachment.is_deleted:
# TODO - https://github.com/HHS/simpler-grants-gov/issues/3322
# deletes are more complex because of s3
# this just handles deleting the DB record at the moment
self._handle_delete(
source=source_attachment,
target=target_attachment,
record_type=transform_constants.OPPORTUNITY_ATTACHMENT,
extra=extra,
)

# Delete the file from s3 as well
if target_attachment is not None:
file_util.delete_file(target_attachment.file_location)

elif opportunity is None:
# This shouldn't be possible as the incoming data has foreign keys, but as a safety net
# we'll make sure the opportunity actually exists
@@ -85,13 +98,29 @@
# here whether it's an insert/update and we'll increment after transforming
is_insert = target_attachment is None

prior_attachment_location = (
target_attachment.file_location if target_attachment else None
)

logger.info("Transforming and upserting opportunity attachment", extra=extra)

transformed_opportunity_attachment = transform_opportunity_attachment(
source_attachment, target_attachment
source_attachment, target_attachment, opportunity, self.s3_config
)

# TODO - we'll need to handle more with the s3 files here
# Write the file to s3
write_file(source_attachment, transformed_opportunity_attachment)

# If this was an update, and the file name changed
# Cleanup the old file from s3.
if (
prior_attachment_location is not None
and prior_attachment_location != transformed_opportunity_attachment.file_location
):
file_util.delete_file(prior_attachment_location)

logger.info("Transforming and upserting opportunity attachment", extra=extra)

if is_insert:
self.increment(
transform_constants.Metrics.TOTAL_RECORDS_INSERTED,
Expand All @@ -110,26 +139,38 @@ def process_opportunity_attachment(


def transform_opportunity_attachment(
source_attachment: TsynopsisAttachment, incoming_attachment: OpportunityAttachment | None
source_attachment: TsynopsisAttachment,
incoming_attachment: OpportunityAttachment | None,
opportunity: Opportunity,
s3_config: S3Config,
) -> OpportunityAttachment:

log_extra = transform_util.get_log_extra_opportunity_attachment(source_attachment)

if incoming_attachment is None:
logger.info("Creating new opportunity attachment record", extra=log_extra)

# Adjust the file_name to remove characters clunky in URLs
if source_attachment.file_name is None:
raise ValueError("Opportunity attachment does not have a file name, cannot process.")
file_name = attachment_util.adjust_legacy_file_name(source_attachment.file_name)

file_location = attachment_util.get_s3_attachment_path(
file_name, source_attachment.syn_att_id, opportunity, s3_config
)

# We always create a new record here and merge it in the calling function
# this way if there is any error doing the transformation, we don't modify the existing one.
target_attachment = OpportunityAttachment(
attachment_id=source_attachment.syn_att_id,
opportunity_id=source_attachment.opportunity_id,
# TODO - we'll eventually remove attachment type, for now just arbitrarily set the value
opportunity_attachment_type=OpportunityAttachmentType.OTHER,
# TODO - in https://github.com/HHS/simpler-grants-gov/issues/3322
# we'll actually handle the file location logic with s3
file_location="TODO", # TODO - next PR
# Note we calculate the file location here, but haven't yet done anything
# with s3; the calling function will handle writing the file to s3.
file_location=file_location,
mime_type=source_attachment.mime_type,
file_name=source_attachment.file_name,
file_name=file_name,
file_description=source_attachment.file_desc,
file_size_bytes=source_attachment.file_lob_size,
created_by=source_attachment.creator_id,
@@ -142,3 +183,10 @@
)

return target_attachment


def write_file(
source_attachment: TsynopsisAttachment, destination_attachment: OpportunityAttachment
) -> None:
with file_util.open_stream(destination_attachment.file_location, "wb") as outfile:
outfile.write(source_attachment.file_lob)
Empty file.
60 changes: 60 additions & 0 deletions api/src/services/opportunity_attachments/attachment_util.py
@@ -0,0 +1,60 @@
import re

from src.adapters.aws import S3Config
from src.db.models.opportunity_models import Opportunity
from src.util import file_util


def get_s3_attachment_path(
    file_name: str, opportunity_attachment_id: int, opportunity: Opportunity, s3_config: S3Config
) -> str:
    """Construct a path to the attachments on s3
    Will be formatted like:
    s3://<bucket>/opportunities/<opportunity_id>/attachments/<attachment_id>/<file_name>
    Note that we store the files under a "folder" with the attachment ID as
    the legacy system doesn't guarantee file names are unique within an opportunity.
    """

    base_path = (
        s3_config.draft_files_bucket_path
        if opportunity.is_draft
        else s3_config.public_files_bucket_path
    )

    return file_util.join(
        base_path,
        "opportunities",
        str(opportunity.opportunity_id),
        "attachments",
        str(opportunity_attachment_id),
        file_name,
    )


def adjust_legacy_file_name(existing_file_name: str) -> str:
    """Correct the file names to remove any characters problematic for URL/s3 processing.
    We only keep the following characters:
    * A-Z
    * a-z
    * 0-9
    * _
    * -
    * ~
    * .
    Whitespace will be replaced with underscores.
    All other characters will be removed.
    """

    # Replace one-or-more whitespace with a single underscore
    file_name = re.sub(r"\s+", "_", existing_file_name)

    # Remove all non-accepted characters
    file_name = re.sub(r"[^a-zA-Z0-9_.\-~]", "", file_name)

    return file_name
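
A quick usage sketch of these helpers (the IDs, file name, and the in-scope `opportunity` record below are made up for illustration; `S3Config` is the config class from `s3_adapter.py` above):

```python
from src.adapters.aws import S3Config
from src.services.opportunity_attachments import attachment_util

# Whitespace becomes underscores, parentheses and other odd characters are dropped
name = attachment_util.adjust_legacy_file_name("Grant Notice (final).pdf")
# name == "Grant_Notice_final.pdf"

# For a published (non-draft) opportunity with opportunity_id=12345 and a
# legacy attachment id of 6789, the computed location would look roughly like:
#   s3://<public bucket>/opportunities/12345/attachments/6789/Grant_Notice_final.pdf
path = attachment_util.get_s3_attachment_path(name, 6789, opportunity, S3Config())
```
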