support storage fallback with mutable inputs (apache#147)
* include mutable inputs in storage fallback. refactor executor

add fallback test for rms prop and adam

fix lint

fix lint

fix test in optimizer

*  update according to comments

* fix unit tests

* fix gpu compilation err
eric-haibin-lin authored Aug 6, 2017
1 parent 6956431 commit 66b7b8a
Showing 10 changed files with 201 additions and 125 deletions.
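
For context, here is a minimal sketch of the behaviour exercised by the new fallback tests for RMSProp and Adam mentioned above. It assumes the sparse Python API on this branch (`mx.nd.zeros(..., stype=...)`, `Optimizer.create_state`, `Optimizer.update`) and is illustrative rather than a copy of the added tests:

```python
import mxnet as mx

shape = (4, 2)
# Weight and gradient use row_sparse storage; adam_update only provides a dense
# FCompute, so executing it goes through the storage fallback path.
weight = mx.nd.zeros(shape, stype='row_sparse')
grad = mx.nd.zeros(shape, stype='row_sparse')

opt = mx.optimizer.create('adam', learning_rate=0.01)
mean, var = opt.create_state(0, weight)  # both row_sparse, matching the weight

# mean and var are mutable inputs of adam_update. With this commit, the dense
# temporaries created during fallback are cast back into the sparse originals,
# so the optimizer state is actually updated in place.
opt.update(0, weight, grad, (mean, var))
assert mean.stype == 'row_sparse' and var.stype == 'row_sparse'
```
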
4 changes: 2 additions & 2 deletions python/mxnet/ndarray/__init__.py
@@ -6,5 +6,5 @@
# pylint: disable=wildcard-import
from .ndarray import *
from .ndarray_utils import load, save, zeros, empty, array
from .sparse_ndarray import _ndarray_cls
from .sparse_ndarray import csr, row_sparse, BaseSparseNDArray, todense, RowSparseNDArray, CSRNDArray
from .sparse_ndarray import _ndarray_cls, todense
from .sparse_ndarray import csr, row_sparse, BaseSparseNDArray, RowSparseNDArray, CSRNDArray
8 changes: 4 additions & 4 deletions python/mxnet/ndarray/sparse_ndarray.py
@@ -408,11 +408,11 @@ def copyto(self, other):
value and ``other`` will point to the same ``NDArray`` or ``CSRNDArray``.
"""
if isinstance(other, Context):
super(CSRNDArray, self).copyto(other)
return super(CSRNDArray, self).copyto(other)
elif isinstance(other, NDArray):
stype = other.stype
if stype == 'default' or stype == 'csr':
super(CSRNDArray, self).copyto(other)
return super(CSRNDArray, self).copyto(other)
else:
raise TypeError('copyto does not support destination NDArray stype ' + str(stype))
else:
@@ -597,11 +597,11 @@ def copyto(self, other):
return value and ``other`` will point to the same ``NDArray`` or ``RowSparseNDArray``.
"""
if isinstance(other, Context):
super(RowSparseNDArray, self).copyto(other)
return super(RowSparseNDArray, self).copyto(other)
elif isinstance(other, NDArray):
stype = other.stype
if stype == 'default' or stype == 'row_sparse':
super(RowSparseNDArray, self).copyto(other)
return super(RowSparseNDArray, self).copyto(other)
else:
raise TypeError('copyto does not support destination NDArray stype ' + str(stype))
else:
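
The `copyto` changes above make the sparse overrides return the destination, consistent with the dense `NDArray.copyto`. A short usage sketch (assuming `mx.nd.zeros` accepts `stype='csr'` on this branch):

```python
import mxnet as mx

a = mx.nd.zeros((3, 4), stype='csr')
b = mx.nd.zeros((3, 4), stype='csr')

# Previously these overrides returned None; now the result is usable directly.
on_cpu = a.copyto(mx.cpu())  # new CSRNDArray on the target context
into_b = a.copyto(b)         # copies into b and returns it
assert on_cpu.stype == 'csr' and into_b is not None
```
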
14 changes: 8 additions & 6 deletions python/mxnet/optimizer.py
@@ -543,8 +543,10 @@ def __init__(self, learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8,
self.epsilon = epsilon

def create_state(self, index, weight):
return (zeros(weight.shape, weight.context, dtype=weight.dtype), # mean
zeros(weight.shape, weight.context, dtype=weight.dtype)) # variance
return (zeros(weight.shape, weight.context, dtype=weight.dtype,
stype=weight.stype), # mean
zeros(weight.shape, weight.context, dtype=weight.dtype,
stype=weight.stype)) # variance

def update(self, index, weight, grad, state):
assert(isinstance(weight, NDArray))
@@ -649,11 +651,11 @@ def __init__(self, learning_rate=0.001, gamma1=0.9, gamma2=0.9,
def create_state(self, index, weight):
if self.centered:
return (
zeros(weight.shape, weight.context), # n
zeros(weight.shape, weight.context), # g
zeros(weight.shape, weight.context)) # delta
zeros(weight.shape, weight.context, stype=weight.stype), # n
zeros(weight.shape, weight.context, stype=weight.stype), # g
zeros(weight.shape, weight.context, stype=weight.stype)) # delta
else:
return (zeros(weight.shape, weight.context),) # n
return (zeros(weight.shape, weight.context, stype=weight.stype),) # n

def update(self, index, weight, grad, state):
assert(isinstance(weight, NDArray))
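
With `stype=weight.stype` passed to `zeros` above, optimizer state now mirrors the weight's storage type instead of always being dense. A small sketch (the `centered=True` flag is assumed from the `self.centered` check in the diff; state names follow the `n`/`g`/`delta` comments):

```python
import mxnet as mx

sparse_w = mx.nd.zeros((4, 2), stype='row_sparse')
dense_w = mx.nd.zeros((4, 2))

opt = mx.optimizer.create('rmsprop', learning_rate=0.1, centered=True)

# The (n, g, delta) state arrays inherit the weight's storage type.
assert all(s.stype == 'row_sparse' for s in opt.create_state(0, sparse_w))
assert all(s.stype == 'default' for s in opt.create_state(0, dense_w))
```
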
79 changes: 52 additions & 27 deletions src/c_api/c_api_ndarray.cc
@@ -223,7 +223,7 @@ void SetShapeType(const nnvm::Op* op,
void SetDependency(std::vector<engine::VarHandle> *p_read_vars,
std::vector<engine::VarHandle> *p_write_vars,
std::vector<Resource> *p_requested,
std::vector<uint32_t> *p_auxidx,
std::vector<uint32_t> *p_mutate_idx,
const nnvm::Op* op,
const nnvm::NodeAttrs& attrs,
const Context& ctx,
@@ -235,7 +235,7 @@ void SetDependency(std::vector<engine::VarHandle> *p_read_vars,
std::vector<engine::VarHandle>& read_vars = *p_read_vars;
std::vector<engine::VarHandle>& write_vars = *p_write_vars;
std::vector<Resource>& requested = *p_requested;
std::vector<uint32_t>& auxidx = *p_auxidx;
std::vector<uint32_t>& mutate_idx = *p_mutate_idx;

if (tmp_resource.count(op)) {
int ntmp = 0;
@@ -261,9 +261,9 @@ void SetDependency(std::vector<engine::VarHandle> *p_read_vars,
write_vars.push_back(i.var());
}
if (mutate.count(op)) {
auxidx = mutate[op](attrs);
std::sort(auxidx.begin(), auxidx.end());
for (auto & i : auxidx) {
mutate_idx = mutate[op](attrs);
std::sort(mutate_idx.begin(), mutate_idx.end());
for (auto & i : mutate_idx) {
write_vars.push_back(ndinputs[i].var());
}
}
@@ -293,36 +293,49 @@ void PushFCompute(const FCompute& fn,
const std::vector<engine::VarHandle>& write_vars,
const std::vector<Resource>& requested,
const std::vector<NDArray>& ndinputs,
const std::vector<NDArray>& ndoutputs) {
const std::vector<NDArray>& ndoutputs,
const std::vector<uint32_t>& mutate_idx) {
using namespace common;
bool is_train = AutogradRuntime::Get()->IsTraining();
Engine::Get()->PushAsync(
[ctx, attrs, fn, ndinputs, ndoutputs, requested, is_train](
[ctx, attrs, fn, ndinputs, ndoutputs, requested, is_train, mutate_idx](
RunContext rctx,
engine::CallbackOnComplete on_complete) {
std::vector<TBlob> input_blobs, output_blobs;
std::vector<NDArray> temp_in_src, temp_in_dst, temp_out_src, temp_out_dst;
// pre-fcompute and post-fcompute storage fallback src NDArrays and dst NDArrays
std::vector<NDArray> pre_temp_src, pre_temp_dst, post_temp_dst, post_temp_src;
// mapping from index in input_blobs to index in pre_temp_dst
std::unordered_map<uint32_t, uint32_t> in_temp_idx_map;
// populate input blobs and output blobs
SetupDefaultBlobs(ndinputs, &input_blobs, &pre_temp_src, &pre_temp_dst, &in_temp_idx_map);
SetupDefaultBlobs(ndoutputs, &output_blobs, &post_temp_dst, &post_temp_src);
// add mutable inputs to post temp list
for (const auto idx : mutate_idx) {
auto map_iter = in_temp_idx_map.find(idx);
if (map_iter != in_temp_idx_map.end()) {
post_temp_src.push_back(pre_temp_dst[map_iter->second]);
post_temp_dst.push_back(ndinputs[idx]);
}
}
OpContext opctx{is_train, rctx,
engine::CallbackOnComplete(),
requested};
GetDefaultBlobs(ndinputs, &input_blobs, &temp_in_src, &temp_in_dst);
GetDefaultBlobs(ndoutputs, &output_blobs, &temp_out_src, &temp_out_dst);
std::vector<OpReqType> req(output_blobs.size(), kWriteTo);
if (ctx.dev_mask() == gpu::kDevMask) {
#if MXNET_USE_CUDA
CastNonDefaultStorage<gpu>(temp_in_src, temp_in_dst, opctx);
CastNonDefaultStorage<gpu>(pre_temp_src, pre_temp_dst, opctx);
fn(attrs, opctx, input_blobs, req, output_blobs);
// cast to original storage type, if necessary
CastNonDefaultStorage<gpu>(temp_out_dst, temp_out_src, opctx);
CastNonDefaultStorage<gpu>(post_temp_src, post_temp_dst, opctx);
rctx.get_stream<gpu>()->Wait();
#else
LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
#endif
} else {
CastNonDefaultStorage<cpu>(temp_in_src, temp_in_dst, opctx);
CastNonDefaultStorage<cpu>(pre_temp_src, pre_temp_dst, opctx);
fn(attrs, opctx, input_blobs, req, output_blobs);
// cast to original storage type, if necessary
CastNonDefaultStorage<cpu>(temp_out_dst, temp_out_src, opctx);
CastNonDefaultStorage<cpu>(post_temp_src, post_temp_dst, opctx);
}
on_complete();
}, ctx, read_vars, write_vars, FnProperty::kNormal,
@@ -365,7 +378,8 @@ void PushOperator(const OpStatePtr& state,
const std::vector<engine::VarHandle>& write_vars,
const std::vector<Resource>& requested,
const std::vector<NDArray>& ndinputs,
const std::vector<NDArray>& ndoutputs) {
const std::vector<NDArray>& ndoutputs,
const std::vector<uint32_t>& mutate_idx) {
using namespace common;
static auto& fexec_type = nnvm::Op::GetAttr<FExecType>("FExecType");

@@ -379,28 +393,39 @@
if (fcompute != nullptr) {
CHECK(exec_type == ExecType::kSync || exec_type == ExecType::kAsync);
Engine::Get()->PushAsync(
[state, fcompute, ndinputs, ndoutputs, requested, is_train, exec_type](
[state, fcompute, ndinputs, ndoutputs, requested, is_train, exec_type, mutate_idx](
RunContext rctx,
engine::CallbackOnComplete on_complete) {
OpContext opctx{is_train, rctx, on_complete, requested};

std::vector<TBlob> input_blobs, output_blobs;
std::vector<NDArray> temp_in_src, temp_in_dst, temp_out_src, temp_out_dst;
GetDefaultBlobs(ndinputs, &input_blobs, &temp_in_src, &temp_in_dst);
GetDefaultBlobs(ndoutputs, &output_blobs, &temp_out_src, &temp_out_dst);
// pre-fcompute and post-fcompute storage fallback src NDArrays and dst NDArrays
std::vector<NDArray> pre_temp_src, pre_temp_dst, post_temp_dst, post_temp_src;
// mapping from index in input_blobs to index in pre_temp_dst
std::unordered_map<uint32_t, uint32_t> in_temp_idx_map;
// populate input blobs and output blobs
SetupDefaultBlobs(ndinputs, &input_blobs, &pre_temp_src, &pre_temp_dst, &in_temp_idx_map);
SetupDefaultBlobs(ndoutputs, &output_blobs, &post_temp_dst, &post_temp_src);
// add mutable inputs to post temp list
for (const auto idx : mutate_idx) {
if (in_temp_idx_map.find(idx) != in_temp_idx_map.end()) {
post_temp_src.push_back(pre_temp_dst[in_temp_idx_map[idx]]);
post_temp_dst.push_back(ndinputs[idx]);
}
}
std::vector<OpReqType> req(output_blobs.size(), kWriteTo);
if (rctx.get_ctx().dev_mask() == gpu::kDevMask) {
#if MXNET_USE_CUDA
CastNonDefaultStorage<gpu>(temp_in_src, temp_in_dst, opctx);
CastNonDefaultStorage<gpu>(pre_temp_src, pre_temp_dst, opctx);
fcompute(state, opctx, input_blobs, req, output_blobs);
CastNonDefaultStorage<gpu>(temp_out_dst, temp_out_src, opctx);
CastNonDefaultStorage<gpu>(post_temp_src, post_temp_dst, opctx);
#else
LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
#endif
} else {
CastNonDefaultStorage<cpu>(temp_in_src, temp_in_dst, opctx);
CastNonDefaultStorage<cpu>(pre_temp_src, pre_temp_dst, opctx);
fcompute(state, opctx, input_blobs, req, output_blobs);
CastNonDefaultStorage<cpu>(temp_out_dst, temp_out_src, opctx);
CastNonDefaultStorage<cpu>(post_temp_src, post_temp_dst, opctx);
}
if (exec_type == ExecType::kSync) {
if (rctx.get_ctx().dev_mask() == gpu::kDevMask) {
@@ -463,8 +488,8 @@ void ImperativeInvokeImpl(const Context& default_ctx,

std::vector<engine::VarHandle> read_vars, write_vars;
std::vector<Resource> requested;
std::vector<uint32_t> auxidx;
SetDependency(&read_vars, &write_vars, &requested, &auxidx,
std::vector<uint32_t> mutate_idx;
SetDependency(&read_vars, &write_vars, &requested, &mutate_idx,
op, attrs, ctx, ndinputs, ndoutputs);

FCompute fn = common::GetFCompute<FCompute>(op, "FCompute", ctx);
@@ -482,7 +507,7 @@
attrs, &ndinputs, &ndoutputs);
}
PushFCompute(fn, op, attrs, ctx, read_vars, write_vars,
requested, ndinputs, ndoutputs);
requested, ndinputs, ndoutputs, mutate_idx);
} else if (createop.count(op)) {
auto state =
createop[op](attrs, ctx, ret->arg_shapes, ret->arg_types);
@@ -492,7 +517,7 @@
}
write_vars.push_back(state.get_var());
PushOperator(state, op, attrs, ctx, read_vars, write_vars,
requested, ndinputs, ndoutputs);
requested, ndinputs, ndoutputs, mutate_idx);
} else {
LOG(FATAL)
<< "Operator " << op->name << " is not implemented for "
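
To make the new pre-/post-compute bookkeeping in `PushFCompute` and `PushOperator` easier to follow, here is a self-contained Python sketch of the control flow only, with plain dicts standing in for NDArrays and a trivial `cast` standing in for `CastNonDefaultStorage`; the real implementation lives in the C++ above:

```python
def setup_default_blobs(arrays, blobs, temp_src, temp_dst, idx_map=None):
    """Mirrors SetupDefaultBlobs: use dense arrays directly, and queue sparse
    ones for a cast into freshly allocated dense temporaries."""
    for i, nd in enumerate(arrays):
        if nd['stype'] != 'default':
            if idx_map is not None:
                idx_map[i] = len(temp_dst)  # remember where the temp lives
            temp = {'stype': 'default', 'data': None}
            temp_src.append(nd)
            temp_dst.append(temp)
            blobs.append(temp)
        else:
            blobs.append(nd)

def cast(src_list, dst_list):
    """Stand-in for CastNonDefaultStorage: copy each src into its dst."""
    for s, d in zip(src_list, dst_list):
        d['data'] = s['data']

def push_fcompute(fn, inputs, outputs, mutate_idx):
    in_blobs, out_blobs = [], []
    pre_src, pre_dst, post_src, post_dst = [], [], [], []
    in_temp_idx_map = {}
    setup_default_blobs(inputs, in_blobs, pre_src, pre_dst, in_temp_idx_map)
    # Note the swapped roles for outputs: originals go to post_dst, temps to post_src.
    setup_default_blobs(outputs, out_blobs, post_dst, post_src)
    # New in this commit: mutable inputs that were densified must be written back.
    for idx in mutate_idx:
        if idx in in_temp_idx_map:
            post_src.append(pre_dst[in_temp_idx_map[idx]])
            post_dst.append(inputs[idx])
    cast(pre_src, pre_dst)    # sparse inputs -> dense temps before the kernel
    fn(in_blobs, out_blobs)   # dense-only kernel
    cast(post_src, post_dst)  # dense temps -> original outputs and mutable inputs
```

The key addition is the `mutate_idx` loop, which pairs each densified mutable input's temporary with its original sparse array so the final cast writes the kernel's in-place updates back.
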
36 changes: 27 additions & 9 deletions src/common/utils.h
@@ -31,18 +31,30 @@ template<typename xpu>
void CastStorageDispatch(const OpContext& ctx, const NDArray& input, const NDArray& output);

/*
* \brief get the corresponding tensor blobs from default storage NDArrays.
* If any NDArray is of non-default storage, it will be added to `temp_src`
* \return true if any input storage needs to be casted
* \brief setup default-storage tblobs from source NDArrays. If any source NDArray has non-default
* storage, it creates a temp NDArray with default storage and uses the temp tblob. The
* function also records the indices of non-default source NDArrays and the indices of
* their corresponding temporary NDArrays in the temp array.
* \param src list of source NDArray
* \param blobs list of tblobs to return
* \param temp_src list of source NDArrays which requires temporary default storage representation
* \param temp_dst list of temporary destination NDArrays for default storage representation
* \param idx_map mapping from indices in source NDArrays to indices in temp_dst. When not set,
indices are not recorded
* \return true if any source NDArray needs to cast storage
*/
inline bool GetDefaultBlobs(const std::vector<NDArray>& src,
std::vector<TBlob> *blobs,
std::vector<NDArray> *temp_src,
std::vector<NDArray> *temp_dst) {
inline bool SetupDefaultBlobs(const std::vector<NDArray>& src,
std::vector<TBlob> *blobs,
std::vector<NDArray> *temp_src,
std::vector<NDArray> *temp_dst,
std::unordered_map<uint32_t, uint32_t> *idx_map = nullptr) {
bool require_cast = false;
for (size_t i = 0; i < src.size(); i++) {
auto& nd = src[i];
if (nd.storage_type() != kDefaultStorage) {
if (idx_map != nullptr) {
(*idx_map)[i] = temp_dst->size();
}
NDArray temp(nd.shape(), nd.ctx(), false);
temp_src->emplace_back(nd);
temp_dst->emplace_back(temp);
@@ -56,10 +68,15 @@ inline bool GetDefaultBlobs(const std::vector<NDArray>& src,
}

/*
* \brief cast the NDArrays in `src` to NDArrays in `dst`. This is only used
* for storage fallback mechanism in executor.
* \brief cast the NDArrays in `src` and store the result in NDArrays in `dst`.
* This is only used for storage fallback in executor.
* When storage_fallback is false, and `MXNET_EXEC_STORAGE_FALLBACK` == 0,
* storage fallback is disallowed.
* \param src list of source NDArray to cast
* \param dst list of destination NDArrays which hold the result of cast_storage operation
* \param ctx operator context for cast_storage operation
* \param storage_fallback whether storage_fallback is allowed. When set to false,
* its value depends on `MXNET_EXEC_STORAGE_FALLBACK`.
*/
template <typename xpu>
inline void CastNonDefaultStorage(const std::vector<NDArray>& src,
@@ -89,6 +106,7 @@ inline bool ContainsNonDefaultStorage(const StorageTypeVector& vstorage) {
return false;
}

// Check if any NDArray in the list has default storage
inline bool ContainsDefaultStorage(const std::vector<NDArray>& ndarrays) {
for (const auto &nd : ndarrays) {
if (nd.storage_type() == kDefaultStorage) {