From c56c16651866413f427824a4834829bc39de8bd7 Mon Sep 17 00:00:00 2001 From: Ramesh Maddegoda <94033485+ramesh-maddegoda@users.noreply.github.com> Date: Tue, 3 Oct 2023 12:01:27 -0700 Subject: [PATCH 1/4] ADD RDS based PDS Nucleus product completion checker lambda functions --- .../pds-nucleus-product-completion-checker.py | 317 ++++++++++++++++++ .../pds-nucleus-product-completion-checker.py | 307 +++-------------- src/pds/ingress/pds-nucleus-product-writer.py | 263 +++++++++++++++ 3 files changed, 631 insertions(+), 256 deletions(-) create mode 100644 src/pds/ingress/dynamodb/pds-nucleus-product-completion-checker.py create mode 100644 src/pds/ingress/pds-nucleus-product-writer.py diff --git a/src/pds/ingress/dynamodb/pds-nucleus-product-completion-checker.py b/src/pds/ingress/dynamodb/pds-nucleus-product-completion-checker.py new file mode 100644 index 0000000..41332e4 --- /dev/null +++ b/src/pds/ingress/dynamodb/pds-nucleus-product-completion-checker.py @@ -0,0 +1,317 @@ +""" +========================================= +pds-nucleus-product-completion-checker.py +========================================= + +Lambda function to check if the staging S3 bucket has received a complete product +with all required file. This lambda function is triggered by S3 bucket events +when a new file is copied to the staging S3 bucket. + +""" + +import json +import urllib.parse +import logging +import shutil +import boto3 +import os +from xml.dom import minidom +from boto3.dynamodb.conditions import Key, Attr + +s3 = boto3.client('s3') +dynamodb = boto3.resource('dynamodb') +logger = logging.getLogger("pds-nucleus-product-completion-checker-logger") + +# Main lambda handler +def lambda_handler(event, context): + + # Get the object from the event and show its content type + bucket = event['Records'][0]['s3']['bucket']['name'] + key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8') + s3_url_of_data_file = "s3://" + bucket + "/" + key + s3_url_of_product_label = "s3://" + bucket + "/" + key + efs_mount_path = os.environ.get('EFS_MOUNT_PATH') + logger.setLevel(logging.INFO) + logger.addHandler(logging.StreamHandler()) + + logger.info(f"Lambda Request ID: {context.aws_request_id}") + + try: + + # Product label received + if s3_url_of_data_file.lower().endswith(".xml"): + + missing_files = handle_missing_files_for_product_label(s3_url_of_product_label, bucket, key) + + copy_file_to_efs(bucket, key, efs_mount_path) + + # If there are any missing files for the given product label + if len(missing_files) > 0: + save_incomplete_product(s3_url_of_product_label, missing_files) + else: + product_completed(s3_url_of_product_label) + + # Data file received + elif not s3_url_of_data_file.lower().endswith("/"): # Not a directory + copy_file_to_efs(bucket, key, efs_mount_path) + save_received_file(s3_url_of_data_file) + update_existing_incomplete_products(s3_url_of_data_file) + delete_expected_file(s3_url_of_data_file) + + return f"Event for s3://{bucket}/{key} processed." + except Exception as e: + logger.error(f"Error processing S3 event: {event}. 
Exception: {str(e)}") + raise e + + +# Handles the missing files detected for a product label +def handle_missing_files_for_product_label(s3_url_of_product_label, bucket, key): + s3_base_dir = s3_url_of_product_label.rsplit('/',1)[0] + + logger.info('Get s3_response for key {} from bucket {}'.format(key, bucket)) + + try: + s3_response = s3.get_object(Bucket=bucket, Key=key) + + # Get the Body object in the S3 get_object() response + s3_object_body = s3_response.get('Body') + + # Read the data in bytes format and convert it to string + content_str = s3_object_body.read().decode() + + # parse xml + xmldoc = minidom.parseString(content_str) + missing_files_from_product_label = xmldoc.getElementsByTagName('file_name') + + missing_files = [] + + for x in missing_files_from_product_label: + + s3_url_of_data_file = s3_base_dir + "/" + x.firstChild.nodeValue + + # Check received file table + items = check_in_received_file_table(s3_url_of_data_file) + + # File is already received + if items: + logger.debug(f"The file is already received: {items['s3_url_of_data_file']}") + else: + + missing_files.append(str(s3_url_of_data_file)) + update_expected_files(s3_url_of_data_file, s3_url_of_product_label) + logger.debug(f"Missing files: {str(missing_files)}") + + return missing_files + + except Exception as e: + logger.error(f"Error handling missing files for product label: {s3_url_of_product_label}. Exception: {str(e)}") + raise e + + +# Saves received files +def save_received_file(s3_url_of_data_file): + received_data_files_table = dynamodb.Table('received_data_files') + + received_data_files_table.put_item( + Item={ + 's3_url_of_data_file': s3_url_of_data_file + } + ) + + +# Returns an expected file response (with list of product labels) for a given s3_url_of_data_file +def get_expected_file(s3_url_of_data_file): + expected_data_files_table = dynamodb.Table('expected_data_files') + + expected_data_files_table_get_response = expected_data_files_table.get_item(Key={ + "s3_url_of_data_file": str(s3_url_of_data_file) + } + ) + + json_string_expected_files = json.dumps(expected_data_files_table_get_response, default=set_default) + expected_data_files_get_response_json = json.loads(json_string_expected_files) + expected_files_items = expected_data_files_get_response_json.get('Item') + + return expected_files_items + + +# Returns incomplete products +def get_incomplete_products(s3_url_of_product_label): + incomplete_products_table = dynamodb.Table('incomplete_products') + + incomplete_products_table_get_response = incomplete_products_table.get_item(Key={ + "s3_url_of_product_label": str(s3_url_of_product_label) + } + ) + + json_string_incomplete_products = json.dumps(incomplete_products_table_get_response, default=set_default) + incomplete_products_table_get_response_json = json.loads(json_string_incomplete_products) + incomplete_products_table_items = incomplete_products_table_get_response_json.get('Item') + + return incomplete_products_table_items + + +# Checks for s3_url_of_data_file in received files table +def check_in_received_file_table(s3_url_of_data_file): + # Check received file table + received_data_files_table = dynamodb.Table('received_data_files') + + received_data_files_table_get_response = received_data_files_table.get_item(Key={ + "s3_url_of_data_file": str(s3_url_of_data_file) + }) + + json_string = json.dumps(received_data_files_table_get_response, default=set_default) + received_data_files_table_get_response_json = json.loads(json_string) + items = 
received_data_files_table_get_response_json.get('Item') + + return items + + +# Saves incomplete product label with a list if missing files +def save_incomplete_product(s3_url_of_product_label, missing_files): + incomplete_products_table = dynamodb.Table('incomplete_products') + + incomplete_products_table.put_item( + Item={ + 's3_url_of_product_label': s3_url_of_product_label, + 'missing_files': missing_files + } + ) + + +# Updates exiting incomplete products +def update_existing_incomplete_products(s3_url_of_data_file): + expected_files_items = get_expected_file(s3_url_of_data_file) + + # If this is an already expected file (i.e.: another product was also expecting this) + if expected_files_items: + expected_files_product_label_list = expected_files_items['s3_urls_of_product_labels'] + logger.info("expected_files_product_label_list = " + str(expected_files_product_label_list)) + + for expected_files_product_label in expected_files_product_label_list: + + incomplete_products_table_items = get_incomplete_products(expected_files_product_label) + + if incomplete_products_table_items: + missing_files_list = incomplete_products_table_items['missing_files'] + s3_url_of_product_label = incomplete_products_table_items['s3_url_of_product_label'] + missing_files_list.remove(s3_url_of_data_file) + + if len(missing_files_list) > 0: + save_incomplete_product(s3_url_of_product_label, missing_files_list) + else: + product_completed(s3_url_of_product_label) + + +# Updates expected files +def update_expected_files(s3_url_of_data_file, s3_url_of_product_label): + expected_files_items = get_expected_file(s3_url_of_data_file) + expected_files_product_label_set = set() + + if expected_files_items: + expected_files_product_label_set = set(expected_files_items['s3_urls_of_product_labels']) + + expected_files_product_label_set.add(s3_url_of_product_label) + save_expected_files(s3_url_of_data_file, expected_files_product_label_set) + + +# Saves expected files +def save_expected_files(s3_url_of_data_file, expected_files_product_label_set): + expected_data_files_table = dynamodb.Table('expected_data_files') + + expected_data_files_table.put_item( + Item={ + 's3_url_of_data_file': s3_url_of_data_file, + 's3_urls_of_product_labels': expected_files_product_label_set + } + ) + + +# Deletes expected files +def delete_expected_file(s3_url_of_data_file): + expected_data_files_table = dynamodb.Table('expected_data_files') + + expected_data_files_table.delete_item(Key={ + "s3_url_of_data_file": str(s3_url_of_data_file) + } + ) + + +# Deletes incomplete products +def delete_incomplete_product(s3_url_of_product_label): + incomplete_products_table = dynamodb.Table('incomplete_products') + + incomplete_products_table.delete_item(Key={ + "s3_url_of_product_label": str(s3_url_of_product_label) + } + ) + + +# Processes complete products +def product_completed(s3_url_of_product_label): + logger.info("PRODUCT COMPLETED") + delete_incomplete_product(s3_url_of_product_label) + notify_product(s3_url_of_product_label) + + +# Copies a file from S3 to EFS +def copy_file_to_efs(s3_bucket, s3_key, efs_mount_path): + path_of_product = s3_bucket + "/" + s3_key + download_dir = os.path.dirname('/tmp/' + path_of_product) + download_file_path = os.path.normpath('/tmp/' + path_of_product) + + try: + # Download the file from S3 (only /tmp has permissions to download) + logger.info(f"File downloading to : {download_file_path}") + os.makedirs(download_dir, exist_ok=True) + s3.download_file(s3_bucket, s3_key, download_file_path) + logger.info(f"File 
downloaded: {download_file_path}") + except Exception as e: + logger.error(f"Error downloading file from S3. s3_bucket: {s3_bucket}, s3_key: {s3_key}, efs_mount_path: {efs_mount_path}, Exception: {str(e)}") + + # Move the file to the /mnt/data directory + destination_path = efs_mount_path + os.path.dirname(path_of_product) + destination_file = efs_mount_path + path_of_product + + try: + os.makedirs(destination_path, exist_ok=True) + + if os.path.isfile(os.path.normpath(destination_file)): + os.remove(os.path.normpath(destination_file)) + logger.debug(f"Deleted existing file in: {os.path.normpath(destination_file)}") + + shutil.move(download_file_path, os.path.normpath(destination_path)) + logger.info(f"File moved to: {destination_path}") + except Exception as e: + logger.error(f"Error moving file to : {destination_path}. Exception: {str(e)}") + + return { + 'statusCode': 200, + 'body': 'File downloaded and moved successfully' + } + + +# Sends a notification to SQS on product copy completion +def notify_product(s3_url_of_product_label): + efs_mount_path = os.environ.get('EFS_MOUNT_PATH') + sqs_queue_url = os.environ.get('SQS_QUEUE_URL') + efs_product_label_file_location = s3_url_of_product_label.replace("s3:/", efs_mount_path, 1) + + sqs = boto3.client('sqs') + + message = { + "s3_url_of_product_label": s3_url_of_product_label, + "efs_product_label_file_location": efs_product_label_file_location, + } + + sqs.send_message( + QueueUrl=sqs_queue_url, + MessageBody=json.dumps(message) + ) + logger.info('SQS Message sent for the completed product') + +# Uses to convert a set to list for JSON serializing +def set_default(obj): + if isinstance(obj, set): + return list(obj) + raise TypeError diff --git a/src/pds/ingress/pds-nucleus-product-completion-checker.py b/src/pds/ingress/pds-nucleus-product-completion-checker.py index 41332e4..a92297a 100644 --- a/src/pds/ingress/pds-nucleus-product-completion-checker.py +++ b/src/pds/ingress/pds-nucleus-product-completion-checker.py @@ -1,11 +1,10 @@ """ -========================================= +============================================== pds-nucleus-product-completion-checker.py -========================================= +============================================== -Lambda function to check if the staging S3 bucket has received a complete product -with all required file. This lambda function is triggered by S3 bucket events -when a new file is copied to the staging S3 bucket. +Lambda function to check if the staging S3 bucket has received a complete product +with all required files. This lambda function is triggered periodically. 
""" @@ -14,285 +13,86 @@ import logging import shutil import boto3 +import botocore import os +import time from xml.dom import minidom from boto3.dynamodb.conditions import Key, Attr +from botocore.exceptions import ClientError s3 = boto3.client('s3') dynamodb = boto3.resource('dynamodb') logger = logging.getLogger("pds-nucleus-product-completion-checker-logger") +db_clust_arn = os.environ.get('DB_CLUSTER_ARN') +db_secret_arn = os.environ.get('DB_SECRET_ARN') +rds_data = boto3.client('rds-data') # Main lambda handler def lambda_handler(event, context): - # Get the object from the event and show its content type - bucket = event['Records'][0]['s3']['bucket']['name'] - key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8') - s3_url_of_data_file = "s3://" + bucket + "/" + key - s3_url_of_product_label = "s3://" + bucket + "/" + key - efs_mount_path = os.environ.get('EFS_MOUNT_PATH') - logger.setLevel(logging.INFO) + logger.setLevel(logging.DEBUG) logger.addHandler(logging.StreamHandler()) logger.info(f"Lambda Request ID: {context.aws_request_id}") try: - - # Product label received - if s3_url_of_data_file.lower().endswith(".xml"): - - missing_files = handle_missing_files_for_product_label(s3_url_of_product_label, bucket, key) - - copy_file_to_efs(bucket, key, efs_mount_path) - - # If there are any missing files for the given product label - if len(missing_files) > 0: - save_incomplete_product(s3_url_of_product_label, missing_files) - else: - product_completed(s3_url_of_product_label) - - # Data file received - elif not s3_url_of_data_file.lower().endswith("/"): # Not a directory - copy_file_to_efs(bucket, key, efs_mount_path) - save_received_file(s3_url_of_data_file) - update_existing_incomplete_products(s3_url_of_data_file) - delete_expected_file(s3_url_of_data_file) - - return f"Event for s3://{bucket}/{key} processed." + check_completed_products() + return f"Processed." except Exception as e: logger.error(f"Error processing S3 event: {event}. Exception: {str(e)}") raise e +# Identifies and processes completed products +def process_completed_products(): -# Handles the missing files detected for a product label -def handle_missing_files_for_product_label(s3_url_of_product_label, bucket, key): - s3_base_dir = s3_url_of_product_label.rsplit('/',1)[0] - - logger.info('Get s3_response for key {} from bucket {}'.format(key, bucket)) - - try: - s3_response = s3.get_object(Bucket=bucket, Key=key) - - # Get the Body object in the S3 get_object() response - s3_object_body = s3_response.get('Body') - - # Read the data in bytes format and convert it to string - content_str = s3_object_body.read().decode() - - # parse xml - xmldoc = minidom.parseString(content_str) - missing_files_from_product_label = xmldoc.getElementsByTagName('file_name') - - missing_files = [] - - for x in missing_files_from_product_label: - - s3_url_of_data_file = s3_base_dir + "/" + x.firstChild.nodeValue - - # Check received file table - items = check_in_received_file_table(s3_url_of_data_file) - - # File is already received - if items: - logger.debug(f"The file is already received: {items['s3_url_of_data_file']}") - else: - - missing_files.append(str(s3_url_of_data_file)) - update_expected_files(s3_url_of_data_file, s3_url_of_product_label) - logger.debug(f"Missing files: {str(missing_files)}") - - return missing_files - - except Exception as e: - logger.error(f"Error handling missing files for product label: {s3_url_of_product_label}. 
Exception: {str(e)}") - raise e - - -# Saves received files -def save_received_file(s3_url_of_data_file): - received_data_files_table = dynamodb.Table('received_data_files') - - received_data_files_table.put_item( - Item={ - 's3_url_of_data_file': s3_url_of_data_file - } - ) - - -# Returns an expected file response (with list of product labels) for a given s3_url_of_data_file -def get_expected_file(s3_url_of_data_file): - expected_data_files_table = dynamodb.Table('expected_data_files') - - expected_data_files_table_get_response = expected_data_files_table.get_item(Key={ - "s3_url_of_data_file": str(s3_url_of_data_file) - } - ) - - json_string_expected_files = json.dumps(expected_data_files_table_get_response, default=set_default) - expected_data_files_get_response_json = json.loads(json_string_expected_files) - expected_files_items = expected_data_files_get_response_json.get('Item') - - return expected_files_items - - -# Returns incomplete products -def get_incomplete_products(s3_url_of_product_label): - incomplete_products_table = dynamodb.Table('incomplete_products') - - incomplete_products_table_get_response = incomplete_products_table.get_item(Key={ - "s3_url_of_product_label": str(s3_url_of_product_label) - } - ) - - json_string_incomplete_products = json.dumps(incomplete_products_table_get_response, default=set_default) - incomplete_products_table_get_response_json = json.loads(json_string_incomplete_products) - incomplete_products_table_items = incomplete_products_table_get_response_json.get('Item') - - return incomplete_products_table_items + logger.info(f'Checking completed products') + sql = """ + select distinct s3_url_of_product_label from product where s3_url_of_product_label + NOT IN (select distinct s3_url_of_product_label from product_data_file_mapping + where s3_url_of_data_file + NOT IN (select s3_url_of_data_file from data_file)); + """ -# Checks for s3_url_of_data_file in received files table -def check_in_received_file_table(s3_url_of_data_file): - # Check received file table - received_data_files_table = dynamodb.Table('received_data_files') + response = rds_data.execute_statement( + resourceArn = db_clust_arn, + secretArn = db_secret_arn, + database = 'pds_nucleus', + sql = sql) - received_data_files_table_get_response = received_data_files_table.get_item(Key={ - "s3_url_of_data_file": str(s3_url_of_data_file) - }) + logger.info("Number of completed product labels: " + str(len(response['records']))) - json_string = json.dumps(received_data_files_table_get_response, default=set_default) - received_data_files_table_get_response_json = json.loads(json_string) - items = received_data_files_table_get_response_json.get('Item') + for record in response['records']: + for data_dict in record: + for data_type, data_value in data_dict.items(): + update_product_processing_status_in_database(data_value,'COMPLETE') + notify_completed_product(data_value) + time.sleep(1/1000) - return items +# Updates the product processing status of the given s3_url_of_product_label +def update_product_processing_status_in_database(s3_url_of_product_label, processing_status): + sql = f""" + UPDATE product + SET processing_status = '{processing_status}' + # SET last_updated_epoch_time = {round(time.time()*1000)} + WHERE s3_url_of_product_label = '{s3_url_of_product_label}' + """ -# Saves incomplete product label with a list if missing files -def save_incomplete_product(s3_url_of_product_label, missing_files): - incomplete_products_table = dynamodb.Table('incomplete_products') + logger.debug((sql) - 
incomplete_products_table.put_item( - Item={ - 's3_url_of_product_label': s3_url_of_product_label, - 'missing_files': missing_files - } - ) - - -# Updates exiting incomplete products -def update_existing_incomplete_products(s3_url_of_data_file): - expected_files_items = get_expected_file(s3_url_of_data_file) - - # If this is an already expected file (i.e.: another product was also expecting this) - if expected_files_items: - expected_files_product_label_list = expected_files_items['s3_urls_of_product_labels'] - logger.info("expected_files_product_label_list = " + str(expected_files_product_label_list)) - - for expected_files_product_label in expected_files_product_label_list: - - incomplete_products_table_items = get_incomplete_products(expected_files_product_label) - - if incomplete_products_table_items: - missing_files_list = incomplete_products_table_items['missing_files'] - s3_url_of_product_label = incomplete_products_table_items['s3_url_of_product_label'] - missing_files_list.remove(s3_url_of_data_file) - - if len(missing_files_list) > 0: - save_incomplete_product(s3_url_of_product_label, missing_files_list) - else: - product_completed(s3_url_of_product_label) - - -# Updates expected files -def update_expected_files(s3_url_of_data_file, s3_url_of_product_label): - expected_files_items = get_expected_file(s3_url_of_data_file) - expected_files_product_label_set = set() - if expected_files_items: - expected_files_product_label_set = set(expected_files_items['s3_urls_of_product_labels']) + response = rds_data.execute_statement( + resourceArn = db_clust_arn, + secretArn = db_secret_arn, + database = 'pds_nucleus', + sql = sql) - expected_files_product_label_set.add(s3_url_of_product_label) - save_expected_files(s3_url_of_data_file, expected_files_product_label_set) - - -# Saves expected files -def save_expected_files(s3_url_of_data_file, expected_files_product_label_set): - expected_data_files_table = dynamodb.Table('expected_data_files') - - expected_data_files_table.put_item( - Item={ - 's3_url_of_data_file': s3_url_of_data_file, - 's3_urls_of_product_labels': expected_files_product_label_set - } - ) - - -# Deletes expected files -def delete_expected_file(s3_url_of_data_file): - expected_data_files_table = dynamodb.Table('expected_data_files') - - expected_data_files_table.delete_item(Key={ - "s3_url_of_data_file": str(s3_url_of_data_file) - } - ) - - -# Deletes incomplete products -def delete_incomplete_product(s3_url_of_product_label): - incomplete_products_table = dynamodb.Table('incomplete_products') - - incomplete_products_table.delete_item(Key={ - "s3_url_of_product_label": str(s3_url_of_product_label) - } - ) - - -# Processes complete products -def product_completed(s3_url_of_product_label): - logger.info("PRODUCT COMPLETED") - delete_incomplete_product(s3_url_of_product_label) - notify_product(s3_url_of_product_label) - - -# Copies a file from S3 to EFS -def copy_file_to_efs(s3_bucket, s3_key, efs_mount_path): - path_of_product = s3_bucket + "/" + s3_key - download_dir = os.path.dirname('/tmp/' + path_of_product) - download_file_path = os.path.normpath('/tmp/' + path_of_product) - - try: - # Download the file from S3 (only /tmp has permissions to download) - logger.info(f"File downloading to : {download_file_path}") - os.makedirs(download_dir, exist_ok=True) - s3.download_file(s3_bucket, s3_key, download_file_path) - logger.info(f"File downloaded: {download_file_path}") - except Exception as e: - logger.error(f"Error downloading file from S3. 
s3_bucket: {s3_bucket}, s3_key: {s3_key}, efs_mount_path: {efs_mount_path}, Exception: {str(e)}") - - # Move the file to the /mnt/data directory - destination_path = efs_mount_path + os.path.dirname(path_of_product) - destination_file = efs_mount_path + path_of_product - - try: - os.makedirs(destination_path, exist_ok=True) - - if os.path.isfile(os.path.normpath(destination_file)): - os.remove(os.path.normpath(destination_file)) - logger.debug(f"Deleted existing file in: {os.path.normpath(destination_file)}") - - shutil.move(download_file_path, os.path.normpath(destination_path)) - logger.info(f"File moved to: {destination_path}") - except Exception as e: - logger.error(f"Error moving file to : {destination_path}. Exception: {str(e)}") - - return { - 'statusCode': 200, - 'body': 'File downloaded and moved successfully' - } + logger.info("response = " + str(response)) # Sends a notification to SQS on product copy completion -def notify_product(s3_url_of_product_label): +def notify_completed_product(s3_url_of_product_label): efs_mount_path = os.environ.get('EFS_MOUNT_PATH') sqs_queue_url = os.environ.get('SQS_QUEUE_URL') efs_product_label_file_location = s3_url_of_product_label.replace("s3:/", efs_mount_path, 1) @@ -308,10 +108,5 @@ def notify_product(s3_url_of_product_label): QueueUrl=sqs_queue_url, MessageBody=json.dumps(message) ) - logger.info('SQS Message sent for the completed product') -# Uses to convert a set to list for JSON serializing -def set_default(obj): - if isinstance(obj, set): - return list(obj) - raise TypeError + logger.info('SQS Message sent for the completed product') diff --git a/src/pds/ingress/pds-nucleus-product-writer.py b/src/pds/ingress/pds-nucleus-product-writer.py new file mode 100644 index 0000000..cc0cd33 --- /dev/null +++ b/src/pds/ingress/pds-nucleus-product-writer.py @@ -0,0 +1,263 @@ +""" +========================================= +pds-nucleus-product-writer.py +========================================= + +Lambda function to copy files from PDS Nucleus S3 staging bucket to an EFS volume. +This lambda function is triggered by S3 bucket events when a new file is copied to the staging S3 bucket. 
+ +""" + +import json +import urllib.parse +import logging +import shutil +import boto3 +import botocore +import os +import time +from xml.dom import minidom +from botocore.exceptions import ClientError + +s3 = boto3.client('s3') +logger = logging.getLogger("pds-nucleus-product-writer-logger") +db_clust_arn = os.environ.get('DB_CLUSTER_ARN') +db_secret_arn = os.environ.get('DB_SECRET_ARN') +rds_data = boto3.client('rds-data') + +# Main lambda handler +def lambda_handler(event, context): + + # Get the object from the event and show its content type + bucket = event['Records'][0]['s3']['bucket']['name'] + key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8') + s3_url_of_data_file = "s3://" + bucket + "/" + key + s3_url_of_product_label = "s3://" + bucket + "/" + key + efs_mount_path = os.environ.get('EFS_MOUNT_PATH') + logger.setLevel(logging.DEBUG) + logger.addHandler(logging.StreamHandler()) + logger.info(f"Lambda Request ID: {context.aws_request_id}") + + try: + + # Product label received + if s3_url_of_data_file.lower().endswith(".xml"): + logger.info(f"Received product file: {s3_url_of_data_file}" ) + copy_file_to_efs(bucket, key, efs_mount_path) + save_product_processing_status_in_database(s3_url_of_product_label, "INCOMPLETE") + save_files_for_product_label(s3_url_of_product_label, bucket, key) + + # Data file received + elif not s3_url_of_data_file.lower().endswith("/"): # Not a directory + logger.info(f"Received data file: {s3_url_of_data_file}" ) + copy_file_to_efs(bucket, key, efs_mount_path) + save_data_file_in_database(s3_url_of_data_file) + + return f"Event for s3://{bucket}/{key} processed." + except Exception as e: + logger.error(f"Error processing S3 event: {event}. Exception: {str(e)}") + raise e + + +# Returns the missing files detected for a product label +def get_missing_files_for_product_label(s3_url_of_product_label, bucket, key): + s3_base_dir = s3_url_of_product_label.rsplit('/',1)[0] + + logger.info('Get s3_response for key {} from bucket {}'.format(key, bucket)) + + try: + s3_response = s3.get_object(Bucket=bucket, Key=key) + + # Get the Body object in the S3 get_object() response + s3_object_body = s3_response.get('Body') + + # Read the data in bytes format and convert it to string + content_str = s3_object_body.read().decode() + + # parse xml + xmldoc = minidom.parseString(content_str) + missing_files_from_product_label = xmldoc.getElementsByTagName('file_name') + + missing_files = [] + + for x in missing_files_from_product_label: + + s3_url_of_data_file = s3_base_dir + "/" + x.firstChild.nodeValue + + # Check received file table + items = check_in_received_file_table(s3_url_of_data_file) + + # File is already received + if items: + logger.debug(f"The file is already received: {items['s3_url_of_data_file']}") + else: + + missing_files.append(str(s3_url_of_data_file)) + update_expected_files(s3_url_of_data_file, s3_url_of_product_label) + logger.debug(f"Missing files: {str(missing_files)}") + + return missing_files + + except Exception as e: + logger.error(f"Error handling missing files for product label: {s3_url_of_product_label}. 
Exception: {str(e)}") + raise e + +# Creates a mapping record in the database for product and relevant files +def save_product_data_file_mapping_in_database(s3_url_of_product_label, s3_url_of_data_file): + + logger.info(f'Save product data file mapping {s3_url_of_product_label} --> {s3_url_of_data_file}') + sql = """ + INSERT INTO product_data_file_mapping + ( + s3_url_of_product_label, + s3_url_of_data_file, + last_updated_epoch_time) + VALUES( + :s3_url_of_product_label_param, + :s3_url_of_data_file_param, + :last_updated_epoch_time_param + ) + """ + + s3_url_of_product_label_param = {'name':'s3_url_of_product_label_param', 'value':{'stringValue': s3_url_of_product_label}} + s3_url_of_data_file_param = {'name':'s3_url_of_data_file_param', 'value':{'stringValue': s3_url_of_data_file}} + last_updated_epoch_time_param = {'name':'last_updated_epoch_time_param', 'value':{'longValue': round(time.time()*1000)}} + + param_set = [s3_url_of_product_label_param, s3_url_of_data_file_param, last_updated_epoch_time_param] + + response = rds_data.execute_statement( + resourceArn = db_clust_arn, + secretArn = db_secret_arn, + database = 'pds_nucleus', + sql = sql, + parameters = param_set) + + print(str(response)) + +# Creates a record for product +def save_product_processing_status_in_database(s3_url_of_product_label, processing_status): + + logger.info(f'Save product processing status for: {s3_url_of_product_label}') + + sql = """ + INSERT INTO product + ( + s3_url_of_product_label, + processing_status, + last_updated_epoch_time) + VALUES( + :s3_url_of_product_label_param, + :processing_status_param, + :last_updated_epoch_time_param + ) + """ + + s3_url_of_product_label_param = {'name':'s3_url_of_product_label_param', 'value':{'stringValue': s3_url_of_product_label}} + processing_status_param = {'name':'processing_status_param', 'value':{'stringValue': processing_status}} + last_updated_epoch_time_param = {'name':'last_updated_epoch_time_param', 'value':{'longValue': round(time.time()*1000)}} + + param_set = [s3_url_of_product_label_param, processing_status_param, last_updated_epoch_time_param] + + response = rds_data.execute_statement( + resourceArn = db_clust_arn, + secretArn = db_secret_arn, + database = 'pds_nucleus', + sql = sql, + parameters = param_set) + + print(str(response)) + +# Creates a record for data file +def save_data_file_in_database(s3_url_of_data_file): + + logger.info(f'Save datafile: {s3_url_of_data_file}') + + sql = """ + REPLACE INTO data_file + ( + s3_url_of_data_file, + last_updated_epoch_time) + VALUES( + :s3_url_of_data_file_param, + :last_updated_epoch_time_param + ) + """ + + + s3_url_of_data_file_param = {'name':'s3_url_of_data_file_param', 'value':{'stringValue': s3_url_of_data_file}} + last_updated_epoch_time_param = {'name':'last_updated_epoch_time_param', 'value':{'longValue': round(time.time()*1000)}} + + param_set = [s3_url_of_data_file_param, last_updated_epoch_time_param] + + response = rds_data.execute_statement( + resourceArn = db_clust_arn, + secretArn = db_secret_arn, + database = 'pds_nucleus', + sql = sql, + parameters = param_set) + + logger.info(str(response)) + + +# Creates a record for product label +def save_files_for_product_label(s3_url_of_product_label, bucket, key): + s3_base_dir = s3_url_of_product_label.rsplit('/',1)[0] + + logger.info('Get s3_response for key {} from bucket {}'.format(key, bucket)) + + try: + s3_response = s3.get_object(Bucket=bucket, Key=key) + + # Get the Body object in the S3 get_object() response + s3_object_body = 
s3_response.get('Body') + + # Read the data in bytes format and convert it to string + content_str = s3_object_body.read().decode() + + # parse xml + xmldoc = minidom.parseString(content_str) + expected_files_from_product_label = xmldoc.getElementsByTagName('file_name') + + for x in expected_files_from_product_label: + s3_url_of_data_file = s3_base_dir + "/" + x.firstChild.nodeValue + save_product_data_file_mapping_in_database(s3_url_of_product_label, s3_url_of_data_file) + + except Exception as e: + logger.error(f"Error handling missing files for product label: {s3_url_of_product_label}. Exception: {str(e)}") + raise e + +# Copies a file from S3 to EFS +def copy_file_to_efs(s3_bucket, s3_key, efs_mount_path): + path_of_product = s3_bucket + "/" + s3_key + download_dir = os.path.dirname('/tmp/' + path_of_product) + download_file_path = os.path.normpath('/tmp/' + path_of_product) + + try: + # Download the file from S3 (only /tmp has permissions to download) + logger.info(f"File downloading to : {download_file_path}") + os.makedirs(download_dir, exist_ok=True) + s3.download_file(s3_bucket, s3_key, download_file_path) + logger.info(f"File downloaded: {download_file_path}") + except Exception as e: + logger.error(f"Error downloading file from S3. s3_bucket: {s3_bucket}, s3_key: {s3_key}, efs_mount_path: {efs_mount_path}, Exception: {str(e)}") + + # Move the file to the /mnt/data directory + destination_path = efs_mount_path + os.path.dirname(path_of_product) + destination_file = efs_mount_path + path_of_product + + try: + os.makedirs(destination_path, exist_ok=True) + + if os.path.isfile(os.path.normpath(destination_file)): + os.remove(os.path.normpath(destination_file)) + logger.debug(f"Deleted existing file in: {os.path.normpath(destination_file)}") + + shutil.move(download_file_path, os.path.normpath(destination_path)) + logger.info(f"File moved to: {destination_path}") + except Exception as e: + logger.error(f"Error moving file to : {destination_path}. Exception: {str(e)}") + + return { + 'statusCode': 200, + 'body': 'File downloaded and moved successfully' + } From b2b56f2f773361af6443c3cf474d3034fa5e9aa0 Mon Sep 17 00:00:00 2001 From: Ramesh Maddegoda <94033485+ramesh-maddegoda@users.noreply.github.com> Date: Thu, 12 Oct 2023 00:42:53 -0700 Subject: [PATCH 2/4] UPDATE RDS based PDS Nucleus product completion checker lambda functions --- .pre-commit-config.yaml | 88 +++++++++++++++---- .../pds-nucleus-product-completion-checker.py | 9 +- src/pds/ingress/pds-nucleus-product-writer.py | 45 ---------- terraform/terraform-modules/ecs/ecs.tf | 4 +- 4 files changed, 74 insertions(+), 72 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a533e91..704b63c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,21 +1,71 @@ repos: -- repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.4.0 - hooks: - - id: trailing-whitespace - - id: end-of-file-fixer - - id: check-merge-conflict - - id: debug-statements - - id: check-yaml - - id: check-json #checks json files for parseable syntax. - - id: pretty-format-json #sets a standard for formatting json files. 
+ - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.4.0 + hooks: + - id: trailing-whitespace + exclude: REQUIREMENTS\.md$ + - id: end-of-file-fixer + exclude: REQUIREMENTS\.md$ + - id: check-executables-have-shebangs + - id: check-merge-conflict + - id: debug-statements + - id: check-yaml + files: .*\.(yaml|yml)$ + - id: check-json + - id: pretty-format-json -- repo: https://github.com/antonbabenko/pre-commit-terraform - rev: v1.81.0 - hooks: - - id: terraform_fmt #Rewrites all Terraform configuration files to a canonical format. - - id: terraform_validate #Validates all Terraform configuration files. - - id: terraform_tflint #Validates all Terraform configuration files with TFLint. - # - id: terrascan #Detect compliance and security violations of Terraform templates. - # - id: terraform_tfsec - # - id: terraform_docs + - repo: https://github.com/antonbabenko/pre-commit-terraform + rev: v1.81.0 + hooks: + - id: terraform_fmt + - id: terraform_validate + # - id: terraform_tflint + # - id: terrascan + # - id: terraform_tfsec + + - repo: https://github.com/asottile/reorder_python_imports + rev: v2.6.0 + hooks: + - id: reorder-python-imports + files: ^src/|tests/ + + - repo: local + hooks: + - id: mypy + name: mypy + entry: mypy src + language: system + pass_filenames: false + + - repo: local + hooks: + - id: black + name: black + entry: black + files: ^src/|tests/ + language: system + types: [python] + + - repo: local + hooks: + - id: flake8 + name: flake8 + entry: flake8 src + language: system + pass_filenames: false + + - repo: local + hooks: + - id: tests + name: Tests + entry: pytest + language: system + stages: [push] + pass_filenames: false + + # #65: support for git-secrets + - repo: https://github.com/awslabs/git-secrets.git + # We have to use an sha here instead of a tag because of awslabs/git-secrets#182 + rev: b9e96b3212fa06aea65964ff0d5cda84ce935f38 + hooks: + - id: git-secrets diff --git a/src/pds/ingress/pds-nucleus-product-completion-checker.py b/src/pds/ingress/pds-nucleus-product-completion-checker.py index a92297a..6a19add 100644 --- a/src/pds/ingress/pds-nucleus-product-completion-checker.py +++ b/src/pds/ingress/pds-nucleus-product-completion-checker.py @@ -13,12 +13,9 @@ import logging import shutil import boto3 -import botocore import os import time from xml.dom import minidom -from boto3.dynamodb.conditions import Key, Attr -from botocore.exceptions import ClientError s3 = boto3.client('s3') dynamodb = boto3.resource('dynamodb') @@ -36,7 +33,7 @@ def lambda_handler(event, context): logger.info(f"Lambda Request ID: {context.aws_request_id}") try: - check_completed_products() + process_completed_products() return f"Processed." except Exception as e: logger.error(f"Error processing S3 event: {event}. 
Exception: {str(e)}") @@ -67,7 +64,7 @@ def process_completed_products(): for data_type, data_value in data_dict.items(): update_product_processing_status_in_database(data_value,'COMPLETE') notify_completed_product(data_value) - time.sleep(1/1000) + time.sleep(1/100) # Updates the product processing status of the given s3_url_of_product_label def update_product_processing_status_in_database(s3_url_of_product_label, processing_status): @@ -79,7 +76,7 @@ def update_product_processing_status_in_database(s3_url_of_product_label, proces WHERE s3_url_of_product_label = '{s3_url_of_product_label}' """ - logger.debug((sql) + logger.debug(sql) response = rds_data.execute_statement( diff --git a/src/pds/ingress/pds-nucleus-product-writer.py b/src/pds/ingress/pds-nucleus-product-writer.py index cc0cd33..534aaf9 100644 --- a/src/pds/ingress/pds-nucleus-product-writer.py +++ b/src/pds/ingress/pds-nucleus-product-writer.py @@ -13,11 +13,9 @@ import logging import shutil import boto3 -import botocore import os import time from xml.dom import minidom -from botocore.exceptions import ClientError s3 = boto3.client('s3') logger = logging.getLogger("pds-nucleus-product-writer-logger") @@ -59,49 +57,6 @@ def lambda_handler(event, context): raise e -# Returns the missing files detected for a product label -def get_missing_files_for_product_label(s3_url_of_product_label, bucket, key): - s3_base_dir = s3_url_of_product_label.rsplit('/',1)[0] - - logger.info('Get s3_response for key {} from bucket {}'.format(key, bucket)) - - try: - s3_response = s3.get_object(Bucket=bucket, Key=key) - - # Get the Body object in the S3 get_object() response - s3_object_body = s3_response.get('Body') - - # Read the data in bytes format and convert it to string - content_str = s3_object_body.read().decode() - - # parse xml - xmldoc = minidom.parseString(content_str) - missing_files_from_product_label = xmldoc.getElementsByTagName('file_name') - - missing_files = [] - - for x in missing_files_from_product_label: - - s3_url_of_data_file = s3_base_dir + "/" + x.firstChild.nodeValue - - # Check received file table - items = check_in_received_file_table(s3_url_of_data_file) - - # File is already received - if items: - logger.debug(f"The file is already received: {items['s3_url_of_data_file']}") - else: - - missing_files.append(str(s3_url_of_data_file)) - update_expected_files(s3_url_of_data_file, s3_url_of_product_label) - logger.debug(f"Missing files: {str(missing_files)}") - - return missing_files - - except Exception as e: - logger.error(f"Error handling missing files for product label: {s3_url_of_product_label}. 
Exception: {str(e)}") - raise e - # Creates a mapping record in the database for product and relevant files def save_product_data_file_mapping_in_database(s3_url_of_product_label, s3_url_of_data_file): diff --git a/terraform/terraform-modules/ecs/ecs.tf b/terraform/terraform-modules/ecs/ecs.tf index 604668d..06fd0ed 100644 --- a/terraform/terraform-modules/ecs/ecs.tf +++ b/terraform/terraform-modules/ecs/ecs.tf @@ -4,8 +4,8 @@ resource "aws_ecs_cluster" "main" { name = "pds-nucleus-ecc-tf" } -resource "aws_ecs_task_definition" "pds-s3-to-efs-data-move-terraform" { - family = "pds-s3-to-efs-data-move-terraform" +resource "aws_ecs_task_definition" "pds-s3-to-efs-data-move" { + family = "pds-s3-to-efs-data-move" requires_compatibilities = ["EC2", "FARGATE"] network_mode = "awsvpc" cpu = 4096 From bb6ce5585d7adf304447258a14c38873ab772068 Mon Sep 17 00:00:00 2001 From: Ramesh Maddegoda <94033485+ramesh-maddegoda@users.noreply.github.com> Date: Thu, 12 Oct 2023 00:59:53 -0700 Subject: [PATCH 3/4] UPDATE the python files with PEP8 format --- .../pds-nucleus-product-completion-checker.py | 35 ++++---- src/pds/ingress/pds-nucleus-product-writer.py | 81 ++++++++++--------- 2 files changed, 59 insertions(+), 57 deletions(-) diff --git a/src/pds/ingress/pds-nucleus-product-completion-checker.py b/src/pds/ingress/pds-nucleus-product-completion-checker.py index 6a19add..95a4370 100644 --- a/src/pds/ingress/pds-nucleus-product-completion-checker.py +++ b/src/pds/ingress/pds-nucleus-product-completion-checker.py @@ -9,13 +9,11 @@ """ import json -import urllib.parse import logging -import shutil -import boto3 import os import time -from xml.dom import minidom + +import boto3 s3 = boto3.client('s3') dynamodb = boto3.resource('dynamodb') @@ -24,9 +22,9 @@ db_secret_arn = os.environ.get('DB_SECRET_ARN') rds_data = boto3.client('rds-data') + # Main lambda handler def lambda_handler(event, context): - logger.setLevel(logging.DEBUG) logger.addHandler(logging.StreamHandler()) @@ -39,9 +37,9 @@ def lambda_handler(event, context): logger.error(f"Error processing S3 event: {event}. 
Exception: {str(e)}") raise e + # Identifies and processes completed products def process_completed_products(): - logger.info(f'Checking completed products') sql = """ @@ -52,38 +50,37 @@ def process_completed_products(): """ response = rds_data.execute_statement( - resourceArn = db_clust_arn, - secretArn = db_secret_arn, - database = 'pds_nucleus', - sql = sql) + resourceArn=db_clust_arn, + secretArn=db_secret_arn, + database='pds_nucleus', + sql=sql) logger.info("Number of completed product labels: " + str(len(response['records']))) for record in response['records']: for data_dict in record: for data_type, data_value in data_dict.items(): - update_product_processing_status_in_database(data_value,'COMPLETE') + update_product_processing_status_in_database(data_value, 'COMPLETE') notify_completed_product(data_value) - time.sleep(1/100) + time.sleep(1 / 100) + # Updates the product processing status of the given s3_url_of_product_label def update_product_processing_status_in_database(s3_url_of_product_label, processing_status): - sql = f""" UPDATE product SET processing_status = '{processing_status}' - # SET last_updated_epoch_time = {round(time.time()*1000)} + # SET last_updated_epoch_time = {round(time.time() * 1000)} WHERE s3_url_of_product_label = '{s3_url_of_product_label}' """ logger.debug(sql) - response = rds_data.execute_statement( - resourceArn = db_clust_arn, - secretArn = db_secret_arn, - database = 'pds_nucleus', - sql = sql) + resourceArn=db_clust_arn, + secretArn=db_secret_arn, + database='pds_nucleus', + sql=sql) logger.info("response = " + str(response)) diff --git a/src/pds/ingress/pds-nucleus-product-writer.py b/src/pds/ingress/pds-nucleus-product-writer.py index 534aaf9..45d4ca7 100644 --- a/src/pds/ingress/pds-nucleus-product-writer.py +++ b/src/pds/ingress/pds-nucleus-product-writer.py @@ -8,24 +8,24 @@ """ -import json -import urllib.parse import logging -import shutil -import boto3 import os +import shutil import time +import urllib.parse from xml.dom import minidom +import boto3 + s3 = boto3.client('s3') logger = logging.getLogger("pds-nucleus-product-writer-logger") db_clust_arn = os.environ.get('DB_CLUSTER_ARN') db_secret_arn = os.environ.get('DB_SECRET_ARN') rds_data = boto3.client('rds-data') + # Main lambda handler def lambda_handler(event, context): - # Get the object from the event and show its content type bucket = event['Records'][0]['s3']['bucket']['name'] key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8') @@ -40,14 +40,14 @@ def lambda_handler(event, context): # Product label received if s3_url_of_data_file.lower().endswith(".xml"): - logger.info(f"Received product file: {s3_url_of_data_file}" ) + logger.info(f"Received product file: {s3_url_of_data_file}") copy_file_to_efs(bucket, key, efs_mount_path) save_product_processing_status_in_database(s3_url_of_product_label, "INCOMPLETE") save_files_for_product_label(s3_url_of_product_label, bucket, key) # Data file received - elif not s3_url_of_data_file.lower().endswith("/"): # Not a directory - logger.info(f"Received data file: {s3_url_of_data_file}" ) + elif not s3_url_of_data_file.lower().endswith("/"): # Not a directory + logger.info(f"Received data file: {s3_url_of_data_file}") copy_file_to_efs(bucket, key, efs_mount_path) save_data_file_in_database(s3_url_of_data_file) @@ -59,7 +59,6 @@ def lambda_handler(event, context): # Creates a mapping record in the database for product and relevant files def 
save_product_data_file_mapping_in_database(s3_url_of_product_label, s3_url_of_data_file): - logger.info(f'Save product data file mapping {s3_url_of_product_label} --> {s3_url_of_data_file}') sql = """ INSERT INTO product_data_file_mapping @@ -74,24 +73,26 @@ def save_product_data_file_mapping_in_database(s3_url_of_product_label, s3_url_o ) """ - s3_url_of_product_label_param = {'name':'s3_url_of_product_label_param', 'value':{'stringValue': s3_url_of_product_label}} - s3_url_of_data_file_param = {'name':'s3_url_of_data_file_param', 'value':{'stringValue': s3_url_of_data_file}} - last_updated_epoch_time_param = {'name':'last_updated_epoch_time_param', 'value':{'longValue': round(time.time()*1000)}} + s3_url_of_product_label_param = {'name': 's3_url_of_product_label_param', + 'value': {'stringValue': s3_url_of_product_label}} + s3_url_of_data_file_param = {'name': 's3_url_of_data_file_param', 'value': {'stringValue': s3_url_of_data_file}} + last_updated_epoch_time_param = {'name': 'last_updated_epoch_time_param', + 'value': {'longValue': round(time.time() * 1000)}} param_set = [s3_url_of_product_label_param, s3_url_of_data_file_param, last_updated_epoch_time_param] response = rds_data.execute_statement( - resourceArn = db_clust_arn, - secretArn = db_secret_arn, - database = 'pds_nucleus', - sql = sql, - parameters = param_set) + resourceArn=db_clust_arn, + secretArn=db_secret_arn, + database='pds_nucleus', + sql=sql, + parameters=param_set) print(str(response)) + # Creates a record for product def save_product_processing_status_in_database(s3_url_of_product_label, processing_status): - logger.info(f'Save product processing status for: {s3_url_of_product_label}') sql = """ @@ -107,24 +108,26 @@ def save_product_processing_status_in_database(s3_url_of_product_label, processi ) """ - s3_url_of_product_label_param = {'name':'s3_url_of_product_label_param', 'value':{'stringValue': s3_url_of_product_label}} - processing_status_param = {'name':'processing_status_param', 'value':{'stringValue': processing_status}} - last_updated_epoch_time_param = {'name':'last_updated_epoch_time_param', 'value':{'longValue': round(time.time()*1000)}} + s3_url_of_product_label_param = {'name': 's3_url_of_product_label_param', + 'value': {'stringValue': s3_url_of_product_label}} + processing_status_param = {'name': 'processing_status_param', 'value': {'stringValue': processing_status}} + last_updated_epoch_time_param = {'name': 'last_updated_epoch_time_param', + 'value': {'longValue': round(time.time() * 1000)}} param_set = [s3_url_of_product_label_param, processing_status_param, last_updated_epoch_time_param] response = rds_data.execute_statement( - resourceArn = db_clust_arn, - secretArn = db_secret_arn, - database = 'pds_nucleus', - sql = sql, - parameters = param_set) + resourceArn=db_clust_arn, + secretArn=db_secret_arn, + database='pds_nucleus', + sql=sql, + parameters=param_set) print(str(response)) + # Creates a record for data file def save_data_file_in_database(s3_url_of_data_file): - logger.info(f'Save datafile: {s3_url_of_data_file}') sql = """ @@ -138,25 +141,25 @@ def save_data_file_in_database(s3_url_of_data_file): ) """ - - s3_url_of_data_file_param = {'name':'s3_url_of_data_file_param', 'value':{'stringValue': s3_url_of_data_file}} - last_updated_epoch_time_param = {'name':'last_updated_epoch_time_param', 'value':{'longValue': round(time.time()*1000)}} + s3_url_of_data_file_param = {'name': 's3_url_of_data_file_param', 'value': {'stringValue': s3_url_of_data_file}} + last_updated_epoch_time_param = 
{'name': 'last_updated_epoch_time_param', + 'value': {'longValue': round(time.time() * 1000)}} param_set = [s3_url_of_data_file_param, last_updated_epoch_time_param] response = rds_data.execute_statement( - resourceArn = db_clust_arn, - secretArn = db_secret_arn, - database = 'pds_nucleus', - sql = sql, - parameters = param_set) + resourceArn=db_clust_arn, + secretArn=db_secret_arn, + database='pds_nucleus', + sql=sql, + parameters=param_set) logger.info(str(response)) # Creates a record for product label def save_files_for_product_label(s3_url_of_product_label, bucket, key): - s3_base_dir = s3_url_of_product_label.rsplit('/',1)[0] + s3_base_dir = s3_url_of_product_label.rsplit('/', 1)[0] logger.info('Get s3_response for key {} from bucket {}'.format(key, bucket)) @@ -181,6 +184,7 @@ def save_files_for_product_label(s3_url_of_product_label, bucket, key): logger.error(f"Error handling missing files for product label: {s3_url_of_product_label}. Exception: {str(e)}") raise e + # Copies a file from S3 to EFS def copy_file_to_efs(s3_bucket, s3_key, efs_mount_path): path_of_product = s3_bucket + "/" + s3_key @@ -191,10 +195,11 @@ def copy_file_to_efs(s3_bucket, s3_key, efs_mount_path): # Download the file from S3 (only /tmp has permissions to download) logger.info(f"File downloading to : {download_file_path}") os.makedirs(download_dir, exist_ok=True) - s3.download_file(s3_bucket, s3_key, download_file_path) + s3.download_file(s3_bucket, s3_key, download_file_path) logger.info(f"File downloaded: {download_file_path}") except Exception as e: - logger.error(f"Error downloading file from S3. s3_bucket: {s3_bucket}, s3_key: {s3_key}, efs_mount_path: {efs_mount_path}, Exception: {str(e)}") + logger.error( + f"Error downloading file from S3. s3_bucket: {s3_bucket}, s3_key: {s3_key}, efs_mount_path: {efs_mount_path}, Exception: {str(e)}") # Move the file to the /mnt/data directory destination_path = efs_mount_path + os.path.dirname(path_of_product) From 8993b4ba1aaaaf2c2e1853d8431ee221885d766d Mon Sep 17 00:00:00 2001 From: Ramesh Maddegoda <94033485+ramesh-maddegoda@users.noreply.github.com> Date: Thu, 12 Oct 2023 01:01:52 -0700 Subject: [PATCH 4/4] REMOVE DynamoDB related code --- .../pds-nucleus-product-completion-checker.py | 317 ------------------ 1 file changed, 317 deletions(-) delete mode 100644 src/pds/ingress/dynamodb/pds-nucleus-product-completion-checker.py diff --git a/src/pds/ingress/dynamodb/pds-nucleus-product-completion-checker.py b/src/pds/ingress/dynamodb/pds-nucleus-product-completion-checker.py deleted file mode 100644 index 41332e4..0000000 --- a/src/pds/ingress/dynamodb/pds-nucleus-product-completion-checker.py +++ /dev/null @@ -1,317 +0,0 @@ -""" -========================================= -pds-nucleus-product-completion-checker.py -========================================= - -Lambda function to check if the staging S3 bucket has received a complete product -with all required file. This lambda function is triggered by S3 bucket events -when a new file is copied to the staging S3 bucket. 
- -""" - -import json -import urllib.parse -import logging -import shutil -import boto3 -import os -from xml.dom import minidom -from boto3.dynamodb.conditions import Key, Attr - -s3 = boto3.client('s3') -dynamodb = boto3.resource('dynamodb') -logger = logging.getLogger("pds-nucleus-product-completion-checker-logger") - -# Main lambda handler -def lambda_handler(event, context): - - # Get the object from the event and show its content type - bucket = event['Records'][0]['s3']['bucket']['name'] - key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8') - s3_url_of_data_file = "s3://" + bucket + "/" + key - s3_url_of_product_label = "s3://" + bucket + "/" + key - efs_mount_path = os.environ.get('EFS_MOUNT_PATH') - logger.setLevel(logging.INFO) - logger.addHandler(logging.StreamHandler()) - - logger.info(f"Lambda Request ID: {context.aws_request_id}") - - try: - - # Product label received - if s3_url_of_data_file.lower().endswith(".xml"): - - missing_files = handle_missing_files_for_product_label(s3_url_of_product_label, bucket, key) - - copy_file_to_efs(bucket, key, efs_mount_path) - - # If there are any missing files for the given product label - if len(missing_files) > 0: - save_incomplete_product(s3_url_of_product_label, missing_files) - else: - product_completed(s3_url_of_product_label) - - # Data file received - elif not s3_url_of_data_file.lower().endswith("/"): # Not a directory - copy_file_to_efs(bucket, key, efs_mount_path) - save_received_file(s3_url_of_data_file) - update_existing_incomplete_products(s3_url_of_data_file) - delete_expected_file(s3_url_of_data_file) - - return f"Event for s3://{bucket}/{key} processed." - except Exception as e: - logger.error(f"Error processing S3 event: {event}. Exception: {str(e)}") - raise e - - -# Handles the missing files detected for a product label -def handle_missing_files_for_product_label(s3_url_of_product_label, bucket, key): - s3_base_dir = s3_url_of_product_label.rsplit('/',1)[0] - - logger.info('Get s3_response for key {} from bucket {}'.format(key, bucket)) - - try: - s3_response = s3.get_object(Bucket=bucket, Key=key) - - # Get the Body object in the S3 get_object() response - s3_object_body = s3_response.get('Body') - - # Read the data in bytes format and convert it to string - content_str = s3_object_body.read().decode() - - # parse xml - xmldoc = minidom.parseString(content_str) - missing_files_from_product_label = xmldoc.getElementsByTagName('file_name') - - missing_files = [] - - for x in missing_files_from_product_label: - - s3_url_of_data_file = s3_base_dir + "/" + x.firstChild.nodeValue - - # Check received file table - items = check_in_received_file_table(s3_url_of_data_file) - - # File is already received - if items: - logger.debug(f"The file is already received: {items['s3_url_of_data_file']}") - else: - - missing_files.append(str(s3_url_of_data_file)) - update_expected_files(s3_url_of_data_file, s3_url_of_product_label) - logger.debug(f"Missing files: {str(missing_files)}") - - return missing_files - - except Exception as e: - logger.error(f"Error handling missing files for product label: {s3_url_of_product_label}. 
Exception: {str(e)}") - raise e - - -# Saves received files -def save_received_file(s3_url_of_data_file): - received_data_files_table = dynamodb.Table('received_data_files') - - received_data_files_table.put_item( - Item={ - 's3_url_of_data_file': s3_url_of_data_file - } - ) - - -# Returns an expected file response (with list of product labels) for a given s3_url_of_data_file -def get_expected_file(s3_url_of_data_file): - expected_data_files_table = dynamodb.Table('expected_data_files') - - expected_data_files_table_get_response = expected_data_files_table.get_item(Key={ - "s3_url_of_data_file": str(s3_url_of_data_file) - } - ) - - json_string_expected_files = json.dumps(expected_data_files_table_get_response, default=set_default) - expected_data_files_get_response_json = json.loads(json_string_expected_files) - expected_files_items = expected_data_files_get_response_json.get('Item') - - return expected_files_items - - -# Returns incomplete products -def get_incomplete_products(s3_url_of_product_label): - incomplete_products_table = dynamodb.Table('incomplete_products') - - incomplete_products_table_get_response = incomplete_products_table.get_item(Key={ - "s3_url_of_product_label": str(s3_url_of_product_label) - } - ) - - json_string_incomplete_products = json.dumps(incomplete_products_table_get_response, default=set_default) - incomplete_products_table_get_response_json = json.loads(json_string_incomplete_products) - incomplete_products_table_items = incomplete_products_table_get_response_json.get('Item') - - return incomplete_products_table_items - - -# Checks for s3_url_of_data_file in received files table -def check_in_received_file_table(s3_url_of_data_file): - # Check received file table - received_data_files_table = dynamodb.Table('received_data_files') - - received_data_files_table_get_response = received_data_files_table.get_item(Key={ - "s3_url_of_data_file": str(s3_url_of_data_file) - }) - - json_string = json.dumps(received_data_files_table_get_response, default=set_default) - received_data_files_table_get_response_json = json.loads(json_string) - items = received_data_files_table_get_response_json.get('Item') - - return items - - -# Saves incomplete product label with a list if missing files -def save_incomplete_product(s3_url_of_product_label, missing_files): - incomplete_products_table = dynamodb.Table('incomplete_products') - - incomplete_products_table.put_item( - Item={ - 's3_url_of_product_label': s3_url_of_product_label, - 'missing_files': missing_files - } - ) - - -# Updates exiting incomplete products -def update_existing_incomplete_products(s3_url_of_data_file): - expected_files_items = get_expected_file(s3_url_of_data_file) - - # If this is an already expected file (i.e.: another product was also expecting this) - if expected_files_items: - expected_files_product_label_list = expected_files_items['s3_urls_of_product_labels'] - logger.info("expected_files_product_label_list = " + str(expected_files_product_label_list)) - - for expected_files_product_label in expected_files_product_label_list: - - incomplete_products_table_items = get_incomplete_products(expected_files_product_label) - - if incomplete_products_table_items: - missing_files_list = incomplete_products_table_items['missing_files'] - s3_url_of_product_label = incomplete_products_table_items['s3_url_of_product_label'] - missing_files_list.remove(s3_url_of_data_file) - - if len(missing_files_list) > 0: - save_incomplete_product(s3_url_of_product_label, missing_files_list) - else: - 
product_completed(s3_url_of_product_label) - - -# Updates expected files -def update_expected_files(s3_url_of_data_file, s3_url_of_product_label): - expected_files_items = get_expected_file(s3_url_of_data_file) - expected_files_product_label_set = set() - - if expected_files_items: - expected_files_product_label_set = set(expected_files_items['s3_urls_of_product_labels']) - - expected_files_product_label_set.add(s3_url_of_product_label) - save_expected_files(s3_url_of_data_file, expected_files_product_label_set) - - -# Saves expected files -def save_expected_files(s3_url_of_data_file, expected_files_product_label_set): - expected_data_files_table = dynamodb.Table('expected_data_files') - - expected_data_files_table.put_item( - Item={ - 's3_url_of_data_file': s3_url_of_data_file, - 's3_urls_of_product_labels': expected_files_product_label_set - } - ) - - -# Deletes expected files -def delete_expected_file(s3_url_of_data_file): - expected_data_files_table = dynamodb.Table('expected_data_files') - - expected_data_files_table.delete_item(Key={ - "s3_url_of_data_file": str(s3_url_of_data_file) - } - ) - - -# Deletes incomplete products -def delete_incomplete_product(s3_url_of_product_label): - incomplete_products_table = dynamodb.Table('incomplete_products') - - incomplete_products_table.delete_item(Key={ - "s3_url_of_product_label": str(s3_url_of_product_label) - } - ) - - -# Processes complete products -def product_completed(s3_url_of_product_label): - logger.info("PRODUCT COMPLETED") - delete_incomplete_product(s3_url_of_product_label) - notify_product(s3_url_of_product_label) - - -# Copies a file from S3 to EFS -def copy_file_to_efs(s3_bucket, s3_key, efs_mount_path): - path_of_product = s3_bucket + "/" + s3_key - download_dir = os.path.dirname('/tmp/' + path_of_product) - download_file_path = os.path.normpath('/tmp/' + path_of_product) - - try: - # Download the file from S3 (only /tmp has permissions to download) - logger.info(f"File downloading to : {download_file_path}") - os.makedirs(download_dir, exist_ok=True) - s3.download_file(s3_bucket, s3_key, download_file_path) - logger.info(f"File downloaded: {download_file_path}") - except Exception as e: - logger.error(f"Error downloading file from S3. s3_bucket: {s3_bucket}, s3_key: {s3_key}, efs_mount_path: {efs_mount_path}, Exception: {str(e)}") - - # Move the file to the /mnt/data directory - destination_path = efs_mount_path + os.path.dirname(path_of_product) - destination_file = efs_mount_path + path_of_product - - try: - os.makedirs(destination_path, exist_ok=True) - - if os.path.isfile(os.path.normpath(destination_file)): - os.remove(os.path.normpath(destination_file)) - logger.debug(f"Deleted existing file in: {os.path.normpath(destination_file)}") - - shutil.move(download_file_path, os.path.normpath(destination_path)) - logger.info(f"File moved to: {destination_path}") - except Exception as e: - logger.error(f"Error moving file to : {destination_path}. 
Exception: {str(e)}") - - return { - 'statusCode': 200, - 'body': 'File downloaded and moved successfully' - } - - -# Sends a notification to SQS on product copy completion -def notify_product(s3_url_of_product_label): - efs_mount_path = os.environ.get('EFS_MOUNT_PATH') - sqs_queue_url = os.environ.get('SQS_QUEUE_URL') - efs_product_label_file_location = s3_url_of_product_label.replace("s3:/", efs_mount_path, 1) - - sqs = boto3.client('sqs') - - message = { - "s3_url_of_product_label": s3_url_of_product_label, - "efs_product_label_file_location": efs_product_label_file_location, - } - - sqs.send_message( - QueueUrl=sqs_queue_url, - MessageBody=json.dumps(message) - ) - logger.info('SQS Message sent for the completed product') - -# Uses to convert a set to list for JSON serializing -def set_default(obj): - if isinstance(obj, set): - return list(obj) - raise TypeError