diff --git a/include/boost/compute/distributed/exclusive_scan.hpp b/include/boost/compute/distributed/exclusive_scan.hpp
new file mode 100644
index 000000000..884de3f38
--- /dev/null
+++ b/include/boost/compute/distributed/exclusive_scan.hpp
@@ -0,0 +1,215 @@
+//---------------------------------------------------------------------------//
+// Copyright (c) 2013 Kyle Lutz
+//
+// Distributed under the Boost Software License, Version 1.0
+// See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt
+//
+// See http://boostorg.github.com/compute for more information.
+//---------------------------------------------------------------------------//
+
+#ifndef BOOST_COMPUTE_DISTRIBUTED_EXCLUSIVE_SCAN_HPP
+#define BOOST_COMPUTE_DISTRIBUTED_EXCLUSIVE_SCAN_HPP
+
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+
+namespace boost {
+namespace compute {
+namespace distributed {
+
+/// Performs an exclusive scan of the distributed vector \p input with
+/// \p binary_op and writes the result to the distributed vector \p result.
+/// \p init is the initial value of the scan of the first part.
+///
+/// Every part is scanned on its own queue (\c queue.get(i)); the last
+/// scanned values of the parts are then combined with an inclusive scan and
+/// applied to every part but the first with \p binary_op.
+///
+/// \p input and \p result must have the same number of parts and the same
+/// size (checked with BOOST_ASSERT).
+///
+/// NOTE(review): offsets are applied as binary_op(partial_result, offset),
+/// which presumably requires \p binary_op to be commutative as well as
+/// associative -- confirm.
+template<
+    class InputType, weight_func weight, class Alloc,
+    class OutputType,
+    class BinaryOperator
+>
+inline void
+exclusive_scan(const vector &input,
+               vector &result,
+               OutputType init,
+               BinaryOperator binary_op,
+               command_queue &queue)
+{
+    BOOST_ASSERT(input.parts() == result.parts());
+    BOOST_ASSERT(input.size() == result.size());
+
+    // Last input element of every non-empty part but the final one. Each is
+    // read before its part is scanned, as the scan may overwrite it when
+    // input and result are the same vector.
+    // NOTE(review): input_tails grows only for non-empty parts but is indexed
+    // by part number below, so an empty part other than the last one would
+    // make input_tails[i - 1] run out of range -- confirm this cannot happen.
+    std::vector input_tails;
+    input_tails.reserve(input.parts() - 1);
+    for(size_t i = 0; i < input.parts(); i++)
+    {
+        if(input.begin(i) != input.end(i) && i < (input.parts() - 1))
+        {
+            input_tails.push_back(
+                static_cast(
+                    (input.end(i) - 1).read(queue.get(i))
+                )
+            );
+        }
+
+        // The first part starts from init, every other part from the last
+        // input element of the part before it.
+        if(i == 0)
+        {
+            ::boost::compute::exclusive_scan(
+                input.begin(i),
+                input.end(i),
+                result.begin(i),
+                init,
+                binary_op,
+                queue.get(i)
+            );
+        }
+        else
+        {
+            ::boost::compute::exclusive_scan(
+                input.begin(i),
+                input.end(i),
+                result.begin(i),
+                input_tails[i - 1],
+                binary_op,
+                queue.get(i)
+            );
+        }
+    }
+
+    // find device for calculating partial sum of last elements of input vector
+    // (the queue is held by value: assigning through a reference to
+    // queue.get(0) would overwrite the first queue of the distributed queue)
+    ::boost::compute::command_queue device_queue = queue.get(0);
+    // CPU device is preferred, however if there is none, the first device
+    // queue is used
+    for(size_t i = 0; i < queue.size(); i++)
+    {
+        if(queue.get(i).get_device().type() & ::boost::compute::device::cpu)
+        {
+            device_queue = queue.get(i);
+            break;
+        }
+    }
+
+    // Last scanned value of every non-empty part but the final one.
+    std::vector output_tails(input_tails.size());
+    for(size_t i = 0; i < input.parts() - 1; i++)
+    {
+        if(input.begin(i) != input.end(i))
+        {
+            output_tails[i] = (result.end(i) - 1).read(queue.get(i));
+        }
+    }
+    ::boost::compute::vector output_tails_device(
+        output_tails.size(), device_queue.get_context()
+    );
+    ::boost::compute::copy_async(
+        output_tails.begin(),
+        output_tails.end(),
+        output_tails_device.begin(),
+        device_queue
+    );
+    // Combine the tails with binary_op (not the default plus) so that the
+    // offsets agree with the operator used for the per-part scans.
+    ::boost::compute::inclusive_scan(
+        output_tails_device.begin(),
+        output_tails_device.end(),
+        output_tails_device.begin(),
+        binary_op,
+        device_queue
+    );
+    ::boost::compute::copy(
+        output_tails_device.begin(),
+        output_tails_device.end(),
+        output_tails.begin(),
+        device_queue
+    );
+    // Apply the accumulated offset of the preceding parts to every part but
+    // the first.
+    for(size_t i = 1; i < input.parts(); i++)
+    {
+        ::boost::compute::transform(
+            result.begin(i),
+            result.end(i),
+            ::boost::compute::make_constant_iterator(
+                output_tails[i - 1]
+            ),
+            result.begin(i),
+            binary_op,
+            queue.get(i)
+        );
+    }
+}
+
+/// \overload
+/// Scans with \c boost::compute::plus as the binary operator.
+template<
+    class InputType, weight_func weight, class Alloc,
+    class OutputType
+>
+inline void
+exclusive_scan(const vector &input,
+               vector &result,
+               OutputType init,
+               command_queue &queue)
+{
+    ::boost::compute::distributed::exclusive_scan(
+        input,
+        result,
+        init,
+        boost::compute::plus(),
+        queue
+    );
+}
+
+/// \overload
+/// Scans with \c boost::compute::plus, starting from \c OutputType(0).
+template<
+    class InputType, weight_func weight, class Alloc,
+    class OutputType
+>
+inline void
+exclusive_scan(const vector &input,
+               vector &result,
+               command_queue &queue)
+{
+    ::boost::compute::distributed::exclusive_scan(
+        input,
+        result,
+        OutputType(0),
+        boost::compute::plus(),
+        queue
+    );
+}
+
+} // end distributed namespace
+} // end compute namespace
+} // end boost namespace
+
+#endif /* BOOST_COMPUTE_DISTRIBUTED_EXCLUSIVE_SCAN_HPP */
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 3cd2d2fb7..0fba1a464 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -87,6 +87,7 @@ add_compute_test("distributed.vector" test_distributed_vector.cpp)
 add_compute_test("distributed.copy" test_distributed_copy.cpp)
 add_compute_test("distributed.reduce" test_distributed_reduce.cpp)
 add_compute_test("distributed.transform" test_distributed_transform.cpp)
+add_compute_test("distributed.scan" test_distributed_scan.cpp)
 
 add_compute_test("utility.extents" test_extents.cpp)
 add_compute_test("utility.invoke" test_invoke.cpp)
diff --git a/test/test_distributed_scan.cpp b/test/test_distributed_scan.cpp
new file mode 100644
index 000000000..4c6e423f8
--- /dev/null
+++ b/test/test_distributed_scan.cpp
@@ -0,0 +1,174 @@
+//---------------------------------------------------------------------------//
+// Copyright (c) 2016 Jakub Szuppe
+//
+// Distributed under the Boost Software License, Version 1.0
+// See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt
+//
+// See http://boostorg.github.com/compute for more information.
+//---------------------------------------------------------------------------//
+
+#define BOOST_TEST_MODULE TestDistributedScan
+#include
+
+#include
+
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+
+#include "check_macros.hpp"
+#include "context_setup.hpp"
+
+#include "distributed_check_functions.hpp"
+#include "distributed_queue_setup.hpp"
+
+namespace bc = boost::compute;
+
+BOOST_AUTO_TEST_CASE(exclusive_scan_int)
+{
+    // construct distributed::command_queue
+    bc::distributed::command_queue distributed_queue =
+        get_distributed_queue(queue, 4);
+
+    std::vector data(size_t(128));
+    for(size_t i = 0; i < data.size(); i++) {
+        data[i] = i;
+    }
+
+    bc::distributed::vector distributed_input(
+        data.begin(), data.end(), distributed_queue
+    );
+    bc::distributed::vector distributed_result(
+        data.size(), distributed_queue
+    );
+    distributed_queue.finish();
+
+    BOOST_CHECK(
+        distributed_equal(
+            distributed_input,
+            data.begin(), data.end(),
+            distributed_queue
+        )
+    );
+
+    // scan into a separate result vector, starting from 10
+    bc::distributed::exclusive_scan(
+        distributed_input,
+        distributed_result,
+        bc::int_(10),
+        distributed_queue
+    );
+    distributed_queue.finish();
+
+    // reference result computed with the single-device exclusive_scan
+    bc::vector device_input(data.begin(), data.end(), queue);
+    bc::vector device_expected(data.size(), context);
+    std::vector host_expected(device_expected.size());
+    bc::exclusive_scan(
+        device_input.begin(),
+        device_input.end(),
+        device_expected.begin(),
+        bc::int_(10),
+        queue
+    );
+    bc::copy(
+        device_expected.begin(),
+        device_expected.end(),
+        host_expected.begin(),
+        queue
+    );
+    queue.finish();
+
+    // the input must be left untouched
+    BOOST_CHECK(
+        distributed_equal(
+            distributed_input,
+            data.begin(), data.end(),
+            distributed_queue
+        )
+    );
+    BOOST_CHECK(
+        distributed_equal(
+            distributed_result,
+            host_expected.begin(), host_expected.end(),
+            distributed_queue
+        )
+    );
+}
+
+BOOST_AUTO_TEST_CASE(exclusive_scan_custom_function_int)
+{
+    // construct distributed::command_queue
+    bc::distributed::command_queue distributed_queue =
+        get_distributed_queue(queue, 3);
+
+    BOOST_COMPUTE_FUNCTION(bc::int_, custom_sum, (bc::int_ x, bc::int_ y),
+    {
+        return x + y;
+    });
+
+    std::vector data(size_t(128));
+    for(size_t i = 0; i < data.size(); i++) {
+        data[i] = i;
+    }
+
+    bc::distributed::vector distributed_input(
+        data.begin(), data.end(), distributed_queue
+    );
+    distributed_queue.finish();
+
+    BOOST_CHECK(
+        distributed_equal(
+            distributed_input,
+            data.begin(), data.end(),
+            distributed_queue
+        )
+    );
+
+    // scan in place (input is also the result) with a user-defined operator
+    bc::distributed::exclusive_scan(
+        distributed_input,
+        distributed_input,
+        bc::int_(10),
+        custom_sum,
+        distributed_queue
+    );
+    distributed_queue.finish();
+
+    // reference result computed with the single-device exclusive_scan
+    bc::vector device_input(data.begin(), data.end(), queue);
+    bc::vector device_expected(data.size(), context);
+    std::vector host_expected(device_expected.size());
+    bc::exclusive_scan(
+        device_input.begin(),
+        device_input.end(),
+        device_expected.begin(),
+        bc::int_(10),
+        queue
+    );
+    bc::copy(
+        device_expected.begin(),
+        device_expected.end(),
+        host_expected.begin(),
+        queue
+    );
+    queue.finish();
+    BOOST_CHECK(
+        distributed_equal(
+            distributed_input,
+            host_expected.begin(), host_expected.end(),
+            distributed_queue
+        )
+    );
+}
+
+BOOST_AUTO_TEST_SUITE_END()