Skip to content

Commit

Permalink
Track referenced globals in TypeName and serialize if required.
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed Dec 18, 2016
1 parent cb2dd92 commit b85aea6
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 93 deletions.
143 changes: 86 additions & 57 deletions base/clusterserialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

import .Serializer: known_object_data, object_number, serialize_cycle, deserialize_cycle, writetag,
__deserialized_types__, serialize_typename, deserialize_typename,
TYPENAME_TAG, GLOBALREF_TAG, object_numbers,
serialize_global_from_main, deserialize_global_from_main
TYPENAME_TAG, object_numbers

type ClusterSerializer{I<:IO} <: AbstractSerializer
io::I
Expand All @@ -13,10 +12,13 @@ type ClusterSerializer{I<:IO} <: AbstractSerializer
pid::Int # Worker we are connected to.
sent_objects::Set{UInt64} # used by serialize (track objects sent)
sent_globals::Dict
glbs_in_tname::Dict # A dict tracking globals referenced in anonymous
# functions.
anonfunc_id::UInt64

ClusterSerializer(io::I) = new(io, 0, ObjectIdDict(),
Base.worker_id_from_socket(io),
Set{UInt64}(), Dict())
Set{UInt64}(), Dict(), Dict(), 0)
end
ClusterSerializer(io::IO) = ClusterSerializer{typeof(io)}(io)

Expand All @@ -33,6 +35,9 @@ function deserialize(s::ClusterSerializer, ::Type{TypeName})
else
tn = deserialize_typename(s, number)
end

# retrieve arrays of global syms sent if any and deserialize them all.
foreach(sym->deserialize_global_from_main(s, sym), deserialize(s))
return tn
end

Expand All @@ -45,78 +50,97 @@ function serialize(s::ClusterSerializer, t::TypeName)
serialize(s, send_whole)
write(s.io, identifier)
if send_whole
# Track globals referenced in this anonymous function.
# This information is used to resend modified globals when we
# only send the identifier.
prev = s.anonfunc_id
s.anonfunc_id = identifier
serialize_typename(s, t)
s.anonfunc_id = prev
push!(s.sent_objects, identifier)
finalizer(t, x->cleanup_tname_glbs(s, identifier))
end
# #println(t.module, ":", t.name, ", id:", identifier, send_whole ? " sent" : " NOT sent")

# Send global refs if required.
syms = syms_2b_sent(s, identifier)
serialize(s, syms)
foreach(sym->serialize_global_from_main(s, sym), syms)
nothing
end

const FLG_SER_VAL = UInt8(1)
const FLG_ISCONST_VAL = UInt8(2)
isflagged(v, flg) = (v & flg == flg)

# We will send/resend a global object if
# a) has not been sent previously, i.e., we are seeing this object_id for the
# for the first time, or,
# b) hash value has changed

function serialize_global_from_main(s::ClusterSerializer, g::GlobalRef)
v = getfield(Main, g.name)
println(g)
function serialize(s::ClusterSerializer, g::GlobalRef)
# Record if required and then invoke the default GlobalRef serializer.
sym = g.name
if g.mod === Main && isdefined(g.mod, sym)
v = getfield(Main, sym)
if !isa(v, DataType) && !isa(v, Module) &&
(sym in names(Main, false, false)) && (s.anonfunc_id != 0)
# FIXME :
# 1. There must be a better way to detect if a binding has been imported
# into Main or has been primarily defined here.
# 2. Handle bindings in Main pointing to bindings in Base like `foo=myid`
push!(get!(s.glbs_in_tname, s.anonfunc_id, []), sym)
end
end

serialize(s, g.name)
invoke(serialize, (AbstractSerializer, GlobalRef), s, g)
end

flags = UInt8(0)
if isbits(v)
flags = flags | FLG_SER_VAL
else
oid = object_id(v)
if haskey(s.sent_globals, oid)
# We have sent this object before, see if it has changed.
prev_hash = s.sent_globals[oid]
new_hash = hash(v)
if new_hash != prev_hash
flags = flags | FLG_SER_VAL
s.sent_globals[oid] = new_hash

# No need to setup a new finalizer as only the hash
# value and not the object itself has changed.
end
# Send/resend a global object if
# a) has not been sent previously, i.e., we are seeing this object_id for the first time, or,
# b) hash value has changed or
# c) is a bitstype
function syms_2b_sent(s::ClusterSerializer, identifier)
lst=Symbol[]
check_syms = get(s.glbs_in_tname, identifier, [])
for sym in check_syms
v = getfield(Main, sym)

if isbits(v)
push!(lst, sym)
else
flags = flags | FLG_SER_VAL
try
finalizer(v, x->delete_global_tracker(s,x))
s.sent_globals[oid] = hash(v)
catch ex
# Do not track objects that cannot be finalized.
oid = object_id(v)
if haskey(s.sent_globals, oid)
# We have sent this object before, see if it has changed.
s.sent_globals[oid] != hash(v) && push!(lst, sym)
else
push!(lst, sym)
end
end
end
isconst(Main, g.name) && (flags = flags | FLG_ISCONST_VAL)

write(s.io, flags)
isflagged(flags, FLG_SER_VAL) && serialize(s, v)
return unique(lst)
end

function deserialize_global_from_main(s::ClusterSerializer)
sym = deserialize(s)::Symbol
flags = read(s.io, UInt8)
function serialize_global_from_main(s::ClusterSerializer, sym)
v = getfield(Main, sym)

if isflagged(flags, FLG_SER_VAL)
v = deserialize(s)
end

# create/update binding under Main only if the value has been sent
if isflagged(flags, FLG_SER_VAL)
if isflagged(flags, FLG_ISCONST_VAL)
eval(Main, :(const $sym = $v))
else
eval(Main, :($sym = $v))
oid = object_id(v)
record_v = true
if isbits(v)
record_v = false
elseif !haskey(s.sent_globals, oid)
# set up a finalizer the first time this object is sent
try
finalizer(v, x->delete_global_tracker(s,x))
catch ex
# Do not track objects that cannot be finalized.
record_v = false
end
end
record_v && (s.sent_globals[oid] = hash(v))

return GlobalRef(Main, sym)
serialize(s, isconst(Main, sym))
serialize(s, v)
end

function deserialize_global_from_main(s::ClusterSerializer, sym)
sym_isconst = deserialize(s)
v = deserialize(s)
if sym_isconst
eval(Main, :(const $sym = $v))
else
eval(Main, :($sym = $v))
end
end

function delete_global_tracker(s::ClusterSerializer, v)
Expand All @@ -128,3 +152,8 @@ function delete_global_tracker(s::ClusterSerializer, v)
# TODO: Should release memory from the remote nodes.
end

function cleanup_tname_glbs(s::ClusterSerializer, identifier)
delete!(s.glbs_in_tname, identifier)
end

# TODO: cleanup from s.sent_objects
28 changes: 3 additions & 25 deletions base/serialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -381,36 +381,21 @@ end

function serialize(s::AbstractSerializer, g::GlobalRef)
writetag(s.io, GLOBALREF_TAG)
if g.mod === Main && isdefined(g.mod, g.name)
if g.mod === Main && isdefined(g.mod, g.name) && isconst(g.mod, g.name)
v = getfield(g.mod, g.name)
if isa(v, DataType) && v === v.name.primary && should_send_whole_type(s, v)
# handle references to types in Main by sending the whole type.
# needed to be able to send nested functions (#15451).
write(s.io, UInt8(1))
serialize(s, v)
return
elseif g.name in names(Main, false, false)
# FIXME :
# 1. There must be a better way to detect if a binding has been imported
# into Main or has been primarily defined here.
# 2. Handle bindings in Main pointing to bindings in Base, e.g., my_foo=myid.
write(s.io, UInt8(2))
serialize_global_from_main(s, g)
return
end
end

write(s.io, UInt8(0))
serialize_global_ref(s, g)
end

function serialize_global_ref(s::AbstractSerializer, g::GlobalRef)
serialize(s, g.mod)
serialize(s, g.name)
end

# default impl only serializes the symbol.
serialize_global_from_main(s::AbstractSerializer, g::GlobalRef) = serialize_global_ref(s, g)

function serialize(s::AbstractSerializer, t::TypeName)
serialize_cycle(s, t) && return
Expand Down Expand Up @@ -745,20 +730,13 @@ end
function deserialize(s::AbstractSerializer, ::Type{GlobalRef})
kind = read(s.io, UInt8)
if kind == 0
return deserialize_global_ref(s)
elseif kind == 1
return GlobalRef(deserialize(s)::Module, deserialize(s)::Symbol)
else
ty = deserialize(s)
return GlobalRef(ty.name.module, ty.name.name)
else # kind == 2
return deserialize_global_from_main(s)
end
end

deserialize_global_ref(s::AbstractSerializer) = GlobalRef(deserialize(s)::Module, deserialize(s)::Symbol)

# default impl is same as any global ref, i.e., only the module and symbol.
deserialize_global_from_main(s::AbstractSerializer) = deserialize_global_ref()

function deserialize(s::AbstractSerializer, ::Type{Union})
types = deserialize(s)
Union{types...}
Expand Down
87 changes: 76 additions & 11 deletions test/parallel_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1266,16 +1266,66 @@ test_add_procs_threaded_blas()
global v1 = 1
@test remotecall_fetch(()->v1, id_other) == v1
@test remotecall_fetch(()->isdefined(Main, :v1), id_other) == true
v1 = 2
@test remotecall_fetch(()->v1, id_other) == 2
for i in 2:5
global v1 = i
@test remotecall_fetch(()->v1, id_other) == i
end

# non-bitstypes
global v2 = ones(10)
global v2 = zeros(10)
for i in 1:5
v2[i] = i
@test remotecall_fetch(()->v2, id_other) == v2
end

# Test that a global is not being repeatedly serialized when
# a) referenced multiple times in the closure
# b) hash value has not changed.

@everywhere begin
global testsercnt_d = Dict()
type TestSerCnt
v
end
import Base.hash, Base.==
hash(x::TestSerCnt, h::UInt) = hash(hash(x.v), h)
==(x1::TestSerCnt, x2::TestSerCnt) = (x1.v == x2.v)

function Base.serialize(s::AbstractSerializer, t::TestSerCnt)
Base.Serializer.serialize_type(s, TestSerCnt)
serialize(s, t.v)
global testsercnt_d
cnt = get!(testsercnt_d, object_id(t), 0)
testsercnt_d[object_id(t)] = cnt+1
end

Base.deserialize(s::AbstractSerializer, ::Type{TestSerCnt}) = TestSerCnt(deserialize(s))
end

# hash value of tsc is not changed
global tsc = TestSerCnt(zeros(10))
for i in 1:5
remotecall_fetch(()->tsc, id_other)
end
# should have been serialized only once
@test testsercnt_d[object_id(tsc)] == 1

# hash values are changed
n=5
testsercnt_d[object_id(tsc)] = 0
for i in 1:n
tsc.v[i] = i
remotecall_fetch(()->tsc, id_other)
end
# should have been serialized as many times as the loop
@test testsercnt_d[object_id(tsc)] == n

# Multiple references in a closure should be serialized only once.
global mrefs = TestSerCnt(ones(10))
@test remotecall_fetch(()->(mrefs.v, 2*mrefs.v, 3*mrefs.v), id_other) == (ones(10), 2*ones(10), 3*ones(10))
@test testsercnt_d[object_id(mrefs)] == 1


# nested anon functions
global f1 = x->x
global f2 = x->f1(x)
Expand All @@ -1288,17 +1338,19 @@ const c1 = ones(10)
@test remotecall_fetch(()->c1, id_other) == c1
@test remotecall_fetch(()->isconst(Main, :c1), id_other) == true

# Test same call with local vars
# Test same calls with local vars
function wrapped_var_ser_tests()
# bitstypes
local lv1 = 1
@test remotecall_fetch(()->lv1, id_other) == lv1
@test remotecall_fetch(()->isdefined(Main, :lv1), id_other) == false
lv1 = 2
@test remotecall_fetch(()->lv1, id_other) == 2
for i in 2:5
lv1 = i
@test remotecall_fetch(()->lv1, id_other) == i
end

# non-bitstypes
local lv2 = ones(10)
local lv2 = zeros(10)
for i in 1:5
lv2[i] = i
@test remotecall_fetch(()->lv2, id_other) == lv2
Expand All @@ -1314,21 +1366,34 @@ end

wrapped_var_ser_tests()

# Test internal data structures being cleaned up upon gc.
global ids_cleanup = ones(6)
global ids_func = ()->ids_cleanup

clust_ser = (Base.worker_from_id(id_other)).w_serializer
@test remotecall_fetch(ids_func, id_other) == ids_cleanup

@test haskey(clust_ser.sent_globals, object_id(ids_cleanup))
finalize(ids_cleanup)
@test !haskey(clust_ser.sent_globals, object_id(ids_cleanup))

# TODO Add test for cleanup from `clust_ser.glbs_in_tname`

# reported github issues - Mostly tests with globals and various parallel macros
#2669, #5390
v2669=10
@test fetch(@spawn (1+v2669)) == 10
@test fetch(@spawn (1+v2669)) == 11

#12367
refs = []
if true
n = 10
for (idx,p) in enumerate(procs())
ref[idx] = @spawnat p begin
for p in procs()
push!(refs, @spawnat p begin
@sync for i in 1:n
nothing
end
end
end)
end
end
foreach(wait, refs)
Expand Down

0 comments on commit b85aea6

Please sign in to comment.