Skip to content

Commit

Permalink
Merge pull request #161 from JuliaParallel/an/dist
Browse files Browse the repository at this point in the history
Fixes the broadcasting and mean with dimension
  • Loading branch information
andreasnoack authored Aug 15, 2018
2 parents f8c26bd + 6b93668 commit 721858c
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 34 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ os:
- osx
julia:
- 0.7
- 1.0
- nightly
matrix:
# allow_failures:
Expand Down
1 change: 1 addition & 0 deletions src/DistributedArrays.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ module DistributedArrays
using Distributed
using Serialization
using LinearAlgebra
using Statistics

import Base: +, -, *, div, mod, rem, &, |, xor
import Base.Callable
Expand Down
8 changes: 4 additions & 4 deletions src/darray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -568,9 +568,9 @@ function Base.reshape(A::DArray{T,1,S}, d::Dims) where {T,S<:Array}
i2 = CartesianIndices(sztail)[i]
globalidx = [ I[j][i2[j-1]] for j=2:nd ]

a = sub2ind(d, d1offs, globalidx...)
a = LinearIndices(d)[d1offs, globalidx...]

B[:,i] = A[a:(a+nr-1)]
B[:,i] = Array(A[a:(a+nr-1)])
end
B
end
Expand Down Expand Up @@ -706,15 +706,15 @@ end
Base.size(P::ProductIndices) = P.sz
# This gets passed to map to avoid breaking propagation of inbounds
Base.@propagate_inbounds propagate_getindex(A, I...) = A[I...]
Base.@propagate_inbounds Base.getindex(P::ProductIndices{_,N}, I::Vararg{Int, N}) where {_,N} =
Base.@propagate_inbounds Base.getindex(P::ProductIndices{J,N}, I::Vararg{Int, N}) where {J,N} =
Bool((&)(map(propagate_getindex, P.indices, I)...))

struct MergedIndices{I,N} <: AbstractArray{CartesianIndex{N}, N}
indices::I
sz::NTuple{N,Int}
end
Base.size(M::MergedIndices) = M.sz
Base.@propagate_inbounds Base.getindex(M::MergedIndices{_,N}, I::Vararg{Int, N}) where {_,N} =
Base.@propagate_inbounds Base.getindex(M::MergedIndices{J,N}, I::Vararg{Int, N}) where {J,N} =
CartesianIndex(map(propagate_getindex, M.indices, I))
# Additionally, we optimize bounds checking when using MergedIndices as an
# array index since checking, e.g., A[1:500, 1:500] is *way* faster than
Expand Down
47 changes: 19 additions & 28 deletions src/mapreduce.jl
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,25 @@ end
Base.BroadcastStyle(::Type{<:DArray}) = Broadcast.ArrayStyle{DArray}()
Base.BroadcastStyle(::Type{<:DArray}, ::Any) = Broadcast.ArrayStyle{DArray}()

function Base.similar(bc::Broadcast.Broadcasted{Broadcast.ArrayStyle{DArray}}, ::Type{ElType}) where {ElType}
DA = find_darray(bc)
DArray(I -> Array{ElType}(undef, map(length,I)), DA)
end

"`DA = find_darray(As)` returns the first DArray among the arguments."
find_darray(bc::Base.Broadcast.Broadcasted) = find_darray(bc.args)
find_darray(args::Tuple) = find_darray(find_darray(args[1]), Base.tail(args))
find_darray(x) = x
find_darray(a::DArray, rest) = a
find_darray(::Any, rest) = find_darray(rest)

function Base.copyto!(dest::DArray, bc::Broadcast.Broadcasted{Broadcast.ArrayStyle{DArray}})
@sync for p in procs(dest)
@async remotecall_fetch(p) do
copyto!(localpart(dest), rewrite_local(bc))
end
function Base.copy(bc::Broadcast.Broadcasted{Broadcast.ArrayStyle{DArray}})
T = Base.Broadcast.combine_eltypes(bc.f, bc.args)
shape = Base.Broadcast.combine_axes(bc.args...)
iter = Base.CartesianIndices(shape)
D = DArray(map(length, shape)) do I
A = map(bc.args) do a
if isa(a, Union{Number,Ref})
return a
else
return localtype(a)(
a[ntuple(i -> i > ndims(a) ? 1 : (size(a, i) == 1 ? (1:1) : I[i]), length(shape))...]
)
end
end
broadcast(bc.f, A...)
end
dest
return D
end

"""
Transform a Broadcasted{Broadcast.ArrayStyle{DArray}} object into an equivalent
Broadcasted{Broadcast.DefaultArrayStyle} object for the localparts.
"""
rewrite_local(bc::Broadcast.Broadcasted{Broadcast.ArrayStyle{DArray}}) = Broadcast.broadcasted(bc.f, rewrite_local(bc.args)...)
rewrite_local(args::Tuple) = map(rewrite_local, args)
rewrite_local(a::DArray) = localpart(a)
rewrite_local(x) = x


function Base.reduce(f, d::DArray)
results = asyncmap(procs(d)) do p
remotecall_fetch(p) do
Expand Down Expand Up @@ -128,6 +116,7 @@ function Base.mapreducedim!(f, op, R::DArray, A::DArray)
return mapreducedim_between!(identity, op, R, B, region)
end

## Some special cases
function Base._all(f, A::DArray, ::Colon)
B = asyncmap(procs(A)) do p
remotecall_fetch(p) do
Expand Down Expand Up @@ -171,6 +160,8 @@ function Base.extrema(d::DArray)
return reduce((t,s) -> (min(t[1], s[1]), max(t[2], s[2])), r)
end

Statistics._mean(A::DArray, region) = sum(A, dims = region) ./ prod((size(A, i) for i in region))

# Unary vector functions
(-)(D::DArray) = map(-, D)

Expand Down
6 changes: 5 additions & 1 deletion test/darray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ check_leaks()
@testset "test statistical functions on DArrays" begin
dims = (20,20,20)
DA = drandn(dims)
A = convert(Array, DA)
A = Array(DA)

@testset "test $f for dimension $dms" for f in (mean, ), dms in (1, 2, 3, (1,2), (1,3), (2,3), (1,2,3))
# std is pending implementation
Expand Down Expand Up @@ -835,6 +835,10 @@ check_leaks()
c = a .- m
d = convert(Array, a) .- convert(Array, m)
@test c == d
e = @DArray [ones(10) for i=1:4]
f = 2 .* e
@test Array(f) == 2 .* Array(e)
@test Array(map(x -> sum(x) .+ 2, e)) == map(x -> sum(x) .+ 2, e)
d_closeall()
end

Expand Down
2 changes: 1 addition & 1 deletion test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ end
@everywhere using Random
@everywhere using LinearAlgebra

@everywhere srand(1234 + myid())
@everywhere Random.seed!(1234 + myid())

const MYID = myid()
const OTHERIDS = filter(id-> id != MYID, procs())[rand(1:(nprocs()-1))]
Expand Down

0 comments on commit 721858c

Please sign in to comment.