Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker updates #2938

Merged
merged 21 commits into from
Jul 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [0.31.0] - Unreleased

### Added

- Added the ability to run async methods as thread workers https://github.com/Textualize/textual/pull/2938

### Changed

- Breaking change: Creating a thread worker now requires that a `thread=True` keyword argument is passed https://github.com/Textualize/textual/pull/2938

### Fixed

- Fixed a crash when a `SelectionList` had a prompt wider than itself https://github.com/Textualize/textual/issues/2900
Expand Down
2 changes: 1 addition & 1 deletion docs/examples/guide/workers/weather05.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async def on_input_changed(self, message: Input.Changed) -> None:
"""Called when the input changes"""
self.update_weather(message.value)

@work(exclusive=True)
@work(exclusive=True, thread=True)
def update_weather(self, city: str) -> None:
"""Update the weather for the given city."""
weather_widget = self.query_one("#weather", Static)
Expand Down
15 changes: 11 additions & 4 deletions docs/guide/workers.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,23 +147,30 @@ This works well if you are using an async API like `httpx`, but if your API does

Threads are a form of concurrency supplied by your Operating System. Threads allow your code to run more than a single function simultaneously.

You can create threads by applying `run_worker` or the `work` decorator to a plain (non async) method or function.
The API for thread workers is identical to async workers, but there are a few differences you need to be aware of when writing threaded code.
You can create threads by setting `thread=True` on the `run_worker` method or the `work` decorator.
The API for thread workers is identical to async workers, but there are a few differences you need to be aware of when writing code for thread workers.

The first difference is that you should avoid calling methods on your UI directly, or setting reactive variables.
You can work around this with the [App.call_from_thread][textual.app.App.call_from_thread] method which schedules a call in the main thread.

The second difference is that you can't cancel threads in the same way as coroutines, but you *can* manually check if the worker was cancelled.

Let's demonstrate thread workers by replacing `httpx` with `urllib.request` (in the standard library). The `urllib` module is not async aware, so we will need to use threads:
Let's demonstrate thread workers by replacing `httpx` with `urllib.request` (in the standard library).
The `urllib` module is not async aware, so we will need to use threads:

```python title="weather05.py" hl_lines="1 26-43"
--8<-- "docs/examples/guide/workers/weather05.py"
```

The `update_weather` function doesn't have the `async` keyword, so the `@work` decorator will create a thread worker.
In this example, the `update_weather` is not asynchronous (i.e. a regular function).
The `@work` decorator has `thread=True` which makes it a thread worker.
Note the use of [get_current_worker][textual.worker.get_current_worker] which the function uses to check if it has been cancelled or not.

!!! important

Textual will raise an exception if you add the `work` decorator to a regular function without `thread=True`.


#### Posting messages

Most Textual functions are not thread-safe which means you will need to use `call_from_thread` to run them from a thread worker.
Expand Down
43 changes: 40 additions & 3 deletions src/textual/_work_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from __future__ import annotations

from functools import partial, wraps
from inspect import iscoroutinefunction
from typing import TYPE_CHECKING, Callable, Coroutine, TypeVar, Union, cast, overload

from typing_extensions import ParamSpec, TypeAlias
Expand All @@ -30,22 +31,48 @@
]


class WorkerDeclarationError(Exception):
"""An error in the declaration of a worker method."""


@overload
def work(
method: Callable[FactoryParamSpec, Coroutine[None, None, ReturnType]]
method: Callable[FactoryParamSpec, Coroutine[None, None, ReturnType]],
*,
name: str = "",
group: str = "default",
exit_on_error: bool = True,
exclusive: bool = False,
description: str | None = None,
thread: bool = False,
) -> Callable[FactoryParamSpec, "Worker[ReturnType]"]:
...


@overload
def work(
method: Callable[FactoryParamSpec, ReturnType]
method: Callable[FactoryParamSpec, ReturnType],
*,
name: str = "",
group: str = "default",
exit_on_error: bool = True,
exclusive: bool = False,
description: str | None = None,
thread: bool = False,
) -> Callable[FactoryParamSpec, "Worker[ReturnType]"]:
...


@overload
def work(*, exclusive: bool = False) -> Decorator[..., ReturnType]:
def work(
*,
name: str = "",
group: str = "default",
exit_on_error: bool = True,
exclusive: bool = False,
description: str | None = None,
thread: bool = False,
) -> Decorator[..., ReturnType]:
...


Expand All @@ -59,6 +86,7 @@ def work(
exit_on_error: bool = True,
exclusive: bool = False,
description: str | None = None,
thread: bool = False,
) -> Callable[FactoryParamSpec, Worker[ReturnType]] | Decorator:
"""A decorator used to create [workers](/guide/workers).

Expand All @@ -71,6 +99,7 @@ def work(
description: Readable description of the worker for debugging purposes.
By default, it uses a string representation of the decorated method
and its arguments.
thread: Mark the method as a thread worker.
"""

def decorator(
Expand All @@ -81,6 +110,13 @@ def decorator(
) -> Callable[DecoratorParamSpec, Worker[ReturnType]]:
"""The decorator."""

# Methods that aren't async *must* be marked as being a thread
# worker.
if not iscoroutinefunction(method) and not thread:
raise WorkerDeclarationError(
"Can not create a worker from a non-async function unless `thread=True` is set on the work decorator."
)

@wraps(method)
def decorated(
*args: DecoratorParamSpec.args, **kwargs: DecoratorParamSpec.kwargs
Expand Down Expand Up @@ -112,6 +148,7 @@ def decorated(
description=debug_description,
exclusive=exclusive,
exit_on_error=exit_on_error,
thread=thread,
),
)
return worker
Expand Down
3 changes: 3 additions & 0 deletions src/textual/_worker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def _new_worker(
exit_on_error: bool = True,
start: bool = True,
exclusive: bool = False,
thread: bool = False,
) -> Worker:
"""Create a worker from a function, coroutine, or awaitable.

Expand All @@ -101,6 +102,7 @@ def _new_worker(
exit_on_error: Exit the app if the worker raises an error. Set to `False` to suppress exceptions.
start: Automatically start the worker.
exclusive: Cancel all workers in the same group.
thread: Mark the worker as a thread worker.

Returns:
A Worker instance.
Expand All @@ -112,6 +114,7 @@ def _new_worker(
group=group,
description=description or repr(work),
exit_on_error=exit_on_error,
thread=thread,
)
self.add_worker(worker, start=start, exclusive=exclusive)
return worker
Expand Down
3 changes: 3 additions & 0 deletions src/textual/dom.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ def run_worker(
exit_on_error: bool = True,
start: bool = True,
exclusive: bool = False,
thread: bool = False,
) -> Worker[ResultType]:
"""Run work in a worker.

Expand All @@ -254,6 +255,7 @@ def run_worker(
exit_on_error: Exit the app if the worker raises an error. Set to `False` to suppress exceptions.
start: Start the worker immediately.
exclusive: Cancel all workers in the same group.
thread: Mark the worker as a thread worker.

Returns:
New Worker instance.
Expand All @@ -267,6 +269,7 @@ def run_worker(
exit_on_error=exit_on_error,
start=start,
exclusive=exclusive,
thread=thread,
)
return worker

Expand Down
2 changes: 1 addition & 1 deletion src/textual/widgets/_directory_tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def _directory_content(self, location: Path, worker: Worker) -> Iterator[Path]:
except PermissionError:
pass

@work
@work(thread=True)
def _load_directory(self, node: TreeNode[DirEntry]) -> list[Path]:
"""Load the directory contents for a given node.

Expand Down
79 changes: 63 additions & 16 deletions src/textual/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ def __init__(
group: str = "default",
description: str = "",
exit_on_error: bool = True,
thread: bool = False,
) -> None:
"""Initialize a Worker.

Expand All @@ -154,13 +155,15 @@ def __init__(
group: The worker group.
description: Description of the worker (longer string with more details).
exit_on_error: Exit the app if the worker raises an error. Set to `False` to suppress exceptions.
thread: Mark the worker as a thread worker.
"""
self._node = node
self._work = work
self.name = name
self.group = group
self.description = description
self.exit_on_error = exit_on_error
self._thread_worker = thread
self._state = WorkerState.PENDING
self.state = self._state
self._error: BaseException | None = None
Expand Down Expand Up @@ -271,35 +274,79 @@ def advance(self, steps: int = 1) -> None:
"""
self._completed_steps += steps

async def run(self) -> ResultType:
"""Run the work.

Implement this method in a subclass, or pass a callable to the constructor.
async def _run_threaded(self) -> ResultType:
"""Run a threaded worker.

Returns:
Return value of work.
Return value of the work.
"""

def run_awaitable(work: Awaitable[ResultType]) -> ResultType:
"""Set the active worker and await the awaitable."""

async def do_work() -> ResultType:
active_worker.set(self)
return await work

return asyncio.run(do_work())

def run_coroutine(
work: Callable[[], Coroutine[None, None, ResultType]]
) -> ResultType:
"""Set the active worker and await coroutine."""
return run_awaitable(work())

def run_callable(work: Callable[[], ResultType]) -> ResultType:
"""Set the active worker, and call the callable."""
active_worker.set(self)
return work()

if (
inspect.iscoroutinefunction(self._work)
or hasattr(self._work, "func")
and inspect.iscoroutinefunction(self._work.func)
):
# Coroutine, we can await it.
result: ResultType = await self._work()
runner = run_coroutine
elif inspect.isawaitable(self._work):
result = await self._work
runner = run_awaitable
elif callable(self._work):
runner = run_callable
else:
assert callable(self._work)
loop = asyncio.get_running_loop()
raise WorkerError("Unsupported attempt to run a thread worker")

def run_work(work: Callable[[], ResultType]) -> ResultType:
"""Set the active worker, and run the work."""
active_worker.set(self)
return work()
return await asyncio.get_running_loop().run_in_executor(
None, runner, self._work
)

async def _run_async(self) -> ResultType:
"""Run an async worker.

Returns:
Return value of the work.
"""
if (
inspect.iscoroutinefunction(self._work)
or hasattr(self._work, "func")
and inspect.iscoroutinefunction(self._work.func)
):
return await self._work()
elif inspect.isawaitable(self._work):
return await self._work
elif callable(self._work):
raise WorkerError("Request to run a non-async function as an async worker")
raise WorkerError("Unsupported attempt to run an async worker")

result = await loop.run_in_executor(None, run_work, self._work)
return result
async def run(self) -> ResultType:
"""Run the work.

Implement this method in a subclass, or pass a callable to the constructor.

Returns:
Return value of the work.
"""
return await (
self._run_threaded() if self._thread_worker else self._run_async()
)

async def _run(self, app: App) -> None:
"""Run the worker.
Expand Down
33 changes: 0 additions & 33 deletions tests/test_work_decorator.py

This file was deleted.

Loading