Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multi-thread inference example which shares the inference_program and parameters #9302

Merged
merged 12 commits into from
Apr 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 42 additions & 4 deletions paddle/fluid/framework/program_desc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ ProgramDesc::ProgramDesc(const std::string &binary_str) {
}

const std::vector<std::string> ProgramDesc::GetFeedTargetNames() {
BlockDesc *global_block = blocks_[0].get();
auto &global_block = Block(0);
std::vector<std::string> feed_target_names;
for (auto *op : global_block->AllOps()) {
for (auto *op : global_block.AllOps()) {
if (op->Type() == kFeedOpType) {
feed_target_names.insert(feed_target_names.begin(), op->Output("Out")[0]);
}
Expand All @@ -96,15 +96,53 @@ const std::vector<std::string> ProgramDesc::GetFeedTargetNames() {
}

const std::vector<std::string> ProgramDesc::GetFetchTargetNames() {
BlockDesc *global_block = blocks_[0].get();
auto &global_block = Block(0);
std::vector<std::string> fetch_target_names;
for (auto *op : global_block->AllOps()) {
for (auto *op : global_block.AllOps()) {
if (op->Type() == kFetchOpType) {
fetch_target_names.push_back(op->Input("X")[0]);
}
}
return fetch_target_names;
}

void ProgramDesc::SetFeedHolderName(const std::string &feed_holder_name) {
auto *global_block = MutableBlock(0);
int index = 0;
for (auto *op : global_block->AllOps()) {
if (op->Type() == kFeedOpType) {
// Unify the input's name of all feed_ops to feed_holder_name
global_block->RemoveVar(op->Input("X")[0]);
op->SetInput("X", {feed_holder_name});
op->SetAttr("col", {index});
op->CheckAttrs();
index++;
}
}

auto *feed_holder = global_block->Var(feed_holder_name);
feed_holder->SetType(proto::VarType::FEED_MINIBATCH);
feed_holder->SetPersistable(true);
}

void ProgramDesc::SetFetchHolderName(const std::string &fetch_holder_name) {
auto *global_block = MutableBlock(0);
int index = 0;
for (auto *op : global_block->AllOps()) {
if (op->Type() == kFetchOpType) {
// Unify the output's name of all fetch_ops to fetch_holder_name
global_block->RemoveVar(op->Output("Out")[0]);
op->SetOutput("Out", {fetch_holder_name});
op->SetAttr("col", {index});
op->CheckAttrs();
index++;
}
}

auto *fetch_holder = global_block->Var(fetch_holder_name);
fetch_holder->SetType(proto::VarType::FETCH_LIST);
fetch_holder->SetPersistable(true);
}

} // namespace framework
} // namespace paddle
18 changes: 18 additions & 0 deletions paddle/fluid/framework/program_desc.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ limitations under the License. */
#pragma once

#include <memory>
#include <string>
#include <vector>
#include "paddle/fluid/framework/block_desc.h"
#include "paddle/fluid/framework/framework.pb.h"
Expand Down Expand Up @@ -52,9 +53,26 @@ class ProgramDesc {

proto::ProgramDesc *Proto();

// The output variable of feed_op is referenced as feed_target.
// This function is used to collect the output variable's name of all
// feed_ops.
const std::vector<std::string> GetFeedTargetNames();

// The input variable of fetch_op is referenced as fetch_target.
// This function is used to collect the input variable's name of all
// fetch_ops.
const std::vector<std::string> GetFetchTargetNames();

// The input variable of feed_op that holds input Tensor provided by users is
// referenced as feed_holder.
// This function is used to change or unify the feed_holder variables' name.
void SetFeedHolderName(const std::string &feed_holder_name);

// The output variable of fetch_op that holds output Tensor needed by users is
// referenced as fetch_holder.
// This function is used to change or unify the fetch_holder variables' name.
void SetFetchHolderName(const std::string &fetch_holder_name);

private:
proto::ProgramDesc desc_;

Expand Down
74 changes: 53 additions & 21 deletions paddle/fluid/inference/tests/book/test_inference_fit_a_line.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ limitations under the License. */
#include "gflags/gflags.h"
#include "gtest/gtest.h"
#include "paddle/fluid/inference/tests/test_helper.h"
#include "paddle/fluid/inference/tests/test_multi_thread_helper.h"

DEFINE_string(dirname, "", "Directory of the inference model.");

Expand All @@ -26,32 +27,63 @@ TEST(inference, fit_a_line) {
// 0. Call `paddle::framework::InitDevices()` initialize all the devices
// In unittests, this is done in paddle/testing/paddle_gtest_main.cc

paddle::framework::LoDTensor input;
// The second dim of the input tensor should be 13
// The input data should be >= 0
int64_t batch_size = 10;
SetupTensor<float>(&input, {batch_size, 13}, static_cast<float>(0),
static_cast<float>(10));
std::vector<paddle::framework::LoDTensor*> cpu_feeds;
cpu_feeds.push_back(&input);
for (int num_threads : {1, 2}) {
std::vector<std::vector<paddle::framework::LoDTensor*>> cpu_feeds;
cpu_feeds.resize(num_threads);
for (int i = 0; i < num_threads; ++i) {
auto* input = new paddle::framework::LoDTensor();
// The second dim of the input tensor should be 13
// The input data should be >= 0
int64_t batch_size = 10;
SetupTensor<float>(input, {batch_size, 13}, static_cast<float>(0),
static_cast<float>(10));
cpu_feeds[i].push_back(input);
}

paddle::framework::LoDTensor output1;
std::vector<paddle::framework::LoDTensor*> cpu_fetchs1;
cpu_fetchs1.push_back(&output1);
std::vector<std::vector<paddle::framework::LoDTensor*>> cpu_fetchs1;
cpu_fetchs1.resize(num_threads);
for (int i = 0; i < num_threads; ++i) {
auto* output = new paddle::framework::LoDTensor();
cpu_fetchs1[i].push_back(output);
}

// Run inference on CPU
TestInference<paddle::platform::CPUPlace>(dirname, cpu_feeds, cpu_fetchs1);
LOG(INFO) << output1.dims();
// Run inference on CPU
LOG(INFO) << "--- CPU Runs (num_threads: " << num_threads << "): ---";
if (num_threads == 1) {
TestInference<paddle::platform::CPUPlace>(dirname, cpu_feeds[0],
cpu_fetchs1[0]);
} else {
TestMultiThreadInference<paddle::platform::CPUPlace>(
dirname, cpu_feeds, cpu_fetchs1, num_threads);
}

#ifdef PADDLE_WITH_CUDA
paddle::framework::LoDTensor output2;
std::vector<paddle::framework::LoDTensor*> cpu_fetchs2;
cpu_fetchs2.push_back(&output2);
std::vector<std::vector<paddle::framework::LoDTensor*>> cpu_fetchs2;
cpu_fetchs2.resize(num_threads);
for (int i = 0; i < num_threads; ++i) {
auto* output = new paddle::framework::LoDTensor();
cpu_fetchs2[i].push_back(output);
}

// Run inference on CUDA GPU
TestInference<paddle::platform::CUDAPlace>(dirname, cpu_feeds, cpu_fetchs2);
LOG(INFO) << output2.dims();
// Run inference on CUDA GPU
LOG(INFO) << "--- GPU Runs (num_threads: " << num_threads << "): ---";
if (num_threads == 1) {
TestInference<paddle::platform::CUDAPlace>(dirname, cpu_feeds[0],
cpu_fetchs2[0]);
} else {
TestMultiThreadInference<paddle::platform::CUDAPlace>(
dirname, cpu_feeds, cpu_fetchs2, num_threads);
}

CheckError<float>(output1, output2);
for (int i = 0; i < num_threads; ++i) {
CheckError<float>(*cpu_fetchs1[i][0], *cpu_fetchs2[i][0]);
delete cpu_fetchs2[i][0];
}
#endif

for (int i = 0; i < num_threads; ++i) {
delete cpu_feeds[i][0];
delete cpu_fetchs1[i][0];
}
} // num_threads-loop
}
3 changes: 2 additions & 1 deletion paddle/fluid/inference/tests/test_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ limitations under the License. */
template <typename T>
void SetupTensor(paddle::framework::LoDTensor* input,
paddle::framework::DDim dims, T lower, T upper) {
std::mt19937 rng(100); // An arbitrarily chosen but fixed seed.
static unsigned int seed = 100;
std::mt19937 rng(seed++);
std::uniform_real_distribution<double> uniform_dist(0, 1);

T* input_ptr = input->mutable_data<T>(dims, paddle::platform::CPUPlace());
Expand Down
90 changes: 90 additions & 0 deletions paddle/fluid/inference/tests/test_multi_thread_helper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once

#include <map>
#include <string>
#include <thread> // NOLINT
#include <vector>
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/inference/io.h"

void ThreadedRunInference(
const std::unique_ptr<paddle::framework::ProgramDesc>& inference_program,
paddle::framework::Executor* executor, paddle::framework::Scope* scope,
const int thread_id,
const std::vector<paddle::framework::LoDTensor*>& cpu_feeds,
const std::vector<paddle::framework::LoDTensor*>& cpu_fetchs) {
auto copy_program = std::unique_ptr<paddle::framework::ProgramDesc>(
new paddle::framework::ProgramDesc(*inference_program));

std::string feed_holder_name = "feed_" + paddle::string::to_string(thread_id);
std::string fetch_holder_name =
"fetch_" + paddle::string::to_string(thread_id);
copy_program->SetFeedHolderName(feed_holder_name);
copy_program->SetFetchHolderName(fetch_holder_name);

// 3. Get the feed_target_names and fetch_target_names
const std::vector<std::string>& feed_target_names =
copy_program->GetFeedTargetNames();
const std::vector<std::string>& fetch_target_names =
copy_program->GetFetchTargetNames();

// 4. Prepare inputs: set up maps for feed targets
std::map<std::string, const paddle::framework::LoDTensor*> feed_targets;
for (size_t i = 0; i < feed_target_names.size(); ++i) {
// Please make sure that cpu_feeds[i] is right for feed_target_names[i]
feed_targets[feed_target_names[i]] = cpu_feeds[i];
}

// 5. Define Tensor to get the outputs: set up maps for fetch targets
std::map<std::string, paddle::framework::LoDTensor*> fetch_targets;
for (size_t i = 0; i < fetch_target_names.size(); ++i) {
fetch_targets[fetch_target_names[i]] = cpu_fetchs[i];
}

// 6. Run the inference program
executor->Run(*copy_program, scope, feed_targets, fetch_targets, true,
feed_holder_name, fetch_holder_name);
}

template <typename Place>
void TestMultiThreadInference(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

请问TestMultiThreadInference 为什么没和 TestInference一样,放在paddle/fluid/inference/tests/test_helper.h里呢?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

多线程作为一种特殊的应用场景,放在一个单独的文件里面,方便索引和引用吧。

const std::string& dirname,
const std::vector<std::vector<paddle::framework::LoDTensor*>>& cpu_feeds,
const std::vector<std::vector<paddle::framework::LoDTensor*>>& cpu_fetchs,
const int num_threads) {
// 1. Define place, executor, scope
auto place = Place();
auto executor = paddle::framework::Executor(place);
auto* scope = new paddle::framework::Scope();

// 2. Initialize the inference_program and load parameters
std::unique_ptr<paddle::framework::ProgramDesc> inference_program =
paddle::inference::Load(executor, *scope, dirname);

std::vector<std::thread*> threads;
for (int i = 0; i < num_threads; ++i) {
threads.push_back(new std::thread(
ThreadedRunInference, std::ref(inference_program), &executor, scope, i,
std::ref(cpu_feeds[i]), std::ref(cpu_fetchs[i])));
}
for (int i = 0; i < num_threads; ++i) {
threads[i]->join();
delete threads[i];
}

delete scope;
}