Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add executor.prepare #9022

Merged
merged 18 commits into from
Mar 20, 2018
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 11 additions & 17 deletions paddle/fluid/framework/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,8 @@ limitations under the License. */

#include "paddle/fluid/framework/executor.h"

#include <set>

#include "gflags/gflags.h"
#include "paddle/fluid/framework/channel.h"
#include "paddle/fluid/framework/feed_fetch_method.h"
#include "paddle/fluid/framework/feed_fetch_type.h"
#include "paddle/fluid/framework/lod_rank_table.h"
#include "paddle/fluid/framework/lod_tensor_array.h"
#include "paddle/fluid/framework/op_registry.h"
Expand All @@ -40,14 +36,13 @@ namespace {
int kProgramId = -1;
} // namespace

struct ExecutorPrepareContext {
ExecutorPrepareContext(const framework::ProgramDesc& prog, size_t block_id)
: prog_(prog), block_id_(block_id) {}
ExecutorPrepareContext::ExecutorPrepareContext(
const framework::ProgramDesc& prog, size_t block_id)
: prog_(prog), block_id_(block_id) {}

const framework::ProgramDesc& prog_;
size_t block_id_;
std::vector<std::unique_ptr<OperatorBase>> ops_;
};
ExecutorPrepareContext::~ExecutorPrepareContext() {
VLOG(5) << "destroy ExecutorPrepareContext";
}

Executor::Executor(const platform::Place& place) : place_(place) {}

Expand Down Expand Up @@ -101,9 +96,8 @@ static void CheckTensorNANOrInf(const std::string& name,
void Executor::Run(const ProgramDesc& pdesc, Scope* scope, int block_id,
bool create_local_scope, bool create_vars) {
platform::RecordBlock b(block_id);
auto* ctx = Prepare(pdesc, block_id);
RunPreparedContext(ctx, scope, create_local_scope, create_vars);
delete ctx;
auto ctx = Prepare(pdesc, block_id);
RunPreparedContext(ctx.get(), scope, create_local_scope, create_vars);
}

// Check whether the block already has feed operators and feed_holder.
Expand Down Expand Up @@ -266,15 +260,15 @@ void Executor::Run(const ProgramDesc& program, Scope* scope,
delete copy_program;
}

ExecutorPrepareContext* Executor::Prepare(const ProgramDesc& program,
int block_id) {
std::unique_ptr<ExecutorPrepareContext> Executor::Prepare(
const ProgramDesc& program, int block_id) {
auto* ctx = new ExecutorPrepareContext(program, block_id);
PADDLE_ENFORCE_LT(static_cast<size_t>(block_id), program.Size());
auto& block = program.Block(block_id);
for (auto& op_desc : block.AllOps()) {
ctx->ops_.push_back(OpRegistry::CreateOp(*op_desc));
}
return ctx;
return std::unique_ptr<ExecutorPrepareContext>(ctx);
}

void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
Expand Down
15 changes: 12 additions & 3 deletions paddle/fluid/framework/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,16 @@ limitations under the License. */

namespace paddle {
namespace framework {
struct ExecutorPrepareContext;

struct ExecutorPrepareContext {
ExecutorPrepareContext(const framework::ProgramDesc& prog, size_t block_id);
~ExecutorPrepareContext();

const framework::ProgramDesc& prog_;
size_t block_id_;
std::vector<std::unique_ptr<OperatorBase>> ops_;
};

class Executor {
public:
// TODO(dzhwinter) : Do not rely on this function, it will be removed
Expand All @@ -47,8 +56,8 @@ class Executor {
const std::string& feed_holder_name = "feed",
const std::string& fetch_holder_name = "fetch");

static ExecutorPrepareContext* Prepare(const ProgramDesc& program,
int block_id);
static std::unique_ptr<ExecutorPrepareContext> Prepare(
const ProgramDesc& program, int block_id);

void RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
bool create_local_scope = true,
Expand Down
4 changes: 0 additions & 4 deletions paddle/fluid/platform/profiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,11 @@ struct RecordBlock {
private:
std::string name_;
uint64_t start_ns_;
int block_id_;
};

struct RecordThread {
explicit RecordThread(int thread_id);
~RecordThread();

private:
uint64_t start_ns_;
};

// Return the event list of all threads. Assumed the returned value calls
Expand Down
13 changes: 13 additions & 0 deletions paddle/fluid/pybind/pybind.cc
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,21 @@ All parameter, weight, gradient are variables in Paddle.
self.set_falsenet(net.Clone());
});

py::class_<ExecutorPrepareContext>(m, "ExecutorPrepareContext");

py::class_<framework::Executor>(m, "Executor")
.def(py::init<const platform::Place &>())
.def_static("prepare",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_prepare?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the visibility can be controlled on the Python side.

[](const ProgramDesc &pdesc,
int block_id) -> std::unique_ptr<ExecutorPrepareContext> {
return Executor::Prepare(pdesc, block_id);
})
.def("run_prepared_ctx",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_run_prepared_ctx

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

[](Executor &self, ExecutorPrepareContext *handle, Scope *scope,
bool create_local_scope, bool create_vars) {
self.RunPreparedContext(handle, scope, create_local_scope,
create_vars);
})
.def("run",
(void (Executor::*)(const ProgramDesc &, Scope *, int, bool, bool)) &
Executor::Run);
Expand Down
215 changes: 143 additions & 72 deletions python/paddle/fluid/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,16 @@ def to_name_str(var):
return str(feed_var_names + fetch_var_names)


class PreparedContext(object):
def __init__(self, handle, program, fetch_list, feed_var_name,
fetch_var_name):
self.handle = handle
self.program = program
self.fetch_list = fetch_list
self.feed_var_name = feed_var_name
self.fetch_var_name = fetch_var_name
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are all of them public members?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this class if it's no longer used?



class Executor(object):
def __init__(self, places):
if not isinstance(places, list) and not isinstance(places, tuple):
Expand Down Expand Up @@ -235,6 +245,119 @@ def parselod(data):
tensor.set_lod(lod)
return tensor

def _get_program_cache(self, feed, fetch_list):
program_cache_key = get_program_cache_key(feed, fetch_list)
program_cache = self.program_caches.get(program_cache_key, None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: return at this line?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return program_cache

def _add_program_cache(self, feed, fetch_list, program):
program_cache_key = get_program_cache_key(feed, fetch_list)
self.program_caches[program_cache_key] = program

def _add_feed_fetch_ops(self, program, feed, fetch_list, feed_var_name,
fetch_var_name):
tmp_program = program.clone()

global_block = tmp_program.global_block()

if feed_var_name in global_block.vars:
feed_var = global_block.var(feed_var_name)
else:
feed_var = global_block.create_var(
name=feed_var_name,
type=core.VarDesc.VarType.FEED_MINIBATCH,
persistable=True)

if fetch_var_name in global_block.vars:
fetch_var = global_block.var(fetch_var_name)
else:
fetch_var = global_block.create_var(
name=fetch_var_name,
type=core.VarDesc.VarType.FETCH_LIST,
persistable=True)

# prepend feed operators
if not has_feed_operators(global_block, feed, feed_var_name):
for i, name in enumerate(feed):
out = global_block.var(name)
global_block.prepend_op(
type='feed',
inputs={'X': [feed_var]},
outputs={'Out': [out]},
attrs={'col': i})

# append fetch_operators
if not has_fetch_operators(global_block, fetch_list, fetch_var_name):
for i, var in enumerate(fetch_list):
assert isinstance(var, Variable) or isinstance(var, str), (
"Wrong type for fetch_list[%s]: %s" % (i, type(var)))
global_block.append_op(
type='fetch',
inputs={'X': [var]},
outputs={'Out': [fetch_var]},
attrs={'col': i})

return tmp_program

def feed_data(self, program, feed, feed_var_name, scope):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private member?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

# feed var to framework
for op in program.global_block().ops:
if op.desc.type() == 'feed':
feed_target_name = op.desc.output('Out')[0]
cur_feed = feed[feed_target_name]
if not isinstance(cur_feed, core.LoDTensor):
cur_feed = self.aslodtensor(cur_feed)
idx = op.desc.attr('col')
core.set_feed_variable(scope, cur_feed, feed_var_name, idx)
else:
break

def fetch_data(self, fetch_list, fetch_var_name, scope):
outs = [
core.get_fetch_variable(scope, fetch_var_name, i)
for i in xrange(len(fetch_list))
]
return outs

def prepare(self,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private member?

program=None,
feed=None,
fetch_list=None,
feed_var_name='feed',
fetch_var_name='fetch'):
if feed is None:
feed = {}
if not isinstance(feed, dict):
raise TypeError("feed should be a map")
if fetch_list is None:
fetch_list = []
if program is None:
program = default_main_program()

if not isinstance(program, Program):
raise TypeError()

program = self._add_feed_fetch_ops(
program=program,
feed=feed,
fetch_list=fetch_list,
feed_var_name=feed_var_name,
fetch_var_name=fetch_var_name)
handle = self.executor.prepare(program.desc, 0)
return PreparedContext(handle, program, fetch_list, feed_var_name,
fetch_var_name)

def run_prepared_ctx(self, ctx, feed=None, scope=None, return_numpy=True):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if scope is None:
scope = global_scope()

self.feed_data(ctx.program, feed, ctx.feed_var_name, scope)
self.executor.run_prepared_ctx(ctx.handle, scope, True, True)
outs = self.fetch_data(ctx.fetch_list, ctx.fetch_var_name, scope)
if return_numpy:
outs = as_numpy(outs)
return outs

def run(self,
program=None,
feed=None,
Expand Down Expand Up @@ -268,7 +391,6 @@ def run(self,
raise TypeError("feed should be a map")
if fetch_list is None:
fetch_list = []

if program is None:
program = default_main_program()

Expand All @@ -278,79 +400,28 @@ def run(self,
if scope is None:
scope = global_scope()

program_cache = None
program_cache_key = get_program_cache_key(feed, fetch_list)

if use_program_cache:
# find program cache by cache_key
program_cache = self.program_caches.get(program_cache_key, None)
# TODO(qiao): Should check program_cache and program are exactly the same.
cached_program = self._get_program_cache(feed, fetch_list)
if cached_program is None:
cached_program = self._add_feed_fetch_ops(
program=program,
feed=feed,
fetch_list=fetch_list,
feed_var_name=feed_var_name,
fetch_var_name=fetch_var_name)
self._add_program_cache(feed, fetch_list, cached_program)
program = cached_program
else:
self.program_caches.pop(program_cache_key, None)

if program_cache is None:
program_cache = program.clone()

if use_program_cache:
self.program_caches[program_cache_key] = program_cache

global_block = program_cache.global_block()

if feed_var_name in global_block.vars:
feed_var = global_block.var(feed_var_name)
else:
feed_var = global_block.create_var(
name=feed_var_name,
type=core.VarDesc.VarType.FEED_MINIBATCH,
persistable=True)

if fetch_var_name in global_block.vars:
fetch_var = global_block.var(fetch_var_name)
else:
fetch_var = global_block.create_var(
name=fetch_var_name,
type=core.VarDesc.VarType.FETCH_LIST,
persistable=True)

# prepend feed operators
if not has_feed_operators(global_block, feed, feed_var_name):
for i, name in enumerate(feed):
out = global_block.var(name)
global_block.prepend_op(
type='feed',
inputs={'X': [feed_var]},
outputs={'Out': [out]},
attrs={'col': i})

# append fetch_operators
if not has_fetch_operators(global_block, fetch_list,
fetch_var_name):
for i, var in enumerate(fetch_list):
assert isinstance(var, Variable) or isinstance(var, str), (
"Wrong type for fetch_list[%s]: %s" % (i, type(var)))
global_block.append_op(
type='fetch',
inputs={'X': [var]},
outputs={'Out': [fetch_var]},
attrs={'col': i})

# feed var to framework
for op in program_cache.global_block().ops:
if op.desc.type() == 'feed':
feed_target_name = op.desc.output('Out')[0]
cur_feed = feed[feed_target_name]
if not isinstance(cur_feed, core.LoDTensor):
cur_feed = self.aslodtensor(cur_feed)
idx = op.desc.attr('col')
core.set_feed_variable(scope, cur_feed, feed_var_name, idx)
else:
break

self.executor.run(program_cache.desc, scope, 0, True, True)
outs = [
core.get_fetch_variable(scope, fetch_var_name, i)
for i in xrange(len(fetch_list))
]
program = self._add_feed_fetch_ops(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is it no longer poping the cached program here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, add back.

program=program,
feed=feed,
fetch_list=fetch_list,
feed_var_name=feed_var_name,
fetch_var_name=fetch_var_name)

self.feed_data(program, feed, feed_var_name, scope)
self.executor.run(program.desc, scope, 0, True, True)
outs = self.fetch_data(fetch_list, fetch_var_name, scope)
if return_numpy:
outs = as_numpy(outs)
return outs
Loading