
Add multi-thread inference example which shares the inference_program and parameters #9302

Merged: 12 commits, Apr 9, 2018
46 changes: 42 additions & 4 deletions paddle/fluid/framework/program_desc.cc
@@ -85,9 +85,9 @@ ProgramDesc::ProgramDesc(const std::string &binary_str) {
}

const std::vector<std::string> ProgramDesc::GetFeedTargetNames() {
BlockDesc *global_block = blocks_[0].get();
auto &global_block = Block(0);
std::vector<std::string> feed_target_names;
for (auto *op : global_block->AllOps()) {
for (auto *op : global_block.AllOps()) {
if (op->Type() == kFeedOpType) {
feed_target_names.insert(feed_target_names.begin(), op->Output("Out")[0]);
}
@@ -96,15 +96,53 @@ const std::vector<std::string> ProgramDesc::GetFeedTargetNames() {
}

const std::vector<std::string> ProgramDesc::GetFetchTargetNames() {
BlockDesc *global_block = blocks_[0].get();
auto &global_block = Block(0);
std::vector<std::string> fetch_target_names;
for (auto *op : global_block->AllOps()) {
for (auto *op : global_block.AllOps()) {
if (op->Type() == kFetchOpType) {
fetch_target_names.push_back(op->Input("X")[0]);
}
}
return fetch_target_names;
}

void ProgramDesc::SetFeedHolderName(const std::string &feed_holder_name) {
auto *global_block = MutableBlock(0);
int index = 0;
for (auto *op : global_block->AllOps()) {
if (op->Type() == kFeedOpType) {
// Unify the input's name of all feed_ops to feed_holder_name
global_block->RemoveVar(op->Input("X")[0]);
op->SetInput("X", {feed_holder_name});
op->SetAttr("col", {index});
op->CheckAttrs();
index++;
}
}

auto *feed_holder = global_block->Var(feed_holder_name);
feed_holder->SetType(proto::VarType::FEED_MINIBATCH);
feed_holder->SetPersistable(true);
}

void ProgramDesc::SetFetchHolderName(const std::string &fetch_holder_name) {
auto *global_block = MutableBlock(0);
int index = 0;
for (auto *op : global_block->AllOps()) {
if (op->Type() == kFetchOpType) {
// Unify the output's name of all fetch_ops to fetch_holder_name
global_block->RemoveVar(op->Output("Out")[0]);
op->SetOutput("Out", {fetch_holder_name});
op->SetAttr("col", {index});
op->CheckAttrs();
index++;
}
}

auto *fetch_holder = global_block->Var(fetch_holder_name);
fetch_holder->SetType(proto::VarType::FETCH_LIST);
fetch_holder->SetPersistable(true);
}

} // namespace framework
} // namespace paddle
4 changes: 4 additions & 0 deletions paddle/fluid/framework/program_desc.h
@@ -15,6 +15,7 @@ limitations under the License. */
#pragma once

#include <memory>
#include <string>
#include <vector>
#include "paddle/fluid/framework/block_desc.h"
#include "paddle/fluid/framework/framework.pb.h"
@@ -55,6 +56,9 @@ class ProgramDesc {
const std::vector<std::string> GetFeedTargetNames();
const std::vector<std::string> GetFetchTargetNames();

void SetFeedHolderName(const std::string &feed_holder_name);
Contributor: Is feed/fetch holder a new concept? Adding some comments would be better.

Contributor Author: Done.

void SetFetchHolderName(const std::string &fetch_holder_name);

private:
proto::ProgramDesc desc_;

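To expand on the reviewer's question above: the feed/fetch holder is the single variable (named `feed` / `fetch` by default in Fluid) that all feed ops read their inputs from and all fetch ops write their outputs to. The two new setters rename that variable for a given copy of the program, which is what lets each thread run its own program copy inside one shared scope without the copies overwriting each other's inputs and outputs. A minimal sketch of the intended per-thread usage, assuming the default holder names above; `CloneForThread` is an illustrative helper, not an API added by this PR:

#include <string>
#include "paddle/fluid/framework/program_desc.h"

// Sketch only: clone the shared program and point its feed/fetch ops at
// thread-local holder variables, so several clones can run concurrently
// in one scope.
paddle::framework::ProgramDesc CloneForThread(
    const paddle::framework::ProgramDesc& shared_program, int thread_id) {
  paddle::framework::ProgramDesc copy(shared_program);
  copy.SetFeedHolderName("feed_" + std::to_string(thread_id));
  copy.SetFetchHolderName("fetch_" + std::to_string(thread_id));
  return copy;
}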
65 changes: 65 additions & 0 deletions paddle/fluid/inference/tests/book/test_inference_fit_a_line.cc
@@ -12,6 +12,7 @@ limitations under the License. */
#include "gflags/gflags.h"
#include "gtest/gtest.h"
#include "paddle/fluid/inference/tests/test_helper.h"
#include "paddle/fluid/inference/tests/test_multi_thread_helper.h"

DEFINE_string(dirname, "", "Directory of the inference model.");

@@ -40,6 +41,7 @@ TEST(inference, fit_a_line) {
cpu_fetchs1.push_back(&output1);

// Run inference on CPU
LOG(INFO) << "--- CPU Runs: ---";
TestInference<paddle::platform::CPUPlace>(dirname, cpu_feeds, cpu_fetchs1);
LOG(INFO) << output1.dims();

@@ -49,9 +51,72 @@ TEST(inference, fit_a_line) {
cpu_fetchs2.push_back(&output2);

// Run inference on CUDA GPU
LOG(INFO) << "--- GPU Runs: ---";
TestInference<paddle::platform::CUDAPlace>(dirname, cpu_feeds, cpu_fetchs2);
LOG(INFO) << output2.dims();

CheckError<float>(output1, output2);
#endif
}

TEST(multi_thread_inference, fit_a_line) {
Contributor: The code of TEST(inference, fit_a_line) and TEST(multi_thread_inference, fit_a_line) is very similar. Could the former be made a special case of the latter, with num_threads = 1?

Contributor Author: Done.

if (FLAGS_dirname.empty()) {
LOG(FATAL) << "Usage: ./example --dirname=path/to/your/model";
}

LOG(INFO) << "FLAGS_dirname: " << FLAGS_dirname << std::endl;
std::string dirname = FLAGS_dirname;

// 0. Call `paddle::framework::InitDevices()` initialize all the devices
// In unittests, this is done in paddle/testing/paddle_gtest_main.cc

int num_threads = 2;

std::vector<std::vector<paddle::framework::LoDTensor*>> cpu_feeds;
cpu_feeds.resize(num_threads);
for (int i = 0; i < num_threads; ++i) {
auto* input = new paddle::framework::LoDTensor();
// The second dim of the input tensor should be 13
// The input data should be >= 0
int64_t batch_size = 10;
SetupTensor<float>(input, {batch_size, 13}, static_cast<float>(0),
static_cast<float>(10));
cpu_feeds[i].push_back(input);
}

std::vector<std::vector<paddle::framework::LoDTensor*>> cpu_fetchs1;
cpu_fetchs1.resize(num_threads);
for (int i = 0; i < num_threads; ++i) {
auto* output = new paddle::framework::LoDTensor();
cpu_fetchs1[i].push_back(output);
}

// Run inference on CPU
LOG(INFO) << "--- CPU Runs (Multi Thread): ---";
TestMultiThreadInference<paddle::platform::CPUPlace>(
dirname, cpu_feeds, cpu_fetchs1, num_threads);

#ifdef PADDLE_WITH_CUDA
std::vector<std::vector<paddle::framework::LoDTensor*>> cpu_fetchs2;
cpu_fetchs2.resize(num_threads);
for (int i = 0; i < num_threads; ++i) {
auto* output = new paddle::framework::LoDTensor();
cpu_fetchs2[i].push_back(output);
}

// Run inference on CUDA GPU
LOG(INFO) << "--- GPU Runs (Multi Thread): ---";
TestMultiThreadInference<paddle::platform::CUDAPlace>(
dirname, cpu_feeds, cpu_fetchs2, num_threads);

for (int i = 0; i < num_threads; ++i) {
CheckError<float>(*cpu_fetchs1[i][0], *cpu_fetchs2[i][0]);
delete cpu_fetchs2[i][0];
}
#endif

for (int i = 0; i < num_threads; ++i) {
delete cpu_feeds[i][0];
delete cpu_fetchs1[i][0];
}
}
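On the reviewer's suggestion above about folding the single-thread test into the multi-thread one: a hedged sketch of what the num_threads = 1 special case could look like inside the CPU part of the test body, reusing the helpers already shown in this file (the exact refactor from the later commits is not reproduced here):

// Sketch only: the single-thread CPU check expressed through the
// multi-thread helper with num_threads = 1.
const int num_threads = 1;
std::vector<std::vector<paddle::framework::LoDTensor*>> cpu_feeds(num_threads);
std::vector<std::vector<paddle::framework::LoDTensor*>> cpu_fetchs(num_threads);

paddle::framework::LoDTensor input;
paddle::framework::LoDTensor output;
int64_t batch_size = 10;
SetupTensor<float>(&input, {batch_size, 13}, static_cast<float>(0),
                   static_cast<float>(10));
cpu_feeds[0].push_back(&input);
cpu_fetchs[0].push_back(&output);

TestMultiThreadInference<paddle::platform::CPUPlace>(
    FLAGS_dirname, cpu_feeds, cpu_fetchs, num_threads);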
90 changes: 90 additions & 0 deletions paddle/fluid/inference/tests/test_multi_thread_helper.h
@@ -0,0 +1,90 @@
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once

#include <map>
#include <string>
#include <thread> // NOLINT
#include <vector>
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/inference/io.h"

void ThreadedRunInference(
const std::unique_ptr<paddle::framework::ProgramDesc>& inference_program,
paddle::framework::Executor* executor, paddle::framework::Scope* scope,
const int thread_id,
const std::vector<paddle::framework::LoDTensor*>& cpu_feeds,
const std::vector<paddle::framework::LoDTensor*>& cpu_fetchs) {
auto copy_program = std::unique_ptr<paddle::framework::ProgramDesc>(
new paddle::framework::ProgramDesc(*inference_program));

std::string feed_holder_name = "feed_" + paddle::string::to_string(thread_id);
std::string fetch_holder_name =
"fetch_" + paddle::string::to_string(thread_id);
copy_program->SetFeedHolderName(feed_holder_name);
copy_program->SetFetchHolderName(fetch_holder_name);

// 3. Get the feed_target_names and fetch_target_names
const std::vector<std::string>& feed_target_names =
copy_program->GetFeedTargetNames();
const std::vector<std::string>& fetch_target_names =
copy_program->GetFetchTargetNames();

// 4. Prepare inputs: set up maps for feed targets
std::map<std::string, const paddle::framework::LoDTensor*> feed_targets;
for (size_t i = 0; i < feed_target_names.size(); ++i) {
// Please make sure that cpu_feeds[i] is right for feed_target_names[i]
feed_targets[feed_target_names[i]] = cpu_feeds[i];
}

// 5. Define Tensor to get the outputs: set up maps for fetch targets
std::map<std::string, paddle::framework::LoDTensor*> fetch_targets;
for (size_t i = 0; i < fetch_target_names.size(); ++i) {
fetch_targets[fetch_target_names[i]] = cpu_fetchs[i];
}

// 6. Run the inference program
executor->Run(*copy_program, scope, feed_targets, fetch_targets,
feed_holder_name, fetch_holder_name);
}

template <typename Place>
void TestMultiThreadInference(
Contributor: Why isn't TestMultiThreadInference placed in paddle/fluid/inference/tests/test_helper.h, like TestInference?

Contributor Author: Multi-threading is a special use case, so it is kept in a separate file to make it easier to index and reference.

const std::string& dirname,
const std::vector<std::vector<paddle::framework::LoDTensor*>>& cpu_feeds,
const std::vector<std::vector<paddle::framework::LoDTensor*>>& cpu_fetchs,
const int num_threads) {
// 1. Define place, executor, scope
auto place = Place();
auto executor = paddle::framework::Executor(place);
auto* scope = new paddle::framework::Scope();

// 2. Initialize the inference_program and load parameters
std::unique_ptr<paddle::framework::ProgramDesc> inference_program =
paddle::inference::Load(executor, *scope, dirname);

std::vector<std::thread*> threads;
for (int i = 0; i < num_threads; ++i) {
threads.push_back(new std::thread(
ThreadedRunInference, std::ref(inference_program), &executor, scope, i,
std::ref(cpu_feeds[i]), std::ref(cpu_fetchs[i])));
}
for (int i = 0; i < num_threads; ++i) {
threads[i]->join();
delete threads[i];
}

delete scope;
}