Skip to content

Commit

Permalink
add procs argument to @-everywhere and move code to a function
Browse files Browse the repository at this point in the history
adding remotecall_eval lets us simplify code that previously needed
awkward contortions to use remotecall_fetch on an anonymous function

fixes a code-duplication bug in the case where something is spliced into expr

and removes and corrects misleading information in the doc string
  • Loading branch information
vtjnash committed Jul 18, 2017
1 parent 3b063d6 commit 1e811d9
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 39 deletions.
80 changes: 52 additions & 28 deletions base/distributed/macros.jl
Original file line number Diff line number Diff line change
Expand Up @@ -134,55 +134,79 @@ end
extract_imports(x) = extract_imports!(Symbol[], x)

"""
@everywhere expr
@everywhere [procs()] expr
Execute an expression under `Main` everywhere. Equivalent to calling
`eval(Main, expr)` on all processes. Errors on any of the processes are collected into a
`CompositeException` and thrown. For example :
Execute an expression under `Main` on all `procs`.
Errors on any of the processes are collected into a
`CompositeException` and thrown. For example:
@everywhere bar=1
@everywhere bar = 1
will define `Main.bar` on all processes.
Unlike [`@spawn`](@ref) and [`@spawnat`](@ref),
`@everywhere` does not capture any local variables. Prefixing
`@everywhere` with [`@eval`](@ref) allows us to broadcast
local variables using interpolation :
`@everywhere` does not capture any local variables.
Instead, local variables can be broadcast using interpolation:
foo = 1
@eval @everywhere bar=\$foo
@everywhere bar = \$foo
The expression is evaluated under `Main` irrespective of where `@everywhere` is called from.
For example :
The optional argument `procs` allows specifying a subset of all
processes to have execute the expression.
module FooBar
foo() = @everywhere bar()=myid()
end
FooBar.foo()
will result in `Main.bar` being defined on all processes and not `FooBar.bar`.
Equivalent to calling `remotecall_eval(Main, procs, expr)`.
"""
macro everywhere(ex)
return :(@everywhere $procs() $(esc(ex))) # interpolation needs to work around hygiene bugs (#22307)
end

macro everywhere(procs, ex)
imps = [Expr(:import, m) for m in extract_imports(ex)]
quote
$(isempty(imps) ? nothing : Expr(:toplevel, imps...))
sync_begin()
for pid in workers()
async_run_thunk(()->remotecall_fetch(eval_ew_expr, pid, $(Expr(:quote,ex))))
yield() # ensure that the remotecall_fetch has been started
return quote
$(isempty(imps) ? nothing : Expr(:toplevel, imps...)) # run imports locally first
let ex = $(Expr(:quote, ex)), procs = $(esc(procs))
remotecall_eval(Main, procs, ex)
end
end
end

"""
remotecall_eval(m::Module, procs, expression)
Execute an expression under module `m` on the processes
specified in `procs`.
Errors on any of the processes are collected into a
`CompositeException` and thrown.
See also `@everywhere`.
"""
function remotecall_eval(m::Module, procs, ex)
@sync begin
run_locally = 0
for pid in procs
if pid == myid()
run_locally += 1
else
@async remotecall_wait(Core.eval, pid, m, ex)
end
end
yield() # ensure that the remotecall_fetch have had a chance to start

# execute locally last as we do not want local execution to block serialization
# of the request to remote nodes.
if nprocs() > 1
async_run_thunk(()->eval_ew_expr($(Expr(:quote,ex))))
for _ in 1:run_locally
@async Core.eval(m, ex)
end

sync_end()
end
nothing
end

# optimized version of remotecall_eval for a single pid
# and which also fetches the return value
function remotecall_eval(m::Module, pid::Int, ex)
return remotecall_fetch(Core.eval, pid, m, ex)
end

eval_ew_expr(ex) = (eval(Main, ex); nothing)

# Statically split range [1,N] into equal sized chunks for np processors
function splitrange(N::Int, np::Int)
Expand Down
18 changes: 7 additions & 11 deletions test/compile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -578,27 +578,23 @@ let
end
""")

@sync for wid in test_workers
@async remotecall_fetch(Core.eval, wid, Base, quote
unshift!(LOAD_PATH, $load_path)
unshift!(LOAD_CACHE_PATH, $load_cache_path)
end)
@everywhere test_workers begin
unshift!(LOAD_PATH, $load_path)
unshift!(Base.LOAD_CACHE_PATH, $load_cache_path)
end
try
@eval using $ModuleB
uuid = Base.module_uuid(getfield(Main, ModuleB))
for wid in test_workers
@test remotecall_fetch(Core.eval, wid, Main, :( Base.module_uuid($ModuleB) )) == uuid
@test Base.Distributed.remotecall_eval(Main, wid, :( Base.module_uuid($ModuleB) )) == uuid
if wid != myid() # avoid world-age errors on the local proc
@test remotecall_fetch(g, wid) == wid
end
end
finally
@sync for wid in test_workers
@async remotecall_fetch(Core.eval, wid, Base, quote
shift!(LOAD_PATH)
shift!(LOAD_CACHE_PATH)
end)
@everywhere test_workers begin
shift!(LOAD_PATH)
shift!(Base.LOAD_CACHE_PATH)
end
end
finally
Expand Down

0 comments on commit 1e811d9

Please sign in to comment.