Commit

Merge pull request #5380 from amitmurthy/amitm/shmem_basic
SharedArray - take 2
ViralBShah committed Jan 20, 2014
2 parents c67aff0 + d913a75 commit aabd983
Showing 7 changed files with 359 additions and 2 deletions.
1 change: 1 addition & 0 deletions base/exports.jl
@@ -89,6 +89,7 @@ export
RoundUp,
Schur,
Set,
SharedArray,
SparseMatrixCSC,
StatStruct,
StridedArray,
4 changes: 2 additions & 2 deletions base/mmap.jl
@@ -108,13 +108,13 @@ function mmap_stream_settings(s::IO)
end

# Mmapped-array constructor
function mmap_array{T,N,TInt<:Integer}(::Type{T}, dims::NTuple{N,TInt}, s::IO, offset::FileOffset)
function mmap_array{T,N,TInt<:Integer}(::Type{T}, dims::NTuple{N,TInt}, s::IO, offset::FileOffset; grow::Bool=true)
prot, flags, iswrite = mmap_stream_settings(s)
len = prod(dims)*sizeof(T)
if len > typemax(Int)
error("file is too large to memory-map on this platform")
end
if iswrite
if iswrite && grow
pmap, delta = mmap_grow(len, prot, flags, fd(s), offset)
else
pmap, delta = mmap(len, prot, flags, fd(s), offset)
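
The new ``grow`` keyword lets a writable mapping skip growing the underlying
file; the shared-memory code below relies on this. A minimal sketch, assuming
an existing pre-sized file (path and size are illustrative):

    # Map an existing, pre-sized file without attempting to extend it.
    s = open("/tmp/data.bin", "r+")
    A = mmap_array(Int, (100,), s, 0, grow=false)
    close(s)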
260 changes: 260 additions & 0 deletions base/sharedarray.jl
@@ -0,0 +1,260 @@
type SharedArray{T,N} <: AbstractArray{T,N}
dims::NTuple{N,Int}
pids::Vector{Int}
refs::Array{RemoteRef}

# Fields below are not to be serialized
# Local shmem map.
loc_shmarr::Array{T,N}

# index of the current process's pid in the pids vector; 0 if this shared array is not mapped locally.
loc_pididx::Int

# the local partition of the array when viewed as a one-dimensional array.
loc_subarr_1d

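# Inner constructor: the three local fields above are deliberately left
# undefined here; they are filled in per process by init_loc_flds (and by
# the outer constructor on the creating process).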
SharedArray(d,p,r) = new(d,p,r)
end

function SharedArray(T::Type, dims::NTuple; init=false, pids=workers())
N = length(dims)

isbits(T) || error("SharedArray element type must be a bits type")
@windows_only error("SharedArray is not supported on Windows yet")

len_sa = prod(dims)
if length(pids) > len_sa
pids = pids[1:len_sa]
end

onlocalhost = assert_same_host(pids)

local shm_seg_name = ""
local loc_shmarr
local sa = nothing
local shmmem_create_pid
try
# On OSX, the shm_seg_name length must be < 32 characters
shm_seg_name = string("/jl", getpid(), int64(time() * 10^9))
if onlocalhost
shmmem_create_pid = myid()
loc_shmarr = shm_mmap_array(T, dims, shm_seg_name, JL_O_CREAT | JL_O_RDWR)
else
# The shared array is being created on a remote machine; create the segment there.
shmmem_create_pid = pids[1]
remotecall(pids[1], () -> begin shm_mmap_array(T, dims, shm_seg_name, JL_O_CREAT | JL_O_RDWR); nothing end)
end

func_mapshmem = () -> shm_mmap_array(T, dims, shm_seg_name, JL_O_RDWR)

refs = Array(RemoteRef, length(pids))
for (i, p) in enumerate(pids)
refs[i] = remotecall(p, func_mapshmem)
end

# Wait till all the workers have mapped the segment
for i in 1:length(refs)
wait(refs[i])
end

# All good, immediately unlink the segment.
remotecall(shmmem_create_pid, () -> begin shm_unlink(shm_seg_name); nothing end)
shm_seg_name = ""

sa = SharedArray{T,N}(dims, pids, refs)
if onlocalhost
init_loc_flds(sa)

# If myid() is not in pids, loc_shmarr will not have been set by
# init_loc_flds above, so set it here since the segment is mapped locally.
sa.loc_shmarr = loc_shmarr
else
sa.loc_pididx = 0
end

# If an init function was provided, call it on each participating worker.
@sync begin
if isa(init, Function)
for p in pids
@async remotecall_wait(p, init, sa)
end
end
end

finally
if shm_seg_name != ""
remotecall(shmmem_create_pid, () -> begin shm_unlink(shm_seg_name); nothing end)
end
end
sa
end

SharedArray(T, I::Int...; kwargs...) = SharedArray(T, I; kwargs...)


length(sa::SharedArray) = prod(sa.dims)
size(sa::SharedArray) = sa.dims
procs(sa::SharedArray) = sa.pids



function range_1dim(sa::SharedArray, n)
l = length(sa)
nw = length(sa.pids)
partlen = div(l, nw)

if n == nw
return (((n-1) * partlen) + 1):l
else
return (((n-1) * partlen) + 1):(n*partlen)
end
end
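# Example: an array of length 10 across 3 workers is partitioned as
# 1:3, 4:6 and 7:10; the last worker absorbs the remainder.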

sub_1dim(sa::SharedArray, n) = sub(sa.loc_shmarr, range_1dim(sa, n))

function init_loc_flds(sa)
if myid() in sa.pids
sa.loc_pididx = findfirst(sa.pids, myid())
sa.loc_shmarr = fetch(sa.refs[sa.loc_pididx])
sa.loc_subarr_1d = sub_1dim(sa, sa.loc_pididx)
else
sa.loc_pididx = 0
end
end


# Don't serialize loc_shmarr (it is the complete array) or the other local
# fields (loc_pididx, loc_subarr_1d), which are relevant only to the current process
function serialize(s, sa::SharedArray)
serialize_type(s, typeof(sa))
serialize(s, length(SharedArray.names))
for n in SharedArray.names
if n in [:loc_shmarr, :loc_pididx, :loc_subarr_1d]
writetag(s, UndefRefTag)
else
serialize(s, getfield(sa, n))
end
end
end

function deserialize{T,N}(s, t::Type{SharedArray{T,N}})
sa = invoke(deserialize, (Any, DataType), s, t)
init_loc_flds(sa)
if (sa.loc_pididx == 0)
error("SharedArray cannot be used on a non-participating process")
end
sa
end

convert(::Type{Array}, sa::SharedArray) = sa.loc_shmarr

# avoiding ambiguity warnings
getindex(sa::SharedArray, x::Real) = getindex(sa.loc_shmarr, x)
getindex(sa::SharedArray, x::AbstractArray) = getindex(sa.loc_shmarr, x)

# Pass through getindex and setindex!; unlike DArrays, they always operate on the complete array
getindex(sa::SharedArray, args...) = getindex(sa.loc_shmarr, args...)
setindex!(sa::SharedArray, args...) = (setindex!(sa.loc_shmarr, args...); sa)

# convenience constructors
function shmem_fill(v, dims; kwargs...)
SharedArray(typeof(v), dims; init = S->fill!(S.loc_subarr_1d, v), kwargs...)
end
shmem_fill(v, I::Int...; kwargs...) = shmem_fill(v, I; kwargs...)

# rand variant with range
function shmem_rand(TR::Union(DataType, Range1), dims; kwargs...)
if isa(TR, Range1)
SharedArray(Int, dims; init = S -> map!((x)->rand(TR), S.loc_subarr_1d), kwargs...)
else
SharedArray(TR, dims; init = S -> map!((x)->rand(TR), S.loc_subarr_1d), kwargs...)
end
end
shmem_rand(TR::Union(DataType, Range1), i::Int; kwargs...) = shmem_rand(TR, (i,); kwargs...)
shmem_rand(TR::Union(DataType, Range1), I::Int...; kwargs...) = shmem_rand(TR, I; kwargs...)

shmem_rand(dims; kwargs...) = shmem_rand(Float64, dims; kwargs...)
shmem_rand(I::Int...; kwargs...) = shmem_rand(I; kwargs...)

function shmem_randn(dims; kwargs...)
SharedArray(Float64, dims; init = S-> map!((x)->randn(), S.loc_subarr_1d), kwargs...)
end
shmem_randn(I::Int...; kwargs...) = shmem_randn(I; kwargs...)
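
# Hypothetical usage of the convenience constructors above (values
# illustrative, assuming worker processes on the local host):
#   shmem_fill(0.5, (3,3))  # 3x3 SharedArray{Float64}, every element 0.5
#   shmem_rand(1:8, 2, 9)   # 2x9 SharedArray{Int} with entries drawn from 1:8
#   shmem_randn(6)          # length-6 SharedArray{Float64} of standard normals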



function print_shmem_limits(slen)
try
@linux_only pfx = "kernel"
@osx_only pfx = "kern.sysv"

shmmax_MB = div(int(split(readall(readsfrom(`sysctl $(pfx).shmmax`)[1]))[end]), 1024*1024)
page_size = int(split(readall(readsfrom(`getconf PAGE_SIZE`)[1]))[end])
shmall_MB = div(int(split(readall(readsfrom(`sysctl $(pfx).shmall`)[1]))[end]) * page_size, 1024*1024)

println("System max size of single shmem segment(MB) : ", shmmax_MB,
"\nSystem max size of all shmem segments(MB) : ", shmall_MB,
"\nRequested size(MB) : ", div(slen, 1024*1024),
"\nPlease ensure requested size is within system limits.",
"\nIf not, increase system limits and try again."
)
catch e
nothing # Ignore any errors while printing shmem limits
end
end

# utilities
function shm_mmap_array(T, dims, shm_seg_name, mode)
local s = nothing
local A = nothing
try
fd_mem = shm_open(shm_seg_name, mode, S_IRUSR | S_IWUSR)
if !(fd_mem > 0)
error("shm_open() failed")
end

s = fdio(fd_mem, true)

# On OSX, ftruncate must be used to set the size of the segment (lseek alone
# does not work), and it must be done only at creation time.
if (mode & JL_O_CREAT) == JL_O_CREAT
rc = ccall(:ftruncate, Int, (Int, Int), fd_mem, prod(dims)*sizeof(T))
if rc != 0
ec = errno()
error("ftruncate() failed, errno : ", ec)
end
end

A = mmap_array(T, dims, s, 0, grow=false)
catch e
print_shmem_limits(prod(dims)*sizeof(T))
rethrow(e)

finally
if s != nothing
close(s)
end
end
A
end

@unix_only shm_unlink(shm_seg_name) = ccall(:shm_unlink, Cint, (Ptr{Uint8},), shm_seg_name)
@unix_only shm_open(shm_seg_name, oflags, permissions) = ccall(:shm_open, Int, (Ptr{Uint8}, Int, Int), shm_seg_name, oflags, permissions)


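# Returns true if the calling process is on the same host as all of `procs`;
# throws an error if the requested processes span multiple hosts.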
function assert_same_host(procs)
resp = Array(Any, length(procs))

@sync begin
for (i, p) in enumerate(procs)
@async resp[i] = remotecall_fetch(p, () -> getipaddr())
end
end

if !all(x->x==resp[1], resp)
error("SharedArray requires all requested processes to be on the same machine.")
end

return resp[1] == getipaddr()
end
1 change: 1 addition & 0 deletions base/sysimg.jl
@@ -165,6 +165,7 @@ include("combinatorics.jl")
# distributed arrays and memory-mapped arrays
include("darray.jl")
include("mmap.jl")
include("sharedarray.jl")

# utilities - version, timing, help, edit, metaprogramming
include("sysinfo.jl")
22 changes: 22 additions & 0 deletions doc/manual/parallel-computing.rst
@@ -541,6 +541,28 @@ is ``DArray``\ -specific, but we list it here for completeness::
end


Shared Arrays (EXPERIMENTAL FEATURE)
------------------------------------

Shared Arrays use system shared memory to map the same array across many processes.

The constructor for a shared array is of the form
``SharedArray(T::Type, dims::NTuple; init=false, pids=workers())``,
which creates a shared array of element type ``T`` and size ``dims`` across the
processes specified by ``pids``, all of which must be on the same host.

If an ``init`` function with signature ``initfn(S::SharedArray)`` is specified,
it is called on each participating worker.

Unlike distributed arrays, a shared array is accessible only from the participating
workers specified by the ``pids`` keyword argument (and from the creating process,
if it is on the same host).

Indexing into a shared array (for both reading and writing values) works the same
way as for a regular array.
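
For example, a minimal sketch, assuming worker processes on the local host
(sizes and values are illustrative)::

    S = SharedArray(Float64, (3,4); init = S -> fill!(S.loc_subarr_1d, myid()))

    # Any participating process can read or write any element.
    S[2,3] = 7.0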



ClusterManagers
---------------

Expand Down
31 changes: 31 additions & 0 deletions doc/stdlib/base.rst
@@ -4515,6 +4515,37 @@ Distributed Arrays

Get the vector of processors storing pieces of ``d``


Shared Arrays (EXPERIMENTAL FEATURE)
------------------------------------

.. function:: SharedArray(T::Type, dims::NTuple; init=false, pids=workers())

Construct a SharedArray of element type ``T`` and size ``dims`` across the
processes specified by ``pids``, all of which must be on the same host.

If an ``init`` function with signature ``initfn(S::SharedArray)`` is specified,
it is called on each participating worker.

The following fields of a ``SharedArray`` are initialized appropriately on each
participating process:

``loc_shmarr::Array{T,N}`` - the shared memory segment mapped into the current
process. Note that this field is not required for indexed access; a
``SharedArray`` object can be used just like a regular array.

``loc_pididx::Int`` - index of the current process in the ``pids`` vector.
Useful when distributing computational work across participating workers.

``loc_subarr_1d`` - the current process's 1-d subarray of the entire array when
it is equally partitioned across participating workers. Provides a simple
work-partitioning scheme.
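
A minimal sketch of work partitioning using these fields, assuming ``S`` is an
existing ``SharedArray`` (the fill value is illustrative)::

    # Each participating worker fills its own 1-d partition.
    @sync begin
        for p in procs(S)
            @async remotecall_wait(p, S -> fill!(S.loc_subarr_1d, S.loc_pididx), S)
        end
    end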


.. function:: procs(sa::SharedArray)

Get the vector of processes that have mapped the shared array


System
------
