diff --git a/capepy/aws/glue.py b/capepy/aws/glue.py index 5a06a74..22e855c 100644 --- a/capepy/aws/glue.py +++ b/capepy/aws/glue.py @@ -27,23 +27,23 @@ def __init__(self): self.parameters = getResolvedOptions( sys.argv, [ - "RAW_BUCKET_NAME", + "SRC_BUCKET_NAME", "OBJECT_KEY", - "CLEAN_BUCKET_NAME", + "SINK_BUCKET_NAME", ], ) - def get_raw_file(self): - """Retrieve the raw file from S3 and return its contents as a byte string. + def get_src_file(self): + """Retrieve the source file from S3 and return its contents as a byte string. Raises: - Exception: If the raw file is unable to be successfully retrieved from S3. + Exception: If the source file is unable to be successfully retrieved from S3. Returns: - A byte string of the raw file contents + A byte string of the source file contents """ response = self.get_client("s3").get_object( - Bucket=self.parameters["RAW_BUCKET_NAME"], + Bucket=self.parameters["SRC_BUCKET_NAME"], Key=self.parameters["OBJECT_KEY"], ) status = response.get("ResponseMetadata", {}).get("HTTPStatusCode") @@ -51,7 +51,7 @@ def get_raw_file(self): if status != 200: err = ( f"ERROR - Could not get object {self.parameters['OBJECT_KEY']} from " - f"bucket {self.parameters['RAW_BUCKET_NAME']}. ETL Cannot continue." + f"bucket {self.parameters['SRC_BUCKET_NAME']}. ETL Cannot continue." ) self.logger.error(err) @@ -63,33 +63,33 @@ def get_raw_file(self): self.logger.info( f"Obtained object {self.parameters['OBJECT_KEY']} from bucket" - f"{self.parameters['RAW_BUCKET_NAME']}" + f"{self.parameters['SRC_BUCKET_NAME']}" ) return response.get("Body").read() - def write_clean_file(self, clean_data, clean_key: Optional[str] = None): - """Write data to a clean data file inside the clean S3 bucket as configured by the Glue ETL job. + def write_sink_file(self, sink_data, sink_key: Optional[str] = None): + """Write data to the sink data file inside the sink S3 bucket as configured by the Glue ETL job. Args: - clean_data (byes or seekable file-like object): Object data to be written to s3. - clean_key: The prefix and filename for the new clean data file within the clean s3 bucket. + sink_data (byes or seekable file-like object): Object data to be written to s3. + sink_key: The prefix and filename for the new sink data file within the sink s3 bucket. Raises: - Exception: If the clean data file is unable to be successfully put into s3. + Exception: If the sink data file is unable to be successfully put into s3. """ response = self.get_client("s3").put_object( - Bucket=self.parameters["CLEAN_BUCKET_NAME"], - Key=clean_key, - Body=clean_data, + Bucket=self.parameters["SINK_BUCKET_NAME"], + Key=sink_key, + Body=sink_data, ) status = response.get("ResponseMetadata", {}).get("HTTPStatusCode") if status != 200: err = ( - f"ERROR - Could not write transformed data object {clean_key} " - f"to bucket {self.parameters['CLEAN_BUCKET_NAME']}. ETL Cannot continue." + f"ERROR - Could not write transformed data object {sink_key} " + f"to bucket {self.parameters['SINK_BUCKET_NAME']}. ETL Cannot continue." ) self.logger.error(err) @@ -100,6 +100,6 @@ def write_clean_file(self, clean_data, clean_key: Optional[str] = None): raise Exception(err) self.logger.info( - f"Transformed {self.parameters['RAW_BUCKET_NAME']}/{self.parameters['OBJECT_KEY']} and wrote result " - f"to {self.parameters['CLEAN_BUCKET_NAME']}/{clean_key}" + f"Transformed {self.parameters['SRC_BUCKET_NAME']}/{self.parameters['OBJECT_KEY']} and wrote result " + f"to {self.parameters['SINK_BUCKET_NAME']}/{sink_key}" )