Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sync and async_as_sync dispatch_to_thread variants #15705

Closed
wants to merge 1 commit into from

Conversation

tlively
Copy link
Member

@tlively tlively commented Dec 3, 2021

The new emscripten_dispatch_to_thread_sync* functions are more general
versions of the existing emscripten_sync_run_in_main_thread that can dispatch
to any thread and can dispatch any workload, not just em_queued_calls. They
are transparently implemented in terms of the even more general
emscripten_dispatch_to_thread_async_as_sync functions, which synchronously
wait until emscripten_async_as_sync_finish is explicitly called on the context
handle they pass to the user-provided function. These functions allow
synchronously waiting for not just a single dispatched function, but arbitrary
and potentially asynchronous work on the target thread.

These functions will be useful for providing a synchronous interface to file
system functions implemented in terms of asynchronous Web APIs on a dedicated
worker thread.

The new `emscripten_dispatch_to_thread_sync*` functions are more general
versions of the existing `emscripten_sync_run_in_main_runtime_thread` that can
dispatch to any thread and can dispatch any workload, not just
`em_queued_call`s. They are transparently implemented in terms of the even more
general `emscripten_dispatch_to_thread_async_as_sync` functions, which
synchronously wait until `emscripten_async_as_sync_finish` is explicitly called
on the context handle they pass to the user-provided function. These functions
allow synchronously waiting for not just a single dispatched function, but
arbitrary and potentially asynchronous work on the target thread.

These functions will be useful for providing a synchronous interface to file
system functions implemented in terms of asynchronous Web APIs on a dedicated
worker thread.
return emscripten_dispatch_to_thread_ptr(target_thread, _do_call, q);
if (emscripten_dispatch_to_thread_ptr(target_thread, _do_call, q)) {
return 1;
} else {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No else after return.

if (!emscripten_dispatch_to_thread_async_ptr(target_thread, _do_call, q)) {
if (emscripten_dispatch_to_thread_async_ptr(target_thread, _do_call, q)) {
return 1;
} else {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

@@ -2417,6 +2417,13 @@ def test_pthread_specific(self):
def test_pthread_equal(self):
self.do_run_in_out_file_test('pthread/test_pthread_equal.cpp')

@node_pthreads
@no_asan('Test depends on EXIT_RUNTIME=0, which is incompatible with ASan')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does it depend on EXIT_RUNTIME=0? Doesn't emscripten_set_timeout keep the runtime alive? Do you know about emscripten_exit_with_live_runtime which can keep the runtime alive until emscripten_force_exit?


// Similar to emscripten_dispatch_to_thread, but waits on the dispatching thread
// until `emscripten_async_as_sync_finish` is called on the
// `em_async_as_sync_ctx` context pointer supplied to `func`. The first argument
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find the term async_as_sync to be a bit of a mouthful... but I don't have any great suggestions for something better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe adding some text here to explain the motivation would help: that it can do an async operation on the called thread, while behaving sync on the calling thread, and that async Web APIs are a common use case for it?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing emscripten_dispatch_to_thread_sync() function allows one to dispatch a task to another thread, and the calling/dispatching thread synchronously waits until the other thread runs the function and returns from that function call.

If I understand correctly, the idea with async_as_sync is to be able to delay the finish condition of the dispatched task? I.e. instead of the event of returning from the dispatched function signaling completion, the target thread will instead some time later manually call emscripten_async_as_sync_finish() to mark the dispatched call as finished? (to accommodate for async continuations from event handlers?)

If so, there does not seem to be a need to bake this feature as built-in to the proxying queue itself? It does have an effect of complicating the API quite a bit.

The major complication is the question about what should happen with the queue while there are one or more of these kinds of long-running async tasks pending? Should the queue retain them until they are asynchronously signaled to be finished? Or should they be removed from the async call queue already before they are signaled to be finished?

Users should be able to manually implement this kind of delayed completion signaling on top of synchronous or asynchronous dispatches via code like

int longRunningAsyncTaskOnCalledThread(int *notifyAddressOnCompletion)
{
   // do long running async stuff, capture notifyAddressOnCompletion somewhere.
   // ...
   // done: wake all waiters.
   __atomic_store_n(notifyAddressOnCompletion, 1, __ATOMIC_SEQ_CST);
   emscripten_futex_wake(notifyAddressOnCompletion, INT_MAX);
}

int callingThread()
{
  int addressNotifiedOnCompletion = 0;
  emscripten_dispatch_to_thread(target_thread, EM_FUNC_SIG_VI, longRunningAsyncTaskOnCalledThread, 0, &addressNotifiedOnCompletion);
  emscripten_futex_wait(&addressNotifiedOnCompletion, 0, INFINITY);
}

i.e. users can get this behavior through the call queue, so this kind of long running async task could be treated orthogonally and would not need to be a feature of the dispatch queue itself?

Copy link
Member Author

@tlively tlively Dec 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's exactly right. OTOH, the normal sync version of the API can be built on top of the async version of the API in the same way, but we still consider it useful enough to provide as a utility. It makes sense to provide a utility to satisfy this async/long running use case as well rather than forcing each user to implement it themselves. There is no appreciable complexity cost due to the layered nature of the implementation.

// A thread cannot both perform asynchronous work and synchronously wait for
// that work to be finished. If we were proxying to the current thread, the
// work must have been synchronous and should already be done.
assert(!pthread_equal(target_thread, pthread_self()));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this assert no just be done once at the very top of the function?

int emscripten_dispatch_to_thread_sync_ptr(pthread_t target_thread,
void (*func)(void*),
void* arg);
int emscripten_dispatch_to_thread_sync_args(pthread_t target_thread,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other function we have in this file that do sync and async versions are of the following form:

emscripten_sync_run_in_main_runtime_thread
emscripten_async_run_in_main_runtime_thread

Should these therefore be called emscripten_sync_dispatch_to_thread_*?

I also wonder if they should be called something completely different such as emscripten_run_in_thread (really should emscripten_run_on_thread I guess). The word dispatch sounds inherently async to me.. maybe maybe I'm wrong about that?

#ifdef __wasm32__
q->args[0].i = (int)ctx;
#else
#ifdef __wasm64__
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

elif defined

@sbc100
Copy link
Collaborator

sbc100 commented Dec 3, 2021

I think the big picture is that this stuff is great, but I think we need to think a litte more more about the naming.

IIRC these new functions re-use the emscripten_dispatch_to_thread mechanism internally, but I don't think they need to be called emscripten_dispatch_to_thread... themselves. How about just emscripten_run_on_thread and emscripten_run_on_thread_with_ctx? I especially don't like the async_as_sync terminology as I don't think it conveys what this function is really all about. IIRC this function is about giving the receiver the choice about when to make a given job as done, is that right? Its just a version emscripten_run_on_thread where the receiver can and must choose when signal completion?

It makes me sad to have such a mixed bag of naming conventions in this file, but I guess it a full renaming of the existing functions is probably not an option.


// Similar to emscripten_dispatch_to_thread, but waits on the dispatching thread
// until `emscripten_async_as_sync_finish` is called on the
// `em_async_as_sync_ctx` context pointer supplied to `func`. The first argument
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe adding some text here to explain the motivation would help: that it can do an async operation on the called thread, while behaving sync on the calling thread, and that async Web APIs are a common use case for it?

case EM_FUNC_SIG_PARAM_D:
q->args[i].d = va_arg(args, double);
break;
if (i >= start) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not have the loop on the previous line start from i = start?

if (emscripten_dispatch_to_thread_ptr(target_thread, _do_call, q)) {
return 1;
} else {
em_queued_call_free(q);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new free is not part of a new API - it looks like a bugfix for an existing API, is that right?

return 0;
}

q->calleeDelete = 1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this is calleeDelete, why do we free on line 848?

ctx->func = func;
ctx->arg = arg;
pthread_mutex_init(&ctx->mutex, NULL);
pthread_cond_init(&ctx->cond, NULL);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if these take non-trivial time we could store them as thread local perhaps. TODO?

#include "emscripten/threading.h"

// Disable leak checking since we have allocations that deliberately outlive
// `main`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see this in any other pthreads test - why is this new in this PR?


void start_and_finish_running_widget(em_async_as_sync_ctx* ctx, void* arg) {
((widget*)arg)->ctx = ctx;
finish_running_widget(arg);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't actually do an async operation on the called thread. How about using emscripten_async_call()?

@tlively
Copy link
Member Author

tlively commented Dec 3, 2021

The immediate purpose of this PR is to provide the synchronous proxying functionality we need for WasmFS. Currently we are using a bespoke proxying utility, but we would prefer to use standard APIs.

While we're adding new proxying APIs, we might as well make them more generally useful and provide a well-lit path away from the footguns of the current system (#14292).

I propose a new set of APIs that allow users to allocate and manage their own proxying queues for general purpose work. The existing API can be reimplemented in terms of the new API and the existing work queue can become a special instance of the new queue concept. Bikeshedding of semantics and names welcome.

// emscripten/proxying.h

// Opaque handle to a set of thread-local work queues to which work can be
// asynchronously or synchronously proxied from other threads. When work is
// proxied to a queue on a particular thread, that thread is notified to start
// processing work from that queue if it is not already doing so.
struct em_proxying_queue;

// Create and destroy proxying queues.
em_proxying_queue* em_proxying_queue_create();
void em_proxying_queue_destroy(em_proxying_queue* q);

// Get the queue used for proxying low-level runtime work. Work on this queue
// may be processed at any time inside system functions, so it must be
// nonblocking and safe to run at any time, similar to a native signal handler.
em_proxying_queue* emscripten_proxy_get_system_queue();

// Execute all the tasks enqueued for the current thread on the given queue.
void emscripten_proxy_execute_queue(em_proxying_queue* q);

// Opaque handle to a currently-executing proxied task, used to signal the end
// of the task.
struct em_proxying_ctx;

// Signal the end of a proxied task.
void emscripten_proxy_finish(em_proxying_ctx* ctx);

// Enqueue `func` on the given queue and thread and return immediately. Returns
// 1 if the work was successfully enqueued and the target thread notified or 0
// otherwise.
int emscripten_proxy_async(em_proxying_queue* q,
                           pthread_t target_thread,
                           void (*func)(void*),
                           void* arg);

// Enqueue `func` on the given queue and thread and wait for it to finish
// executing before returning. Returns 1 if the task was successfully completed
// and 0 otherwise.
int emscripten_proxy_sync(em_proxying_queue* q,
                          pthread_t target_thread,
                          void (*func)(void*),
                          void* arg);

// Enqueue `func` on the given queue and thread and wait for it to be executed
// and for the task to be marked finished with `emscripten_proxying_finish`
// before returning. Returns 1 if the task was successfully completed and 0
// otherwise.
int emscripten_proxy_sync_with_ctx(em_proxying_queue* q,
                                   pthread_t target_thread,
                                   void (*func)(em_proxying_ctx*, void*),
                                   void* arg);

@sbc100
Copy link
Collaborator

sbc100 commented Dec 4, 2021

I like the proposed new header and new naming. Can you have all the functions in the new header start with emscripten_proxy_..? This is something I wish we had done with other APIs but didn't.

@sbc100
Copy link
Collaborator

sbc100 commented Dec 4, 2021

I like the proposed new header and new naming. Can you have all the functions in the new header start with emscripten_proxy_..? This is something I wish we had done with other APIs but didn't.

For the three methods of en-queuing work how about giving them similar names, and how about for the async/non-blocking one we allow an optional callback (or a way to poll for completion?):

// optional completion_callback is called with user_data when job is done.
int emscripten_proxy_async(em_proxy_work_queue* q,
                                             pthread_t target_thread,
                                             void (*func)(void*),
                                             void* arg
                                             void (*completion_callback)(void*), void* user_data
);
int emscripten_proxy_sync(em_proxy_work_queue* q,
                                           pthread_t target_thread,
                                           void (*func)(void*),
                                           void* arg);
int emscripten_proxy_sync_with_ctx(em_proxy_work_queue* q,
                                                           pthread_t target_thread,
                                                           void (*func)(em_proxying_ctx*, void*),
                                                           void* arg);

@tlively
Copy link
Member Author

tlively commented Dec 6, 2021

Thanks for the suggestions, @sbc100. I've edited the comment above to reflect them.

@tlively
Copy link
Member Author

tlively commented Dec 9, 2021

I'm closing this PR in favor of the plan laid out in #15705 (comment).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants