diff --git a/src/common/utils.h b/src/common/utils.h
index 5b80c4dcaa29..ce01af118735 100644
--- a/src/common/utils.h
+++ b/src/common/utils.h
@@ -14,6 +14,7 @@
 #include <utility>
 #include <random>
 #include <string>
+#include <thread>
 #endif  // DMLC_USE_CXX11

 #include <dmlc/logging.h>
@@ -168,6 +169,57 @@ inline int GetExecNumMatchColor() {
   return std::min(num_match_color, GetNumThreadPerGPU());
 }

+/*!
+ * \brief
+ * Helper function for ParallelSort.
+ * DO NOT call this function directly.
+ * Use the interface ParallelSort instead.
+ * Ref: https://github.com/dmlc/difacto/blob/master/src/common/parallel_sort.h
+ */
+template<typename RandomIt, typename Compare>
+void ParallelSortHelper(RandomIt first, size_t len,
+                        size_t grainsize, const Compare& comp) {
+  if (len < grainsize) {
+    std::sort(first, first+len, comp);
+  } else {
+    std::thread thr(ParallelSortHelper<RandomIt, Compare>, first, len/2, grainsize, comp);
+    ParallelSortHelper(first+len/2, len - len/2, grainsize, comp);
+    thr.join();
+    std::inplace_merge(first, first+len/2, first+len, comp);
+  }
+}
+
+/*!
+ * \brief
+ * Sort the elements in the range [first, last) into the ascending order defined by
+ * the comparator comp.
+ * If the length of the range [first, last) is greater than a certain threshold,
+ * the range is recursively divided in two, and a new thread is spawned to sort
+ * one half while the current thread sorts the other.
+ * Ref: https://github.com/dmlc/difacto/blob/master/src/common/parallel_sort.h
+ */
+template<typename RandomIt, typename Compare>
+void ParallelSort(RandomIt first, RandomIt last, size_t num_threads, Compare comp) {
+  const auto num = std::distance(first, last);
+  size_t grainsize = std::max(num / num_threads + 5, static_cast<size_t>(1024*16));
+  ParallelSortHelper(first, num, grainsize, comp);
+}
+
+/*!
+ * \brief
+ * Sort the elements in the range [first, last) into ascending order.
+ * The elements are compared using the default < operator.
+ * If the length of the range [first, last) is greater than a certain threshold,
+ * the range is recursively divided in two, and a new thread is spawned to sort
+ * one half while the current thread sorts the other.
+ * Ref: https://github.com/dmlc/difacto/blob/master/src/common/parallel_sort.h
+ */
+template<typename RandomIt>
+void ParallelSort(RandomIt first, RandomIt last, size_t num_threads) {
+  ParallelSort(first, last, num_threads,
+               std::less<typename std::iterator_traits<RandomIt>::value_type>());
+}
+
 /*!
  * \brief Random Engine
  */
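For illustration only (not part of the patch): a minimal sketch of how the ParallelSort utility added above might be called. The namespace mxnet::common is inferred from the common::ParallelSort call in comm.h below; the include path, thread count, and data are illustrative. Ranges shorter than the grain size simply fall back to std::sort.

// Usage sketch for common::ParallelSort (assumptions noted above).
#include <cstdint>
#include <iostream>
#include <vector>
#include "src/common/utils.h"   // illustrative include path into the MXNet source tree

int main() {
  std::vector<int64_t> row_idx = {42, 7, 7, 13, 42, 0, 13};
  // Sort ascending with up to 4 threads.
  mxnet::common::ParallelSort(row_idx.begin(), row_idx.end(), 4);
  // A custom comparator can be passed to the second overload, e.g. descending order.
  mxnet::common::ParallelSort(row_idx.begin(), row_idx.end(), 4,
                              [](int64_t a, int64_t b) { return a > b; });
  for (auto v : row_idx) std::cout << v << ' ';
  std::cout << std::endl;
  return 0;
}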
diff --git a/src/kvstore/comm.h b/src/kvstore/comm.h
index e1ab5c9557e0..f72ccca9aa17 100644
--- a/src/kvstore/comm.h
+++ b/src/kvstore/comm.h
@@ -3,13 +3,16 @@
  */
 #ifndef MXNET_KVSTORE_COMM_H_
 #define MXNET_KVSTORE_COMM_H_
+#include <dmlc/omp.h>
 #include <string>
 #include <algorithm>
 #include <utility>
 #include <limits>
 #include <vector>
 #include <tuple>
+#include <thread>
 #include "mxnet/ndarray.h"
+#include "../common/utils.h"
 namespace mxnet {
 namespace kvstore {
 /**
@@ -65,6 +68,8 @@ class CommCPU : public Comm {
   CommCPU() {
     nthread_reduction_ = dmlc::GetEnv("MXNET_KVSTORE_REDUCTION_NTHREADS", 4);
     bigarray_bound_ = dmlc::GetEnv("MXNET_KVSTORE_BIGARRAY_BOUND", 1000 * 1000);
+    // TODO(junwu): delete the following data member; it is kept for benchmarking only
+    is_serial_push_ = dmlc::GetEnv("MXNET_KVSTORE_SERIAL_PUSH", 0);
   }

   virtual ~CommCPU() { }
@@ -130,7 +135,8 @@ class CommCPU : public Comm {
     auto result = buf.merged;
     Engine::Get()->PushSync([reduce, result, this](RunContext rctx) {
         NDArray out = result;
-        ReduceSumCPUEx(reduce, &out);
+        is_serial_push_ ?
+          ReduceSumCPUExSerial(reduce, &out) : ReduceSumCPUExParallel(reduce, &out);
       }, Context::CPU(), const_vars, {result.var()},
       FnProperty::kCPUPrioritized, priority, PROFILER_MESSAGE("KVStoreReduce"));
   }
@@ -168,7 +174,7 @@ class CommCPU : public Comm {

   // serial implementation of reduce sum for row sparse NDArray.
   // TODO(haibin) use openmp kernel to parallelize the summation
-  inline void ReduceSumCPUEx(const std::vector<NDArray> &in, NDArray *out) {
+  inline void ReduceSumCPUExSerial(const std::vector<NDArray> &in, NDArray *out) {
     using namespace rowsparse;
     using namespace mshadow;
     auto stype = out->storage_type();
@@ -239,6 +245,115 @@ class CommCPU : public Comm {
     });
   }

+  template<typename DType, typename IType>
+  void ReduceSumCPUExImpl(const std::vector<NDArray>& nds,
+                          const std::vector<IType>& uniq_row_idx,
+                          NDArray* out) {
+#pragma omp parallel num_threads(nthread_reduction_)
+    {
+      const size_t nnr = uniq_row_idx.size();
+      const int num_threads = omp_get_num_threads();
+      size_t row_block_len = (nnr + num_threads - 1) / num_threads;
+      const size_t row_block_start = omp_get_thread_num() * row_block_len;
+      if (row_block_start < nnr) {
+        const size_t row_block_end = std::min(row_block_start+row_block_len, nnr);
+
+        auto out_values = out->data().FlatTo2D<cpu, DType>();
+        auto out_indices = out->aux_data(rowsparse::kIdx).FlatTo1D<cpu, IType>();
+        for (size_t i = row_block_start; i < row_block_end; ++i) {
+          out_indices[i] = uniq_row_idx[i];
+        }
+        for (const auto& nd : nds) {
+          if (nd.storage_initialized()) {
+            const auto nd_indices = nd.aux_data(rowsparse::kIdx).FlatTo1D<cpu, IType>();
+            const auto nd_values = nd.data().FlatTo2D<cpu, DType>();
+            const auto nd_num_rows = nd.aux_shape(rowsparse::kIdx).Size();
+            const IType* nd_indices_start = &nd_indices[0];
+            const IType* nd_indices_end = nd_indices_start + nd_num_rows;
+            const IType* row_idx_ptr = std::lower_bound(nd_indices_start, nd_indices_end,
+                                                        out_indices[row_block_start]);
+            // skip this nd if all of its row indices are smaller than out_indices[row_block_start],
+            // or if the current row block is not covered by [*row_idx_ptr, nd_indices_end).
+            if (nd_indices_end == row_idx_ptr || *row_idx_ptr > out_indices[row_block_end-1]) {
+              continue;
+            }
+            for (size_t irow = row_block_start;
+                 irow < row_block_end && row_idx_ptr != nd_indices_end;) {
+              if (out_indices[irow] == *row_idx_ptr) {
+                auto out_value_cur_row = out_values[irow];
+                const auto offset = row_idx_ptr - nd_indices_start;
+                auto nd_value_cur_row = nd_values[offset];
+                for (size_t j = 0; j < nd_value_cur_row.shape_[0]; ++j) {
+                  out_value_cur_row[j] += nd_value_cur_row[j];
+                }
+                ++irow;
+                ++row_idx_ptr;
+              } else if (out_indices[irow] < *row_idx_ptr) {
+                ++irow;
+              } else {
+                ++row_idx_ptr;
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /*!
+   * \brief Given a vector of ndarrays, generate an index vector containing
+   * all the unique row indices of the ndarrays.
+   */
+  template<typename IType>
+  void GetUniqueRspRowIdx(const std::vector<NDArray>& nds,
+                          std::vector<IType>* uniq_row_idx) {
+    using namespace rowsparse;
+    size_t total_num_rows = 0;
+    for (const auto& nd : nds) {
+      CHECK_EQ(nd.storage_type(), kRowSparseStorage);
+      if (nd.storage_initialized()) {
+        total_num_rows += nd.aux_shape(kIdx).Size();
+      }
+    }
+
+    uniq_row_idx->resize(total_num_rows);
+    int nthreads = omp_get_max_threads();
+    size_t offset = 0;
+    for (const auto& nd : nds) {
+      if (nd.storage_initialized()) {
+        const IType* nd_row_idx = nd.aux_data(kIdx).dptr<IType>();
+        const size_t num_rows = nd.aux_shape(kIdx).Size();
+#pragma omp parallel for num_threads(nthreads)
+        for (size_t i = 0; i < num_rows; ++i) {
+          (*uniq_row_idx)[offset+i] = nd_row_idx[i];
+        }
+        offset += num_rows;
+      }
+    }
+
+    common::ParallelSort(uniq_row_idx->begin(), uniq_row_idx->end(), nthreads);
+    auto it = std::unique(uniq_row_idx->begin(), uniq_row_idx->end());
+    uniq_row_idx->resize(it - uniq_row_idx->begin());
+  }
+
+  void ReduceSumCPUExParallel(const std::vector<NDArray>& nds, NDArray* out) {
+    if (nds.empty()) return;
+    using namespace rowsparse;
+    CHECK_EQ(out->storage_type(), kRowSparseStorage)
+      << "Expected row sparse storage type ("
+      << out->storage_type() << " given)";

+    MSHADOW_TYPE_SWITCH(out->dtype(), DType, {
+      MSHADOW_INT_TYPE_SWITCH(out->aux_type(kIdx), IType, {
+        std::vector<IType> uniq_row_idx;
+        GetUniqueRspRowIdx(nds, &uniq_row_idx);
+        out->CheckAndAlloc({mshadow::Shape1(uniq_row_idx.size())});
+        out->data().FlatTo2D<cpu, DType>() = static_cast<DType>(0);
+        ReduceSumCPUExImpl<DType, IType>(nds, uniq_row_idx, out);
+      });
+    });
+  }
+
   template<typename DType>
   inline static void ReduceSumCPU(
       const std::vector<DType*> &dptr, size_t offset, index_t size) {
@@ -304,6 +419,7 @@ class CommCPU : public Comm {
   std::unordered_map<int, BufferEntry> merge_buf_;
   size_t bigarray_bound_;
   int nthread_reduction_;
+  bool is_serial_push_;
 };

 /**
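The patch above first computes the union of all row indices (GetUniqueRspRowIdx, using common::ParallelSort plus std::unique), then lets each OpenMP thread own a contiguous block of output rows and fold every input into that block with a two-pointer walk over the sorted index lists (ReduceSumCPUExImpl). The following standalone sketch distills that strategy using plain std:: containers, single-threaded and with one value per row, so the control flow is easier to follow. RspMat and ReduceSum are illustrative names, not MXNet types.

// Simplified sketch of the row-sparse reduce-sum strategy (not MXNet code).
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

struct RspMat {                          // simplified row-sparse matrix, row length = 1
  std::vector<int64_t> row_idx;          // sorted, unique row indices
  std::vector<float> values;             // one value per stored row
};

RspMat ReduceSum(const std::vector<RspMat>& inputs) {
  // 1) Gather all row indices, then sort + unique (the patch parallelizes the sort
  //    with common::ParallelSort; plain std::sort is used here for brevity).
  std::vector<int64_t> uniq;
  for (const auto& in : inputs)
    uniq.insert(uniq.end(), in.row_idx.begin(), in.row_idx.end());
  std::sort(uniq.begin(), uniq.end());
  uniq.erase(std::unique(uniq.begin(), uniq.end()), uniq.end());

  RspMat out{uniq, std::vector<float>(uniq.size(), 0.0f)};

  // 2) Fold each input into the output with a two-pointer walk over the two sorted
  //    index lists; rows with matching indices are accumulated.
  for (const auto& in : inputs) {
    size_t i = 0, j = 0;
    while (i < out.row_idx.size() && j < in.row_idx.size()) {
      if (out.row_idx[i] == in.row_idx[j]) {
        out.values[i] += in.values[j];
        ++i; ++j;
      } else if (out.row_idx[i] < in.row_idx[j]) {
        ++i;
      } else {
        ++j;
      }
    }
  }
  return out;
}

int main() {
  std::vector<RspMat> inputs = {{{1, 4}, {1.f, 2.f}}, {{0, 4, 9}, {3.f, 4.f, 5.f}}};
  RspMat out = ReduceSum(inputs);
  for (size_t i = 0; i < out.row_idx.size(); ++i)
    std::cout << out.row_idx[i] << ": " << out.values[i] << "\n";  // 0:3 1:1 4:6 9:5
  return 0;
}

In the patch itself, step 2 is what each OpenMP thread does for its private block of output rows, so no synchronization is needed between threads.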
diff --git a/src/ndarray/ndarray.cc b/src/ndarray/ndarray.cc
index e45d0c2bfd68..c4474fbbaf7e 100644
--- a/src/ndarray/ndarray.cc
+++ b/src/ndarray/ndarray.cc
@@ -925,7 +925,16 @@ void NDArray::Load(dmlc::Stream* fi,
 }

 NDArray NDArray::Copy(Context ctx) const {
-  NDArray ret(shape(), ctx, true, dtype_);
+  NDArray ret;
+  if (kDefaultStorage == storage_type()) {
+    ret = NDArray(shape(), ctx, true, dtype_);
+  } else if (kUndefinedStorage != storage_type()) {
+    ret = NDArray(storage_type(), shape(), ctx, true, dtype_,
+                  ptr_->aux_types, ptr_->aux_shapes, storage_shape());
+  } else {
+    LOG(FATAL) << "NDArray::Copy cannot copy undefined storage-type ndarray to ctx.dev_type="
+               << ctx.dev_type << ", ctx.dev_id=" << ctx.dev_id;
+  }
   CopyFromTo(*this, &ret);
   return ret;
 }
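As noted in the TODO above, is_serial_push_ is read from MXNET_KVSTORE_SERIAL_PUSH and exists only so the old serial kernel (ReduceSumCPUExSerial) can be benchmarked against the new parallel one (ReduceSumCPUExParallel); the default of 0 selects the parallel path. Below is a standalone sketch of that environment-driven dispatch; it uses std::getenv instead of dmlc::GetEnv so it compiles without MXNet, and the ./benchmark invocations in the comments are hypothetical.

// Sketch of the MXNET_KVSTORE_SERIAL_PUSH toggle (not MXNet code).
#include <cstdlib>
#include <iostream>

static bool ReadSerialPushFlag() {
  const char* val = std::getenv("MXNET_KVSTORE_SERIAL_PUSH");
  return val != nullptr && std::atoi(val) != 0;   // default is 0, i.e. the parallel path
}

int main() {
  const bool is_serial_push = ReadSerialPushFlag();
  // In CommCPU the analogous flag picks the reduce kernel; here we only report
  // which path would run, e.g. when A/B benchmarking the two kernels:
  //   MXNET_KVSTORE_SERIAL_PUSH=1 ./benchmark   (old serial kernel)
  //   MXNET_KVSTORE_SERIAL_PUSH=0 ./benchmark   (new parallel kernel)
  std::cout << (is_serial_push ? "ReduceSumCPUExSerial" : "ReduceSumCPUExParallel")
            << " would be used for the row-sparse reduce" << std::endl;
  return 0;
}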