-
Notifications
You must be signed in to change notification settings - Fork 1
/
app.py
109 lines (85 loc) · 3.22 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import logging
import subprocess
import sys
import boto3
import base64
import os
sqs = boto3.client("sqs")
s3 = boto3.client("s3")
request_queue_url: str = (
"https://sqs.us-east-1.amazonaws.com/674846823680/cc-project-1-request-queue"
)
response_queue_url: str = (
"https://sqs.us-east-1.amazonaws.com/674846823680/cc-project-1-response-queue"
)
input_bucket_name: str = "cc-project-1-input"
output_bucket_name: str = "cc-project-1-output"
def upload_file(file_name: str, bucket: str) -> bool:
object_name = os.path.basename(file_name)
s3.upload_file(file_name, bucket, object_name)
return True
def main(logger: logging.Logger) -> None:
while True:
msg = {}
while "Messages" not in msg:
logger.info("Checking the request queue for requests.")
msg = sqs.receive_message(
QueueUrl=request_queue_url,
AttributeNames=["All"],
MessageAttributeNames=["All"],
WaitTimeSeconds=20,
MaxNumberOfMessages=1
)
bytes = str.encode(msg["Messages"][0]["Body"])
img_name = msg["Messages"][0]["MessageAttributes"]["ImageName"]["StringValue"]
logger.info(f"New image `{img_name}` received, now processing.")
input_path: str = f"/home/ubuntu/{img_name}"
output_path: str = f"/home/ubuntu/{img_name}-output.txt"
img_bytes = base64.b64decode((bytes))
with open(input_path, "wb") as file:
file.write(img_bytes)
script_dir: str = "/home/ubuntu"
env = dict(os.environ)
res = subprocess.run(
["python3", "image_classification.py", img_name],
env=env,
cwd=script_dir,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if res.returncode != 0:
logger.critical(
f"Image classification script failed with error:\n{res.stderr.decode('utf-8')}"
)
sys.exit(1)
output = res.stdout.decode("utf-8").strip("\n")
with open(output_path, "w") as f:
f.write(output)
logger.info(f"Image `{img_name}` processed.")
sqs.send_message(
QueueUrl=response_queue_url,
MessageAttributes={
"ImageName": {"DataType": "String", "StringValue": img_name}
},
MessageBody=output,
)
logger.info(f"Send output of `{img_name}` to the response queue.")
# upload_file(input_path, input_bucket_name)
# upload_file(output_path, output_bucket_name)
logger.info(
f"Saved `{input_path}` and `{output_path}` in their respective S3 buckets."
)
sqs.delete_message(
QueueUrl=request_queue_url,
ReceiptHandle=msg["Messages"][0]["ReceiptHandle"],
)
logger.info(
f"All tasks completed successfully. Deleted `{img_name}` from the request queue."
)
os.remove(input_path)
os.remove(output_path)
if __name__ == "__main__":
logging.basicConfig(format="[%(asctime)s] %(name)s:%(levelname)s (%(filename)s:%(lineno)d) -> %(message)s")
logger = logging.getLogger("app")
logger.setLevel(logging.INFO)
main(logger)