add executor.prepare #9022

Status: Merged (18 commits, Mar 20, 2018).
Changes shown from 3 commits.
20 changes: 7 additions & 13 deletions paddle/fluid/framework/executor.cc
@@ -14,17 +14,12 @@ limitations under the License. */

#include "paddle/fluid/framework/executor.h"

#include <set>

#include "gflags/gflags.h"
#include "paddle/fluid/framework/channel.h"
#include "paddle/fluid/framework/feed_fetch_method.h"
#include "paddle/fluid/framework/feed_fetch_type.h"
#include "paddle/fluid/framework/lod_rank_table.h"
#include "paddle/fluid/framework/lod_tensor_array.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/reader.h"
#include "paddle/fluid/platform/place.h"

DECLARE_bool(benchmark);
DEFINE_bool(check_nan_inf, false,
@@ -34,14 +29,9 @@ DEFINE_bool(check_nan_inf,
namespace paddle {
namespace framework {

struct ExecutorPrepareContext {
ExecutorPrepareContext(const framework::ProgramDesc& prog, size_t block_id)
: prog_(prog), block_id_(block_id) {}

framework::ProgramDesc prog_;
size_t block_id_;
std::vector<std::unique_ptr<OperatorBase>> ops_;
};
ExecutorPrepareContext::ExecutorPrepareContext(
const framework::ProgramDesc& prog, size_t block_id)
: prog_(prog), block_id_(block_id) {}

Executor::Executor(const platform::Place& place) : place_(place) {}

@@ -269,6 +259,10 @@ ExecutorPrepareContext* Executor::Prepare(const ProgramDesc& program,
return ctx;
}

void Executor::DeletePreparedContext(ExecutorPrepareContext* ctx) {
delete ctx;
}

void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
bool create_local_scope, bool create_vars) {
auto& block = ctx->prog_.Block(ctx->block_id_);
12 changes: 11 additions & 1 deletion paddle/fluid/framework/executor.h
@@ -22,7 +22,15 @@ limitations under the License. */

namespace paddle {
namespace framework {
struct ExecutorPrepareContext;

struct ExecutorPrepareContext {
ExecutorPrepareContext(const framework::ProgramDesc& prog, size_t block_id);

framework::ProgramDesc prog_;
size_t block_id_;
std::vector<std::unique_ptr<OperatorBase>> ops_;
};

class Executor {
public:
// TODO(dzhwinter) : Do not rely on this function, it will be removed
@@ -50,6 +58,8 @@ class Executor {
static ExecutorPrepareContext* Prepare(const ProgramDesc& program,
int block_id);

static void DeletePreparedContext(ExecutorPrepareContext* ctx);

void RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
bool create_local_scope = true,
bool create_vars = true);
18 changes: 18 additions & 0 deletions paddle/fluid/pybind/pybind.cc
@@ -401,6 +401,24 @@ All parameter, weight, gradient are variables in Paddle.

py::class_<framework::Executor>(m, "Executor")
.def(py::init<const platform::Place &>())
.def_static(
"prepare",
[](const ProgramDesc &pdesc, int block_id) -> void * {
return static_cast<void *>(Executor::Prepare(pdesc, block_id));
},
py::return_value_policy::reference)
.def_static("delete_prepared_ctx",
> **Collaborator (@reyoung):** This method is not needed. We can just return `ExecutorPrepareContext` from the `prepare` method and let Python delete the object.
>
> **@jacquesqiao (Member, Author), Mar 14, 2018:** @reyoung OK, I use `copy` to return the `void *` now:
>
>     /** Create a new copy of the returned object, which will be owned by
>         Python. This policy is comparably safe because the lifetimes of the two
>         instances are decoupled. */
>     copy,
>
> I am not sure how Python will manage this returned pointer; can you give some information?
>
> **Collaborator (@reyoung):** The default policy is OK.

[](void *handle) {
Executor::DeletePreparedContext(
static_cast<ExecutorPrepareContext *>(handle));
})
.def("run_prepared_ctx",
> **Contributor:** `_run_prepared_ctx`
>
> **@jacquesqiao (Member, Author):** same as above

[](Executor &self, void *handle, Scope *scope,
bool create_local_scope, bool create_vars) {
self.RunPreparedContext(
static_cast<ExecutorPrepareContext *>(handle), scope,
create_local_scope, create_vars);
})
.def("run",
(void (Executor::*)(const ProgramDesc &, Scope *, int, bool, bool)) &
Executor::Run);
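
A note on the binding above: `prepare` is exposed with `py::return_value_policy::reference`, so the `void *` handle that reaches Python is non-owning, and the underlying `ExecutorPrepareContext` is freed only when `delete_prepared_ctx` is called. Below is a minimal sketch of the intended lifecycle from the Python side, using only the API this PR adds; the `prepared` context-manager wrapper is hypothetical, included purely for illustration.

```python
import contextlib

@contextlib.contextmanager
def prepared(exe, program, feed, fetch_list):
    # Hypothetical helper, not part of this PR. exe.prepare returns a
    # PreparedContext whose .handle wraps the raw C++ pointer; Python never
    # owns that pointer, so it must be freed explicitly.
    ctx = exe.prepare(program=program, feed=feed, fetch_list=fetch_list)
    try:
        yield ctx
    finally:
        # Calls Executor::DeletePreparedContext through the pybind layer.
        exe.delete_prepared_ctx(ctx)

# Usage, assuming exe, program, feed, and out are built as in the test below:
#   with prepared(exe, program, feed, [out]) as ctx:
#       outs = exe.run_prepared_ctx(ctx=ctx, feed=feed)
```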
218 changes: 146 additions & 72 deletions python/paddle/fluid/executor.py
@@ -179,6 +179,16 @@ def to_name_str(var):
return str(feed_var_names + fetch_var_names)


class PreparedContext(object):
def __init__(self, handle, program, fetch_list, feed_var_name,
fetch_var_name):
self.handle = handle
self.program = program
self.fetch_list = fetch_list
self.feed_var_name = feed_var_name
self.fetch_var_name = fetch_var_name
> **Contributor:** Are all of them public members?
>
> **@jacquesqiao (Member, Author):** yes
>
> **Contributor:** Remove this class if it's no longer used?



class Executor(object):
def __init__(self, places):
if not isinstance(places, list) and not isinstance(places, tuple):
@@ -235,6 +245,122 @@ def parselod(data):
tensor.set_lod(lod)
return tensor

def _get_program_cache(self, feed, fetch_list):
program_cache_key = get_program_cache_key(feed, fetch_list)
program_cache = self.program_caches.get(program_cache_key, None)
> **Contributor:** nit: return at this line?
>
> **@jacquesqiao (Member, Author):** done

return program_cache

def _add_program_cache(self, feed, fetch_list, program):
program_cache_key = get_program_cache_key(feed, fetch_list)
self.program_caches[program_cache_key] = program

def _add_feed_fetch_ops(self, program, feed, fetch_list, feed_var_name,
fetch_var_name):
tmp_program = program.clone()

global_block = tmp_program.global_block()

if feed_var_name in global_block.vars:
feed_var = global_block.var(feed_var_name)
else:
feed_var = global_block.create_var(
name=feed_var_name,
type=core.VarDesc.VarType.FEED_MINIBATCH,
persistable=True)

if fetch_var_name in global_block.vars:
fetch_var = global_block.var(fetch_var_name)
else:
fetch_var = global_block.create_var(
name=fetch_var_name,
type=core.VarDesc.VarType.FETCH_LIST,
persistable=True)

# prepend feed operators
if not has_feed_operators(global_block, feed, feed_var_name):
for i, name in enumerate(feed):
out = global_block.var(name)
global_block.prepend_op(
type='feed',
inputs={'X': [feed_var]},
outputs={'Out': [out]},
attrs={'col': i})

# append fetch_operators
if not has_fetch_operators(global_block, fetch_list, fetch_var_name):
for i, var in enumerate(fetch_list):
assert isinstance(var, Variable) or isinstance(var, str), (
"Wrong type for fetch_list[%s]: %s" % (i, type(var)))
global_block.append_op(
type='fetch',
inputs={'X': [var]},
outputs={'Out': [fetch_var]},
attrs={'col': i})

return tmp_program

def feed_data(self, program, feed, feed_var_name, scope):
> **Contributor:** private member?
>
> **@jacquesqiao (Member, Author):** done

# feed var to framework
for op in program.global_block().ops:
if op.desc.type() == 'feed':
feed_target_name = op.desc.output('Out')[0]
cur_feed = feed[feed_target_name]
if not isinstance(cur_feed, core.LoDTensor):
cur_feed = self.aslodtensor(cur_feed)
idx = op.desc.attr('col')
core.set_feed_variable(scope, cur_feed, feed_var_name, idx)
else:
break

def fetch_data(self, fetch_list, fetch_var_name, scope):
outs = [
core.get_fetch_variable(scope, fetch_var_name, i)
for i in xrange(len(fetch_list))
]
return outs

def prepare(self,
> **Contributor:** private member?

program=None,
feed=None,
fetch_list=None,
feed_var_name='feed',
fetch_var_name='fetch'):
if feed is None:
feed = {}
if not isinstance(feed, dict):
raise TypeError("feed should be a map")
if fetch_list is None:
fetch_list = []
if program is None:
program = default_main_program()

if not isinstance(program, Program):
raise TypeError()

program = self._add_feed_fetch_ops(
program=program,
feed=feed,
fetch_list=fetch_list,
feed_var_name=feed_var_name,
fetch_var_name=fetch_var_name)
handle = self.executor.prepare(program.desc, 0)
return PreparedContext(handle, program, fetch_list, feed_var_name,
fetch_var_name)

def run_prepared_ctx(self, ctx, feed=None, scope=None, return_numpy=True):
> **Contributor:** private?
>
> **@jacquesqiao (Member, Author):** done

if scope is None:
scope = global_scope()

self.feed_data(ctx.program, feed, ctx.feed_var_name, scope)
self.executor.run_prepared_ctx(ctx.handle, scope, True, True)
outs = self.fetch_data(ctx.fetch_list, ctx.fetch_var_name, scope)
if return_numpy:
outs = as_numpy(outs)
return outs

def delete_prepared_ctx(self, ctx):
self.executor.delete_prepared_ctx(ctx.handle)

def run(self,
program=None,
feed=None,
@@ -268,7 +394,6 @@ def run(self,
raise TypeError("feed should be a map")
if fetch_list is None:
fetch_list = []

if program is None:
program = default_main_program()

@@ -278,79 +403,28 @@ def run(self,
if scope is None:
scope = global_scope()

program_cache = None
program_cache_key = get_program_cache_key(feed, fetch_list)

if use_program_cache:
# find program cache by cache_key
program_cache = self.program_caches.get(program_cache_key, None)
# TODO(qiao): Should check program_cache and program are exactly the same.
cached_program = self._get_program_cache(feed, fetch_list)
if cached_program is None:
cached_program = self._add_feed_fetch_ops(
program=program,
feed=feed,
fetch_list=fetch_list,
feed_var_name=feed_var_name,
fetch_var_name=fetch_var_name)
self._add_program_cache(feed, fetch_list, cached_program)
program = cached_program
else:
self.program_caches.pop(program_cache_key, None)

if program_cache is None:
program_cache = program.clone()

if use_program_cache:
self.program_caches[program_cache_key] = program_cache

global_block = program_cache.global_block()

if feed_var_name in global_block.vars:
feed_var = global_block.var(feed_var_name)
else:
feed_var = global_block.create_var(
name=feed_var_name,
type=core.VarDesc.VarType.FEED_MINIBATCH,
persistable=True)

if fetch_var_name in global_block.vars:
fetch_var = global_block.var(fetch_var_name)
else:
fetch_var = global_block.create_var(
name=fetch_var_name,
type=core.VarDesc.VarType.FETCH_LIST,
persistable=True)

# prepend feed operators
if not has_feed_operators(global_block, feed, feed_var_name):
for i, name in enumerate(feed):
out = global_block.var(name)
global_block.prepend_op(
type='feed',
inputs={'X': [feed_var]},
outputs={'Out': [out]},
attrs={'col': i})

# append fetch_operators
if not has_fetch_operators(global_block, fetch_list,
fetch_var_name):
for i, var in enumerate(fetch_list):
assert isinstance(var, Variable) or isinstance(var, str), (
"Wrong type for fetch_list[%s]: %s" % (i, type(var)))
global_block.append_op(
type='fetch',
inputs={'X': [var]},
outputs={'Out': [fetch_var]},
attrs={'col': i})

# feed var to framework
for op in program_cache.global_block().ops:
if op.desc.type() == 'feed':
feed_target_name = op.desc.output('Out')[0]
cur_feed = feed[feed_target_name]
if not isinstance(cur_feed, core.LoDTensor):
cur_feed = self.aslodtensor(cur_feed)
idx = op.desc.attr('col')
core.set_feed_variable(scope, cur_feed, feed_var_name, idx)
else:
break

self.executor.run(program_cache.desc, scope, 0, True, True)
outs = [
core.get_fetch_variable(scope, fetch_var_name, i)
for i in xrange(len(fetch_list))
]
program = self._add_feed_fetch_ops(
> **Contributor:** Why is it no longer popping the cached program here?
>
> **@jacquesqiao (Member, Author):** done, added it back.

program=program,
feed=feed,
fetch_list=fetch_list,
feed_var_name=feed_var_name,
fetch_var_name=fetch_var_name)

self.feed_data(program, feed, feed_var_name, scope)
self.executor.run(program.desc, scope, 0, True, True)
outs = self.fetch_data(fetch_list, fetch_var_name, scope)
if return_numpy:
outs = as_numpy(outs)
return outs
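
With the refactor above, `run(..., use_program_cache=True)` reuses a program that already has feed and fetch operators appended, keyed by `get_program_cache_key(feed, fetch_list)`. A hedged sketch of how repeated calls interact with the cache, assuming `exe`, `program`, `feed`, and `out` are built as in the test below:

```python
# First call: _get_program_cache misses, so _add_feed_fetch_ops clones the
# program, appends feed/fetch ops, and _add_program_cache stores the clone.
outs = exe.run(program, feed=feed, fetch_list=[out], use_program_cache=True)

# Second call with the same feed/fetch signature: _get_program_cache hits
# and the already-augmented clone is reused without rebuilding it.
outs = exe.run(program, feed=feed, fetch_list=[out], use_program_cache=True)
```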
46 changes: 46 additions & 0 deletions python/paddle/fluid/tests/unittests/test_executor_and_mul.py
@@ -39,6 +39,52 @@ def test_mul(self):
self.assertEqual((100, 100), out.shape)
self.assertTrue(numpy.allclose(out, numpy.dot(a_np, b_np)))

def test_prepare_then_run(self):
a = data(name='a', shape=[784], dtype='float32')
b = data(
name='b',
shape=[784, 100],
dtype='float32',
append_batch_size=False)
c = data(
name='c', shape=[100, 10], dtype='float32', append_batch_size=False)
out = mul(x=a, y=b)
place = core.CPUPlace()
a_np = numpy.random.random((100, 784)).astype('float32')
b_np = numpy.random.random((784, 100)).astype('float32')
c_np = numpy.random.random((100, 10)).astype('float32')
exe = Executor(place)
feed = {'a': a_np, 'b': b_np, 'c': c_np}

prepared_ctx = exe.prepare(feed=feed, fetch_list=[out])
> **Contributor:** Do you plan to expose `prepare` as a public member?
>
> **@jacquesqiao (Member, Author):** yes, `prepare` should be public, because a user should use it directly.

for _ in range(2):
outs = exe.run_prepared_ctx(ctx=prepared_ctx, feed=feed)
out_np = outs[0]
self.assertEqual((100, 100), out_np.shape)
self.assertTrue(numpy.allclose(out_np, numpy.dot(a_np, b_np)))

new_out = mul(x=out, y=c)
new_prepared_ctx = exe.prepare(feed=feed, fetch_list=[new_out])

handle_equal = (prepared_ctx.handle == new_prepared_ctx.handle)
self.assertFalse(handle_equal, "handle should not be equal")

for _ in range(2):
outs = exe.run_prepared_ctx(ctx=new_prepared_ctx, feed=feed)
out_np = outs[0]
self.assertEqual((100, 10), out_np.shape)
self.assertTrue(
numpy.allclose(out_np, numpy.dot(numpy.dot(a_np, b_np), c_np)))

for _ in range(2):
outs = exe.run_prepared_ctx(ctx=prepared_ctx, feed=feed)
out_np = outs[0]
self.assertEqual((100, 100), out_np.shape)
self.assertTrue(numpy.allclose(out_np, numpy.dot(a_np, b_np)))

exe.delete_prepared_ctx(prepared_ctx)
exe.delete_prepared_ctx(new_prepared_ctx)


if __name__ == '__main__':
unittest.main()