diff --git a/src/mapreduce.jl b/src/mapreduce.jl
index 6ff5c516..803093bc 100644
--- a/src/mapreduce.jl
+++ b/src/mapreduce.jl
@@ -87,15 +87,22 @@ Base.@propagate_inbounds _map_getindex(args::Tuple{}, I) = ()
 # Reduce an array across the grid. All elements to be processed can be addressed by the
 # product of the two iterators `Rreduce` and `Rother`, where the latter iterator will have
 # singleton entries for the dimensions that should be reduced (and vice versa).
-function mapreduce_grid(f, op, neutral, Rreduce, Rother, shuffle, R, As...)
+function partial_mapreduce_grid(f, op, neutral, Rreduce, Rother, shuffle, R, As...)
+    # decompose the 1D hardware indices into separate ones for reduction (across threads
+    # and possibly blocks if it doesn't fit) and other elements (remaining blocks)
+    threadIdx_reduce = threadIdx().x
+    blockDim_reduce = blockDim().x
+    blockIdx_reduce, blockIdx_other = fldmod1(blockIdx().x, length(Rother))
+    gridDim_reduce = gridDim().x ÷ length(Rother)
+
     # block-based indexing into the values outside of the reduction dimension
     # (that means we can safely synchronize threads within this block)
-    iother = blockIdx().x
+    iother = blockIdx_other
     @inbounds if iother <= length(Rother)
         Iother = Rother[iother]
 
         # load the neutral value
-        Iout = Iother
+        Iout = CartesianIndex(Tuple(Iother)..., blockIdx_reduce)
         neutral = if neutral === nothing
             R[Iout]
         else
@@ -105,19 +112,18 @@ function mapreduce_grid(f, op, neutral, Rreduce, Rother, shuffle, R, As...)
         val = op(neutral, neutral)
 
         # reduce serially across chunks of input vector that don't fit in a block
-        ireduce = threadIdx().x
+        ireduce = threadIdx_reduce + (blockIdx_reduce - 1) * blockDim_reduce
         while ireduce <= length(Rreduce)
             Ireduce = Rreduce[ireduce]
             J = max(Iother, Ireduce)
             val = op(val, f(_map_getindex(As, J)...))
-            ireduce += blockDim().x
+            ireduce += blockDim_reduce * gridDim_reduce
         end
 
-        # reduce in parallel within the current block
         val = reduce_block(op, val, neutral, shuffle)
 
         # write back to memory
-        if threadIdx().x == 1
+        if threadIdx_reduce == 1
             R[Iout] = val
         end
     end
@@ -152,21 +158,79 @@ NVTX.@range function GPUArrays.mapreducedim!(f, op, R::CuArray{T}, As::AbstractA
     # CartesianIndices object with UnitRanges that behave badly on the GPU.
     @assert length(Rall) == length(Rother) * length(Rreduce)
 
-    function configurator(kernel)
-        config = launch_configuration(kernel.fun)
-        dev = device()
-
-        threads = shuffle ? nextwarp(dev, length(Rreduce)) : nextpow(2, length(Rreduce))
-        if threads > config.threads
-            threads = shuffle ? prevwarp(dev, config.threads) : prevpow(2, config.threads)
+    # allocate an additional, empty dimension to write the reduced value to.
+    # this does not affect the actual location in memory of the final values,
+    # but allows us to write a generalized kernel supporting partial reductions.
+    R′ = reshape(R, (size(R)..., 1))
+
+    # how many threads do we want?
+    #
+    # threads in a block work together to reduce values across the reduction dimensions;
+    # we want as many as possible to improve algorithm efficiency and execution occupancy.
+    dev = device()
+    wanted_threads = shuffle ? nextwarp(dev, length(Rreduce)) : nextpow(2, length(Rreduce))
+    function compute_threads(max_threads)
+        if wanted_threads > max_threads
+            shuffle ? prevwarp(dev, max_threads) : prevpow(2, max_threads)
+        else
+            wanted_threads
         end
-        blocks = length(Rother)
-        shmem = shuffle ? 0 : 2*threads*sizeof(T)
+    end
 
-        return (threads=threads, blocks=blocks, shmem=shmem)
+    # how many threads can we launch?
+    #
+    # we might not be able to launch all those threads to reduce each slice in one go.
+    # that's why each thread also loops across its inputs, processing multiple values
+    # so that we can span the entire reduction dimension using a single thread block.
+    args = (f, op, init, Rreduce, Rother, Val(shuffle), R′, As...)
+    kernel_args = cudaconvert.(args)
+    kernel_tt = Tuple{Core.Typeof.(kernel_args)...}
+    kernel = cufunction(partial_mapreduce_grid, kernel_tt)
+    compute_shmem(threads) = shuffle ? 0 : 2*threads*sizeof(T)
+    kernel_config = launch_configuration(kernel.fun; shmem=compute_shmem∘compute_threads)
+    reduce_threads = compute_threads(kernel_config.threads)
+    reduce_shmem = compute_shmem(reduce_threads)
+
+    # how many blocks should we launch?
+    #
+    # even though we can always reduce each slice in a single thread block, that may not be
+    # optimal as it might not saturate the GPU. we already launch some blocks to process
+    # independent dimensions in parallel; pad that number to ensure full occupancy.
+    other_blocks = length(Rother)
+    reduce_blocks = if other_blocks >= kernel_config.blocks
+        1
+    else
+        min(cld(length(Rreduce), reduce_threads),       # how many we need at most
+            cld(kernel_config.blocks, other_blocks))    # maximize occupancy
     end
 
-    @cuda config=configurator mapreduce_grid(f, op, init, Rreduce, Rother, Val(shuffle), R, As...)
+    # determine the launch configuration
+    threads = reduce_threads
+    shmem = reduce_shmem
+    blocks = reduce_blocks*other_blocks
+
+    # perform the actual reduction
+    if reduce_blocks == 1
+        # we can cover the dimensions to reduce using a single block
+        @cuda threads=threads blocks=blocks shmem=shmem partial_mapreduce_grid(
+            f, op, init, Rreduce, Rother, Val(shuffle), R′, As...)
+    else
+        # we need multiple steps to cover all values to reduce
+        partial = similar(R, (size(R)..., reduce_blocks))
+        if init === nothing
+            # without an explicit initializer we need to copy from the output container
+            sz = prod(size(R))
+            for i in 1:reduce_blocks
+                # TODO: async copies (or async fill!, but then we'd need to load first)
+                #       or maybe just broadcast since that extends singleton dimensions
+                copyto!(partial, (i-1)*sz+1, R, 1, sz)
+            end
+        end
+        @cuda threads=threads blocks=blocks shmem=shmem partial_mapreduce_grid(
+            f, op, init, Rreduce, Rother, Val(shuffle), partial, As...)
+
+        GPUArrays.mapreducedim!(identity, op, R′, partial; init=init)
+    end
 
     return R
 end
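For reviewers, here is a small CPU-only sketch of the scheme the patch implements: the 1D block index is split with `fldmod1` into a slice index (`blockIdx_other`) and a chunk index (`blockIdx_reduce`), each chunk reduces a strided portion of `Rreduce` into an extra trailing dimension, and a second reduction folds that dimension back into the output, which is why `R` is reshaped to `R′` with a trailing singleton. Everything below (the array `A`, the choice of `reduce_blocks = 3`, using `sum` in place of the second `GPUArrays.mapreducedim!` pass) is illustrative only and not part of the patch.

```julia
# CPU emulation of the partial-reduction scheme (illustrative; not part of the patch)
A = rand(4, 1000)                       # reduce over the second dimension
Rother  = CartesianIndices((4, 1))      # singleton entry for the reduced dimension
Rreduce = CartesianIndices((1, 1000))   # singleton entries for the kept dimensions

reduce_blocks = 3                       # pretend occupancy asked for 3 blocks per slice
other_blocks  = length(Rother)
partial = zeros(4, 1, reduce_blocks)    # extra trailing dimension: one slot per chunk

# first pass: each simulated block reduces a strided chunk of Rreduce for its slice
for b in 1:reduce_blocks*other_blocks
    blockIdx_reduce, blockIdx_other = fldmod1(b, other_blocks)
    Iother = Rother[blockIdx_other]
    acc = 0.0                           # neutral element of +
    for i in blockIdx_reduce:reduce_blocks:length(Rreduce)   # grid-stride loop
        J = max(Iother, Rreduce[i])     # merge the two partial CartesianIndices
        acc += A[J]
    end
    partial[Tuple(Iother)..., blockIdx_reduce] = acc
end

# second pass: fold the trailing "block" dimension back into the output
R = dropdims(sum(partial; dims=3); dims=3)
@assert R ≈ sum(A; dims=2)
```

The index merging via `max` works because `Rother` holds 1 in the reduced dimensions and `Rreduce` holds 1 in the kept ones, so their elementwise maximum addresses a unique element of the input, mirroring the `J = max(Iother, Ireduce)` line in the kernel.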