Skip to content

Commit

Permalink
[Cherry-pick] Support diff dataset tensor place in single process dataloader (#33470) (#33487)
Browse files Browse the repository at this point in the history

Support diff dataset tensor place in single process dataloader

cherry-pick of #33470
  • Loading branch information
chenwhql authored Jun 11, 2021
1 parent f57ae4d commit 1444090
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 9 deletions.
18 changes: 10 additions & 8 deletions paddle/fluid/operators/reader/buffered_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ BufferedReader::BufferedReader(
stream_ = platform::NpuStreamResourcePool::Instance().New(dev_idx);
}
#endif
is_same_place_ = false;
cpu_buffer_.resize(buffer_size);
cuda_buffer_.resize(buffer_size);
npu_buffer_.resize(buffer_size);
Expand Down Expand Up @@ -116,7 +115,7 @@ void BufferedReader::ReadAsync(size_t i) {
std::vector<void *> cuda_pinned_ptrs;
cuda_pinned_ptrs.reserve(cpu.size());
platform::RecordEvent record_event("BufferedReader:MemoryCopy");
// NODE(chenwehiang): When we use CUDAPinned Memory, we need call
// NODE(chenweihang): When we use CUDAPinned Memory, we need call
// cudaHostAlloc, that is a CUDA API, calling CUDA API need load
// cuda lib into device, it will cost hundreds of MB of GPU memory.
// If we don't set Device here, which will use CUDAPlace(0) default.
Expand All @@ -126,18 +125,21 @@ void BufferedReader::ReadAsync(size_t i) {
if (platform::is_cpu_place(cpu[i].place())) {
cuda[i].Resize(cpu[i].dims());
cuda[i].set_layout(cpu[i].layout());
cuda_pinned_ptrs.emplace_back(
cuda[i].mutable_data(cuda_pinned_place, cpu[i].type()));
cuda_pinned_ptrs[i] =
cuda[i].mutable_data(cuda_pinned_place, cpu[i].type());
auto size =
cpu[i].numel() * paddle::framework::SizeOfType(cpu[i].type());

memory::Copy(cuda_pinned_place, cuda_pinned_ptrs[i],
BOOST_GET_CONST(platform::CPUPlace, cpu[i].place()),
cpu[i].data<void>(), size);

cuda[i].set_lod(cpu[i].lod());
} else {
// we set same place flag & use cpu[i] directly
is_same_place_ = true;
// Here the cpu[i]'s place may be CUDAPlace, CUDAPinnedPlace, or
// others, we don't copy the memory of it to CUDAPinnedPlace, but
// we should share tensor data to cuda[i]
cuda[i].ShareDataWith(cpu[i]);
}
}
} else {
Expand Down Expand Up @@ -296,9 +298,9 @@ void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) {
return;
}

if (platform::is_gpu_place(place_) && !is_same_place_) {
if (platform::is_gpu_place(place_)) {
*out = std::move(cuda_buffer_[i]);
} else if (platform::is_npu_place(place_) && !is_same_place_) {
} else if (platform::is_npu_place(place_)) {
*out = std::move(npu_buffer_[i]);
} else {
*out = std::move(cpu_buffer_[i]);
Expand Down
1 change: 0 additions & 1 deletion paddle/fluid/operators/reader/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ class BufferedReader : public framework::DecoratedReader {
// buffer, just read async and create futures as buffer size. However, to
// malloc tensors every time is extremely slow. Here we store all data in
// buffers and prevent alloc every time.
bool is_same_place_;
std::vector<TensorVec> cpu_buffer_;
std::vector<TensorVec> cuda_buffer_;
std::vector<TensorVec> npu_buffer_;
Expand Down
46 changes: 46 additions & 0 deletions python/paddle/fluid/tests/unittests/test_dataloader_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@

from __future__ import division

import sys
import unittest
import numpy as np

import paddle
import paddle.vision.transforms as transforms
import paddle.fluid as fluid
from paddle.io import *

Expand All @@ -37,5 +40,48 @@ def test_main(self):
pass


class TestDatasetWithDiffOutputPlace(unittest.TestCase):
    """Verify where DataLoader places the (image, label) tensors of a batch
    when samples in one dataset may live on different devices."""

    def get_dataloader(self, num_workers):
        # MNIST test split; ToTensor converts each PIL image to a tensor.
        mnist = paddle.vision.datasets.MNIST(
            mode='test', transform=transforms.ToTensor())
        return paddle.io.DataLoader(
            mnist, batch_size=32, num_workers=num_workers, shuffle=True)

    def run_check_on_cpu(self):
        # With the CPU device selected, both outputs must be on CPUPlace.
        paddle.set_device('cpu')
        for image, label in self.get_dataloader(0):
            self.assertTrue(image.place.is_cpu_place())
            self.assertTrue(label.place.is_cpu_place())
            break

    def test_single_process(self):
        self.run_check_on_cpu()
        if not paddle.is_compiled_with_cuda():
            return
        # Single-process GPU loader: the image batch is moved to CUDAPlace,
        # while the label batch stays on CUDAPinnedPlace.
        paddle.set_device('gpu')
        for image, label in self.get_dataloader(0):
            self.assertTrue(image.place.is_gpu_place())
            self.assertTrue(label.place.is_cuda_pinned_place())
            break

    def test_multi_process(self):
        # Multi-process DataLoader is not supported on macOS or Windows.
        if sys.platform in ('darwin', 'win32'):
            return
        self.run_check_on_cpu()
        if not paddle.is_compiled_with_cuda():
            return
        # Worker processes return batches through pinned memory, so both
        # image and label arrive on CUDAPinnedPlace.
        paddle.set_device('gpu')
        for image, label in self.get_dataloader(1):
            self.assertTrue(image.place.is_cuda_pinned_place())
            self.assertTrue(label.place.is_cuda_pinned_place())
            break


if __name__ == '__main__':
unittest.main()

0 comments on commit 1444090

Please sign in to comment.