diff --git a/base/task.jl b/base/task.jl index 1985fc03b9611..02d1a4dbccb81 100644 --- a/base/task.jl +++ b/base/task.jl @@ -409,7 +409,7 @@ end function enq_work(t::Task) (t.state == :runnable && t.queue === nothing) || error("schedule: Task not runnable") - if t.sticky + if t.sticky || Threads.nthreads() == 1 tid = Threads.threadid(t) if tid == 0 tid = Threads.threadid() diff --git a/src/gc.c b/src/gc.c index 785018c5a5b05..5a848ba1c08ad 100644 --- a/src/gc.c +++ b/src/gc.c @@ -2500,7 +2500,9 @@ static void mark_roots(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp) gc_mark_queue_obj(gc_cache, sp, jl_main_module); // tasks +#ifdef JULIA_ENABLE_THREADING jl_gc_mark_enqueued_tasks(gc_cache, sp); +#endif // invisible builtin values if (jl_an_empty_vec_any != NULL) diff --git a/src/partr.c b/src/partr.c index 4c8fc326e2195..672de71fcccb2 100644 --- a/src/partr.c +++ b/src/partr.c @@ -14,8 +14,6 @@ extern "C" { #endif -#define JULIA_ENABLE_PARTR - #ifdef JULIA_ENABLE_THREADING // GC functions used @@ -176,6 +174,13 @@ static inline jl_task_t *multiq_deletemin(void) } +void jl_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp) +{ + for (int16_t i = 0; i < heap_p; ++i) + for (int16_t j = 0; j < heaps[i].ntasks; ++j) + jl_gc_mark_queue_obj_explicit(gc_cache, sp, (jl_value_t *)heaps[i].tasks[j]); +} + // parallel task runtime // --- @@ -217,8 +222,18 @@ void jl_threadfun(void *arg) jl_finish_task(jl_current_task, jl_nothing); // noreturn } +// enqueue the specified task for execution +JL_DLLEXPORT void jl_enqueue_task(jl_task_t *task) +{ + multiq_insert(task, task->prio); +} + +#endif // JULIA_ENABLE_THREADING + + JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) { +#ifdef JULIA_ENABLE_THREADING jl_ptls_t ptls = jl_get_ptls_states(); /* ensure thread tid is awake if necessary */ if (ptls->tid != tid && !_threadedregion && tid != -1) { @@ -230,13 +245,9 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) jl_wake_libuv(); else uv_stop(jl_global_event_loop()); -} - - -// enqueue the specified task for execution -JL_DLLEXPORT void jl_enqueue_task(jl_task_t *task) -{ - multiq_insert(task, task->prio); +#else + uv_stop(jl_global_event_loop()); +#endif } @@ -246,7 +257,11 @@ static jl_task_t *get_next_task(jl_value_t *getsticky) jl_task_t *task = (jl_task_t*)jl_apply(&getsticky, 1); if (jl_typeis(task, jl_task_type)) return task; +#ifdef JULIA_ENABLE_THREADING return multiq_deletemin(); +#else + return NULL; +#endif } @@ -277,6 +292,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky) #endif } } +#ifdef JULIA_ENABLE_THREADING else { int sleepnow = 0; uv_mutex_lock(&sleep_lock); @@ -293,6 +309,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky) jl_gc_safe_leave(ptls, gc_state); } } +#endif } else { if (++spin_count > 1000 && jl_atomic_load(&jl_uv_n_waiters) == 0 && jl_mutex_trylock(&jl_uv_mutex)) { @@ -317,16 +334,6 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky) } } - -void jl_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp) -{ - for (int16_t i = 0; i < heap_p; ++i) - for (int16_t j = 0; j < heaps[i].ntasks; ++j) - jl_gc_mark_queue_obj_explicit(gc_cache, sp, (jl_value_t *)heaps[i].tasks[j]); -} - -#endif // JULIA_ENABLE_THREADING - #ifdef __cplusplus } #endif diff --git a/src/threading.c b/src/threading.c index 67f9317d6f136..d3d91c7b9abe8 100644 --- a/src/threading.c +++ b/src/threading.c @@ -505,19 +505,23 @@ JL_DLLEXPORT void jl_threading_run(jl_value_t *func) args2[0] = schd_func; args2[1] = (jl_value_t*)t; jl_apply(args2, 2); +#ifdef JULIA_ENABLE_THREADING if (i == 1) { // let threads know work is coming (optimistic) uv_mutex_lock(&sleep_lock); uv_cond_broadcast(&sleep_alarm); uv_mutex_unlock(&sleep_lock); } +#endif } +#ifdef JULIA_ENABLE_THREADING if (nthreads > 2) { // let threads know work is ready (guaranteed) uv_mutex_lock(&sleep_lock); uv_cond_broadcast(&sleep_alarm); uv_mutex_unlock(&sleep_lock); } +#endif // join with all tasks JL_TRY { for (int i = 0; i < nthreads; i++) {