Skip to content

Commit

Permalink
add :dynamic scheduling option for Threads.@threads
Browse files Browse the repository at this point in the history
  • Loading branch information
IanButterworth committed Jan 25, 2022
1 parent 2682819 commit 03255f8
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 13 deletions.
61 changes: 48 additions & 13 deletions base/threadingconstructs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ See also: `BLAS.get_num_threads` and `BLAS.set_num_threads` in the
"""
nthreads() = Int(unsafe_load(cglobal(:jl_n_threads, Cint)))

function threading_run(func)
ccall(:jl_enter_threaded_region, Cvoid, ())
function threading_run(func, static)
static && ccall(:jl_enter_threaded_region, Cvoid, ())
n = nthreads()
tasks = Vector{Task}(undef, n)
for i = 1:n
t = Task(func)
t.sticky = true
ccall(:jl_set_task_tid, Cint, (Any, Cint), t, i-1)
t.sticky = static
static && ccall(:jl_set_task_tid, Cint, (Any, Cint), t, i-1)
tasks[i] = t
schedule(t)
end
Expand All @@ -38,7 +38,7 @@ function threading_run(func)
wait(tasks[i])
end
finally
ccall(:jl_exit_threaded_region, Cvoid, ())
static && ccall(:jl_exit_threaded_region, Cvoid, ())
end
end

Expand Down Expand Up @@ -86,15 +86,17 @@ function _threadsfor(iter, lbody, schedule)
end
end
end
if threadid() != 1 || ccall(:jl_in_threaded_region, Cint, ()) != 0
if $(schedule === :dynamic)
threading_run(threadsfor_fun, false)
elseif threadid() != 1 || ccall(:jl_in_threaded_region, Cint, ()) != 0
$(if schedule === :static
:(error("`@threads :static` can only be used from thread 1 and not nested"))
else
# only use threads when called from thread 1, outside @threads
# only use threads when called from thread 1, outside @threads :static
:(Base.invokelatest(threadsfor_fun, true))
end)
end)
else
threading_run(threadsfor_fun)
threading_run(threadsfor_fun, true)
end
nothing
end
Expand All @@ -110,15 +112,48 @@ A barrier is placed at the end of the loop which waits for all tasks to finish
execution.
The `schedule` argument can be used to request a particular scheduling policy.
The only currently supported value is `:static`, which creates one task per thread
and divides the iterations equally among them. Specifying `:static` is an error
if used from inside another `@threads` loop or from a thread other than 1.
Options are:
- `:static` is the default schedule which creates one task per thread and divides the
iterations equally among them, assigning each task specifically to each thread.
Specifying `:static` is an error if used from inside another `@threads` loop
or from a thread other than 1.
- `:dynamic` is like `:static` except the tasks are assigned to threads dynamically,
allowing more flexible scheduling if other tasks are active on other threads.
Specifying `:dynamic` is allowed from inside another `@threads` loop and from
threads other than 1.
The default schedule (used when no `schedule` argument is present) is subject to change.
For example, here an illustration of the different scheduling strategies, where `busywait`
is a non-yielding timed loop that runs for a number of seconds.
```julia-repl
julia> @time begin
Threads.@spawn busywait(5)
Threads.@threads :static for i in 1:Threads.nthreads()
busywait(1)
end
end
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)
julia> @time begin
Threads.@spawn busywait(5)
Threads.@threads :dynamic for i in 1:Threads.nthreads()
busywait(1)
end
end
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)
```
The `:dynamic` example takes 2 seconds because one of the non-occupied threads is able
to run two of the 1-second iterations to complete the for loop.
!!! compat "Julia 1.5"
The `schedule` argument is available as of Julia 1.5.
!!! compat "Julia 1.8"
The `:dynamic` option for the `schedule` argument is available as of Julia 1.8.
See also: [`@spawn`](@ref Threads.@spawn), [`nthreads()`](@ref Threads.nthreads),
[`threadid()`](@ref Threads.threadid), `pmap` in [`Distributed`](@ref man-distributed),
`BLAS.set_num_threads` in [`LinearAlgebra`](@ref man-linalg).
Expand All @@ -133,7 +168,7 @@ macro threads(args...)
# for now only allow quoted symbols
sched = nothing
end
if sched !== :static
if sched !== :static && sched !== :dynamic
throw(ArgumentError("unsupported schedule argument in @threads"))
end
elseif na == 1
Expand Down
40 changes: 40 additions & 0 deletions test/threads_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,46 @@ end
@test _atthreads_static_schedule() == [1:nthreads();]
@test_throws TaskFailedException @threads for i = 1:1; _atthreads_static_schedule(); end

# dynamic schedule
function _atthreads_dynamic_schedule()
inc = Threads.Atomic{Int}(0)
Threads.@threads :dynamic for _ = 1:nthreads()
Threads.atomic_add!(inc, 1)
end
return inc
end
inc = _atthreads_dynamic_schedule()
@test inc[] == nthreads()

# nested dynamic schedule
function _atthreads_dynamic_dynamic_schedule()
inc = Threads.Atomic{Int}(0)
Threads.@threads :dynamic for _ = 1:nthreads()
Threads.@threads :dynamic for _ = 1:nthreads()
Threads.atomic_add!(inc, 1)
end
end

return inc
end
inc = _atthreads_dynamic_dynamic_schedule()
@test inc[] == nthreads() * nthreads()

function _atthreads_static_dynamic_schedule()
ids = zeros(Int, nthreads())
inc = Threads.Atomic{Int}(0)
Threads.@threads :static for i = 1:nthreads()
ids[i] = Threads.threadid()
Threads.@threads :dynamic for _ = 1:nthreads()
Threads.atomic_add!(inc, 1)
end
end
return ids, inc
end
ids, inc = _atthreads_static_dynamic_schedule()
@test ids == [1:nthreads();]
@test inc[] == nthreads() * nthreads()

try
@macroexpand @threads(for i = 1:10, j = 1:10; end)
catch ex
Expand Down

0 comments on commit 03255f8

Please sign in to comment.