Merge #646
646: Improve mapreduce performance r=maleadt a=wongalvis14

~More than 3-fold improvement over the latest implementation~

Benchmarking function from #611

First stage: using the number of "max parallel threads a single block can hold" as the number of blocks, perform a partial reduction, iterating serially when the reduction dimension holds more elements than there are threads.

Second stage: reduce the per-block partial results within a single block, with no serial iteration.

This approach aims to strike an optimal balance between per-thread workload, kernel launch overhead, and parallel resource exhaustion.
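
The benchmark itself is defined in #611. For context, a hypothetical sketch of such a Monte-Carlo π estimator (the names and details here are assumptions, not code from this PR):

```julia
using CuArrays

# Hypothetical sketch of a pi_mc_cu-style benchmark (the real one is in #611):
# estimate π from the fraction of random points landing in the quarter circle.
function pi_mc_cu(n)
    xs = CuArrays.rand(Float32, n)
    ys = CuArrays.rand(Float32, n)
    # summing a Bool array goes through the mapreduce machinery touched by this PR
    4 * sum((xs .^ 2 .+ ys .^ 2) .< 1f0) / n
end
```

The timings below come from running BenchmarkTools' `@benchmark` against that function.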

```
New impl:
julia> @benchmark pi_mc_cu(10000000)
BenchmarkTools.Trial: 
  memory estimate:  16.98 KiB
  allocs estimate:  468
  --------------
  minimum time:     2.520 ms (0.00% GC)
  median time:      2.536 ms (0.00% GC)
  mean time:        2.584 ms (0.64% GC)
  maximum time:     15.600 ms (50.62% GC)
  --------------
  samples:          1930
  evals/sample:     1

Old recursion impl:
julia> @benchmark pi_mc_cu(10000000)
BenchmarkTools.Trial: 
  memory estimate:  17.05 KiB
  allocs estimate:  472
  --------------
  minimum time:     4.059 ms (0.00% GC)
  median time:      4.076 ms (0.00% GC)
  mean time:        4.130 ms (0.64% GC)
  maximum time:     23.199 ms (63.12% GC)
  --------------
  samples:          1209
  evals/sample:     1

Latest serial impl:
BenchmarkTools.Trial: 
  memory estimate:  7.81 KiB
  allocs estimate:  242
  --------------
  minimum time:     8.544 ms (0.00% GC)
  median time:      8.579 ms (0.00% GC)
  mean time:        8.622 ms (0.27% GC)
  maximum time:     26.172 ms (41.80% GC)
  --------------
  samples:          580
  evals/sample:     1
```

Co-authored-by: wongalvis14 <wongalvis14@gmail.com>
Co-authored-by: Tim Besard <tim.besard@gmail.com>
3 people authored Mar 31, 2020
2 parents 31cd748 + 135cc76 commit 5047dc9
Showing 1 changed file with 82 additions and 18 deletions: src/mapreduce.jl
```diff
@@ -87,15 +87,22 @@ Base.@propagate_inbounds _map_getindex(args::Tuple{}, I) = ()
 # Reduce an array across the grid. All elements to be processed can be addressed by the
 # product of the two iterators `Rreduce` and `Rother`, where the latter iterator will have
 # singleton entries for the dimensions that should be reduced (and vice versa).
-function mapreduce_grid(f, op, neutral, Rreduce, Rother, shuffle, R, As...)
+function partial_mapreduce_grid(f, op, neutral, Rreduce, Rother, shuffle, R, As...)
+    # decompose the 1D hardware indices into separate ones for reduction (across threads
+    # and possibly blocks if it doesn't fit) and other elements (remaining blocks)
+    threadIdx_reduce = threadIdx().x
+    blockDim_reduce = blockDim().x
+    blockIdx_reduce, blockIdx_other = fldmod1(blockIdx().x, length(Rother))
+    gridDim_reduce = gridDim().x ÷ length(Rother)
+
     # block-based indexing into the values outside of the reduction dimension
     # (that means we can safely synchronize threads within this block)
-    iother = blockIdx().x
+    iother = blockIdx_other
     @inbounds if iother <= length(Rother)
         Iother = Rother[iother]

         # load the neutral value
-        Iout = Iother
+        Iout = CartesianIndex(Tuple(Iother)..., blockIdx_reduce)
         neutral = if neutral === nothing
             R[Iout]
         else
@@ -105,19 +112,18 @@ function mapreduce_grid(f, op, neutral, Rreduce, Rother, shuffle, R, As...)
         val = op(neutral, neutral)

         # reduce serially across chunks of input vector that don't fit in a block
-        ireduce = threadIdx().x
+        ireduce = threadIdx_reduce + (blockIdx_reduce - 1) * blockDim_reduce
         while ireduce <= length(Rreduce)
             Ireduce = Rreduce[ireduce]
             J = max(Iother, Ireduce)
             val = op(val, f(_map_getindex(As, J)...))
-            ireduce += blockDim().x
+            ireduce += blockDim_reduce * gridDim_reduce
         end

         # reduce in parallel within the current block
         val = reduce_block(op, val, neutral, shuffle)

         # write back to memory
-        if threadIdx().x == 1
+        if threadIdx_reduce == 1
             R[Iout] = val
         end
     end
```
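
For intuition about the index decomposition in the kernel above: `fldmod1` splits the flat hardware block index into a block index along the reduction dimension and a slice index across the other dimensions. A standalone sketch in plain Julia (no GPU required):

```julia
# With 3 "other" slices (length(Rother) == 3) and a grid of 6 blocks,
# blocks 1-3 handle the first reduction chunk of slices 1-3, and
# blocks 4-6 handle the second chunk of the same slices.
for blockidx in 1:6
    blockIdx_reduce, blockIdx_other = fldmod1(blockidx, 3)
    println("block $blockidx -> reduce chunk $blockIdx_reduce, slice $blockIdx_other")
end
```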
```diff
@@ -152,21 +158,79 @@ NVTX.@range function GPUArrays.mapreducedim!(f, op, R::CuArray{T}, As::AbstractA
     # CartesianIndices object with UnitRanges that behave badly on the GPU.
     @assert length(Rall) == length(Rother) * length(Rreduce)

-    function configurator(kernel)
-        config = launch_configuration(kernel.fun)
-        dev = device()
-
-        threads = shuffle ? nextwarp(dev, length(Rreduce)) : nextpow(2, length(Rreduce))
-        if threads > config.threads
-            threads = shuffle ? prevwarp(dev, config.threads) : prevpow(2, config.threads)
-        end
-        blocks = length(Rother)
-        shmem = shuffle ? 0 : 2*threads*sizeof(T)
-
-        return (threads=threads, blocks=blocks, shmem=shmem)
-    end
+    # allocate an additional, empty dimension to write the reduced value to.
+    # this does not affect the actual location in memory of the final values,
+    # but allows us to write a generalized kernel supporting partial reductions.
+    R′ = reshape(R, (size(R)..., 1))
+
+    # how many threads do we want?
+    #
+    # threads in a block work together to reduce values across the reduction dimensions;
+    # we want as many as possible to improve algorithm efficiency and execution occupancy.
+    dev = device()
+    wanted_threads = shuffle ? nextwarp(dev, length(Rreduce)) : nextpow(2, length(Rreduce))
+    function compute_threads(max_threads)
+        if wanted_threads > max_threads
+            shuffle ? prevwarp(dev, max_threads) : prevpow(2, max_threads)
+        else
+            wanted_threads
+        end
+    end
+
+    # how many threads can we launch?
+    #
+    # we might not be able to launch all those threads to reduce each slice in one go.
+    # that's why each thread also loops across its inputs, processing multiple values
+    # so that we can span the entire reduction dimension using a single thread block.
+    args = (f, op, init, Rreduce, Rother, Val(shuffle), R′, As...)
+    kernel_args = cudaconvert.(args)
+    kernel_tt = Tuple{Core.Typeof.(kernel_args)...}
+    kernel = cufunction(partial_mapreduce_grid, kernel_tt)
+    compute_shmem(threads) = shuffle ? 0 : 2*threads*sizeof(T)
+    kernel_config = launch_configuration(kernel.fun; shmem=compute_shmem∘compute_threads)
+    reduce_threads = compute_threads(kernel_config.threads)
+    reduce_shmem = compute_shmem(reduce_threads)
+
+    # how many blocks should we launch?
+    #
+    # even though we can always reduce each slice in a single thread block, that may not be
+    # optimal as it might not saturate the GPU. we already launch some blocks to process
+    # independent dimensions in parallel; pad that number to ensure full occupancy.
+    other_blocks = length(Rother)
+    reduce_blocks = if other_blocks >= kernel_config.blocks
+        1
+    else
+        min(cld(length(Rreduce), reduce_threads),       # how many we need at most
+            cld(kernel_config.blocks, other_blocks))    # maximize occupancy
+    end

-    @cuda config=configurator mapreduce_grid(f, op, init, Rreduce, Rother, Val(shuffle), R, As...)
+    # determine the launch configuration
+    threads = reduce_threads
+    shmem = reduce_shmem
+    blocks = reduce_blocks*other_blocks
+
+    # perform the actual reduction
+    if reduce_blocks == 1
+        # we can cover the dimensions to reduce using a single block
+        @cuda threads=threads blocks=blocks shmem=shmem partial_mapreduce_grid(
+            f, op, init, Rreduce, Rother, Val(shuffle), R′, As...)
+    else
+        # we need multiple steps to cover all values to reduce
+        partial = similar(R, (size(R)..., reduce_blocks))
+        if init === nothing
+            # without an explicit initializer we need to copy from the output container
+            sz = prod(size(R))
+            for i in 1:reduce_blocks
+                # TODO: async copies (or async fill!, but then we'd need to load first)
+                #       or maybe just broadcast since that extends singleton dimensions
+                copyto!(partial, (i-1)*sz+1, R, 1, sz)
+            end
+        end
+        @cuda threads=threads blocks=blocks shmem=shmem partial_mapreduce_grid(
+            f, op, init, Rreduce, Rother, Val(shuffle), partial, As...)
+
+        GPUArrays.mapreducedim!(identity, op, R′, partial; init=init)
+    end

     return R
 end
```
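
The host side either finishes in a single kernel launch (`reduce_blocks == 1`) or writes one partial result per reduction block and folds those partials with a second `mapreducedim!` call. A CPU-only analogue of that two-step schedule, using made-up helper names to illustrate the idea:

```julia
# Stage 1: each "block" serially reduces a strided subset of the input into a
# partial result; stage 2: a single final reduction combines the partials.
function two_step_sum(xs, reduce_blocks)
    partial = zeros(eltype(xs), reduce_blocks)
    for b in 1:reduce_blocks
        i = b
        while i <= length(xs)          # grid-strided loop, as in the kernel
            partial[b] += xs[i]
            i += reduce_blocks
        end
    end
    return sum(partial)                # second-stage reduction
end

two_step_sum(collect(1.0:100.0), 4)    # 5050.0
```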
