Skip to content

Commit

Permalink
docs
Browse files Browse the repository at this point in the history
  • Loading branch information
sdc50 committed Oct 10, 2024
1 parent 390ce1b commit c2ed3ab
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 2 deletions.
24 changes: 22 additions & 2 deletions docs/tethys_sdk/jobs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,9 @@ For example, a URL may look something like this:
http://example.com/update-job-status/27/
The output would look something like this:
The response would look something like this:

.. code-block:: python
.. code-block:: javascript
{"success": true}
Expand All @@ -241,6 +241,26 @@ This URL can be retrieved from the job manager with the ``get_job_status_callbac
job_manager = App.get_job_manager()
callback_url = job_manager.get_job_status_callback_url(request, job_id)
The callback URL can be used to update the jobs status after a specified delay by passing the ``delay`` query parameter:

.. code-block::
http://<host>/update-job-status/<job_id>/?delay=<delay_in_seconds>
For example, to schedule a job update in 30 seconds:

.. code-block::
http://<host>/update-job-status/27/?delay=30
In this case the response would look like this:

.. code-block:: javascript
{"success": "scheduled"}
This delay can be useful so the job itself can hit the endpoint just before completing to trigger the Tethys Portal to check its status after it has time to complete and exit. This will allow the portal to register that the job has completed and start any data transfer that is triggered upon job completion.

Custom Statuses
---------------
Custom statuses can be given to jobs simply by assigning the ``status`` attribute:
Expand Down
30 changes: 30 additions & 0 deletions tethys_compute/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,22 @@


def create_task(func, /, *args, delay=0, periodic=False, count=None, **kwargs):
"""
Schedules a task to be executed after some delay. This is run asynchronously and must be called from a context
where there is an active event loop (e.g. from a controller).
Can be set to run periodically (i.e. it will be rescheduled after it is run) either indefinitely or for a
specified number of times.
Args:
func (callable): the function to schedule
*args: args to pass to the function when it is called
delay (int): number of seconds to wait before executing `func` or between calls if `periodic=True`
periodic (bool): if `True` the function will be rescheduled after each execution until `count=0` or
indefinitely if `count=None`
count (int): the number of times to execute the function if `periodic=True`. If `periodic=False` then
this argument is ignored
**kwargs: key-word arguments to pass to `func`
"""
asyncio.create_task(
_run_after_delay(
func, *args, delay=delay, periodic=periodic, count=count, **kwargs
Expand All @@ -23,6 +39,20 @@ def create_task(func, /, *args, delay=0, periodic=False, count=None, **kwargs):


async def _run_after_delay(func, /, *args, delay, periodic, count, **kwargs):
"""
Helper function to `create_task` that delays before executing a function. It is called recursively to handle
`periodic` tasks.
Args:
func (callable): the function to schedule
*args: args to pass to the function when it is called
delay (int): number of seconds to wait before executing `func` or between calls if `periodic=True`
periodic (bool): if `True` the function will be rescheduled after each execution until `count=0` or
indefinitely if `count=None`
count (int): the number of times to execute the function if `periodic=True`. If `periodic=False` then
this argument is ignored
**kwargs: key-word arguments to pass to `func`
"""
await asyncio.sleep(delay)
try:
logger.info(f'Running task "{func}" with args="{args}" and kwargs="{kwargs}".')
Expand Down
30 changes: 30 additions & 0 deletions tethys_compute/views/update_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@

@database_sync_to_async
def get_job(job_id, user=None):
"""
Helper method to query a `TethysJob` object safely from an asynchronous context.
Args:
job_id: database ID of a `TethysJob`
user: django user object. If `None` then permission are not checked. Default=None
Returns: `TethysJob` object
"""
if (
user is None
or user.is_staff
Expand All @@ -33,6 +43,17 @@ def get_job(job_id, user=None):


async def do_job_action(job, action):
"""
Helper function to call job actions from an asynchronous context.
Handles both sync methods and coroutine job actions.
Args:
job: `TethysJob` object
action (str): name of method to call (without arguments) on the `job`
Returns: return value of `action`
"""
func = getattr(job, action)
if asyncio.iscoroutinefunction(func):
ret = await func()
Expand All @@ -43,6 +64,15 @@ async def do_job_action(job, action):


async def _update_job_status(job_id):
"""
Helper method to update a jobs status as a task (with delayed execution).
Args:
job_id: database ID for a `TethysJob`
Returns: `True` if status was successfully updated, `False` otherwise.
"""
try:
job = await get_job(job_id)
await do_job_action(job, "update_status")
Expand Down

0 comments on commit c2ed3ab

Please sign in to comment.