Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add design doc: concurrent data transfer and kernel execution #7276

Closed

Conversation

chengduoZH
Copy link
Contributor

@chengduoZH chengduoZH commented Jan 7, 2018

It would be more convenient to look here.


buffer_size = 2
for pass_id in range(5):
for i in range(buffer_size):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From this API, we can not see that stage_program will run in a separated thread.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CUDA can use stream mechanism to overlap kernel execution with data transfers.
https://devblogs.nvidia.com/parallelforall/how-overlap-data-transfers-cuda-cc/

The description here maybe not very clear. I will refine this description.

for pass_id in range(5):
for i in range(buffer_size):
exe.run(fluid.stage_program)
for data in train_reader():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only two for is needed:

for i in range(buffer_size):
    for data in train_reader():
        exe.run(fluid.stage_program, feed=feeder.feed(data))

for i in range(buffer_size):
    exe.run(fluid.default_main_program())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thx! I have refine the code.

The process can be described follows:

  • The Staging area should be warmed up at first.
  • Then getting data from the stage area, but not the data set(on the cpu side), to do forward calculation, meanwhile the new data is copied from the data set to the stage area.
  • Finally, when the data set is empty, the program will continue getting data from the stage are to do forward calculation, until the stage area becomes empty.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that default_main_program() run twice, which means run forward and backward twice in each mini-batch. Then the final step should run stage_program instead of main.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated this doc, maybe it is clearer than before.

@chengduoZH chengduoZH changed the title [WIP] Add double_buffering design doc Add double_buffering design doc Jan 9, 2018
@chengduoZH chengduoZH changed the title Add double_buffering design doc Add design doc: concurrent data transfer and kernel execution Jan 10, 2018
@chengduoZH chengduoZH force-pushed the feature/double_buffer branch 3 times, most recently from 6688915 to 23f44e6 Compare January 11, 2018 00:49
To support the above description, we need to define a new class: `Channel`. Here, we borrow the concept of [`Channel`](https://www.golang-book.com/books/intro/10#section2) in the go language.

```
template <typename T>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've pushed https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/operators/detail/simple_block_queue.h you can reuse this code or change the name. It now acts as a "channel" internally.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to set the capacity for this Queue?

places = get_places()
channel_list = create_channel_list(name="data_buffer")
with parallel.for(places) as for_loop:
cur_channel = create_channel(channel_list, for_loop.i, channel_size=2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parallel.for is an operator that execute one block concurrently, so within the with block, there should be code to create operators?

using MetaType = LoDTensor;
using BufferElement = std::vector<MetaType>;

class Buffer {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Buffer could be just a Variable.


Buffer* GetBuffer(const platform::Place place, const size_t capacity,
const size_t bytes_limit) {
static std::map<platform::Place, Buffer*> buffering;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid global variables.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated this design doc, your comments have been fixed.

In general, the model training is divided into two steps: data preparation and model calculation. Because training a deep learning model needs a lot of computing resources. If using CPU, it will take a long time to complete an iteration. Training a good model usually takes tens of thousands of iterations. Obviously, this is unacceptable. Therefore, we usually choose the accelerator (e.g. GPU) for model training. But using accelerator for training model brings a new problem. Because our training data is in CPU, before the accelerator training model, it need to wait for the data to be copied from the CPU side to the accelerator. So the time to train the model on the accelerator is the sum of the time of loading the data, the time of data transfer, and the time of the accelerator calculation. Therefore, although the accelerator's computation speed is very fast, sometimes the data transfer time is very long, which may cause the accelerator training model to be slower than the direct CPU training model.

## Problem
The data transfer between host and device is synchronized, by default. If the accelerator is `GPU`, a time line for the execution of traing model on `GPU` is shown in the following diagram. This is just a schematic.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To use async, to fully utilize GPUs

To support the above description, we need to define a new class: `Channel`. Here, we borrow the concept of [`Channel`](https://www.golang-book.com/books/intro/10#section2) in the go language.

```
template <typename T>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to set the capacity for this Queue?

@chengduoZH chengduoZH mentioned this pull request Jan 12, 2018
Copy link
Contributor

@typhoonzero typhoonzero left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tonyyang-svail can you please take a look at this?

for_loop.input(label)], out=cur_channel)

main_program = Program()
with program(main_program):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just a piece of pseudo-code that describes data transferring and model training to run concurrently. The code structure is only a logical structure of concurrent execution.

@helinwang helinwang self-requested a review January 15, 2018 18:58
Copy link
Contributor

@helinwang helinwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we discuss on this comment before this is merged?


```
...
concurrent_program = Program()
Copy link
Contributor

@helinwang helinwang Jan 15, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to expose the channel concept to the user?
We can do the following, and use channel as the implementation, without exposing it to the user:

main_program = Program()
with program(main_program):
    # automatically create a separate thread for data loading.
    # automatically infer the data destination is GPU place or CPU place.
    image = fluid.layers.data_layer(..., buf_size=100)
    label = fluid.layers.data_layer(..., buf_size=100)
    with parallel.for(places) as for_loop:
        y_predict = fluid.layers.fc(input=image, size=1, act=None)
        cost = fluid.layers.square_error_cost(input=y_predict, label=label)
        avg_cost = fluid.layers.mean(x=cost)
        ....

for pass_id in range(PASS):
    for data in train_reader():
        executor.run(main_program, fetch=[...])

Please compare the Python code in this PR and the above code, feels like exposing the channel concept to the user only adds verbosity to the program.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to expose the channel concept to the user?

I think we should have a deeper discussion about this.

automatically create a separate thread for data loading.

Currently, data loading is done on Python side, and we do not consider this in this PR.
This PR mainly analyzes the concurrent execution of data transferring from the CPU end to GPU transmission and computing model at the GPU.

feels like exposing the channel concept to the user only adds verbosity to the program

Exposing the channel concept maybe not appropriate, and some user may never use golang. But this can make the program more flexible.

template <typename T>
class Channel {
private:
using ChannelElement = std::vector<T>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe ChannelElement should be is a point of T, since we could sen/recv an element to/from a Channel, instead of sen/recv a vector of elements.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Replacing ChannelElement to T will make the code more general.
When I started writing this doc, I thought that we should put a batch data into Channel. For image classification task, the batch data includes data and label.

image = fluid.layers.data_layer(...)
label = fluid.layers.data_layer(...)
places = get_places()
channel_list = create_channel_list(name="data_buffer")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I'm confusion about the Python code:

channel_list = create_channel_list(name="data_buffer")

Does it means create a list of channel?

For another, I think a Channel also need a type, and if we allow user to create a Channel in Python, how to define the type of a Channel, does it should be according with proto:: VarDesc::VarType ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it means create a list of channel?

Yes, on the condition of single machine multi-card, every GPU corresponds to a channel.

does it should be according with proto:: VarDesc::VarType ?

We may need to create new types.

executor.run(main_program, fetch=[...])

```
In Python code, we define two `program`, `concurrent_program` used to send data into `Channel` and `main_program` used to get data from the `Channel` and execute training. If you're familiar with [`Goroutine`](https://www.golang-book.com/books/intro/10#section1) in the go language, you'll find that `main_program` and `concurrent_program` just like two `Goroutine`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we make main_program and concurrent_program like two Goroutine, I think we need to declare the code block will be run as a Goroutine, and pass the channel as a argument, so for my personal opinion, the Python code would look like:

chan = fluid.channel.make(type=var.LoDTensor)

with fluid.go(concurrent_program, chan):
    image = fluid.layers.data_layer(...)
    label = fluid.layers.data_layer(...)
    places = get_places() 
    with parallel.for(places) as for_loop:
        fluid.channle.send(chan, data=[for_loop.input(image), for_loop.input(label)])

with fluid.go(main_program, chan):
    places = get_places() 
    with parallel.for(places) as for_loop:
        image, label = fluid.channel.recv(chan)
        y_predict = fluid.layers.fc(input=image, size=1, act=None)
        cost = fluid.layers.square_error_cost(input=y_predict, label=label)
        avg_cost = fluid.layers.mean(x=cost) 

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great!
I think fluid.channel.make(type=var.LoDTensor) should be inside of fluid.go.
And I will update this PR.

with fluid.go(concurrent_program, chan_list_name):
chan_list_config = fluid.channel_list.config(name="chan_list", type=var.CHANNEL_LIST)

with fluid.go(concurrent_program, chan_list_config) as go:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we look at the go key word in Go, it can be used to create a goroutine running any function:

go foo()
go bar(a, b, c)

Here we use chan_list_config as an argument, coupling channel with goroutine, is that necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I wrote this doc, I thought that one program corresponded to one goroutine, and chan_list_config may be used by more program, not just concurrent_program and main_program. So I regard chan_list_config as an argument of fluid.go. I am not very familiar with the go language. Maybe this design is not appropriate. I will continue to update this part, and if you have some advice, please tell me, many thanks.

with fluid.go(concurrent_program, chan_list_name):
chan_list_config = fluid.channel_list.config(name="chan_list", type=var.CHANNEL_LIST)

with fluid.go(concurrent_program, chan_list_config) as go:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this syntax valid? concurrent_program seems not defined anywhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is just pseudo code.

image = fluid.layers.data_layer(...)
label = fluid.layers.data_layer(...)
chan_list = fluid.channel_list.make(type=var.CHANNEL_LIST, name=chan_list_name)
places = get_places()
with parallel.for(places) as for_loop:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that we don't need to read data concurrently, there is just one data source (pair(image, label)), concurrently reading them doesn't seems to help.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this design doc, I want to overlap the time of data transfer(from CPU to GPU) and model training on GPU. As for data reading, there is no consideration here.

Copy link
Contributor

@helinwang helinwang Jan 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, thanks. I think I understand what the code here is trying to do: load tensor on to different GPU memory for different channels. In the code below,

    image = fluid.layers.data_layer(...)
    label = fluid.layers.data_layer(...)
    places = get_places() 
    with parallel.for(places) as for_loop:
        chan = fluid.channel_list.get_channel(go.chan_list, type=var.CHANNELTYPE,name="chan_{}".format(for_loop.i))
        fluid.channle.send(chan, data=[for_loop.input(image), for_loop.input(label)])

Do you mean parallel.for(places) means each for block runs on the given GPU place? There is only send in the for block, but data is already read to CPU place, there is no code that copies the data from CPU place to the GPU place of each for block.

Btw, I think parallel.for should be unrelated to where each block is running, it's just a concurrency syntax, we need other syntax to specify where each block is running.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parallel.for(places) just like what ParallelDoOp does, data copying can be done in parallel.for(places).

SplitTensorAndMoveTensorToScopes(scope, &sub_scopes, places,
Inputs(kInputs));

The difference is that parallel.for(places) does not need merge output.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chengduoZH Thanks for your help! Now I understand the situation better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@helinwang Thanks for your review!

places = get_places()
with parallel.for(places) as for_loop:
chan = fluid.channel_list.get_channel(chan_list, type=var.CHANNELTYPE,name="chan_{}".format(for_loop.i))
chan = fluid.channel_list.get_channel(go.chan_list, type=var.CHANNELTYPE,name="chan_{}".format(for_loop.i))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe read data to one channel, and multiple consumer consuming from that channel (a single channel, not multiple channel) is enough?

Copy link
Contributor Author

@chengduoZH chengduoZH Jan 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Data reading has been done on Python side.
In the case of multi-GPU, since different GPU have the different address, each GPU should own one channel. So I use multiple channels here.

@abhinavarora abhinavarora self-requested a review January 19, 2018 19:31
@@ -0,0 +1,93 @@
# Design Doc: Concurrent data transfer and kernel execution.
Training deep learning model on GPU involves three stages: loading data from disk, copying the data from the CPU side to the GPU side, and executing training program on GPU. We need an efficient mechanism to take full use of hardware resources to make those stages executed concurrently. At present, Fluid uses producer-consumer mechanism at the python side to load data from disk. So this design doc mainly solves the time overlap of data transfer and kernel execution.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

python ==> Python


```
template <typename T>
class Channel {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To implement CSP in Fluid, we need a channel type, which could work with our fluid.Select op, and we want fluid.Select depend on Linux's epoll system call for an efficient implementation. This part of work is still under design. So, I suggest to remove this section about channel design from this design doc, and make this doc more focused on how to overlap CPU-to-GPU data transfer and model update.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I will fix this for next commitment.


```
...
chan_list_config = fluid.channel_list.config(name="chan_list", type=var.CHANNEL_LIST)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this program is the most interesting part of this doc, but I don't see the creation of a channel-typed variable like

ch = fluid.channel(...)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case of multi-GPU, since different GPU have the different address, each GPU should own one channel. So I create a channel_list and all the channels are created in this chan = fluid.channel_list.get_channel(go.chan_list...).

...
chan_list_config = fluid.channel_list.config(name="chan_list", type=var.CHANNEL_LIST)

with fluid.go(concurrent_program, chan_list_config) as go:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I though about this for a while, now I believe that fluid.go should start a thread to run a block in the current ProgramDesc, instead of a new ProgramDesc, because a ProgramDesc is a program, like a .go file, and Go's go statement doesn't run a .go file, but a lambda represented by a block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is interesting!
But go's block may be different from others' because, in my view, go's block does not need gradient op. I don't know whether my view is right or not.

@helinwang helinwang dismissed their stale review January 23, 2018 23:07

Thanks @chengduoZH 's explanation.

@luotao1
Copy link
Contributor

luotao1 commented Feb 1, 2019

感谢您给PaddlePaddle贡献文档。由于文档已迁移至FluidDoc repo,因此关闭您的PR,欢迎您向FluidDoc Repo贡献文档。
Thanks for contributing to PaddlePaddle! Since documents have been moved to FluidDoc repo, we close this PR. Welcome to contribute to FluidDoc repo.

@luotao1 luotao1 closed this Feb 1, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants