This repository has been archived by the owner on Nov 17, 2023. It is now read-only.

Add a new Policy in ThreadedEngine to do update per device #72

Merged
merged 4 commits on Sep 14, 2015
1 change: 1 addition & 0 deletions .travis.yml
@@ -11,6 +11,7 @@ env:
- TASK=python CXX=g++
- TASK=python3 CXX=g++
- TASK=python_naive CXX=g++
- TASK=python_perdev CXX=g++
- TASK=cpp_unittest CXX=g++

# dependent apt packages
2 changes: 1 addition & 1 deletion dmlc-core
6 changes: 4 additions & 2 deletions include/mxnet/engine.h
@@ -33,8 +33,10 @@ typedef Opr* OprHandle;
enum class FnProperty {
/*! \brief Normal operation */
kNormal,
/*! \brief Copy operation between CPU and GPU */
kCopy,
/*! \brief Copy operation from GPU to other devices */
kCopyFromGPU,
/*! \brief Copy operation from CPU to other devices */
kCopyToGPU,
/*! \brief Asynchronous function call */
kAsync
}; // enum class FnProperty
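The single kCopy property is split by direction here, presumably so a per-device scheduler can route host-to-GPU and GPU-to-host copies to separate workers. A standalone sketch of that kind of routing, assuming nothing beyond the enum values shown above; the QueueFor helper is hypothetical and not part of this PR:

```cpp
#include <iostream>

// Mirrors the enum in engine.h after this change; defined locally so the
// sketch compiles on its own.
enum class FnProperty { kNormal, kCopyFromGPU, kCopyToGPU, kAsync };

// Hypothetical routing helper: map a function property to a worker queue.
const char* QueueFor(FnProperty prop) {
  switch (prop) {
    case FnProperty::kCopyFromGPU: return "gpu-to-host copy worker";
    case FnProperty::kCopyToGPU:   return "host-to-gpu copy worker";
    default:                       return "compute worker";
  }
}

int main() {
  std::cout << QueueFor(FnProperty::kCopyToGPU) << "\n";    // host-to-gpu copy worker
  std::cout << QueueFor(FnProperty::kCopyFromGPU) << "\n";  // gpu-to-host copy worker
  return 0;
}
```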
12 changes: 10 additions & 2 deletions scripts/travis_script.sh
@@ -40,15 +40,23 @@ if [ ${TASK} == "python3" ]; then
make all || exit -1
export MXNET_ENGINE_TYPE=ThreadedEngine
nosetests tests/python/unittest || exit -1
nosetests tests/python/train || exit -1
fi

if [ ${TASK} == "python_naive" ]; then
echo "USE_CUDA=0" >> config.mk
make all || exit -1
export MXNET_ENGINE_TYPE=NaiveEngine
nosetests tests/python/unittest || exit -1
nosetests tests/python/train || exit -1
fi

if [ ${TASK} == "python_perdev" ]; then
echo "USE_CUDA=0" >> config.mk
make all || exit -1
export MXNET_ENGINE_TYPE=ThreadedEnginePerDevice
nosetests tests/python/unittest || exit -1
nosetests tests/python/train || exit -1
fi

if [ ${TASK} == "cpp_unittest" ]; then
12 changes: 8 additions & 4 deletions src/engine/engine.cc
@@ -15,14 +15,18 @@ inline Engine* CreateEngine() {
const bool default_engine = (type == nullptr);
if (type == nullptr) type = "ThreadedEngine";
std::string stype = type;

Engine *ret = nullptr;
if (stype == "ThreadedEngine") {
ret = CreateThreadedEngine();
} else if (stype == "NaiveEngine") {
if (stype == "NaiveEngine") {
ret = CreateNaiveEngine();
} else if (stype == "ThreadedEngine") {
ret = CreateThreadedEnginePooled();
} else if (stype == "ThreadedEnginePerDevice") {
ret = CreateThreadedEnginePerDevice();
}

CHECK_NE(ret, nullptr)
<< "Cannot find Eine " << type << " in registry";
<< "Cannot find Engine " << type;
if (!default_engine) {
LOG(INFO) << "MXNet start using engine: " << type;
}
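With this change the default "ThreadedEngine" name maps to the pooled implementation, and "ThreadedEnginePerDevice" becomes selectable through the MXNET_ENGINE_TYPE environment variable, which is exactly what the new travis task exports. A minimal, self-contained sketch of the same selection logic; the Engine struct and Create* functions below are placeholders standing in for the real factories declared in engine_impl.h:

```cpp
#include <cstdlib>
#include <iostream>
#include <string>

// Placeholders standing in for the real factories declared in engine_impl.h.
struct Engine {};
Engine* CreateNaiveEngine()             { return new Engine(); }
Engine* CreateThreadedEnginePooled()    { return new Engine(); }
Engine* CreateThreadedEnginePerDevice() { return new Engine(); }

// Same selection logic as the hunk above, written stand-alone.
Engine* CreateEngineSketch() {
  const char* type = std::getenv("MXNET_ENGINE_TYPE");
  const bool default_engine = (type == nullptr);
  if (type == nullptr) type = "ThreadedEngine";  // default policy
  const std::string stype = type;

  Engine* ret = nullptr;
  if (stype == "NaiveEngine") {
    ret = CreateNaiveEngine();
  } else if (stype == "ThreadedEngine") {
    ret = CreateThreadedEnginePooled();
  } else if (stype == "ThreadedEnginePerDevice") {
    ret = CreateThreadedEnginePerDevice();
  }
  if (ret == nullptr) {
    std::cerr << "Cannot find Engine " << stype << std::endl;
    std::abort();
  }
  if (!default_engine) {
    std::cout << "MXNet start using engine: " << stype << std::endl;
  }
  return ret;
}

int main() {
  delete CreateEngineSketch();  // honors MXNET_ENGINE_TYPE when it is set
  return 0;
}
```

Selecting the new policy at runtime then amounts to setting MXNET_ENGINE_TYPE=ThreadedEnginePerDevice before launching, as the python_perdev task above does.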
9 changes: 7 additions & 2 deletions src/engine/engine_impl.h
@@ -65,11 +65,16 @@ inline T* Opr::Cast() {
#endif
}

/*! \brief Maximum number of GPUs */
static constexpr std::size_t kMaxNumGPUs = 16;

// predeclare factory function for each type of engine
/*! \return NaiveEngine instance */
Engine *CreateNaiveEngine();
/*! \return ThreadedEngine instance */
Engine *CreateThreadedEngine();
/*! \return ThreadedEnginePooled instance */
Engine *CreateThreadedEnginePooled();
/*! \return ThreadedEnginePerDevice instance */
Engine *CreateThreadedEnginePerDevice();
} // namespace engine
} // namespace mxnet
#endif // MXNET_ENGINE_ENGINE_IMPL_H_
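kMaxNumGPUs gives callers a fixed upper bound for per-GPU state. How ThreadedEnginePerDevice actually uses it is not shown in this diff; the following is only a hypothetical sketch of a fixed-capacity table of lazily created per-device workers sized by that constant:

```cpp
#include <array>
#include <cstddef>
#include <memory>

// From engine_impl.h above.
static constexpr std::size_t kMaxNumGPUs = 16;

// Hypothetical per-GPU worker; the real ThreadedEnginePerDevice internals
// are not part of this diff.
struct DeviceWorker {
  // a task queue and a thread for one GPU would live here
};

// Fixed-capacity table of workers, one slot per possible GPU, created lazily.
class PerDeviceWorkers {
 public:
  DeviceWorker* Get(std::size_t dev_id) {
    std::unique_ptr<DeviceWorker>& slot = workers_.at(dev_id);  // bounds-checked by kMaxNumGPUs
    if (!slot) slot.reset(new DeviceWorker());                  // create on first use
    return slot.get();
  }

 private:
  std::array<std::unique_ptr<DeviceWorker>, kMaxNumGPUs> workers_;
};

int main() {
  PerDeviceWorkers workers;
  DeviceWorker* first = workers.Get(0);
  DeviceWorker* again = workers.Get(0);
  return (first == again) ? 0 : 1;  // same slot on repeated lookups
}
```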
27 changes: 14 additions & 13 deletions src/engine/stream_manager.h
@@ -13,7 +13,6 @@
#include "../common/cuda_utils.h"

namespace mxnet {

namespace engine {

/*!
Expand Down Expand Up @@ -44,9 +43,9 @@ class StreamManager {
template <std::size_t kNumGpus, std::size_t kStreams>
RunContext StreamManager<kNumGpus, kStreams>::GetRunContext(
Context const& ctx) {
RunContext ret;
switch (ctx.dev_mask) {
case cpu::kDevMask:
return {nullptr};
case cpu::kDevMask: ret.stream = nullptr; break;
case gpu::kDevMask: {
#if MXNET_USE_CUDA
std::size_t use_counter;
@@ -63,21 +62,22 @@ RunContext StreamManager<kNumGpus, kStreams>::GetRunContext(
use_counter = counter;
counter = (counter + 1) % kStreams;
}
return {gpu_streams_.at(ctx.dev_id).at(use_counter)};
#else // MXNET_USE_CUDA
LOG(FATAL) << "Please compile with CUDA enabled";
ret.stream = gpu_streams_.at(ctx.dev_id).at(use_counter);
break;
#else
LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
#endif // MXNET_USE_CUDA
}
}
return {nullptr};
return ret;
}

template <std::size_t kNumGpus, std::size_t kStreams>
RunContext StreamManager<kNumGpus, kStreams>::GetIORunContext(
Context const& ctx) {
RunContext ret;
switch (ctx.dev_mask) {
case cpu::kDevMask:
return {nullptr};
case cpu::kDevMask: ret.stream = nullptr; break;
case gpu::kDevMask: {
#if MXNET_USE_CUDA
CUDA_CALL(cudaSetDevice(ctx.dev_id));
@@ -87,13 +87,14 @@ RunContext StreamManager<kNumGpus, kStreams>::GetIORunContext(
gpu_io_streams_.at(ctx.dev_id) = mshadow::NewStream<gpu>(false, false);
}
}
return {gpu_io_streams_.at(ctx.dev_id)};
#else // MXNET_USE_CUDA
LOG(FATAL) << "Please compile with CUDA enabled";
ret.stream = gpu_io_streams_.at(ctx.dev_id);
break;
#else
LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
#endif // MXNET_USE_CUDA
}
}
return {nullptr};
return ret;
}

template <std::size_t kNumGpus, std::size_t kStreams>
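Both GetRunContext and GetIORunContext now fill a local RunContext and return it once at the end instead of returning braced temporaries from inside the switch. The compute stream for a GPU context is still picked round-robin over kStreams; a standalone sketch of just that counter logic, with the locking and lazy stream creation of the real class omitted:

```cpp
#include <cstddef>
#include <iostream>

// Stand-alone version of the counter update in GetRunContext: each call
// returns the next stream index for a device, wrapping after kStreams.
template <std::size_t kStreams>
class RoundRobinPicker {
 public:
  std::size_t Next() {
    std::size_t use_counter = counter_;
    counter_ = (counter_ + 1) % kStreams;  // same update as in the diff
    return use_counter;
  }

 private:
  std::size_t counter_ = 0;
};

int main() {
  RoundRobinPicker<3> pick;
  for (int i = 0; i < 5; ++i) std::cout << pick.Next() << ' ';  // prints: 0 1 2 0 1
  std::cout << '\n';
  return 0;
}
```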
43 changes: 16 additions & 27 deletions src/engine/thread_pool.h
@@ -6,7 +6,7 @@

#include <dmlc/base.h>
#include <cstddef>
#include <array>
#include <vector>
#include <thread>
#include <utility>
#include "mxnet/base.h"
@@ -17,24 +17,30 @@ namespace engine {
/*!
* \brief Thread pool.
*/
template <std::size_t kSize>
class ThreadPool {
public:
/*!
* \brief Constructor takes function to run and its arguments.
* \brief Constructor takes function to run.
* \param size size of the thread pool.
* \param func the function to run on the thread pool.
*/
template <typename Function, typename... Args>
explicit ThreadPool(Function&& func, Args&&... args);
/*!
* \brief Destructor.
*/
~ThreadPool() noexcept(false);
explicit ThreadPool(size_t size, std::function<void()> func)
: worker_threads_(size) {
for (auto& i : worker_threads_) {
i = std::thread(func);
}
}
~ThreadPool() noexcept(false) {
for (auto&& i : worker_threads_) {
i.join();
}
}

private:
/*!
* \brief Worker threads.
*/
std::array<std::thread, kSize> worker_threads_;
std::vector<std::thread> worker_threads_;
/*!
* \brief Disallow default construction.
*/
@@ -44,23 +50,6 @@
*/
DISALLOW_COPY_AND_ASSIGN(ThreadPool);
};

template <std::size_t kSize>
template <typename Function, typename... Args>
ThreadPool<kSize>::ThreadPool(Function&& func, Args&&... args) {
for (auto&& i : worker_threads_) {
i = std::thread{std::forward<Function>(func), std::forward<Args>(args)...};
}
}

template <std::size_t kSize>
ThreadPool<kSize>::~ThreadPool() noexcept(false) {
for (auto&& i : worker_threads_) {
i.join();
}
}

} // namespace engine
} // namespace mxnet

#endif // MXNET_ENGINE_THREAD_POOL_H_
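ThreadPool changes from a class template sized at compile time (std::array<std::thread, kSize>) to a runtime-sized std::vector whose workers all run one std::function<void()>, which lets the per-device engine create pools whose size depends on the devices found at runtime. A small usage sketch of the new shape of the class, repeated stand-alone here so it compiles on its own; the counting lambda is only for illustration:

```cpp
#include <atomic>
#include <cstddef>
#include <functional>
#include <iostream>
#include <thread>
#include <vector>

// Same shape as the reworked ThreadPool above, defined locally for the example.
class ThreadPool {
 public:
  ThreadPool(std::size_t size, std::function<void()> func)
      : worker_threads_(size) {
    for (std::thread& t : worker_threads_) t = std::thread(func);
  }
  ~ThreadPool() {
    for (std::thread& t : worker_threads_) t.join();  // wait for every worker
  }

 private:
  std::vector<std::thread> worker_threads_;
};

int main() {
  std::atomic<int> hits(0);
  {
    ThreadPool pool(4, [&hits]() { ++hits; });  // four workers all run the same body
  }  // destructor joins the workers here
  std::cout << hits.load() << " workers ran\n";
  return 0;
}
```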