From fee4e945f12a9facabf97543ed512b4e1b7d3b5a Mon Sep 17 00:00:00 2001
From: Damian Barrous-Dume
Date: Fri, 27 Jan 2023 14:00:58 -0500
Subject: [PATCH 1/6] Support the new Process File Instrument Api

---
 lambda_function/src/config.yaml               |   3 +
 .../src/file_processor/file_processor.py      | 366 ++++++++++--------
 lambda_function/src/lambda.py                 |  59 ++-
 3 files changed, 243 insertions(+), 185 deletions(-)

diff --git a/lambda_function/src/config.yaml b/lambda_function/src/config.yaml
index e151c70..cdb4d3d 100644
--- a/lambda_function/src/config.yaml
+++ b/lambda_function/src/config.yaml
@@ -11,3 +11,6 @@ INSTR_NAMES:
   - "nemisis"
   - "merit"
   - "spani"
+
+# Timestream region
+TSD_REGION: 'us-east-1'
diff --git a/lambda_function/src/file_processor/file_processor.py b/lambda_function/src/file_processor/file_processor.py
index 2e76c66..9989047 100755
--- a/lambda_function/src/file_processor/file_processor.py
+++ b/lambda_function/src/file_processor/file_processor.py
@@ -2,10 +2,6 @@
 This Module contains the FileProcessor class that will distinguish
 the appropriate HERMES intrument library to use when processing
 the file based off which bucket the file is located in.
-
-TODO: Skeleton Code for initial repo, class still needs to be
-implemented including logging to DynamoDB + S3 log
-file and docstrings expanded
 """
 import boto3
 import botocore
@@ -13,6 +9,9 @@
 import time
 import logging
 import yaml
+import os
+import os.path
+from pathlib import Path
 
 # Initialize constants to be parsed from config.yaml
 MISSION_NAME = ""
@@ -27,9 +26,10 @@
     MISSION_NAME = config["MISSION_NAME"]
     INSTR_NAMES = config["INSTR_NAMES"]
     MISSION_PKG = config["MISSION_PKG"]
+    TSD_REGION = config["TSD_REGION"]
 
 except FileNotFoundError:
-    print("config.yaml not found. Check to make sure it exists in the root directory.")
+    print("config.yaml not found")
     exit(1)
 
@@ -51,7 +51,7 @@
 util = getattr(mission_pkg, "util").util
 
 # Starts boto3 session so it gets access to needed credentials
-session = boto3.Session()
+session = boto3.Session(region_name=TSD_REGION)
 
 # To remove boto3 noisy debug logging
 logging.getLogger("botocore").setLevel(logging.CRITICAL)
@@ -60,14 +60,22 @@ class FileProcessor:
     """
-    Main FileProcessor class which initializes an object with the data file and the
-    bucket event which triggered the lambda function to be called.
+    The FileProcessor class will then determine which instrument
+    library to use to process the file.
+
+    :param s3_bucket: The name of the S3 bucket the file is located in
+    :type s3_bucket: str
+    :param file_key: The name of the S3 object that is being processed
+    :type file_key: str
+    :param environment: The environment the FileProcessor is running in
+    :type environment: str
+    :param dry_run: Whether or not the FileProcessor is performing a dry run
+    :type dry_run: bool
     """
 
-    def __init__(self, s3_bucket, s3_object, environment, dry_run=False):
-        """
-        FileProcessor Constructorlogger
-        """
+    def __init__(
+        self, s3_bucket: str, file_key: str, environment: str, dry_run: str = None
+    ) -> None:
 
         # Initialize Class Variables
         try:
@@ -78,12 +86,12 @@ def __init__(self, s3_bucket, s3_object, environment, dry_run=False):
             )
 
         except KeyError:
-            error_message = "KeyError when extracting S3 Bucket Name/ARN from dict"
+            error_message = "KeyError when extracting S3 Bucket Name"
             log.error({"status": "ERROR", "message": error_message})
             raise KeyError(error_message)
 
         try:
-            self.file_key = s3_object
+            self.file_key = file_key
 
             log.info(
                 {
@@ -94,7 +102,7 @@ def __init__(self, s3_bucket, s3_object, environment, dry_run=False):
             )
 
         except KeyError:
-            error_message = "KeyError when extracting S3 Object Name/eTag from dict"
+            error_message = "KeyError when extracting S3 File Key"
             log.error({"status": "ERROR", "message": error_message})
             raise KeyError(error_message)
 
@@ -107,9 +115,16 @@ def __init__(self, s3_bucket, s3_object, environment, dry_run=False):
             log.warning("Performing Dry Run - Files will not be copied/removed")
 
         # Process File
-        self._process_file(file_key=self.file_key)
+        self._process_file()
+
+    def _process_file(self) -> None:
+        """
+        This method serves as the main entry point for the FileProcessor class.
+        It will then determine which instrument library to use to process the file.
 
-    def _process_file(self, file_key):
+        :return: None
+        :rtype: None
+        """
         # Verify object exists in instrument bucket
         if (
             self._does_object_exists(
@@ -122,6 +137,16 @@ def _process_file(self, file_key):
             # Parse file key to get instrument name
             file_key_array = self.file_key.split("/")
             parsed_file_key = file_key_array[-1]
+
+            # Download file from instrument bucket if not a dry run
+            if not self.dry_run:
+                file_path = self._download_file(
+                    self.instrument_bucket_name,
+                    self.file_key,
+                    parsed_file_key,
+                )
+
+            # Parse the science file name
             science_file = util.parse_science_filename(parsed_file_key)
             this_instr = science_file["instrument"]
             destination_bucket = INSTR_TO_BUCKET_NAME[this_instr]
@@ -131,43 +156,58 @@ def _process_file(self, file_key):
 
             # Dynamically import instrument package
             instr_pkg = __import__(
-                f"{INSTR_TO_PKG[this_instr]}.calibration", fromlist=["calibration"]
+                f"{INSTR_TO_PKG[this_instr]}.calibration",
+                fromlist=["calibration"],
             )
             calibration = getattr(instr_pkg, "calibration")
             log.info(f"Using {INSTR_TO_PKG[this_instr]} module for calibration")
 
-            # Run Calibration on File (This will cause a ValueError
-            # if no calibration is found)
-            calibration.calibrate_file(parsed_file_key)
+            # Process file
+            try:
+                # Get name of new file
+                new_file_path = calibration.process_file(file_path)[0].name
 
-        except ValueError as e:
-            # Expected ValueError for Data Flow Test because no calibration
-            # files are ready
-            log.warning(
-                {
-                    "status": "WARNING",
-                    "message": f"Expected Value Error for Data Flow Test: {e}",
-                }
-            )
+                # Get new file key
+                new_file_key = self._generate_file_key(new_file_path)
 
-            # Copy File to Instrument Bucket
-            new_file_key = self._get_new_file_key(file_key)
+                # Upload file to destination bucket if not a dry run
+                if not self.dry_run:
 
-            self._process_object(
-                source_bucket=self.instrument_bucket_name,
-                file_key=file_key,
-                new_file_key=new_file_key,
-            )
+                    # Upload file to destination bucket
+                    self._upload_file(
+                        new_file_path, destination_bucket, new_file_key
+                    )
 
-        else:
-            raise ValueError("File does not exist in bucket")
+                    # Log to timeseries database
+                    self._log_to_timestream(
+                        action_type="PUT",
+                        file_key=self.file_key,
+                        new_file_key=new_file_key,
+                        source_bucket=destination_bucket,
+                        destination_bucket=destination_bucket,
+                    )
+
+            except ValueError:
+                log.info("No calibration performed")
 
-    def _does_object_exists(self, bucket, file_key):
+        except Exception as e:
+            log.error("Error Processing File")
+            raise e
+
+    @staticmethod
+    def _does_object_exists(bucket: str, file_key: str) -> bool:
         """
         Returns wether or not the file exists in the specified bucket
+
+        :param bucket: The name of the bucket
+        :type bucket: str
+        :param file_key: The name of the file
+        :type file_key: str
+        :return: True if the file exists, False if it does not
+        :rtype: bool
         """
-        s3 = boto3.resource("s3")
+        s3 = session.resource("s3")
 
         try:
             s3.Object(bucket, file_key).load()
@@ -183,165 +223,161 @@ def _does_object_exists(self, bucket, file_key):
             log.info(f"File {file_key} already exists in Bucket {bucket}")
             return True
 
-    def _process_object(self, source_bucket, file_key, new_file_key):
+    @staticmethod
+    def _generate_file_key(file_key) -> str:
         """
-        Function to copy file from S3 incoming bucket using bucket key
-        to destination bucket
-        """
-        log.info(f"Moving File From {file_key} to {new_file_key}")
+        Function to generate full s3 file key in the format:
+        {level}/{year}/{month}/{file_key}
+
+        :param file_key: The name of the file
+        :type file_key: str
+        :return: The full s3 file key
+        :rtype: str
+        """
         try:
-            # Initialize S3 Client and Copy Source Dict
-
-            # Move S3 file from one folder to another
-            if not self.dry_run:
-                s3 = boto3.client("s3")
-                copy_source = {"Bucket": source_bucket, "Key": file_key}
-                s3.copy(copy_source, source_bucket, new_file_key)
-                # Log added file to Incoming Bucket in Timestream
-                self._log_to_timestream(
-                    action_type="PUT",
-                    file_key=file_key,
-                    new_file_key=new_file_key,
-                    source_bucket=source_bucket,
-                    destination_bucket=source_bucket,
-                )
-                log.info(
-                    {
-                        "status": "INFO",
-                        "message": f"File {file_key} successfully "
-                        f"processed to {new_file_key}",
-                    }
-                )
-                log.info(f"File {file_key} Successfully Moved to {new_file_key}")
+            current_year = date.today().year
+            current_month = date.today().month
+            if current_month < 10:
+                current_month = f"0{current_month}"
 
-        except botocore.exceptions.ClientError as e:
-            log.error({"status": "ERROR", "message": e})
+            science_file = util.parse_science_filename(file_key)
 
-            raise e
+            new_file_key = (
+                f"{science_file['level']}/{current_year}/{current_month}/{file_key}"
+            )
+
+            return new_file_key
 
-    def _get_datalevel(self, file_key):
-        """
-        Function to extract data level from file key
-        """
-        try:
-            file_key_array = self.file_key.split("/")
-            parsed_level = file_key_array[0]
-            return parsed_level
         except IndexError as e:
             log.error({"status": "ERROR", "message": e})
             raise e
 
-    def _get_next_datalevel(self, file_key):
+    @staticmethod
+    def _download_file(source_bucket: str, file_key: str, parsed_file_key: str) -> Path:
         """
-        Function to extract next data level from file key
+        Function to download file from S3
+
+        :param source_bucket: The name of the source bucket
+        :type source_bucket: str
+        :param file_key: The name of the file
+        :type file_key: str
+        :param parsed_file_key: The name of the file
+        :type parsed_file_key: str
+        :return: The path to the downloaded file
+        :rtype: Path
         """
         try:
-            current_level = util.VALID_DATA_LEVELS.index(self._get_datalevel(file_key))
-            return util.VALID_DATA_LEVELS[current_level + 1]
-        except IndexError as e:
+            # Initialize S3 Client
+            log.info(f"Downloading file {file_key} from {source_bucket}")
+            s3 = session.client("s3")
+
+            # Create tmp directory in root of lambda
+            if not os.path.exists("/tmp"):
+                os.mkdir("/tmp")
+
+            # Download file to tmp directory
+            s3.download_file(source_bucket, file_key, f"/tmp/{parsed_file_key}")
+
+            log.info(f"File {file_key} Successfully Downloaded")
+
+            return Path(f"/tmp/{parsed_file_key}")
+
+        except botocore.exceptions.ClientError as e:
             log.error({"status": "ERROR", "message": e})
+
             raise e
 
-    def _get_new_file_key(self, file_key):
+    @staticmethod
+    def _upload_file(filename: str, destination_bucket: str, file_key: str) -> None:
         """
-        Function to create new file key for next data level
+        Function to upload file to S3
+
+        :param filename: The name of the file
+        :type filename: str
+        :param destination_bucket: The name of the destination bucket
+        :type destination_bucket: str
+        :param file_key: The name of the file
+        :type file_key: str
+        :return: None
+        :rtype: None
         """
         try:
-            current_year = date.today().year
-            current_month = date.today().month
-            if current_month < 10:
-                current_month = f"0{current_month}"
-            file_key_array = self.file_key.split("/")
-            parsed_file_key = file_key_array[-1]
-            current_data_level = self._get_datalevel(file_key)
-            next_data_level = self._get_next_datalevel(file_key)
-            science_file = util.parse_science_filename(parsed_file_key)
-            science_file["level"] = next_data_level
-            processed_name = util.create_science_filename(
-                time=science_file["time"],
-                instrument=science_file["instrument"],
-                level=science_file["level"],
-                version="0.0.1",
-            )
-            new_file_key = (
-                f"{next_data_level}/{current_year}/{current_month}/{processed_name}"
-            )
-            new_file_key = new_file_key.replace(current_data_level, next_data_level)
-            return new_file_key
-        except IndexError as e:
+            # Initialize S3 Client
+            log.info(f"Uploading file {file_key} to {destination_bucket}")
+            s3 = session.client("s3")
+
+            # Upload file to destination bucket
+            s3.upload_file(f"/tmp/{filename}", destination_bucket, file_key)
+
+            log.info(f"File {file_key} Successfully Uploaded")
+
+        except botocore.exceptions.ClientError as e:
             log.error({"status": "ERROR", "message": e})
+
             raise e
 
+    @staticmethod
     def _log_to_timestream(
-        self,
-        action_type,
-        file_key,
-        new_file_key=None,
-        source_bucket=None,
-        destination_bucket=None,
-    ):
+        action_type: str,
+        file_key: str,
+        new_file_key: str = None,
+        source_bucket: str = None,
+        destination_bucket: str = None,
+    ) -> None:
         """
         Function to log to Timestream
+
+        :param action_type: The type of action performed
+        :type action_type: str
+        :param file_key: The name of the file
+        :type file_key: str
+        :param new_file_key: The new name of the file
+        :type new_file_key: str
+        :param source_bucket: The name of the source bucket
+        :type source_bucket: str
+        :param destination_bucket: The name of the destination bucket
+        :type destination_bucket: str
+        :return: None
+        :rtype: None
         """
         log.info("Logging to Timestream")
         CURRENT_TIME = str(int(time.time() * 1000))
         try:
             # Initialize Timestream Client
-            timestream = boto3.client("timestream-write")
+            timestream = session.client("timestream-write")
 
             if not source_bucket and not destination_bucket:
                 raise ValueError("A Source or Destination Buckets is required")
 
-            # connect to s3 - assuming your creds are all
-            # set up and you have boto3 installed
-            s3 = boto3.resource("s3")
-
-            # get the bucket
-
-            bucket = s3.Bucket(destination_bucket)
-            if action_type == "DELETE":
-                bucket = s3.Bucket(source_bucket)
-
-            # use loop and count increment
-            count_obj = 0
-            for i in bucket.objects.all():
-                count_obj = count_obj + 1
-            # Write to Timestream
-            if not self.dry_run:
-                timestream.write_records(
-                    DatabaseName="sdc_aws_logs",
-                    TableName="sdc_aws_s3_bucket_log_table",
-                    Records=[
-                        {
-                            "Time": CURRENT_TIME,
-                            "Dimensions": [
-                                {"Name": "action_type", "Value": action_type},
-                                {
-                                    "Name": "source_bucket",
-                                    "Value": source_bucket or "N/A",
-                                },
-                                {
-                                    "Name": "destination_bucket",
-                                    "Value": destination_bucket or "N/A",
-                                },
-                                {"Name": "file_key", "Value": file_key},
-                                {
-                                    "Name": "new_file_key",
-                                    "Value": new_file_key or "N/A",
-                                },
-                                {
-                                    "Name": "current file count",
-                                    "Value": str(count_obj) or "N/A",
-                                },
-                            ],
-                            "MeasureName": "timestamp",
-                            "MeasureValue": str(datetime.utcnow().timestamp()),
-                            "MeasureValueType": "DOUBLE",
-                        },
-                    ],
-                )
+            timestream.write_records(
+                DatabaseName="sdc_aws_logs",
+                TableName="sdc_aws_s3_bucket_log_table",
+                Records=[
+                    {
+                        "Time": CURRENT_TIME,
+                        "Dimensions": [
+                            {"Name": "action_type", "Value": action_type},
+                            {
+                                "Name": "source_bucket",
+                                "Value": source_bucket or "N/A",
+                            },
+                            {
+                                "Name": "destination_bucket",
+                                "Value": destination_bucket or "N/A",
+                            },
+                            {"Name": "file_key", "Value": file_key},
+                            {
+                                "Name": "new_file_key",
+                                "Value": new_file_key or "N/A",
+                            },
+                        ],
+                        "MeasureName": "timestamp",
+                        "MeasureValue": str(datetime.utcnow().timestamp()),
+                        "MeasureValueType": "DOUBLE",
+                    },
+                ],
+            )
 
             log.info((f"File {file_key} Successfully Logged to Timestream"))
diff --git a/lambda_function/src/lambda.py b/lambda_function/src/lambda.py
index 4c951b4..7d8325e 100755
--- a/lambda_function/src/lambda.py
+++ b/lambda_function/src/lambda.py
@@ -19,11 +19,18 @@
 logging.getLogger("boto3").setLevel(logging.CRITICAL)
 
 
-def handler(event, context):
+def handler(event, context) -> dict:
     """
     This is the lambda handler function that passes variables to the
     function that handles the logic that initializes the FileProcessor
     class in it's correct environment.
+
+    :param event: Event data passed from the lambda trigger
+    :type event: dict
+    :param context: Lambda context
+    :type context: dict
+    :return: Returns a 200 (Successful) / 500 (Error) HTTP response
+    :rtype: dict
     """
     # Extract needed information from event
     try:
@@ -36,56 +43,68 @@ def handler(event, context):
             s3_bucket = s3_event["s3"]["bucket"]["name"]
             file_key = s3_event["s3"]["object"]["key"]
 
-            # Pass required variables to sort function and returns a 200 (Successful)
+
             # / 500 (Error) HTTP response
-            response = process_file(
-                environment=environment, s3_bucket=s3_bucket, file_key=file_key
+            response = environment_setup(
+                s3_bucket=s3_bucket, file_key=file_key, environment=environment
             )
 
             return response
 
-    except BaseException as e:
-
+    except Exception as e:
         # Pass required variables to sort function and returns a 200 (Successful)
         # / 500 (Error) HTTP response
         log.error({"status": "ERROR", "message": e})
 
-        return {"statusCode": 500, "body": json.dumps(f"Error Processing File: {e}")}
+        return {
+            "statusCode": 500,
+            "body": json.dumps(f"Error Processing File: {e}"),
+        }
 
 
-def process_file(s3_bucket, file_key, environment):
+def environment_setup(s3_bucket: str, file_key: str, environment: str) -> dict:
     """
     This is the main function that handles logic that initializes the
     FileProcessor class in it's correct environment.
+
+    :param s3_bucket: The name of the S3 bucket the file is located in
+    :type s3_bucket: str
+    :param file_key: The name of the S3 object that is being processed
+    :type file_key: str
+    :param environment: The environment the FileProcessor is running in
+    :type environment: str
     """
     # Production (Official Release) Environment / Local Development
     try:
         log.info(f"Initializing FileProcessor - Environment: {environment}")
-        # Parse file key to get instrument name
-        # file_key_array = file_key.split("/")
-        # parsed_file_key = file_key_array[-1]
-        # science_file = util.parse_science_filename(parsed_file_key)
-        # print(science_file)
 
         if environment == "Production":
+
+            # Initialize FileProcessor class
             FileProcessor(
-                s3_bucket=s3_bucket, s3_object=file_key, environment=environment
+                s3_bucket=s3_bucket, file_key=file_key, environment=environment
            )
+
         else:
             # pylint: disable=import-outside-toplevel
             from dev_file_processor.file_processor import (
                 FileProcessor as DevFileProcessor,
             )
 
+            # Initialize FileProcessor class
             DevFileProcessor(
-                s3_bucket=s3_bucket, s3_object=file_key, environment=environment
+                s3_bucket=s3_bucket, file_key=file_key, environment=environment
             )
 
-        log.info("File Processed Successfully")
-
-        return {"statusCode": 200, "body": json.dumps("File Processed Successfully")}
+        return {
+            "statusCode": 200,
+            "body": json.dumps("File Processed Successfully"),
+        }
 
-    except BaseException as e:
+    except Exception as e:
         log.error({"status": "ERROR", "message": e})
 
-        return {"statusCode": 500, "body": json.dumps("Error Processing File")}
+        return {
+            "statusCode": 500,
+            "body": json.dumps("Error Processing File"),
+        }

From 7066b67c71fd3cee0aaf4fe81628453745303b79 Mon Sep 17 00:00:00 2001
From: Damian Barrous-Dume
Date: Mon, 30 Jan 2023 08:45:49 -0500
Subject: [PATCH 2/6] Update process function to parse SNS event

---
 lambda_function/src/lambda.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/lambda_function/src/lambda.py b/lambda_function/src/lambda.py
index 7d8325e..f7e42af 100755
--- a/lambda_function/src/lambda.py
+++ b/lambda_function/src/lambda.py
@@ -39,8 +39,12 @@ def handler(event, context) -> dict:
         if environment is None:
             environment = "DEVELOPMENT"
 
-        for s3_event in event["Records"]:
+        # Parse message from SNS Notification
+        records = json.loads(event["Message"])["Records"]
 
+        for s3_event in records:
+
+            # Extract needed information from event
             s3_bucket = s3_event["s3"]["bucket"]["name"]
             file_key = s3_event["s3"]["object"]["key"]
 

From 632d78f3d27beb565557df111ad666a6ab16b427 Mon Sep 17 00:00:00 2001
From: Damian Barrous-Dume
Date: Mon, 30 Jan 2023 11:17:21 -0500
Subject: [PATCH 3/6] Change ValueError Catch to Log Message instead of using
 a hardcoded one

---
 lambda_function/src/file_processor/file_processor.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/lambda_function/src/file_processor/file_processor.py b/lambda_function/src/file_processor/file_processor.py
index 9989047..b689514 100755
--- a/lambda_function/src/file_processor/file_processor.py
+++ b/lambda_function/src/file_processor/file_processor.py
@@ -188,11 +188,11 @@ def _process_file(self) -> None:
                         destination_bucket=destination_bucket,
                     )
 
-            except ValueError:
-                log.info("No calibration performed")
+            except ValueError as e:
+                log.info(e)
 
         except Exception as e:
-            log.error("Error Processing File")
+            log.error(f"Error Processing File: {e}")
             raise e
 
     @staticmethod

From 2905fc3c76199f99b69b1ae1b0b41621251b5291 Mon Sep 17 00:00:00 2001
From: Damian Barrous-Dume
Date: Mon, 30 Jan 2023 13:02:29 -0500
Subject: [PATCH 4/6] Handle SNS and S3 Events

---
 lambda_function/src/lambda.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/lambda_function/src/lambda.py b/lambda_function/src/lambda.py
index f7e42af..d5f5676 100755
--- a/lambda_function/src/lambda.py
+++ b/lambda_function/src/lambda.py
@@ -39,8 +39,13 @@ def handler(event, context) -> dict:
         if environment is None:
             environment = "DEVELOPMENT"
 
+        # Check if SNS or S3 event
+        if "Message" in event:
+            records = json.loads(event["Message"])["Records"]
+        else:
+            records = event["Records"]
+
         # Parse message from SNS Notification
-        records = json.loads(event["Message"])["Records"]
         for s3_event in records:
 

From 0ac140dd7b473698017c45655f2063171eead229 Mon Sep 17 00:00:00 2001
From: Damian Barrous-Dume
Date: Mon, 30 Jan 2023 13:26:04 -0500
Subject: [PATCH 5/6] Change log info to error

---
 lambda_function/src/file_processor/file_processor.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lambda_function/src/file_processor/file_processor.py b/lambda_function/src/file_processor/file_processor.py
index b689514..12b0d95 100755
--- a/lambda_function/src/file_processor/file_processor.py
+++ b/lambda_function/src/file_processor/file_processor.py
@@ -189,7 +189,7 @@ def _process_file(self) -> None:
                     )
 
             except ValueError as e:
-                log.info(e)
+                log.error(e)
 
         except Exception as e:
             log.error(f"Error Processing File: {e}")
             raise e

From 627dc11ffcd0dad776fdeafe73bc5e464c4621af Mon Sep 17 00:00:00 2001
From: Damian Barrous-Dume
Date: Mon, 30 Jan 2023 13:43:10 -0500
Subject: [PATCH 6/6] Fix workflows

---
 .github/workflows/codestyle.yml | 2 +-
 .github/workflows/testing.yml   | 5 +----
 2 files changed, 2 insertions(+), 5 deletions(-)

diff --git a/.github/workflows/codestyle.yml b/.github/workflows/codestyle.yml
index 4be2b36..3a891a7 100755
--- a/.github/workflows/codestyle.yml
+++ b/.github/workflows/codestyle.yml
@@ -4,7 +4,7 @@ name: Codestyle and Linting
 on:
   pull_request:
     branches:
-      - master
+      - main
 
 jobs:
   build:
diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml
index 1eb6e8b..3096597 100755
--- a/.github/workflows/testing.yml
+++ b/.github/workflows/testing.yml
@@ -4,7 +4,7 @@ name: Testing
 on:
   pull_request:
     branches:
-      - master
+      - main
 
 jobs:
   build:
@@ -17,8 +17,5 @@ jobs:
         python -m pip install --upgrade pip
         python -m pip install pytest pytest-astropy pytest-cov
         python -m pip install -r requirements.txt
-      - name: Run tests
-        run: |
-          pytest --pyargs lambda_function --cov lambda_function