Skip to content

Commit

Permalink
Add distributed::exclusive_scan()
Browse files Browse the repository at this point in the history
  • Loading branch information
jszuppe committed Aug 29, 2016
1 parent 6067c3b commit 43c31b3
Show file tree
Hide file tree
Showing 3 changed files with 353 additions and 0 deletions.
183 changes: 183 additions & 0 deletions include/boost/compute/distributed/exclusive_scan.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
//---------------------------------------------------------------------------//
// Copyright (c) 2013 Kyle Lutz <kyle.r.lutz@gmail.com>
//
// Distributed under the Boost Software License, Version 1.0
// See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt
//
// See http://boostorg.github.com/compute for more information.
//---------------------------------------------------------------------------//

#ifndef BOOST_COMPUTE_DISTRIBUTED_EXCLUSIVE_SCAN_HPP
#define BOOST_COMPUTE_DISTRIBUTED_EXCLUSIVE_SCAN_HPP

#include <cstddef>
#include <vector>

#include <boost/assert.hpp>

#include <boost/compute/functional.hpp>
#include <boost/compute/container/vector.hpp>
#include <boost/compute/algorithm/copy.hpp>
#include <boost/compute/algorithm/exclusive_scan.hpp>
#include <boost/compute/algorithm/inclusive_scan.hpp>
#include <boost/compute/algorithm/merge.hpp>
#include <boost/compute/algorithm/transform.hpp>
#include <boost/compute/iterator/buffer_iterator.hpp>
#include <boost/compute/iterator/constant_iterator.hpp>
#include <boost/compute/allocator/pinned_allocator.hpp>

#include <boost/compute/distributed/command_queue.hpp>
#include <boost/compute/distributed/vector.hpp>

namespace boost {
namespace compute {
namespace distributed {

/// Performs an exclusive scan of the distributed vector \p input and stores
/// the result in the distributed vector \p result, using \p init as the
/// initial value and \p binary_op as the combining operator.
///
/// The scan is done in three steps:
///  1. every non-empty part is scanned locally on its own queue; the first
///     one is seeded with \p init, every following one with the last input
///     element of the preceding non-empty part,
///  2. the last local result of each non-empty part (except the final one)
///     is gathered on the host and scanned inclusively with \p binary_op on
///     a single device, which yields the carry for each following part,
///  3. each carry is combined into every element of its part with
///     transform().
///
/// \param input     distributed vector to scan
/// \param result    distributed vector receiving the scan; must have the same
///                  number of parts and the same size as \p input (asserted).
///                  The tests also pass \p input itself here, so input tails
///                  are read before the corresponding part is scanned.
/// \param init      initial value of the scan
/// \param binary_op associative binary operator
/// \param queue     distributed command queue; part \c i runs on
///                  \c queue.get(i)
///
/// NOTE(review): the carry is applied by transform() with the local result
/// as the first operand and the carry as the second, so a non-commutative
/// operator would presumably see its operands reversed - confirm before
/// relying on it.
/// NOTE(review): the final transform() calls are only enqueued here; the
/// tests call \c queue.finish() afterwards - confirm whether callers must.
template<
    class InputType, weight_func weight, class Alloc,
    class OutputType,
    class BinaryOperator
>
inline void
exclusive_scan(const vector<InputType, weight, Alloc> &input,
               vector<OutputType, weight, Alloc> &result,
               OutputType init,
               BinaryOperator binary_op,
               command_queue &queue)
{
    BOOST_ASSERT(input.parts() == result.parts());
    BOOST_ASSERT(input.size() == result.size());

    // Only non-empty parts contribute to the scan. Working on this list
    // (instead of raw part indices) keeps the tail/carry vectors densely
    // indexed even when some parts hold no elements.
    std::vector<size_t> active_parts;
    active_parts.reserve(input.parts());
    for(size_t i = 0; i < input.parts(); i++)
    {
        if(input.begin(i) != input.end(i))
        {
            active_parts.push_back(i);
        }
    }
    if(active_parts.empty())
    {
        return; // nothing to scan
    }

    // Every non-empty part except the last one produces a carry.
    const size_t carry_count = active_parts.size() - 1;

    // Step 1: local exclusive scans.
    OutputType part_init = init;
    for(size_t k = 0; k < active_parts.size(); k++)
    {
        const size_t part = active_parts[k];

        // The last input element of this part seeds the next non-empty part.
        // It has to be read BEFORE scanning, because result may alias input.
        OutputType next_init = part_init;
        if(k < carry_count)
        {
            next_init = static_cast<OutputType>(
                (input.end(part) - 1).read(queue.get(part))
            );
        }

        ::boost::compute::exclusive_scan(
            input.begin(part),
            input.end(part),
            result.begin(part),
            part_init,
            binary_op,
            queue.get(part)
        );

        part_init = next_init;
    }

    if(carry_count == 0)
    {
        return; // a single non-empty part needs no carry propagation
    }

    // Step 2: gather the last local result of every part that has a
    // successor. Done after all scans were enqueued so that the parts can be
    // processed concurrently by their devices.
    std::vector<OutputType> output_tails(carry_count);
    for(size_t k = 0; k < carry_count; k++)
    {
        const size_t part = active_parts[k];
        output_tails[k] = (result.end(part) - 1).read(queue.get(part));
    }

    // Find the device for scanning the tails. A CPU device is preferred,
    // however if there is none, the first device queue is used.
    // An index is used on purpose: assigning through a command_queue
    // reference would overwrite the queue stored in the distributed queue.
    size_t carry_queue_index = 0;
    for(size_t i = 0; i < queue.size(); i++)
    {
        if(queue.get(i).get_device().type() & ::boost::compute::device::cpu)
        {
            carry_queue_index = i;
            break;
        }
    }
    ::boost::compute::command_queue &device_queue =
        queue.get(carry_queue_index);

    ::boost::compute::vector<OutputType> output_tails_device(
        output_tails.size(), device_queue.get_context()
    );
    ::boost::compute::copy_async(
        output_tails.begin(),
        output_tails.end(),
        output_tails_device.begin(),
        device_queue
    );
    // The carries must be combined with the user supplied operator, not with
    // the default one.
    ::boost::compute::inclusive_scan(
        output_tails_device.begin(),
        output_tails_device.end(),
        output_tails_device.begin(),
        binary_op,
        device_queue
    );
    ::boost::compute::copy(
        output_tails_device.begin(),
        output_tails_device.end(),
        output_tails.begin(),
        device_queue
    );

    // Step 3: fold the carry into every element of each following part.
    for(size_t k = 1; k < active_parts.size(); k++)
    {
        const size_t part = active_parts[k];
        ::boost::compute::transform(
            result.begin(part),
            result.end(part),
            ::boost::compute::make_constant_iterator(
                output_tails[k - 1]
            ),
            result.begin(part),
            binary_op,
            queue.get(part)
        );
    }
}

/// \overload
template<
class InputType, weight_func weight, class Alloc,
class OutputType
>
inline void
exclusive_scan(const vector<InputType, weight, Alloc> &input,
vector<OutputType, weight, Alloc> &result,
OutputType init,
command_queue &queue)
{
::boost::compute::distributed::exclusive_scan(
input,
result,
init,
boost::compute::plus<OutputType>(),
queue
);
}

/// \overload
///
/// Scans with \c boost::compute::plus<OutputType> as the operator and
/// \c OutputType(0) as the initial value.
template<
    class InputType, weight_func weight, class Alloc,
    class OutputType
>
inline void
exclusive_scan(const vector<InputType, weight, Alloc> &input,
               vector<OutputType, weight, Alloc> &result,
               command_queue &queue)
{
    // Forward to the overload taking an initial value; that overload
    // supplies boost::compute::plus<OutputType> as the operator.
    const OutputType zero = OutputType(0);

    ::boost::compute::distributed::exclusive_scan(
        input, result, zero, queue
    );
}

} // end distributed namespace
} // end compute namespace
} // end boost namespace

#endif /* BOOST_COMPUTE_DISTRIBUTED_EXCLUSIVE_SCAN_HPP */
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ add_compute_test("distributed.vector" test_distributed_vector.cpp)
# Tests for the distributed containers and algorithms; every test is
# registered under its own unique name.
add_compute_test("distributed.copy" test_distributed_copy.cpp)
add_compute_test("distributed.reduce" test_distributed_reduce.cpp)
add_compute_test("distributed.transform" test_distributed_transform.cpp)
add_compute_test("distributed.scan" test_distributed_scan.cpp)

add_compute_test("utility.extents" test_extents.cpp)
add_compute_test("utility.invoke" test_invoke.cpp)
Expand Down
169 changes: 169 additions & 0 deletions test/test_distributed_scan.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
//---------------------------------------------------------------------------//
// Copyright (c) 2016 Jakub Szuppe <j.szuppe@gmail.com>
//
// Distributed under the Boost Software License, Version 1.0
// See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt
//
// See http://boostorg.github.com/compute for more information.
//---------------------------------------------------------------------------//

#define BOOST_TEST_MODULE TestDistributedScan
#include <boost/test/unit_test.hpp>

#include <algorithm>

#include <boost/compute/algorithm.hpp>
#include <boost/compute/functional.hpp>
#include <boost/compute/algorithm.hpp>
#include <boost/compute/function.hpp>
#include <boost/compute/container/vector.hpp>

#include <boost/compute/distributed/context.hpp>
#include <boost/compute/distributed/command_queue.hpp>
#include <boost/compute/distributed/vector.hpp>
#include <boost/compute/distributed/exclusive_scan.hpp>
#include <boost/compute/distributed/copy.hpp>

#include "check_macros.hpp"
#include "context_setup.hpp"

#include "distributed_check_functions.hpp"
#include "distributed_queue_setup.hpp"

namespace bc = boost::compute;

// Scans 0..127 with an initial value of 10 over a distributed queue built by
// get_distributed_queue(queue, 4), writing into a separate result vector, and
// compares the outcome with boost::compute::exclusive_scan run on `queue`.
// Also verifies that the input vector is left untouched.
BOOST_AUTO_TEST_CASE(exclusive_scan_int)
{
    bc::distributed::command_queue dist_queue =
        get_distributed_queue(queue, 4);

    // host input: 0, 1, 2, ..., 127
    const size_t count = 128;
    std::vector<bc::int_> host_input(count);
    for(size_t idx = 0; idx != count; ++idx) {
        host_input[idx] = static_cast<bc::int_>(idx);
    }
    const bc::int_ seed = 10;

    bc::distributed::vector<bc::int_> dist_input(
        host_input.begin(), host_input.end(), dist_queue
    );
    bc::distributed::vector<bc::int_> dist_output(count, dist_queue);
    dist_queue.finish();

    // the upload must reproduce the host data
    const bool uploaded_ok = distributed_equal(
        dist_input, host_input.begin(), host_input.end(), dist_queue
    );
    BOOST_CHECK(uploaded_ok);

    // scan under test
    bc::distributed::exclusive_scan(dist_input, dist_output, seed, dist_queue);
    dist_queue.finish();

    // reference result computed with the non-distributed algorithm
    bc::vector<bc::int_> reference_input(
        host_input.begin(), host_input.end(), queue
    );
    bc::vector<bc::int_> reference_output(count, context);
    std::vector<bc::int_> expected(reference_output.size());
    bc::exclusive_scan(
        reference_input.begin(), reference_input.end(),
        reference_output.begin(),
        seed,
        queue
    );
    bc::copy(
        reference_output.begin(), reference_output.end(),
        expected.begin(),
        queue
    );
    queue.finish();

    // the input must not have been modified by the scan
    const bool input_intact = distributed_equal(
        dist_input, host_input.begin(), host_input.end(), dist_queue
    );
    BOOST_CHECK(input_intact);

    // the distributed result must match the reference
    const bool result_ok = distributed_equal(
        dist_output, expected.begin(), expected.end(), dist_queue
    );
    BOOST_CHECK(result_ok);
}

// Scans 0..127 in place (the input vector is also the result vector) with an
// initial value of 10 and a user defined addition function, over a
// distributed queue built by get_distributed_queue(queue, 3). The outcome is
// compared with boost::compute::exclusive_scan run on `queue`.
BOOST_AUTO_TEST_CASE(exclusive_scan_custom_function_int)
{
    bc::distributed::command_queue dist_queue =
        get_distributed_queue(queue, 3);

    // user defined operator, equivalent to plus<int_>
    BOOST_COMPUTE_FUNCTION(bc::int_, custom_sum, (bc::int_ x, bc::int_ y),
    {
        return x + y;
    });

    // host input: 0, 1, 2, ..., 127
    const size_t count = 128;
    std::vector<bc::int_> host_input(count);
    for(size_t idx = 0; idx != count; ++idx) {
        host_input[idx] = static_cast<bc::int_>(idx);
    }
    const bc::int_ seed = 10;

    bc::distributed::vector<bc::int_> dist_values(
        host_input.begin(), host_input.end(), dist_queue
    );
    dist_queue.finish();

    // the upload must reproduce the host data
    const bool uploaded_ok = distributed_equal(
        dist_values, host_input.begin(), host_input.end(), dist_queue
    );
    BOOST_CHECK(uploaded_ok);

    // in-place scan under test
    bc::distributed::exclusive_scan(
        dist_values, dist_values, seed, custom_sum, dist_queue
    );
    dist_queue.finish();

    // reference result computed with the non-distributed algorithm
    bc::vector<bc::int_> reference_input(
        host_input.begin(), host_input.end(), queue
    );
    bc::vector<bc::int_> reference_output(count, context);
    std::vector<bc::int_> expected(reference_output.size());
    bc::exclusive_scan(
        reference_input.begin(), reference_input.end(),
        reference_output.begin(),
        seed,
        queue
    );
    bc::copy(
        reference_output.begin(), reference_output.end(),
        expected.begin(),
        queue
    );
    queue.finish();

    // the scanned vector must match the reference
    const bool result_ok = distributed_equal(
        dist_values, expected.begin(), expected.end(), dist_queue
    );
    BOOST_CHECK(result_ok);
}

// Closes the enclosing test suite. No matching suite opener is visible in
// this file - presumably it is provided by "context_setup.hpp", which would
// also explain where `queue` and `context` come from (TODO confirm).
BOOST_AUTO_TEST_SUITE_END()

0 comments on commit 43c31b3

Please sign in to comment.