Commit dbda159

Try #371:
bors[bot] authored Jun 3, 2021
2 parents e9e585d + 34ffb5d commit dbda159
Showing 19 changed files with 845 additions and 414 deletions.
4 changes: 2 additions & 2 deletions ci/docker/codecov/build.Dockerfile
@@ -79,8 +79,8 @@ RUN wget -q https://github.com/gperftools/gperftools/releases/download/gperftool
rm -rf /root/gperftools.tar.gz /root/gperftools-${GPERFTOOLS_VERSION}

# Install HPX
ARG HPX_FORK=STEllAR-GROUP
ARG HPX_VERSION=1.6.0
ARG HPX_FORK=msimberg
ARG HPX_VERSION=cuda-event-callback
ARG HPX_WITH_CUDA=OFF
ARG HPX_PATH=/usr/local/hpx
RUN wget -q https://github.com/${HPX_FORK}/hpx/archive/${HPX_VERSION}.tar.gz -O hpx.tar.gz && \
4 changes: 2 additions & 2 deletions ci/docker/release/build.Dockerfile
@@ -79,8 +79,8 @@ RUN wget -q https://github.com/gperftools/gperftools/releases/download/gperftool
rm -rf /root/gperftools.tar.gz /root/gperftools-${GPERFTOOLS_VERSION}

# Install HPX
ARG HPX_FORK=STEllAR-GROUP
ARG HPX_VERSION=1.6.0
ARG HPX_FORK=msimberg
ARG HPX_VERSION=cuda-event-callback
ARG HPX_WITH_CUDA=OFF
ARG HPX_PATH=/usr/local/hpx
RUN wget -q https://github.com/${HPX_FORK}/hpx/archive/${HPX_VERSION}.tar.gz -O hpx.tar.gz && \
71 changes: 1 addition & 70 deletions include/dlaf/cublas/executor.h
@@ -31,51 +31,13 @@

#include "dlaf/common/assert.h"
#include "dlaf/cublas/error.h"
#include "dlaf/cublas/handle_pool.h"
#include "dlaf/cuda/error.h"
#include "dlaf/cuda/executor.h"

namespace dlaf {
namespace cublas {
namespace internal {
class HandlePoolImpl {
int device_;
std::size_t num_worker_threads_ = hpx::get_num_worker_threads();
std::vector<cublasHandle_t> handles_;
cublasPointerMode_t ptr_mode_;

public:
HandlePoolImpl(int device, cublasPointerMode_t ptr_mode)
: device_(device), handles_(num_worker_threads_), ptr_mode_(ptr_mode) {
DLAF_CUDA_CALL(cudaSetDevice(device_));

for (auto& h : handles_) {
DLAF_CUBLAS_CALL(cublasCreate(&h));
}
}

HandlePoolImpl& operator=(HandlePoolImpl&&) = default;
HandlePoolImpl(HandlePoolImpl&&) = default;
HandlePoolImpl(const HandlePoolImpl&) = delete;
HandlePoolImpl& operator=(const HandlePoolImpl&) = delete;

~HandlePoolImpl() {
for (auto& h : handles_) {
DLAF_CUBLAS_CALL(cublasDestroy(h));
}
}

cublasHandle_t getNextHandle(cudaStream_t stream) {
cublasHandle_t handle = handles_[hpx::get_worker_thread_num()];
DLAF_CUDA_CALL(cudaSetDevice(device_));
DLAF_CUBLAS_CALL(cublasSetStream(handle, stream));
DLAF_CUBLAS_CALL(cublasSetPointerMode(handle, ptr_mode_));
return handle;
}

int getDevice() {
return device_;
}
};

template <bool IsCallable, typename F, typename... Ts>
struct isAsyncCublasCallableImpl : std::false_type {
@@ -99,37 +61,6 @@ struct isDataflowCublasCallable
std::declval<Futures>()))> {};
}

/// A pool of cuBLAS handles with reference semantics (copying points to the
/// same underlying cuBLAS handles; the last reference destroys the handles).
/// Allows access to cuBLAS handles associated with a particular stream. The
/// user must ensure that the handle pool and the stream use the same device.
/// Each HPX worker thread is assigned a thread-local cuBLAS handle.
class HandlePool {
std::shared_ptr<internal::HandlePoolImpl> handles_ptr_;

public:
HandlePool(int device = 0, cublasPointerMode_t ptr_mode = CUBLAS_POINTER_MODE_HOST)
: handles_ptr_(std::make_shared<internal::HandlePoolImpl>(device, ptr_mode)) {}

cublasHandle_t getNextHandle(cudaStream_t stream) {
DLAF_ASSERT(bool(handles_ptr_), "");
return handles_ptr_->getNextHandle(stream);
}

int getDevice() {
DLAF_ASSERT(bool(handles_ptr_), "");
return handles_ptr_->getDevice();
}

bool operator==(HandlePool const& rhs) const noexcept {
return handles_ptr_ == rhs.handles_ptr_;
}

bool operator!=(HandlePool const& rhs) const noexcept {
return !(*this == rhs);
}
};

/// An executor for cuBLAS functions. Uses handles and streams from the given
/// HandlePool and StreamPool. A cuBLAS function is defined as any function that
/// takes a cuBLAS handle as the first argument. The executor inserts a cuBLAS
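The doc comment above (truncated by the diff view) describes the executor's contract: a cuBLAS function takes a cuBLAS handle as its first argument, and the executor supplies that handle together with a stream taken from the pools. The sketch below is not the DLA-Future implementation, only a conceptual illustration of that contract under the assumptions stated in it; invoke_with_pools is a hypothetical helper name, while HandlePool, StreamPool, and the included headers come from this diff.

#include <utility>

#include <cublas_v2.h>
#include <cuda_runtime.h>

#include "dlaf/cublas/handle_pool.h"
#include "dlaf/cuda/stream_pool.h"

// Picks the next stream, fetches the worker thread's handle bound to that
// stream, and invokes the user-provided cuBLAS function with the handle
// inserted as the first argument (illustrative helper, not dlaf API).
template <typename F, typename... Ts>
auto invoke_with_pools(dlaf::cublas::HandlePool& handles, dlaf::cuda::StreamPool& streams,
                       F&& f, Ts&&... ts) {
  cudaStream_t stream = streams.getNextStream();
  cublasHandle_t handle = handles.getNextHandle(stream);
  return std::forward<F>(f)(handle, std::forward<Ts>(ts)...);
}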
109 changes: 109 additions & 0 deletions include/dlaf/cublas/handle_pool.h
@@ -0,0 +1,109 @@
//
// Distributed Linear Algebra with Future (DLAF)
//
// Copyright (c) 2018-2021, ETH Zurich
// All rights reserved.
//
// Please, refer to the LICENSE file in the root directory.
// SPDX-License-Identifier: BSD-3-Clause
//

#pragma once

/// @file

#ifdef DLAF_WITH_CUDA

#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

#include <cublas_v2.h>
#include <cuda_runtime.h>

#include <hpx/local/runtime.hpp>

#include "dlaf/common/assert.h"
#include "dlaf/cublas/error.h"
#include "dlaf/cuda/error.h"
#include "dlaf/cuda/executor.h"

namespace dlaf {
namespace cublas {
namespace internal {
class HandlePoolImpl {
int device_;
std::size_t num_worker_threads_ = hpx::get_num_worker_threads();
std::vector<cublasHandle_t> handles_;
cublasPointerMode_t ptr_mode_;

public:
HandlePoolImpl(int device, cublasPointerMode_t ptr_mode)
: device_(device), handles_(num_worker_threads_), ptr_mode_(ptr_mode) {
DLAF_CUDA_CALL(cudaSetDevice(device_));

for (auto& h : handles_) {
DLAF_CUBLAS_CALL(cublasCreate(&h));
}
}

HandlePoolImpl& operator=(HandlePoolImpl&&) = default;
HandlePoolImpl(HandlePoolImpl&&) = default;
HandlePoolImpl(const HandlePoolImpl&) = delete;
HandlePoolImpl& operator=(const HandlePoolImpl&) = delete;

~HandlePoolImpl() {
for (auto& h : handles_) {
DLAF_CUBLAS_CALL(cublasDestroy(h));
}
}

cublasHandle_t getNextHandle(cudaStream_t stream) {
cublasHandle_t handle = handles_[hpx::get_worker_thread_num()];
DLAF_CUDA_CALL(cudaSetDevice(device_));
DLAF_CUBLAS_CALL(cublasSetStream(handle, stream));
DLAF_CUBLAS_CALL(cublasSetPointerMode(handle, ptr_mode_));
return handle;
}

int getDevice() {
return device_;
}
};
}

/// A pool of cuBLAS handles with reference semantics (copying points to the
/// same underlying cuBLAS handles; the last reference destroys the handles).
/// Allows access to cuBLAS handles associated with a particular stream. The
/// user must ensure that the handle pool and the stream use the same device.
/// Each HPX worker thread is assigned a thread-local cuBLAS handle.
class HandlePool {
std::shared_ptr<internal::HandlePoolImpl> handles_ptr_;

public:
HandlePool(int device = 0, cublasPointerMode_t ptr_mode = CUBLAS_POINTER_MODE_HOST)
: handles_ptr_(std::make_shared<internal::HandlePoolImpl>(device, ptr_mode)) {}

cublasHandle_t getNextHandle(cudaStream_t stream) {
DLAF_ASSERT(bool(handles_ptr_), "");
return handles_ptr_->getNextHandle(stream);
}

int getDevice() {
DLAF_ASSERT(bool(handles_ptr_), "");
return handles_ptr_->getDevice();
}

bool operator==(HandlePool const& rhs) const noexcept {
return handles_ptr_ == rhs.handles_ptr_;
}

bool operator!=(HandlePool const& rhs) const noexcept {
return !(*this == rhs);
}
};
}
}

#endif
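A minimal usage sketch for the HandlePool added above (illustrative only, not taken from the repository): it assumes the caller runs on an HPX worker thread, since the pool indexes handles by hpx::get_worker_thread_num(), and it manages the CUDA stream by hand for brevity. The function name axpy_example is hypothetical; the pool API and the DLAF_* macros are those shown in this diff.

#include <cublas_v2.h>
#include <cuda_runtime.h>

#include "dlaf/cublas/error.h"
#include "dlaf/cublas/handle_pool.h"
#include "dlaf/cuda/error.h"

// x and y are device pointers of length at least n.
void axpy_example(const double* x, double* y, int n) {
  cudaStream_t stream;
  DLAF_CUDA_CALL(cudaStreamCreate(&stream));

  dlaf::cublas::HandlePool pool(0, CUBLAS_POINTER_MODE_HOST);

  // Binds the calling worker thread's handle to `stream` and sets the pointer mode.
  cublasHandle_t handle = pool.getNextHandle(stream);

  const double alpha = 1.0;
  DLAF_CUBLAS_CALL(cublasDaxpy(handle, n, &alpha, x, 1, y, 1));

  DLAF_CUDA_CALL(cudaStreamSynchronize(stream));
  DLAF_CUDA_CALL(cudaStreamDestroy(stream));
}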
100 changes: 2 additions & 98 deletions include/dlaf/cuda/executor.h
@@ -24,111 +24,15 @@
#include <hpx/functional.hpp>
#include <hpx/future.hpp>
#include <hpx/include/util.hpp>
#include <hpx/modules/async_cuda.hpp>
#include <hpx/tuple.hpp>

#include "dlaf/common/assert.h"
#include "dlaf/cuda/error.h"
#include "dlaf/cuda/stream_pool.h"

namespace dlaf {
namespace cuda {
namespace internal {

struct StreamPoolImpl {
int device_;
std::size_t num_worker_threads_ = hpx::get_num_worker_threads();
std::size_t num_streams_per_worker_thread_;
std::vector<cudaStream_t> streams_;
std::vector<hpx::util::cache_aligned_data<std::size_t>> current_stream_idxs_;

StreamPoolImpl(int device, std::size_t num_streams_per_worker_thread,
hpx::threads::thread_priority hpx_thread_priority)
: device_(device), num_streams_per_worker_thread_(num_streams_per_worker_thread),
streams_(num_worker_threads_ * num_streams_per_worker_thread),
current_stream_idxs_(num_worker_threads_, {std::size_t(0)}) {
DLAF_CUDA_CALL(cudaSetDevice(device));

// We map hpx::threads::thread_priority::high to the highest CUDA stream
// priority, and the rest to the lowest. Typically CUDA streams will only
// have two priorities.
int least_priority, greatest_priority;
DLAF_CUDA_CALL(cudaDeviceGetStreamPriorityRange(&least_priority, &greatest_priority));
int stream_priority = least_priority;
if (hpx_thread_priority == hpx::threads::thread_priority::high) {
stream_priority = greatest_priority;
}

for (auto& s : streams_) {
DLAF_CUDA_CALL(cudaStreamCreateWithPriority(&s, cudaStreamNonBlocking, stream_priority));
}
}

StreamPoolImpl& operator=(StreamPoolImpl&&) = default;
StreamPoolImpl(StreamPoolImpl&&) = default;
StreamPoolImpl(const StreamPoolImpl&) = delete;
StreamPoolImpl& operator=(const StreamPoolImpl&) = delete;

~StreamPoolImpl() {
for (auto& s : streams_) {
DLAF_CUDA_CALL(cudaStreamDestroy(s));
}
}

cudaStream_t getNextStream() {
// Set the device associated with this stream pool.
//
// The CUBLAS library context is tied to the current CUDA device [1]. A previous task scheduled on
// the same thread may have set a different device; this makes sure the correct device is used. The
// call is considered to have very low overhead [2].
//
// [1]: https://docs.nvidia.com/cuda/cublas/index.html#cublascreate
// [2]: CUDA Runtime API, section 5.1 Device Management
DLAF_CUDA_CALL(cudaSetDevice(device_));
const std::size_t worker_thread_num = hpx::get_worker_thread_num();
DLAF_ASSERT(worker_thread_num != std::size_t(-1), worker_thread_num);
std::size_t stream_idx =
worker_thread_num * num_streams_per_worker_thread_ +
(++current_stream_idxs_[worker_thread_num].data_ % num_streams_per_worker_thread_);

return streams_[stream_idx];
}

int getDevice() {
return device_;
}
};
}

/// A pool of CUDA streams with reference semantics (copying points to the same
/// underlying CUDA streams; the last reference destroys the streams). Allows
/// access to CUDA streams in a round-robin fashion. Each HPX worker thread is
/// assigned a set of thread-local CUDA streams.
class StreamPool {
std::shared_ptr<internal::StreamPoolImpl> streams_ptr_;

public:
StreamPool(int device = 0, std::size_t num_streams_per_worker_thread = 3,
hpx::threads::thread_priority hpx_thread_priority = hpx::threads::thread_priority::default_)
: streams_ptr_(std::make_shared<internal::StreamPoolImpl>(device, num_streams_per_worker_thread,
hpx_thread_priority)) {}

cudaStream_t getNextStream() {
DLAF_ASSERT(bool(streams_ptr_), "");
return streams_ptr_->getNextStream();
}

int getDevice() {
DLAF_ASSERT(bool(streams_ptr_), "");
return streams_ptr_->getDevice();
}

bool operator==(StreamPool const& rhs) const noexcept {
return streams_ptr_ == rhs.streams_ptr_;
}

bool operator!=(StreamPool const& rhs) const noexcept {
return !(*this == rhs);
}
};

/// An executor for CUDA functions. Uses streams from the given StreamPool. A
/// CUDA function is defined as any function that takes a CUDA stream as the
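The StreamPool removed from this header is now included from dlaf/cuda/stream_pool.h (see the new include above). The sketch below is a minimal, illustrative use of that pool, not code from the repository: it assumes the caller runs on an HPX worker thread, since streams are selected per worker thread in round-robin order, and uses the default device 0 with three streams per worker thread as in the constructor defaults shown in the removed code. copy_example is a hypothetical function name; dst_device is a device pointer and src_host a host pointer.

#include <cstddef>

#include <cuda_runtime.h>

#include "dlaf/cuda/error.h"
#include "dlaf/cuda/stream_pool.h"

void copy_example(const double* src_host, double* dst_device, std::size_t n) {
  // Device 0, three streams per worker thread, default HPX thread priority.
  dlaf::cuda::StreamPool pool(0, 3);

  // Returns the next thread-local stream in round-robin order.
  cudaStream_t stream = pool.getNextStream();

  DLAF_CUDA_CALL(cudaMemcpyAsync(dst_device, src_host, n * sizeof(double),
                                 cudaMemcpyHostToDevice, stream));
  DLAF_CUDA_CALL(cudaStreamSynchronize(stream));
}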
