From 48858e0395417bb0c53ac9af883fcb3643e22741 Mon Sep 17 00:00:00 2001 From: michieltukker Date: Sun, 18 Feb 2024 15:44:58 +0100 Subject: [PATCH] Added container definitions and test environment --- Dockerfile | 8 ++-- docker-compose.yml | 77 ++++++++++++++++++++++++++++++++++ local_test/Dockerfile | 11 +++++ local_test/requirements.txt | 2 + local_test/test_app.py | 82 +++++++++++++++++++++++++++++++++++++ 5 files changed, 176 insertions(+), 4 deletions(-) create mode 100644 docker-compose.yml create mode 100644 local_test/Dockerfile create mode 100644 local_test/requirements.txt create mode 100644 local_test/test_app.py diff --git a/Dockerfile b/Dockerfile index 8a93d08..dbc48b3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,10 +2,10 @@ FROM python:3.11-bookworm WORKDIR /app -COPY ./requirements.txt . +COPY requirements.txt /app/requirements.txt -RUN pip install -r requirements.txt +RUN pip install -r /app/requirements.txt -COPY . . +COPY src/simulator_worker /app/ -ENTRYPOINT ./run.sh \ No newline at end of file +ENTRYPOINT python -m simulator_worker.simulator_worker \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..834c03e --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,77 @@ +version: "3.8" + +networks: + omotes: + + +volumes: + db-data: + broker-data: + influxdb_storage: + + +services: + rabbitmq: + image: bitnami/rabbitmq:3.8.27 + ports: + - "15672:15672" + - "5673:5672" + networks: + - omotes + healthcheck: + test: [ "CMD", "curl", "-f", "http://localhost:15672" ] + interval: 10s + timeout: 10s + retries: 5 + + omotes_influxdb: + image: influxdb:1.7 + ports: + - "${INFLUXDB_PORT}:${INFLUXDB_PORT}" + - "${INFLUXDB_RPC_PORT}:${INFLUXDB_RPC_PORT}" + networks: + - omotes + environment: + - INFLUXDB_ADMIN_USER=${INFLUXDB_ADMIN_USER} + - INFLUXDB_ADMIN_PASSWORD=${INFLUXDB_ADMIN_PASSWORD} + - INFLUXDB_HTTP_BIND_ADDRESS=:${INFLUXDB_PORT} + - INFLUXDB_BIND_ADDRESS=:${INFLUXDB_RPC_PORT} + - INFLUXDB_DB=omotes_timeseries + - INFLUXDB_WRITE_USER=${INFLUXDB_WRITE_USER} + - INFLUXDB_WRITE_USER_PASSWORD=${INFLUXDB_WRITE_USER_PASSWORD} + healthcheck: + test: + [ + "CMD", + "curl", + "-f", + "http://omotes_influxdb:${INFLUXDB_PORT}/ping" + ] + interval: 10s + timeout: 5s + volumes: + - influxdb_storage:/var/lib/influxdb + + simulator_worker: + build: + context: . + env_file: .env + networks: + - omotes + deploy: + replicas: 1 + depends_on: + rabbitmq: + condition: service_healthy + omotes_influxdb: + condition: service_healthy + + local_test: + build: + context: ./local_test + dockerfile: Dockerfile + networks: + - omotes + depends_on: + simulator_worker: + condition: service_started diff --git a/local_test/Dockerfile b/local_test/Dockerfile new file mode 100644 index 0000000..9590968 --- /dev/null +++ b/local_test/Dockerfile @@ -0,0 +1,11 @@ +FROM python:3.11-bookworm + +WORKDIR /app + +COPY ./requirements.txt . + +RUN pip install -r requirements.txt + +COPY . . + +ENTRYPOINT python -u test_app.py \ No newline at end of file diff --git a/local_test/requirements.txt b/local_test/requirements.txt new file mode 100644 index 0000000..8a5c52f --- /dev/null +++ b/local_test/requirements.txt @@ -0,0 +1,2 @@ +celery==5.3.5 +jsonpickle==3.0.2 \ No newline at end of file diff --git a/local_test/test_app.py b/local_test/test_app.py new file mode 100644 index 0000000..5618fc7 --- /dev/null +++ b/local_test/test_app.py @@ -0,0 +1,82 @@ +import sys +import threading +from typing import Any +from uuid import UUID, uuid4 + +import jsonpickle # type: ignore +from celery import Celery # type: ignore + +input_esdl_no_influx = "" # noqa + +print("Application started") + +app = Celery( + "omotes", + broker="amqp://user:bitnami@rabbitmq", + backend="rpc://user:bitnami@rabbitmq", + worker_prefetch_multiplier=1, + task_acks_late=True, + task_reject_on_worker_lost=True, + # task_serializer="pickle", + # result_serializer="pickle", + # accept_content=["application/json", "application/x-python-serialize"], +) +print("Celery started") + + +def task_monitor() -> None: + def on_event(event: dict[str, str]) -> None: + print("EVENT HAPPENED: ", event["type"]) + + def on_progress_update(event: Any) -> None: + print( + "################################ TASK PROGRESS UPDATED: ", + event["progress"]["fraction"] * 100, + ) + + while True: + try: + with app.connection() as conn: + recv = app.events.Receiver( + conn, + handlers={ + "task-failed": on_event, + "task-succeeded": on_event, + "task-sent": on_event, + "task-received": on_event, + "task-revoked": on_event, + "task-started": on_event, + "task-progress-update": on_progress_update, + # OR: '*' : on_event + }, + ) + recv.capture(limit=None, timeout=None) + except (KeyboardInterrupt, SystemExit): + print("EXCEPTION KEYBOARD INTERRUPT") + sys.exit() + + +def print_result(result: dict[str, str], task_name: str) -> None: + if "error_message" in result: + print( + f"Error message: {result['error_message']}, exit code: {result['exit_code']} " + "from task: {task_name}" + ) + print(f"logs: {result['logs']} from task: {task_name}") + else: + print(f"Received logs: '{result['logs']}' from task: {task_name}") + print(f"Output ESDL: '{result['output_esdl']}' from task: {task_name}") + + +t = threading.Thread(target=task_monitor) +t.daemon = True +t.start() + +q_name = "grow" +simulator_job_id: UUID = uuid4() + +optimizer_task = app.signature( + "simulator", (simulator_job_id, input_esdl_no_influx), queue=q_name +).delay() +print("waiting for tasks...") +print_result(jsonpickle.decode(optimizer_task.get()), "optimizer-task")