This repository has been archived by the owner on Mar 12, 2021. It is now read-only.

Commit 266c6ed

Merge pull request #602 from JuliaGPU/tb/mapreduce
Reimplement mapreduce.
maleadt authored Feb 25, 2020
2 parents 0cdf32c + c007592 commit 266c6ed
Showing 5 changed files with 193 additions and 155 deletions.
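
For orientation before the file-by-file diff: a hedged usage sketch of what the reimplemented mapreduce is meant to support from the user's side. This is editorial and not part of the commit; it assumes the standard Base reductions on a CuArray dispatch to the GPUArrays.mapreducedim! method added below, and the array sizes are illustrative only.

using CuArrays

A = CuArrays.rand(Float32, 1024, 1024)

sum(A)                 # full reduction down to a scalar
sum(A; dims=2)         # partial reduction, producing a 1024×1 CuArray
mapreduce(abs2, +, A)  # fused map + reduce of arbitrary functions on the GPU
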
10 changes: 5 additions & 5 deletions Manifest.toml
@@ -39,9 +39,9 @@ uuid = "c5f51814-7f29-56b8-a69c-e4d8f6be1fde"
version = "6.0.0"

[[CUDAnative]]
deps = ["Adapt", "BinaryProvider", "CEnum", "CUDAapi", "CUDAdrv", "DataStructures", "InteractiveUtils", "LLVM", "Libdl", "Pkg", "Printf", "TimerOutputs"]
git-tree-sha1 = "f5863b95140711306d1d6b31ada234d7bfea2416"
repo-rev = "7c1311b19a0f1eda7e0de724f2fd5f27b70daf1e"
deps = ["Adapt", "BinaryProvider", "CEnum", "CUDAapi", "CUDAdrv", "DataStructures", "InteractiveUtils", "LLVM", "Libdl", "MacroTools", "Pkg", "Printf", "TimerOutputs"]
git-tree-sha1 = "5beaa8e6a8fb65003b7b2555a73e7666468b22f6"
repo-rev = "46d5331"
repo-url = "https://github.com/JuliaGPU/CUDAnative.jl.git"
uuid = "be33ccc6-a3ff-5ff2-a52e-74243cff1e17"
version = "2.10.2"
@@ -62,8 +62,8 @@ uuid = "8ba89e20-285c-5b6f-9357-94700520ee1b"

[[GPUArrays]]
deps = ["AbstractFFTs", "Adapt", "LinearAlgebra", "Printf", "Random", "Serialization"]
git-tree-sha1 = "7eefc8f91796ee7a657641824a8d4eda9d81e93f"
repo-rev = "02b3fb82f06c741c7542e331022463683c01c6f5"
git-tree-sha1 = "6aeda18f125c5b9599c0de40865b67319b69c9b8"
repo-rev = "d6e37deb13c3202947765f8d574c4f9a1b511012"
repo-url = "https://github.com/JuliaGPU/GPUArrays.jl.git"
uuid = "0c68f7d7-f131-5f86-a1c3-88cf8149b2d7"
version = "2.0.1"
30 changes: 3 additions & 27 deletions src/accumulate.jl
@@ -1,30 +1,5 @@
# scan and accumulate

# FIXME: certain Base operations, like accumulate, don't allow to pass a neutral element
# since it is not required by the Base implementation (as opposed to eg. reduce).
# to stick to the API, we use global state to provide a callback.
const neutral_elements = Dict{Function,Function}(
Base.:(+) => zero,
Base.add_sum => zero,
Base.:(*) => one,
Base.mul_prod => one,
)
function neutral_element!(op, f)
if haskey(neutral_elements, op)
@warn "Overriding neutral element for $op"
end
neutral_elements[op] = f
end
function neutral_element(op, T)
if !haskey(neutral_elements, op)
error("""CuArrays.jl needs to know the neutral element for your operator $op.
Please register your operator using: `CuArrays.neutral_element!($op, f::Function)`,
providing a function that returns a neutral element for a given element type.""")
end
f = neutral_elements[op]
return f(T)
end

## COV_EXCL_START

# partial scan of individual thread blocks within a grid
@@ -152,14 +127,15 @@ end
## COV_EXCL_STOP

function scan!(f::Function, output::CuArray{T}, input::CuArray;
dims::Integer, init=nothing, neutral=neutral_element(f, T)) where {T}

dims::Integer, init=nothing, neutral=GPUArrays.neutral_element(f, T)) where {T}
dims > 0 || throw(ArgumentError("dims must be a positive integer"))
inds_t = axes(input)
axes(output) == inds_t || throw(DimensionMismatch("shape of B must match A"))
dims > ndims(input) && return copyto!(output, input)
isempty(inds_t[dims]) && return output

f = cufunc(f)

# iteration domain across the main dimension
Rdim = CartesianIndices((size(input, dims),))

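
The neutral_elements registry removed above is superseded by a lookup that now lives in GPUArrays.jl, which scan! calls as GPUArrays.neutral_element(f, T). As a rough illustration of that contract, the sketch below shows plausible methods mirroring the deleted table; it is editorial, and the actual method list and error message in GPUArrays.jl may differ.

# Illustrative sketch only: the kind of methods GPUArrays.neutral_element(op, T)
# is expected to provide (zero for sums, one for products, etc.).
neutral_element(op, T) =
    error("No neutral element known for `$op`; define `neutral_element(::typeof($op), T)`.")
neutral_element(::typeof(Base.:(+)), T)     = zero(T)
neutral_element(::typeof(Base.add_sum), T)  = zero(T)
neutral_element(::typeof(Base.:(*)), T)     = one(T)
neutral_element(::typeof(Base.mul_prod), T) = one(T)
neutral_element(::typeof(max), T)           = typemin(T)
neutral_element(::typeof(min), T)           = typemax(T)
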
274 changes: 184 additions & 90 deletions src/mapreduce.jl
@@ -1,111 +1,205 @@
## COV_EXCL_START

@inline function reduce_block(arr::CuDeviceArray, op)
sync_threads()
len = blockDim().x
while len != 1
sync_threads()
skip = (len + 1) >> 1
reduce_to = threadIdx().x - skip
if 0 < reduce_to <= (len >> 1)
arr[reduce_to] = op(arr[reduce_to], arr[threadIdx().x])
end
len = skip
# TODO
# - serial version for lower latency
# - block-stride loop to delay need for second kernel launch

# Reduce a value across a warp
@inline function reduce_warp(op, val)
# offset = CUDAnative.warpsize() ÷ 2
# while offset > 0
# val = op(val, shfl_down_sync(0xffffffff, val, offset))
# offset ÷= 2
# end

# Loop unrolling for warpsize = 32
val = op(val, shfl_down_sync(0xffffffff, val, 16, 32))
val = op(val, shfl_down_sync(0xffffffff, val, 8, 32))
val = op(val, shfl_down_sync(0xffffffff, val, 4, 32))
val = op(val, shfl_down_sync(0xffffffff, val, 2, 32))
val = op(val, shfl_down_sync(0xffffffff, val, 1, 32))

return val
end

# Reduce a value across a block, using shared memory for communication
@inline function reduce_block(op, val::T, neutral, shuffle::Val{true}) where T
# shared mem for 32 partial sums
shared = @cuStaticSharedMem(T, 32) # NOTE: this is an upper bound; better detect it

wid, lane = fldmod1(threadIdx().x, CUDAnative.warpsize())

# each warp performs partial reduction
val = reduce_warp(op, val)

# write reduced value to shared memory
if lane == 1
@inbounds shared[wid] = val
end

# wait for all partial reductions
sync_threads()
end

function mapreducedim_kernel_parallel(f, op, R::CuDeviceArray{T}, A::CuDeviceArray{T},
CIS, Rlength, Slength) where {T}
for Ri_base in 0:(gridDim().x * blockDim().y):(Rlength-1)
Ri = Ri_base + (blockIdx().x - 1) * blockDim().y + threadIdx().y
Ri > Rlength && return
RI = Tuple(CartesianIndices(R)[Ri])
S = @cuStaticSharedMem(T, 512)
Si_folded_base = (threadIdx().y - 1) * blockDim().x
Si_folded = Si_folded_base + threadIdx().x
# serial reduction of A into S by Slength ÷ xthreads
for Si_base in 0:blockDim().x:(Slength-1)
Si = Si_base + threadIdx().x
Si > Slength && break
SI = Tuple(CIS[Si])
AI = ifelse.(size(R) .== 1, SI, RI)
if Si_base == 0
S[Si_folded] = f(A[AI...])
else
S[Si_folded] = op(S[Si_folded], f(A[AI...]))
end
end
# block-parallel reduction of S to S[1] by xthreads
reduce_block(view(S, (Si_folded_base + 1):512), op)
# reduce S[1] into R
threadIdx().x == 1 && (R[Ri] = op(R[Ri], S[Si_folded]))
# read from shared memory only if that warp existed
val = if threadIdx().x <= fld1(blockDim().x, CUDAnative.warpsize())
@inbounds shared[lane]
else
neutral
end
return

# final reduce within first warp
if wid == 1
val = reduce_warp(op, val)
end

return val
end
@inline function reduce_block(op, val::T, neutral, shuffle::Val{false}) where T
threads = blockDim().x
thread = threadIdx().x

## COV_EXCL_STOP
# shared mem for a complete reduction
shared = @cuDynamicSharedMem(T, (2*threads,))
@inbounds shared[thread] = val

# perform a reduction
d = threads>>1
while d > 0
sync_threads()
if thread <= d
shared[thread] = op(shared[thread], shared[thread+d])
end
d >>= 1
end

function Base._mapreducedim!(f, op, R::CuArray{T}, A::CuArray{T}) where {T}
# the kernel as generated from `f` and `op` can require lots of registers (eg. #160),
# so we need to be careful about how many threads we launch not to run out of them.
Rlength = length(R)
Ssize = ifelse.(size(R) .== 1, size(A), 1)
Slength = prod(Ssize)
CIS = CartesianIndices(Ssize)

parallel_args = (f, op, R, A, CIS, Rlength, Slength)
GC.@preserve parallel_args begin
parallel_kargs = cudaconvert.(parallel_args)
parallel_tt = Tuple{Core.Typeof.(parallel_kargs)...}
parallel_kernel = cufunction(mapreducedim_kernel_parallel, parallel_tt)

# we are limited in how many threads we can launch...
## by the kernel
kernel_threads = CUDAnative.maxthreads(parallel_kernel)
## by the device
dev = CUDAdrv.device()
block_threads = (x=attribute(dev, CUDAdrv.DEVICE_ATTRIBUTE_MAX_BLOCK_DIM_X),
y=attribute(dev, CUDAdrv.DEVICE_ATTRIBUTE_MAX_BLOCK_DIM_Y),
total=attribute(dev, CUDAdrv.DEVICE_ATTRIBUTE_MAX_THREADS_PER_BLOCK))

# figure out a legal launch configuration
y_thr = min(nextpow(2, Rlength ÷ 512 + 1), 512, block_threads.y, kernel_threads)
x_thr = min(512 ÷ y_thr, Slength, block_threads.x,
ceil(Int, block_threads.total/y_thr),
ceil(Int, kernel_threads/y_thr))

blk, thr = (Rlength - 1) ÷ y_thr + 1, (x_thr, y_thr, 1)
parallel_kernel(parallel_kargs...; threads=thr, blocks=blk)
# load the final value on the first thread
if thread == 1
val = @inbounds shared[thread]
end

return R
return val
end

import Base.minimum, Base.maximum, Base.reduce
# Partially reduce an array across the grid. The reduction is partial, with multiple
# blocks `gridDim_reduce` working on reducing data from `A` and writing it to multiple
# outputs in `R`. All elements to be processed can be addressed by the product of the
# two iterators `Rreduce` and `Rother`, where the latter iterator will have singleton
# entries for the dimensions that should be reduced (and vice versa). The output array
# is expected to have an additional dimension with as size the number of reduced values
# for every reduction (i.e. more than one if there's multiple blocks participating).
function partial_mapreduce_grid(f, op, A, R, neutral, Rreduce, Rother, gridDim_reduce, shuffle)
# decompose the 1D hardware indices into separate ones for reduction (across threads
# and possibly blocks if it doesn't fit) and other elements (remaining blocks)
threadIdx_reduce = threadIdx().x
blockDim_reduce = blockDim().x
blockIdx_other, blockIdx_reduce = fldmod1(blockIdx().x, gridDim_reduce)

# block-based indexing into the values outside of the reduction dimension
# (that means we can safely synchronize threads within this block)
iother = blockIdx_other
@inbounds if iother <= length(Rother)
Iother = Rother[iother]

# load the neutral value
Iout = CartesianIndex(Tuple(Iother)..., blockIdx_reduce)
neutral = if neutral === nothing
R[Iout]
else
neutral
end

_reduced_dims(x::CuArray, ::Colon) = Tuple(Base.ones(Int, ndims(x)))
_reduced_dims(x::CuArray, dims) = Base.reduced_indices(x, dims)
# get a value that should be reduced
ireduce = threadIdx_reduce + (blockIdx_reduce - 1) * blockDim_reduce
val = if ireduce <= length(Rreduce)
Ireduce = Rreduce[ireduce]
J = max(Iother, Ireduce)
f(A[J])
else
neutral
end
val = op(val, neutral)

_initarray(x::CuArray{T}, dims, init) where {T} = fill!(similar(x, T, _reduced_dims(x, dims)), init)
val = reduce_block(op, val, neutral, shuffle)

function _reduce(op, x::CuArray, init, ::Colon)
mx = _initarray(x, :, init)
Base._mapreducedim!(identity, op, mx, x)
collect(mx)[1]
end
# write back to memory
if threadIdx_reduce == 1
R[Iout] = val
end
end

function _reduce(op, x::CuArray, init, dims)
mx = _initarray(x, dims, init)
Base._mapreducedim!(identity, op, mx, x)
return
end

"""
reduce(op, x::CuArray; dims=:, init)
## COV_EXCL_STOP

The initial value `init` is mandatory for `reduce` on `CuArray`'s. It must be a neutral element for `op`.
"""
reduce(op, x::CuArray; dims=:, init) = _reduce(op, x, init, dims)
NVTX.@range function GPUArrays.mapreducedim!(f, op, R::CuArray{T}, A::AbstractArray, init=nothing) where T
Base.check_reducedims(R, A)
isempty(A) && return R

f = cufunc(f)
op = cufunc(op)

# be conservative about using shuffle instructions
shuffle = true
shuffle &= capability(device()) >= v"3.0"
shuffle &= T in (Int32, Int64, Float32, Float64, ComplexF32, ComplexF64)
# TODO: add support for Bool (CUDAnative.jl#420)

# iteration domain, split in two: one part covers the dimensions that should
# be reduced, and the other covers the rest. combining both covers all values.
Rall = CartesianIndices(A)
Rother = CartesianIndices(R)
Rreduce = CartesianIndices(ifelse.(axes(A) .== axes(R), Ref(Base.OneTo(1)), axes(A)))
# NOTE: we hard-code `OneTo` (`first.(axes(A))` would work too) or we get a
# CartesianIndices object with UnitRanges that behave badly on the GPU.
@assert length(Rall) == length(Rother) * length(Rreduce)

# allocate an additional, empty dimension to write the reduced value to.
# this does not affect the actual location in memory of the final values,
# but allows us to write a generalized kernel supporting partial reductions.
R′ = reshape(R, (size(R)..., 1))

# determine how many threads we can launch
args = (f, op, A, R′, init, Rreduce, Rother, 1, Val(shuffle))
kernel_args = cudaconvert.(args)
kernel_tt = Tuple{Core.Typeof.(kernel_args)...}
kernel = cufunction(partial_mapreduce_grid, kernel_tt)
kernel_config =
launch_configuration(kernel.fun; shmem = shuffle ? 0 : threads->2*threads*sizeof(T))

# determine the launch configuration
dev = device()
reduce_threads = shuffle ? nextwarp(dev, length(Rreduce)) : nextpow(2, length(Rreduce))
if reduce_threads > kernel_config.threads
reduce_threads = shuffle ? prevwarp(dev, kernel_config.threads) : prevpow(2, kernel_config.threads)
end
reduce_blocks = cld(length(Rreduce), reduce_threads)
other_blocks = length(Rother)
threads, blocks = reduce_threads, reduce_blocks*other_blocks
shmem = shuffle ? 0 : 2*threads*sizeof(T)

# perform the actual reduction
if reduce_blocks == 1
# we can cover the dimensions to reduce using a single block
@cuda threads=threads blocks=blocks shmem=shmem partial_mapreduce_grid(
f, op, A, R′, init, Rreduce, Rother, 1, Val(shuffle))
else
# we need multiple steps to cover all values to reduce
partial = similar(R, (size(R)..., reduce_blocks))
if init === nothing
# without an explicit initializer we need to copy from the output container
sz = prod(size(R))
for i in 1:reduce_blocks
# TODO: async copies (or async fill!, but then we'd need to load first)
# or maybe just broadcast since that extends singleton dimensions
copyto!(partial, (i-1)*sz+1, R, 1, sz)
end
end
@cuda threads=threads blocks=blocks shmem=shmem partial_mapreduce_grid(
f, op, A, partial, init, Rreduce, Rother, reduce_blocks, Val(shuffle))

minimum(x::CuArray{T}; dims=:) where {T} = _reduce(min, x, typemax(T), dims)
maximum(x::CuArray{T}; dims=:) where {T} = _reduce(max, x, typemin(T), dims)
GPUArrays.mapreducedim!(identity, op, R′, partial, init)
end

return R
end
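
To recap the strategy of partial_mapreduce_grid and the launch logic above: each block reduces one chunk of the reduction dimension to a partial value, and when more than one block is needed the partials are reduced again (in the diff, via another GPUArrays.mapreducedim! call on the `partial` array). The CPU-side analogue below is for exposition only; it is not the GPU kernel, and the nblocks parameter and chunking scheme are assumptions made for the sketch.

# CPU analogue of the two-pass reduction: per-"block" chunks are reduced
# serially, then the partial results are combined in a second pass.
function two_pass_mapreduce(f, op, xs::AbstractVector, neutral; nblocks=4)
    n = length(xs)
    chunk = cld(n, nblocks)
    partials = map(1:nblocks) do b
        lo = (b - 1) * chunk + 1
        hi = min(b * chunk, n)
        acc = neutral
        for i in lo:hi
            acc = op(acc, f(xs[i]))
        end
        acc
    end
    return reduce(op, partials; init=neutral)
end

two_pass_mapreduce(abs2, +, collect(1.0:10.0), 0.0)  # 385.0, same as mapreduce(abs2, +, 1:10)
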
