Fix misc
reminisce committed Aug 14, 2017
1 parent 68c91cb · commit 5cebdad
Showing 4 changed files with 48 additions and 18 deletions.
19 changes: 11 additions & 8 deletions benchmark/python/sparse_end2end.py
@@ -93,15 +93,18 @@ def row_sparse_pull(kv, key, data, slices, weight_array, priority):
     # the weights to each context
     # column indices (NDArray type) of the csr data
     # used as the row_idx of the weight row-sparse matrix
-    # TODO(junwu):
-    # the following two lines block, may need to precompute
-    # them and cache them outside the for loop
     row_indices = data.indices
-    indptr = data.indptr.asnumpy()
-    row_idx_array = []
-    for s in slices:
-        row_idx_array.append(row_indices[indptr[s.start]:indptr[s.stop]])
-    kv.row_sparse_pull(key, weight_array, priority=priority, row_ids=row_idx_array)
+    if len(slices) == 1:
+        kv.row_sparse_pull(key, weight_array, priority=priority, row_ids=row_indices)
+    else:  # more than one slice: multi-GPU training, need to retain weight rows according to data slices
+        # TODO(junwu):
+        # the following line blocks, may need to pre-compute
+        # and cache it outside the for loop
+        indptr = data.indptr.asnumpy()
+        row_idx_array = []
+        for s in slices:
+            row_idx_array.append(row_indices[indptr[s.start]:indptr[s.stop]])
+        kv.row_sparse_pull(key, weight_array, priority=priority, row_ids=row_idx_array)
 
 
 if __name__ == '__main__':
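To make the row-id selection above concrete, here is a NumPy-only sketch (an editor's illustration with made-up toy data, not part of the commit): with a single slice the raw CSR column indices are pulled directly as row ids, while with multiple slices each device keeps only the column indices belonging to its own row range, selected through indptr.

import numpy as np

# Toy stand-ins for data.indices / data.indptr of a 4-row CSR batch.
indices = np.array([0, 3, 1, 2, 4, 0, 2], dtype=np.int64)  # column ids, row-major
indptr = np.array([0, 2, 5, 5, 7], dtype=np.int64)         # per-row start offsets
slices = [slice(0, 2), slice(2, 4)]                         # one row range per device

if len(slices) == 1:
    row_ids = [indices]                     # single-device fast path, no blocking asnumpy() needed
else:
    # Each device only needs the weight rows touched by its own data rows.
    row_ids = [indices[indptr[s.start]:indptr[s.stop]] for s in slices]

print(row_ids)  # [array([0, 3, 1, 2, 4]), array([0, 2])]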
3 changes: 2 additions & 1 deletion src/kvstore/comm.h
@@ -209,7 +209,8 @@ class CommCPU : public Comm {
           << "BroadcastRowSparse with row_indices on gpu context not supported";
       // retain according to unique indices
       const bool use_sparse_retain = (src.shape()[0] != src.storage_shape()[0])
-        || (row_id.dtype() != out->aux_type(rowsparse::kIdx));
+        || (row_id.dtype() != out->aux_type(rowsparse::kIdx))
+        || (out->ctx().dev_mask() != Context::kGPU);
       if (use_sparse_retain) {  // use sparse_retain op
         const bool is_to_gpu = out->ctx().dev_mask() == Context::kGPU;
         NDArray out_cpu = is_to_gpu? NDArray(kRowSparseStorage, src.shape(),
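Read as plain logic, the new condition means the direct row copy is only taken when the source row-sparse array is already dense, the requested row-id dtype matches the output's index dtype, and the destination is a GPU; otherwise the sparse_retain operator is used. A small Python paraphrase of that predicate (an editor's sketch over hypothetical scalar arguments, not MXNet code):

def needs_sparse_retain(src_rows, src_stored_rows, row_id_dtype,
                        out_idx_dtype, out_is_gpu):
    # Mirrors the use_sparse_retain flag computed above.
    return (src_rows != src_stored_rows       # source rsp is not dense
            or row_id_dtype != out_idx_dtype  # row-id dtype needs a cast
            or not out_is_gpu)                # destination is not a GPU context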
40 changes: 33 additions & 7 deletions src/operator/tensor/sparse_retain-inl.h
@@ -209,6 +209,25 @@ struct SparseRetainCopyIndices {
   }
 };
 
+/*!
+ * Copy input retained rows to output rows.
+ * Only used when input rsp is dense.
+ * This kernel is only used when ctx is on GPU.
+ * So it's parallelized by out_rows' elements,
+ * instead of rows.
+ * For CPU ctx, we simply call mshadow::Copy.
+ */
+struct SparseRetainCopyRetainedRowsFromDns {
+  template<typename DType, typename RType, typename IType>
+  MSHADOW_XINLINE static void Map(int i, DType* out_rows, const DType* in_rows,
+                                  const RType* in_row_idx, const IType* idx,
+                                  const size_t row_length) {
+    const size_t irow = i / row_length;
+    const size_t icol = i % row_length;
+    out_rows[i] = in_rows[static_cast<size_t>(idx[irow]) * row_length + icol];
+  }
+};
+
 template<typename xpu>
 void SparseRetainOpForwardRspImpl(mshadow::Stream<xpu> *s,
                                   const NDArray& input_nd,
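The kernel above assigns one thread per element of the output rows: thread i derives its row and column from the flat index and gathers the matching element of the retained input row. The index arithmetic, replayed in NumPy on toy values (an editor's illustration, not part of the commit):

import numpy as np

row_length = 3
in_rows = np.arange(5 * row_length, dtype=np.float32)    # dense input: 5 rows, flattened
idx = np.array([0, 2, 4], dtype=np.int64)                # row ids to retain
out = np.empty(idx.size * row_length, dtype=np.float32)  # flattened output rows

for i in range(out.size):                                # i plays the role of one GPU thread
    irow, icol = divmod(i, row_length)
    out[i] = in_rows[idx[irow] * row_length + icol]

# The whole loop is just a row gather:
assert np.array_equal(out.reshape(-1, row_length),
                      in_rows.reshape(5, row_length)[idx])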
@@ -255,14 +274,21 @@ void SparseRetainOpForwardRspImpl(mshadow::Stream<xpu> *s,
           output_idx.dptr<RType>(), idx_data.dptr<IType>());
     }
     // copy data
-    const Tensor<xpu, 2, DType> input_tensor =
-        input_data.get_with_shape<xpu, 2, DType>(Shape2(input_data.shape_[0], row_length), s);
-    Tensor<xpu, 2, DType> output_tensor =
-        output_data.get_with_shape<xpu, 2, DType>(Shape2(output_data.shape_[0], row_length), s);
-    for (size_t i = 0; i < num_rows_retained; ++i) {
-      Copy(output_tensor[i], input_tensor[output_idx_tensor[i]], s);
+    if (std::is_same<xpu, cpu>::value) {  // For cpu, we can access output_idx_tensor[i]
+      const Tensor<xpu, 2, DType> input_tensor =
+          input_data.get_with_shape<xpu, 2, DType>(Shape2(input_data.shape_[0], row_length), s);
+      Tensor<xpu, 2, DType> output_tensor =
+          output_data.get_with_shape<xpu, 2, DType>(Shape2(output_data.shape_[0], row_length),
+                                                    s);
+      for (size_t i = 0; i < num_rows_retained; ++i) {
+        Copy(output_tensor[i], input_tensor[output_idx_tensor[i]], s);
+      }
+    } else {  // For gpu, we have to launch a kernel
+      Kernel<SparseRetainCopyRetainedRowsFromDns, xpu>::Launch(s, output_data.Size(),
+          output_data.dptr<DType>(), input_data.dptr<DType>(), input_idx.dptr<RType>(),
+          idx_data.dptr<IType>(), row_length);
     }
-  } else {
+  } else {  // input rsp is not dense
     Kernel<SparseRetainRspThreadKernel, xpu>::Launch(s, idx_data.Size(),
         output_data.dptr<DType>(), output_idx.dptr<RType>(), input_data.dptr<DType>(),
         input_idx.dptr<RType>(), idx_data.dptr<IType>(), input_data.shape_[0], row_length);
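For readers unfamiliar with the operator, sparse_retain keeps only the stored rows of a row-sparse array whose row index appears in a given id list; the CPU branch copies those rows with mshadow::Copy, while the new GPU branch launches the element-wise kernel defined above. A minimal NumPy model of the operator's effect (an editor's sketch on toy data, not MXNet code):

import numpy as np

def sparse_retain(data, row_idx, retain_ids):
    # data holds the stored (non-zero) rows; row_idx holds their row indices.
    mask = np.isin(row_idx, retain_ids)
    return data[mask], row_idx[mask]

data = np.array([[1., 1.], [2., 2.], [3., 3.]])  # three stored rows
row_idx = np.array([0, 2, 5])                    # located at rows 0, 2 and 5
kept, kept_idx = sparse_retain(data, row_idx, retain_ids=np.array([2, 3, 5]))
print(kept_idx)  # [2 5]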
4 changes: 2 additions & 2 deletions tests/python/gpu/test_kvstore_gpu.py
@@ -20,15 +20,15 @@ def init_kv_with_str(stype='default'):
 
 def test_row_sparse_pull():
     kv = init_kv_with_str('row_sparse')
-    kv.init('e', mx.nd.ones(shape)._to_rsp())
+    kv.init('e', mx.nd.ones(shape).tostype('row_sparse'))
 
     def check_row_sparse_pull(kv, count, ctx=default_context()):
         num_rows = shape[0]
         vals = []
         row_ids = []
         all_row_ids = np.arange(num_rows)
         for i in range(count):
-            vals.append(mx.nd.zeros(shape, ctx=ctx)._to_rsp())
+            vals.append(mx.nd.zeros(shape, ctx=ctx).tostype('row_sparse'))
             row_id = np.random.randint(num_rows, size=num_rows)
             row_ids.append(mx.nd.array(row_id, dtype='int64'))
         row_ids_to_pull = row_ids[0] if len(row_ids) == 1 else row_ids
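The test now uses the public tostype('row_sparse') conversion instead of the internal _to_rsp() helper. A short round-trip sketch of that call (an editor's illustration; it assumes an MXNet build with sparse NDArray support and uses only operations that appear in, or are standard companions to, this diff):

import mxnet as mx
import numpy as np

shape = (4, 3)
dense = mx.nd.ones(shape)
rsp = dense.tostype('row_sparse')  # public replacement for the removed _to_rsp()
# Converting back to a dense numpy array round-trips the values.
assert np.array_equal(rsp.asnumpy(), dense.asnumpy())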
