From d4a87610f0ca82afe41c7c295430b826c2be0cdd Mon Sep 17 00:00:00 2001 From: sdc50 Date: Wed, 16 Oct 2024 15:21:49 -0600 Subject: [PATCH] Delayed job status (#1102) * tasks and delayed update job status * tests * black/lint * docs * fix docs * Update django pin in docs --------- Co-authored-by: Nathan Swain --- docs/docs_environment.yml | 2 +- .../configuration/basic/file_permissions.rst | 2 +- .../manual/configuration/basic/supervisor.rst | 2 +- docs/tethys_sdk/jobs.rst | 24 +++- .../unit_tests/test_tethys_apps/test_views.py | 54 -------- .../test_tethys_compute/test_tasks.py | 55 ++++++++ .../test_views/test_update_status.py | 120 ++++++++++++++++ .../test_views/test_gizmos/test_jobs_table.py | 44 ++---- .../test_tethys_portal/test_urls.py | 14 +- tethys_apps/views.py | 46 +------ tethys_compute/tasks.py | 73 ++++++++++ tethys_compute/views/__init__.py | 18 +++ tethys_compute/views/update_status.py | 130 ++++++++++++++++++ tethys_gizmos/views/gizmos/jobs_table.py | 37 ++--- tethys_portal/urls.py | 8 +- 15 files changed, 465 insertions(+), 164 deletions(-) create mode 100644 tests/unit_tests/test_tethys_compute/test_tasks.py create mode 100644 tests/unit_tests/test_tethys_compute/test_views/test_update_status.py create mode 100644 tethys_compute/tasks.py create mode 100644 tethys_compute/views/update_status.py diff --git a/docs/docs_environment.yml b/docs/docs_environment.yml index c8e75e4e8..a6097f559 100644 --- a/docs/docs_environment.yml +++ b/docs/docs_environment.yml @@ -12,7 +12,7 @@ dependencies: - python - pip - tethys_dataset_services >=2.0.0 - - django =3.2.* + - django =4.2.* - sphinx - sphinx-argparse - make diff --git a/docs/installation/production/manual/configuration/basic/file_permissions.rst b/docs/installation/production/manual/configuration/basic/file_permissions.rst index ed01327cc..9bacd932e 100644 --- a/docs/installation/production/manual/configuration/basic/file_permissions.rst +++ b/docs/installation/production/manual/configuration/basic/file_permissions.rst @@ -91,7 +91,7 @@ For convenience, you may consider setting up these or similar aliases in the act .. _selinux_configuration: 3. Security-Enhanced Linux File Permissions (Rocky Linux, May not Apply) -=================================================================== +======================================================================== If you are installing Tethys Portal on a Rocky Linux or RedHat system that has `Security-Enhanced Linux (SELinux) `_ enabled and set to enforcing mode, you may need to perform additional setup to allow the server processes to access files. diff --git a/docs/installation/production/manual/configuration/basic/supervisor.rst b/docs/installation/production/manual/configuration/basic/supervisor.rst index a7eb4f656..7eaa25af7 100644 --- a/docs/installation/production/manual/configuration/basic/supervisor.rst +++ b/docs/installation/production/manual/configuration/basic/supervisor.rst @@ -82,7 +82,7 @@ Create a symbolic links from the two configuration files generated in the previo Replace ```` with the path to the Tethys home directory as noted in :ref:`production_portal_config` section. 5. Modify :file:`supervisord.conf` (Rocky Linux Only) -================================================ +===================================================== For Rocky Linux systems, modify :file:`supervisord.conf` to recognize our configuration files: diff --git a/docs/tethys_sdk/jobs.rst b/docs/tethys_sdk/jobs.rst index 3e8bd2d0e..67131d8b9 100644 --- a/docs/tethys_sdk/jobs.rst +++ b/docs/tethys_sdk/jobs.rst @@ -228,9 +228,9 @@ For example, a URL may look something like this: http://example.com/update-job-status/27/ -The output would look something like this: +The response would look something like this: -.. code-block:: python +.. code-block:: javascript {"success": true} @@ -241,6 +241,26 @@ This URL can be retrieved from the job manager with the ``get_job_status_callbac job_manager = App.get_job_manager() callback_url = job_manager.get_job_status_callback_url(request, job_id) +The callback URL can be used to update the jobs status after a specified delay by passing the ``delay`` query parameter: + +.. code-block:: + + http:///update-job-status//?delay= + +For example, to schedule a job update in 30 seconds: + +.. code-block:: + + http:///update-job-status/27/?delay=30 + +In this case the response would look like this: + +.. code-block:: javascript + + {"success": "scheduled"} + +This delay can be useful so the job itself can hit the endpoint just before completing to trigger the Tethys Portal to check its status after it has time to complete and exit. This will allow the portal to register that the job has completed and start any data transfer that is triggered upon job completion. + Custom Statuses --------------- Custom statuses can be given to jobs simply by assigning the ``status`` attribute: diff --git a/tests/unit_tests/test_tethys_apps/test_views.py b/tests/unit_tests/test_tethys_apps/test_views.py index 250178ce4..ee8dd9d93 100644 --- a/tests/unit_tests/test_tethys_apps/test_views.py +++ b/tests/unit_tests/test_tethys_apps/test_views.py @@ -7,8 +7,6 @@ handoff_capabilities, handoff, send_beta_feedback_email, - update_job_status, - update_dask_job_status, ) @@ -291,55 +289,3 @@ def test_send_beta_feedback_email_send_mail_exception( mock_json_response.assert_called_once_with( {"success": False, "error": "Failed to send email: foo_error"} ) - - @mock.patch("tethys_apps.views.JsonResponse") - @mock.patch("tethys_apps.views.TethysJob") - def test_update_job_status(self, mock_tethysjob, mock_json_response): - mock_request = mock.MagicMock() - mock_job_id = mock.MagicMock() - mock_job1 = mock.MagicMock() - mock_job1.status = True - mock_tethysjob.objects.get_subclass.return_value = mock_job1 - - update_job_status(mock_request, mock_job_id) - mock_tethysjob.objects.get_subclass.assert_called_once_with(id=mock_job_id) - mock_json_response.assert_called_once_with({"success": True}) - - @mock.patch("tethys_apps.views.JsonResponse") - @mock.patch("tethys_apps.views.TethysJob") - def test_update_job_statusException(self, mock_tethysjob, mock_json_response): - mock_request = mock.MagicMock() - mock_job_id = mock.MagicMock() - mock_tethysjob.objects.get_subclass.side_effect = Exception - - update_job_status(mock_request, mock_job_id) - mock_tethysjob.objects.get_subclass.assert_called_once_with(id=mock_job_id) - mock_json_response.assert_called_once_with({"success": False}) - - @mock.patch("tethys_apps.views.JsonResponse") - @mock.patch("tethys_apps.views.DaskJob") - def test_update_dask_job_status(self, mock_daskjob, mock_json_response): - mock_request = mock.MagicMock() - mock_job_key = mock.MagicMock() - mock_job1 = mock.MagicMock() - mock_job1.status = True - mock_job2 = mock.MagicMock() - mock_daskjob.objects.filter.return_value = [mock_job1, mock_job2] - - # Call the method - update_dask_job_status(mock_request, mock_job_key) - - # check results - mock_daskjob.objects.filter.assert_called_once_with(key=mock_job_key) - mock_json_response.assert_called_once_with({"success": True}) - - @mock.patch("tethys_apps.views.JsonResponse") - @mock.patch("tethys_apps.views.DaskJob") - def test_update_dask_job_statusException(self, mock_daskjob, mock_json_response): - mock_request = mock.MagicMock() - mock_job_key = mock.MagicMock() - mock_daskjob.objects.filter.side_effect = Exception - - update_dask_job_status(mock_request, mock_job_key) - mock_daskjob.objects.filter.assert_called_once_with(key=mock_job_key) - mock_json_response.assert_called_once_with({"success": False}) diff --git a/tests/unit_tests/test_tethys_compute/test_tasks.py b/tests/unit_tests/test_tethys_compute/test_tasks.py new file mode 100644 index 000000000..165fdc2b9 --- /dev/null +++ b/tests/unit_tests/test_tethys_compute/test_tasks.py @@ -0,0 +1,55 @@ +import unittest +from unittest import mock + +import tethys_compute.tasks as tethys_compute_tasks + + +async def noop(): + pass + + +def raise_error(): + raise Exception() + + +class TestTasks(unittest.IsolatedAsyncioTestCase): + def setUp(self): + pass + + def tearDown(self): + pass + + @mock.patch("tethys_compute.tasks._run_after_delay", new_callable=mock.MagicMock) + @mock.patch("tethys_compute.tasks.asyncio.create_task") + def test_create_task(self, mock_aio_ct, mock_run_delay): + mock_func = mock.MagicMock() + mock_coro = mock.MagicMock() + mock_run_delay.return_value = mock_coro + tethys_compute_tasks.create_task(mock_func) + mock_aio_ct.assert_called_with(mock_coro) + mock_run_delay.assert_called_with( + mock_func, delay=0, periodic=False, count=None + ) + + @mock.patch("tethys_compute.tasks.logger") + async def test_run_after_delay(self, mock_log): + await tethys_compute_tasks._run_after_delay( + noop, delay=0, periodic=False, count=None + ) + mock_log.info.assert_called() + + @mock.patch("tethys_compute.tasks.logger") + @mock.patch("tethys_compute.tasks.asyncio.sleep") + async def test_run_after_delay_periodic(self, mock_sleep, mock_log): + await tethys_compute_tasks._run_after_delay( + noop, delay=30, periodic=True, count=2 + ) + mock_sleep.assert_called_with(30) + mock_log.info.assert_called() + + @mock.patch("tethys_compute.tasks.logger") + async def test_run_after_delay_exception(self, mock_log): + await tethys_compute_tasks._run_after_delay( + raise_error, delay=0, periodic=False, count=None + ) + self.assertEqual(mock_log.info.call_count, 2) diff --git a/tests/unit_tests/test_tethys_compute/test_views/test_update_status.py b/tests/unit_tests/test_tethys_compute/test_views/test_update_status.py new file mode 100644 index 000000000..9ee3093c4 --- /dev/null +++ b/tests/unit_tests/test_tethys_compute/test_views/test_update_status.py @@ -0,0 +1,120 @@ +import unittest +from unittest import mock + +import tethys_compute.views.update_status as tethys_compute_update_status + + +class TestUpdateStatus(unittest.IsolatedAsyncioTestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + @mock.patch("tethys_compute.views.update_status.TethysJob.objects.get_subclass") + async def test_get_job(self, mock_tj): + mock_user = mock.MagicMock(is_staff=False) + mock_user.has_perm.return_value = False + await tethys_compute_update_status.get_job("job_id", mock_user) + mock_tj.assert_called_with(id="job_id", user=mock_user) + + @mock.patch("tethys_compute.views.update_status.TethysJob.objects.get_subclass") + async def test_get_job_staff(self, mock_tj): + mock_user = mock.MagicMock(is_staff=True) + await tethys_compute_update_status.get_job("job_id", mock_user) + mock_tj.assert_called_with(id="job_id") + + @mock.patch("tethys_compute.views.update_status.TethysJob.objects.get_subclass") + async def test_get_job_has_permission(self, mock_tj): + mock_user = mock.MagicMock(is_staff=False) + mock_user.has_perm.return_value = True + await tethys_compute_update_status.get_job("job_id", mock_user) + mock_tj.assert_called_with(id="job_id") + + @mock.patch("tethys_compute.views.update_status.logger") + @mock.patch("tethys_compute.views.update_status.JsonResponse") + @mock.patch("tethys_compute.views.update_status.TethysJob") + async def test_update_job_status(self, mock_tethysjob, mock_json_response, _): + mock_request = mock.MagicMock(GET={}) + mock_job_id = mock.MagicMock() + mock_job1 = mock.MagicMock() + mock_job1.status = True + mock_tethysjob.objects.get_subclass.return_value = mock_job1 + + await tethys_compute_update_status.update_job_status(mock_request, mock_job_id) + mock_tethysjob.objects.get_subclass.assert_called_once_with(id=mock_job_id) + mock_json_response.assert_called_once_with({"success": True}) + + @mock.patch("tethys_compute.views.update_status.create_task") + @mock.patch("tethys_compute.views.update_status.logger") + @mock.patch("tethys_compute.views.update_status.JsonResponse") + async def test_update_job_status_with_delay( + self, mock_json_response, mock_log, mock_ct + ): + mock_request = mock.MagicMock(GET={"delay": "1"}) + mock_job_id = mock.MagicMock() + + await tethys_compute_update_status.update_job_status(mock_request, mock_job_id) + mock_json_response.assert_called_once_with({"success": "scheduled"}) + mock_log.debug.assert_called_once() + mock_ct.assert_called_with( + tethys_compute_update_status._update_job_status, mock_job_id, delay=1 + ) + + @mock.patch("tethys_compute.views.update_status.create_task") + @mock.patch("tethys_compute.views.update_status.logger") + @mock.patch("tethys_compute.views.update_status.JsonResponse") + async def test_update_job_status_with_delay_exception( + self, mock_json_response, mock_log, mock_ct + ): + mock_request = mock.MagicMock(GET={"delay": "1"}) + mock_job_id = mock.MagicMock() + mock_ct.side_effect = Exception + + await tethys_compute_update_status.update_job_status(mock_request, mock_job_id) + mock_json_response.assert_called_once_with({"success": False}) + mock_log.warning.assert_called_once() + + @mock.patch("tethys_compute.views.update_status.logger") + @mock.patch("tethys_compute.views.update_status.JsonResponse") + @mock.patch("tethys_compute.views.update_status.TethysJob") + async def test_update_job_statusException( + self, mock_tethysjob, mock_json_response, mock_log + ): + mock_request = mock.MagicMock(GET={}) + mock_job_id = mock.MagicMock() + mock_tethysjob.objects.get_subclass.side_effect = Exception + + await tethys_compute_update_status.update_job_status(mock_request, mock_job_id) + mock_tethysjob.objects.get_subclass.assert_called_once_with(id=mock_job_id) + mock_json_response.assert_called_once_with({"success": False}) + mock_log.warning.assert_called_once() + + @mock.patch("tethys_compute.views.update_status.JsonResponse") + @mock.patch("tethys_compute.views.update_status.DaskJob") + def test_update_dask_job_status(self, mock_daskjob, mock_json_response): + mock_request = mock.MagicMock() + mock_job_key = mock.MagicMock() + mock_job1 = mock.MagicMock() + mock_job1.status = True + mock_job2 = mock.MagicMock() + mock_daskjob.objects.filter.return_value = [mock_job1, mock_job2] + + # Call the method + tethys_compute_update_status.update_dask_job_status(mock_request, mock_job_key) + + # check results + mock_daskjob.objects.filter.assert_called_once_with(key=mock_job_key) + mock_json_response.assert_called_once_with({"success": True}) + + @mock.patch("tethys_compute.views.update_status.JsonResponse") + @mock.patch("tethys_compute.views.update_status.DaskJob") + def test_update_dask_job_statusException(self, mock_daskjob, mock_json_response): + mock_request = mock.MagicMock() + mock_job_key = mock.MagicMock() + mock_daskjob.objects.filter.side_effect = Exception + + tethys_compute_update_status.update_dask_job_status(mock_request, mock_job_key) + mock_daskjob.objects.filter.assert_called_once_with(key=mock_job_key) + mock_json_response.assert_called_once_with({"success": False}) diff --git a/tests/unit_tests/test_tethys_gizmos/test_views/test_gizmos/test_jobs_table.py b/tests/unit_tests/test_tethys_gizmos/test_views/test_gizmos/test_jobs_table.py index a80382e30..4874d504e 100644 --- a/tests/unit_tests/test_tethys_gizmos/test_views/test_gizmos/test_jobs_table.py +++ b/tests/unit_tests/test_tethys_gizmos/test_views/test_gizmos/test_jobs_table.py @@ -266,10 +266,10 @@ async def test_get_log_content_exception(self, mock_tj, mock_log): ) @mock.patch("tethys_gizmos.views.gizmos.jobs_table.render_to_string") - @mock.patch("tethys_gizmos.views.gizmos.jobs_table.TethysJob") + @mock.patch("tethys_gizmos.views.gizmos.jobs_table.get_job") async def test_update_row_showcase(self, mock_tj, mock_rts): mock_rts.return_value = '{"job_statuses":[]}' - mock_tj.objects.get_subclass.return_value = mock.MagicMock( + mock_tj.return_value = mock.MagicMock( spec=TethysJob, cached_status="Various", label="gizmo_showcase" ) rows = [("1", "30")] @@ -290,10 +290,10 @@ async def test_update_row_showcase(self, mock_tj, mock_rts): self.assertEqual(200, result.status_code) @mock.patch("tethys_gizmos.views.gizmos.jobs_table.render_to_string") - @mock.patch("tethys_gizmos.views.gizmos.jobs_table.TethysJob") + @mock.patch("tethys_gizmos.views.gizmos.jobs_table.get_job") async def test_update_row_showcase_various_complete(self, mock_tj, mock_rts): mock_rts.return_value = '{"job_statuses":[]}' - mock_tj.objects.get_subclass.return_value = mock.MagicMock( + mock_tj.return_value = mock.MagicMock( spec=TethysJob, cached_status="Various-Complete", label="gizmo_showcase" ) rows = [("1", "30")] @@ -314,10 +314,10 @@ async def test_update_row_showcase_various_complete(self, mock_tj, mock_rts): self.assertEqual(200, result.status_code) @mock.patch("tethys_gizmos.views.gizmos.jobs_table.render_to_string") - @mock.patch("tethys_gizmos.views.gizmos.jobs_table.TethysJob") + @mock.patch("tethys_gizmos.views.gizmos.jobs_table.get_job") async def test_update_row_showcase_condor_workflow(self, mock_tj, mock_rts): mock_rts.return_value = '{"job_statuses":[]}' - mock_tj.objects.get_subclass.return_value = mock.MagicMock( + mock_tj.return_value = mock.MagicMock( spec=CondorWorkflow, cached_status="Various", label="gizmo_showcase" ) rows = [("1", "30")] @@ -381,11 +381,11 @@ async def test_update_row(self, mock_tj, mock_rts): self.assertEqual(200, result.status_code) @mock.patch("tethys_gizmos.views.gizmos.jobs_table.render_to_string") - @mock.patch("tethys_gizmos.views.gizmos.jobs_table.TethysJob") + @mock.patch("tethys_gizmos.views.gizmos.jobs_table.get_job") async def test_update_row_dask_job_results_ready(self, mock_tj, mock_rts): # Another Case where job.label is not gizmo_showcase mock_rts.return_value = '{"job_statuses":[]}' - mock_tj.objects.get_subclass.return_value = mock.MagicMock( + mock_tj.return_value = mock.MagicMock( spec=DaskJob, cached_status="Results-Ready", label="test_label", @@ -403,11 +403,11 @@ async def test_update_row_dask_job_results_ready(self, mock_tj, mock_rts): self.assertEqual(200, result.status_code) @mock.patch("tethys_gizmos.views.gizmos.jobs_table.render_to_string") - @mock.patch("tethys_gizmos.views.gizmos.jobs_table.TethysJob") + @mock.patch("tethys_gizmos.views.gizmos.jobs_table.get_job") async def test_update_row_condor_workflow_no_statuses(self, mock_tj, mock_rts): # Another Case where job.label is not gizmo_showcase mock_rts.return_value = '{"job_statuses":[]}' - mock_tj.objects.get_subclass.return_value = mock.MagicMock( + mock_tj.return_value = mock.MagicMock( spec=CondorWorkflow, cached_status="Various", label="test_label", @@ -433,9 +433,9 @@ async def test_update_row_condor_workflow_no_statuses(self, mock_tj, mock_rts): self.assertEqual(200, result.status_code) @mock.patch("tethys_gizmos.views.gizmos.jobs_table.logger") - @mock.patch("tethys_gizmos.views.gizmos.jobs_table.TethysJob") + @mock.patch("tethys_gizmos.views.gizmos.jobs_table.get_job") async def test_update_row_exception(self, mock_tj, mock_log): - mock_tj.objects.get_subclass.side_effect = Exception("error") + mock_tj.side_effect = Exception("error") rows = [("1", "30"), ("2", "18"), ("3", "26")] request = RequestFactory().post( "/jobs", {"column_fields": self.column_names, "row": rows} @@ -667,25 +667,5 @@ async def test_bokeh_row_scheduler_error(self, mock_tj, mock_scheduler, mock_log " for job test_id: test_error_message" ) - @mock.patch("tethys_gizmos.views.gizmos.jobs_table.TethysJob.objects.get_subclass") - async def test_get_job(self, mock_tj): - mock_user = mock.MagicMock(is_staff=False) - mock_user.has_perm.return_value = False - await gizmo_jobs_table.get_job("job_id", mock_user) - mock_tj.assert_called_with(id="job_id", user=mock_user) - - @mock.patch("tethys_gizmos.views.gizmos.jobs_table.TethysJob.objects.get_subclass") - async def test_get_job_staff(self, mock_tj): - mock_user = mock.MagicMock(is_staff=True) - await gizmo_jobs_table.get_job("job_id", mock_user) - mock_tj.assert_called_with(id="job_id") - - @mock.patch("tethys_gizmos.views.gizmos.jobs_table.TethysJob.objects.get_subclass") - async def test_get_job_has_permission(self, mock_tj): - mock_user = mock.MagicMock(is_staff=False) - mock_user.has_perm.return_value = True - await gizmo_jobs_table.get_job("job_id", mock_user) - mock_tj.assert_called_with(id="job_id") - def test_permission_exists(self): Permission.objects.get(codename="jobs_table_actions") diff --git a/tests/unit_tests/test_tethys_portal/test_urls.py b/tests/unit_tests/test_tethys_portal/test_urls.py index 17a00ec92..c7a252987 100644 --- a/tests/unit_tests/test_tethys_portal/test_urls.py +++ b/tests/unit_tests/test_tethys_portal/test_urls.py @@ -151,14 +151,17 @@ def test_urlpatterns_update_job_status(self): url = reverse("update_job_status", kwargs={"job_id": "JI001"}) resolver = resolve(url) self.assertEqual("/update-job-status/JI001/", url) - self.assertEqual("tethys_apps.views.update_job_status", resolver._func_path) + self.assertEqual( + "tethys_compute.views.update_status.update_job_status", resolver._func_path + ) def test_urlpatterns_update_dask_job_status(self): url = reverse("update_dask_job_status", kwargs={"key": "123456789"}) resolver = resolve(url) self.assertEqual("/update-dask-job-status/123456789/", url) self.assertEqual( - "tethys_apps.views.update_dask_job_status", resolver._func_path + "tethys_compute.views.update_status.update_dask_job_status", + resolver._func_path, ) @override_settings(REGISTER_CONTROLLER="test") @@ -362,14 +365,17 @@ def test_urlpatterns_update_job_status(self): url = reverse("update_job_status", kwargs={"job_id": "JI001"}) resolver = resolve(url) self.assertEqual("/test/prefix/update-job-status/JI001/", url) - self.assertEqual("tethys_apps.views.update_job_status", resolver._func_path) + self.assertEqual( + "tethys_compute.views.update_status.update_job_status", resolver._func_path + ) def test_urlpatterns_update_dask_job_status(self): url = reverse("update_dask_job_status", kwargs={"key": "123456789"}) resolver = resolve(url) self.assertEqual("/test/prefix/update-dask-job-status/123456789/", url) self.assertEqual( - "tethys_apps.views.update_dask_job_status", resolver._func_path + "tethys_compute.views.update_status.update_dask_job_status", + resolver._func_path, ) @override_settings(REGISTER_CONTROLLER="test") diff --git a/tethys_apps/views.py b/tethys_apps/views.py index 29938d139..e777c3966 100644 --- a/tethys_apps/views.py +++ b/tethys_apps/views.py @@ -9,19 +9,19 @@ """ import logging + from django.shortcuts import render from django.http import HttpResponse, JsonResponse from django.core.mail import send_mail -from tethys_compute.models import TethysJob, DaskJob -from tethys_config.models import get_custom_template +from tethys_config.models import get_custom_template from .base.app_base import TethysAppBase from .models import TethysApp from .utilities import get_active_app, user_can_access_app from .models import ProxyApp from .decorators import login_required -log = logging.getLogger("tethys." + __name__) +logger = logging.getLogger("tethys." + __name__) @login_required() @@ -150,43 +150,3 @@ def send_beta_feedback_email(request): json = {"success": True, "result": "Emails sent to specified developers"} return JsonResponse(json) - - -def update_job_status(request, job_id): - """ - Callback endpoint for jobs to update status. - """ - try: - job = TethysJob.objects.get_subclass(id=job_id) - job.status - json = {"success": True} - except Exception: - json = {"success": False} - - return JsonResponse(json) - - -def update_dask_job_status(request, key): - """ - Callback endpoint for dask jobs to update status. - """ - params = request.GET - status = params.get("status", None) - log.debug( - "Recieved update status for DaskJob".format(key, status) - ) - - try: - job = DaskJob.objects.filter(key=key)[0] - job_status = job.DASK_TO_STATUS_TYPES[status] - log.debug( - 'Mapped dask status "{}" to tethys job status: "{}"'.format( - status, job_status - ) - ) - job.status = job_status - json = {"success": True} - except Exception: - json = {"success": False} - - return JsonResponse(json) diff --git a/tethys_compute/tasks.py b/tethys_compute/tasks.py new file mode 100644 index 000000000..a2a461208 --- /dev/null +++ b/tethys_compute/tasks.py @@ -0,0 +1,73 @@ +""" +******************************************************************************** +* Name: tasks.py +* Author: Scott Christensen +* Created On: 2024 +* Copyright: (c) Tethys Geospatial Foundation 2024 +* License: BSD 2-Clause +******************************************************************************** +""" + +import asyncio +import logging + +logger = logging.getLogger(f"tethys.{__name__}") + + +def create_task(func, /, *args, delay=0, periodic=False, count=None, **kwargs): + """ + Schedules a task to be executed after some delay. This is run asynchronously and must be called from a context + where there is an active event loop (e.g. from a controller). + + Can be set to run periodically (i.e. it will be rescheduled after it is run) either indefinitely or for a + specified number of times. + Args: + func (callable): the function to schedule + *args: args to pass to the function when it is called + delay (int): number of seconds to wait before executing `func` or between calls if `periodic=True` + periodic (bool): if `True` the function will be rescheduled after each execution until `count=0` or + indefinitely if `count=None` + count (int): the number of times to execute the function if `periodic=True`. If `periodic=False` then + this argument is ignored + **kwargs: key-word arguments to pass to `func` + """ + asyncio.create_task( + _run_after_delay( + func, *args, delay=delay, periodic=periodic, count=count, **kwargs + ) + ) + + +async def _run_after_delay(func, /, *args, delay, periodic, count, **kwargs): + """ + Helper function to `create_task` that delays before executing a function. It is called recursively to handle + `periodic` tasks. + + Args: + func (callable): the function to schedule + *args: args to pass to the function when it is called + delay (int): number of seconds to wait before executing `func` or between calls if `periodic=True` + periodic (bool): if `True` the function will be rescheduled after each execution until `count=0` or + indefinitely if `count=None` + count (int): the number of times to execute the function if `periodic=True`. If `periodic=False` then + this argument is ignored + **kwargs: key-word arguments to pass to `func` + """ + await asyncio.sleep(delay) + try: + logger.info(f'Running task "{func}" with args="{args}" and kwargs="{kwargs}".') + result = func(*args, **kwargs) + if asyncio.iscoroutine(result): + await result + except Exception as e: + logger.info( + f'The following error occurred while running the task "{func}": {e}' + ) + if periodic and (count is None or count > 0): + if isinstance(count, int): + count -= 1 + asyncio.create_task( + _run_after_delay( + func, *args, delay=delay, periodic=periodic, count=count, **kwargs + ) + ) diff --git a/tethys_compute/views/__init__.py b/tethys_compute/views/__init__.py index e69de29bb..c3e34b5d2 100644 --- a/tethys_compute/views/__init__.py +++ b/tethys_compute/views/__init__.py @@ -0,0 +1,18 @@ +""" +******************************************************************************** +* Name: tethys_compute/views/__init__.py +* Author: Scott Christensen +* Created On: October 2024 +* Copyright: (c) Tethys Geospatial Foundation 2024 +* License: BSD 2-Clause +******************************************************************************** +""" + +# flake8: noqa +from .dask_dashboard import dask_dashboard +from .update_status import ( + get_job, + do_job_action, + update_job_status, + update_dask_job_status, +) diff --git a/tethys_compute/views/update_status.py b/tethys_compute/views/update_status.py new file mode 100644 index 000000000..786708009 --- /dev/null +++ b/tethys_compute/views/update_status.py @@ -0,0 +1,130 @@ +""" +******************************************************************************** +* Name: update_status.py +* Author: Scott Christensen +* Created On: 2024 +* Copyright: (c) Tethys Geospatial Foundation 2024 +* License: BSD 2-Clause +******************************************************************************** +""" + +import asyncio +import logging + +from django.http import JsonResponse +from channels.db import database_sync_to_async + +from tethys_compute.models import TethysJob, DaskJob + +from ..tasks import create_task + +logger = logging.getLogger(f"tethys.{__name__}") + + +@database_sync_to_async +def get_job(job_id, user=None): + """ + Helper method to query a `TethysJob` object safely from an asynchronous context. + + Args: + job_id: database ID of a `TethysJob` + user: django user object. If `None` then permission are not checked. Default=None + + Returns: `TethysJob` object + + """ + if ( + user is None + or user.is_staff + or user.has_perm("tethys_compute.jobs_table_actions") + ): + return TethysJob.objects.get_subclass(id=job_id) + return TethysJob.objects.get_subclass(id=job_id, user=user) + + +async def do_job_action(job, action): + """ + Helper function to call job actions from an asynchronous context. + Handles both sync methods and coroutine job actions. + + Args: + job: `TethysJob` object + action (str): name of method to call (without arguments) on the `job` + + Returns: return value of `action` + + """ + func = getattr(job, action) + if asyncio.iscoroutinefunction(func): + ret = await func() + await job.safe_close() + else: + ret = await database_sync_to_async(func)() + return ret + + +async def _update_job_status(job_id): + """ + Helper method to update a jobs status as a task (with delayed execution). + + Args: + job_id: database ID for a `TethysJob` + + Returns: `True` if status was successfully updated, `False` otherwise. + + """ + try: + job = await get_job(job_id) + await do_job_action(job, "update_status") + return True + except Exception as e: + logger.warning( + f"The following exception occurred while updating the status of job_id={job_id}: {e}" + ) + return False + + +async def update_job_status(request, job_id): + """ + Callback endpoint for jobs to update status. + """ + delay = request.GET.get("delay") + if delay: + logger.debug( + f"Updating the status of job_id={job_id} after {delay} second delay." + ) + try: + delay = int(delay) + create_task(_update_job_status, job_id, delay=delay) + result = "scheduled" + except Exception as e: + logger.warning( + f"The following exception occurred while scheduling the status update of job_id={job_id}: {e}" + ) + result = False + else: + result = await _update_job_status(job_id) + + return JsonResponse({"success": result}) + + +def update_dask_job_status(request, key): + """ + Callback endpoint for dask jobs to update status. + """ + params = request.GET + status = params.get("status", None) + logger.debug(f"Received update status for DaskJob") + + try: + job = DaskJob.objects.filter(key=key)[0] + job_status = job.DASK_TO_STATUS_TYPES[status] + logger.debug( + f'Mapped dask status "{status}" to tethys job status: "{job_status}"' + ) + job.status = job_status + json = {"success": True} + except Exception: + json = {"success": False} + + return JsonResponse(json) diff --git a/tethys_gizmos/views/gizmos/jobs_table.py b/tethys_gizmos/views/gizmos/jobs_table.py index 5785fe7d8..9e001522f 100644 --- a/tethys_gizmos/views/gizmos/jobs_table.py +++ b/tethys_gizmos/views/gizmos/jobs_table.py @@ -1,3 +1,13 @@ +""" +******************************************************************************** +* Name: jobs_table.py +* Author: Scott Christensen +* Created On: 2014 +* Copyright: (c) Brigham Young University 2014 +* License: BSD 2-Clause +******************************************************************************** +""" + import inspect import logging import re @@ -5,14 +15,14 @@ from django.http import JsonResponse from django.template.loader import render_to_string -from tethys_compute.models import TethysJob, CondorWorkflow, DaskJob, DaskScheduler -from tethys_gizmos.gizmo_options.jobs_table import JobsTable -from tethys_sdk.gizmos import SelectInput -from tethys_portal.optional_dependencies import optional_import from django.contrib.auth.decorators import login_required - from channels.db import database_sync_to_async +from tethys_compute.models import CondorWorkflow, DaskJob, DaskScheduler +from tethys_gizmos.gizmo_options.jobs_table import JobsTable +from tethys_sdk.gizmos import SelectInput +from tethys_portal.optional_dependencies import optional_import +from tethys_compute.views import get_job, do_job_action # optional imports server_document = optional_import("server_document", from_module="bokeh.embed") @@ -31,23 +41,6 @@ async def wrapper(request, *args, **kwargs): return wrapper -@database_sync_to_async -def get_job(job_id, user): - if user.is_staff or user.has_perm("tethys_compute.jobs_table_actions"): - return TethysJob.objects.get_subclass(id=job_id) - return TethysJob.objects.get_subclass(id=job_id, user=user) - - -async def do_job_action(job, action): - func = getattr(job, action) - if inspect.iscoroutinefunction(func): - ret = await func() - await job.safe_close() - else: - ret = await database_sync_to_async(func)() - return ret - - async def _get_log_content(job, key1, key2): # Get the Job logs. if inspect.iscoroutinefunction(job.get_logs): diff --git a/tethys_portal/urls.py b/tethys_portal/urls.py index 1c1cc1a05..057c2e93b 100644 --- a/tethys_portal/urls.py +++ b/tethys_portal/urls.py @@ -37,7 +37,7 @@ ) from tethys_portal.optional_dependencies import has_module from tethys_apps import views as tethys_apps_views -from tethys_compute.views import dask_dashboard as tethys_dask_views +from tethys_compute import views as tethys_compute_views from tethys_apps.base.function_extractor import TethysFunctionExtractor # ensure at least staff users logged in before accessing admin login page @@ -71,7 +71,7 @@ 0, re_path( r"^dask-dashboard/(?P[\w-]+)/(?P[\w-]+)/$", - tethys_dask_views.dask_dashboard, + tethys_compute_views.dask_dashboard, name="dask_dashboard", ), ) @@ -212,12 +212,12 @@ ), re_path( r"^update-job-status/(?P[\w-]+)/$", - tethys_apps_views.update_job_status, + tethys_compute_views.update_job_status, name="update_job_status", ), re_path( r"^update-dask-job-status/(?P[\w-]+)/$", - tethys_apps_views.update_dask_job_status, + tethys_compute_views.update_dask_job_status, name="update_dask_job_status", ), re_path(r"^api/", include((api_urls, "api"), namespace="api")),