diff --git a/ci/docker/codecov/build.Dockerfile b/ci/docker/codecov/build.Dockerfile index 8630e08352..6af11fb6de 100644 --- a/ci/docker/codecov/build.Dockerfile +++ b/ci/docker/codecov/build.Dockerfile @@ -79,8 +79,8 @@ RUN wget -q https://github.com/gperftools/gperftools/releases/download/gperftool rm -rf /root/gperftools.tar.gz /root/gperftools-${GPERFTOOLS_VERSION} # Install HPX -ARG HPX_FORK=STEllAR-GROUP -ARG HPX_VERSION=1.6.0 +ARG HPX_FORK=msimberg +ARG HPX_VERSION=cuda-event-callback ARG HPX_WITH_CUDA=OFF ARG HPX_PATH=/usr/local/hpx RUN wget -q https://github.com/${HPX_FORK}/hpx/archive/${HPX_VERSION}.tar.gz -O hpx.tar.gz && \ diff --git a/ci/docker/release/build.Dockerfile b/ci/docker/release/build.Dockerfile index 93fad06d30..8adff004e2 100644 --- a/ci/docker/release/build.Dockerfile +++ b/ci/docker/release/build.Dockerfile @@ -79,8 +79,8 @@ RUN wget -q https://github.com/gperftools/gperftools/releases/download/gperftool rm -rf /root/gperftools.tar.gz /root/gperftools-${GPERFTOOLS_VERSION} # Install HPX -ARG HPX_FORK=STEllAR-GROUP -ARG HPX_VERSION=1.6.0 +ARG HPX_FORK=msimberg +ARG HPX_VERSION=cuda-event-callback ARG HPX_WITH_CUDA=OFF ARG HPX_PATH=/usr/local/hpx RUN wget -q https://github.com/${HPX_FORK}/hpx/archive/${HPX_VERSION}.tar.gz -O hpx.tar.gz && \ diff --git a/include/dlaf/cublas/executor.h b/include/dlaf/cublas/executor.h index 2808bc3940..37e83c3981 100644 --- a/include/dlaf/cublas/executor.h +++ b/include/dlaf/cublas/executor.h @@ -31,51 +31,13 @@ #include "dlaf/common/assert.h" #include "dlaf/cublas/error.h" +#include "dlaf/cublas/handle_pool.h" #include "dlaf/cuda/error.h" #include "dlaf/cuda/executor.h" namespace dlaf { namespace cublas { namespace internal { -class HandlePoolImpl { - int device_; - std::size_t num_worker_threads_ = hpx::get_num_worker_threads(); - std::vector handles_; - cublasPointerMode_t ptr_mode_; - -public: - HandlePoolImpl(int device, cublasPointerMode_t ptr_mode) - : device_(device), handles_(num_worker_threads_), 
ptr_mode_(ptr_mode) { - DLAF_CUDA_CALL(cudaSetDevice(device_)); - - for (auto& h : handles_) { - DLAF_CUBLAS_CALL(cublasCreate(&h)); - } - } - - HandlePoolImpl& operator=(HandlePoolImpl&&) = default; - HandlePoolImpl(HandlePoolImpl&&) = default; - HandlePoolImpl(const HandlePoolImpl&) = delete; - HandlePoolImpl& operator=(const HandlePoolImpl&) = delete; - - ~HandlePoolImpl() { - for (auto& h : handles_) { - DLAF_CUBLAS_CALL(cublasDestroy(h)); - } - } - - cublasHandle_t getNextHandle(cudaStream_t stream) { - cublasHandle_t handle = handles_[hpx::get_worker_thread_num()]; - DLAF_CUDA_CALL(cudaSetDevice(device_)); - DLAF_CUBLAS_CALL(cublasSetStream(handle, stream)); - DLAF_CUBLAS_CALL(cublasSetPointerMode(handle, ptr_mode_)); - return handle; - } - - int getDevice() { - return device_; - } -}; template struct isAsyncCublasCallableImpl : std::false_type { @@ -99,37 +61,6 @@ struct isDataflowCublasCallable std::declval()))> {}; } -/// A pool of cuBLAS handles with reference semantics (copying points to the -/// same underlying cuBLAS handles, last reference destroys the references). -/// Allows access to cuBLAS handles associated with a particular stream. The -/// user must ensure that the handle pool and the stream use the same device. -/// Each HPX worker thread is assigned thread local cuBLAS handle. 
-class HandlePool { - std::shared_ptr handles_ptr_; - -public: - HandlePool(int device = 0, cublasPointerMode_t ptr_mode = CUBLAS_POINTER_MODE_HOST) - : handles_ptr_(std::make_shared(device, ptr_mode)) {} - - cublasHandle_t getNextHandle(cudaStream_t stream) { - DLAF_ASSERT(bool(handles_ptr_), ""); - return handles_ptr_->getNextHandle(stream); - } - - int getDevice() { - DLAF_ASSERT(bool(handles_ptr_), ""); - return handles_ptr_->getDevice(); - } - - bool operator==(HandlePool const& rhs) const noexcept { - return handles_ptr_ == rhs.handles_ptr_; - } - - bool operator!=(HandlePool const& rhs) const noexcept { - return !(*this == rhs); - } -}; - /// An executor for cuBLAS functions. Uses handles and streams from the given /// HandlePool and StreamPool. A cuBLAS function is defined as any function that /// takes a cuBLAS handle as the first argument. The executor inserts a cuBLAS diff --git a/include/dlaf/cublas/handle_pool.h b/include/dlaf/cublas/handle_pool.h new file mode 100644 index 0000000000..4a666632b8 --- /dev/null +++ b/include/dlaf/cublas/handle_pool.h @@ -0,0 +1,109 @@ +// +// Distributed Linear Algebra with Future (DLAF) +// +// Copyright (c) 2018-2021, ETH Zurich +// All rights reserved. +// +// Please, refer to the LICENSE file in the root directory. 
+// SPDX-License-Identifier: BSD-3-Clause +// + +#pragma once + +/// @file + +#ifdef DLAF_WITH_CUDA + +#include +#include +#include +#include + +#include +#include + +#include + +#include "dlaf/common/assert.h" +#include "dlaf/cublas/error.h" +#include "dlaf/cuda/error.h" +#include "dlaf/cuda/executor.h" + +namespace dlaf { +namespace cublas { +namespace internal { +class HandlePoolImpl { + int device_; + std::size_t num_worker_threads_ = hpx::get_num_worker_threads(); + std::vector handles_; + cublasPointerMode_t ptr_mode_; + +public: + HandlePoolImpl(int device, cublasPointerMode_t ptr_mode) + : device_(device), handles_(num_worker_threads_), ptr_mode_(ptr_mode) { + DLAF_CUDA_CALL(cudaSetDevice(device_)); + + for (auto& h : handles_) { + DLAF_CUBLAS_CALL(cublasCreate(&h)); + } + } + + HandlePoolImpl& operator=(HandlePoolImpl&&) = default; + HandlePoolImpl(HandlePoolImpl&&) = default; + HandlePoolImpl(const HandlePoolImpl&) = delete; + HandlePoolImpl& operator=(const HandlePoolImpl&) = delete; + + ~HandlePoolImpl() { + for (auto& h : handles_) { + DLAF_CUBLAS_CALL(cublasDestroy(h)); + } + } + + cublasHandle_t getNextHandle(cudaStream_t stream) { + cublasHandle_t handle = handles_[hpx::get_worker_thread_num()]; + DLAF_CUDA_CALL(cudaSetDevice(device_)); + DLAF_CUBLAS_CALL(cublasSetStream(handle, stream)); + DLAF_CUBLAS_CALL(cublasSetPointerMode(handle, ptr_mode_)); + return handle; + } + + int getDevice() { + return device_; + } +}; +} + +/// A pool of cuBLAS handles with reference semantics (copying points to the +/// same underlying cuBLAS handles, last reference destroys the handles). +/// Allows access to cuBLAS handles associated with a particular stream. The +/// user must ensure that the handle pool and the stream use the same device. +/// Each HPX worker thread is assigned a thread-local cuBLAS handle. 
+class HandlePool { + std::shared_ptr handles_ptr_; + +public: + HandlePool(int device = 0, cublasPointerMode_t ptr_mode = CUBLAS_POINTER_MODE_HOST) + : handles_ptr_(std::make_shared(device, ptr_mode)) {} + + cublasHandle_t getNextHandle(cudaStream_t stream) { + DLAF_ASSERT(bool(handles_ptr_), ""); + return handles_ptr_->getNextHandle(stream); + } + + int getDevice() { + DLAF_ASSERT(bool(handles_ptr_), ""); + return handles_ptr_->getDevice(); + } + + bool operator==(HandlePool const& rhs) const noexcept { + return handles_ptr_ == rhs.handles_ptr_; + } + + bool operator!=(HandlePool const& rhs) const noexcept { + return !(*this == rhs); + } +}; +} +} + +#endif diff --git a/include/dlaf/cuda/executor.h b/include/dlaf/cuda/executor.h index b8952614d8..66333ef65a 100644 --- a/include/dlaf/cuda/executor.h +++ b/include/dlaf/cuda/executor.h @@ -24,111 +24,15 @@ #include #include #include +#include #include #include "dlaf/common/assert.h" #include "dlaf/cuda/error.h" +#include "dlaf/cuda/stream_pool.h" namespace dlaf { namespace cuda { -namespace internal { - -struct StreamPoolImpl { - int device_; - std::size_t num_worker_threads_ = hpx::get_num_worker_threads(); - std::size_t num_streams_per_worker_thread_; - std::vector streams_; - std::vector> current_stream_idxs_; - - StreamPoolImpl(int device, std::size_t num_streams_per_worker_thread, - hpx::threads::thread_priority hpx_thread_priority) - : device_(device), num_streams_per_worker_thread_(num_streams_per_worker_thread), - streams_(num_worker_threads_ * num_streams_per_worker_thread), - current_stream_idxs_(num_worker_threads_, {std::size_t(0)}) { - DLAF_CUDA_CALL(cudaSetDevice(device)); - - // We map hpx::threads::thread_priority::high to the highest CUDA stream - // priority, and the rest to the lowest. Typically CUDA streams will only - // have two priorities. 
- int least_priority, greatest_priority; - DLAF_CUDA_CALL(cudaDeviceGetStreamPriorityRange(&least_priority, &greatest_priority)); - int stream_priority = least_priority; - if (hpx_thread_priority == hpx::threads::thread_priority::high) { - stream_priority = greatest_priority; - } - - for (auto& s : streams_) { - DLAF_CUDA_CALL(cudaStreamCreateWithPriority(&s, cudaStreamNonBlocking, stream_priority)); - } - } - - StreamPoolImpl& operator=(StreamPoolImpl&&) = default; - StreamPoolImpl(StreamPoolImpl&&) = default; - StreamPoolImpl(const StreamPoolImpl&) = delete; - StreamPoolImpl& operator=(const StreamPoolImpl&) = delete; - - ~StreamPoolImpl() { - for (auto& s : streams_) { - DLAF_CUDA_CALL(cudaStreamDestroy(s)); - } - } - - cudaStream_t getNextStream() { - // Set the device corresponding to the CUBLAS handle. - // - // The CUBLAS library context is tied to the current CUDA device [1]. A previous task scheduled on - // the same thread may have set a different device, this makes sure the correct device is used. The - // function is considered very low overhead call [2]. - // - // [1]: https://docs.nvidia.com/cuda/cublas/index.html#cublascreate - // [2]: CUDA Runtime API, section 5.1 Device Management - DLAF_CUDA_CALL(cudaSetDevice(device_)); - const std::size_t worker_thread_num = hpx::get_worker_thread_num(); - DLAF_ASSERT(worker_thread_num != std::size_t(-1), worker_thread_num); - std::size_t stream_idx = - worker_thread_num * num_streams_per_worker_thread_ + - (++current_stream_idxs_[worker_thread_num].data_ % num_streams_per_worker_thread_); - - return streams_[stream_idx]; - } - - int getDevice() { - return device_; - } -}; -} - -/// A pool of CUDA streams with reference semantics (copying points to the same -/// underlying CUDA streams, last reference destroys the references). Allows -/// access to CUDA streams in a round-robin fashion. Each HPX worker thread is -/// assigned a set of thread local CUDA streams. 
-class StreamPool { - std::shared_ptr streams_ptr_; - -public: - StreamPool(int device = 0, std::size_t num_streams_per_worker_thread = 3, - hpx::threads::thread_priority hpx_thread_priority = hpx::threads::thread_priority::default_) - : streams_ptr_(std::make_shared(device, num_streams_per_worker_thread, - hpx_thread_priority)) {} - - cudaStream_t getNextStream() { - DLAF_ASSERT(bool(streams_ptr_), ""); - return streams_ptr_->getNextStream(); - } - - int getDevice() { - DLAF_ASSERT(bool(streams_ptr_), ""); - return streams_ptr_->getDevice(); - } - - bool operator==(StreamPool const& rhs) const noexcept { - return streams_ptr_ == rhs.streams_ptr_; - } - - bool operator!=(StreamPool const& rhs) const noexcept { - return !(*this == rhs); - } -}; /// An executor for CUDA functions. Uses streams from the given StreamPool. A /// CUDA function is defined as any function that takes a CUDA stream as the diff --git a/include/dlaf/cuda/stream_pool.h b/include/dlaf/cuda/stream_pool.h new file mode 100644 index 0000000000..65e3d7c124 --- /dev/null +++ b/include/dlaf/cuda/stream_pool.h @@ -0,0 +1,134 @@ +// +// Distributed Linear Algebra with Future (DLAF) +// +// Copyright (c) 2018-2021, ETH Zurich +// All rights reserved. +// +// Please, refer to the LICENSE file in the root directory. 
+// SPDX-License-Identifier: BSD-3-Clause +// + +#pragma once + +/// @file + +#ifdef DLAF_WITH_CUDA + +#include +#include +#include +#include + +#include + +#include +#include +#include + +#include "dlaf/common/assert.h" +#include "dlaf/cuda/error.h" + +namespace dlaf { +namespace cuda { +namespace internal { + +struct StreamPoolImpl { + int device_; + std::size_t num_worker_threads_ = hpx::get_num_worker_threads(); + std::size_t num_streams_per_worker_thread_; + std::vector streams_; + std::vector> current_stream_idxs_; + + StreamPoolImpl(int device, std::size_t num_streams_per_worker_thread, + hpx::threads::thread_priority hpx_thread_priority) + : device_(device), num_streams_per_worker_thread_(num_streams_per_worker_thread), + streams_(num_worker_threads_ * num_streams_per_worker_thread), + current_stream_idxs_(num_worker_threads_, {std::size_t(0)}) { + DLAF_CUDA_CALL(cudaSetDevice(device)); + + // We map hpx::threads::thread_priority::high to the highest CUDA stream + // priority, and the rest to the lowest. Typically CUDA streams will only + // have two priorities. + int least_priority, greatest_priority; + DLAF_CUDA_CALL(cudaDeviceGetStreamPriorityRange(&least_priority, &greatest_priority)); + int stream_priority = least_priority; + if (hpx_thread_priority == hpx::threads::thread_priority::high) { + stream_priority = greatest_priority; + } + + for (auto& s : streams_) { + DLAF_CUDA_CALL(cudaStreamCreateWithPriority(&s, cudaStreamNonBlocking, stream_priority)); + } + } + + StreamPoolImpl& operator=(StreamPoolImpl&&) = default; + StreamPoolImpl(StreamPoolImpl&&) = default; + StreamPoolImpl(const StreamPoolImpl&) = delete; + StreamPoolImpl& operator=(const StreamPoolImpl&) = delete; + + ~StreamPoolImpl() { + for (auto& s : streams_) { + DLAF_CUDA_CALL(cudaStreamDestroy(s)); + } + } + + cudaStream_t getNextStream() { + // Set the device corresponding to the CUBLAS handle. + // + // The CUBLAS library context is tied to the current CUDA device [1]. 
A previous task scheduled on + // the same thread may have set a different device, this makes sure the correct device is used. The + // function is considered a very low overhead call [2]. + // + // [1]: https://docs.nvidia.com/cuda/cublas/index.html#cublascreate + // [2]: CUDA Runtime API, section 5.1 Device Management + DLAF_CUDA_CALL(cudaSetDevice(device_)); + const std::size_t worker_thread_num = hpx::get_worker_thread_num(); + DLAF_ASSERT(worker_thread_num != std::size_t(-1), worker_thread_num); + std::size_t stream_idx = + worker_thread_num * num_streams_per_worker_thread_ + + (++current_stream_idxs_[worker_thread_num].data_ % num_streams_per_worker_thread_); + + return streams_[stream_idx]; + } + + int getDevice() { + return device_; + } +}; +} + +/// A pool of CUDA streams with reference semantics (copying points to the same +/// underlying CUDA streams, last reference destroys the streams). Allows +/// access to CUDA streams in a round-robin fashion. Each HPX worker thread is +/// assigned a set of thread-local CUDA streams. 
+class StreamPool { + std::shared_ptr streams_ptr_; + +public: + StreamPool(int device = 0, std::size_t num_streams_per_worker_thread = 3, + hpx::threads::thread_priority hpx_thread_priority = hpx::threads::thread_priority::default_) + : streams_ptr_(std::make_shared(device, num_streams_per_worker_thread, + hpx_thread_priority)) {} + + cudaStream_t getNextStream() { + DLAF_ASSERT(bool(streams_ptr_), ""); + return streams_ptr_->getNextStream(); + } + + int getDevice() { + DLAF_ASSERT(bool(streams_ptr_), ""); + return streams_ptr_->getDevice(); + } + + bool operator==(StreamPool const& rhs) const noexcept { + return streams_ptr_ == rhs.streams_ptr_; + } + + bool operator!=(StreamPool const& rhs) const noexcept { + return !(*this == rhs); + } +}; +} +} + +#endif diff --git a/include/dlaf/cusolver/executor.h b/include/dlaf/cusolver/executor.h index 0b61d9e531..08f3958b3d 100644 --- a/include/dlaf/cusolver/executor.h +++ b/include/dlaf/cusolver/executor.h @@ -25,55 +25,19 @@ #include #include #include -#include #include #include #include "dlaf/common/assert.h" #include "dlaf/cublas/executor.h" +#include "dlaf/cublas/handle_pool.h" #include "dlaf/cuda/error.h" #include "dlaf/cusolver/error.h" +#include "dlaf/cusolver/handle_pool.h" namespace dlaf { namespace cusolver { namespace internal { -class HandlePoolImpl { - int device_; - std::size_t num_worker_threads_ = hpx::get_num_worker_threads(); - std::vector handles_; - -public: - HandlePoolImpl(int device) : device_(device), handles_(num_worker_threads_) { - DLAF_CUDA_CALL(cudaSetDevice(device_)); - - for (auto& h : handles_) { - DLAF_CUSOLVER_CALL(cusolverDnCreate(&h)); - } - } - - HandlePoolImpl& operator=(HandlePoolImpl&&) = default; - HandlePoolImpl(HandlePoolImpl&&) = default; - HandlePoolImpl(const HandlePoolImpl&) = delete; - HandlePoolImpl& operator=(const HandlePoolImpl&) = delete; - - ~HandlePoolImpl() { - for (auto& h : handles_) { - DLAF_CUSOLVER_CALL(cusolverDnDestroy(h)); - } - } - - cusolverDnHandle_t 
getNextHandle(cudaStream_t stream) { - cusolverDnHandle_t handle = handles_[hpx::get_worker_thread_num()]; - DLAF_CUDA_CALL(cudaSetDevice(device_)); - DLAF_CUSOLVER_CALL(cusolverDnSetStream(handle, stream)); - return handle; - } - - int getDevice() { - return device_; - } -}; - template struct isAsyncCusolverCallableImpl : std::false_type { struct dummy_type {}; @@ -96,36 +60,6 @@ struct isDataflowCusolverCallable std::declval()))> {}; } -/// A pool of cuSOLVER handles with reference semantics (copying points to the -/// same underlying cuSOLVER handles, last reference destroys the references). -/// Allows access to cuSOLVER handles associated with a particular stream. The -/// user must ensure that the handle pool and the stream use the same device. -/// Each HPX worker thread is assigned thread local cuSOLVER handle. -class HandlePool { - std::shared_ptr handles_ptr_; - -public: - HandlePool(int device = 0) : handles_ptr_(std::make_shared(device)) {} - - cusolverDnHandle_t getNextHandle(cudaStream_t stream) { - DLAF_ASSERT(bool(handles_ptr_), ""); - return handles_ptr_->getNextHandle(stream); - } - - int getDevice() { - DLAF_ASSERT(bool(handles_ptr_), ""); - return handles_ptr_->getDevice(); - } - - bool operator==(HandlePool const& rhs) const noexcept { - return handles_ptr_ == rhs.handles_ptr_; - } - - bool operator!=(HandlePool const& rhs) const noexcept { - return !(*this == rhs); - } -}; - /// An executor for cuSOLVER functions. Uses handles and streams from the given /// HandlePool and StreamPool. A cuSOLVER function is defined as any function /// that takes a cuSOLVER handle as the first argument. The executor inserts a diff --git a/include/dlaf/cusolver/handle_pool.h b/include/dlaf/cusolver/handle_pool.h new file mode 100644 index 0000000000..1cc1aabda1 --- /dev/null +++ b/include/dlaf/cusolver/handle_pool.h @@ -0,0 +1,104 @@ +// +// Distributed Linear Algebra with Future (DLAF) +// +// Copyright (c) 2018-2021, ETH Zurich +// All rights reserved. 
+// +// Please, refer to the LICENSE file in the root directory. +// SPDX-License-Identifier: BSD-3-Clause +// + +#pragma once + +/// @file + +#ifdef DLAF_WITH_CUDA + +#include +#include +#include +#include + +#include +#include + +#include + +#include "dlaf/common/assert.h" +#include "dlaf/cuda/error.h" +#include "dlaf/cusolver/error.h" + +namespace dlaf { +namespace cusolver { +namespace internal { +class HandlePoolImpl { + int device_; + std::size_t num_worker_threads_ = hpx::get_num_worker_threads(); + std::vector handles_; + +public: + HandlePoolImpl(int device) : device_(device), handles_(num_worker_threads_) { + DLAF_CUDA_CALL(cudaSetDevice(device_)); + + for (auto& h : handles_) { + DLAF_CUSOLVER_CALL(cusolverDnCreate(&h)); + } + } + + HandlePoolImpl& operator=(HandlePoolImpl&&) = default; + HandlePoolImpl(HandlePoolImpl&&) = default; + HandlePoolImpl(const HandlePoolImpl&) = delete; + HandlePoolImpl& operator=(const HandlePoolImpl&) = delete; + + ~HandlePoolImpl() { + for (auto& h : handles_) { + DLAF_CUSOLVER_CALL(cusolverDnDestroy(h)); + } + } + + cusolverDnHandle_t getNextHandle(cudaStream_t stream) { + cusolverDnHandle_t handle = handles_[hpx::get_worker_thread_num()]; + DLAF_CUDA_CALL(cudaSetDevice(device_)); + DLAF_CUSOLVER_CALL(cusolverDnSetStream(handle, stream)); + return handle; + } + + int getDevice() { + return device_; + } +}; +} + +/// A pool of cuSOLVER handles with reference semantics (copying points to the +/// same underlying cuSOLVER handles, last reference destroys the handles). +/// Allows access to cuSOLVER handles associated with a particular stream. The +/// user must ensure that the handle pool and the stream use the same device. +/// Each HPX worker thread is assigned a thread-local cuSOLVER handle. 
+class HandlePool { + std::shared_ptr handles_ptr_; + +public: + HandlePool(int device = 0) : handles_ptr_(std::make_shared(device)) {} + + cusolverDnHandle_t getNextHandle(cudaStream_t stream) { + DLAF_ASSERT(bool(handles_ptr_), ""); + return handles_ptr_->getNextHandle(stream); + } + + int getDevice() { + DLAF_ASSERT(bool(handles_ptr_), ""); + return handles_ptr_->getDevice(); + } + + bool operator==(HandlePool const& rhs) const noexcept { + return handles_ptr_ == rhs.handles_ptr_; + } + + bool operator!=(HandlePool const& rhs) const noexcept { + return !(*this == rhs); + } +}; +} +} + +#endif diff --git a/include/dlaf/factorization/cholesky/impl.h b/include/dlaf/factorization/cholesky/impl.h index 67f51ccf0a..5b5ab986a4 100644 --- a/include/dlaf/factorization/cholesky/impl.h +++ b/include/dlaf/factorization/cholesky/impl.h @@ -42,45 +42,41 @@ namespace dlaf { namespace factorization { namespace internal { -template -void potrfDiagTile(Executor&& exec, hpx::future> matrix_tile) { - hpx::dataflow(exec, matrix::unwrapExtendTiles(tile::potrf_o), blas::Uplo::Lower, - std::move(matrix_tile)); +template +void potrfDiagTile(hpx::future> matrix_tile) { + transformDetach(hpx::threads::thread_priority::normal, tile::potrf_o, blas::Uplo::Lower, + std::move(matrix_tile)); } -template -void trsmPanelTile(Executor&& executor_hp, hpx::shared_future> kk_tile, - hpx::future> matrix_tile) { - hpx::dataflow(executor_hp, matrix::unwrapExtendTiles(tile::trsm_o), blas::Side::Right, - blas::Uplo::Lower, blas::Op::ConjTrans, blas::Diag::NonUnit, T(1.0), std::move(kk_tile), - std::move(matrix_tile)); +template typename MatrixTileSender, + Device device, class T> +void trsmPanelTile(KKTileSender kk_tile, MatrixTileSender> matrix_tile) { + transformDetach(hpx::threads::thread_priority::high, tile::trsm_o, blas::Side::Right, + blas::Uplo::Lower, blas::Op::ConjTrans, blas::Diag::NonUnit, T(1.0), + std::move(kk_tile), std::move(matrix_tile)); } -template -void herkTrailingDiagTile(Executor&& 
trailing_matrix_executor, - hpx::shared_future> panel_tile, - hpx::future> matrix_tile) { - hpx::dataflow(trailing_matrix_executor, matrix::unwrapExtendTiles(tile::herk_o), blas::Uplo::Lower, - blas::Op::NoTrans, BaseType(-1.0), panel_tile, BaseType(1.0), - std::move(matrix_tile)); +template typename MatrixTileSender, + Device device, class T> +void herkTrailingDiagTile(hpx::threads::thread_priority priority, PanelTileSender panel_tile, + MatrixTileSender> matrix_tile) { + transformDetach(priority, tile::herk_o, blas::Uplo::Lower, blas::Op::NoTrans, + BaseType(-1.0), std::move(panel_tile), BaseType(1.0), + std::move(matrix_tile)); } -template -void gemmTrailingMatrixTile(Executor&& trailing_matrix_executor, - hpx::shared_future> panel_tile, - hpx::shared_future> col_panel, - hpx::future> matrix_tile) { - hpx::dataflow(trailing_matrix_executor, matrix::unwrapExtendTiles(tile::gemm_o), blas::Op::NoTrans, - blas::Op::ConjTrans, T(-1.0), std::move(panel_tile), std::move(col_panel), T(1.0), - std::move(matrix_tile)); +template typename MatrixTileSender, + Device device, class T> +void gemmTrailingMatrixTile(hpx::threads::thread_priority priority, PanelColTileSender panel_tile, + PanelColTileSender col_panel, + MatrixTileSender> matrix_tile) { + transformDetach(priority, tile::gemm_o, blas::Op::NoTrans, blas::Op::ConjTrans, T(-1.0), + std::move(panel_tile), std::move(col_panel), T(1.0), std::move(matrix_tile)); } // Local implementation of Lower Cholesky factorization. 
template void Cholesky::call_L(Matrix& mat_a) { - auto executor_hp = dlaf::getHpExecutor(); - auto executor_np = dlaf::getNpExecutor(); - // Number of tile (rows = cols) SizeType nrtile = mat_a.nrTiles().cols(); @@ -88,26 +84,29 @@ void Cholesky::call_L(Matrix& mat_a) { // Cholesky decomposition on mat_a(k,k) r/w potrf (lapack operation) auto kk = LocalTileIndex{k, k}; - potrfDiagTile(executor_hp, mat_a(kk)); + potrfDiagTile(mat_a.readwrite_sender(kk)); for (SizeType i = k + 1; i < nrtile; ++i) { // Update panel mat_a(i,k) with trsm (blas operation), using data mat_a.read(k,k) - trsmPanelTile(executor_hp, mat_a.read(kk), mat_a(LocalTileIndex{i, k})); + trsmPanelTile(mat_a.read_sender(kk), mat_a.readwrite_sender(LocalTileIndex{i, k})); } for (SizeType j = k + 1; j < nrtile; ++j) { // first trailing panel gets high priority (look ahead). - auto& trailing_matrix_executor = (j == k + 1) ? executor_hp : executor_np; + const auto trailing_matrix_priority = + (j == k + 1) ? hpx::threads::thread_priority::high : hpx::threads::thread_priority::normal; // Update trailing matrix: diagonal element mat_a(j,j), reading mat_a.read(j,k), using herk (blas operation) - herkTrailingDiagTile(trailing_matrix_executor, mat_a.read(LocalTileIndex{j, k}), - mat_a(LocalTileIndex{j, j})); + herkTrailingDiagTile(trailing_matrix_priority, mat_a.read_sender(LocalTileIndex{j, k}), + mat_a.readwrite_sender(LocalTileIndex{j, j})); for (SizeType i = j + 1; i < nrtile; ++i) { // Update remaining trailing matrix mat_a(i,j), reading mat_a.read(i,k) and mat_a.read(j,k), // using gemm (blas operation) - gemmTrailingMatrixTile(trailing_matrix_executor, mat_a.read(LocalTileIndex{i, k}), - mat_a.read(LocalTileIndex{j, k}), mat_a(LocalTileIndex{i, j})); + gemmTrailingMatrixTile(trailing_matrix_priority, + mat_a.read_sender(LocalTileIndex{i, k}), + mat_a.read_sender(LocalTileIndex{j, k}), + mat_a.readwrite_sender(LocalTileIndex{i, j})); } } } @@ -115,11 +114,6 @@ void Cholesky::call_L(Matrix& mat_a) { 
template void Cholesky::call_L(comm::CommunicatorGrid grid, Matrix& mat_a) { - using hpx::util::unwrapping; - using hpx::dataflow; - - auto executor_hp = dlaf::getHpExecutor(); - auto executor_np = dlaf::getNpExecutor(); auto executor_mpi = dlaf::getMPIExecutor(); // Set up MPI executor pipelines @@ -141,7 +135,7 @@ void Cholesky::call_L(comm::CommunicatorGrid grid, Matrix(mat_a.readwrite_sender(kk_idx)); // If there is no trailing matrix if (k == nrtile - 1) @@ -180,7 +174,7 @@ void Cholesky::call_L(comm::CommunicatorGrid grid, Matrix(k)); - trsmPanelTile(executor_hp, panelT.read(diag_wp_idx), mat_a(ik_idx)); + trsmPanelTile(panelT.read_sender(diag_wp_idx), mat_a.readwrite_sender(ik_idx)); panel.setTile(local_idx, mat_a.read(ik_idx)); } @@ -203,12 +197,13 @@ void Cholesky::call_L(comm::CommunicatorGrid grid, Matrix(jt_idx); - auto& trailing_matrix_executor = (jt_idx == kt) ? executor_hp : executor_np; + const auto trailing_matrix_priority = + (jt_idx == kt) ? hpx::threads::thread_priority::high : hpx::threads::thread_priority::normal; if (this_rank.row() == owner.row()) { const auto i = distr.localTileFromGlobalTile(jt_idx); - herkTrailingDiagTile(trailing_matrix_executor, panel.read({Coord::Row, i}), - mat_a(LocalTileIndex{i, j})); + herkTrailingDiagTile(trailing_matrix_priority, panel.read_sender({Coord::Row, i}), + mat_a.readwrite_sender(LocalTileIndex{i, j})); } for (SizeType i_idx = jt_idx + 1; i_idx < nrtile; ++i_idx) { @@ -218,8 +213,12 @@ void Cholesky::call_L(comm::CommunicatorGrid grid, Matrix(i_idx); - gemmTrailingMatrixTile(executor_np, panel.read({Coord::Row, i}), panelT.read({Coord::Col, j}), - mat_a(LocalTileIndex{i, j})); + // TODO: This was using executor_np. Was that intentional, or should it + // be trailing_matrix_executor/priority? 
+ gemmTrailingMatrixTile(hpx::threads::thread_priority::normal, + panel.read_sender({Coord::Row, i}), + panelT.read_sender({Coord::Col, j}), + mat_a.readwrite_sender(LocalTileIndex{i, j})); } } diff --git a/include/dlaf/matrix/copy.h b/include/dlaf/matrix/copy.h index fbb2e0786d..f16c1759f3 100644 --- a/include/dlaf/matrix/copy.h +++ b/include/dlaf/matrix/copy.h @@ -15,6 +15,7 @@ #include "dlaf/executors.h" #include "dlaf/matrix/copy_tile.h" +#include "dlaf/sender/transform.h" #include "dlaf/types.h" #include "dlaf/util_matrix.h" @@ -36,10 +37,16 @@ void copy(Matrix& source, Matrix& dest) { const SizeType local_tile_rows = distribution.localNrTiles().rows(); const SizeType local_tile_cols = distribution.localNrTiles().cols(); - for (SizeType j = 0; j < local_tile_cols; ++j) - for (SizeType i = 0; i < local_tile_rows; ++i) - hpx::dataflow(dlaf::getCopyExecutor(), unwrapExtendTiles(copy_o), - source.read(LocalTileIndex(i, j)), dest(LocalTileIndex(i, j))); + for (SizeType j = 0; j < local_tile_cols; ++j) { + for (SizeType i = 0; i < local_tile_rows; ++i) { + transformDetach< + internal::CopyBackend::value>(hpx::threads::thread_priority::normal, + copy_o, + source.read_sender(LocalTileIndex(i, j)), + dest.readwrite_sender( + LocalTileIndex(i, j))); + } + } } } } diff --git a/include/dlaf/matrix/copy_tile.h b/include/dlaf/matrix/copy_tile.h index 467e1a77c9..ca29ff5663 100644 --- a/include/dlaf/matrix/copy_tile.h +++ b/include/dlaf/matrix/copy_tile.h @@ -22,10 +22,36 @@ #include "dlaf/executors.h" #include "dlaf/lapack/tile.h" #include "dlaf/matrix/tile.h" +#include "dlaf/sender/transform.h" namespace dlaf { namespace matrix { namespace internal { +template +struct CopyBackend; + +template <> +struct CopyBackend { + static constexpr Backend value = Backend::MC; +}; + +#ifdef DLAF_WITH_CUDA +template <> +struct CopyBackend { + static constexpr Backend value = Backend::GPU; +}; + +template <> +struct CopyBackend { + static constexpr Backend value = Backend::GPU; +}; + 
+template <> +struct CopyBackend { + static constexpr Backend value = Backend::GPU; +}; +#endif + template struct CopyTile; @@ -155,32 +181,37 @@ struct Duplicate { } }; -namespace internal { -template -struct DuplicateIfNeeded { - template class Future> - static auto call(Future> tile) { - return getUnwrapReturnValue(hpx::dataflow(dlaf::getCopyExecutor(), - unwrapExtendTiles(Duplicate{}), tile)); - } -}; - -template -struct DuplicateIfNeeded { - template class Future> - static auto call(Future> tile) { - return tile; - } -}; -} - /// Helper function for duplicating an input tile to Destination asynchronously, /// but only if the destination device is different from the source device. /// /// When Destination and Source are the same, returns the input tile unmodified. -template class Future> -auto duplicateIfNeeded(Future> tile) { - return internal::DuplicateIfNeeded::call(std::move(tile)); +template +auto duplicateIfNeeded(hpx::future> tile) { + if constexpr (Source == Destination) { + return tile; + } + else { + return hpx::execution::experimental::make_future( + dlaf::transform< + internal::CopyBackend::value>(hpx::threads::thread_priority::normal, + dlaf::matrix::Duplicate{}, + std::move(tile))); + } +} + +template +auto duplicateIfNeeded(hpx::shared_future> tile) { + if constexpr (Source == Destination) { + return tile; + } + else { + return hpx::execution::experimental::make_future( + dlaf::transform< + internal::CopyBackend::value>(hpx::threads::thread_priority::normal, + dlaf::matrix::Duplicate{}, + hpx::execution::experimental::keep_future( + std::move(tile)))); + } } } } diff --git a/include/dlaf/matrix/matrix.h b/include/dlaf/matrix/matrix.h index a609d09089..4f2613982f 100644 --- a/include/dlaf/matrix/matrix.h +++ b/include/dlaf/matrix/matrix.h @@ -128,6 +128,14 @@ class Matrix : public Matrix { return operator()(this->distribution().localTileIndex(index)); } + auto readwrite_sender(const LocalTileIndex& index) noexcept { + return 
this->operator()(index); + } + + auto readwrite_sender(const GlobalTileIndex& index) { + return readwrite_sender(this->distribution().localTileIndex(index)); + } + protected: using Matrix::tileLinearIndex; @@ -175,6 +183,16 @@ class Matrix : public internal::MatrixBase { return read(distribution().localTileIndex(index)); } + auto read_sender(const LocalTileIndex& index) noexcept { + // We want to explicitly deal with the shared_future, not the const& to the + // value. + return hpx::execution::experimental::keep_future(read(index)); + } + + auto read_sender(const GlobalTileIndex& index) { + return read_sender(distribution().localTileIndex(index)); + } + /// Synchronization barrier for all local tiles in the matrix /// /// This blocking call does not return until all operations, i.e. both RO and RW, diff --git a/include/dlaf/matrix/panel.h b/include/dlaf/matrix/panel.h index 28cde56096..a503024851 100644 --- a/include/dlaf/matrix/panel.h +++ b/include/dlaf/matrix/panel.h @@ -104,6 +104,10 @@ struct Panel { } } + auto read_sender(const LocalTileIndex& index) { + return hpx::execution::experimental::keep_future(read(index)); + } + /// Set the panel to enable access to the range of tiles [start, end) /// /// With respect to the parent matrix. 
@@ -300,9 +304,12 @@ struct Panel : public Panel { return BaseT::data_(BaseT::fullIndex(index)); } + auto readwrite_sender(const LocalTileIndex& index) { + return hpx::execution::experimental::keep_future(this->operator()(index)); + } + protected: using BaseT = Panel; }; - } } diff --git a/include/dlaf/matrix/tile.tpp b/include/dlaf/matrix/tile.tpp index 4d9965d2b7..740c1537f7 100644 --- a/include/dlaf/matrix/tile.tpp +++ b/include/dlaf/matrix/tile.tpp @@ -30,7 +30,7 @@ Tile::Tile(Tile&& rhs) noexcept template Tile::~Tile() { if (p_) { - if (std::uncaught_exception()) + if (std::uncaught_exceptions() > 0) p_->set_exception(std::make_exception_ptr(ContinuationException{})); else p_->set_value(Tile(size_, std::move(memory_view_), ld_)); diff --git a/include/dlaf/sender/lift_non_sender.h b/include/dlaf/sender/lift_non_sender.h new file mode 100644 index 0000000000..1ebd9d99c1 --- /dev/null +++ b/include/dlaf/sender/lift_non_sender.h @@ -0,0 +1,31 @@ +// +// Distributed Linear Algebra with Future (DLAF) +// +// Copyright (c) 2020-2021, ETH Zurich +// All rights reserved. +// +// Please, refer to the LICENSE file in the root directory. +// SPDX-License-Identifier: BSD-3-Clause +// +#pragma once + +#include + +#include +#include + +namespace dlaf { +namespace internal { +// Utility to make a sender out of a non-sender (non-senders are wrapped in +// just). +template ::value>> +decltype(auto) liftNonSender(S&& s) { + return std::forward(s); +} + +template ::value>> +auto liftNonSender(S&& s) { + return hpx::execution::experimental::just(std::forward(s)); +} +} +} diff --git a/include/dlaf/sender/transform.h b/include/dlaf/sender/transform.h new file mode 100644 index 0000000000..50acc1d907 --- /dev/null +++ b/include/dlaf/sender/transform.h @@ -0,0 +1,203 @@ +// +// Distributed Linear Algebra with Future (DLAF) +// +// Copyright (c) 2020-2021, ETH Zurich +// All rights reserved. +// +// Please, refer to the LICENSE file in the root directory. 
+// SPDX-License-Identifier: BSD-3-Clause +// +#pragma once + +#include + +#include "dlaf/init.h" +#include "dlaf/sender/when_all_lift.h" +#include "dlaf/types.h" + +#ifdef DLAF_WITH_CUDA +#include +#include +#include + +#include "dlaf/cublas/handle_pool.h" +#include "dlaf/cuda/stream_pool.h" +#include "dlaf/cusolver/handle_pool.h" +#endif + +namespace dlaf { +namespace internal { +// DLAF-specific transform, templated on a backend. This, together with +// when_all, takes the place of dataflow(executor, ...) for futures. +template +struct Transform; + +// For Backend::MC we use the regular thread pool scheduler from HPX. +template <> +struct Transform { + template + static auto call(hpx::threads::thread_priority priority, S&& s, F&& f) { + namespace ex = hpx::execution::experimental; + return ex::transform(ex::on(std::forward(s), ex::make_with_priority(ex::executor{}, priority)), + hpx::util::unwrapping(std::forward(f))); + } +}; + +#ifdef DLAF_WITH_CUDA +// For Backend::GPU we use a custom sender. +template <> +struct Transform { + template + struct GPUTransformSender { + cuda::StreamPool stream_pool; + cublas::HandlePool cublas_handle_pool; + cusolver::HandlePool cusolver_handle_pool; + std::decay_t s; + std::decay_t f; + + template + static auto call_helper(cudaStream_t stream, cublasHandle_t cublas_handle, + cusolverDnHandle_t cusolver_handle, G&& g, Us&... 
ts) { + using unwrapping_function_type = decltype(hpx::util::unwrapping(std::forward(g))); + static_assert(std::is_invocable_v || + std::is_invocable_v || + std::is_invocable_v, + "function passed to transform must be invocable with a cudaStream_t as the " + "last argument or a cublasHandle_t/cusolverDnHandle_t as the first argument"); + + if constexpr (std::is_invocable_v) { + (void) cublas_handle; + (void) cusolver_handle; + return std::invoke(hpx::util::unwrapping(std::forward(g)), ts..., stream); + } + else if constexpr (std::is_invocable_v) { + (void) cusolver_handle; + return std::invoke(hpx::util::unwrapping(std::forward(g)), cublas_handle, ts...); + } + else if constexpr (std::is_invocable_v) { + (void) cublas_handle; + return std::invoke(hpx::util::unwrapping(std::forward(g)), cusolver_handle, ts...); + } + } + + template + struct invoke_result_helper; + + template