diff --git a/base/clusterserialize.jl b/base/clusterserialize.jl index b425319668abf..3f2dc8b263e08 100644 --- a/base/clusterserialize.jl +++ b/base/clusterserialize.jl @@ -2,8 +2,7 @@ import .Serializer: known_object_data, object_number, serialize_cycle, deserialize_cycle, writetag, __deserialized_types__, serialize_typename, deserialize_typename, - TYPENAME_TAG, GLOBALREF_TAG, object_numbers, - serialize_global_from_main, deserialize_global_from_main + TYPENAME_TAG, object_numbers type ClusterSerializer{I<:IO} <: AbstractSerializer io::I @@ -13,10 +12,13 @@ type ClusterSerializer{I<:IO} <: AbstractSerializer pid::Int # Worker we are connected to. sent_objects::Set{UInt64} # used by serialize (track objects sent) sent_globals::Dict + glbs_in_tname::Dict # A dict tracking globals referenced in anonymous + # functions. + anonfunc_id::UInt64 ClusterSerializer(io::I) = new(io, 0, ObjectIdDict(), Base.worker_id_from_socket(io), - Set{UInt64}(), Dict()) + Set{UInt64}(), Dict(), Dict(), 0) end ClusterSerializer(io::IO) = ClusterSerializer{typeof(io)}(io) @@ -33,6 +35,9 @@ function deserialize(s::ClusterSerializer, ::Type{TypeName}) else tn = deserialize_typename(s, number) end + + # retrieve arrays of global syms sent if any and deserialize them all. + foreach(sym->deserialize_global_from_main(s, sym), deserialize(s)) return tn end @@ -45,78 +50,95 @@ function serialize(s::ClusterSerializer, t::TypeName) serialize(s, send_whole) write(s.io, identifier) if send_whole + # Track globals referenced in this anonymous function. + # This information is used to resend modified globals when we + # only send the identifier. + prev = s.anonfunc_id + s.anonfunc_id = identifier serialize_typename(s, t) + s.anonfunc_id = prev push!(s.sent_objects, identifier) + finalizer(t, x->cleanup_tname_glbs(s, identifier)) end -# #println(t.module, ":", t.name, ", id:", identifier, send_whole ? " sent" : " NOT sent") + + # Send global refs if required. + syms = syms_2b_sent(s, identifier) + serialize(s, syms) + foreach(sym->serialize_global_from_main(s, sym), syms) nothing end -const FLG_SER_VAL = UInt8(1) -const FLG_ISCONST_VAL = UInt8(2) -isflagged(v, flg) = (v & flg == flg) - -# We will send/resend a global object if -# a) has not been sent previously, i.e., we are seeing this object_id for the -# for the first time, or, -# b) hash value has changed - -function serialize_global_from_main(s::ClusterSerializer, g::GlobalRef) - v = getfield(Main, g.name) - println(g) +function serialize(s::ClusterSerializer, g::GlobalRef) + # Record if required and then invoke the default GlobalRef serializer. + sym = g.name + if g.mod === Main && isdefined(g.mod, sym) + v = getfield(Main, sym) + if !isa(v, DataType) && !isa(v, Module) && + (sym in names(Main, false, false)) && (s.anonfunc_id != 0) + # FIXME : There must be a better way to detect if a binding has been imported + # into Main or has been primarily defined here. + push!(get!(s.glbs_in_tname, s.anonfunc_id, []), sym) + end + end - serialize(s, g.name) + invoke(serialize, (AbstractSerializer, GlobalRef), s, g) +end - flags = UInt8(0) - if isbits(v) - flags = flags | FLG_SER_VAL - else - oid = object_id(v) - if haskey(s.sent_globals, oid) - # We have sent this object before, see if it has changed. - prev_hash = s.sent_globals[oid] - new_hash = hash(v) - if new_hash != prev_hash - flags = flags | FLG_SER_VAL - s.sent_globals[oid] = new_hash - - # No need to setup a new finalizer as only the hash - # value and not the object itself has changed. - end +# Send/resend a global object if +# a) has not been sent previously, i.e., we are seeing this object_id for the first time, or, +# b) hash value has changed or +# c) is a bitstype +function syms_2b_sent(s::ClusterSerializer, identifier) + lst=Symbol[] + check_syms = get(s.glbs_in_tname, identifier, []) + for sym in check_syms + v = getfield(Main, sym) + + if isbits(v) + push!(lst, sym) else - flags = flags | FLG_SER_VAL - try - finalizer(v, x->delete_global_tracker(s,x)) - s.sent_globals[oid] = hash(v) - catch ex - # Do not track objects that cannot be finalized. + oid = object_id(v) + if haskey(s.sent_globals, oid) + # We have sent this object before, see if it has changed. + s.sent_globals[oid] != hash(v) && push!(lst, sym) + else + push!(lst, sym) end end end - isconst(Main, g.name) && (flags = flags | FLG_ISCONST_VAL) - - write(s.io, flags) - isflagged(flags, FLG_SER_VAL) && serialize(s, v) + return unique(lst) end -function deserialize_global_from_main(s::ClusterSerializer) - sym = deserialize(s)::Symbol - flags = read(s.io, UInt8) +function serialize_global_from_main(s::ClusterSerializer, sym) + v = getfield(Main, sym) - if isflagged(flags, FLG_SER_VAL) - v = deserialize(s) - end - - # create/update binding under Main only if the value has been sent - if isflagged(flags, FLG_SER_VAL) - if isflagged(flags, FLG_ISCONST_VAL) - eval(Main, :(const $sym = $v)) - else - eval(Main, :($sym = $v)) + oid = object_id(v) + record_v = true + if isbits(v) + record_v = false + elseif !haskey(s.sent_globals, oid) + # set up a finalizer the first time this object is sent + try + finalizer(v, x->delete_global_tracker(s,x)) + catch ex + # Do not track objects that cannot be finalized. + record_v = false end end + record_v && (s.sent_globals[oid] = hash(v)) - return GlobalRef(Main, sym) + serialize(s, isconst(Main, sym)) + serialize(s, v) +end + +function deserialize_global_from_main(s::ClusterSerializer, sym) + sym_isconst = deserialize(s) + v = deserialize(s) + if sym_isconst + eval(Main, :(const $sym = $v)) + else + eval(Main, :($sym = $v)) + end end function delete_global_tracker(s::ClusterSerializer, v) @@ -128,3 +150,8 @@ function delete_global_tracker(s::ClusterSerializer, v) # TODO: Should release memory from the remote nodes. end +function cleanup_tname_glbs(s::ClusterSerializer, identifier) + delete!(s.glbs_in_tname, identifier) +end + +# TODO: cleanup from s.sent_objects diff --git a/base/serialize.jl b/base/serialize.jl index 2ec4f252bad3a..d3b384cb5c059 100644 --- a/base/serialize.jl +++ b/base/serialize.jl @@ -381,7 +381,7 @@ end function serialize(s::AbstractSerializer, g::GlobalRef) writetag(s.io, GLOBALREF_TAG) - if g.mod === Main && isdefined(g.mod, g.name) + if g.mod === Main && isdefined(g.mod, g.name) && isconst(g.mod, g.name) v = getfield(g.mod, g.name) if isa(v, DataType) && v === v.name.primary && should_send_whole_type(s, v) # handle references to types in Main by sending the whole type. @@ -389,28 +389,13 @@ function serialize(s::AbstractSerializer, g::GlobalRef) write(s.io, UInt8(1)) serialize(s, v) return - elseif g.name in names(Main, false, false) - # FIXME : - # 1. There must be a better way to detect if a binding has been imported - # into Main or has been primarily defined here. - # 2. Handle bindings in Main pointing to bindings in Base, e.g., my_foo=myid. - write(s.io, UInt8(2)) - serialize_global_from_main(s, g) - return end end - write(s.io, UInt8(0)) - serialize_global_ref(s, g) -end - -function serialize_global_ref(s::AbstractSerializer, g::GlobalRef) serialize(s, g.mod) serialize(s, g.name) end -# default impl only serializes the symbol. -serialize_global_from_main(s::AbstractSerializer, g::GlobalRef) = serialize_global_ref(s, g) function serialize(s::AbstractSerializer, t::TypeName) serialize_cycle(s, t) && return @@ -745,20 +730,13 @@ end function deserialize(s::AbstractSerializer, ::Type{GlobalRef}) kind = read(s.io, UInt8) if kind == 0 - return deserialize_global_ref(s) - elseif kind == 1 + return GlobalRef(deserialize(s)::Module, deserialize(s)::Symbol) + else ty = deserialize(s) return GlobalRef(ty.name.module, ty.name.name) - else # kind == 2 - return deserialize_global_from_main(s) end end -deserialize_global_ref(s::AbstractSerializer) = GlobalRef(deserialize(s)::Module, deserialize(s)::Symbol) - -# default impl is same as any global ref, i.e., only the module and symbol. -deserialize_global_from_main(s::AbstractSerializer) = deserialize_global_ref() - function deserialize(s::AbstractSerializer, ::Type{Union}) types = deserialize(s) Union{types...} diff --git a/test/parallel_exec.jl b/test/parallel_exec.jl index 19bfa77c66e8a..2ffe20ca10f51 100644 --- a/test/parallel_exec.jl +++ b/test/parallel_exec.jl @@ -1266,16 +1266,66 @@ test_add_procs_threaded_blas() global v1 = 1 @test remotecall_fetch(()->v1, id_other) == v1 @test remotecall_fetch(()->isdefined(Main, :v1), id_other) == true -v1 = 2 -@test remotecall_fetch(()->v1, id_other) == 2 +for i in 2:5 + global v1 = i + @test remotecall_fetch(()->v1, id_other) == i +end # non-bitstypes -global v2 = ones(10) +global v2 = zeros(10) for i in 1:5 v2[i] = i @test remotecall_fetch(()->v2, id_other) == v2 end +# Test that a global is not being repeatedly serialized when +# a) referenced multiple times in the closure +# b) hash value has not changed. + +@everywhere begin + global testsercnt_d = Dict() + type TestSerCnt + v + end + import Base.hash, Base.== + hash(x::TestSerCnt, h::UInt) = hash(hash(x.v), h) + ==(x1::TestSerCnt, x2::TestSerCnt) = (x1.v == x2.v) + + function Base.serialize(s::AbstractSerializer, t::TestSerCnt) + Base.Serializer.serialize_type(s, TestSerCnt) + serialize(s, t.v) + global testsercnt_d + cnt = get!(testsercnt_d, object_id(t), 0) + testsercnt_d[object_id(t)] = cnt+1 + end + + Base.deserialize(s::AbstractSerializer, ::Type{TestSerCnt}) = TestSerCnt(deserialize(s)) +end + +# hash value of tsc is not changed +global tsc = TestSerCnt(zeros(10)) +for i in 1:5 + remotecall_fetch(()->tsc, id_other) +end +# should have been serialized only once +@test testsercnt_d[object_id(tsc)] == 1 + +# hash values are changed +n=5 +testsercnt_d[object_id(tsc)] = 0 +for i in 1:n + tsc.v[i] = i + remotecall_fetch(()->tsc, id_other) +end +# should have been serialized as many times as the loop +@test testsercnt_d[object_id(tsc)] == n + +# Multiple references in a closure should be serialized only once. +global mrefs = TestSerCnt(ones(10)) +@test remotecall_fetch(()->(mrefs.v, 2*mrefs.v, 3*mrefs.v), id_other) == (ones(10), 2*ones(10), 3*ones(10)) +@test testsercnt_d[object_id(mrefs)] == 1 + + # nested anon functions global f1 = x->x global f2 = x->f1(x) @@ -1288,17 +1338,19 @@ const c1 = ones(10) @test remotecall_fetch(()->c1, id_other) == c1 @test remotecall_fetch(()->isconst(Main, :c1), id_other) == true -# Test same call with local vars +# Test same calls with local vars function wrapped_var_ser_tests() # bitstypes local lv1 = 1 @test remotecall_fetch(()->lv1, id_other) == lv1 @test remotecall_fetch(()->isdefined(Main, :lv1), id_other) == false - lv1 = 2 - @test remotecall_fetch(()->lv1, id_other) == 2 + for i in 2:5 + lv1 = i + @test remotecall_fetch(()->lv1, id_other) == i + end # non-bitstypes - local lv2 = ones(10) + local lv2 = zeros(10) for i in 1:5 lv2[i] = i @test remotecall_fetch(()->lv2, id_other) == lv2 @@ -1314,21 +1366,34 @@ end wrapped_var_ser_tests() +# Test internal data structures being cleaned up upon gc. +global ids_cleanup = ones(6) +global ids_func = ()->ids_cleanup + +clust_ser = (Base.worker_from_id(id_other)).w_serializer +@test remotecall_fetch(ids_func, id_other) == ids_cleanup + +@test haskey(clust_ser.sent_globals, object_id(ids_cleanup)) +finalize(ids_cleanup) +@test !haskey(clust_ser.sent_globals, object_id(ids_cleanup)) + +# TODO Add test for cleanup from `clust_ser.glbs_in_tname` + # reported github issues - Mostly tests with globals and various parallel macros #2669, #5390 v2669=10 -@test fetch(@spawn (1+v2669)) == 10 +@test fetch(@spawn (1+v2669)) == 11 #12367 refs = [] if true n = 10 - for (idx,p) in enumerate(procs()) - ref[idx] = @spawnat p begin + for p in procs() + push!(refs, @spawnat p begin @sync for i in 1:n nothing end - end + end) end end foreach(wait, refs)