diff --git a/base/task.jl b/base/task.jl index 76395cab5d7d2..b659c9e8594d0 100644 --- a/base/task.jl +++ b/base/task.jl @@ -412,7 +412,7 @@ end function enq_work(t::Task) (t.state == :runnable && t.queue === nothing) || error("schedule: Task not runnable") tid = Threads.threadid(t) - if t.sticky || tid != 0 + if t.sticky || tid != 0 || Threads.nthreads() == 1 if tid == 0 tid = Threads.threadid() end diff --git a/src/gc.c b/src/gc.c index f1bc2d6f1e84c..0fe4b1fdc460a 100644 --- a/src/gc.c +++ b/src/gc.c @@ -2499,7 +2499,9 @@ static void mark_roots(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp) gc_mark_queue_obj(gc_cache, sp, jl_main_module); // tasks +#ifdef JULIA_ENABLE_THREADING jl_gc_mark_enqueued_tasks(gc_cache, sp); +#endif // invisible builtin values if (jl_an_empty_vec_any != NULL) diff --git a/src/partr.c b/src/partr.c index b11ff75f2bb3d..6fc672b0528e8 100644 --- a/src/partr.c +++ b/src/partr.c @@ -14,7 +14,19 @@ extern "C" { #endif -#define JULIA_ENABLE_PARTR + +// thread sleep state + +static int16_t sleep_check_state; // status of the multi-queue. possible values: + +// no thread should be sleeping--there might be work in the multi-queue. +static const int16_t not_sleeping = 0; + +// it is acceptable for a thread to be sleeping if its sticky queue is empty. +// sleep_check_state == sleeping + 1 + tid means thread tid is checking the multi-queue +// to see if it is safe to transition to sleeping. +static const int16_t sleeping = 1; + #ifdef JULIA_ENABLE_THREADING @@ -46,16 +58,6 @@ static int16_t heap_p; /* unbias state for the RNG */ static uint64_t cong_unbias; -static int16_t sleep_check_state; // status of the multi-queue. possible values: - -// no thread should be sleeping--there might be work in the multi-queue. -static const int16_t not_sleeping = 0; - -// it is acceptable for a thread to be sleeping if its sticky queue is empty. -// sleep_check_state == sleeping + 1 + tid means thread tid is checking the multi-queue -// to see if it is safe to transition to sleeping. -static const int16_t sleeping = 1; - static inline void multiq_init(void) { @@ -178,6 +180,15 @@ static inline jl_task_t *multiq_deletemin(void) } +void jl_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp) +{ + int16_t i, j; + for (i = 0; i < heap_p; ++i) + for (j = 0; j < heaps[i].ntasks; ++j) + jl_gc_mark_queue_obj_explicit(gc_cache, sp, (jl_value_t *)heaps[i].tasks[j]); +} + + static int multiq_check_empty(void) { int16_t i; @@ -275,6 +286,13 @@ void jl_threadfun(void *arg) } +// enqueue the specified task for execution +JL_DLLEXPORT void jl_enqueue_task(jl_task_t *task) +{ + multiq_insert(task, task->prio); +} + + // sleep_check_after_threshold() -- if sleep_threshold ns have passed, return 1 static int sleep_check_after_threshold(uint64_t *start_cycles) { @@ -301,6 +319,22 @@ static void wake_thread(int16_t self, int16_t tid) } } +#else // JULIA_ENABLE_THREADING + +static int sleep_check_now(int16_t tid) +{ + (void)tid; + return 1; +} + +static int sleep_check_after_threshold(uint64_t *start_cycles) +{ + (void)start_cycles; + return 1; +} + +#endif + /* ensure thread tid is awake if necessary */ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) { @@ -311,6 +345,7 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) if (uvlock == self) uv_stop(jl_global_event_loop()); } +#ifdef JULIA_ENABLE_THREADING else { // check if the other threads might be sleeping if (jl_atomic_load_acquire(&sleep_check_state) != not_sleeping) { @@ -333,13 +368,7 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) jl_wake_libuv(); } } -} - - -// enqueue the specified task for execution -JL_DLLEXPORT void jl_enqueue_task(jl_task_t *task) -{ - multiq_insert(task, task->prio); +#endif } @@ -356,7 +385,11 @@ static jl_task_t *get_next_task(jl_value_t *getsticky) } return task; } +#ifdef JULIA_ENABLE_THREADING return multiq_deletemin(); +#else + return NULL; +#endif } extern volatile unsigned _threadedregion; @@ -373,11 +406,13 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky) if (task) return task; +#ifdef JULIA_ENABLE_THREADING jl_cpu_pause(); if (!multiq_check_empty()) { start_cycles = 0; continue; } +#endif jl_cpu_pause(); if (sleep_check_after_threshold(&start_cycles) || (!_threadedregion && ptls->tid == 0)) { @@ -465,17 +500,6 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky) } } - -void jl_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp) -{ - int16_t i, j; - for (i = 0; i < heap_p; ++i) - for (j = 0; j < heaps[i].ntasks; ++j) - jl_gc_mark_queue_obj_explicit(gc_cache, sp, (jl_value_t *)heaps[i].tasks[j]); -} - -#endif // JULIA_ENABLE_THREADING - #ifdef __cplusplus } #endif diff --git a/src/task.c b/src/task.c index e8a821e7f38fd..b5b633425d369 100644 --- a/src/task.c +++ b/src/task.c @@ -374,6 +374,8 @@ JL_DLLEXPORT void jl_switchto(jl_task_t **pt) assert(t->tid == ptls->tid); if (!t->sticky && !t->copy_stack) t->tid = -1; +#elif defined(NDEBUG) + (void)refetch_ptls(); #else assert(ptls == refetch_ptls()); #endif diff --git a/src/threading.c b/src/threading.c index 7ce605f0ebeeb..caa477cdbfd12 100644 --- a/src/threading.c +++ b/src/threading.c @@ -505,15 +505,19 @@ JL_DLLEXPORT void jl_threading_run(jl_value_t *func) args2[0] = schd_func; args2[1] = (jl_value_t*)t; jl_apply(args2, 2); +#ifdef JULIA_ENABLE_THREADING if (i == 1) { // let threads know work is coming (optimistic) jl_wakeup_thread(-1); } +#endif } +#ifdef JULIA_ENABLE_THREADING if (nthreads > 2) { // let threads know work is ready (guaranteed) jl_wakeup_thread(-1); } +#endif // join with all tasks JL_TRY { for (int i = 0; i < nthreads; i++) {