Commit
Merge remote-tracking branch 'upstream/develop' into thread_pool
Aurelius84 committed Sep 26, 2021
2 parents 3c4c555 + b23b17c commit 1b34770
Showing 104 changed files with 6,559 additions and 753 deletions.
6 changes: 3 additions & 3 deletions cmake/external/lite.cmake
@@ -50,7 +50,7 @@ if (NOT LITE_SOURCE_DIR OR NOT LITE_BINARY_DIR)
set(LITE_INSTALL_DIR ${THIRD_PARTY_PATH}/install/lite)

if(NOT LITE_GIT_TAG)
- set(LITE_GIT_TAG d3a3a6931b6d22d504d21ba32b3ae972770e9204)
+ set(LITE_GIT_TAG 4ab64daecc11fbf74fffdc6a4733f388472e7d5d)
endif()

if(NOT CUDA_ARCH_NAME)
@@ -197,9 +197,9 @@ set(LITE_SHARED_LIB ${LITE_BINARY_DIR}/${LITE_OUTPUT_BIN_DIR}/cxx/lib/libpaddle_
if (LITE_WITH_NNADAPTER)
set(LITE_NNADAPTER_LIB ${LITE_BINARY_DIR}/${LITE_OUTPUT_BIN_DIR}/cxx/lib/libnnadapter.so)
if (NNADAPTER_WITH_HUAWEI_ASCEND_NPU)
- external_lite_libs(lite_nnadapter ${LITE_BINARY_DIR}/${LITE_OUTPUT_BIN_DIR}/cxx/lib/libnnadapter.so ${LITE_BINARY_DIR}/${LITE_OUTPUT_BIN_DIR}/cxx/lib/libnnadapter_driver_huawei_ascend_npu.so)
+ external_lite_libs(lite_nnadapter ${LITE_BINARY_DIR}/${LITE_OUTPUT_BIN_DIR}/cxx/lib/libnnadapter.so ${LITE_BINARY_DIR}/${LITE_OUTPUT_BIN_DIR}/cxx/lib/libhuawei_ascend_npu.so)
set(LITE_DEPS lite_full_static lite_nnadapter)
- set(LITE_NNADAPTER_NPU_LIB ${LITE_BINARY_DIR}/${LITE_OUTPUT_BIN_DIR}/cxx/lib/libnnadapter_driver_huawei_ascend_npu.so)
+ set(LITE_NNADAPTER_NPU_LIB ${LITE_BINARY_DIR}/${LITE_OUTPUT_BIN_DIR}/cxx/lib/libhuawei_ascend_npu.so)
endif()
else()
set(LITE_DEPS lite_full_static)
2 changes: 1 addition & 1 deletion cmake/external/xpu.cmake
@@ -35,7 +35,7 @@ ELSE ()
ENDIF()

SET(XPU_BASE_URL_WITHOUT_DATE "https://baidu-kunlun-product.cdn.bcebos.com/KL-SDK/klsdk-dev")
SET(XPU_BASE_URL "${XPU_BASE_URL_WITHOUT_DATE}/20210909")
SET(XPU_BASE_URL "${XPU_BASE_URL_WITHOUT_DATE}/20210921")
SET(XPU_XRE_URL "${XPU_BASE_URL}/${XPU_XRE_DIR_NAME}.tar.gz" CACHE STRING "" FORCE)
SET(XPU_XDNN_URL "${XPU_BASE_URL}/${XPU_XDNN_DIR_NAME}.tar.gz" CACHE STRING "" FORCE)
SET(XPU_XCCL_URL "${XPU_BASE_URL_WITHOUT_DATE}/20210623/${XPU_XCCL_DIR_NAME}.tar.gz" CACHE STRING "" FORCE)
6 changes: 3 additions & 3 deletions paddle/fluid/framework/ir/multihead_matmul_fuse_pass.cc
@@ -62,7 +62,7 @@ static int BuildFusion(Graph* graph, const std::string& name_scope) {
// BOOST_GET_CONST(bool, scale->Op()->GetAttr("bias_after_scale"));

// create multihead
- OpDesc multihead_op_desc;
+ OpDesc multihead_op_desc(mul0->Op()->Block());

// create tmp tensor
VarDesc k_var_desc(*mul1_out->Var());
@@ -847,7 +847,7 @@ int MultiHeadMatmulV2FusePass::BuildFusionV2(Graph* graph,
int head_number =
BOOST_GET_CONST(std::vector<int>, reshape_desc->GetAttr("shape")).at(2);

- OpDesc multihead_op_desc;
+ OpDesc multihead_op_desc(mul0->Op()->Block());
multihead_op_desc.SetType("multihead_matmul");

multihead_op_desc.SetInput("Input", {input0->Name()});
@@ -1287,7 +1287,7 @@ int MultiHeadMatmulV3FusePass::BuildFusionV3(Graph* graph,
int head_number =
BOOST_GET_CONST(std::vector<int>, reshape_desc->GetAttr("shape")).at(2);

- OpDesc multihead_op_desc;
+ OpDesc multihead_op_desc(mul0->Op()->Block());
multihead_op_desc.SetType("multihead_matmul");

multihead_op_desc.SetInput("Input", {input0->Name()});
2 changes: 1 addition & 1 deletion paddle/fluid/framework/new_executor/CMakeLists.txt
@@ -2,7 +2,7 @@ set(INTERPRETERCORE_DEPS op_registry device_context scope framework_proto data_f
lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method
graph_to_program_pass variable_helper timer monitor)

- cc_library(workqueue SRCS workqueue.cc DEPS enforce)
+ cc_library(workqueue SRCS workqueue.cc workqueue_utils.cc DEPS enforce)
cc_library(interpretercore_garbage_collector SRCS interpretercore_garbage_collector.cc DEPS workqueue ${DEVICE_EVENT_LIBS})
cc_library(interpretercore_util SRCS interpretercore_util.cc DEPS ${INTERPRETERCORE_DEPS} workqueue)
cc_library(event_manager SRCS event_manager.cc DEPS ${DEVICE_EVENT_LIBS} glog)
5 changes: 3 additions & 2 deletions paddle/fluid/framework/new_executor/event_count.h
@@ -50,6 +50,7 @@
#include <cstdlib>
#include <mutex>
#include <vector>
#include "paddle/fluid/framework/new_executor/workqueue_utils.h"

namespace paddle {
namespace framework {
@@ -60,7 +61,7 @@ class EventCount {

explicit EventCount(size_t waiter_num) : state_(kStackMask) {
assert(waiter_num < (1 << kWaiterBits) - 1);
- void* buffer = malloc(sizeof(Waiter) * waiter_num);
+ void* buffer = AlignedMalloc(sizeof(Waiter) * waiter_num, alignof(Waiter));
if (buffer == nullptr) {
return;
}
@@ -78,7 +79,7 @@ class EventCount {
~EventCount() {
// Ensure there are no waiters.
assert(state_.load() == kStackMask);
- free(waiters_);
+ AlignedFree(waiters_);
}

Waiter* GetWaiter(size_t waiter_index) {
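Note: plain malloc only guarantees alignof(max_align_t) (typically 16 bytes), so if Waiter carries a stronger alignment requirement such as a full cache line, the old code handed back under-aligned storage, which is undefined behavior to use. A minimal standalone sketch of the requirement, using C++17 std::aligned_alloc — FakeWaiter is a hypothetical stand-in for EventCount::Waiter, and MSVC would need _aligned_malloc instead, which is exactly what the new AlignedMalloc wrapper abstracts:

#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <new>

struct alignas(64) FakeWaiter {  // hypothetical stand-in for EventCount::Waiter
  char state[64];
};

int main() {
  constexpr std::size_t kWaiters = 4;
  // std::aligned_alloc (C++17) requires size to be a multiple of alignment.
  void* buffer =
      std::aligned_alloc(alignof(FakeWaiter), kWaiters * sizeof(FakeWaiter));
  if (buffer == nullptr) return 1;
  FakeWaiter* waiters = static_cast<FakeWaiter*>(buffer);
  // Construct each element in place (avoids array placement-new overhead).
  for (std::size_t i = 0; i < kWaiters; ++i) new (&waiters[i]) FakeWaiter();
  std::printf("64-byte aligned: %d\n",
              static_cast<int>(
                  reinterpret_cast<std::uintptr_t>(waiters) % 64 == 0));
  for (std::size_t i = 0; i < kWaiters; ++i) waiters[i].~FakeWaiter();
  std::free(buffer);
  return 0;
}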
30 changes: 18 additions & 12 deletions paddle/fluid/framework/new_executor/nonblocking_threadpool.h
@@ -56,9 +56,9 @@ class TaskTracker {
}

private:
- std::atomic<uint64_t> num_tasks_{0};
- EventCount wait_empty_cv_;
- std::atomic<bool> wait_empty_{false};
+ alignas(64) std::atomic<uint64_t> num_tasks_{0};
+ alignas(64) EventCount wait_empty_cv_;
+ alignas(64) std::atomic<bool> wait_empty_{false};
};

template <typename Environment>
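Note: the alignas(64) annotations give each of the tracker's members its own cache line, so producers bumping num_tasks_ no longer invalidate the line that waiters poll — the classic false-sharing fix. A standalone illustration (hypothetical types, not Paddle code):

#include <atomic>
#include <cstdint>
#include <cstdio>

struct Packed {  // both counters likely share one 64-byte cache line
  std::atomic<std::uint64_t> produced{0};
  std::atomic<std::uint64_t> consumed{0};
};

struct Padded {  // each counter owns a full cache line: no false sharing
  alignas(64) std::atomic<std::uint64_t> produced{0};
  alignas(64) std::atomic<std::uint64_t> consumed{0};
};

int main() {
  // In Padded, writers of `produced` no longer invalidate the cache line
  // that readers of `consumed` are spinning on.
  std::printf("packed: %zu bytes, padded: %zu bytes\n", sizeof(Packed),
              sizeof(Padded));  // typically 16 vs 128
  return 0;
}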
@@ -70,15 +70,16 @@ class ThreadPoolTempl {
ThreadPoolTempl(int num_threads, bool allow_spinning,
Environment env = Environment())
: env_(env),
- num_threads_(num_threads),
allow_spinning_(allow_spinning),
- thread_data_(num_threads),
global_steal_partition_(EncodePartition(0, num_threads_)),
blocked_(0),
+ num_tasks_(0),
spinning_(0),
done_(false),
cancelled_(false),
- ec_(num_threads_) {
+ ec_(num_threads),
+ num_threads_(num_threads),
+ thread_data_(num_threads) {
// Calculate coprimes of all numbers [1, num_threads].
// Coprimes are used for random walks over all threads in Steal
// and NonEmptyQueueIndex. Iteration is based on the fact that if we take
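Note on the reordered initializer list: C++ initializes members in declaration order, not initializer-list order, so this change only has meaning because the member declarations further down in this diff also move num_threads_ and thread_data_ after ec_. Keeping the two orders in sync avoids the classic -Wreorder trap, sketched here with hypothetical names:

#include <cstdio>

struct Pool {
  int ec_;           // declared first  -> initialized first
  int num_threads_;  // declared second -> initialized second

  // Members initialize in declaration order, NOT initializer-list order, so
  // ec_(num_threads_) reads num_threads_ before it has been set. GCC/Clang
  // flag the mismatch with -Wreorder (enabled by -Wall).
  explicit Pool(int n) : num_threads_(n), ec_(num_threads_) {}
};

int main() {
  Pool p(8);
  std::printf("ec_=%d num_threads_=%d\n", p.ec_, p.num_threads_);  // ec_ is garbage
  return 0;
}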
@@ -143,6 +144,7 @@ class ThreadPoolTempl {
void AddTaskWithHint(std::function<void()> fn, int start, int limit) {
Task t = env_.CreateTask(std::move(fn));
PerThread* pt = GetPerThread();
+ uint64_t num_tasks = num_tasks_.fetch_add(1, std::memory_order_relaxed) + 1;
if (pt->pool == this) {
// Worker thread of this pool, push onto the thread's queue.
Queue& q = thread_data_[pt->thread_id].queue;
@@ -166,8 +168,11 @@
// this. We expect that such scenario is prevented by program, that is,
// this is kept alive while any threads can potentially be in Schedule.
if (!t.f) {
- ec_.Notify(false);
+ if (num_tasks > num_threads_ - blocked_.load(std::memory_order_relaxed)) {
+   ec_.Notify(false);
+ }
} else {
+ num_tasks_.fetch_sub(1, std::memory_order_relaxed);
env_.ExecuteTask(t); // Push failed, execute directly.
}
}
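Note: the new num_tasks_ counter lets AddTaskWithHint skip the EventCount notification when enough workers are already awake to absorb the task — waking a parked thread is comparatively expensive. A stripped-down sketch of the gate, with hypothetical names (the real pool also manages queues and an EventCount):

#include <atomic>
#include <cstdint>

struct WakeGate {
  std::atomic<std::uint64_t> num_tasks{0};
  std::atomic<unsigned> blocked{0};  // workers parked in WaitForWork
  const unsigned num_threads;

  explicit WakeGate(unsigned n) : num_threads(n) {}

  // Wake a parked worker only when pending tasks exceed the number of
  // workers that are already awake; otherwise the task will be picked up
  // anyway, without paying for a notification.
  bool ShouldNotify() {
    std::uint64_t tasks =
        num_tasks.fetch_add(1, std::memory_order_relaxed) + 1;
    return tasks > num_threads - blocked.load(std::memory_order_relaxed);
  }
};

int main() {
  WakeGate gate(4);
  bool wake = gate.ShouldNotify();  // 1 pending task, 4 awake workers -> false
  return wake ? 1 : 0;
}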
@@ -259,16 +264,17 @@
};

Environment env_;
- const int num_threads_;
const bool allow_spinning_;
- std::vector<ThreadData> thread_data_;
std::vector<std::vector<unsigned>> all_coprimes_;
unsigned global_steal_partition_;
std::atomic<unsigned> blocked_;
+ std::atomic<uint64_t> num_tasks_;
std::atomic<bool> spinning_;
std::atomic<bool> done_;
std::atomic<bool> cancelled_;
EventCount ec_;
+ const int num_threads_;
+ std::vector<ThreadData> thread_data_;

// Main worker thread loop.
void WorkerLoop(int thread_id) {
@@ -305,6 +311,7 @@
}
if (t.f) {
env_.ExecuteTask(t);
+ num_tasks_.fetch_sub(1, std::memory_order_relaxed);
}
}
} else {
@@ -315,16 +322,14 @@
if (!t.f) {
t = GlobalSteal();
if (!t.f) {
- // Leave one thread spinning. This reduces latency.
- if (allow_spinning_ && !spinning_ && !spinning_.exchange(true)) {
+ if (allow_spinning_) {
for (int i = 0; i < spin_count && !t.f; i++) {
if (!cancelled_.load(std::memory_order_relaxed)) {
t = GlobalSteal();
} else {
return;
}
}
- spinning_ = false;
}
if (!t.f) {
if (!WaitForWork(waiter, &t)) {
@@ -336,6 +341,7 @@
}
if (t.f) {
env_.ExecuteTask(t);
+ num_tasks_.fetch_sub(1, std::memory_order_relaxed);
}
}
}
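Note: the old code let at most one worker spin at a time (the spinning_ flag); this change lets every worker spin for spin_count iterations before parking, trading some idle CPU for lower wake-up latency. A generic spin-then-park sketch — illustrative only, since the pool itself parks on its EventCount rather than a mutex/condvar pair:

#include <atomic>
#include <condition_variable>
#include <mutex>

// Poll for work a bounded number of times, then block until notified.
template <typename TryGetTask>
bool SpinThenPark(TryGetTask try_get, std::atomic<bool>& cancelled,
                  std::mutex& mu, std::condition_variable& cv,
                  int spin_count = 1000) {
  for (int i = 0; i < spin_count; ++i) {
    if (cancelled.load(std::memory_order_relaxed)) return false;
    if (try_get()) return true;  // found work while spinning
  }
  std::unique_lock<std::mutex> lock(mu);  // park until notified
  cv.wait(lock, [&] { return try_get() || cancelled.load(); });
  return !cancelled.load();
}

int main() {
  std::atomic<bool> cancelled{false};
  std::atomic<bool> has_task{true};
  std::mutex mu;
  std::condition_variable cv;
  bool got = SpinThenPark([&] { return has_task.exchange(false); }, cancelled,
                          mu, cv);
  return got ? 0 : 1;
}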
2 changes: 1 addition & 1 deletion paddle/fluid/framework/new_executor/run_queue.h
@@ -204,7 +204,6 @@ class RunQueue {
kReady,
};

- std::mutex mutex_;
// Low log(kSize) + 1 bits in front_ and back_ contain rolling index of
// front/back, respectively. The remaining bits contain modification counters
// that are incremented on Push operations. This allows us to (1) distinguish
@@ -214,6 +213,7 @@
// modification counters.
alignas(64) std::atomic<unsigned> front_;
alignas(64) std::atomic<unsigned> back_;
+ std::mutex mutex_;
Elem array_[kSize];

// SizeOrNotEmpty returns current queue size; if NeedSizeEstimate is false,
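Note: with mutex_ declared before the alignas(64) atomics, the compiler had to insert padding to realign front_; declaring it after them lets the mutex occupy the tail padding behind back_ instead. A quick way to see the effect (sizes assume a typical 64-bit Linux toolchain; struct names are hypothetical):

#include <atomic>
#include <cstdio>
#include <mutex>

struct MutexFirst {  // old layout: padding inserted to realign front
  std::mutex mu;
  alignas(64) std::atomic<unsigned> front;
  alignas(64) std::atomic<unsigned> back;
};

struct MutexLast {  // new layout: mutex lives in back's tail padding
  alignas(64) std::atomic<unsigned> front;
  alignas(64) std::atomic<unsigned> back;
  std::mutex mu;
};

int main() {
  // Typically prints 192 vs 128 bytes per queue.
  std::printf("mutex first: %zu, mutex last: %zu\n", sizeof(MutexFirst),
              sizeof(MutexLast));
  return 0;
}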
16 changes: 12 additions & 4 deletions paddle/fluid/framework/new_executor/workqueue.cc
@@ -18,14 +18,18 @@ class WorkQueueImpl : public WorkQueue {
explicit WorkQueueImpl(const WorkQueueOptions& options)
: WorkQueue(options), queue_(nullptr), tracker_(nullptr) {
if (options_.track_task) {
- tracker_ = new TaskTracker;
+ void* storage = AlignedMalloc(sizeof(TaskTracker), alignof(TaskTracker));
+ tracker_ = new (storage) TaskTracker;
}
queue_ = new NonblockingThreadPool(options_.num_threads,
options_.allow_spinning);
}

virtual ~WorkQueueImpl() {
- delete tracker_;
+ if (tracker_ != nullptr) {
+   tracker_->~TaskTracker();
+   AlignedFree(tracker_);
+ }
delete queue_;
}

@@ -89,7 +93,8 @@ WorkQueueGroupImpl::WorkQueueGroupImpl(
for (size_t idx = 0; idx < num_queues; ++idx) {
const auto& options = queues_options_[idx];
if (options.track_task && tracker_ == nullptr) {
- tracker_ = new TaskTracker;
+ void* storage = AlignedMalloc(sizeof(TaskTracker), alignof(TaskTracker));
+ tracker_ = new (storage) TaskTracker;
}
queues_[idx] = new (&queues_storage_[idx])
NonblockingThreadPool(options.num_threads, options.allow_spinning);
@@ -100,7 +105,10 @@ WorkQueueGroupImpl::~WorkQueueGroupImpl() {
for (auto queue : queues_) {
queue->~NonblockingThreadPool();
}
- delete tracker_;
+ if (tracker_ != nullptr) {
+   tracker_->~TaskTracker();
+   AlignedFree(tracker_);
+ }
free(queues_storage_);
}

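Note: because TaskTracker now has over-aligned (alignas(64)) members, it is allocated with AlignedMalloc and constructed with placement new; teardown must then call the destructor explicitly and release with AlignedFree, since plain delete would pair with the wrong allocator. The general pattern as a standalone sketch (Tracker is a hypothetical type; std::aligned_alloc is the C++17 stand-in for AlignedMalloc):

#include <cstdlib>
#include <new>

struct alignas(64) Tracker {  // hypothetical over-aligned type
  unsigned long long count = 0;
};

int main() {
  // Construction: aligned allocation + placement new. (Under pre-C++17
  // rules, plain `new Tracker` need not honor the 64-byte alignment.)
  void* storage = std::aligned_alloc(alignof(Tracker), sizeof(Tracker));
  if (storage == nullptr) return 1;
  Tracker* tracker = new (storage) Tracker;

  // Destruction must mirror it: explicit destructor call, then the matching
  // free. A plain `delete tracker;` would be undefined behavior, because
  // the storage did not come from `operator new`.
  tracker->~Tracker();
  std::free(storage);
  return 0;
}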
59 changes: 59 additions & 0 deletions paddle/fluid/framework/new_executor/workqueue_utils.cc
@@ -0,0 +1,59 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/new_executor/workqueue_utils.h"
#include <cstdint>
#include <cstdlib>

namespace paddle {
namespace framework {

void* AlignedMalloc(size_t size, size_t alignment) {
assert(alignment >= sizeof(void*) && (alignment & (alignment - 1)) == 0);
size = (size + alignment - 1) / alignment * alignment;
#if defined(_POSIX_C_SOURCE) && _POSIX_C_SOURCE >= 200112L
void* aligned_mem = nullptr;
if (posix_memalign(&aligned_mem, alignment, size) != 0) {
aligned_mem = nullptr;
}
return aligned_mem;
#elif defined(_WIN32)
return _aligned_malloc(size, alignment);
#else
void* mem = malloc(size + alignment);
if (mem == nullptr) {
return nullptr;
}
size_t adjust = alignment - reinterpret_cast<uint64_t>(mem) % alignment;
void* aligned_mem = reinterpret_cast<char*>(mem) + adjust;
*(reinterpret_cast<void**>(aligned_mem) - 1) = mem;
assert(reinterpret_cast<uint64_t>(aligned_mem) % alignment == 0);
return aligned_mem;
#endif
}

void AlignedFree(void* mem_ptr) {
#if defined(_POSIX_C_SOURCE) && _POSIX_C_SOURCE >= 200112L
free(mem_ptr);
#elif defined(_WIN32)
_aligned_free(mem_ptr);
#else
if (mem_ptr) {
free(*(reinterpret_cast<void**>(mem_ptr) - 1));
}
#endif
}

} // namespace framework
} // namespace paddle
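Note on the generic fallback branch above: it over-allocates by alignment bytes, rounds the address up, and stashes the original malloc pointer in the slot just below the returned address so AlignedFree can recover it. That slot always fits because malloc returns storage aligned to at least sizeof(void*), which makes the adjustment a positive multiple of 8. A small self-check of that invariant:

#include <cstdint>
#include <cstdio>

int main() {
  // Simulate malloc-like addresses (always at least 8-byte aligned) and
  // verify the stash slot below the aligned address can hold a void*.
  const std::uint64_t alignment = 64;
  for (std::uint64_t addr = 0; addr < 4096; addr += 8) {
    std::uint64_t adjust = alignment - addr % alignment;
    if (adjust < sizeof(void*)) {
      std::printf("no room for the stashed pointer at %llu\n",
                  static_cast<unsigned long long>(addr));
      return 1;
    }
  }
  std::printf("stash slot always fits\n");
  return 0;
}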
4 changes: 4 additions & 0 deletions paddle/fluid/framework/new_executor/workqueue_utils.h
@@ -59,5 +59,9 @@ class CounterGuard {
Holder* counter_holder_{nullptr};
};

+ void* AlignedMalloc(size_t size, size_t alignment);

+ void AlignedFree(void* memory_ptr);

} // namespace framework
} // namespace paddle
4 changes: 3 additions & 1 deletion paddle/fluid/imperative/partial_grad_engine.cc
@@ -137,10 +137,12 @@ static void GetGraphInfoBetweenTargets(
}

for (auto &pending_node : node->GradPendingNodes()) {
+ for (auto &pending_op : *pending_node) {
+   preceding_ops[&pending_op].insert(op);
+ }
if (visited.count(pending_node.get()) == 0) {
visited.insert(pending_node.get());
for (auto &pending_op : *pending_node) {
- preceding_ops[&pending_op].insert(op);
q.emplace(&pending_op, pending_node.get());
}
}
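Note: previously preceding_ops was only populated the first time a pending node was visited, so an op reachable through several predecessors recorded only one of them; the fix records the edge on every visit and uses the visited set purely to avoid re-enqueueing. The shape of the fix as a generic sketch (int node ids stand in for the op/node pointers):

#include <map>
#include <queue>
#include <set>
#include <vector>

// Record predecessor edges on *every* visit; use `visited` only to decide
// whether a node still needs to be enqueued.
void BuildPredecessors(int start,
                       const std::map<int, std::vector<int>>& pending,
                       std::map<int, std::set<int>>* preceding) {
  std::queue<int> q;
  std::set<int> visited{start};
  q.push(start);
  while (!q.empty()) {
    int op = q.front();
    q.pop();
    auto it = pending.find(op);
    if (it == pending.end()) continue;
    for (int next : it->second) {
      (*preceding)[next].insert(op);      // always record the edge
      if (visited.insert(next).second) {  // enqueue only on first visit
        q.push(next);
      }
    }
  }
}

int main() {
  std::map<int, std::vector<int>> pending{{0, {1, 2}}, {1, {2}}};
  std::map<int, std::set<int>> preceding;
  BuildPredecessors(0, pending, &preceding);
  // Node 2 is enqueued once but now records both 0 and 1 as predecessors;
  // the pre-fix version would have recorded only 0.
  return preceding[2].size() == 2 ? 0 : 1;
}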
21 changes: 18 additions & 3 deletions paddle/fluid/inference/api/analysis_predictor.cc
@@ -686,9 +686,24 @@ void AnalysisPredictor::OptimizeInferenceProgram() {
// Note, please do NOT use any member variables, because member variables may
// have been destructed in multiple threads.
#if PADDLE_WITH_TENSORRT
- paddle::inference::Singleton<
-     inference::tensorrt::TRTEngineManager>::Global()
-     .DeleteAll();
+ auto &block = prog->Block(0);
+ for (auto &op_desc : block.AllOps()) {
+   if (op_desc->Type() == "tensorrt_engine") {
+     std::string engine_key =
+         BOOST_GET_CONST(std::string, op_desc->GetAttr("engine_key"));
+     int engine_predictor_id =
+         BOOST_GET_CONST(int, op_desc->GetAttr("predictor_id"));
+     std::string engine_name =
+         engine_key + std::to_string(engine_predictor_id);
+     if (paddle::inference::Singleton<
+             inference::tensorrt::TRTEngineManager>::Global()
+             .Has(engine_name)) {
+       paddle::inference::Singleton<
+           inference::tensorrt::TRTEngineManager>::Global()
+           .DeleteKey(engine_name);
+     }
+   }
+ }
#endif
delete prog;
});
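Note: tearing down one predictor used to call DeleteAll() on the process-wide TRTEngineManager singleton, destroying engines still owned by other live predictors; the replacement reconstructs each engine's registry name (engine_key plus predictor_id) and deletes only those keys. A minimal registry sketch with the same Has/DeleteKey shape — hypothetical, not the Paddle class:

#include <map>
#include <memory>
#include <string>

struct Engine {};  // stand-in for a TensorRT engine

class EngineRegistry {
 public:
  bool Has(const std::string& name) const { return engines_.count(name) > 0; }
  void DeleteKey(const std::string& name) { engines_.erase(name); }
  void DeleteAll() { engines_.clear(); }  // would drop other owners' engines

 private:
  std::map<std::string, std::unique_ptr<Engine>> engines_;
};

int main() {
  EngineRegistry registry;
  // Per-predictor cleanup: rebuild this predictor's keys, delete only those.
  const std::string name = "my_engine_key" + std::to_string(7);
  if (registry.Has(name)) registry.DeleteKey(name);
  return 0;
}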

1 comment on commit 1b34770

@paddle-bot-old

Congratulations! Your pull request passed all required CI checks. You can ask the reviewer(s) to approve and merge. 🎉
