diff --git a/include/uv.h b/include/uv.h index b4fc333357..57ce8ae5ad 100644 --- a/include/uv.h +++ b/include/uv.h @@ -330,7 +330,7 @@ typedef void (*uv_exit_cb)(uv_process_t*, int exit_status, int term_signal); typedef void (*uv_walk_cb)(uv_handle_t* handle, void* arg); typedef void (*uv_fs_cb)(uv_fs_t* req); typedef void (*uv_work_cb)(uv_work_t* req); -typedef void (*uv_after_work_cb)(uv_work_t* req); +typedef void (*uv_after_work_cb)(uv_work_t* req, int status); typedef void (*uv_getaddrinfo_cb)(uv_getaddrinfo_t* req, int status, struct addrinfo* res); @@ -1394,6 +1394,17 @@ UV_EXTERN int uv_queue_work(uv_loop_t* loop, uv_work_t* req, * Only cancellation of uv_fs_t, uv_getaddrinfo_t and uv_work_t requests is * currently supported. * + * Cancelled requests have their callbacks invoked some time in the future. + * It's _not_ safe to free the memory associated with the request until your + * callback is called. + * + * Here is how cancellation is reported to your callback: + * + * - A uv_fs_t request has its req->errorno field set to UV_ECANCELED. + * + * - A uv_work_t or uv_getaddrinfo_t request has its callback invoked with + * status == -1 and uv_last_error(loop).code == UV_ECANCELED. + * * This function is currently only implemented on UNIX platforms. On Windows, * it always returns -1. */ diff --git a/src/unix/fs.c b/src/unix/fs.c index 493bdc2ff7..6c6faf53cd 100644 --- a/src/unix/fs.c +++ b/src/unix/fs.c @@ -522,16 +522,17 @@ static void uv__fs_done(struct uv__work* w, int status) { req = container_of(w, uv_fs_t, work_req); uv__req_unregister(req->loop, req); - if (status != 0) { - uv_fs_req_cleanup(req); - return; - } - if (req->errorno != 0) { req->errorno = uv_translate_sys_error(req->errorno); uv__set_artificial_error(req->loop, req->errorno); } + if (status == -UV_ECANCELED) { + assert(req->errorno == 0); + req->errorno = UV_ECANCELED; + uv__set_artificial_error(req->loop, UV_ECANCELED); + } + if (req->cb != NULL) req->cb(req); } diff --git a/src/unix/getaddrinfo.c b/src/unix/getaddrinfo.c index d021f46f74..7f147291db 100644 --- a/src/unix/getaddrinfo.c +++ b/src/unix/getaddrinfo.c @@ -67,9 +67,6 @@ static void uv__getaddrinfo_done(struct uv__work* w, int status) { req->service = NULL; req->hostname = NULL; - if (status != 0) - return; - if (req->retcode == 0) { /* OK */ #if EAI_NODATA /* FreeBSD deprecated EAI_NODATA */ @@ -87,6 +84,12 @@ static void uv__getaddrinfo_done(struct uv__work* w, int status) { req->loop->last_err.sys_errno_ = req->retcode; } + if (status == -UV_ECANCELED) { + assert(req->retcode == 0); + req->retcode = UV_ECANCELED; + uv__set_artificial_error(req->loop, UV_ECANCELED); + } + req->cb(req, req->retcode, res); } diff --git a/src/unix/threadpool.c b/src/unix/threadpool.c index 3f6fe38028..ee428201a5 100644 --- a/src/unix/threadpool.c +++ b/src/unix/threadpool.c @@ -20,6 +20,7 @@ */ #include "internal.h" +#include static uv_once_t once = UV_ONCE_INIT; static uv_cond_t cond; @@ -30,6 +31,11 @@ static ngx_queue_t wq; static volatile int initialized; +static void uv__cancelled(struct uv__work* w) { + abort(); +} + + /* To avoid deadlock with uv_cancel() it's crucial that the worker * never holds the global mutex and the loop-local mutex at the same time. */ @@ -149,8 +155,10 @@ int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) { if (!cancelled) return -1; - ngx_queue_init(&w->wq); - w->done(w, -UV_ECANCELED); + w->work = uv__cancelled; + uv_mutex_lock(&loop->wq_mutex); + ngx_queue_insert_tail(&loop->wq, &w->wq); + uv_mutex_unlock(&loop->wq_mutex); return 0; } @@ -161,6 +169,7 @@ void uv__work_done(uv_async_t* handle, int status) { uv_loop_t* loop; ngx_queue_t* q; ngx_queue_t wq; + int err; loop = container_of(handle, uv_loop_t, wq_async); ngx_queue_init(&wq); @@ -177,7 +186,8 @@ void uv__work_done(uv_async_t* handle, int status) { ngx_queue_remove(q); w = container_of(q, struct uv__work, wq); - w->done(w, 0); + err = (w->work == uv__cancelled) ? -UV_ECANCELED : 0; + w->done(w, err); } } @@ -190,15 +200,18 @@ static void uv__queue_work(struct uv__work* w) { static void uv__queue_done(struct uv__work* w, int status) { - uv_work_t* req = container_of(w, uv_work_t, work_req); + uv_work_t* req; + req = container_of(w, uv_work_t, work_req); uv__req_unregister(req->loop, req); - if (status != 0) + if (req->after_work_cb == NULL) return; - if (req->after_work_cb) - req->after_work_cb(req); + if (status == -UV_ECANCELED) + uv__set_artificial_error(req->loop, UV_ECANCELED); + + req->after_work_cb(req, status ? -1 : 0); } diff --git a/src/win/threadpool.c b/src/win/threadpool.c index 2452bdb044..1446878cce 100644 --- a/src/win/threadpool.c +++ b/src/win/threadpool.c @@ -78,5 +78,5 @@ int uv_cancel(uv_req_t* req) { void uv_process_work_req(uv_loop_t* loop, uv_work_t* req) { uv__req_unregister(loop, req); if(req->after_work_cb) - req->after_work_cb(req); + req->after_work_cb(req, 0); } diff --git a/test/test-threadpool-cancel.c b/test/test-threadpool-cancel.c index e009e0c0f1..db0397afe0 100644 --- a/test/test-threadpool-cancel.c +++ b/test/test-threadpool-cancel.c @@ -41,8 +41,12 @@ static uv_cond_t signal_cond; static uv_mutex_t signal_mutex; static uv_mutex_t wait_mutex; static unsigned num_threads; +static unsigned fs_cb_called; +static unsigned work_cb_called; static unsigned done_cb_called; +static unsigned done2_cb_called; static unsigned timer_cb_called; +static unsigned getaddrinfo_cb_called; static void work_cb(uv_work_t* req) { @@ -52,10 +56,12 @@ static void work_cb(uv_work_t* req) { uv_mutex_lock(&wait_mutex); uv_mutex_unlock(&wait_mutex); + + work_cb_called++; } -static void done_cb(uv_work_t* req) { +static void done_cb(uv_work_t* req, int status) { done_cb_called++; free(req); } @@ -82,7 +88,6 @@ static void saturate_threadpool(void) { */ if (uv_cond_timedwait(&signal_cond, &signal_mutex, 350 * 1e6)) { ASSERT(0 == uv_cancel((uv_req_t*) req)); - free(req); break; } } @@ -96,15 +101,40 @@ static void unblock_threadpool(void) { static void cleanup_threadpool(void) { - ASSERT(done_cb_called == num_threads); + ASSERT(done_cb_called == num_threads + 1); /* +1 == cancelled work req. */ + ASSERT(work_cb_called == num_threads); + uv_cond_destroy(&signal_cond); uv_mutex_destroy(&signal_mutex); uv_mutex_destroy(&wait_mutex); } -static void fail_cb(/* empty */) { - ASSERT(0 && "fail_cb called"); +static void fs_cb(uv_fs_t* req) { + ASSERT(req->errorno == UV_ECANCELED); + uv_fs_req_cleanup(req); + fs_cb_called++; +} + + +static void getaddrinfo_cb(uv_getaddrinfo_t* req, + int status, + struct addrinfo* res) { + ASSERT(UV_ECANCELED == uv_last_error(req->loop).code); + ASSERT(UV_ECANCELED == status); + getaddrinfo_cb_called++; +} + + +static void work2_cb(uv_work_t* req) { + ASSERT(0 && "work2_cb called"); +} + + +static void done2_cb(uv_work_t* req, int status) { + ASSERT(uv_last_error(req->loop).code == UV_ECANCELED); + ASSERT(status == -1); + done2_cb_called++; } @@ -131,15 +161,23 @@ TEST_IMPL(threadpool_cancel_getaddrinfo) { struct cancel_info ci; struct addrinfo hints; uv_loop_t* loop; + int r; INIT_CANCEL_INFO(&ci, reqs); loop = uv_default_loop(); saturate_threadpool(); - ASSERT(0 == uv_getaddrinfo(loop, reqs + 0, fail_cb, "fail", NULL, NULL)); - ASSERT(0 == uv_getaddrinfo(loop, reqs + 1, fail_cb, NULL, "fail", NULL)); - ASSERT(0 == uv_getaddrinfo(loop, reqs + 2, fail_cb, "fail", "fail", NULL)); - ASSERT(0 == uv_getaddrinfo(loop, reqs + 3, fail_cb, "fail", NULL, &hints)); + r = uv_getaddrinfo(loop, reqs + 0, getaddrinfo_cb, "fail", NULL, NULL); + ASSERT(r == 0); + + r = uv_getaddrinfo(loop, reqs + 1, getaddrinfo_cb, NULL, "fail", NULL); + ASSERT(r == 0); + + r = uv_getaddrinfo(loop, reqs + 2, getaddrinfo_cb, "fail", "fail", NULL); + ASSERT(r == 0); + + r = uv_getaddrinfo(loop, reqs + 3, getaddrinfo_cb, "fail", NULL, &hints); + ASSERT(r == 0); ASSERT(0 == uv_timer_init(loop, &ci.timer_handle)); ASSERT(0 == uv_timer_start(&ci.timer_handle, timer_cb, 10, 0)); @@ -163,12 +201,13 @@ TEST_IMPL(threadpool_cancel_work) { saturate_threadpool(); for (i = 0; i < ARRAY_SIZE(reqs); i++) - ASSERT(0 == uv_queue_work(loop, reqs + i, fail_cb, NULL)); + ASSERT(0 == uv_queue_work(loop, reqs + i, work2_cb, done2_cb)); ASSERT(0 == uv_timer_init(loop, &ci.timer_handle)); ASSERT(0 == uv_timer_start(&ci.timer_handle, timer_cb, 10, 0)); ASSERT(0 == uv_run(loop)); ASSERT(1 == timer_cb_called); + ASSERT(ARRAY_SIZE(reqs) == done2_cb_called); cleanup_threadpool(); @@ -188,36 +227,37 @@ TEST_IMPL(threadpool_cancel_fs) { /* Needs to match ARRAY_SIZE(fs_reqs). */ n = 0; - ASSERT(0 == uv_fs_chmod(loop, reqs + n++, "/", 0, fail_cb)); - ASSERT(0 == uv_fs_chown(loop, reqs + n++, "/", 0, 0, fail_cb)); - ASSERT(0 == uv_fs_close(loop, reqs + n++, 0, fail_cb)); - ASSERT(0 == uv_fs_fchmod(loop, reqs + n++, 0, 0, fail_cb)); - ASSERT(0 == uv_fs_fchown(loop, reqs + n++, 0, 0, 0, fail_cb)); - ASSERT(0 == uv_fs_fdatasync(loop, reqs + n++, 0, fail_cb)); - ASSERT(0 == uv_fs_fstat(loop, reqs + n++, 0, fail_cb)); - ASSERT(0 == uv_fs_fsync(loop, reqs + n++, 0, fail_cb)); - ASSERT(0 == uv_fs_ftruncate(loop, reqs + n++, 0, 0, fail_cb)); - ASSERT(0 == uv_fs_futime(loop, reqs + n++, 0, 0, 0, fail_cb)); - ASSERT(0 == uv_fs_link(loop, reqs + n++, "/", "/", fail_cb)); - ASSERT(0 == uv_fs_lstat(loop, reqs + n++, "/", fail_cb)); - ASSERT(0 == uv_fs_mkdir(loop, reqs + n++, "/", 0, fail_cb)); - ASSERT(0 == uv_fs_open(loop, reqs + n++, "/", 0, 0, fail_cb)); - ASSERT(0 == uv_fs_read(loop, reqs + n++, 0, NULL, 0, 0, fail_cb)); - ASSERT(0 == uv_fs_readdir(loop, reqs + n++, "/", 0, fail_cb)); - ASSERT(0 == uv_fs_readlink(loop, reqs + n++, "/", fail_cb)); - ASSERT(0 == uv_fs_rename(loop, reqs + n++, "/", "/", fail_cb)); - ASSERT(0 == uv_fs_mkdir(loop, reqs + n++, "/", 0, fail_cb)); - ASSERT(0 == uv_fs_sendfile(loop, reqs + n++, 0, 0, 0, 0, fail_cb)); - ASSERT(0 == uv_fs_stat(loop, reqs + n++, "/", fail_cb)); - ASSERT(0 == uv_fs_symlink(loop, reqs + n++, "/", "/", 0, fail_cb)); - ASSERT(0 == uv_fs_unlink(loop, reqs + n++, "/", fail_cb)); - ASSERT(0 == uv_fs_utime(loop, reqs + n++, "/", 0, 0, fail_cb)); - ASSERT(0 == uv_fs_write(loop, reqs + n++, 0, NULL, 0, 0, fail_cb)); + ASSERT(0 == uv_fs_chmod(loop, reqs + n++, "/", 0, fs_cb)); + ASSERT(0 == uv_fs_chown(loop, reqs + n++, "/", 0, 0, fs_cb)); + ASSERT(0 == uv_fs_close(loop, reqs + n++, 0, fs_cb)); + ASSERT(0 == uv_fs_fchmod(loop, reqs + n++, 0, 0, fs_cb)); + ASSERT(0 == uv_fs_fchown(loop, reqs + n++, 0, 0, 0, fs_cb)); + ASSERT(0 == uv_fs_fdatasync(loop, reqs + n++, 0, fs_cb)); + ASSERT(0 == uv_fs_fstat(loop, reqs + n++, 0, fs_cb)); + ASSERT(0 == uv_fs_fsync(loop, reqs + n++, 0, fs_cb)); + ASSERT(0 == uv_fs_ftruncate(loop, reqs + n++, 0, 0, fs_cb)); + ASSERT(0 == uv_fs_futime(loop, reqs + n++, 0, 0, 0, fs_cb)); + ASSERT(0 == uv_fs_link(loop, reqs + n++, "/", "/", fs_cb)); + ASSERT(0 == uv_fs_lstat(loop, reqs + n++, "/", fs_cb)); + ASSERT(0 == uv_fs_mkdir(loop, reqs + n++, "/", 0, fs_cb)); + ASSERT(0 == uv_fs_open(loop, reqs + n++, "/", 0, 0, fs_cb)); + ASSERT(0 == uv_fs_read(loop, reqs + n++, 0, NULL, 0, 0, fs_cb)); + ASSERT(0 == uv_fs_readdir(loop, reqs + n++, "/", 0, fs_cb)); + ASSERT(0 == uv_fs_readlink(loop, reqs + n++, "/", fs_cb)); + ASSERT(0 == uv_fs_rename(loop, reqs + n++, "/", "/", fs_cb)); + ASSERT(0 == uv_fs_mkdir(loop, reqs + n++, "/", 0, fs_cb)); + ASSERT(0 == uv_fs_sendfile(loop, reqs + n++, 0, 0, 0, 0, fs_cb)); + ASSERT(0 == uv_fs_stat(loop, reqs + n++, "/", fs_cb)); + ASSERT(0 == uv_fs_symlink(loop, reqs + n++, "/", "/", 0, fs_cb)); + ASSERT(0 == uv_fs_unlink(loop, reqs + n++, "/", fs_cb)); + ASSERT(0 == uv_fs_utime(loop, reqs + n++, "/", 0, 0, fs_cb)); + ASSERT(0 == uv_fs_write(loop, reqs + n++, 0, NULL, 0, 0, fs_cb)); ASSERT(n == ARRAY_SIZE(reqs)); ASSERT(0 == uv_timer_init(loop, &ci.timer_handle)); ASSERT(0 == uv_timer_start(&ci.timer_handle, timer_cb, 10, 0)); ASSERT(0 == uv_run(loop)); + ASSERT(n == fs_cb_called); ASSERT(1 == timer_cb_called); cleanup_threadpool(); diff --git a/test/test-threadpool.c b/test/test-threadpool.c index 2b22619611..bde9f47281 100644 --- a/test/test-threadpool.c +++ b/test/test-threadpool.c @@ -35,7 +35,8 @@ static void work_cb(uv_work_t* req) { } -static void after_work_cb(uv_work_t* req) { +static void after_work_cb(uv_work_t* req, int status) { + ASSERT(status == 0); ASSERT(req == &work_req); ASSERT(req->data == &data); after_work_cb_count++;