Commit fd43b0d
fix(glue)!: terminology was shifted upstream to use src/sink rather than `raw`/`clean`
mehalter committed Jan 23, 2025
1 parent 3c604de commit fd43b0d
Showing 1 changed file with 21 additions and 21 deletions.
42 changes: 21 additions & 21 deletions capepy/aws/glue.py
@@ -27,31 +27,31 @@ def __init__(self):
         self.parameters = getResolvedOptions(
             sys.argv,
             [
-                "RAW_BUCKET_NAME",
+                "SRC_BUCKET_NAME",
                 "OBJECT_KEY",
-                "CLEAN_BUCKET_NAME",
+                "SINK_BUCKET_NAME",
             ],
         )

-    def get_raw_file(self):
-        """Retrieve the raw file from S3 and return its contents as a byte string.
+    def get_src_file(self):
+        """Retrieve the source file from S3 and return its contents as a byte string.

         Raises:
-            Exception: If the raw file is unable to be successfully retrieved from S3.
+            Exception: If the source file is unable to be successfully retrieved from S3.

         Returns:
-            A byte string of the raw file contents
+            A byte string of the source file contents
         """
         response = self.get_client("s3").get_object(
-            Bucket=self.parameters["RAW_BUCKET_NAME"],
+            Bucket=self.parameters["SRC_BUCKET_NAME"],
             Key=self.parameters["OBJECT_KEY"],
         )
         status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")

         if status != 200:
             err = (
                 f"ERROR - Could not get object {self.parameters['OBJECT_KEY']} from "
-                f"bucket {self.parameters['RAW_BUCKET_NAME']}. ETL Cannot continue."
+                f"bucket {self.parameters['SRC_BUCKET_NAME']}. ETL Cannot continue."
             )

             self.logger.error(err)
@@ -63,33 +63,33 @@ def get_raw_file(self):

         self.logger.info(
             f"Obtained object {self.parameters['OBJECT_KEY']} from bucket"
-            f"{self.parameters['RAW_BUCKET_NAME']}"
+            f"{self.parameters['SRC_BUCKET_NAME']}"
         )

         return response.get("Body").read()

-    def write_clean_file(self, clean_data, clean_key: Optional[str] = None):
-        """Write data to a clean data file inside the clean S3 bucket as configured by the Glue ETL job.
+    def write_sink_file(self, sink_data, sink_key: Optional[str] = None):
+        """Write data to the sink data file inside the sink S3 bucket as configured by the Glue ETL job.

         Args:
-            clean_data (bytes or seekable file-like object): Object data to be written to s3.
-            clean_key: The prefix and filename for the new clean data file within the clean s3 bucket.
+            sink_data (bytes or seekable file-like object): Object data to be written to s3.
+            sink_key: The prefix and filename for the new sink data file within the sink s3 bucket.

         Raises:
-            Exception: If the clean data file is unable to be successfully put into s3.
+            Exception: If the sink data file is unable to be successfully put into s3.
         """
         response = self.get_client("s3").put_object(
-            Bucket=self.parameters["CLEAN_BUCKET_NAME"],
-            Key=clean_key,
-            Body=clean_data,
+            Bucket=self.parameters["SINK_BUCKET_NAME"],
+            Key=sink_key,
+            Body=sink_data,
         )

         status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")

         if status != 200:
             err = (
-                f"ERROR - Could not write transformed data object {clean_key} "
-                f"to bucket {self.parameters['CLEAN_BUCKET_NAME']}. ETL Cannot continue."
+                f"ERROR - Could not write transformed data object {sink_key} "
+                f"to bucket {self.parameters['SINK_BUCKET_NAME']}. ETL Cannot continue."
             )

             self.logger.error(err)
@@ -100,6 +100,6 @@ def write_clean_file(self, clean_data, clean_key: Optional[str] = None):
             raise Exception(err)

         self.logger.info(
-            f"Transformed {self.parameters['RAW_BUCKET_NAME']}/{self.parameters['OBJECT_KEY']} and wrote result "
-            f"to {self.parameters['CLEAN_BUCKET_NAME']}/{clean_key}"
+            f"Transformed {self.parameters['SRC_BUCKET_NAME']}/{self.parameters['OBJECT_KEY']} and wrote result "
+            f"to {self.parameters['SINK_BUCKET_NAME']}/{sink_key}"
         )
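
The `!` in the commit type marks this as a breaking change: downstream ETL scripts built on capepy.aws.glue must switch to the new method and keyword names. A minimal migration sketch follows; the class name `EtlJob`, the transform, and the object keys are illustrative assumptions, since none of them are shown in this diff.

from capepy.aws.glue import EtlJob  # class name assumed for illustration

etl = EtlJob()

# Before this commit:
#   raw_bytes = etl.get_raw_file()
#   etl.write_clean_file(transformed, clean_key="curated/output.csv")

# After this commit:
src_bytes = etl.get_src_file()                 # reads SRC_BUCKET_NAME / OBJECT_KEY
transformed = src_bytes.upper()                # placeholder transform for illustration
etl.write_sink_file(transformed, sink_key="curated/output.csv")  # writes to SINK_BUCKET_NAME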

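Because `getResolvedOptions` reads these values from the Glue job arguments, job definitions and job-run invocations also need the renamed parameters. An illustrative sketch using boto3, with placeholder job, bucket, and key names:

import boto3

glue = boto3.client("glue")
glue.start_job_run(
    JobName="example-cape-etl-job",  # placeholder job name
    Arguments={
        "--SRC_BUCKET_NAME": "example-src-bucket",    # previously --RAW_BUCKET_NAME
        "--OBJECT_KEY": "incoming/data.csv",
        "--SINK_BUCKET_NAME": "example-sink-bucket",  # previously --CLEAN_BUCKET_NAME
    },
)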