
Commit

Merge pull request #181 from Electrostatics/Eo300/issue_180
Update job status on failed download and ensure all input files are uploaded to S3
Eo300 authored Jun 26, 2024
2 parents df5e415 + 1fc5116 commit 6543a58
Showing 23 changed files with 6,041 additions and 153 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -46,6 +46,7 @@ htmlcov/
.cache
nosetests.xml
coverage.xml
coverage.txt
*.cover
*.py,cover
.hypothesis/
6 changes: 5 additions & 1 deletion .readthedocs.yaml
@@ -1,7 +1,11 @@
version: 2

build:
os: "ubuntu-22.04"
tools:
python: "3.8"

python:
version: "3.7"
install:
- method: pip
path: .
6 changes: 3 additions & 3 deletions README.md
@@ -12,8 +12,8 @@ This package contains the software to automate the workflow of APBS and PDB2PQR


## Setting up Development Environment
To setup a development environment, enter your Python3 environment of choice (e.g. virtualenv, conda, etc.). From the top of the repository, enter the following:
```
$ pip install -e .[dev,test]
To setup a development environment, enter your Python3 environment of choice (e.g. virtualenv, conda, etc.). From the top of the repository in your terminal, enter the following:
```bash
pip install -e .[dev,test]
```
This will install all the necessary packages to develop and test the APBS-AWS software. Check [`setup.py`](./setup.py) to view the list of packages.
5 changes: 3 additions & 2 deletions lambda_services/job_service/job_service.py
@@ -1,4 +1,5 @@
"""Interpret APBS/PDBP2QR job configurations and submit to SQS."""

from json import dumps, loads, JSONDecodeError
from os import getenv
from time import time
@@ -36,7 +37,7 @@ def get_s3_object_json(job_tag: str, bucket_name: str, object_name: str):
Bucket=bucket_name,
Key=object_name,
)
except (ClientError) as err:
except ClientError as err:
_LOGGER.exception(
"%s Unable to get object for Bucket, %s, and Key, %s: %s",
job_tag,
@@ -218,7 +219,7 @@ def interpret_job_submission(event: dict, context):
# - Use weboptions if from web
# - Interpret as is if using only command line args
job_runner = pdb2pqr_runner.Runner(job_info_form, job_id, job_date)
job_command_line_args = job_runner.prepare_job()
job_command_line_args = job_runner.prepare_job(bucket_name)

elif job_type in "apbs":
# If APBS:
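
This hunk threads the S3 input bucket into the PDB2PQR runner so that `prepare_job` can copy sanitized uploads before the command line is assembled. A minimal sketch of the updated call, with illustrative stand-ins for what `interpret_job_submission` actually pulls from the triggering event (the import path and form contents below are assumptions, not part of this diff):

```python
from lambda_services.job_service.launcher import pdb2pqr_runner  # assumed import path

# Illustrative stand-ins; the real handler derives these from the S3/SQS event.
job_info_form = {"pdb": "1fas"}        # hypothetical form payload
job_id, job_date = "abc123", "2024-06-26"
bucket_name = "example-input-bucket"   # hypothetical bucket name

job_runner = pdb2pqr_runner.Runner(job_info_form, job_id, job_date)
# New in this commit: the bucket is passed through so sanitized input files
# can be copied to S3 (see the files_copy_queue loop in pdb2pqr_runner.py).
job_command_line_args = job_runner.prepare_job(bucket_name)
```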
21 changes: 11 additions & 10 deletions lambda_services/job_service/launcher/apbs_runner.py
@@ -4,14 +4,13 @@
from locale import atof, atoi
from os.path import splitext

from .s3_utils import S3Utils

from .jobsetup import JobSetup, MissingFilesError
from .utils import (
_LOGGER,
apbs_extract_input_files,
apbs_infile_creator,
s3_download_file_str,
s3_object_exists,
s3_put_object,
)


@@ -65,7 +64,9 @@ def prepare_job(

# Check S3 for .in file existence; add to missing list if not
self.add_input_file(infile_name)
if not s3_object_exists(input_bucket_name, infile_object_name):
if not S3Utils.object_exists(
input_bucket_name, infile_object_name
):
_LOGGER.error(
"%s Missing APBS input file '%s'",
job_tag,
@@ -80,7 +81,7 @@
for name in expected_files_list:
object_name = f"{job_tag}/{name}"
self.add_input_file(str(name))
if not s3_object_exists(input_bucket_name, object_name):
if not S3Utils.object_exists(input_bucket_name, object_name):
_LOGGER.error(
"%s Missing APBS input file '%s'",
job_tag,
@@ -107,7 +108,7 @@
apbs_options = self.apbs_options

# Get text for infile string
infile_str = s3_download_file_str(
infile_str = S3Utils.download_file_str(
output_bucket_name, f"{job_tag}/{infile_name}"
)

@@ -120,7 +121,7 @@
new_infile_contents = apbs_infile_creator(job_tag, apbs_options)

# Get contents of PQR file from PDB2PQR run
pqrfile_text = s3_download_file_str(
pqrfile_text = S3Utils.download_file_str(
output_bucket_name, f"{job_tag}/{pqr_file_name}"
)

@@ -143,7 +144,7 @@
)

# Send original PQR file (with water) to S3 output bucket
s3_put_object(
S3Utils.put_object(
output_bucket_name,
f"{job_tag}/{water_pqrname}",
pqrfile_text.encode("utf-8"),
@@ -167,7 +168,7 @@
job_tag,
f"{job_tag}/{apbs_options['tempFile']}",
)
s3_put_object(
S3Utils.put_object(
input_bucket_name,
f"{job_tag}/{apbs_options['tempFile']}",
new_infile_contents.encode("utf-8"),
@@ -177,7 +178,7 @@
job_tag,
f"{job_tag}/{pqr_file_name}",
)
s3_put_object(
S3Utils.put_object(
input_bucket_name,
f"{job_tag}/{pqr_file_name}",
pqrfile_text.encode("utf-8"),
42 changes: 29 additions & 13 deletions lambda_services/job_service/launcher/pdb2pqr_runner.py
@@ -4,6 +4,7 @@

from .jobsetup import JobSetup
from .utils import _LOGGER
from .s3_utils import S3Utils
from .weboptions import WebOptions, WebOptionsError


@@ -41,16 +42,32 @@ def __init__(self, form: dict, job_id: str, job_date: str):
# Instantiate self.weboptions if job is web submission
if self.invoke_method in ("v1", "gui"):
self.weboptions = WebOptions(self.job_tag, form)

except WebOptionsError:
raise

def prepare_job(self):
def prepare_job(self, input_bucket_name: str = None):
"""Setup the job to run from the GUI or the command line."""
job_id = self.job_id

if self.invoke_method in ["gui", "v1"]:
command_line_args = self.version_1_job(job_id)

# Copy all the sanitized files from the file queue
for payload in self.weboptions.files_copy_queue:
_LOGGER.info(
"%s Copying original object '%s' to sanitized object name '%s' (bucket: %s)",
self.job_tag,
payload.source_object,
payload.dest_object,
payload.bucket_name,
)
S3Utils.copy_object(
self.job_tag,
input_bucket_name,
payload.source_object,
payload.dest_object,
)

elif self.invoke_method in ["cli", "v2"]:
command_line_args = self.version_2_job()
self.command_line_args = command_line_args
@@ -108,17 +125,16 @@ def version_1_job(self, job_id):
if self.weboptions.user_did_upload:
# Update input files
self.add_input_file(self.weboptions.pdbfilename)
else:
if splitext(self.weboptions.pdbfilename)[1] != ".pdb":
self.weboptions.pdbfilename = (
self.weboptions.pdbfilename + ".pdb"
) # add pdb extension to pdbfilename

# Add url to RCSB PDB file to input file list
self.add_input_file(
f"https://files.rcsb.org/download/"
f"{self.weboptions.pdbfilename}"
)
elif splitext(self.weboptions.pdbfilename)[1] != ".pdb":
self.weboptions.pdbfilename = (
self.weboptions.pdbfilename + ".pdb"
) # add pdb extension to pdbfilename

# Add url to RCSB PDB file to input file list
self.add_input_file(
f"https://files.rcsb.org/download/"
f"{self.weboptions.pdbfilename}"
)

# Check for userff, names, ligand files to add to input_file list
if hasattr(self.weboptions, "ligandfilename"):
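
The new block in `prepare_job` drains `self.weboptions.files_copy_queue` through `S3Utils.copy_object`. How `WebOptions` fills that queue is not part of this diff; the sketch below only illustrates, with made-up object keys and bucket name, what one `S3CopyPayload` entry and its copy call would look like:

```python
from lambda_services.job_service.launcher.s3_utils import S3CopyPayload, S3Utils

# Hypothetical entry: copy a user upload to its sanitized key within the job.
payload = S3CopyPayload(
    source_object_name="2024-06-26/abc123/my protein (1).pdb",  # illustrative key
    dest_object_name="2024-06-26/abc123/my_protein_1.pdb",      # illustrative key
)

# prepare_job() performs roughly this call for each queued payload,
# using the input bucket passed in from the job service.
S3Utils.copy_object(
    "2024-06-26/abc123",      # job_tag, used only for logging
    "example-input-bucket",   # illustrative input bucket
    payload.source_object,
    payload.dest_object,
)
```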
141 changes: 141 additions & 0 deletions lambda_services/job_service/launcher/s3_utils.py
@@ -0,0 +1,141 @@
from typing import Optional
from boto3 import client
from dataclasses import dataclass

from botocore.exceptions import ClientError
from .utils import _LOGGER


class S3Utils:
@staticmethod
def copy_object(
job_tag: str,
source_bucket_name: str,
source_object_name: str,
dest_object_name: str,
dest_bucket_name: Optional[str] = None,
):
# Destination bucket is same as source if not defined
if dest_bucket_name is None:
dest_bucket_name = source_bucket_name

# Initialize boto3 S3 client
s3_client = client("s3")

# Use S3 client to copy object
_LOGGER.debug(
"%s Copying file: '%s' (bucket: %s) - Destination: '%s' (bucket: %s)",
job_tag,
source_object_name,
source_bucket_name,
dest_object_name,
dest_bucket_name,
)
s3_client.copy_object(
CopySource=f"{source_bucket_name}/{source_object_name}",
Bucket=source_bucket_name,
Key=dest_object_name,
)

@staticmethod
def download_file_str(bucket_name: str, object_name: str) -> str:
job_tag = _extract_job_tag_from_objectname(object_name)
try:
s3_client = client("s3")
s3_response: dict = s3_client.get_object(
Bucket=bucket_name,
Key=object_name,
)
return s3_response["Body"].read().decode("utf-8")
except Exception as err:
_LOGGER.exception(
"%s ERROR downloading '%s' from bucket '%s': %s",
job_tag,
object_name,
bucket_name,
err,
)
raise

@staticmethod
def put_object(bucket_name: str, object_name: str, body):
job_tag = _extract_job_tag_from_objectname(object_name)
s3_client = client("s3")
_ = s3_client.put_object(
Bucket=bucket_name,
Key=object_name,
Body=body,
)
_LOGGER.debug(
"%s Putting file: %s (bucket: %s)",
job_tag,
object_name,
bucket_name,
)

@staticmethod
def object_exists(bucket_name: str, object_name: str) -> bool:
s3_client = client("s3")
try:
_ = s3_client.head_object(
Bucket=bucket_name,
Key=object_name,
)
return True
except ClientError as err:
if err.response["Error"]["Code"] == "404": # "NoSuchKey" error
return False
elif err.response["Error"]["Code"] == "403":
job_tag: str = _extract_job_tag_from_objectname(object_name)
_LOGGER.warning(
"%s Received '%s' (%d) message on object HEAD: %s",
job_tag,
err.response["Error"]["Message"],
err.response["ResponseMetadata"]["HTTPStatusCode"],
object_name,
)
return False
else:
raise


@dataclass
class S3CopyPayload:
source_object: str
dest_object: str
bucket_name: Optional[str] = None

def __init__(
self,
source_object_name: str,
dest_object_name: str,
bucket_name: Optional[str] = None,
):
self.source_object = source_object_name
self.dest_object = dest_object_name
self.bucket_name = bucket_name


def _extract_job_tag_from_objectname(s3_object_name: str) -> str:
"""Parse an S3 object key and return the job tag.
Args:
s3_object_name (str): An S3 object key, prefixed with date and job_id
Returns:
str: the job tag, extracted from the S3 object key
"""
objectname_split: list = s3_object_name.split("/")
job_tag: str
if len(objectname_split) >= 3:
job_tag = f"{objectname_split[-3]}/{objectname_split[-2]}"
else:
# NOTE: (Eo300) should we raise error here instead?
job_tag = s3_object_name
_LOGGER.warn(
"%s Couldn't extract job tag from object name '%s'. "
"Returning object name as job_tag.",
job_tag,
s3_object_name,
)
return job_tag
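
`S3Utils` wraps the boto3 S3 client behind small static methods (`object_exists` via `head_object`, `download_file_str` via `get_object`, `put_object`, and `copy_object`). A short usage sketch with hypothetical bucket and object names, assuming AWS credentials are available in the environment as they would be inside the Lambda, and that the absolute import path follows the package layout shown in this PR:

```python
from lambda_services.job_service.launcher.s3_utils import S3Utils

bucket = "example-output-bucket"        # illustrative bucket name
key = "2024-06-26/abc123/jobid.pqr"     # illustrative key: date/job_id/file

# head_object-based existence check; 404 and 403 responses both return False.
if S3Utils.object_exists(bucket, key):
    # get_object + UTF-8 decode; failures are logged with the job tag and re-raised.
    text = S3Utils.download_file_str(bucket, key)

    # put_object with a bytes body, as the runners above do.
    S3Utils.put_object(bucket, "2024-06-26/abc123/jobid-copy.pqr", text.encode("utf-8"))
```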