From 06bef54f6f40e77c7feb574b526036fa883f872b Mon Sep 17 00:00:00 2001 From: benoit74 Date: Mon, 30 Oct 2023 11:00:31 +0100 Subject: [PATCH 1/5] Fix worker IP update to use one single DB session --- dispatcher/backend/src/common/constants.py | 12 +++- dispatcher/backend/src/common/external.py | 9 ++- .../routes/requested_tasks/requested_task.py | 19 +++--- dispatcher/backend/src/tests/conftest.py | 12 ++++ .../integration/routes/workers/test_worker.py | 65 ++++++++++++++++++- 5 files changed, 101 insertions(+), 16 deletions(-) create mode 100644 dispatcher/backend/src/tests/conftest.py diff --git a/dispatcher/backend/src/common/constants.py b/dispatcher/backend/src/common/constants.py index bc362b40a..8c73e0c12 100644 --- a/dispatcher/backend/src/common/constants.py +++ b/dispatcher/backend/src/common/constants.py @@ -3,6 +3,12 @@ from common.enum import SchedulePeriodicity + +def refreshable_constant(fn): + """Refreshable constants helper for those we have interest to live update""" + return fn + + OPENSSL_BIN = os.getenv("OPENSSL_BIN", "/usr/bin/openssl") MESSAGE_VALIDITY = 60 # number of seconds before a message expire @@ -66,7 +72,11 @@ # using the following, it is possible to automate # the update of a whitelist of workers IPs on Wasabi (S3 provider) # enable this feature (default is off) -USES_WORKERS_IPS_WHITELIST = bool(os.getenv("USES_WORKERS_IPS_WHITELIST", "")) +# Nota: this is a refreshable constant so that it can be dynamically updated +# (including in tests) +USES_WORKERS_IPS_WHITELIST = refreshable_constant( + lambda: bool(os.getenv("USES_WORKERS_IPS_WHITELIST", "")) +) MAX_WORKER_IP_CHANGES_PER_DAY = 4 # wasabi URL with credentials to update policy WASABI_URL = os.getenv("WASABI_URL", "") diff --git a/dispatcher/backend/src/common/external.py b/dispatcher/backend/src/common/external.py index 8a4d76fd3..5df271ab5 100644 --- a/dispatcher/backend/src/common/external.py +++ b/dispatcher/backend/src/common/external.py @@ -27,12 +27,11 @@ logger = logging.getLogger(__name__) -def update_workers_whitelist(): +def update_workers_whitelist(session: so.Session): """update whitelist of workers on external services""" - update_wasabi_whitelist(build_workers_whitelist()) + IpUpdater.update_fn(build_workers_whitelist(session=session)) -@dbsession def build_workers_whitelist(session: so.Session) -> typing.List[str]: """list of worker IP adresses and networks (text) to use as whitelist""" wl_networks = [] @@ -150,6 +149,10 @@ def get_statement(): ) +class IpUpdater: + update_fn = update_wasabi_whitelist + + @dbsession def advertise_books_to_cms(task_id: UUID, session: so.Session): """inform openZIM CMS of all created ZIMs in the farm for this task diff --git a/dispatcher/backend/src/routes/requested_tasks/requested_task.py b/dispatcher/backend/src/routes/requested_tasks/requested_task.py index 347192bc3..c58080ddd 100644 --- a/dispatcher/backend/src/routes/requested_tasks/requested_task.py +++ b/dispatcher/backend/src/routes/requested_tasks/requested_task.py @@ -35,14 +35,14 @@ logger = logging.getLogger(__name__) -def record_ip_change(worker_name): +def record_ip_change(session: so.Session, worker_name: str): """record that this worker changed its IP and trigger whitelist changes""" today = datetime.date.today() # counts and limits are per-day so reset it if date changed if today != WorkersIpChangesCounts.today: WorkersIpChangesCounts.reset() if WorkersIpChangesCounts.add(worker_name) <= MAX_WORKER_IP_CHANGES_PER_DAY: - update_workers_whitelist() + update_workers_whitelist(session) else: logger.error( f"Worker {worker_name} IP changes for {today} " @@ -229,15 +229,16 @@ def get(self, session: so.Session, token: AccessToken.Payload): worker = dbm.Worker.get(session, worker_name, WorkerNotFound) if worker.user.username == token.username: worker.last_seen = getnow() - previous_ip = str(worker.last_ip) - worker.last_ip = worker_ip - - # flush to DB so that record_ip_change has access to updated IP - session.flush() # IP changed since last encounter - if USES_WORKERS_IPS_WHITELIST and previous_ip != worker_ip: - record_ip_change(worker_name) + if str(worker.last_ip) != worker_ip: + logger.info( + f"Worker IP changed detected for {worker_name}: " + f"IP changed from {worker.last_ip} to {worker_ip}" + ) + worker.last_ip = worker_ip + if USES_WORKERS_IPS_WHITELIST(): + record_ip_change(session=session, worker_name=worker_name) request_args = WorkerRequestedTaskSchema().load(request_args) diff --git a/dispatcher/backend/src/tests/conftest.py b/dispatcher/backend/src/tests/conftest.py new file mode 100644 index 000000000..230be5dae --- /dev/null +++ b/dispatcher/backend/src/tests/conftest.py @@ -0,0 +1,12 @@ +from typing import Generator + +import pytest +from sqlalchemy.orm import Session as OrmSession + +from db import Session + + +@pytest.fixture +def dbsession() -> Generator[OrmSession, None, None]: + with Session.begin() as session: + yield session diff --git a/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py b/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py index 48b1971c9..fac50461a 100644 --- a/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py +++ b/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py @@ -1,11 +1,14 @@ +import os +from typing import List + import pytest -from common.external import build_workers_whitelist +from common.external import IpUpdater, build_workers_whitelist class TestWorkersCommon: - def test_build_workers_whitelist(self, workers): - whitelist = build_workers_whitelist() + def test_build_workers_whitelist(self, workers, dbsession): + whitelist = build_workers_whitelist(session=dbsession) # - 4 because: # 2 workers have a duplicate IP # 1 worker has an IP missing @@ -206,3 +209,59 @@ def test_checkin_another_user( # response.get_json()["error"] # == "worker with same name already exists for another user" # ) + + +class TestWorkerRequestedTasks: + def test_requested_task_worker_as_admin(self, client, access_token, worker): + response = client.get( + "/requested-tasks/worker", + query_string={ + "worker": worker["name"], + "avail_cpu": 4, + "avail_memory": 2048, + "avail_disk": 4096, + }, + headers={"Authorization": access_token}, + ) + assert response.status_code == 200 + + def test_requested_task_worker_as_worker(self, client, make_access_token, worker): + response = client.get( + "/requested-tasks/worker", + query_string={ + "worker": worker["name"], + "avail_cpu": 4, + "avail_memory": 2048, + "avail_disk": 4096, + }, + headers={"Authorization": make_access_token(worker["username"], "worker")}, + ) + assert response.status_code == 200 + + new_ip_address = "88.88.88.88" + + def custom_ip_update(self, ip_addresses: List): + self.ip_updated = True + assert TestWorkerRequestedTasks.new_ip_address in ip_addresses + + def test_requested_task_worker_update_ip_whitelist( + self, client, make_access_token, worker + ): + self.ip_updated = False + IpUpdater.update_fn = self.custom_ip_update + os.environ["USES_WORKERS_IPS_WHITELIST"] = "1" + response = client.get( + "/requested-tasks/worker", + query_string={ + "worker": worker["name"], + "avail_cpu": 4, + "avail_memory": 2048, + "avail_disk": 4096, + }, + headers={ + "Authorization": make_access_token(worker["username"], "worker"), + "X-Forwarded-For": TestWorkerRequestedTasks.new_ip_address, + }, + ) + assert response.status_code == 200 + assert self.ip_updated From 30ed071854ec1c95c15f5f65d34a266d04f1deee Mon Sep 17 00:00:00 2001 From: benoit74 Date: Tue, 14 Nov 2023 15:54:00 +0100 Subject: [PATCH 2/5] Commit Worker IP change to DB asap --- dispatcher/backend/src/db/__init__.py | 14 +++++++++++++ .../routes/requested_tasks/requested_task.py | 20 ++++++++++++++----- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/dispatcher/backend/src/db/__init__.py b/dispatcher/backend/src/db/__init__.py index 69bad2e96..5f03486a2 100644 --- a/dispatcher/backend/src/db/__init__.py +++ b/dispatcher/backend/src/db/__init__.py @@ -52,6 +52,20 @@ def inner(*args, **kwargs): return inner +def dbsession_manual(func): + """Decorator to create an SQLAlchemy ORM session object and wrap the function + inside the session. A `session` argument is automatically set. Transaction must + be managed by the developer (e.g. perform a commit / rollback). + """ + + def inner(*args, **kwargs): + with Session() as session: + kwargs["session"] = session + return func(*args, **kwargs) + + return inner + + def count_from_stmt(session: OrmSession, stmt: SelectBase) -> int: """Count all records returned by any statement `stmt` passed as parameter""" return session.execute( diff --git a/dispatcher/backend/src/routes/requested_tasks/requested_task.py b/dispatcher/backend/src/routes/requested_tasks/requested_task.py index c58080ddd..6474e7b67 100644 --- a/dispatcher/backend/src/routes/requested_tasks/requested_task.py +++ b/dispatcher/backend/src/routes/requested_tasks/requested_task.py @@ -24,8 +24,8 @@ WorkerRequestedTaskSchema, ) from common.utils import task_event_handler -from db import count_from_stmt, dbsession -from errors.http import InvalidRequestJSON, TaskNotFound, WorkerNotFound +from db import count_from_stmt, dbsession, dbsession_manual +from errors.http import InvalidRequestJSON, TaskNotFound, WorkerNotFound, HTTPBase from routes import auth_info_if_supplied, authenticate, require_perm, url_uuid from routes.base import BaseRoute from routes.errors import NotFound @@ -208,7 +208,7 @@ class RequestedTasksForWorkers(BaseRoute): methods = ["GET"] @authenticate - @dbsession + @dbsession_manual def get(self, session: so.Session, token: AccessToken.Payload): """list of requested tasks to be retrieved by workers, auth-only""" @@ -237,8 +237,18 @@ def get(self, session: so.Session, token: AccessToken.Payload): f"IP changed from {worker.last_ip} to {worker_ip}" ) worker.last_ip = worker_ip - if USES_WORKERS_IPS_WHITELIST(): - record_ip_change(session=session, worker_name=worker_name) + # commit explicitely since we are not using an explicit transaction, + # and do it before calling Wasabi so that changes are propagated + # quickly and transaction is not blocking + session.commit() + if constants.USES_WORKERS_IPS_WHITELIST: + try: + record_ip_change(session=session, worker_name=worker_name) + except Exception: + raise HTTPBase( + status_code=HTTPStatus.SERVICE_UNAVAILABLE, + error="Recording IP changes failed", + ) request_args = WorkerRequestedTaskSchema().load(request_args) From efea331f3900498ae69109377c8f380c826ed790 Mon Sep 17 00:00:00 2001 From: benoit74 Date: Tue, 14 Nov 2023 15:54:37 +0100 Subject: [PATCH 3/5] Rework / simplify handling of external IP update --- dispatcher/backend/src/common/constants.py | 12 +-- dispatcher/backend/src/common/external.py | 4 +- .../routes/requested_tasks/requested_task.py | 10 +- .../integration/routes/workers/test_worker.py | 100 ++++++++++++++++-- 4 files changed, 96 insertions(+), 30 deletions(-) diff --git a/dispatcher/backend/src/common/constants.py b/dispatcher/backend/src/common/constants.py index 8c73e0c12..5f4ee187a 100644 --- a/dispatcher/backend/src/common/constants.py +++ b/dispatcher/backend/src/common/constants.py @@ -3,12 +3,6 @@ from common.enum import SchedulePeriodicity - -def refreshable_constant(fn): - """Refreshable constants helper for those we have interest to live update""" - return fn - - OPENSSL_BIN = os.getenv("OPENSSL_BIN", "/usr/bin/openssl") MESSAGE_VALIDITY = 60 # number of seconds before a message expire @@ -72,11 +66,7 @@ def refreshable_constant(fn): # using the following, it is possible to automate # the update of a whitelist of workers IPs on Wasabi (S3 provider) # enable this feature (default is off) -# Nota: this is a refreshable constant so that it can be dynamically updated -# (including in tests) -USES_WORKERS_IPS_WHITELIST = refreshable_constant( - lambda: bool(os.getenv("USES_WORKERS_IPS_WHITELIST", "")) -) +USES_WORKERS_IPS_WHITELIST = bool(os.getenv("USES_WORKERS_IPS_WHITELIST")) MAX_WORKER_IP_CHANGES_PER_DAY = 4 # wasabi URL with credentials to update policy WASABI_URL = os.getenv("WASABI_URL", "") diff --git a/dispatcher/backend/src/common/external.py b/dispatcher/backend/src/common/external.py index 5df271ab5..fd5de7040 100644 --- a/dispatcher/backend/src/common/external.py +++ b/dispatcher/backend/src/common/external.py @@ -29,7 +29,7 @@ def update_workers_whitelist(session: so.Session): """update whitelist of workers on external services""" - IpUpdater.update_fn(build_workers_whitelist(session=session)) + ExternalIpUpdater.update_fn(build_workers_whitelist(session=session)) def build_workers_whitelist(session: so.Session) -> typing.List[str]: @@ -149,7 +149,7 @@ def get_statement(): ) -class IpUpdater: +class ExternalIpUpdater: update_fn = update_wasabi_whitelist diff --git a/dispatcher/backend/src/routes/requested_tasks/requested_task.py b/dispatcher/backend/src/routes/requested_tasks/requested_task.py index 6474e7b67..688051932 100644 --- a/dispatcher/backend/src/routes/requested_tasks/requested_task.py +++ b/dispatcher/backend/src/routes/requested_tasks/requested_task.py @@ -9,12 +9,8 @@ from marshmallow import ValidationError import db.models as dbm -from common import WorkersIpChangesCounts, getnow -from common.constants import ( - ENABLED_SCHEDULER, - MAX_WORKER_IP_CHANGES_PER_DAY, - USES_WORKERS_IPS_WHITELIST, -) +from common import WorkersIpChangesCounts, constants, getnow +from common.constants import ENABLED_SCHEDULER, MAX_WORKER_IP_CHANGES_PER_DAY from common.external import update_workers_whitelist from common.schemas.orms import RequestedTaskFullSchema, RequestedTaskLightSchema from common.schemas.parameters import ( @@ -25,7 +21,7 @@ ) from common.utils import task_event_handler from db import count_from_stmt, dbsession, dbsession_manual -from errors.http import InvalidRequestJSON, TaskNotFound, WorkerNotFound, HTTPBase +from errors.http import HTTPBase, InvalidRequestJSON, TaskNotFound, WorkerNotFound from routes import auth_info_if_supplied, authenticate, require_perm, url_uuid from routes.base import BaseRoute from routes.errors import NotFound diff --git a/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py b/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py index fac50461a..3be8cf174 100644 --- a/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py +++ b/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py @@ -1,9 +1,9 @@ -import os from typing import List import pytest -from common.external import IpUpdater, build_workers_whitelist +from common import constants +from common.external import ExternalIpUpdater, build_workers_whitelist class TestWorkersCommon: @@ -212,6 +212,8 @@ def test_checkin_another_user( class TestWorkerRequestedTasks: + new_ip_address = "88.88.88.88" + def test_requested_task_worker_as_admin(self, client, access_token, worker): response = client.get( "/requested-tasks/worker", @@ -238,18 +240,35 @@ def test_requested_task_worker_as_worker(self, client, make_access_token, worker ) assert response.status_code == 200 - new_ip_address = "88.88.88.88" - def custom_ip_update(self, ip_addresses: List): self.ip_updated = True assert TestWorkerRequestedTasks.new_ip_address in ip_addresses + def custom_failing_ip_update(self, ip_addresses: List): + raise Exception() + + @pytest.mark.parametrize( + "prev_ip, new_ip, external_update_enabled, external_update_fails," + " external_update_called", + [ + ("77.77.77.77", "88.88.88.88", False, False, False), # ip update disabled + ("77.77.77.77", "77.77.77.77", True, False, False), # ip did not changed + ("77.77.77.77", "88.88.88.88", True, False, True), # ip should be updated + ("77.77.77.77", "88.88.88.88", True, True, False), # ip update fails + ], + ) def test_requested_task_worker_update_ip_whitelist( - self, client, make_access_token, worker + self, + client, + make_access_token, + worker, + prev_ip, + new_ip, + external_update_enabled, + external_update_fails, + external_update_called, ): - self.ip_updated = False - IpUpdater.update_fn = self.custom_ip_update - os.environ["USES_WORKERS_IPS_WHITELIST"] = "1" + # call it once to set prev_ip response = client.get( "/requested-tasks/worker", query_string={ @@ -260,8 +279,69 @@ def test_requested_task_worker_update_ip_whitelist( }, headers={ "Authorization": make_access_token(worker["username"], "worker"), - "X-Forwarded-For": TestWorkerRequestedTasks.new_ip_address, + "X-Forwarded-For": prev_ip, }, ) assert response.status_code == 200 - assert self.ip_updated + + # check prev_ip has been set + response = client.get("/workers/") + assert response.status_code == 200 + response_data = response.get_json() + for item in response_data["items"]: + if item["name"] != worker["name"]: + continue + assert item["last_ip"] == prev_ip + + # setup custom ip updater to intercept Wasabi operations + updater = IpUpdaterAndChecker(should_fail=external_update_fails) + assert TestWorkerRequestedTasks.new_ip_address not in updater.ip_addresses + ExternalIpUpdater.update_fn = updater.ip_update + constants.USES_WORKERS_IPS_WHITELIST = external_update_enabled + + # call it once to set next_ip + response = client.get( + "/requested-tasks/worker", + query_string={ + "worker": worker["name"], + "avail_cpu": 4, + "avail_memory": 2048, + "avail_disk": 4096, + }, + headers={ + "Authorization": make_access_token(worker["username"], "worker"), + "X-Forwarded-For": new_ip, + }, + ) + if external_update_fails: + assert response.status_code == 503 + else: + assert response.status_code == 200 + assert updater.ips_updated == external_update_called + if external_update_called: + assert new_ip in updater.ip_addresses + + # check new_ip has been set (even if ip update is disabled or has failed) + response = client.get("/workers/") + assert response.status_code == 200 + response_data = response.get_json() + for item in response_data["items"]: + if item["name"] != worker["name"]: + continue + assert item["last_ip"] == new_ip + + +class IpUpdaterAndChecker: + """Helper class to intercept Wasabi operations and perform assertions""" + + def __init__(self, should_fail: bool) -> None: + self.ips_updated = False + self.should_fail = should_fail + self.ip_addresses = [] + + def ip_update(self, ip_addresses: List): + if self.should_fail: + raise Exception() + else: + self.ips_updated = True + self.ip_addresses = ip_addresses From ddead2266636de291aa6368cdcf1e75e5f0a18d7 Mon Sep 17 00:00:00 2001 From: benoit74 Date: Thu, 16 Nov 2023 20:49:13 +0100 Subject: [PATCH 4/5] Add docstring + fix name --- dispatcher/backend/src/common/external.py | 10 ++++++++-- .../tests/integration/routes/workers/test_worker.py | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/dispatcher/backend/src/common/external.py b/dispatcher/backend/src/common/external.py index fd5de7040..3706b0f07 100644 --- a/dispatcher/backend/src/common/external.py +++ b/dispatcher/backend/src/common/external.py @@ -29,7 +29,7 @@ def update_workers_whitelist(session: so.Session): """update whitelist of workers on external services""" - ExternalIpUpdater.update_fn(build_workers_whitelist(session=session)) + ExternalIpUpdater.update(build_workers_whitelist(session=session)) def build_workers_whitelist(session: so.Session) -> typing.List[str]: @@ -150,7 +150,13 @@ def get_statement(): class ExternalIpUpdater: - update_fn = update_wasabi_whitelist + """Class responsible to push IP updates to external system(s) + + `update` is called with the new list of all workers IPs everytime + a change is detected. + By default, this class update our IPs whitelist in Wasabi""" + + update = update_wasabi_whitelist @dbsession diff --git a/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py b/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py index 3be8cf174..f20ff099e 100644 --- a/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py +++ b/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py @@ -296,7 +296,7 @@ def test_requested_task_worker_update_ip_whitelist( # setup custom ip updater to intercept Wasabi operations updater = IpUpdaterAndChecker(should_fail=external_update_fails) assert TestWorkerRequestedTasks.new_ip_address not in updater.ip_addresses - ExternalIpUpdater.update_fn = updater.ip_update + ExternalIpUpdater.update = updater.ip_update constants.USES_WORKERS_IPS_WHITELIST = external_update_enabled # call it once to set next_ip From 2ddee9b155e373080a819bbefbfade127dcfdd43 Mon Sep 17 00:00:00 2001 From: benoit74 Date: Fri, 17 Nov 2023 10:15:23 +0100 Subject: [PATCH 5/5] Remove dead code --- .../tests/integration/routes/workers/test_worker.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py b/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py index f20ff099e..4767bdc6a 100644 --- a/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py +++ b/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py @@ -212,8 +212,6 @@ def test_checkin_another_user( class TestWorkerRequestedTasks: - new_ip_address = "88.88.88.88" - def test_requested_task_worker_as_admin(self, client, access_token, worker): response = client.get( "/requested-tasks/worker", @@ -240,13 +238,6 @@ def test_requested_task_worker_as_worker(self, client, make_access_token, worker ) assert response.status_code == 200 - def custom_ip_update(self, ip_addresses: List): - self.ip_updated = True - assert TestWorkerRequestedTasks.new_ip_address in ip_addresses - - def custom_failing_ip_update(self, ip_addresses: List): - raise Exception() - @pytest.mark.parametrize( "prev_ip, new_ip, external_update_enabled, external_update_fails," " external_update_called", @@ -295,7 +286,7 @@ def test_requested_task_worker_update_ip_whitelist( # setup custom ip updater to intercept Wasabi operations updater = IpUpdaterAndChecker(should_fail=external_update_fails) - assert TestWorkerRequestedTasks.new_ip_address not in updater.ip_addresses + assert new_ip not in updater.ip_addresses ExternalIpUpdater.update = updater.ip_update constants.USES_WORKERS_IPS_WHITELIST = external_update_enabled