Repeatedly running run_in_worker_thread #267

Closed
matham opened this issue Aug 5, 2017 · 4 comments
matham commented Aug 5, 2017

I have been thinking about how to upgrade some of my tools to use async, specifically trio, now that I've (finally) switched to py3. However, I have run into some conceptual threading issues that I'm not sure of the best way to solve.

My tool runs scientific experiments and currently does it all in a callback framework (kivy). So e.g. it runs through a series of stages that does various things. More importantly, it opens a bunch of devices in secondary threads which continuously read or write to them in a blocking manner and update the main thread.

Trio is perfect for the stages part because I can just await each stage to finish in a tree-like manner (stages have sub-stages etc). The devices part is where it's more difficult. Obviously I can run run_in_worker_thread for each read and write, but the problem is two-fold: a) the cost of spawning a thread for every r/w, and b) I would like to run all operations on the same device in the same thread.

In an ideal world it'd look something like this in trio. For every device,

  • At the start:
dev = MyDevice(...)
await run_in_worker_thread(dev.start, thread_id=dev)

Then later someplace:

async for port_value in run_in_worker_thread_repeatedly(dev.read_port, thread_id=dev):
    dev.port_value = port_value

# or
while True:  # this would get canceled by a parent nursery
    dev.port_value = await run_in_worker_thread(dev.read_port, thread_id=dev)

And elsewhere at the same time:

await run_in_worker_thread(dev.write_port, 0x22, thread_id=dev)

I guess this is not that different from having trio interface with an event loop running in its own (or even the main) thread. I had initially written something like that using the stdlib async library's callbacks, so that I could await, in the main thread, a task running in an event loop in a different thread, before I found trio. I can see why that may not mesh with your idea that in the long term there would be no blocking calls, so there would be no need for this. And also that interfacing with a second (and third) event loop, which would require callbacks, makes things uglier than running things in a very temporary and singular run_in_worker_thread call. But I'm not sure how to make my use case work in trio given the two issues I mentioned.

njsmith commented Aug 8, 2017

It's hard to give specific advice without knowing more about your setup, but in general there are a few possible strategies here.

First, all else being equal, if you can avoid juggling threads altogether by doing everything inside trio, then that's great -- it makes your life simpler. (That's what the docs about run_in_worker_thread are trying to say – I don't really believe we'll eliminate all need for threads any time soon :-).) But sure, if you have a large existing codebase using threads then obviously all else is not equal, and it totally makes sense you wouldn't want to rewrite all of it.

Then if you're using threads, there are two general approaches: you could stay in trio most of the time, and only use run_in_worker_thread occasionally to dispatch small operations. Or, you could run_in_worker_thread to spawn a long-running thread (obviously you'd want to put this in its own task somewhere near the root of your task tree), and communicate back and forth between the thread and trio using some combination of run_in_trio_thread / await_in_trio_thread / run_in_worker_thread. (For example, you could have a trio.Queue that you use to send messages back and forth, and use it directly in the trio thread and via {run,await}_in_trio_thread in the worker thread. Or you could use a queue.Queue, and when you want to call a blocking operation like Queue.get from the trio thread you can put that in a run_in_worker_thread.)
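The second approach described here — a long-running worker thread fed by a queue — can be sketched with the stdlib alone. This is only an illustration of the thread side of the pattern; the trio side (wrapping the blocking `results.get()` in `run_in_worker_thread`, or pushing into trio via `run_in_trio_thread`) is elided, and the names `device_worker`, `jobs`, and `results` are mine, not trio's:

```python
import queue
import threading

def device_worker(jobs: queue.Queue, results: queue.Queue) -> None:
    # Dedicated thread: all operations for one device run here, in order.
    while True:
        item = jobs.get()
        if item is None:          # sentinel: shut the worker down
            return
        fn, args = item
        try:
            results.put(("ok", fn(*args)))
        except Exception as exc:  # report failures instead of killing the thread
            results.put(("error", exc))

jobs: queue.Queue = queue.Queue()
results: queue.Queue = queue.Queue()
t = threading.Thread(target=device_worker, args=(jobs, results), daemon=True)
t.start()

jobs.put((pow, (2, 10)))       # enqueue a "device operation"
status, value = results.get()  # from trio, this blocking get would itself
                               # go through run_in_worker_thread
jobs.put(None)                 # stop the worker
t.join()
```

The key property is that everything for one device funnels through one thread, so operations on that device naturally line up while other devices' threads proceed independently.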

Any mechanism for crossing back and forth between threads is going to have some cost, unfortunately -- have you measured it and found it problematic? For example, trio's run_in_worker_thread does currently spawn a new thread for each operation, but on my laptop it's just as fast as the standard library's concurrent.futures.ThreadPoolExecutor, which uses a dedicated thread pool.

Why does it matter that all the device operations use the same thread?

In your example code, you have two different tasks trying to assign a read_port and a write_port operation to the same thread at the same time – what are you imagining would happen here?

matham commented Aug 9, 2017

First, thanks for the response!

Regarding using a queue: I didn't want to do that because I liked and wanted the exception injection into a task offered by run_in_worker_thread. If I did stuff with a queue and passed the results back, I would lose that.

Regarding why I want to run them in the same thread, that's because a call may block and I don't want devices to block each other. I.e. I don't mind a device blocking other calls to the same device, but I don't want that to leak between devices. Similarly, I don't want to have more than one thread per device because I'd rather multiple calls to the same device line up than spawn more threads due to the dubious multi-threaded safety of the underlying devices.

With regard to doing a read_port and write_port simultaneously: there are actually generally separate input and output ports on a device, so it's not writing and reading the same port :)

Anyway, following is an implementation of what I wanted:

import threading
import queue as stdlib_queue
import trio


class Executor(object):

    _thread = None

    name = 'Executor'

    _exec_queue = None

    def __del__(self):
        self.stop_executor(block=False)

    def start_executor(self):
        queue = self._exec_queue = stdlib_queue.Queue()
        # daemon=True because it might get left behind if we cancel, and in
        # this case shouldn't block process exit.
        thread = self._thread = threading.Thread(
            target=self.worker_thread_fn,  name=self.name,  daemon=True,
            args=(queue, ))
        thread.start()

    def stop_executor(self, block=True):
        if not self._thread:
            return

        if not self._exec_queue:
            self._thread = None
            return

        self._exec_queue.put('eof', block=block)
        if block:
            self._thread.join()

        self._thread = self._exec_queue = None

    def report_back_in_trio_thread_fn(self, task_container):
        # This function gets scheduled into the trio run loop to deliver the
        # thread's result.
        def do_release_then_return_result():
            # if canceled, do the cancellation otherwise the result.
            if task_container[1] is not None:
                task_container[1]()
            return task_container[2].unwrap()

        result = trio.Result.capture(do_release_then_return_result)
        trio.hazmat.reschedule(task_container[0], result)

    def worker_thread_fn(self, queue):
        # This is the function that runs in the worker thread to do the actual
        # work and then schedule the calls to report_back_in_trio_thread_fn
        while True:
            # stop_executor puts a bare 'eof' sentinel, so check before
            # unpacking (unpacking 'eof' as a 4-tuple would raise ValueError)
            item = queue.get(block=True)
            if item == 'eof':
                return
            sync_fn, args, task_container, call_soon = item

            if task_container[1] is None:
                task_container[2] = trio.Result.capture(sync_fn, *args)

            try:
                call_soon(self.report_back_in_trio_thread_fn, task_container)
            except trio.RunFinishedError:
                # The entire run finished, so our particular tasks are certainly
                # long gone - it must have cancelled. Continue eating the queue.
                pass

    @trio.hazmat.enable_ki_protection
    async def run_in_worker_thread(self, sync_fn, *args, cancellable=False):
        await trio.hazmat.yield_if_cancelled()
        # Holds a reference to the task that's blocked in this function waiting
        # for the result as well as to the cancel callback and the result
        # (when not canceled).
        if self._thread is None:
            self.start_executor()
        task_container = [trio.current_task(), None, None]
        self._exec_queue.put(
            (sync_fn, args, task_container,
             trio.hazmat.current_call_soon_thread_and_signal_safe()))

        def abort(raise_cancel):
            if cancellable:
                task_container[1] = raise_cancel
            return trio.hazmat.Abort.FAILED
        return await trio.hazmat.yield_indefinitely(abort)


if __name__ == '__main__':
    executor = Executor()

    def get_value(value):
        return value

    async def run_sequence():
        for i in range(10):
            print(await executor.run_in_worker_thread(get_value, i))

    async def execute_thread(i):
        print(await executor.run_in_worker_thread(get_value, i))

    async def spawner():
        async with trio.open_nursery() as nursery:
            for i in range(10):
                nursery.spawn(execute_thread, i)

    trio.run(run_sequence)
    print('done run_sequence')
    trio.run(spawner)
    print('done spawner')

Which when run prints

0
1
2
3
4
5
6
7
8
9
done run_sequence
9
7
6
0
3
8
1
2
4
5
done spawner

Here's a timeit result compared to the trio one:

from timeit import timeit
import trio
from src.playground import Executor

executor = Executor()


def get_value(value):
    return value

async def run_sequence():
    for i in range(1000):
        (await executor.run_in_worker_thread(get_value, i))

async def run_sequence_trio():
    for i in range(1000):
        (await trio.run_in_worker_thread(get_value, i))


print(timeit(stmt='trio.run(run_sequence)', setup='from __main__ import executor, trio, run_sequence, run_sequence_trio', number=10))
print(timeit(stmt='trio.run(run_sequence_trio)', setup='from __main__ import executor, trio, run_sequence, run_sequence_trio', number=10))

prints:

1.4945311561727652
3.339993947649539

Now, back to my original request: I was hoping that I (or trio) could implement the Executor class. The reason is that I didn't know about the existence of reschedule and yield_indefinitely. That's exactly what I wanted, and what I had done in the stdlib async library with their callbacks.

I really appreciate the level of detail of the docs written for trio :) but I do think they could benefit from some trimming, because it is somewhat difficult to get a broad picture of the trio API. Maybe for now just listing all the API signatures would provide the best of both worlds. Sphinx doesn't make it very easy to collapse API docs so you can just see the signatures (I really wish they had a template that did that). Although I guess it was mostly the DANGER DANGER DANGER that made me ignore that page :D

But I started out with your run_in_worker_thread and modified it, which let me see what I needed to do. Notice that I handled cancellation a little differently than you do. I wonder if there are downsides of this compared to how you did run_in_worker_thread, and why you didn't do it like I did (delay the cancel and raise it in the task later)?

The reason why I think this executor example would be useful is not only my use case described above, but also that with some modifications you can use it to schedule an event in an existing event loop framework (e.g. when you're using some GUI framework that provides one) and have a trio task proxy it fully. This is really what I wanted: a high-level, bulletproof way of proxying an event scheduled in another event loop with a trio task. E.g. have a trio task interface that an event loop designer can call when their event is executed or canceled or raises an exception, which passes it on to the trio task. Then a user scheduling a callback with the framework's event loop can also await it. This would be great for compatibility reasons.

Looking at my executor class, it is not "that" hard to implement, especially working by example. But there are enough things one needs to look out for that having such an example pattern to start from would be beneficial for framework authors. I suppose run_in_worker_thread is it :)

njsmith commented Aug 15, 2017

Regarding why I want to run them in the same thread, that's because a call may block and I don't want devices to block each other. I.e. I don't mind a device blocking other calls to the same device, but I don't want that to leak between devices. Similarly, I don't want to have more than one thread per device because I'd rather multiple calls to the same device line up than spawn more threads due to the dubious multi-threaded safety of the underlying devices.

It sounds like using a trio.Lock for each device might be a simpler way to accomplish this?
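The lock-per-device idea can be sketched with stdlib analogues (a `threading.Lock` standing in for `trio.Lock`, and a shared `ThreadPoolExecutor` standing in for trio's worker-thread dispatch); the `Device` class and its methods are hypothetical, invented for the illustration:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class Device:
    """Hypothetical device wrapper: one lock serializes all its operations."""
    def __init__(self, name: str) -> None:
        self.name = name
        self._lock = threading.Lock()  # trio.Lock would play this role in trio
        self.log = []

    def read_port(self) -> str:
        with self._lock:  # calls to the same device line up here
            self.log.append("read")
            return f"{self.name}:read"

dev_a, dev_b = Device("a"), Device("b")
# A shared pool: which thread runs which call no longer matters, because
# the per-device lock guarantees the serialization, not thread identity.
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(d.read_port) for d in (dev_a, dev_b, dev_a)]
    values = [f.result() for f in futures]
```

Here dev_a's two reads are serialized by its own lock while dev_b proceeds independently, which is the property the thread-per-device design was buying.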

[2x speedup for your class over trio.run_in_worker_thread]

That's pretty interesting. What platform did you use to measure this? Windows?

I wonder if there are downsides of this compared to how you did run_in_worker_thread and why you didn't do it like I did (delay cancel and raise it in the task later)?

Just because if the operation has already completed successfully, then it wasn't cancelled :-). As much as possible, for the low-level trio operations I try to maintain the rule that cancelled means the operation had no effect. If a timeout expires, but the operation completes successfully before you can react to the timeout... why throw away that work and pretend it failed? It's already done :-).
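This "completed means not cancelled" rule also happens to be how the stdlib's concurrent.futures behaves, which makes for a compact illustration of the semantics being described (not trio's implementation):

```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as pool:
    fut = pool.submit(lambda: 42)
    fut.result()              # block until the work has actually completed
    cancelled = fut.cancel()  # too late: cancel() refuses and returns False

# The finished result is kept rather than thrown away and relabeled a failure.
print(cancelled, fut.result())
```

A cancel request that arrives after completion is simply reported as unsuccessful, and the caller gets the real result.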

The reason why I think this executor example would be useful is not only my use case that I described above, but with some modifications you can use it to schedule an event in an existing event loop framework (e.g. when you're using some gui framework that provides one) and have a trio task proxy it fully. This is really what I wanted; a high level bullet proof way of proxying an event scheduled in another even loop with a trio task

Right, this is definitely interesting! It does require some case-by-case work, of course. You don't actually want to use a queue to communicate between trio and kivy, for example, because you don't want to do blocking queue operations in either thread, and both sides have a solid set of non-blocking tools to work with. Instead you should use call_soon as the low-level primitive for sending stuff from the kivy thread to trio, and it looks like kivy.clock.Clock.schedule_once() would be the low-level primitive for sending stuff from trio to kivy, and then build run_in_kivy_thread on top of those. (I don't know enough about kivy's idioms to know what would be an idiomatic API for run_in_trio_thread_from_kivy. It sounds like you're mostly interested in the other direction anyway.) And this kind of fundamental plumbing is exactly why yield_indefinitely and friends are exposed in trio.hazmat.
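The shape of that bridge can be sketched without either framework: below, a toy queue-driven loop stands in for the GUI framework's thread (the role kivy's Clock plays), and a `concurrent.futures.Future` stands in for the trio side's reschedule/yield_indefinitely pair. Every name here (`toy_gui_loop`, `schedule_once`, `run_in_gui_thread`) is invented for the sketch:

```python
import queue
import threading
from concurrent.futures import Future

# A toy event loop standing in for the GUI framework's thread: it drains
# scheduled callbacks, the way a real loop would between frames.
scheduled: queue.Queue = queue.Queue()

def toy_gui_loop() -> None:
    while True:
        cb = scheduled.get()
        if cb is None:  # sentinel: stop the loop
            return
        cb()

def schedule_once(cb) -> None:
    # Stands in for the framework's "run this on my thread" primitive,
    # e.g. kivy.clock.Clock.schedule_once.
    scheduled.put(cb)

def run_in_gui_thread(fn, *args) -> Future:
    # Proxy a call onto the GUI loop; the Future delivers the result or
    # exception back to whoever is waiting on the other thread.
    fut: Future = Future()
    def runner():
        try:
            fut.set_result(fn(*args))
        except Exception as exc:
            fut.set_exception(exc)
    schedule_once(runner)
    return fut

gui = threading.Thread(target=toy_gui_loop, daemon=True)
gui.start()
result = run_in_gui_thread(lambda x: x * 2, 21).result()
scheduled.put(None)
gui.join()
```

In a real trio integration, the Future's set_result/set_exception side would instead call call_soon to reschedule the blocked trio task, exactly as the Executor class above does.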

Regarding your comments on docs and examples: the feedback is definitely appreciated. I may not be able to do too much about it in the immediate future because of limited hours in a day and because I'm still trying to focus on getting trio feature-complete :-). But it's useful to hear in any case. (And if you have more specific suggestions or get an itch and decide to submit some edits as a PR, that'd be very welcome too.)

Hope that helps!

matham commented Aug 22, 2017

It sounds like using a trio.Lock for each device might be a simpler way to accomplish this?

Yeah, I could have a lock per device and then not care which threads they run on, as long as there are enough threads. But I do like the design of a thread per device, so everything is self-contained and I don't have to worry about devices blocking each other when there are not enough threads, or about spawning more threads than really needed.

[2x speedup for your class over trio.run_in_worker_thread]

That's pretty interesting. What platform did you use to measure this? Windows?

Yes, I'm on windows 10.

Just because if the operation has already completed successfully, then it wasn't cancelled :-). As much as possible for the low-level trio operations I try to maintain the rule that cancelled means that the operation had no effect. Like if a timeout expires but before you can react to the timeout, the operation completes successfully... why throw away that work and pretend it failed? it's already done :-).

Right, I couldn't be sure where exactly we were when cancel was called. I.e. when cancel is called in the trio thread, the other thread may or may not have executed the task yet (I check for cancellation in the second thread before executing the task). It would be a lot more complicated to make sure to only raise the cancel exception if it was cancelled before the second thread executed the task.

And I guess for my use case, if I canceled one device, other devices may have also been canceled and closed and had other side effects. So imagine I don't raise the cancel until the next checkpoint, and continue processing the result from the second thread because we pretended it wasn't cancelled. Now I'm in a state where the overall program state is canceled, yet I still continue to process the result for this task as if it isn't, which may cause issues. I'd rather that once cancel is called, nothing proceeds.

Of course with well designed code this may not be an issue, but still, I'd just rather avoid this issue by canceling aggressively.

I'm closing the issue as it's pretty much resolved for me.

@matham matham closed this as completed Aug 22, 2017