Skip to content

Commit

Permalink
make @sync lexically scoped and merge @schedule with @async (#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
JeffBezanson authored May 23, 2018
1 parent 0f8d4f8 commit 3b8b5eb
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 34 deletions.
5 changes: 2 additions & 3 deletions src/Distributed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length,

# imports for use
using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, wait_connected,
VERSION_STRING, sync_begin, sync_add, sync_end, async_run_thunk,
binding_module, notify_error, atexit, julia_exename, julia_cmd,
AsyncGenerator, acquire, release, invokelatest,
VERSION_STRING, binding_module, notify_error, atexit, julia_exename,
julia_cmd, AsyncGenerator, acquire, release, invokelatest,
shell_escape_posixly, uv_error, coalesce, notnothing

using Serialization, Sockets
Expand Down
24 changes: 12 additions & 12 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,10 @@ function check_worker_state(w::Worker)
else
w.ct_time = time()
if myid() > w.id
@schedule exec_conn_func(w)
@async exec_conn_func(w)
else
# route request via node 1
@schedule remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
@async remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
end
wait_for_conn(w)
end
Expand All @@ -144,7 +144,7 @@ function wait_for_conn(w)
timeout = worker_timeout() - (time() - w.ct_time)
timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")

@schedule (sleep(timeout); notify(w.c_state; all=true))
@async (sleep(timeout); notify(w.c_state; all=true))
wait(w.c_state)
w.state == W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
end
Expand Down Expand Up @@ -200,7 +200,7 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin))
else
sock = listen(interface, LPROC.bind_port)
end
@schedule while isopen(sock)
@async while isopen(sock)
client = accept(sock)
process_messages(client, client, true)
end
Expand Down Expand Up @@ -231,7 +231,7 @@ end


function redirect_worker_output(ident, stream)
@schedule while !eof(stream)
@async while !eof(stream)
line = readline(stream)
if startswith(line, " From worker ")
# stdout's of "additional" workers started from an initial worker on a host are not available
Expand Down Expand Up @@ -265,7 +265,7 @@ function read_worker_host_port(io::IO)
leader = String[]
try
while ntries > 0
readtask = @schedule readline(io)
readtask = @async readline(io)
yield()
while !istaskdone(readtask) && ((time() - t0) < timeout)
sleep(0.05)
Expand Down Expand Up @@ -396,13 +396,13 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
# call manager's `launch` is a separate task. This allows the master
# process initiate the connection setup process as and when workers come
# online
t_launch = @schedule launch(manager, params, launched, launch_ntfy)
t_launch = @async launch(manager, params, launched, launch_ntfy)

@sync begin
while true
if isempty(launched)
istaskdone(t_launch) && break
@schedule (sleep(1); notify(launch_ntfy))
@async (sleep(1); notify(launch_ntfy))
wait(launch_ntfy)
end

Expand Down Expand Up @@ -574,7 +574,7 @@ function create_worker(manager, wconfig)
join_message = JoinPGRPMsg(w.id, all_locs, PGRP.topology, enable_threaded_blas, isclusterlazy())
send_msg_now(w, MsgHeader(RRID(0,0), ntfy_oid), join_message)

@schedule manage(w.manager, w.id, w.config, :register)
@async manage(w.manager, w.id, w.config, :register)
wait(rr_ntfy_join)
lock(client_refs) do
delete!(PGRP.refs, ntfy_oid)
Expand Down Expand Up @@ -621,7 +621,7 @@ function check_master_connect()
if ccall(:jl_running_on_valgrind,Cint,()) != 0
return
end
@schedule begin
@async begin
start = time()
while !haskey(map_pid_wrkr, 1) && (time() - start) < timeout
sleep(1.0)
Expand Down Expand Up @@ -844,13 +844,13 @@ function rmprocs(pids...; waitfor=typemax(Int))

pids = vcat(pids...)
if waitfor == 0
t = @schedule _rmprocs(pids, typemax(Int))
t = @async _rmprocs(pids, typemax(Int))
yield()
return t
else
_rmprocs(pids, waitfor)
# return a dummy task object that user code can wait on.
return @schedule nothing
return @async nothing
end
end

Expand Down
36 changes: 29 additions & 7 deletions src/macros.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ let nextidx = 0
end
end

spawnat(p, thunk) = sync_add(remotecall(thunk, p))
spawnat(p, thunk) = remotecall(thunk, p)

spawn_somewhere(thunk) = spawnat(nextproc(),thunk)

Expand Down Expand Up @@ -41,7 +41,14 @@ julia> fetch(f)
"""
macro spawn(expr)
thunk = esc(:(()->($expr)))
:(spawn_somewhere($thunk))
var = esc(Base.sync_varname)
quote
local ref = spawn_somewhere($thunk)
if $(Expr(:isdefined, var))
push!($var, ref)
end
ref
end
end

"""
Expand All @@ -64,7 +71,14 @@ julia> fetch(f)
"""
macro spawnat(p, expr)
thunk = esc(:(()->($expr)))
:(spawnat($(esc(p)), $thunk))
var = esc(Base.sync_varname)
quote
local ref = spawnat($(esc(p)), $thunk)
if $(Expr(:isdefined, var))
push!($var, ref)
end
ref
end
end

"""
Expand Down Expand Up @@ -250,7 +264,9 @@ function preduce(reducer, f, R)
end

function pfor(f, R)
[@spawn f(R, first(c), last(c)) for c in splitrange(length(R), nworkers())]
@async @sync for c in splitrange(length(R), nworkers())
@spawn f(R, first(c), last(c))
end
end

function make_preduce_body(var, body)
Expand Down Expand Up @@ -316,9 +332,15 @@ macro distributed(args...)
r = loop.args[1].args[2]
body = loop.args[2]
if na==1
thecall = :(pfor($(make_pfor_body(var, body)), $(esc(r))))
syncvar = esc(Base.sync_varname)
return quote
local ref = pfor($(make_pfor_body(var, body)), $(esc(r)))
if $(Expr(:isdefined, syncvar))
push!($syncvar, ref)
end
ref
end
else
thecall = :(preduce($(esc(reducer)), $(make_preduce_body(var, body)), $(esc(r))))
return :(preduce($(esc(reducer)), $(make_preduce_body(var, body)), $(esc(r))))
end
thecall
end
2 changes: 1 addition & 1 deletion src/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy:

for (i,(machine, cnt)) in enumerate(manager.machines)
let machine=machine, cnt=cnt
launch_tasks[i] = @schedule try
launch_tasks[i] = @async try
launch_on_machine(manager, machine, cnt, params, launched, launch_ntfy)
catch e
print(stderr, "exception launching on machine $(machine) : $(e)\n")
Expand Down
2 changes: 1 addition & 1 deletion src/messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ function flush_gc_msgs()
end
catch e
bt = catch_backtrace()
@schedule showerror(stderr, e, bt)
@async showerror(stderr, e, bt)
end
end

Expand Down
1 change: 0 additions & 1 deletion src/precompile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -209,5 +209,4 @@ precompile(Tuple{typeof(Distributed.test_existing_ref), Distributed.Future})
precompile(Tuple{typeof(Base.finalizer), Distributed.Future, typeof(Distributed.finalize_ref)})
precompile(Tuple{typeof(Base.hash), Distributed.Future, UInt64})
precompile(Tuple{typeof(Base.ht_keyindex), Base.Dict{WeakRef, Nothing}, Distributed.Future})
precompile(Tuple{typeof(Base.sync_add), Distributed.Future})
precompile(Tuple{Type{Union{}}, Distributed.RRID})
12 changes: 6 additions & 6 deletions src/process_messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ function schedule_call(rid, thunk)
rv = RemoteValue(def_rv_channel())
(PGRP::ProcessGroup).refs[rid] = rv
push!(rv.clientset, rid.whence)
@schedule run_work_thunk(rv, thunk)
@async run_work_thunk(rv, thunk)
return rv
end
end
Expand Down Expand Up @@ -104,7 +104,7 @@ end

## message event handlers ##
function process_messages(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool=true)
@schedule process_tcp_streams(r_stream, w_stream, incoming)
@async process_tcp_streams(r_stream, w_stream, incoming)
end

function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool)
Expand Down Expand Up @@ -132,7 +132,7 @@ Julia version number to perform the authentication handshake.
See also [`cluster_cookie`](@ref).
"""
function process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
@schedule message_handler_loop(r_stream, w_stream, incoming)
@async message_handler_loop(r_stream, w_stream, incoming)
end

function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
Expand Down Expand Up @@ -265,21 +265,21 @@ function handle_msg(msg::CallMsg{:call}, header, r_stream, w_stream, version)
schedule_call(header.response_oid, ()->msg.f(msg.args...; msg.kwargs...))
end
function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, version)
@schedule begin
@async begin
v = run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), false)
deliver_result(w_stream, :call_fetch, header.notify_oid, v)
end
end

function handle_msg(msg::CallWaitMsg, header, r_stream, w_stream, version)
@schedule begin
@async begin
rv = schedule_call(header.response_oid, ()->msg.f(msg.args...; msg.kwargs...))
deliver_result(w_stream, :call_wait, header.notify_oid, fetch(rv.c))
end
end

function handle_msg(msg::RemoteDoMsg, header, r_stream, w_stream, version)
@schedule run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), true)
@async run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), true)
end

function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version)
Expand Down
2 changes: 1 addition & 1 deletion src/remotecall.jl
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ end

any_gc_flag = Condition()
function start_gc_msgs_task()
@schedule while true
@async while true
wait(any_gc_flag)
flush_gc_msgs()
end
Expand Down
4 changes: 2 additions & 2 deletions test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ c=Channel{Int}(1)

# test channel iterations
function test_iteration(in_c, out_c)
t=@schedule for v in in_c
t=@async for v in in_c
put!(out_c, v)
end

Expand Down Expand Up @@ -1105,7 +1105,7 @@ append!(testruns, [
for (addp_testf, expected_errstr, env) in testruns
old_stdout = stdout
stdout_out, stdout_in = redirect_stdout()
stdout_txt = @schedule filter!(readlines(stdout_out)) do s
stdout_txt = @async filter!(readlines(stdout_out)) do s
return !startswith(s, "\tFrom failed worker startup:\t")
end
try
Expand Down

0 comments on commit 3b8b5eb

Please sign in to comment.