From 2386db8717f79b962240a2286402844d4f216f5f Mon Sep 17 00:00:00 2001 From: zhangbo9674 <82555433+zhangbo9674@users.noreply.github.com> Date: Mon, 14 Aug 2023 12:56:13 +0800 Subject: [PATCH] [IR] Support multi-thread run && delete unused code of new_ir interpreter (#56148) * add code * fix bug * fix bug * delete unused code * refine code * fix bug * fix bug * fix bug * fix bug * fix bug --- .../interpreter/dependency_builder.cc | 6 + .../interpreter/dependency_builder.h | 2 + .../interpreter/stream_analyzer.cc | 2 +- .../interpreter/stream_analyzer.h | 2 +- .../new_executor/interpreter_base_impl.h | 12 +- .../framework/new_executor/interpretercore.cc | 5 - .../framework/new_executor/interpretercore.h | 3 - .../new_executor/new_ir_interpreter.cc | 1564 +++-------------- .../new_executor/new_ir_interpreter.h | 78 +- .../new_executor/program_interpreter.cc | 17 +- .../new_executor/program_interpreter.h | 10 +- paddle/phi/core/flags.cc | 18 +- .../standalone_executor_new_ir_test.cc | 4 +- test/cpp/prim/test_vjp.cc | 6 +- 14 files changed, 348 insertions(+), 1381 deletions(-) diff --git a/paddle/fluid/framework/new_executor/interpreter/dependency_builder.cc b/paddle/fluid/framework/new_executor/interpreter/dependency_builder.cc index 639885b80e534..de77780abc3e5 100644 --- a/paddle/fluid/framework/new_executor/interpreter/dependency_builder.cc +++ b/paddle/fluid/framework/new_executor/interpreter/dependency_builder.cc @@ -664,6 +664,12 @@ void NewIrDependencyBuilder::BuildDownstreamMap() { } } +void NewIrDependencyBuilder::ShareDependencyFrom( + const NewIrDependencyBuilder& src) { + std::tie(op_downstream_map_, op_happens_before_) = src.GetDependency(); + is_build_ = true; +} + } // namespace interpreter } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/new_executor/interpreter/dependency_builder.h b/paddle/fluid/framework/new_executor/interpreter/dependency_builder.h index 2593b11a2e48a..4e3fb8d1b2450 100644 --- a/paddle/fluid/framework/new_executor/interpreter/dependency_builder.h +++ b/paddle/fluid/framework/new_executor/interpreter/dependency_builder.h @@ -114,6 +114,8 @@ class NewIrDependencyBuilder : public DependencyBuilder { void BuildDownstreamMap(); + void ShareDependencyFrom(const NewIrDependencyBuilder& src); + private: std::vector instructions_; // not_owned }; diff --git a/paddle/fluid/framework/new_executor/interpreter/stream_analyzer.cc b/paddle/fluid/framework/new_executor/interpreter/stream_analyzer.cc index a83f02a861731..1ee621624dbff 100644 --- a/paddle/fluid/framework/new_executor/interpreter/stream_analyzer.cc +++ b/paddle/fluid/framework/new_executor/interpreter/stream_analyzer.cc @@ -684,7 +684,7 @@ platform::DeviceType NewIrStreamAnalyzer::GetWaiterType( } } -void NewIrStreamAnalyzer::ShareEventInfoFrom(const StreamAnalyzer& src) { +void NewIrStreamAnalyzer::ShareEventInfoFrom(const NewIrStreamAnalyzer& src) { event_info_ = src.GetEventInfo(); is_event_info_build_ = true; } diff --git a/paddle/fluid/framework/new_executor/interpreter/stream_analyzer.h b/paddle/fluid/framework/new_executor/interpreter/stream_analyzer.h index 8c7d2d5b6ddbc..f8ba8103620a2 100644 --- a/paddle/fluid/framework/new_executor/interpreter/stream_analyzer.h +++ b/paddle/fluid/framework/new_executor/interpreter/stream_analyzer.h @@ -138,7 +138,7 @@ class NewIrStreamAnalyzer { platform::DeviceType GetWaiterType( const paddle::framework::InstructionBase* instr) const; - void ShareEventInfoFrom(const StreamAnalyzer& src); + void ShareEventInfoFrom(const NewIrStreamAnalyzer& src); std::shared_ptr< std::map>>> diff --git a/paddle/fluid/framework/new_executor/interpreter_base_impl.h b/paddle/fluid/framework/new_executor/interpreter_base_impl.h index 586aef975a643..43d70883a4882 100644 --- a/paddle/fluid/framework/new_executor/interpreter_base_impl.h +++ b/paddle/fluid/framework/new_executor/interpreter_base_impl.h @@ -72,12 +72,6 @@ class InterpreterBaseImpl { virtual paddle::framework::FetchList Run( const std::vector& feed_names, bool need_fetch = true) = 0; - // NOTE(zhangbo): This interface is only used for temporary testing and only - // for testing during the iteration process of the new IR access actuator - // version. It will be deleted in the future. - virtual paddle::framework::FetchList BetaRun( - const std::vector& feed_names, bool need_fetch = true) = 0; - virtual void ShareWorkQueueFrom(InterpreterBaseImpl* src) = 0; virtual void ShareBuildResultsFrom(const InterpreterBaseImpl& src) = 0; @@ -107,6 +101,12 @@ class InterpreterBaseImpl { virtual const interpreter::StreamAnalyzer& GetStreamAnalyzer() const = 0; + virtual const interpreter::NewIrDependencyBuilder& GetNewIrDependencyBuilder() + const = 0; + + virtual const interpreter::NewIrStreamAnalyzer& GetNewIrStreamAnalyzer() + const = 0; + virtual bool IsSharedResultsBuild() const = 0; }; diff --git a/paddle/fluid/framework/new_executor/interpretercore.cc b/paddle/fluid/framework/new_executor/interpretercore.cc index 9ee34fcc39c11..384c668ed2e56 100644 --- a/paddle/fluid/framework/new_executor/interpretercore.cc +++ b/paddle/fluid/framework/new_executor/interpretercore.cc @@ -74,11 +74,6 @@ FetchList InterpreterCore::Run(const std::vector& feed_names, return impl_->Run(feed_names, need_fetch); } -FetchList InterpreterCore::BetaRun(const std::vector& feed_names, - bool need_fetch) { - return impl_->BetaRun(feed_names, need_fetch); -} - void InterpreterCore::ShareWorkQueueFrom(std::shared_ptr src) { impl_->ShareWorkQueueFrom(const_cast(src->Impl())); } diff --git a/paddle/fluid/framework/new_executor/interpretercore.h b/paddle/fluid/framework/new_executor/interpretercore.h index 66f998bb557f6..191f0b92eb8b2 100644 --- a/paddle/fluid/framework/new_executor/interpretercore.h +++ b/paddle/fluid/framework/new_executor/interpretercore.h @@ -52,9 +52,6 @@ class InterpreterCore { paddle::framework::FetchList Run(const std::vector& feed_names, bool need_fetch = true); - paddle::framework::FetchList BetaRun( - const std::vector& feed_names, bool need_fetch = true); - void ShareWorkQueueFrom(std::shared_ptr src); void ShareBuildResultsFrom(std::shared_ptr src); diff --git a/paddle/fluid/framework/new_executor/new_ir_interpreter.cc b/paddle/fluid/framework/new_executor/new_ir_interpreter.cc index 836cc490221e2..ead957f7e87ec 100644 --- a/paddle/fluid/framework/new_executor/new_ir_interpreter.cc +++ b/paddle/fluid/framework/new_executor/new_ir_interpreter.cc @@ -43,9 +43,7 @@ PHI_DECLARE_bool(enable_new_ir_in_executor); -PHI_DECLARE_bool(enable_new_ir_in_executor_beta_run); - -PHI_DECLARE_bool(enable_new_ir_in_executor_loop_run); +PHI_DECLARE_bool(enable_new_ir_in_executor_trace_run); namespace paddle { namespace framework { @@ -57,7 +55,6 @@ NewIRInterpreter::NewIRInterpreter( framework::Scope* scope, const ExecutionConfig& execution_config) : place_(place), - stream_analyzer_(place), execution_config_(execution_config), var_scope_(scope), scope_(scope), @@ -74,6 +71,8 @@ NewIRInterpreter::NewIRInterpreter( exception_notifier_ = main_thread_blocker_.RegisterEvent(kExceptionCaught); completion_notifier_ = main_thread_blocker_.RegisterEvent(kTaskCompletion); + dependecy_count_ = std::make_shared>(); + if (!FLAGS_new_executor_use_local_scope) { execution_config_.create_local_scope = false; } @@ -90,17 +89,6 @@ NewIRInterpreter::NewIRInterpreter( ir_program_->block()->size()); execution_config_.Log(/*log_level=*/8); - instruction_scheduling_priority_less = [this](size_t lhs, size_t rhs) { - SchedulingPriority lhs_scheduling_priority = - vec_instruction_[lhs].GetSchedulingPriority(); - SchedulingPriority rhs_scheduling_priority = - vec_instruction_[rhs].GetSchedulingPriority(); - if (lhs_scheduling_priority == rhs_scheduling_priority) { - return lhs < rhs; - } - return lhs_scheduling_priority > rhs_scheduling_priority; - }; - ir_instruction_scheduling_priority_less = [this](size_t lhs, size_t rhs) { SchedulingPriority lhs_scheduling_priority = vec_instruction_base_[lhs]->GetSchedulingPriority(); @@ -128,163 +116,6 @@ NewIRInterpreter::~NewIRInterpreter() { #endif } -void NewIRInterpreter::RunImpl() { - // lazy initialization of gc, do not create gc is the program only run once - if (!gc_) { - gc_ = CreateInterpreterCoreGarbageCollector(place_, vec_instruction_); - } - - interpreter::ResetAtomicGuard guard(&deps_, &refs_); - - // if ((execution_config_.used_for_jit || execution_config_.used_for_cinn) - // && - // (sync_op_num_ == 0)) { - VLOG(4) << "Tracing Instruction List"; - - TraceInstructionList(vec_instruction_); - - // } else { - // VLOG(4) << "Non-tracing"; - // // For the program that only run once, it is no need to - // // create work_queue, so the async_work_queue_ is created - // // until the second step run. - // async_work_queue_ = GetWorkQueue(); - // ExecuteInstructionList(vec_instruction_); - // } - // #ifdef PADDLE_WITH_CUSTOM_DEVICE - // if (platform::is_custom_place(place_)) { - // platform::DeviceContextPool::Instance().Get(place_)->Wait(); - // } - // #endif -} - -FetchList NewIRInterpreter::Run( - const std::vector& feed_names, - const std::vector& feed_tensors) { - SetDeviceId(place_); - CheckCUDAGraphBeforeRun(feed_names); - -#ifdef PADDLE_WITH_DNNL - platform::AttachPointerHashToMKLDNNKey(this, place_); -#endif - - bool is_build = is_build_; - Prepare(feed_names, feed_tensors, is_build); - - if (is_build) { - RunImpl(); - } - - if (HasLocalScope()) { - ClearLoDTensorArrayInLocalScope(); - } - - // return Fetch Tensors - auto* fetch_var = local_scope_->FindVar(interpreter::kFetchVarName); - if (fetch_var) { - auto fetch_list = std::move(*fetch_var->GetMutable()); -#ifdef PADDLE_WITH_CUDA - if (platform::IsCUDAGraphCapturing()) { - PADDLE_ENFORCE_EQ(fetch_list.empty(), - true, - platform::errors::InvalidArgument( - "Cannot fetch data when using CUDA Graph.")); - } -#endif - return fetch_list; - } else { - return {}; - } -} - -FetchList NewIRInterpreter::Run(const std::vector& feed_names, - bool need_fetch) { - if (FLAGS_enable_new_ir_in_executor_beta_run) { - LOG_FIRST_N(INFO, 1) << "New ir interpreter is running in BetaRun mode."; - return BetaRun(feed_names, need_fetch); - } - - SetDeviceId(place_); - CheckCUDAGraphBeforeRun(feed_names); - -#ifdef PADDLE_WITH_DNNL - platform::AttachPointerHashToMKLDNNKey(this, place_); -#endif - - if (!is_build_) { - LOG_FIRST_N(INFO, 1) << "New ir interpreter is running in OldRun mode."; - std::stringstream ss; - ss << this; - ::ir::BuildScope(*ir_program_->block(), - InnerScope(), - ss.str(), - &value_2_var_name_, - &variable_2_var_name_, - &var_name_2_id_, - &variable_list_); - VLOG(4) << DebugValueInfo(); - - SolvePersisableVarNames(); - - std::vector op_func_nodes; - interpreter::BuildOpFuncList(place_, - ir_program_->block(), - &op_func_nodes, - scope_, - local_scope_, - value_2_var_name_, - execution_config_); - // SetFeedVarsInplaceSkip(feed_names); - // convert vec func_list to graph - Convert(&op_func_nodes); - UpdateSyncOpNum(); - if (static_build_) { - VLOG(4) << "RUN impl"; - RunImpl(); - } - is_build_ = true; - } else { - RunImpl(); - } - - if (HasLocalScope()) { - ClearLoDTensorArrayInLocalScope(); - } - - // return Fetch Tensors - Scope* inner_scope = InnerScope(); - if (FLAGS_enable_new_ir_in_executor) { - framework::FetchList fetch_res; - - if (need_fetch) { - for (auto& var_name : fetch_var_names_) { - auto* var = inner_scope->FindVar(var_name); - VLOG(0) << "fetch " << var_name << "[" << var << "]"; - fetch_res.push_back(var->Get()); - } - } - VLOG(4) << "get fetch list size: " << fetch_res.size(); - return fetch_res; - } else { - auto* fetch_var = inner_scope->FindVar(interpreter::kFetchVarName); - if (fetch_var && need_fetch) { - auto fetch_list = - std::move(*fetch_var->GetMutable()); -#ifdef PADDLE_WITH_CUDA - if (platform::IsCUDAGraphCapturing()) { - PADDLE_ENFORCE_EQ(fetch_list.empty(), - true, - platform::errors::InvalidArgument( - "Cannot fetch data when using CUDA Graph.")); - } -#endif - return fetch_list; - } else { - return {}; - } - } -} - int NewIRInterpreter::GetIdByName(const std::string& name) const { auto it = var_name_2_id_.find(name); if (it != var_name_2_id_.end()) { @@ -294,7 +125,8 @@ int NewIRInterpreter::GetIdByName(const std::string& name) const { } void NewIRInterpreter::SetCopyProgram(std::shared_ptr prog) { - copy_program_ = prog; + PADDLE_THROW(platform::errors::Unimplemented( + "SetCopyProgram is not implemented in NewIRInterpreter.")); } void NewIRInterpreter::SetSkipGcVars( @@ -331,21 +163,17 @@ const VariableScope* NewIRInterpreter::GetVariableScope() const { void NewIRInterpreter::reset_scope(Scope* new_scope) { var_scope_.SetScope(new_scope); - auto& var_list = var_scope_.MutableVarList(); - for (size_t i = 0; i < var_list.size(); i++) { - const auto& var_name = var_scope_.GetNameById(i); - var_list[i] = new_scope->FindVar(var_name); + scope_ = new_scope; + for (size_t i = 0; i < variable_list_.size(); i++) { + const auto& var_name = GetNameById(i); + variable_list_[i] = new_scope->FindVar(var_name); } // The index should be assured valid, cause the InterpreterCore may not be // fully built, but was still cached and used. For example, see unit test // `test_assert.py`, it may exit before `NewIRInterpreter::Convert`, // but still was cached and used by later tests. - for (size_t i = 0; i < std::min(refs_.size(), var_list.size()); i++) { - refs_[i]->ResetVariable(var_list[i]); - } - - for (auto& ins : vec_instruction_) { - BuildAndCacheInstructionCtx(&ins); + for (size_t i = 0; i < std::min(refs_.size(), variable_list_.size()); i++) { + refs_[i]->ResetVariable(variable_list_[i]); } } @@ -373,8 +201,17 @@ void NewIRInterpreter::ShareWorkQueueFrom(InterpreterBaseImpl* src) { } void NewIRInterpreter::ShareBuildResultsFrom(const InterpreterBaseImpl& src) { - PADDLE_THROW(platform::errors::Unimplemented( - "ShareBuildResultsFrom is not implemented in NewIRInterpreter.")); + if (is_shared_results_build_ || !src.IsSharedResultsBuild()) { + return; + } + // share op dependency + ir_dependency_builder_.ShareDependencyFrom(src.GetNewIrDependencyBuilder()); + dependecy_count_ = src.GetDependencyCount(); + // share event analysis + ir_stream_analyzer_.ShareEventInfoFrom(src.GetNewIrStreamAnalyzer()); + is_shared_results_build_ = true; + VLOG(8) << "Share Build Results from InterpreterCore(" << &src + << ") to InterpreterCore(" << this << ")"; } // op dependences @@ -384,10 +221,14 @@ const interpreter::DependencyBuilder& NewIRInterpreter::GetDependencyBuilder() "GetDependencyBuilder is not implemented in NewIRInterpreter.")); } +const interpreter::NewIrDependencyBuilder& +NewIRInterpreter::GetNewIrDependencyBuilder() const { + return ir_dependency_builder_; +} + std::shared_ptr> NewIRInterpreter::GetDependencyCount() const { - PADDLE_THROW(platform::errors::Unimplemented( - "GetDependencyCount is not implemented in NewIRInterpreter.")); + return dependecy_count_; } const interpreter::StreamAnalyzer& NewIRInterpreter::GetStreamAnalyzer() const { @@ -395,1069 +236,114 @@ const interpreter::StreamAnalyzer& NewIRInterpreter::GetStreamAnalyzer() const { "GetStreamAnalyzer is not implemented in NewIRInterpreter.")); } -bool NewIRInterpreter::IsSharedResultsBuild() const { - PADDLE_THROW(platform::errors::Unimplemented( - "IsSharedResultsBuild is not implemented in NewIRInterpreter.")); -} - -bool NewIRInterpreter::BuildInplaceCheckVarIsOnlyInput( - const std::vector>& input_var2op, size_t var_index) { - if (!var_scope_.VarDesc(var_index)) { - return input_var2op.at(var_index).size() == 1; - } else { - int is_input_cnt = 0; - for (auto inst_id : input_var2op.at(var_index)) { - OpInOutInfo info; - info.Build(vec_instruction_.at(inst_id).OpBase()); - if (info.IsInArgBufferNeeded(var_scope_.VarDesc(var_index)->Name())) { - is_input_cnt++; - } - } - return is_input_cnt == 1; - } -} - -std::shared_ptr NewIRInterpreter::GetWorkQueue() { - if (async_work_queue_ == nullptr) { - async_work_queue_ = std::make_shared( - execution_config_.host_num_threads, - execution_config_.device_num_threads, - nullptr); - } - return async_work_queue_; -} - -void NewIRInterpreter::BuildAndCacheInstructionCtx(Instruction* instr_node) { - Scope* inner_scope = InnerScope(); - VariableValueMap ins_map; - for (auto& var_name_item : instr_node->Inputs()) { - std::vector input_vars; - - input_vars.reserve(var_name_item.second.size()); - for (auto& id : var_name_item.second) { - input_vars.emplace_back(inner_scope->FindVar(var_scope_.GetNameById(id))); - } - ins_map.emplace(var_name_item.first, std::move(input_vars)); - } - - VariableValueMap outs_map; - for (auto& var_name_item : instr_node->Outputs()) { - std::vector out_vars; - - out_vars.reserve(var_name_item.second.size()); - for (auto& id : var_name_item.second) { - out_vars.emplace_back(inner_scope->FindVar(var_scope_.GetNameById(id))); - } - outs_map.emplace(var_name_item.first, std::move(out_vars)); - } - - // set runtime_ctx and infershape_ctx_ - if (instr_node->OpBase()->Type() == "cinn_launch" || - instr_node->OpBase()->Type() == "cinn_instruction_run") { // OP use scope - // in kernel - Scope* inner_scope = InnerScope(); - instr_node->ResetContextWithScope(ins_map, outs_map, *inner_scope); - } else { - instr_node->ResetContext(ins_map, outs_map); - } -} - -void NewIRInterpreter::BuildInplace() { - // NOTE(Ruibiao): coalesce_tensor_op outputs a FusedOutput phi::DenseTensor - // and a list of Output Tensors which are sliced from the FusedOutput. These - // outputs sholud not be the outvar of the in-place var-pair since memory - // reuse between FusedOutput and Output Tensors is assumed. For the following - // example: - // fused_var, var1, var2, var3 = coalesce_tensor(var1, var2, var3) - // var1 = sum(var4, var5) - // ... - // - // After running coalesce_tensor_op, var1 is assumed to share the buffer - // slices from fused_var. However, if sum_op is in-place, then var1 would - // re-share the buffer with var4 instead of fused_var. - std::set skip_inplace_outvars; - for (Instruction& instr : vec_instruction_) { - OperatorBase* op = instr.OpBase(); - if (op->Type() == kCoalesceTensor) { - const std::vector& outputs = - op->OutputVars(/*has_intermediate=*/false); - skip_inplace_outvars.insert(outputs.begin(), outputs.end()); - } - } - - Scope* local_scope = InnerScope(); - std::vector> input_var2op(var_scope_.VarSize()); - for (Instruction& instr : vec_instruction_) { - for (auto& item : instr.Inputs()) { - for (int var_id : item.second) { - if (var_id != kEmptyVarIndex) { - input_var2op.at(var_id).push_back(instr.Id()); - } - } - } - } - - for (auto& instr : vec_instruction_) { - auto* op_base = instr.OpBase(); - if (!op_base->Info().infer_inplace_) { - continue; - } - - auto in_to_outs = op_base->Info().infer_inplace_( - platform::is_gpu_place(instr.DeviceContext().GetPlace())); - - auto& inputs = instr.Inputs(); - auto& outputs = instr.Outputs(); - for (auto& pair : in_to_outs) { - auto iter = inputs.find(pair.first); - if (iter != inputs.end() && !iter->second.empty()) { - auto in_var_desc = var_scope_.VarDesc(iter->second[0]); - if (in_var_desc && in_var_desc->Persistable()) { - continue; - } - if (var_scope_.GetVarSikpInplace(iter->second[0])) { - continue; - } - if (BuildInplaceCheckVarIsOnlyInput(input_var2op, iter->second[0])) { - auto iterout = outputs.find(pair.second); - if (iterout != outputs.end() && !iterout->second.empty()) { - const std::string& invar_name = - var_scope_.GetNameById(iter->second[0]); - const std::string& outvar_name = - var_scope_.GetNameById(iterout->second[0]); - auto invar = local_scope->FindVar(invar_name); - auto outvar = local_scope->FindVar(outvar_name); - - if (invar && outvar && invar->IsType() && - outvar->IsType() && - skip_inplace_outvars.find(outvar_name) == - skip_inplace_outvars.end()) { - instr.AddInplace(invar, outvar); - VLOG(3) << "inplace " << op_base->Type() << " " << invar_name - << " -> " << outvar_name; - } - } - } - } - } - } -} - -void NewIRInterpreter::PrepareForCUDAGraphCapture() { - if (!FLAGS_new_executor_use_cuda_graph) return; -#ifdef PADDLE_WITH_CUDA - PADDLE_ENFORCE_EQ( - platform::IsCUDAGraphCapturing(), - false, - platform::errors::PermissionDenied("CUDA Graph is not allowed to capture " - "before prepare.")); - PADDLE_ENFORCE_EQ(platform::is_gpu_place(place_), - true, - platform::errors::InvalidArgument( - "CUDA Graph is only supported on NVIDIA GPU device.")); - // If set true, will call `cudaStreamSynchronize(nccl_stream)`after allreduce. - // which may cause error in cuda graph. This behavior is consistent with PE. - PADDLE_ENFORCE_EQ(FLAGS_sync_nccl_allreduce, - false, - platform::errors::InvalidArgument( - "FLAGS_sync_nccl_allreduce must be False to support " - "CUDA Graph capturing.")); - - // All output vars of coalesce_tensor op should be persistable. - // If fused output var of coalesce_tensor is gc, it will cause accuracy - // problem. The specific reasons need to be analyzed. -// for (auto& op_desc : block_.AllOps()) { -// if (op_desc->Type() == kCoalesceTensor) { -// for (auto& out_var_name : op_desc->OutputArgumentNames()) { -// // The fused var needs to be set to persistable, not just added to -// // skip_gc_vars. -// // In the case where the feed fetch var is changed, -// StandaloneExecutor -// // will be newly constructed. If the fused var is not persistable, -// // these vars will be recreated and initialized, resulting in -// // precision problems. -// auto* out_var = op_desc->Block()->FindVarRecursive(out_var_name); -// if (out_var) { -// out_var->SetPersistable(true); -// VLOG(4) << "Mark Var(" << out_var_name << ") as Persistable."; -// } -// } -// } -// } -#else - PADDLE_THROW(platform::errors::Unimplemented( - "CUDA Graph is only supported on NVIDIA GPU device.")); -#endif -} - -void NewIRInterpreter::CheckCUDAGraphBeforeRun( - const std::vector& feed_names) { -#ifdef PADDLE_WITH_CUDA - if (platform::IsCUDAGraphCapturing()) { - PADDLE_ENFORCE_EQ( - feed_names.empty(), - true, - platform::errors::InvalidArgument( - "Feeding data is not permitted when capturing CUDA Graph.")); - PADDLE_ENFORCE_EQ( - FLAGS_new_executor_use_cuda_graph, - true, - platform::errors::InvalidArgument( - "You must turn on FLAGS_new_executor_use_cuda_graph to True " - "to enable CUDA Graph capturing.")); - PADDLE_ENFORCE_EQ( - place_, - platform::CUDAGraphCapturingPlace(), - platform::errors::InvalidArgument("The place to capture CUDAGraph is " - "not the same as the place to run.")); - } -#endif -} - -void NewIRInterpreter::BuildOperatorDependences() { - // analysis the dependences between ops, add next_instr_list to each instr, - // and set the dependecy_count_ - size_t instr_num = vec_instruction_.size(); - dependecy_count_ = std::vector(instr_num, 0); - auto downstream_map = dependency_builder_.Build(vec_instruction_); - - for (size_t instr_id = 0; instr_id < instr_num; ++instr_id) { - Instruction& cur_instr = vec_instruction_[instr_id]; - const std::set& next_instr_ids = downstream_map[instr_id]; - - if (FLAGS_new_executor_serial_run) { - for (size_t next_instr_id : next_instr_ids) { - cur_instr.AddNextInstrInSameThread(next_instr_id); - } - } else { - if (cur_instr.KernelType() == OpFuncType::kGpuAsync) { - for (size_t next_instr_id : next_instr_ids) { - if (vec_instruction_[next_instr_id].KernelType() == - OpFuncType::kGpuAsync) { - cur_instr.AddNextInstrInSameThread(next_instr_id); - } else { - cur_instr.AddNextInstrInDifferentThread(next_instr_id); - } - } - } else { - bool has_instr_in_same_thread = false; - for (size_t next_instr_id : next_instr_ids) { - if (!has_instr_in_same_thread && - vec_instruction_[next_instr_id].KernelType() != - OpFuncType::kGpuAsync) { - cur_instr.AddNextInstrInSameThread(next_instr_id); - has_instr_in_same_thread = true; - } else { - cur_instr.AddNextInstrInDifferentThread(next_instr_id); - } - } - } - } - - for (size_t next_instr_id : next_instr_ids) { - ++dependecy_count_[next_instr_id]; - } - } -} - -// At the end of each step, the holder of phi::DenseTensor in LoDTensorArray is -// null. Clear these Tensors and leave LoDTensorArray empty, otherwise an -// exception will occur in the next step -void NewIRInterpreter::ClearLoDTensorArrayInLocalScope() { - auto vars = local_scope_->LocalVars(); - for (auto var : vars) { - if (var->IsType()) { - auto* lod_tensor_arr = var->GetMutable(); - lod_tensor_arr->clear(); - } - } -} - -void NewIRInterpreter::Convert( - std::vector* op_func_nodes) { - auto& vec_meta_info = var_scope_.MutableVecMetaInfo(); - auto nodes = *op_func_nodes; - auto op_nums = nodes.size(); - vec_instruction_.clear(); - vec_instruction_.reserve(op_nums); - for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) { - auto& op_func_node = nodes[op_idx]; - auto* dev_ctx_ = stream_analyzer_.ParseDeviceContext(op_func_node); - vec_instruction_.emplace_back(op_idx, std::move(op_func_node), *dev_ctx_); -#ifdef PADDLE_WITH_CUDA - if (FLAGS_new_executor_use_cuda_graph) { - auto& op = op_func_node.operator_base_; - auto& op_type = op->Type(); - if (op_type == interpreter::kMemcpyD2H || - op_type == interpreter::kMemcpyH2D) { - PADDLE_THROW(paddle::platform::errors::Fatal( - "Cuda memory copy d2h/h2d is not allowed while using cuda graph.")); - } - PADDLE_ENFORCE_EQ(typeid(*dev_ctx_) == typeid(phi::GPUContext), - true, - platform::errors::InvalidArgument( - "Device context of op %s must be [%s] while using " - "cuda graph, but got [%s].", - op_type, - typeid(phi::GPUContext).name(), - typeid(*dev_ctx_).name())); - // cuda graph needs to record all stream - phi::backends::gpu::CUDAGraphContextManager::Instance() - .RecordCapturingDeviceContext(dev_ctx_); - } -#endif - } - - BuildOperatorDependences(); - - // NOTE(Ruibiao): For cross-step stream synchronization, an event may be - // recorded in the first step and waited in the second step. So, in the first - // step, the WaitEvent may be called without RecordEvent. Considering that - // before the first call to RecordEvent, an Event represents an empty set of - // work and WaitEvent always return succeed immediately, we omit the - // prelude-record for the first step here. - stream_analyzer_.ConstructEvents(&vec_instruction_); - - // add event for the input var of jit program, since there are async copied - // from gpu_pinned place to gpu place on compute stream. - for (size_t i = 0; i < dependecy_count_.size(); ++i) { - if (dependecy_count_[i] == 0) { - auto& inst = vec_instruction_[i]; - if (inst.OpBaseValid() && - inst.OpBase()->Type() == interpreter::kMemcpyD2H && - platform::is_gpu_place(place_)) { - for (auto& item : inst.Inputs()) { - for (auto var_id : item.second) { - auto name = var_scope_.GetNameById(var_id); - if (JitInputVars().count(name)) { - auto device_event = std::make_shared( - place_, platform::GenerateDeviceEventFlag()); - VLOG(4) << "Add input event for input: " << name << " of " - << inst.OpBase()->Type(); - inst.AddEventToWait( - i, device_event, stream_analyzer_.GetWaiterType(inst)); - } - } - } - } - } - } - - // clear the last_live_ops list for all vars in skip_gc_vars - for (const std::string& skip_gc_var : execution_config_.skip_gc_vars) { - int var_id = var_scope_.GetIdByName(skip_gc_var); - if (var_id != -1) { - last_live_ops_[var_id].clear(); - VLOG(8) << "Skip gc for var: " << skip_gc_var; - } - } - - // shrink, find the downstream op that has no other op in the - // downstream list happens before it - // For example, - // b = op1(a) - // c = op2(a, b) - // in this case, a is the input of op1 and op2, we only need to check - // a after op2, because op2 always uses a after op1. - for (size_t i = 0; i < last_live_ops_.size(); ++i) { - std::set minumum_last_live_ops; - for (size_t item : last_live_ops_[i]) { - bool not_before_any = true; - // find the op that is not executed before any - for (size_t other_item : last_live_ops_[i]) { - if (dependency_builder_.OpHappensBefore(item, other_item)) { - VLOG(8) << "happens_before: " << item << "->" << other_item - << ", so skip " << item; - not_before_any = false; - break; - } - } - if (not_before_any) { - VLOG(8) << "last live op of var " << i << " " - << var_scope_.GetNameById(i) << " : " << item << " " - << vec_instruction_[item].OpBase()->Type(); - minumum_last_live_ops.insert(item); - vec_instruction_[item].AddGCCheckVar(i); - } - } - last_live_ops_[i] = minumum_last_live_ops; - vec_meta_info[i].var_ref_count_ = last_live_ops_[i].size(); - } - - for (auto& dep : dependecy_count_) { - deps_.emplace_back(std::make_shared(dep)); - } - for (size_t i = 0; i < vec_meta_info.size(); ++i) { - refs_.emplace_back(std::make_shared( - vec_meta_info[i].var_ref_count_, var_scope_.VarRef(i))); - } - - AnalyseExecuteOrderForTrace(dependency_builder_.OpDownstreamMap(), - instruction_scheduling_priority_less); -} - -void NewIRInterpreter::BuildSkipShareLoDInfo() { - for (size_t i = 0; i < vec_instruction_.size(); ++i) { - bool can_skip_lod = true; - for (auto& input : vec_instruction_[i].InnerRuntimeContext()->inputs) { - for (auto& var : input.second) { - if (var->IsType()) { - if (!var->Get().lod().empty()) { - can_skip_lod = false; - break; - } - } else { - can_skip_lod = false; - break; - } - } - } - if (can_skip_lod) { - VLOG(8) << "skip share lod for: " << vec_instruction_[i].OpBase()->Type() - << " (" << i << ")"; - } - vec_instruction_[i].InnerInferShapeContext()->SetSkipLoD(can_skip_lod); - } -} - -void NewIRInterpreter::RunOperator(const Instruction& instr_node) { - auto* op = instr_node.OpBase(); - auto place = instr_node.DeviceContext().GetPlace(); - Scope* local_scope = InnerScope(); - VLOG(4) << "Start run " << place << " " << op->DebugStringEx(local_scope); - - auto op_with_kernel = dynamic_cast(op); - { - // If it is OperatorBase, InferShape do nothing. - if (op_with_kernel != nullptr) { - platform::RecordEvent infershape_event( - "infer_shape", - platform::TracerEventType::OperatorInner, - 1, - platform::EventRole::kInnerOp); - - // see OperatorWithKernel::RunImpl in operator.cc for why - if (!(op_with_kernel->HasAttr(kAllKernelsMustComputeRuntimeShape) && - op_with_kernel->Attr(kAllKernelsMustComputeRuntimeShape))) { - op_with_kernel->Info().infer_shape_( - instr_node.InnerInferShapeContext().get()); - } - infershape_event.End(); - platform::RecordOpInfoSupplement(op->Type(), - op->Attrs(), - *(instr_node.InnerInferShapeContext()), - *(instr_node.InnerRuntimeContext()), - op->Id()); - } - } - if (op_with_kernel != nullptr && FLAGS_new_executor_use_inplace) { - // TODO(xiongkun03) Does operator base support inplace ? - for (auto& pair : instr_node.InplaceInfo()) { - const auto& in = paddle::framework::details::GetTensorFromVar(pair.first); - auto* out = - paddle::framework::details::GetMutableTensorFromVar(pair.second); - if (in.dims() == out->dims()) { - out->ShareBufferWith(in); - } - } - } - - { - platform::RecordEvent compute_event( - "compute", - platform::TracerEventType::OperatorInner, - 1, - platform::EventRole::kInnerOp); - if (op_with_kernel == nullptr) { // operator base - instr_node.OpBase()->Run(*local_scope, place_); - } else { - phi::Kernel* kernel = instr_node.PhiKernel(); - if (kernel && kernel->IsValid()) { // phi kernel - if (kernel->GetKernelRegisteredType() == - phi::KernelRegisteredType::FUNCTION) { - VLOG(4) << "Run function kernel: " << op->Type(); - VLOG(4) << instr_node.InnerRuntimeContext().get() << " " - << &instr_node.DeviceContext(); - phi::KernelContext phi_kernel_context; - op_with_kernel->BuildPhiKernelContext( - *instr_node.InnerRuntimeContext().get(), - const_cast(&instr_node.DeviceContext()), - &phi_kernel_context); - - (*kernel)(&phi_kernel_context); - } else { - VLOG(4) << "Run structure kernel: " << op->Type(); - (*kernel)(instr_node.InnerExecutionContext().get()); - } - } else { // fluid kernel - instr_node.KernelFunc()(*instr_node.InnerExecutionContext().get()); - } - } - } - - VLOG(4) << "End run " << place << " " << op->DebugStringEx(local_scope); - - if (!instr_node.InplaceBackMap().empty()) { - platform::RecordEvent inplaceback_event( - "InplaceVarsBack", platform::TracerEventType::UserDefined, 10); - auto& m = instr_node.InplaceBackMap(); - // NOTE(zhiqiu): same logic as TransferInplaceVarsBack() in operator.cc - for (auto& p : m) { - auto* transformed_tensor = GetMutableLoDTensorOrSelectedRowsValueFromVar( - var_scope_.VarRef(p.first)); - auto* original_tensor = GetMutableLoDTensorOrSelectedRowsValueFromVar( - var_scope_.VarRef(p.second)); - original_tensor->ShareDataWith(*transformed_tensor); - VLOG(4) << "Transfer inplace variable back form " - << var_scope_.GetNameById(p.first) << " to " - << var_scope_.GetNameById(p.second); - } - } - - /*For profiling/benchmark only*/ - if (FLAGS_benchmark) { - instr_node.DeviceContext().Wait(); -#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) - PADDLE_ENFORCE_GPU_SUCCESS(platform::GpuGetLastError()); - VLOG(4) << "Operator(" << op->Type() - << "): context wait and get last error"; -#endif - } - - for (auto& hook : hookfuncs_) { - hook(op, local_scope); - } - - // for debug nan/inf - if (op_with_kernel != nullptr && FLAGS_check_nan_inf) { - VLOG(4) << "Check nan/inf"; - try { - framework::details::CheckOpHasNanOrInf( - *op, - *local_scope, - place); // TODO(xiongkun03) change it to inner scope. - } catch (...) { - const std::vector* callstack = nullptr; - auto attrs = op->Attrs(); - auto iter = - attrs.find(OpProtoAndCheckerMaker::OpCreationCallstackAttrName()); - if (iter != attrs.end()) { - callstack = &PADDLE_GET_CONST(std::vector, iter->second); - if (callstack->empty()) callstack = nullptr; - } - std::ostringstream sout; - if (callstack) { - if (FLAGS_call_stack_level > 1) { - sout << "\n\n Compile Traceback (most recent call last):"; - } else { - sout << "In user code:\n"; - } - for (auto& line : *callstack) { - sout << "\n " << line; - } - } - std::cout << sout.str() << std::endl; - std::rethrow_exception(std::current_exception()); - } - } -} - -void NewIRInterpreter::RunInstruction(const Instruction& instr_node) { - OperatorBase* op = nullptr; - if (instr_node.OpBaseValid()) { - op = instr_node.OpBase(); - platform::RecordEvent instruction_event( - op->Type(), platform::TracerEventType::Operator, 1); - } - - SetDeviceId(instr_node.DeviceContext().GetPlace()); - - try { - instr_node.WaitEvent(place_); - - if (instr_node.PreDefineContext()) { - VLOG(5) << "run new ir selected kernel"; - auto op_func_node = const_cast((instr_node.OpFunc())); - VLOG(5) << "begin to run op " << op_func_node->phi_op_name_; - if (op_func_node->infer_meta_interface_) { - op_func_node->infer_meta_interface_->infer_meta_( - &(op_func_node->infer_meta_context_)); - } - VLOG(5) << "after run infer meta"; - if (op_func_node->fluid_op) { - // run fluid op - ExecutionContext exe_ctx(*(op_func_node->operator_base_.get()), - *scope_, - *(op_func_node->dev_ctx_), - *(op_func_node->runtime_ctx_.get())); - (*(op_func_node->phi_kernel_))(&exe_ctx); - - } else { - (*(op_func_node->phi_kernel_))(&(op_func_node->kernel_context_)); - } - VLOG(5) << "after run kernel"; - } else if (!instr_node.IsArtificial()) { - RunOperator(instr_node); - CheckGC(instr_node); - interpreter::LogDeviceMemoryStats(place_); - } - - instr_node.RecordEvent(place_); - } catch (platform::EnforceNotMet& ex) { - LOG(WARNING) << instr_node.OpFunc()->phi_op_name_ - << " raises an EnforceNotMet exception " - << platform::demangle(typeid(ex).name()) << ", " << ex.what(); - exception_holder_.Catch(std::make_exception_ptr(std::move(ex))); - } catch (platform::EOFException&) { - exception_holder_.Catch(std::current_exception()); - } catch (std::exception& ex) { - LOG(WARNING) << instr_node.OpFunc()->phi_op_name_ << " raises an exception " - << platform::demangle(typeid(ex).name()) << ", " << ex.what(); - exception_holder_.Catch(std::current_exception()); - } catch (...) { - LOG(WARNING) << instr_node.OpFunc()->phi_op_name_ - << " raises an unknown exception"; - exception_holder_.Catch(std::current_exception()); - } -} - -std::string NewIRInterpreter::GetDepsString() const { - std::stringstream ss; - auto downstream_map = dependency_builder_.OpDownstreamMap(); - ss << "Note: when static_dep is 1, it is ok that the dynamic_dep will not " - "be decreased to 0." - << std::endl; - ss << "unfinished_op_number_:" << unfinished_op_number_ << std::endl; - for (size_t i = 0; i < deps_.size(); ++i) { - ss << "op:" << i << ", type: " << vec_instruction_[i].OpBase()->Type() - << ", static_dep:" << deps_[i]->StaticDep() - << ", dynamic_dep:" << deps_[i]->DynamicDep() << ", downstream op: "; - for (auto id : downstream_map[i]) { - ss << id << ", "; - } - ss << std::endl; - } - return ss.str(); -} - -void NewIRInterpreter::ExecuteInstructionList( - const std::vector& vec_instr) { - unfinished_op_number_ = vec_instr.size(); - if (unfinished_op_number_ == 0) { - VLOG(4) << "No op to run, return"; - return; - } - - exception_holder_.Clear(); - - for (size_t i = 0; i < dependecy_count_.size(); ++i) { - if (dependecy_count_[i] == 0) { - // NOTE(zhiqiu): hot fix for jit input var - RecordMemcpyD2H(vec_instr.at(i)); - if (FLAGS_new_executor_serial_run) { - RunInstructionAsync(i); - } else { - async_work_queue_->AddTask(vec_instr.at(i).KernelType(), - [this, i] { RunInstructionAsync(i); }); - } - } - } - - // For debug hang in main_thread_blocker_.WaitEvent(), - // launch async task to log deps every - // FLAGS_executor_log_deps_every_microseconds, then cancel the std::async when - // main_thread_blocker_.WaitEvent() executed. Why not use std::async instead - // of workqueue? To make sure that the logging thread itself will not affect - // the workqueue - // used in interpretercore. - - std::future logged_times; - std::atomic_bool cancel_log = ATOMIC_VAR_INIT(false); - if (FLAGS_executor_log_deps_every_microseconds) { - logged_times = std::async( - std::launch::async, - [this](const std::atomic_bool& cancel) { - int times = 0; - while (!cancel) { - std::this_thread::sleep_for(std::chrono::microseconds( - FLAGS_executor_log_deps_every_microseconds)); - // check again, since cancel may be changed during sleep - if (cancel) { - break; - } - VLOG(6) << "deps:\n" << GetDepsString(); - times++; - } - return times; - }, - std::ref(cancel_log)); - } - - auto event_name = main_thread_blocker_.WaitEvent(); - VLOG(1) << "main_thread_blocker_(" << &main_thread_blocker_ - << ") got event_name: " << event_name; - - cancel_log = true; - if (logged_times.valid()) { - VLOG(1) << "Logged deps for " << logged_times.get() << " times"; - } - - if (UNLIKELY(exception_holder_.IsCaught())) { - VLOG(1) << "Exception caught " << exception_holder_.Type(); - // Graceful exit when the executor encountered a fatal error. - // EOF is not a fatal error. - if (exception_holder_.Type() != "EOF") { - async_work_queue_->Cancel(); - async_work_queue_.reset(); - } - VLOG(4) << "Cancel ok"; - PADDLE_ENFORCE_EQ( - main_thread_blocker_.Clear(), - 0, - platform::errors::PreconditionNotMet( - "main_thread_blocker_.Clear() return -1, clear failed")); - VLOG(4) << "clear ok"; - exception_holder_.ReThrow(); - } -} - -void NewIRInterpreter::RunNextInstructions(const Instruction& instr, - SchedulingQueue* reserved_next_ops) { - platform::RecordEvent record( - "RunNextInstructions", platform::TracerEventType::UserDefined, 10); - - auto IsReady = [this](size_t next_id) { - VLOG(4) << "op_id: " << next_id - << ", remain deps: " << deps_[next_id]->DynamicDep(); - return deps_[next_id]->CheckAndDecrease(); - }; - - for (size_t next_instr_id : instr.NextInstrsInDifferenceThread()) { - if (IsReady(next_instr_id)) { - async_work_queue_->AddTask( - vec_instruction_[next_instr_id].KernelType(), - [this, next_instr_id]() { RunInstructionAsync(next_instr_id); }); - } - } - - for (size_t next_instr_id : instr.NextInstrsInSameThread()) { - if (IsReady(next_instr_id)) { - reserved_next_ops->push(next_instr_id); - } - } -} - -void NewIRInterpreter::RunInstructionAsync(size_t instr_id) { - // NOTE(Ruibiao): Due to the uncertain order in multi-threading asynchronous - // scheduling, the priority order involved cross-thread scheduling is not - // guaranteed. Only Ops scheduled by the same AddTask call have the guarantee - // of priority order. - SchedulingQueue ready_ops(instruction_scheduling_priority_less); - ready_ops.push(instr_id); - while (!ready_ops.empty()) { - instr_id = ready_ops.top(); - ready_ops.pop(); - auto& instr_node = vec_instruction_.at(instr_id); - - RunInstruction(instr_node); - - if (UNLIKELY(exception_holder_.IsCaught())) { - VLOG(4) << "Exception caught"; - if (exception_notifier_ != nullptr) { - exception_notifier_->NotifyEvent(); - } - return; - } - - VLOG(4) << "unfinished_op_number_: " << unfinished_op_number_; - if (UNLIKELY(unfinished_op_number_.fetch_sub( - 1, std::memory_order_relaxed) == 1)) { - if (completion_notifier_ != nullptr) { - completion_notifier_->NotifyEvent(); - } - } - - RunNextInstructions(instr_node, &ready_ops); - } -} - -void NewIRInterpreter::RecordStreamForGC(const Instruction& instr) { -#if !defined(PADDLE_WITH_CUDA) && !defined(PADDLE_WITH_HIP) - PADDLE_THROW(platform::errors::Unimplemented( - "RecordStreamForGC is only implemented when compiled with GPU.")); -#else - if (!IsInterpretercoreFastGCEnabled() || - instr.KernelType() != OpFuncType::kGpuAsync) { - return; - } - if (instr.DeviceContext().GetPlace().GetType() == - phi::AllocationType::CUSTOM) { - return; - } - platform::RecordEvent record( - "RecordStreamForGC", platform::TracerEventType::UserDefined, 10); - - gpuStream_t stream = - reinterpret_cast(instr.DeviceContext()).stream(); - auto TensorRecordStream = [&stream](phi::DenseTensor& tensor) { - auto allocation = tensor.Holder(); - if (allocation == nullptr) { - return; - } - - const platform::Place& place = allocation->place(); - if (platform::is_gpu_place(place)) { - memory::RecordStream(allocation, stream); - } else if (platform::is_cuda_pinned_place(place)) { - // TODO(Ruibiao): Here should do something to make sure that the tensor - // is not freed until the H2D copies done. However, simplely launch a - // CUDA runtime callback to the H2D stream may lead a high performance - // overhead. As all the cases we meet in H2D are copies from CPUPlace at - // present, we just log a WARNING here. A better design is required. - LOG(WARNING) << "Copy data from a CUDAPinned tensor in an asynchronous " - "manner may lead a data inconsistent"; - } else { - // memory copies involve CPUPlace are always synchronous, so just do - // nothing here - } - }; - - /* NOTE(Ruibiao):Cross-stream tensor synchronization is required only when - * all the following conditions are satisfied: - * 1. The tensor will be GC after running the instruction, i.e., in - * instr.GCCheckVars. - * 2. The stream which initializes this tensor is different from the stream - * which the instruction run in. - * 3. The tensor is the instruction's input, cause we assume that - * instruction will initialize all output tensors with its running stream. - * 4. In the OP function of this instruction, the tensor is an input of a - * async CUDA kernel. - * - * Here we only process the first condition, because: - * 1. Since the RecordStream function will directly return when the recored - * stream is equal to the owning stream, recording a stream same as which - * initialized this tensor has less time overhead. Conversely, it may take - * more time if we try to extract those cross-stream input vars from - * instr.GCCheckVars. - * 2. Now the instruction has no idea of which vars involving async running - * in OP function, and thus we can not recognize condition 4. It should be - * supported later. - */ - for (int var_id : instr.GCCheckVars()) { - VLOG(4) << "GC sync " << var_scope_.GetNameById(var_id) << " " - << var_scope_.VarDesc(var_id); - - // persistable var will be ignore while GC - if (var_scope_.VarDesc(var_id) && - var_scope_.VarDesc(var_id)->Persistable()) { - continue; - } - - paddle::framework::Variable* var = var_scope_.VarRef(var_id); - if (var == nullptr) { - continue; - } - - if (var->IsType()) { - TensorRecordStream(*(var->GetMutable())); - } else if (var->IsType< - operators::reader:: - OrderedMultiDeviceLoDTensorBlockingQueueHolder>()) { - // do nothing - } else if (var->IsType()) { - TensorRecordStream( - *(var->GetMutable()->mutable_value())); - } else if (var->IsType()) { - auto* tensor_arr = var->GetMutable(); - for (auto& tensor : *tensor_arr) { - TensorRecordStream(tensor); - } - } else if (var->IsType>()) { - // do nothing - } else { - PADDLE_THROW(platform::errors::Unimplemented( - "The variable(%s) is not supported in eager deletion.", - framework::ToTypeName(var->Type()))); - } - } -#endif -} - -void NewIRInterpreter::CheckGC(const Instruction& instr) { - platform::RecordEvent record( - "CheckGC", platform::TracerEventType::UserDefined, 10); -#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) - RecordStreamForGC(instr); -#endif - auto& var_scope = var_scope_; - - for (auto var_id : instr.GCCheckVars()) { - VLOG(4) << "GC:" << var_scope_.GetNameById(var_id) << ", id:" << var_id - << ", ref:" << refs_[var_id]->DynamicRef(); - bool is_ready = refs_[var_id]->CheckAndDecrease(); - // ignore all persistable var while GC - if (var_scope.VarDesc(var_id) && var_scope.VarDesc(var_id)->Persistable()) { - continue; - } - if (is_ready) { - VLOG(6) << "Async delete variable with name : " - << var_scope.GetNameById(var_id); - gc_->Add(refs_[var_id]->Var(), instr); - } - } -} - -void NewIRInterpreter::Prepare( - const std::vector& feed_names, - const std::vector& feed_tensors, - bool prepare_feed) { - PADDLE_ENFORCE_EQ(feed_names.size(), - feed_tensors.size(), - platform::errors::PreconditionNotMet( - "Required feed_names.size() == feed_tensors.size(), " - "but received %d != %d", - feed_names.size(), - feed_tensors.size())); - auto FeedInput = [&] { - VLOG(4) << "Feed inputs"; - for (size_t i = 0; i < feed_names.size(); ++i) { - auto* feed_var = local_scope_->FindVar(feed_names[i]); - PADDLE_ENFORCE_NOT_NULL( - feed_var, - platform::errors::NotFound("Variable %s should not be nullptr.", - feed_names[i])); - - auto feed_tensor = feed_var->GetMutable(); - feed_tensor->ShareDataWith(feed_tensors[i]); - feed_tensor->set_lod(feed_tensors[i].lod()); - } - }; - // TODO(dev): Support this - // if (!is_build_) { - // paddle::framework::interpreter::BuildVariableScope( - // block_, execution_config_, &var_scope_); - // FeedInput(); - // std::vector op_func_nodes; - // paddle::framework::interpreter::BuildOpFuncList( - // place_, - // block_, - // execution_config_.skip_gc_vars, - // &op_func_nodes, - // &var_scope_, - // execution_config_, - // HasLocalScope(), - // static_build_); - // SetFeedVarsInplaceSkip(feed_names); - // // convert vec func_list to graph - // Convert(&op_func_nodes); - // UpdateSyncOpNum(); - // if (static_build_) { - // VLOG(4) << "RUN impl"; - // RunImpl(); - // } - // BuildSkipShareLoDInfo(); - // is_build_ = true; - // } - // NOTE: Because feed_tensor will be GC after - // paddle::framework::BuildOpFuncList, so we should - // call FeedInput again. - if (prepare_feed) { - FeedInput(); - } -} - -void NewIRInterpreter::SetFeedVarsInplaceSkip( - const std::vector& feed_names) { - for (auto& feed_name : feed_names) { - var_scope_.SetVarSikpInplace(feed_name, true); - } +const interpreter::NewIrStreamAnalyzer& +NewIRInterpreter::GetNewIrStreamAnalyzer() const { + return ir_stream_analyzer_; } -bool NewIRInterpreter::HasLocalScope() const { return local_scope_ != nullptr; } - -Scope* NewIRInterpreter::InnerScope() { - return local_scope_ != nullptr ? local_scope_ : scope_; +bool NewIRInterpreter::IsSharedResultsBuild() const { + return is_shared_results_build_; } -// Note(zhangbo): -// (1) What is "Trace"? -// The OP execute scheduling rule adopted by Interpretercore by default is a -// multi-threaded scheduling mode(see ExecuteInstructionList). By maintaining a -// high-performance thread pool, the OP's execute scheduling is distributed to -// the sub threads maintained by the thread pool, but the main thread does not -// have any tasks. In Trace mode, the executor will execute directly in the main -// thread according to the pre provided OP sequence(trace_execute_order_), -// instead of being distributed to the thread pool. -// (2) When we use "Trace"? -// In dygraph to static, This scheduling causes that the execution of the -// forward and backward OPs and the execution of the dygraph optimizer cannot be -// executed in the same thread. Executing thread switch may cause cpu cache -// miss. When a model is all KQueueAsync type OPs, all OPs will be distributed -// to the DeviceThread for execution, and the multithreading scheduling will not -// have any benefits. Therefore, in the dynamic to static, when the number of -// KQueueAsync Ops is 0, we choose Trace mode. -void NewIRInterpreter::TraceInstructionList( - const std::vector& vec_instr) { - unfinished_op_number_ = vec_instr.size(); - if (unfinished_op_number_ == 0) { - VLOG(4) << "No op to run, return"; - return; +std::shared_ptr NewIRInterpreter::GetWorkQueue() { + if (async_work_queue_ == nullptr) { + async_work_queue_ = std::make_shared( + execution_config_.host_num_threads, + execution_config_.device_num_threads, + nullptr); } + return async_work_queue_; +} - exception_holder_.Clear(); +void NewIRInterpreter::PrepareForCUDAGraphCapture() { + if (!FLAGS_new_executor_use_cuda_graph) return; +#ifdef PADDLE_WITH_CUDA + PADDLE_ENFORCE_EQ( + platform::IsCUDAGraphCapturing(), + false, + platform::errors::PermissionDenied("CUDA Graph is not allowed to capture " + "before prepare.")); + PADDLE_ENFORCE_EQ(platform::is_gpu_place(place_), + true, + platform::errors::InvalidArgument( + "CUDA Graph is only supported on NVIDIA GPU device.")); + // If set true, will call `cudaStreamSynchronize(nccl_stream)`after allreduce. + // which may cause error in cuda graph. This behavior is consistent with PE. + PADDLE_ENFORCE_EQ(FLAGS_sync_nccl_allreduce, + false, + platform::errors::InvalidArgument( + "FLAGS_sync_nccl_allreduce must be False to support " + "CUDA Graph capturing.")); +#else + PADDLE_THROW(platform::errors::Unimplemented( + "CUDA Graph is only supported on NVIDIA GPU device.")); +#endif +} - for (size_t i = 0; i < dependecy_count_.size(); ++i) { - if (dependecy_count_[i] == 0) { - // NOTE(zhiqiu): hot fix for jit input var - RecordMemcpyD2H(vec_instr.at(i)); - } +void NewIRInterpreter::CheckCUDAGraphBeforeRun( + const std::vector& feed_names) { +#ifdef PADDLE_WITH_CUDA + if (platform::IsCUDAGraphCapturing()) { + PADDLE_ENFORCE_EQ( + feed_names.empty(), + true, + platform::errors::InvalidArgument( + "Feeding data is not permitted when capturing CUDA Graph.")); + PADDLE_ENFORCE_EQ( + FLAGS_new_executor_use_cuda_graph, + true, + platform::errors::InvalidArgument( + "You must turn on FLAGS_new_executor_use_cuda_graph to True " + "to enable CUDA Graph capturing.")); + PADDLE_ENFORCE_EQ( + place_, + platform::CUDAGraphCapturingPlace(), + platform::errors::InvalidArgument("The place to capture CUDAGraph is " + "not the same as the place to run.")); } +#endif +} - // TODO(phlrain) use orignal order for now, use better dependecy - for (auto& instr_node : vec_instruction_) { - /// auto instr_id = trace_execute_order_[idx]; - - RunInstruction(instr_node); - - if (UNLIKELY(exception_holder_.IsCaught())) { - VLOG(4) << "Exception caught"; - break; +void NewIRInterpreter::ClearLoDTensorArrayInLocalScope() { + auto vars = local_scope_->LocalVars(); + for (auto var : vars) { + if (var->IsType()) { + auto* lod_tensor_arr = var->GetMutable(); + lod_tensor_arr->clear(); } } - - if (UNLIKELY(exception_holder_.IsCaught())) { - VLOG(1) << "Exception caught " << exception_holder_.Type(); - PADDLE_ENFORCE_EQ( - main_thread_blocker_.Clear(), - 0, - platform::errors::PreconditionNotMet( - "main_thread_blocker_.Clear() return -1, clear failed")); - VLOG(4) << "clear ok"; - exception_holder_.ReThrow(); - } } -void NewIRInterpreter::RecordMemcpyD2H(const Instruction& instr_node) { - // NOTE(zhiqiu): hot fix for jit input var - if (instr_node.OpBaseValid() && - instr_node.OpBase()->Type() == interpreter::kMemcpyD2H) { - platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); - auto* default_dev_ctx = pool.Get(place_); - for (auto& event : instr_node.EventsToWait()) { - platform::RecordEvent record( - "RecordStreamEvent", platform::TracerEventType::UserDefined, 10); - VLOG(3) << "Record event on default stream in jit_input_var at op: " - << instr_node.OpBase()->Type(); - event.event_->Record(default_dev_ctx); +std::string NewIRInterpreter::GetDepsString() const { + std::stringstream ss; + auto downstream_map = ir_dependency_builder_.OpDownstreamMap(); + ss << "Note: when static_dep is 1, it is ok that the dynamic_dep will not " + "be decreased to 0." + << std::endl; + ss << "unfinished_op_number_:" << unfinished_op_number_ << std::endl; + for (size_t i = 0; i < deps_.size(); ++i) { + ss << "op:" << i << ", type: " << vec_instruction_base_[i]->Name() + << ", static_dep:" << deps_[i]->StaticDep() + << ", dynamic_dep:" << deps_[i]->DynamicDep() << ", downstream op: "; + for (auto id : downstream_map[i]) { + ss << id << ", "; } + ss << std::endl; } + return ss.str(); +} + +bool NewIRInterpreter::HasLocalScope() const { return local_scope_ != nullptr; } + +Scope* NewIRInterpreter::InnerScope() { + return local_scope_ != nullptr ? local_scope_ : scope_; } void NewIRInterpreter::UpdateSyncOpNum() { int64_t sync_op_num = 0; - for (auto& ins : vec_instruction_) { - if (ins.KernelType() == OpFuncType::kCpuSync || - ins.KernelType() == OpFuncType::kGpuSync) { + for (auto& ins : vec_instruction_base_) { + if (ins->KernelType() == OpFuncType::kCpuSync || + ins->KernelType() == OpFuncType::kGpuSync) { sync_op_num = sync_op_num + 1; } } @@ -1486,8 +372,8 @@ void NewIRInterpreter::AnalyseExecuteOrderForTrace( std::vector trace_order; SchedulingQueue ready_ops(compare); - for (size_t instr_id = 0; instr_id < dependecy_count_.size(); ++instr_id) { - if (dependecy_count_[instr_id] == 0) { + for (size_t instr_id = 0; instr_id < dependecy_count_->size(); ++instr_id) { + if ((*dependecy_count_)[instr_id] == 0) { ready_ops.push(instr_id); } } @@ -1508,7 +394,7 @@ void NewIRInterpreter::AnalyseExecuteOrderForTrace( PADDLE_ENFORCE_EQ( trace_order.size(), - dependecy_count_.size(), + dependecy_count_->size(), platform::errors::PreconditionNotMet( "trace_order size should be equal to dependecy_count_.")); @@ -1608,7 +494,10 @@ void NewIRInterpreter::BuildInstructionDependences() { // analysis the dependences between instructions, add next_instr_list to each // instr, and set the dependecy_count_ size_t instr_num = vec_instruction_base_.size(); - dependecy_count_ = std::vector(instr_num, 0); + dependecy_count_ = GetDependencyCount(); + if (!is_shared_results_build_) { + dependecy_count_->assign(instr_num, 0); + } std::vector instructions_ptr; for (auto& instr : vec_instruction_base_) { @@ -1649,8 +538,10 @@ void NewIRInterpreter::BuildInstructionDependences() { } } - for (size_t next_instr_id : next_instr_ids) { - ++dependecy_count_[next_instr_id]; + if (!is_shared_results_build_) { + for (size_t next_instr_id : next_instr_ids) { + ++(*dependecy_count_)[next_instr_id]; + } } } } @@ -1886,7 +777,7 @@ void NewIRInterpreter::CalculateLastLiveOps() { var_ref_count_[i] = last_live_ops_[i].size(); } - for (auto& dep : dependecy_count_) { + for (auto& dep : *dependecy_count_) { deps_.emplace_back(std::make_shared(dep)); } for (size_t i = 0; i < variable_list_.size(); ++i) { @@ -1896,8 +787,8 @@ void NewIRInterpreter::CalculateLastLiveOps() { } void NewIRInterpreter::ConstructEventForJitInput() { - for (size_t i = 0; i < dependecy_count_.size(); ++i) { - if (dependecy_count_[i] == 0) { + for (size_t i = 0; i < dependecy_count_->size(); ++i) { + if ((*dependecy_count_)[i] == 0) { InstructionBase* inst = vec_instruction_base_[i].get(); if (inst->Name() == "pd.memcpy_d2h" && platform::is_gpu_place(place_)) { for (auto& item : inst->Inputs()) { @@ -1918,8 +809,15 @@ void NewIRInterpreter::ConstructEventForJitInput() { } } -FetchList NewIRInterpreter::BetaRun(const std::vector& feed_names, - bool need_fetch) { +paddle::framework::FetchList NewIRInterpreter::Run( + const std::vector& feed_names, + const std::vector& feed_tensors) { + PADDLE_THROW(platform::errors::Unimplemented( + "Run with feed_tensors is not implemented in NewIRInterpreter.")); +} + +FetchList NewIRInterpreter::Run(const std::vector& feed_names, + bool need_fetch) { SetDeviceId(place_); CheckCUDAGraphBeforeRun(feed_names); @@ -1956,22 +854,27 @@ FetchList NewIRInterpreter::BetaRun(const std::vector& feed_names, VLOG(4) << "Done PreAnalysis"; // Run - if (FLAGS_enable_new_ir_in_executor_loop_run) { - LOG_FIRST_N(INFO, 1) << "New ir interpreter is running in BetaRun mode " - "with for_loop version."; - LoopRunImpl(); - } else { + if (FLAGS_enable_new_ir_in_executor_trace_run || + ((execution_config_.used_for_jit || execution_config_.used_for_cinn) && + (sync_op_num_ == 0))) { LOG_FIRST_N(INFO, 1) << "New ir interpreter is running in BetaRun mode " "with trace version."; TraceRunImpl(); + } else { + LOG_FIRST_N(INFO, 1) << "New ir interpreter is running in BetaRun mode " + "with multi thread version."; + MultiThreadRunImpl(); } is_build_ = true; + is_shared_results_build_ = true; } else { - if (FLAGS_enable_new_ir_in_executor_loop_run) { - LoopRunImpl(); - } else { + if (FLAGS_enable_new_ir_in_executor_trace_run || + ((execution_config_.used_for_jit || execution_config_.used_for_cinn) && + (sync_op_num_ == 0))) { TraceRunImpl(); + } else { + MultiThreadRunImpl(); } } @@ -2013,33 +916,34 @@ FetchList NewIRInterpreter::BetaRun(const std::vector& feed_names, } } -void NewIRInterpreter::LoopRunImpl() { +void NewIRInterpreter::TraceRunImpl() { // lazy initialization of gc, do not create gc is the program only run once if (!gc_) { gc_ = CreateInterpreterCoreGarbageCollector(place_, vec_instruction_base_); } interpreter::ResetAtomicGuard guard(&deps_, &refs_); - VLOG(4) << "Loop Instruction List"; + VLOG(4) << "Tracing Instruction List"; - LoopRunInstructionList(vec_instruction_base_); - VLOG(4) << "Done LoopRunImpl"; + TraceRunInstructionList(vec_instruction_base_); + VLOG(4) << "Done TraceRunInstructionList"; } -void NewIRInterpreter::TraceRunImpl() { +void NewIRInterpreter::MultiThreadRunImpl() { // lazy initialization of gc, do not create gc is the program only run once if (!gc_) { gc_ = CreateInterpreterCoreGarbageCollector(place_, vec_instruction_base_); } interpreter::ResetAtomicGuard guard(&deps_, &refs_); - VLOG(4) << "Tracing Instruction List"; + VLOG(4) << "Multi Thread Run Instruction List"; - TraceRunInstructionList(vec_instruction_base_); - VLOG(4) << "Done TraceRunImpl"; + async_work_queue_ = GetWorkQueue(); + MultiThreadRunInstructionList(vec_instruction_base_); + VLOG(4) << "Done MultiThreadRunInstructionList"; } -void NewIRInterpreter::LoopRunInstructionList( +void NewIRInterpreter::TraceRunInstructionList( const std::vector>& vec_instr) { unfinished_op_number_ = vec_instr.size(); if (unfinished_op_number_ == 0) { @@ -2049,17 +953,19 @@ void NewIRInterpreter::LoopRunInstructionList( exception_holder_.Clear(); - for (size_t i = 0; i < dependecy_count_.size(); ++i) { - if (dependecy_count_[i] == 0) { + for (size_t i = 0; i < dependecy_count_->size(); ++i) { + if ((*dependecy_count_)[i] == 0) { // NOTE(zhiqiu): hot fix for jit input var RecordMemcpyD2H(vec_instr.at(i).get()); } } - for (size_t idx = 0; idx < vec_instr.size(); idx++) { - InstructionBase* instr_node = vec_instr[idx].get(); + for (size_t idx = 0; idx < trace_execute_order_.size(); idx++) { + auto instr_id = trace_execute_order_[idx]; + InstructionBase* instr_node = vec_instruction_base_.at(instr_id).get(); - VLOG(6) << "Run InstructionBase " << idx; + VLOG(6) << "Run InstructionBase " << instr_node->Name() << "[" << instr_id + << "]"; RunInstructionBase(instr_node); if (UNLIKELY(exception_holder_.IsCaught())) { @@ -2078,10 +984,10 @@ void NewIRInterpreter::LoopRunInstructionList( VLOG(4) << "clear ok"; exception_holder_.ReThrow(); } - VLOG(4) << "Done LoopRunInstructionList"; + VLOG(4) << "Done TraceRunInstructionList"; } -void NewIRInterpreter::TraceRunInstructionList( +void NewIRInterpreter::MultiThreadRunInstructionList( const std::vector>& vec_instr) { unfinished_op_number_ = vec_instr.size(); if (unfinished_op_number_ == 0) { @@ -2091,29 +997,67 @@ void NewIRInterpreter::TraceRunInstructionList( exception_holder_.Clear(); - for (size_t i = 0; i < dependecy_count_.size(); ++i) { - if (dependecy_count_[i] == 0) { + for (size_t i = 0; i < dependecy_count_->size(); ++i) { + if ((*dependecy_count_)[i] == 0) { // NOTE(zhiqiu): hot fix for jit input var RecordMemcpyD2H(vec_instr.at(i).get()); + if (FLAGS_new_executor_serial_run) { + RunInstructionBaseAsync(i); + } else { + async_work_queue_->AddTask(vec_instr.at(i)->KernelType(), + [this, i] { RunInstructionBaseAsync(i); }); + } } } - for (size_t idx = 0; idx < trace_execute_order_.size(); idx++) { - auto instr_id = trace_execute_order_[idx]; - InstructionBase* instr_node = vec_instruction_base_.at(instr_id).get(); + // For debug hang in main_thread_blocker_.WaitEvent(), + // launch async task to log deps every + // FLAGS_executor_log_deps_every_microseconds, then cancel the std::async when + // main_thread_blocker_.WaitEvent() executed. Why not use std::async instead + // of workqueue? To make sure that the logging thread itself will not affect + // the workqueue + // used in interpretercore. - VLOG(6) << "Run InstructionBase " << instr_node->Name() << "[" << instr_id - << "]"; - RunInstructionBase(instr_node); + std::future logged_times; + std::atomic_bool cancel_log = ATOMIC_VAR_INIT(false); + if (FLAGS_executor_log_deps_every_microseconds) { + logged_times = std::async( + std::launch::async, + [this](const std::atomic_bool& cancel) { + int times = 0; + while (!cancel) { + std::this_thread::sleep_for(std::chrono::microseconds( + FLAGS_executor_log_deps_every_microseconds)); + // check again, since cancel may be changed during sleep + if (cancel) { + break; + } + VLOG(0) << "deps:\n" << GetDepsString(); + times++; + } + return times; + }, + std::ref(cancel_log)); + } - if (UNLIKELY(exception_holder_.IsCaught())) { - VLOG(4) << "Exception caught"; - break; - } + auto event_name = main_thread_blocker_.WaitEvent(); + VLOG(1) << "main_thread_blocker_(" << &main_thread_blocker_ + << ") got event_name: " << event_name; + + cancel_log = true; + if (logged_times.valid()) { + VLOG(1) << "Logged deps for " << logged_times.get() << " times"; } if (UNLIKELY(exception_holder_.IsCaught())) { VLOG(1) << "Exception caught " << exception_holder_.Type(); + // Graceful exit when the executor encountered a fatal error. + // EOF is not a fatal error. + if (exception_holder_.Type() != "EOF") { + async_work_queue_->Cancel(); + async_work_queue_.reset(); + } + VLOG(4) << "Cancel ok"; PADDLE_ENFORCE_EQ( main_thread_blocker_.Clear(), 0, @@ -2122,7 +1066,66 @@ void NewIRInterpreter::TraceRunInstructionList( VLOG(4) << "clear ok"; exception_holder_.ReThrow(); } - VLOG(4) << "Done TraceRunInstructionList"; +} + +void NewIRInterpreter::RunInstructionBaseAsync(size_t instr_id) { + // NOTE(Ruibiao): Due to the uncertain order in multi-threading asynchronous + // scheduling, the priority order involved cross-thread scheduling is not + // guaranteed. Only Ops scheduled by the same AddTask call have the guarantee + // of priority order. + SchedulingQueue ready_ops(ir_instruction_scheduling_priority_less); + ready_ops.push(instr_id); + while (!ready_ops.empty()) { + instr_id = ready_ops.top(); + ready_ops.pop(); + auto* instr_node = vec_instruction_base_.at(instr_id).get(); + + RunInstructionBase(instr_node); + + if (UNLIKELY(exception_holder_.IsCaught())) { + VLOG(4) << "Exception caught"; + if (exception_notifier_ != nullptr) { + exception_notifier_->NotifyEvent(); + } + return; + } + + VLOG(4) << "unfinished_op_number_: " << unfinished_op_number_; + if (UNLIKELY(unfinished_op_number_.fetch_sub( + 1, std::memory_order_relaxed) == 1)) { + if (completion_notifier_ != nullptr) { + completion_notifier_->NotifyEvent(); + } + } + + RunNextInstructions(instr_node, &ready_ops); + } +} + +void NewIRInterpreter::RunNextInstructions(InstructionBase* instr, + SchedulingQueue* reserved_next_ops) { + platform::RecordEvent record( + "RunNextInstructions", platform::TracerEventType::UserDefined, 10); + + auto IsReady = [this](size_t next_id) { + VLOG(4) << "op_id: " << next_id + << ", remain deps: " << deps_[next_id]->DynamicDep(); + return deps_[next_id]->CheckAndDecrease(); + }; + + for (size_t next_instr_id : instr->NextInstrsInDifferenceThread()) { + if (IsReady(next_instr_id)) { + async_work_queue_->AddTask( + vec_instruction_base_[next_instr_id]->KernelType(), + [this, next_instr_id]() { RunInstructionBaseAsync(next_instr_id); }); + } + } + + for (size_t next_instr_id : instr->NextInstrsInSameThread()) { + if (IsReady(next_instr_id)) { + reserved_next_ops->push(next_instr_id); + } + } } void NewIRInterpreter::RunInstructionBase(InstructionBase* instr_node) { @@ -2178,6 +1181,9 @@ void NewIRInterpreter::PreAnalysis() { AnalyseExecuteOrderForTrace(ir_dependency_builder_.OpDownstreamMap(), ir_instruction_scheduling_priority_less); VLOG(4) << "Done AnalyseExecuteOrderForTrace"; + + UpdateSyncOpNum(); + VLOG(4) << "Done UpdateSyncOpNum"; } ::ir::Value NewIRInterpreter::GetValueByName(const std::string& var_name) { diff --git a/paddle/fluid/framework/new_executor/new_ir_interpreter.h b/paddle/fluid/framework/new_executor/new_ir_interpreter.h index 6c9f975a0ef41..a6b24a8200a8b 100644 --- a/paddle/fluid/framework/new_executor/new_ir_interpreter.h +++ b/paddle/fluid/framework/new_executor/new_ir_interpreter.h @@ -49,10 +49,6 @@ class NewIRInterpreter : public InterpreterBaseImpl { paddle::framework::FetchList Run(const std::vector& feed_names, bool need_fetch = true) override; - paddle::framework::FetchList BetaRun( - const std::vector& feed_names, - bool need_fetch = true) override; - void ShareWorkQueueFrom(InterpreterBaseImpl* src) override; void ShareBuildResultsFrom(const InterpreterBaseImpl& src) override; @@ -92,10 +88,6 @@ class NewIRInterpreter : public InterpreterBaseImpl { private: // build graph - void Convert(std::vector* op_func_nodes); - void BuildOperatorDependences(); - void BuildAndCacheInstructionCtx(Instruction* instr_node); - void BuildSkipShareLoDInfo(); void UpdateSyncOpNum(); void AnalyseExecuteOrderForTrace( std::map> op_downstream_map, @@ -103,39 +95,13 @@ class NewIRInterpreter : public InterpreterBaseImpl { void ConstructEventForJitInput(); void CalculateLastLiveOps(); - // inplace - void BuildInplace(); - bool BuildInplaceCheckVarIsOnlyInput( - const std::vector>& input_var2op, size_t var_index); - void SetFeedVarsInplaceSkip(const std::vector& feed_names); + // gc + void ClearLoDTensorArrayInLocalScope(); // cuda graph void CheckCUDAGraphBeforeRun(const std::vector& feed_names); void PrepareForCUDAGraphCapture(); - // execution - void RunImpl(); - void ExecuteInstructionList(const std::vector& vec_instr); - void RunInstructionAsync(size_t instr_id); - void RunInstruction(const Instruction& instr_node); - void RunNextInstructions(const Instruction& instr_id, - SchedulingQueue* reserved_next_ops); - void RunOperator(const Instruction& instr_node); - // Trace - void TraceInstructionList(const std::vector& vec_instr); - - // only used when program contains no feed op - void Prepare(const std::vector& feed_names, - const std::vector& feed_tensors, - bool prepare_feed); - - void RecordMemcpyD2H(const Instruction& instr_node); - - // gc - void RecordStreamForGC(const Instruction& instr); - void CheckGC(const Instruction& instr); - void ClearLoDTensorArrayInLocalScope(); - // workqueue std::shared_ptr GetWorkQueue(); @@ -150,23 +116,12 @@ class NewIRInterpreter : public InterpreterBaseImpl { bool is_build_{false}; bool static_build_{false}; - const platform::Place place_; + // Note(sonder): share the op dependency and event analysis procedure. + bool is_shared_results_build_{false}; - interpreter::DependencyBuilder dependency_builder_; - interpreter::StreamAnalyzer stream_analyzer_; - - // NOTE(zhiqiu): when add fetch ops in GetInterpreterCore, we will - // copy a new program and block, the copy_program_ here is used to - // hold the program, otherwise block_ maybe not valid after the - // new program is deleted. - std::shared_ptr copy_program_{nullptr}; + const platform::Place place_; // from variable scope - std::vector var_list_; - std::map name2id_; - std::vector vec_meta_info_; - - std::vector vec_instruction_; // deconstruct before OpFuncNode std::atomic unfinished_op_number_{0}; @@ -189,9 +144,9 @@ class NewIRInterpreter : public InterpreterBaseImpl { // var std::map> last_live_ops_; - // dependecy_count_[i] contains the number of dependencies that the i-th op + // (*dependecy_count_)[i] contains the number of dependencies that the i-th op // need to wait - std::vector dependecy_count_; + std::shared_ptr> dependecy_count_; std::vector> deps_; std::vector> refs_; @@ -200,8 +155,6 @@ class NewIRInterpreter : public InterpreterBaseImpl { int64_t sync_op_num_{-1}; std::vector trace_execute_order_; - InstructionSchedulingPriorityLess instruction_scheduling_priority_less; - std::vector hookfuncs_; /// ======================== /// @@ -215,16 +168,21 @@ class NewIRInterpreter : public InterpreterBaseImpl { void BuildInstructionDependences(); - void LoopRunImpl(); - void TraceRunImpl(); void TraceRunInstructionList( const std::vector>& vec_instr); - void LoopRunInstructionList( + void MultiThreadRunImpl(); + + void MultiThreadRunInstructionList( const std::vector>& vec_instr); + void RunInstructionBaseAsync(size_t instr_id); + + void RunNextInstructions(InstructionBase* instr, + SchedulingQueue* reserved_next_ops); + void RunInstructionBase(InstructionBase* instr_node); void RecordMemcpyD2H(InstructionBase* instr_node); @@ -237,6 +195,12 @@ class NewIRInterpreter : public InterpreterBaseImpl { void SolvePersisableVarNames(); + const interpreter::NewIrDependencyBuilder& GetNewIrDependencyBuilder() + const override; + + const interpreter::NewIrStreamAnalyzer& GetNewIrStreamAnalyzer() + const override; + InstructionSchedulingPriorityLess ir_instruction_scheduling_priority_less; std::unique_ptr<::ir::Program> ir_program_{nullptr}; diff --git a/paddle/fluid/framework/new_executor/program_interpreter.cc b/paddle/fluid/framework/new_executor/program_interpreter.cc index 7f12e6b9100b8..ca5f7f9eb490e 100644 --- a/paddle/fluid/framework/new_executor/program_interpreter.cc +++ b/paddle/fluid/framework/new_executor/program_interpreter.cc @@ -222,11 +222,6 @@ FetchList ProgramInterpreter::Run(const std::vector& feed_names, } } -FetchList ProgramInterpreter::BetaRun( - const std::vector& feed_names, bool need_fetch) { - return {}; -} - void ProgramInterpreter::SetCopyProgram(std::shared_ptr prog) { copy_program_ = prog; } @@ -348,6 +343,18 @@ const interpreter::StreamAnalyzer& ProgramInterpreter::GetStreamAnalyzer() return stream_analyzer_; } +const interpreter::NewIrDependencyBuilder& +ProgramInterpreter::GetNewIrDependencyBuilder() const { + PADDLE_THROW(platform::errors::Unimplemented( + "GetDependencyBuilder is not implemented in ProgramInterpreter.")); +} + +const interpreter::NewIrStreamAnalyzer& +ProgramInterpreter::GetNewIrStreamAnalyzer() const { + PADDLE_THROW(platform::errors::Unimplemented( + "GetDependencyBuilder is not implemented in ProgramInterpreter.")); +} + bool ProgramInterpreter::IsSharedResultsBuild() const { return is_shared_results_build_; } diff --git a/paddle/fluid/framework/new_executor/program_interpreter.h b/paddle/fluid/framework/new_executor/program_interpreter.h index a942425609c18..e21fe6e0a49bb 100644 --- a/paddle/fluid/framework/new_executor/program_interpreter.h +++ b/paddle/fluid/framework/new_executor/program_interpreter.h @@ -48,10 +48,6 @@ class ProgramInterpreter : public InterpreterBaseImpl { paddle::framework::FetchList Run(const std::vector& feed_names, bool need_fetch = true) override; - paddle::framework::FetchList BetaRun( - const std::vector& feed_names, - bool need_fetch = true) override; - void ShareWorkQueueFrom(InterpreterBaseImpl* src) override; void ShareBuildResultsFrom(const InterpreterBaseImpl& src) override; @@ -63,6 +59,12 @@ class ProgramInterpreter : public InterpreterBaseImpl { const interpreter::StreamAnalyzer& GetStreamAnalyzer() const override; + const interpreter::NewIrDependencyBuilder& GetNewIrDependencyBuilder() + const override; + + const interpreter::NewIrStreamAnalyzer& GetNewIrStreamAnalyzer() + const override; + bool IsSharedResultsBuild() const override; void SetCopyProgram(std::shared_ptr prog) override; diff --git a/paddle/phi/core/flags.cc b/paddle/phi/core/flags.cc index 56dc16a1b3772..3cb37db9af344 100644 --- a/paddle/phi/core/flags.cc +++ b/paddle/phi/core/flags.cc @@ -1283,25 +1283,13 @@ PHI_DEFINE_EXPORTED_bool(enable_new_ir_api, /** * Using new IR in executor FLAG - * Name: enable_new_ir_in_executor_beta_run - * Since Version: 2.6.0 - * Value Range: bool, default=true - * Example: - * Note: If Ture, executor will use new IR and run in beta version. - */ -PHI_DEFINE_EXPORTED_bool(enable_new_ir_in_executor_beta_run, - true, - "Enable new IR in executor"); - -/** - * Using new IR in executor FLAG - * Name: enable_new_ir_in_executor_loop_run + * Name: enable_new_ir_in_executor_trace_run * Since Version: 2.6.0 * Value Range: bool, default=false * Example: - * Note: If Ture, executor will use new IR and run in beta version by for loop + * Note: If Ture, executor will use new IR and run in beta version by for trace * version. */ -PHI_DEFINE_EXPORTED_bool(enable_new_ir_in_executor_loop_run, +PHI_DEFINE_EXPORTED_bool(enable_new_ir_in_executor_trace_run, false, "Enable new IR in executor"); diff --git a/test/cpp/new_executor/standalone_executor_new_ir_test.cc b/test/cpp/new_executor/standalone_executor_new_ir_test.cc index b1eda24e8a11d..c59ddd84aec87 100644 --- a/test/cpp/new_executor/standalone_executor_new_ir_test.cc +++ b/test/cpp/new_executor/standalone_executor_new_ir_test.cc @@ -77,7 +77,7 @@ TEST(StandaloneExecutor, run) { std::string out_name = os.str() + "_inner_var_2"; test_core.SetSkipGcVars({out_name}); - test_core.BetaRun({}); + test_core.Run({}); auto out_tensor = test_core.local_scope() == nullptr @@ -118,7 +118,7 @@ TEST(StandaloneExecutor, run_inplace_sqrt) { std::string out_name = os.str() + "_inner_var_0"; test_core.SetSkipGcVars({out_name}); - test_core.BetaRun({}); + test_core.Run({}); auto out_tensor = test_core.local_scope() == nullptr diff --git a/test/cpp/prim/test_vjp.cc b/test/cpp/prim/test_vjp.cc index 9f7633c008176..9eb865a579765 100644 --- a/test/cpp/prim/test_vjp.cc +++ b/test/cpp/prim/test_vjp.cc @@ -76,7 +76,7 @@ TEST(VJP, TanhBackwardTest) { std::string prefix_str = os.str(); test_core.SetSkipGcVars( {prefix_str + "_inner_var_1", prefix_str + "_inner_var_3"}); - test_core.BetaRun({}); + test_core.Run({}); auto out_tensor = test_core.local_scope() == nullptr ? scope.FindVar(prefix_str + "_inner_var_1")->Get() @@ -130,7 +130,7 @@ TEST(VJP, Tanh_BackwardTest) { std::string prefix_str = os.str(); test_core.SetSkipGcVars( {prefix_str + "_inner_var_0", prefix_str + "_inner_var_2"}); - test_core.BetaRun({}); + test_core.Run({}); auto out_tensor = test_core.local_scope() == nullptr ? scope.FindVar(prefix_str + "_inner_var_0")->Get() @@ -184,7 +184,7 @@ TEST(VJP, MeanBackwardTest) { std::string prefix_str = os.str(); test_core.SetSkipGcVars( {prefix_str + "_inner_var_1", prefix_str + "_inner_var_3"}); - test_core.BetaRun({}); + test_core.Run({}); auto out_tensor = test_core.local_scope() == nullptr ? scope.FindVar(prefix_str + "_inner_var_1")->Get()