From 10d8d6b6e374996a2e92d87760680d3e7c897667 Mon Sep 17 00:00:00 2001 From: Yuang Liu Date: Wed, 24 Nov 2021 21:55:39 +0800 Subject: [PATCH] [fleet_executor] fix message bus bug (#37507) --- paddle/fluid/distributed/fleet_executor/message_bus.cc | 3 +-- paddle/fluid/distributed/fleet_executor/message_bus.h | 1 + paddle/fluid/distributed/fleet_executor/runtime_graph.cc | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.cc b/paddle/fluid/distributed/fleet_executor/message_bus.cc index 23e1b2a31d88b..4a41f69411836 100644 --- a/paddle/fluid/distributed/fleet_executor/message_bus.cc +++ b/paddle/fluid/distributed/fleet_executor/message_bus.cc @@ -111,8 +111,7 @@ void MessageBus::ListenPort() { #if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ !defined(PADDLE_WITH_ASCEND_CL) // function keep listen the port and handle the message - InterceptorMessageServiceImpl interceptor_message_service; - PADDLE_ENFORCE_EQ(server_.AddService(&interceptor_message_service, + PADDLE_ENFORCE_EQ(server_.AddService(&interceptor_message_service_, brpc::SERVER_DOESNT_OWN_SERVICE), 0, platform::errors::Unavailable( "Message bus: init brpc service error.")); diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.h b/paddle/fluid/distributed/fleet_executor/message_bus.h index 9212a93df425f..03bb7ed81a0c7 100644 --- a/paddle/fluid/distributed/fleet_executor/message_bus.h +++ b/paddle/fluid/distributed/fleet_executor/message_bus.h @@ -89,6 +89,7 @@ class MessageBus final { #if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ !defined(PADDLE_WITH_ASCEND_CL) + InterceptorMessageServiceImpl interceptor_message_service_; // brpc server brpc::Server server_; #endif diff --git a/paddle/fluid/distributed/fleet_executor/runtime_graph.cc b/paddle/fluid/distributed/fleet_executor/runtime_graph.cc index c9455ffef492d..3a76bd43f9d55 100644 --- a/paddle/fluid/distributed/fleet_executor/runtime_graph.cc +++ b/paddle/fluid/distributed/fleet_executor/runtime_graph.cc @@ -142,7 +142,7 @@ void RuntimeGraph::SplitProgramBasedFunctionality(const ProgramDesc& program) { int pipeline_stage = coord.pp_idx; int64_t num_pipeline_stages = exe_desc_.pp_degree(); // TODO(fleet_executor dev): start up steps should be a config `num_slots` - int64_t start_up_steps = num_pipeline_stages - pipeline_stage - 1; + int64_t start_up_steps = num_pipeline_stages - pipeline_stage; int64_t num_micro_batches = exe_desc_.num_micro_batches(); int64_t task_id = cur_rank * functionality_order.size(); for (std::size_t i = 0; i < functionality_order.size(); ++i) {