refactor: move to the new capepy library #6

Merged · 1 commit · Nov 8, 2024
99 changes: 10 additions & 89 deletions main.py
@@ -1,79 +1,22 @@
"""ETL script for raw Epi/HAI sequencing report pdf."""

import io
import sys
from datetime import datetime
from pathlib import Path

import boto3 as boto3
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from capepy.aws.glue import EtlJob
from pypdf import PdfReader
from pyspark.sql import SparkSession
from tabula.io import read_pdf

# for our purposes here, the spark and glue context are only (currently) needed
# to get the logger.
spark_ctx = SparkSession.builder.getOrCreate() # pyright: ignore
glue_ctx = GlueContext(spark_ctx)
logger = glue_ctx.get_logger()

# TODO:
# - add error handling for the format of the document being incorrect
# - figure out how we want to name and namespace clean files (e.g. will we
# take the object key we're given, strip the extension and replace it with
# one for the new format, or will we do something else)
# - see what we can extract out of here to be useful for other ETLs. imagine
# we'd have a few different things that could be made into a reusable
# package

parameters = getResolvedOptions(
sys.argv,
[
"RAW_BUCKET_NAME",
"ALERT_OBJ_KEY",
"CLEAN_BUCKET_NAME",
],
)

raw_bucket_name = parameters["RAW_BUCKET_NAME"]
alert_obj_key = parameters["ALERT_OBJ_KEY"]
clean_bucket_name = parameters["CLEAN_BUCKET_NAME"]
etl_job = EtlJob()

# NOTE: for now we'll take the alert object key and change out the file
# extension for the clean data (leaving all namespacing and such). this
# will probably need to change
clean_obj_key = str(Path(alert_obj_key).with_suffix(".csv"))

# NOTE: May need some creds here
s3_client = boto3.client("s3")

# try to get the pdf object from S3 and handle any error that would keep us
# from continuing.
response = s3_client.get_object(Bucket=raw_bucket_name, Key=alert_obj_key)

status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")

if status != 200:
err = (
f"ERROR - Could not get object {alert_obj_key} from bucket "
f"{raw_bucket_name}. ETL Cannot continue."
)

logger.error(err)

# NOTE: need to properly handle exception stuff here, and we probably want
# this going somewhere very visible (e.g. SNS topic or a perpetual log
# as someone will need to be made aware)
raise Exception(err)

logger.info(f"Obtained object {alert_obj_key} from bucket {raw_bucket_name}.")

# handle the document itself...
clean_obj_key = etl_job.parameters["OBJECT_KEY"].replace(".pdf", ".csv")

# the response should contain a StreamingBody object that needs to be converted
# to a file like object to make the pdf libraries happy
f = io.BytesIO(response.get("Body").read())
f = io.BytesIO(etl_job.get_raw_file())

try:
# get the report date from the 4th line of the pdf
@@ -88,23 +31,24 @@
f"{err}"
)

logger.error(err_message)
etl_job.logger.error(err_message)

date_reported = ""

try:
# get two tables from the pdf
tables = read_pdf(f, multiple_tables=True, pages=2)
mlst_st = tables[0] # pyright: ignore
genes = tables[1] # pyright: ignore
assert isinstance(tables, list)
mlst_st = tables[0]
genes = tables[1]
except (IndexError, KeyError) as err:
err_message = (
f"ERROR - Could not properly read sequencing PDF tables. "
f"ETL Cannot continue."
f"{err}"
)

logger.error(err_message)
etl_job.logger.error(err_message)

# NOTE: need to properly handle exception stuff here, and we probably
# want this going somewhere very visible (e.g. SNS topic or a
@@ -121,27 +65,4 @@
# write out the transformed data
with io.StringIO() as csv_buff:
interim.to_csv(csv_buff, index=False)

response = s3_client.put_object(
Bucket=clean_bucket_name, Key=clean_obj_key, Body=csv_buff.getvalue()
)

status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")

if status != 200:
err = (
f"ERROR - Could not write transformed data object {clean_obj_key} "
f"to bucket {clean_bucket_name}. ETL Cannot continue."
)

logger.error(err)

# NOTE: need to properly handle exception stuff here, and we probably
# want this going somewhere very visible (e.g. SNS topic or a
# perpetual log as someone will need to be made aware)
raise Exception(err)

logger.info(
f"Transformed {raw_bucket_name}/{alert_obj_key} and wrote result "
f"to {clean_bucket_name}/{clean_obj_key}"
)
etl_job.write_clean_file(csv_buff.getvalue(), clean_obj_key)
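
Taken together, the refactor swaps the hand-rolled Glue/boto3 plumbing for capepy's EtlJob helper. The condensed sketch below uses only the EtlJob calls visible in this diff (EtlJob(), parameters["OBJECT_KEY"], get_raw_file(), logger, write_clean_file()); the PDF parsing in the middle is abbreviated, and the interim DataFrame is a stand-in for the cleaned table assembled in the collapsed hunk above.

"""Condensed sketch of the refactored main.py flow (not the full script)."""

import io

import pandas as pd
from capepy.aws.glue import EtlJob
from tabula.io import read_pdf

# EtlJob() now covers what getResolvedOptions, the Spark/Glue contexts, and
# the boto3 S3 client handled before: job parameters, logging, and S3 I/O.
etl_job = EtlJob()

# derive the clean object key from the raw one, as in the diff above
clean_obj_key = etl_job.parameters["OBJECT_KEY"].replace(".pdf", ".csv")

# get_raw_file() returns the raw object's bytes; wrap them so the PDF
# libraries see a file-like object
f = io.BytesIO(etl_job.get_raw_file())

# pull the two tables from page 2 of the report (error handling elided here)
tables = read_pdf(f, multiple_tables=True, pages=2)
assert isinstance(tables, list)
mlst_st, genes = tables[0], tables[1]

# placeholder for the cleaned DataFrame built in the collapsed hunk above
interim = pd.concat([mlst_st, genes], axis=1)

# write the transformed CSV to the clean bucket through the same helper
with io.StringIO() as csv_buff:
    interim.to_csv(csv_buff, index=False)
    etl_job.write_clean_file(csv_buff.getvalue(), clean_obj_key)
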
3 changes: 2 additions & 1 deletion requirements.txt
@@ -1,7 +1,8 @@
aws-glue-libs @ git+https://github.com/awslabs/aws-glue-libs@9d8293962e6ffc607e5dc328e246f40b24010fa8
boto3==1.34.103
capepy>=1.0.0,<2.0.0
pandas==2.2.2
pyspark==3.5.1
python-docx==1.1.2
tabula-py==2.9.3
pypdf==4.3.1
pypdf==4.3.1
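
One note on the dependency change: unlike the other entries, which pin exact versions, capepy is constrained to a compatible range (>=1.0.0,<2.0.0), so minor and patch updates within the 1.x line are picked up automatically. A minimal sketch of verifying that constraint when the job starts, assuming only that the installed distribution is named capepy (the packaging library used here is a common choice, not something this repo declares):

from importlib.metadata import version

from packaging.specifiers import SpecifierSet

# the range from requirements.txt; fail fast if the installed capepy has
# drifted outside the supported major version
required = SpecifierSet(">=1.0.0,<2.0.0")
installed = version("capepy")

if installed not in required:
    raise RuntimeError(f"capepy {installed} does not satisfy {required}")
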