From 28fea2e421b992aebf7485af44fd3b2d345feaf8 Mon Sep 17 00:00:00 2001
From: Mikael Simberg
Date: Wed, 21 Apr 2021 09:31:53 +0200
Subject: [PATCH 01/15] TEMP: Playing around with senders/receivers

---
 include/dlaf/blas/tile.h                    |  55 +++++++
 include/dlaf/communication/sync/broadcast.h |  98 ++++++++++++
 include/dlaf/matrix/matrix.h                |  22 +++
 include/dlaf/matrix/matrix_const.tpp        |   4 +
 include/dlaf/matrix/tile.tpp                |   2 +-
 include/dlaf/solver/triangular/impl.h       |  36 ++++-
 include/dlaf/transform.h                    | 161 ++++++++++++++++++++
 spack/packages/dla-future/package.py        |   3 +-
 test/include/dlaf_test/matrix/util_matrix.h |  10 ++
 test/unit/solver/test_triangular.cpp        |   5 +
 10 files changed, 390 insertions(+), 6 deletions(-)
 create mode 100644 include/dlaf/transform.h

diff --git a/include/dlaf/blas/tile.h b/include/dlaf/blas/tile.h
index e9d42447a0..95e574d143 100644
--- a/include/dlaf/blas/tile.h
+++ b/include/dlaf/blas/tile.h
@@ -11,7 +11,13 @@
 #include "blas.hh"
+#include
+
+#include
+#include
+
 #include "dlaf/common/callable_object.h"
+#include "dlaf/init.h"
 #include "dlaf/matrix/tile.h"
 #include "dlaf/types.h"
 #include "dlaf/util_blas.h"
@@ -39,6 +45,29 @@ void gemm(const blas::Op op_a, const blas::Op op_b, const T alpha, const Tile
+// void gemm(const blas::Op op_a, const blas::Op op_b, const T alpha,
+//           hpx::experimental::detail::async_rw_mutex_access_wrapper<
+//               Tile, const Tile,
+//               hpx::experimental::detail::async_rw_mutex_access_type::read>,
+//           hpx::experimental::detail::async_rw_mutex_access_wrapper<
+//               Tile, const dlaf::matrix::Tile,
+//               hpx::experimental::detail::async_rw_mutex_access_type::read>,
+//           const T beta,
+//           hpx::experimental::detail::async_rw_mutex_access_wrapper<
+//               Tile, const Tile,
+//               hpx::experimental::detail::async_rw_mutex_access_type::readwrite>
+//               c) noexcept {
+//   // TODO: Should async_rw_mutex_access_wrapper have a get method?
+//   // auto s = tile::internal::getGemmSizes(op_a, op_b, a.get(), b.get(), c);
+//   // blas::gemm(blas::Layout::ColMajor, op_a, op_b, s.m, s.n, s.k, alpha, a.get().ptr(), a.get().ld(),
+//   //            b.get().ptr(), b.get().ld(), beta, c.ptr(), c.ld());
+// }
+
 /// Computes matrix matrix multiplication where matrix @p a is hermitian (symmetric if T is real).
 template
 void hemm(const blas::Side side, const blas::Uplo uplo, const T alpha,
@@ -76,6 +105,15 @@ void trsm(const blas::Side side, const blas::Uplo uplo, const blas::Op op, const
            b.ld());
 }
 
+template <class T, Device device>
+void trsm(const blas::Side side, const blas::Uplo uplo, const blas::Op op, const blas::Diag diag,
+          const T alpha, std::reference_wrapper<const Tile<const T, device>> a,
+          const Tile<T, device>& b) noexcept {
+  auto s = tile::internal::getTrsmSizes(side, a.get(), b);
+  blas::trsm(blas::Layout::ColMajor, side, uplo, op, diag, s.m, s.n, alpha, a.get().ptr(), a.get().ld(),
+             b.ptr(), b.ld());
+}
+
 #ifdef DLAF_WITH_CUDA
 namespace internal {
 template
@@ -226,5 +264,22 @@ DLAF_MAKE_CALLABLE_OBJECT(her2k);
 DLAF_MAKE_CALLABLE_OBJECT(herk);
 DLAF_MAKE_CALLABLE_OBJECT(trsm);
+// TODO: Useful? Take only the predecessor sender, internally call transform.
+// template
+// decltype(auto) gemm(S&& s) {
+//   return transform(std::forward<S>(s), gemm_o);
+// }
+
+// TODO: Or? Automatically wrap and lift arguments in when_all.
+// template
+// decltype(auto) gemm(Ts&&... ts) {
+//   return transform(when_all_lift(std::forward<Ts>(ts)...), gemm_o);
+// }
+
+// TODO: Or eagerly submitted? Additionally call detach.
+// template
+// void gemm(Ts&&... ts) {
+//   ex::detach(transform(when_all_lift(std::forward<Ts>(ts)...), gemm_o));
+// }
 }
 }
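
// A minimal usage sketch of the eagerly-submitted variant discussed in the TODOs above, written the
// same way the call_LLT update later in this patch drives gemm_o: when_all_lift packs plain values
// and tile senders into one sender, transform (from include/dlaf/transform.h, added by this patch)
// applies the callable, and detach submits the work. The tile senders a_s, b_s and c_s are
// illustrative placeholders, not names from the patch:
//
//   namespace ex = hpx::execution::experimental;
//   ex::detach(transform(when_all_lift(blas::Op::NoTrans, blas::Op::NoTrans, T(1.0),
//                                      std::move(a_s), std::move(b_s), T(0.0), std::move(c_s)),
//                        tile::gemm_o));
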
diff --git a/include/dlaf/communication/sync/broadcast.h b/include/dlaf/communication/sync/broadcast.h
index 15c4d79bc8..3271a61699 100644
--- a/include/dlaf/communication/sync/broadcast.h
+++ b/include/dlaf/communication/sync/broadcast.h
@@ -25,6 +25,8 @@
 #include "dlaf/matrix/copy_tile.h"
 #include "dlaf/matrix/tile.h"
 
+#include
+
 namespace dlaf {
 namespace comm {
 namespace sync {
@@ -39,6 +41,7 @@ void send(Communicator& communicator, DataIn&& message_to_send) {
   using DataT = std::remove_const_t::element_t>;
   auto message = comm::make_message(std::move(data));
+  // std::cerr << "send with data = " << message.data() << " and size " << message.count() << "\n";
   DLAF_MPI_CALL(MPI_Bcast(const_cast<DataT*>(message.data()), message.count(), message.mpi_type(),
                           communicator.rank(), communicator));
 }
@@ -50,10 +53,105 @@
 template
 void receive_from(const int broadcaster_rank, Communicator& communicator, DataOut&& data) {
   DLAF_ASSERT_HEAVY(broadcaster_rank != communicator.rank(), broadcaster_rank, communicator.rank());
   auto message = comm::make_message(common::make_data(std::forward<DataOut>(data)));
+  // std::cerr << "receive with data = " << message.data() << " and size " << message.count() << "\n";
   DLAF_MPI_CALL(
       MPI_Bcast(message.data(), message.count(), message.mpi_type(), broadcaster_rank, communicator));
 }
 }
 }
+
+// TODO: These are here only temporarily. MPI handling has changed.
+
+/// Task for broadcasting (send endpoint) a Tile in a direction over a CommunicatorGrid
+template <class T, Device D>
+void sendTile(hpx::future<common::PromiseGuard<comm::CommunicatorGrid>> mpi_task_chain, Coord rc_comm,
+              hpx::shared_future<matrix::Tile<const T, D>> tile) {
+  using PromiseComm_t = common::PromiseGuard<comm::CommunicatorGrid>;
+
+  PromiseComm_t pcomm = mpi_task_chain.get();
+  comm::sync::broadcast::send(pcomm.ref().subCommunicator(rc_comm), tile.get());
+}
+
+template <class T, Device D>
+void sendTile(common::PromiseGuard<comm::CommunicatorGrid> pcomm, Coord rc_comm,
+              std::reference_wrapper<const matrix::Tile<const T, D>> tile) {
+  comm::sync::broadcast::send(pcomm.ref().subCommunicator(rc_comm), tile.get());
+}
+
+template <class T, Device D>
+void sendTile(hpx::experimental::async_rw_mutex<comm::CommunicatorGrid>::readwrite_access_type pcomm,
+              Coord rc_comm, std::reference_wrapper<const matrix::Tile<const T, D>> tile) {
+  comm::CommunicatorGrid& pcomm_ref = pcomm;
+  comm::sync::broadcast::send(pcomm_ref.subCommunicator(rc_comm), tile.get());
+}
+
+template <class T, Device D>
+void sendTile(hpx::experimental::async_rw_mutex<comm::CommunicatorGrid>::readwrite_access_type pcomm,
+              Coord rc_comm, matrix::Tile<const T, D> const& tile) {
+  comm::CommunicatorGrid& pcomm_ref = pcomm;
+  comm::sync::broadcast::send(pcomm_ref.subCommunicator(rc_comm), tile);
+}
+
+DLAF_MAKE_CALLABLE_OBJECT(sendTile);
+
+/// Task for broadcasting (receiving endpoint) a Tile in a direction over a CommunicatorGrid
+template <class T, Device D>
+void recvTile(hpx::future<common::PromiseGuard<comm::CommunicatorGrid>> mpi_task_chain, Coord rc_comm,
+              hpx::future<matrix::Tile<T, D>> tile, comm::IndexT_MPI rank) {
+  using PromiseComm_t = common::PromiseGuard<comm::CommunicatorGrid>;
+
+  PromiseComm_t pcomm = mpi_task_chain.get();
+  comm::sync::broadcast::receive_from(rank, pcomm.ref().subCommunicator(rc_comm), tile.get());
+}
+
+DLAF_MAKE_CALLABLE_OBJECT(recvTile);
+
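
// How the future-based overloads above are typically driven (a sketch only; grid and mat are
// illustrative variables, not names from this patch): a common::Pipeline<comm::CommunicatorGrid>
// serializes access to the grid, and dataflow passes the pipeline future plus a tile future
// straight through to sendTile/recvTile, which call .get() themselves.
//
//   common::Pipeline<comm::CommunicatorGrid> serial_comm(std::move(grid));
//   hpx::dataflow(sendTile_o, serial_comm(), Coord::Col, mat.read(LocalTileIndex{0, 0}));
//
// The async_rw_mutex-based overloads are intended to replace the PromiseGuard step in a pure
// sender/receiver chain.
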
+/// Task for broadcasting (receiving endpoint) a Tile ("JIT" allocation) in a direction over a CommunicatorGrid
+template <class T>
+matrix::Tile<const T, Device::CPU> recvAllocTile(
+    hpx::future<common::PromiseGuard<comm::CommunicatorGrid>> mpi_task_chain, Coord rc_comm,
+    TileElementSize tile_size, comm::IndexT_MPI rank) {
+  using ConstTile_t = matrix::Tile<const T, Device::CPU>;
+  using PromiseComm_t = common::PromiseGuard<comm::CommunicatorGrid>;
+  using MemView_t = memory::MemoryView<T, Device::CPU>;
+  using Tile_t = matrix::Tile<T, Device::CPU>;
+
+  PromiseComm_t pcomm = mpi_task_chain.get();
+  MemView_t mem_view(tile_size.linear_size());
+  Tile_t tile(tile_size, std::move(mem_view), tile_size.rows());
+  comm::sync::broadcast::receive_from(rank, pcomm.ref().subCommunicator(rc_comm), tile);
+  return ConstTile_t(std::move(tile));
+}
+
+/// Task for broadcasting (receiving endpoint) a Tile ("JIT" allocation) in a direction over a
+/// CommunicatorGrid
+template <class T>
+matrix::Tile<const T, Device::CPU> recvAllocTileSender(common::PromiseGuard<comm::CommunicatorGrid> pcomm,
+                                                       Coord rc_comm, TileElementSize tile_size,
+                                                       comm::IndexT_MPI rank) {
+  using ConstTile_t = matrix::Tile<const T, Device::CPU>;
+  using MemView_t = memory::MemoryView<T, Device::CPU>;
+  using Tile_t = matrix::Tile<T, Device::CPU>;
+
+  MemView_t mem_view(tile_size.linear_size());
+  Tile_t tile(tile_size, std::move(mem_view), tile_size.rows());
+  comm::sync::broadcast::receive_from(rank, pcomm.ref().subCommunicator(rc_comm), tile);
+  return ConstTile_t(std::move(tile));
+}
+
+template <class T>
+matrix::Tile<const T, Device::CPU> recvAllocTileSenderMutex(
+    hpx::experimental::async_rw_mutex<comm::CommunicatorGrid>::readwrite_access_type pcomm,
+    Coord rc_comm, TileElementSize tile_size, comm::IndexT_MPI rank) {
+  using ConstTile_t = matrix::Tile<const T, Device::CPU>;
+  using MemView_t = memory::MemoryView<T, Device::CPU>;
+  using Tile_t = matrix::Tile<T, Device::CPU>;
+
+  MemView_t mem_view(tile_size.linear_size());
+  Tile_t tile(tile_size, std::move(mem_view), tile_size.rows());
+  comm::CommunicatorGrid& pcomm_ref = pcomm;
+  comm::sync::broadcast::receive_from(rank, pcomm_ref.subCommunicator(rc_comm), tile);
+  return ConstTile_t(std::move(tile));
+}
 }
 }
diff --git a/include/dlaf/matrix/matrix.h b/include/dlaf/matrix/matrix.h
index a609d09089..47b4341c8f 100644
--- a/include/dlaf/matrix/matrix.h
+++ b/include/dlaf/matrix/matrix.h
@@ -15,6 +15,8 @@
 #include
 
+#include
+
 #include "dlaf/communication/communicator_grid.h"
 #include "dlaf/matrix/distribution.h"
 #include "dlaf/matrix/internal/tile_future_manager.h"
@@ -128,12 +130,22 @@ class Matrix : public Matrix {
     return operator()(this->distribution().localTileIndex(index));
   }
 
+  auto readwrite_sender(const LocalTileIndex& index) noexcept {
+    std::size_t i = to_sizet(tileLinearIndex(index));
+    return tile_rw_mutexes_[i].readwrite();
+  }
+
+  auto readwrite_sender(const GlobalTileIndex& index) {
+    return readwrite_sender(this->distribution().localTileIndex(index));
+  }
+
 protected:
   using Matrix::tileLinearIndex;
 
 private:
   using Matrix::setUpTiles;
   using Matrix::tile_managers_;
+  using Matrix::tile_rw_mutexes_;
 };
 
 template
@@ -175,6 +187,15 @@ class Matrix : public internal::MatrixBase {
     return read(distribution().localTileIndex(index));
   }
 
+  auto read_sender(const LocalTileIndex& index) noexcept {
+    std::size_t i = to_sizet(tileLinearIndex(index));
+    return tile_rw_mutexes_[i].read();
+  }
+
+  auto read_sender(const GlobalTileIndex& index) {
+    return read_sender(distribution().localTileIndex(index));
+  }
+
   /// Synchronization barrier for all local tiles in the matrix
   ///
   /// This blocking call does not return until all operations, i.e. both RO and RW,
@@ -187,6 +208,7 @@ class Matrix : public internal::MatrixBase {
   void setUpTiles(const memory::MemoryView& mem, const LayoutInfo& layout) noexcept;
 
   std::vector> tile_managers_;
+  std::vector<hpx::experimental::async_rw_mutex<Tile<T, device>, Tile<const T, device>>> tile_rw_mutexes_;
 };
 
 // Note: the templates of the following helper functions are inverted w.r.t. the Matrix templates
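
// A sketch of what the new read_sender/readwrite_sender accessors are for (the "pure sender
// version" described in the solver changes below): the async_rw_mutex senders replace both the
// shared_future/future pair and the keep_future calls, e.g. for the trailing-matrix gemm update.
// Indices follow call_LLT; when_all_lift and transform come from include/dlaf/transform.h, and the
// values delivered to gemm_o are the async_rw_mutex access wrappers:
//
//   namespace ex = hpx::execution::experimental;
//   ex::detach(transform(when_all_lift(op, blas::Op::NoTrans, beta,
//                                      mat_a.read_sender(LocalTileIndex{k, i}),
//                                      mat_b.read_sender(kj), T(1.0),
//                                      mat_b.readwrite_sender(LocalTileIndex{i, j})),
//                        tile::gemm_o));
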
diff --git a/include/dlaf/matrix/matrix_const.tpp b/include/dlaf/matrix/matrix_const.tpp
index 11ef1cb452..4a271ec7a5 100644
--- a/include/dlaf/matrix/matrix_const.tpp
+++ b/include/dlaf/matrix/matrix_const.tpp
@@ -70,6 +70,10 @@ void Matrix::setUpTiles(const memory::MemoryView
diff --git a/include/dlaf/matrix/tile.tpp b/include/dlaf/matrix/tile.tpp
 Tile::Tile(Tile&& rhs) noexcept
 template
 Tile::~Tile() {
   if (p_) {
-    if (std::uncaught_exception())
+    if (std::uncaught_exceptions() > 0)
       p_->set_exception(std::make_exception_ptr(ContinuationException{}));
     else
       p_->set_value(Tile(size_, std::move(memory_view_), ld_));
diff --git a/include/dlaf/solver/triangular/impl.h b/include/dlaf/solver/triangular/impl.h
index 046a0b0fc5..bffa2c2ee8 100644
--- a/include/dlaf/solver/triangular/impl.h
+++ b/include/dlaf/solver/triangular/impl.h
@@ -11,6 +11,13 @@
 #include
 
+// TODO: Clean up unneeded includes
+#include
+#include
+#include
+#include
+#include
+
 #include "dlaf/blas/tile.h"
 #include "dlaf/common/index2d.h"
 #include "dlaf/common/pipeline.h"
@@ -24,6 +31,7 @@
 #include "dlaf/matrix/distribution.h"
 #include "dlaf/matrix/matrix.h"
 #include "dlaf/solver/triangular/api.h"
+#include "dlaf/transform.h"
 #include "dlaf/util_matrix.h"
 
 namespace dlaf {
@@ -82,6 +90,8 @@ void Triangular::call_LLN(blas::Diag diag, T alpha, Matrix
 void Triangular::call_LLT(blas::Op op, blas::Diag diag, T alpha, Matrix& mat_a,
                           Matrix& mat_b) {
+  namespace ex = hpx::execution::experimental;
+
   constexpr auto Left = blas::Side::Left;
   constexpr auto Lower = blas::Uplo::Lower;
   constexpr auto NoTrans = blas::Op::NoTrans;
@@ -105,9 +115,29 @@ void Triangular::call_LLT(blas::Op op, blas::Diag diag, T al
        auto beta = static_cast<T>(-1.0) / alpha;

        // Update trailing matrix
-       hpx::dataflow(trailing_executor, matrix::unwrapExtendTiles(tile::gemm_o), op, NoTrans, beta,
-                     mat_a.read(LocalTileIndex{k, i}), mat_b.read(kj), T(1.0),
-                     mat_b(LocalTileIndex{i, j}));
+
+       // Futures as senders version:
+       ex::detach(
+           transform(when_all_lift(op, blas::Op::NoTrans, beta,
+                                   // TODO: Add read_sender for use in sender algorithms?
+                                   ex::keep_future(mat_a.read(LocalTileIndex{k, i})),
+                                   ex::keep_future(mat_b.read(kj)), T(1.0),
+                                   // mat_a.read(LocalTileIndex{i, j}),
+                                   // mat_b.read(kj), T(1.0),
+                                   mat_b(LocalTileIndex{i, j})),
+                     tile::gemm_o));
+       // Alternative:
+       // tile::gemm in place of tile::transform
+
+       // Pure sender version (with async_rw_mutex):
+       // - add versions of read and operator() to Matrix which return
+       //   async_rw_mutex senders
+       // - remove the keep_future calls
+
+       // Original dataflow/future version
+       // hpx::dataflow(trailing_executor, matrix::unwrapExtendTiles(tile::gemm_o), op, NoTrans, beta,
+       //               mat_a.read(LocalTileIndex{k, i}), mat_b.read(kj), T(1.0),
+       //               mat_b(LocalTileIndex{i, j}));
      }
    }
  }
diff --git a/include/dlaf/transform.h b/include/dlaf/transform.h
new file mode 100644
index 0000000000..b114687a2e
--- /dev/null
+++ b/include/dlaf/transform.h
@@ -0,0 +1,161 @@
+//
+// Distributed Linear Algebra with Future (DLAF)
+//
+// Copyright (c) 2020-2021, ETH Zurich
+// All rights reserved.
+//
+// Please, refer to the LICENSE file in the root directory.
+// SPDX-License-Identifier: BSD-3-Clause
+//
+#pragma once
+
+#include
+
+#include "dlaf/init.h"
+#include "dlaf/types.h"
+
+#ifdef DLAF_WITH_CUDA
+// TODO: Only for the pools, not the executors
+#include "dlaf/cublas/executor.h"
+#include "dlaf/cuda/executor.h"
+#endif
+
+namespace dlaf {
+// TODO: Upstream. In what form? execution::dataflow equivalent to
+// when_all_lift | on | transform?
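
// As a concrete reference for the TODO above, the mapping used in this patch (see
// solver/triangular/impl.h) replaces
//
//   hpx::dataflow(executor, matrix::unwrapExtendTiles(tile::gemm_o), args...);
//
// with
//
//   namespace ex = hpx::execution::experimental;
//   ex::detach(transform(when_all_lift(args...), tile::gemm_o));
//
// where when_all_lift (below) wraps non-sender arguments in just() and combines everything with
// when_all, and transform dispatches to a backend-specific scheduler/executor before applying the
// unwrapped callable.
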
+template <typename S,
+          typename Enable = std::enable_if_t<hpx::execution::experimental::is_sender<std::decay_t<S>>::value>>
+decltype(auto) lift_non_senders(S&& s) {
+  return std::forward<S>(s);
+}
+
+template <typename S,
+          typename Enable = std::enable_if_t<!hpx::execution::experimental::is_sender<std::decay_t<S>>::value>>
+auto lift_non_senders(S&& s) {
+  return hpx::execution::experimental::just(std::forward<S>(s));
+}
+
+template <typename... Ts>
+auto when_all_lift(Ts&&... ts) {
+  return hpx::execution::experimental::when_all(lift_non_senders(std::forward<Ts>(ts))...);
+}
+namespace internal {
+// DLAF-specific transform, templated on a backend. This, together with
+// when_all, takes the place of dataflow(executor, ...)
+
+// TODO: Priorities. Do backends need to be types (such that priorities can be
+// attached to them)? Should tile algorithms take backend-specific execution
+// policies? Should we go back to customizing hpx::execution::transform based on
+// a via/on predecessor sender?
+template <Backend B>
+struct transform;
+
+// For Backend::MC we use the regular thread pool scheduler from HPX.
+template <>
+struct transform<Backend::MC> {
+  template <typename S, typename F>
+  static auto call(S&& s, F&& f) {
+    namespace ex = hpx::execution::experimental;
+    return ex::transform(ex::on(std::forward<S>(s), ex::executor{}),
+                         hpx::util::unwrapping(std::forward<F>(f)));
+  }
+};
+
+#ifdef DLAF_WITH_CUDA
+// For Backend::GPU we use a custom sender. This currently handles CUDA stream
+// and cuBLAS handle functions.
+template <>
+struct transform<Backend::GPU> {
+  template <typename S, typename F>
+  struct gpu_transform_sender {
+    std::decay_t<S> s;
+    std::decay_t<F> f;
+
+    // TODO: Non-void functions
+    template