Skip to content

Commit

Permalink
executor framework
Browse files Browse the repository at this point in the history
  • Loading branch information
LiYuRio committed Nov 2, 2021
1 parent 249081b commit d64028c
Show file tree
Hide file tree
Showing 14 changed files with 295 additions and 6 deletions.
2 changes: 2 additions & 0 deletions paddle/fluid/distributed/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
if(NOT WITH_PSCORE)
add_subdirectory(fleet_executor)
return()
endif()

Expand All @@ -15,6 +16,7 @@ add_subdirectory(service)
add_subdirectory(table)
add_subdirectory(test)
add_subdirectory(index_dataset)
add_subdirectory(fleet_executor)

get_property(RPC_DEPS GLOBAL PROPERTY RPC_DEPS)

Expand Down
6 changes: 6 additions & 0 deletions paddle/fluid/distributed/fleet_executor/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
proto_library(fleet_executor_desc_proto SRCS fleet_executor_desc.proto)
cc_library(fleet_executor SRCS fleet_executor.cc DEPS fleet_executor_desc_proto)

if(WITH_PYTHON)
py_proto_compile(fleet_executor_desc_py_proto SRCS fleet_executor_desc.proto)
endif()
43 changes: 43 additions & 0 deletions paddle/fluid/distributed/fleet_executor/fleet_executor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
#include "paddle/fluid/distributed/fleet_executor/runtime_graph.h"
#include "paddle/fluid/framework/program_desc.h"

namespace paddle {
namespace distributed {

FleetExecutor::FleetExecutor(const std::string& exe_desc_str) {
// Initialize Executor
}

FleetExecutor::~FleetExecutor() {
// Destroy Executor
}

void FleetExecutor::Init(const paddle::framework::ProgramDesc& program_desc) {
// Compile and Initialize
}

void FleetExecutor::Run() {
// Run
}

void FleetExecutor::Release() {
// Release
}

} // namespace distributed
} // namespace paddle
44 changes: 44 additions & 0 deletions paddle/fluid/distributed/fleet_executor/fleet_executor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#include <memory>
#include "paddle/fluid/distributed/fleet_executor/fleet_executor_desc.pb.h"
#include "paddle/fluid/platform/macros.h"

namespace paddle {
namespace framework {
class ProgramDesc;
}

namespace distributed {
class RuntimeGraph;

class FleetExecutor final {
public:
FleetExecutor() = delete;
FleetExecutor(const std::string& exe_desc_str);
~FleetExecutor();
void Init(const paddle::framework::ProgramDesc& program_desc);
void Run();
void Release();

private:
DISABLE_COPY_AND_ASSIGN(FleetExecutor);
FleetExecutorDesc exe_desc_;
std::unique_ptr<RuntimeGraph> runtime_graph_;
};

} // namespace distributed
} // namespace paddle
21 changes: 21 additions & 0 deletions paddle/fluid/distributed/fleet_executor/fleet_executor_desc.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto2";
package paddle.distributed;

message FleetExecutorDesc {
optional string grain = 1 [ default = "coarse" ];
repeated string addrs = 2; // "ip:port" of all ranks
}
34 changes: 34 additions & 0 deletions paddle/fluid/distributed/fleet_executor/runtime_graph.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

namespace paddle {
namespace framework {
class ProgramDesc;
}

namespace distributed {

class RuntimeGraph final {
public:
RuntimeGraph() = default;
explicit RuntimeGraph(const paddle::framework::ProgramDesc &program) {}
~RuntimeGraph() = default;

DISABLE_COPY_AND_ASSIGN(RuntimeGraph);
};

} // namespace distributed
} // namespace paddle
13 changes: 8 additions & 5 deletions paddle/fluid/framework/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ if(WITH_PYTHON)
COMMAND cp distributed_strategy_*.py ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto
COMMENT "Copy generated python proto into directory paddle/fluid/proto."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
add_custom_target(fleet_executor_proto_init ALL DEPENDS fleet_executor_desc_py_proto
COMMAND cp ${PADDLE_BINARY_DIR}/paddle/fluid/distributed/fleet_executor/fleet_executor_*.py ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto
COMMENT "Copy generated python proto into directory paddle/distributed/fleet/proto.")
else(NOT WIN32)
string(REPLACE "/" "\\" proto_dstpath "${PADDLE_BINARY_DIR}/python/paddle/fluid/proto/")
string(REPLACE "/" "\\" fleet_proto_dstpath "${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto/")
Expand Down Expand Up @@ -284,7 +287,7 @@ if(WITH_DISTRIBUTE)
fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer
lod_rank_table feed_fetch_method collective_helper ${GLOB_DISTRIBUTE_DEPS}
graph_to_program_pass variable_helper data_feed_proto timer monitor
heter_service_proto ${BRPC_DEP})
heter_service_proto fleet_executor ${BRPC_DEP})
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0)
set(DISTRIBUTE_COMPILE_FLAGS
Expand All @@ -303,7 +306,7 @@ if(WITH_DISTRIBUTE)
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper heter_wrapper box_wrapper lodtensor_printer feed_fetch_method
graph_to_program_pass variable_helper timer monitor heter_service_proto fleet)
graph_to_program_pass variable_helper timer monitor heter_service_proto fleet fleet_executor)
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
set_source_files_properties(executor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(multi_trainer.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
Expand All @@ -317,7 +320,7 @@ if(WITH_DISTRIBUTE)
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method
graph_to_program_pass variable_helper timer monitor)
graph_to_program_pass variable_helper timer monitor fleet_executor)
endif()
elseif(WITH_PSLIB)
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
Expand All @@ -337,7 +340,7 @@ elseif(WITH_PSLIB)
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method
graph_to_program_pass variable_helper timer monitor ${BRPC_DEP})
graph_to_program_pass variable_helper timer monitor fleet_executor ${BRPC_DEP})
else()
cc_library(executor SRCS executor.cc multi_trainer.cc pipeline_trainer.cc dataset_factory.cc
dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
Expand All @@ -347,7 +350,7 @@ else()
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method
graph_to_program_pass variable_helper timer monitor)
graph_to_program_pass variable_helper timer monitor fleet_executor)
endif()

target_link_libraries(executor while_op_helper executor_gc_helper recurrent_op_helper conditional_block_op_helper)
Expand Down
3 changes: 2 additions & 1 deletion paddle/fluid/pybind/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ set(PYBIND_DEPS pybind python proto_desc memory executor fleet_wrapper box_wrapp
feed_fetch_method pass generate_pass pass_builder parallel_executor profiler layer tracer engine scope_pool
analysis_predictor imperative_profiler imperative_flag save_load_util dlpack_tensor device_context
gloo_wrapper infer_io_utils heter_wrapper generator op_version_registry ps_gpu_wrapper custom_operator
cost_model cuda_graph_with_memory_pool)
cost_model cuda_graph_with_memory_pool fleet_executor)

if (WITH_PSCORE)
set(PYBIND_DEPS ${PYBIND_DEPS} ps_service)
Expand Down Expand Up @@ -61,6 +61,7 @@ set(PYBIND_SRCS
imperative.cc
ir.cc
bind_cost_model.cc
bind_fleet_executor.cc
inference_api.cc
compatible.cc
io.cc
Expand Down
34 changes: 34 additions & 0 deletions paddle/fluid/pybind/bind_fleet_executor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/pybind/bind_fleet_executor.h"
#include <pybind11/stl.h>
#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
#include "paddle/fluid/framework/program_desc.h"

namespace py = pybind11;

namespace paddle {
namespace pybind {

using paddle::distributed::FleetExecutor;

void BindFleetExecutor(py::module* m) {
py::class_<FleetExecutor>(*m, "FleetExecutor")
.def(py::init<const std::string&>())
.def("init", &FleetExecutor::Init)
.def("run", &FleetExecutor::Run);
}
} // namespace pybind
} // namespace paddle
25 changes: 25 additions & 0 deletions paddle/fluid/pybind/bind_fleet_executor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <pybind11/pybind11.h>

namespace paddle {
namespace pybind {

void BindFleetExecutor(pybind11::module* m);

} // namespace pybind
} // namespace paddle
2 changes: 2 additions & 0 deletions paddle/fluid/pybind/pybind.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ limitations under the License. */
#include "paddle/fluid/pybind/ascend_wrapper_py.h"
#endif
#include "paddle/fluid/pybind/bind_cost_model.h"
#include "paddle/fluid/pybind/bind_fleet_executor.h"
#include "paddle/fluid/pybind/box_helper_py.h"
#include "paddle/fluid/pybind/compatible.h"
#include "paddle/fluid/pybind/const_value.h"
Expand Down Expand Up @@ -2197,6 +2198,7 @@ All parameter, weight, gradient are variables in Paddle.
BindConstValue(&m);
BindGlobalValueGetterSetter(&m);
BindProcessMeshDesc(&m);
BindFleetExecutor(&m);

py::class_<framework::LoDRankTable>(m, "LodRankTable")
.def("items", [](framework::LoDRankTable &table) {
Expand Down
30 changes: 30 additions & 0 deletions python/paddle/fluid/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,11 @@ def _run_impl(self, program, feed, fetch_list, feed_var_name,
fetch_list = self._check_fetch_list(fetch_list)

if isinstance(program, Program) and program._pipeline_opt:
if "fleet_opt" in program._pipeline_opt:
return self._run_using_fleet_executor(
program,
fetch_list=fetch_list,
use_program_cache=use_program_cache)
if "startup_program" in program._pipeline_opt:
program = program._pipeline_opt["startup_program"]
else:
Expand Down Expand Up @@ -1820,6 +1825,31 @@ def _get_real_program_fetch_list():

return ctx

def _run_using_fleet_executor(self,
program=None,
dataset=None,
scope=None,
thread=0,
is_infer=False,
debug=False,
fetch_list=None,
fetch_info=None,
print_period=100,
fetch_handler=None,
use_program_cache=False):
scope, real_fetch_list, trainer_instance = \
self._prepare_pipeline_ctx(program, dataset, scope, thread,
is_infer, debug, fetch_list, fetch_info,
print_period, fetch_handler,
use_program_cache)
from ..distributed.fleet.proto import fleet_executor_desc_pb2
from google.protobuf import text_format
fleet_exe_desc = fleet_executor_desc_pb2.FleetExecutorDesc()
fleet_exe = core.FleetExecutor(fleet_exe_desc.SerializeToString())
fleet_exe.init(program._pipeline_opt["section_program"].desc)
fleet_exe.run()
return None

def _run_pipeline(self,
program=None,
dataset=None,
Expand Down
1 change: 1 addition & 0 deletions python/paddle/fluid/tests/unittests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ if(((NOT WITH_ROCM) AND (NOT WITH_GPU)) OR WIN32)
LIST(REMOVE_ITEM TEST_OPS test_raw_program_optimizer)
LIST(REMOVE_ITEM TEST_OPS test_fleet_gradient_scale)
LIST(REMOVE_ITEM TEST_OPS test_disable_signal_handler)
LIST(REMOVE_ITEM TEST_OPS test_fleet_executor)
endif()

# Temporally disable test_deprecated_decorator
Expand Down
Loading

1 comment on commit d64028c

@paddle-bot-old
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Congratulation! Your pull request passed all required CI. You could ask reviewer(s) to approve and merge. 🎉

Please sign in to comment.