Skip to content

Commit

Permalink
Alternate impl to auto serialize globals. Only track referenced globals
Browse files Browse the repository at this point in the history
in TypeName and serialize if required. As parallel macros and remotecalls
create a 0-arg thunk, globals are automatically serialized if changed.
  • Loading branch information
amitmurthy committed Dec 20, 2016
1 parent 6b6f423 commit 3ed5cea
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 93 deletions.
141 changes: 84 additions & 57 deletions base/clusterserialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

import .Serializer: known_object_data, object_number, serialize_cycle, deserialize_cycle, writetag,
__deserialized_types__, serialize_typename, deserialize_typename,
TYPENAME_TAG, GLOBALREF_TAG, object_numbers,
serialize_global_from_main, deserialize_global_from_main
TYPENAME_TAG, object_numbers

type ClusterSerializer{I<:IO} <: AbstractSerializer
io::I
Expand All @@ -13,10 +12,13 @@ type ClusterSerializer{I<:IO} <: AbstractSerializer
pid::Int # Worker we are connected to.
sent_objects::Set{UInt64} # used by serialize (track objects sent)
sent_globals::Dict
glbs_in_tname::Dict # A dict tracking globals referenced in anonymous
# functions.
anonfunc_id::UInt64

ClusterSerializer(io::I) = new(io, 0, ObjectIdDict(),
Base.worker_id_from_socket(io),
Set{UInt64}(), Dict())
Set{UInt64}(), Dict(), Dict(), 0)
end
ClusterSerializer(io::IO) = ClusterSerializer{typeof(io)}(io)

Expand All @@ -33,6 +35,9 @@ function deserialize(s::ClusterSerializer, ::Type{TypeName})
else
tn = deserialize_typename(s, number)
end

# retrieve arrays of global syms sent if any and deserialize them all.
foreach(sym->deserialize_global_from_main(s, sym), deserialize(s))
return tn
end

Expand All @@ -45,78 +50,95 @@ function serialize(s::ClusterSerializer, t::TypeName)
serialize(s, send_whole)
write(s.io, identifier)
if send_whole
# Track globals referenced in this anonymous function.
# This information is used to resend modified globals when we
# only send the identifier.
prev = s.anonfunc_id
s.anonfunc_id = identifier
serialize_typename(s, t)
s.anonfunc_id = prev
push!(s.sent_objects, identifier)
finalizer(t, x->cleanup_tname_glbs(s, identifier))
end
# #println(t.module, ":", t.name, ", id:", identifier, send_whole ? " sent" : " NOT sent")

# Send global refs if required.
syms = syms_2b_sent(s, identifier)
serialize(s, syms)
foreach(sym->serialize_global_from_main(s, sym), syms)
nothing
end

const FLG_SER_VAL = UInt8(1)
const FLG_ISCONST_VAL = UInt8(2)
isflagged(v, flg) = (v & flg == flg)

# We will send/resend a global object if
# a) has not been sent previously, i.e., we are seeing this object_id for the
# for the first time, or,
# b) hash value has changed

function serialize_global_from_main(s::ClusterSerializer, g::GlobalRef)
v = getfield(Main, g.name)
println(g)
function serialize(s::ClusterSerializer, g::GlobalRef)
# Record if required and then invoke the default GlobalRef serializer.
sym = g.name
if g.mod === Main && isdefined(g.mod, sym)
v = getfield(Main, sym)
if !isa(v, DataType) && !isa(v, Module) &&
(sym in names(Main, false, false)) && (s.anonfunc_id != 0)
# FIXME : There must be a better way to detect if a binding has been imported
# into Main or has been primarily defined here.
push!(get!(s.glbs_in_tname, s.anonfunc_id, []), sym)
end
end

serialize(s, g.name)
invoke(serialize, (AbstractSerializer, GlobalRef), s, g)
end

flags = UInt8(0)
if isbits(v)
flags = flags | FLG_SER_VAL
else
oid = object_id(v)
if haskey(s.sent_globals, oid)
# We have sent this object before, see if it has changed.
prev_hash = s.sent_globals[oid]
new_hash = hash(v)
if new_hash != prev_hash
flags = flags | FLG_SER_VAL
s.sent_globals[oid] = new_hash

# No need to setup a new finalizer as only the hash
# value and not the object itself has changed.
end
# Send/resend a global object if
# a) has not been sent previously, i.e., we are seeing this object_id for the first time, or,
# b) hash value has changed or
# c) is a bitstype
function syms_2b_sent(s::ClusterSerializer, identifier)
lst=Symbol[]
check_syms = get(s.glbs_in_tname, identifier, [])
for sym in check_syms
v = getfield(Main, sym)

if isbits(v)
push!(lst, sym)
else
flags = flags | FLG_SER_VAL
try
finalizer(v, x->delete_global_tracker(s,x))
s.sent_globals[oid] = hash(v)
catch ex
# Do not track objects that cannot be finalized.
oid = object_id(v)
if haskey(s.sent_globals, oid)
# We have sent this object before, see if it has changed.
s.sent_globals[oid] != hash(v) && push!(lst, sym)
else
push!(lst, sym)
end
end
end
isconst(Main, g.name) && (flags = flags | FLG_ISCONST_VAL)

write(s.io, flags)
isflagged(flags, FLG_SER_VAL) && serialize(s, v)
return unique(lst)
end

function deserialize_global_from_main(s::ClusterSerializer)
sym = deserialize(s)::Symbol
flags = read(s.io, UInt8)
function serialize_global_from_main(s::ClusterSerializer, sym)
v = getfield(Main, sym)

if isflagged(flags, FLG_SER_VAL)
v = deserialize(s)
end

# create/update binding under Main only if the value has been sent
if isflagged(flags, FLG_SER_VAL)
if isflagged(flags, FLG_ISCONST_VAL)
eval(Main, :(const $sym = $v))
else
eval(Main, :($sym = $v))
oid = object_id(v)
record_v = true
if isbits(v)
record_v = false
elseif !haskey(s.sent_globals, oid)
# set up a finalizer the first time this object is sent
try
finalizer(v, x->delete_global_tracker(s,x))
catch ex
# Do not track objects that cannot be finalized.
record_v = false
end
end
record_v && (s.sent_globals[oid] = hash(v))

return GlobalRef(Main, sym)
serialize(s, isconst(Main, sym))
serialize(s, v)
end

function deserialize_global_from_main(s::ClusterSerializer, sym)
sym_isconst = deserialize(s)
v = deserialize(s)
if sym_isconst
eval(Main, :(const $sym = $v))
else
eval(Main, :($sym = $v))
end
end

function delete_global_tracker(s::ClusterSerializer, v)
Expand All @@ -128,3 +150,8 @@ function delete_global_tracker(s::ClusterSerializer, v)
# TODO: Should release memory from the remote nodes.
end

function cleanup_tname_glbs(s::ClusterSerializer, identifier)
delete!(s.glbs_in_tname, identifier)
end

# TODO: cleanup from s.sent_objects
28 changes: 3 additions & 25 deletions base/serialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -381,36 +381,21 @@ end

function serialize(s::AbstractSerializer, g::GlobalRef)
writetag(s.io, GLOBALREF_TAG)
if g.mod === Main && isdefined(g.mod, g.name)
if g.mod === Main && isdefined(g.mod, g.name) && isconst(g.mod, g.name)
v = getfield(g.mod, g.name)
if isa(v, DataType) && v === v.name.primary && should_send_whole_type(s, v)
# handle references to types in Main by sending the whole type.
# needed to be able to send nested functions (#15451).
write(s.io, UInt8(1))
serialize(s, v)
return
elseif g.name in names(Main, false, false)
# FIXME :
# 1. There must be a better way to detect if a binding has been imported
# into Main or has been primarily defined here.
# 2. Handle bindings in Main pointing to bindings in Base, e.g., my_foo=myid.
write(s.io, UInt8(2))
serialize_global_from_main(s, g)
return
end
end

write(s.io, UInt8(0))
serialize_global_ref(s, g)
end

function serialize_global_ref(s::AbstractSerializer, g::GlobalRef)
serialize(s, g.mod)
serialize(s, g.name)
end

# default impl only serializes the symbol.
serialize_global_from_main(s::AbstractSerializer, g::GlobalRef) = serialize_global_ref(s, g)

function serialize(s::AbstractSerializer, t::TypeName)
serialize_cycle(s, t) && return
Expand Down Expand Up @@ -745,20 +730,13 @@ end
function deserialize(s::AbstractSerializer, ::Type{GlobalRef})
kind = read(s.io, UInt8)
if kind == 0
return deserialize_global_ref(s)
elseif kind == 1
return GlobalRef(deserialize(s)::Module, deserialize(s)::Symbol)
else
ty = deserialize(s)
return GlobalRef(ty.name.module, ty.name.name)
else # kind == 2
return deserialize_global_from_main(s)
end
end

deserialize_global_ref(s::AbstractSerializer) = GlobalRef(deserialize(s)::Module, deserialize(s)::Symbol)

# default impl is same as any global ref, i.e., only the module and symbol.
deserialize_global_from_main(s::AbstractSerializer) = deserialize_global_ref()

function deserialize(s::AbstractSerializer, ::Type{Union})
types = deserialize(s)
Union{types...}
Expand Down
87 changes: 76 additions & 11 deletions test/parallel_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1266,16 +1266,66 @@ test_add_procs_threaded_blas()
global v1 = 1
@test remotecall_fetch(()->v1, id_other) == v1
@test remotecall_fetch(()->isdefined(Main, :v1), id_other) == true
v1 = 2
@test remotecall_fetch(()->v1, id_other) == 2
for i in 2:5
global v1 = i
@test remotecall_fetch(()->v1, id_other) == i
end

# non-bitstypes
global v2 = ones(10)
global v2 = zeros(10)
for i in 1:5
v2[i] = i
@test remotecall_fetch(()->v2, id_other) == v2
end

# Test that a global is not being repeatedly serialized when
# a) referenced multiple times in the closure
# b) hash value has not changed.

@everywhere begin
global testsercnt_d = Dict()
type TestSerCnt
v
end
import Base.hash, Base.==
hash(x::TestSerCnt, h::UInt) = hash(hash(x.v), h)
==(x1::TestSerCnt, x2::TestSerCnt) = (x1.v == x2.v)

function Base.serialize(s::AbstractSerializer, t::TestSerCnt)
Base.Serializer.serialize_type(s, TestSerCnt)
serialize(s, t.v)
global testsercnt_d
cnt = get!(testsercnt_d, object_id(t), 0)
testsercnt_d[object_id(t)] = cnt+1
end

Base.deserialize(s::AbstractSerializer, ::Type{TestSerCnt}) = TestSerCnt(deserialize(s))
end

# hash value of tsc is not changed
global tsc = TestSerCnt(zeros(10))
for i in 1:5
remotecall_fetch(()->tsc, id_other)
end
# should have been serialized only once
@test testsercnt_d[object_id(tsc)] == 1

# hash values are changed
n=5
testsercnt_d[object_id(tsc)] = 0
for i in 1:n
tsc.v[i] = i
remotecall_fetch(()->tsc, id_other)
end
# should have been serialized as many times as the loop
@test testsercnt_d[object_id(tsc)] == n

# Multiple references in a closure should be serialized only once.
global mrefs = TestSerCnt(ones(10))
@test remotecall_fetch(()->(mrefs.v, 2*mrefs.v, 3*mrefs.v), id_other) == (ones(10), 2*ones(10), 3*ones(10))
@test testsercnt_d[object_id(mrefs)] == 1


# nested anon functions
global f1 = x->x
global f2 = x->f1(x)
Expand All @@ -1288,17 +1338,19 @@ const c1 = ones(10)
@test remotecall_fetch(()->c1, id_other) == c1
@test remotecall_fetch(()->isconst(Main, :c1), id_other) == true

# Test same call with local vars
# Test same calls with local vars
function wrapped_var_ser_tests()
# bitstypes
local lv1 = 1
@test remotecall_fetch(()->lv1, id_other) == lv1
@test remotecall_fetch(()->isdefined(Main, :lv1), id_other) == false
lv1 = 2
@test remotecall_fetch(()->lv1, id_other) == 2
for i in 2:5
lv1 = i
@test remotecall_fetch(()->lv1, id_other) == i
end

# non-bitstypes
local lv2 = ones(10)
local lv2 = zeros(10)
for i in 1:5
lv2[i] = i
@test remotecall_fetch(()->lv2, id_other) == lv2
Expand All @@ -1314,21 +1366,34 @@ end

wrapped_var_ser_tests()

# Test internal data structures being cleaned up upon gc.
global ids_cleanup = ones(6)
global ids_func = ()->ids_cleanup

clust_ser = (Base.worker_from_id(id_other)).w_serializer
@test remotecall_fetch(ids_func, id_other) == ids_cleanup

@test haskey(clust_ser.sent_globals, object_id(ids_cleanup))
finalize(ids_cleanup)
@test !haskey(clust_ser.sent_globals, object_id(ids_cleanup))

# TODO Add test for cleanup from `clust_ser.glbs_in_tname`

# reported github issues - Mostly tests with globals and various parallel macros
#2669, #5390
v2669=10
@test fetch(@spawn (1+v2669)) == 10
@test fetch(@spawn (1+v2669)) == 11

#12367
refs = []
if true
n = 10
for (idx,p) in enumerate(procs())
ref[idx] = @spawnat p begin
for p in procs()
push!(refs, @spawnat p begin
@sync for i in 1:n
nothing
end
end
end)
end
end
foreach(wait, refs)
Expand Down

0 comments on commit 3ed5cea

Please sign in to comment.