WIP: Change OMP reduction implementation
At most allocate as much as the input vector for OMP reductions.
Thomas Grützmacher committed Jul 28, 2023
1 parent f78df62 commit 6ed0741
Showing 2 changed files with 34 additions and 38 deletions.
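The idea behind the change, as a minimal standalone sketch (the function name capped_sum and the use of std::vector are mine for illustration; Ginkgo's actual kernels use the executor, the array<char> scratch space, and the generic op/finalize functors shown in the diffs below): the OpenMP thread count, and with it the per-thread partial-result buffer, is capped at the number of input elements, so the temporary storage can never exceed the input.

#include <omp.h>

#include <algorithm>
#include <cstdint>
#include <numeric>
#include <vector>

// Sum reduction with the thread count capped at the input size, so the
// `partial` buffer (one element per thread) never outgrows the input.
std::int64_t capped_sum(const std::vector<std::int64_t>& input)
{
    if (input.empty()) {
        return 0;
    }
    const auto size = static_cast<std::int64_t>(input.size());
    // at most one thread per element
    const auto num_threads =
        std::min<std::int64_t>(omp_get_max_threads(), size);
    const auto work_per_thread = (size + num_threads - 1) / num_threads;
    std::vector<std::int64_t> partial(num_threads, 0);
#pragma omp parallel num_threads(num_threads)
    {
        const auto tid = static_cast<std::int64_t>(omp_get_thread_num());
        const auto begin = tid * work_per_thread;
        const auto end = std::min(size, begin + work_per_thread);
        std::int64_t local = 0;
        for (auto i = begin; i < end; ++i) {
            local += input[i];
        }
        partial[tid] = local;
    }
    return std::accumulate(partial.begin(), partial.end(), std::int64_t{0});
}

This mirrors the hunks below: the kernels now compute num_threads = std::min<int64>(omp_get_max_threads(), size) and size the scratch array from that capped count.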
45 changes: 27 additions & 18 deletions omp/base/kernel_launch_reduction.hpp
@@ -62,8 +62,9 @@ void run_kernel_reduction_impl(std::shared_ptr<const OmpExecutor> exec,
                                ValueType* result, size_type size,
                                array<char>& tmp, MappedKernelArgs... args)
 {
-    const auto num_threads = static_cast<int64>(omp_get_max_threads());
     const auto ssize = static_cast<int64>(size);
+    // Limit the number of threads to the number of columns
+    const auto num_threads = std::min<int64>(omp_get_max_threads(), ssize);
     const auto work_per_thread = ceildiv(ssize, num_threads);
     const auto required_storage = sizeof(ValueType) * num_threads;
     if (tmp.get_num_elems() < required_storage) {
@@ -82,8 +83,8 @@ void run_kernel_reduction_impl(std::shared_ptr<const OmpExecutor> exec,
         }
         partial[thread_id] = local_partial;
     }
-    *result =
-        finalize(std::accumulate(partial, partial + num_threads, identity, op));
+    *result = finalize(std::accumulate(
+        partial, partial + required_storage / sizeof(ValueType), identity, op));
 }


@@ -99,7 +100,8 @@ void run_kernel_reduction_sized_impl(syn::value_list<int, remainder_cols>,
 {
     const auto rows = static_cast<int64>(size[0]);
     const auto cols = static_cast<int64>(size[1]);
-    const auto num_threads = static_cast<int64>(omp_get_max_threads());
+    // Limit the number of threads to the number of columns
+    const auto num_threads = std::min<int64>(omp_get_max_threads(), rows);
     const auto work_per_thread = ceildiv(rows, num_threads);
     const auto required_storage = sizeof(ValueType) * num_threads;
     if (tmp.get_num_elems() < required_storage) {
@@ -109,7 +111,7 @@ void run_kernel_reduction_sized_impl(syn::value_list<int, remainder_cols>,
     static_assert(remainder_cols < block_size, "remainder too large");
     const auto rounded_cols = cols / block_size * block_size;
     GKO_ASSERT(rounded_cols + remainder_cols == cols);
-#pragma omp parallel
+#pragma omp parallel num_threads(num_threads)
     {
         const auto thread_id = omp_get_thread_num();
         const auto begin = thread_id * work_per_thread;
@@ -147,8 +149,8 @@ void run_kernel_reduction_sized_impl(syn::value_list<int, remainder_cols>,
         }
         partial[thread_id] = local_partial;
     }
-    *result =
-        finalize(std::accumulate(partial, partial + num_threads, identity, op));
+    *result = finalize(std::accumulate(
+        partial, partial + required_storage / sizeof(ValueType), identity, op));
 }

GKO_ENABLE_IMPLEMENTATION_SELECTION(select_run_kernel_reduction_sized,
@@ -210,12 +212,12 @@ void run_kernel_row_reduction_impl(std::shared_ptr<const OmpExecutor> exec,
     constexpr int block_size = 8;
     const auto rows = static_cast<int64>(size[0]);
     const auto cols = static_cast<int64>(size[1]);
-    const auto num_threads = static_cast<int64>(omp_get_max_threads());
+    const auto available_threads = static_cast<int64>(omp_get_max_threads());
     if (rows <= 0) {
         return;
     }
     // enough work to keep all threads busy or only very small reduction sizes
-    if (rows >= reduction_kernel_oversubscription * num_threads ||
+    if (rows >= reduction_kernel_oversubscription * available_threads ||
         cols < rows) {
 #pragma omp parallel for
         for (int64 row = 0; row < rows; row++) {
@@ -229,8 +231,11 @@ void run_kernel_row_reduction_impl(std::shared_ptr<const OmpExecutor> exec,
         }
     } else {
         // small number of rows and large reduction sizes: do partial sum first
+        const auto num_threads = std::min<int64>(available_threads, cols);
         const auto work_per_thread = ceildiv(cols, num_threads);
-        const auto required_storage = sizeof(ValueType) * rows * num_threads;
+        const auto temp_elems_per_row = num_threads;
+        const auto required_storage =
+            sizeof(ValueType) * rows * temp_elems_per_row;
         if (tmp.get_num_elems() < required_storage) {
             tmp.resize_and_reset(required_storage);
         }
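With this change the row reduction's scratch buffer holds one partial value per (row, thread) pair, i.e. rows * min(available_threads, cols) elements; since min(available_threads, cols) <= cols, that is never more than the rows * cols input entries. As a worked example (numbers are mine, not from the commit): an 8 x 16 input on a 64-thread machine takes this branch, and the buffer now holds 8 * min(64, 16) = 128 partial values, exactly the input size, where the previous sizeof(ValueType) * rows * num_threads allocation would have held 8 * 64 = 512.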
@@ -247,18 +252,19 @@ void run_kernel_row_reduction_impl(std::shared_ptr<const OmpExecutor> exec,
                         return fn(row, col, args...);
                     }());
                 }
-                partial[row * num_threads + thread_id] = local_partial;
+                partial[row * temp_elems_per_row + thread_id] = local_partial;
             }
         }
         // then accumulate the partial sums and write to result
 #pragma omp parallel for
         for (int64 row = 0; row < rows; row++) {
             [&] {
                 auto local_partial = identity;
-                for (int64 thread_id = 0; thread_id < num_threads;
+                for (int64 thread_id = 0; thread_id < temp_elems_per_row;
                      thread_id++) {
-                    local_partial = op(local_partial,
-                                       partial[row * num_threads + thread_id]);
+                    local_partial =
+                        op(local_partial,
+                           partial[row * temp_elems_per_row + thread_id]);
                 }
                 result[row * result_stride] = finalize(local_partial);
             }();
@@ -302,12 +308,12 @@ void run_kernel_col_reduction_sized_impl(
 {
     const auto rows = static_cast<int64>(size[0]);
     const auto cols = static_cast<int64>(size[1]);
-    const auto num_threads = static_cast<int64>(omp_get_max_threads());
+    const auto available_threads = static_cast<int64>(omp_get_max_threads());
     static_assert(remainder_cols < block_size, "remainder too large");
     GKO_ASSERT(cols % block_size == remainder_cols);
     const auto num_col_blocks = ceildiv(cols, block_size);
     // enough work to keep all threads busy or only very small reduction sizes
-    if (cols >= reduction_kernel_oversubscription * num_threads ||
+    if (cols >= reduction_kernel_oversubscription * available_threads ||
         rows < cols) {
 #pragma omp parallel for
         for (int64 col_block = 0; col_block < num_col_blocks; col_block++) {
@@ -324,8 +330,11 @@ void run_kernel_col_reduction_sized_impl(
         }
     } else {
         // number of blocks that need to be reduced afterwards
-        const auto reduction_size =
-            ceildiv(reduction_kernel_oversubscription * num_threads, cols);
+        // This reduction_size definition ensures we don't use more temporary
+        // storage than the input vector
+        const auto reduction_size = std::min(
+            rows, ceildiv(reduction_kernel_oversubscription * available_threads,
+                          cols));
        const auto rows_per_thread = ceildiv(rows, reduction_size);
        const auto required_storage = sizeof(ValueType) * cols * reduction_size;
        if (tmp.get_num_elems() < required_storage) {
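The std::min against rows is what bounds the column reduction's scratch buffer by the input: the buffer holds cols * reduction_size values and reduction_size <= rows, so it never exceeds the rows * cols input entries. With illustrative numbers (mine, assuming reduction_kernel_oversubscription = 4): a 16 x 8 input on a 64-thread machine previously got reduction_size = ceildiv(4 * 64, 8) = 32, i.e. 8 * 32 = 256 temporary values for a 128-element input; the cap at rows now yields reduction_size = 16, i.e. 8 * 16 = 128 values.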
27 changes: 7 additions & 20 deletions test/base/kernel_launch_generic.cpp
@@ -382,15 +382,13 @@ void run1d_reduction_cached(std::shared_ptr<gko::EXEC_TYPE> exec,
                          static_cast<int64>(size));
         // The temporary storage (used for partial sums) must be smaller than
         // the input array
-        ASSERT_LT(temp.get_num_elems() / sizeof(int64), size);
+        ASSERT_LE(temp.get_num_elems() / sizeof(int64), size);
     }
 }
 
 TEST_F(KernelLaunch, Reduction1DCached)
 {
-    // Note: Start with at least 200 elements in case the machine has a lot of
-    // cores
-    run1d_reduction_cached(exec, {1000, 1000000, 1234567, 7654321});
+    run1d_reduction_cached(exec, {10, 1000, 1000000, 1234567, 7654321});
 }


@@ -465,7 +463,6 @@ TEST_F(KernelLaunch, Reduction2D) { run2d_reduction(exec); }
 void run2d_reduction_cached(std::shared_ptr<gko::EXEC_TYPE> exec,
                             std::vector<gko::dim<2>> dims)
 {
-    constexpr size_type min_allowed_tmp_elems = 4 * 256;
     gko::array<int64> output{exec, 1};
     gko::array<char> temp(exec);
     for (const auto& dim : dims) {
@@ -479,10 +476,8 @@ void run2d_reduction_cached(std::shared_ptr<gko::EXEC_TYPE> exec,
         ASSERT_EQ(exec->copy_val_to_host(output.get_const_data()),
                   static_cast<int64>(dim[0] + dim[1]));
         // The temporary storage (used for partial sums) must be smaller than
-        // the input array (or smaller than a set minimum)
-        const size_type max_tmp_elems =
-            std::max(dim[0] * dim[1], min_allowed_tmp_elems);
-        ASSERT_LT(temp.get_num_elems() / sizeof(int64), max_tmp_elems);
+        // the input array
+        ASSERT_LE(temp.get_num_elems() / sizeof(int64), dim[0] * dim[1]);
     }
 }

@@ -555,9 +550,6 @@ TEST_F(KernelLaunch, ReductionRow2D) { run2d_row_reduction(exec); }
 void run2d_row_reduction_cached(std::shared_ptr<gko::EXEC_TYPE> exec,
                                 std::vector<gko::dim<2>> dims)
 {
-    // The 2D row reduction potentially needs a lot of memory for small input
-    // sizes
-    constexpr size_type min_allowed_tmp_elems = 4 * 256 * 4 * 256;
     const size_type result_stride = 1;
     gko::array<char> temp(exec);
     for (const auto& dim : dims) {
@@ -576,10 +568,8 @@ void run2d_row_reduction_cached(std::shared_ptr<gko::EXEC_TYPE> exec,
 
         GKO_ASSERT_ARRAY_EQ(host_ref, output);
         // The temporary storage (used for partial sums) must be smaller than
-        // the input array (or smaller than a set minimum)
-        const size_type max_tmp_elems =
-            std::max(dim[0] * dim[1], min_allowed_tmp_elems);
-        ASSERT_LT(temp.get_num_elems() / sizeof(int64), max_tmp_elems);
+        // the input array
+        ASSERT_LE(temp.get_num_elems() / sizeof(int64), dim[0] * dim[1]);
     }
 }

@@ -654,7 +644,6 @@ TEST_F(KernelLaunch, ReductionCol2D) { run2d_col_reduction(exec); }
 void run2d_col_reduction_cached(std::shared_ptr<gko::EXEC_TYPE> exec,
                                 std::vector<gko::dim<2>> dims)
 {
-    constexpr size_type min_allowed_tmp_elems = 4 * 256;
     gko::array<char> temp(exec);
     for (const auto& dim : dims) {
         gko::array<int64> host_ref{exec->get_master(), dim[1]};
@@ -671,9 +660,7 @@ void run2d_col_reduction_cached(std::shared_ptr<gko::EXEC_TYPE> exec,
                              dim, temp);
 
         GKO_ASSERT_ARRAY_EQ(host_ref, output);
-        const size_type temp_elem_limit =
-            std::max(min_allowed_tmp_elems, dim[0] * dim[1]);
-        ASSERT_LT(temp.get_num_elems() / sizeof(int64), temp_elem_limit);
+        ASSERT_LE(temp.get_num_elems() / sizeof(int64), dim[0] * dim[1]);
     }
 }

