Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cherry-pick] Support diff dataset tensor place in single process dataloader #33487

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions paddle/fluid/operators/reader/buffered_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ BufferedReader::BufferedReader(
stream_ = platform::NpuStreamResourcePool::Instance().New(dev_idx);
}
#endif
is_same_place_ = false;
cpu_buffer_.resize(buffer_size);
cuda_buffer_.resize(buffer_size);
npu_buffer_.resize(buffer_size);
Expand Down Expand Up @@ -116,7 +115,7 @@ void BufferedReader::ReadAsync(size_t i) {
std::vector<void *> cuda_pinned_ptrs;
cuda_pinned_ptrs.reserve(cpu.size());
platform::RecordEvent record_event("BufferedReader:MemoryCopy");
// NODE(chenwehiang): When we use CUDAPinned Memory, we need call
// NOTE(chenweihang): When we use CUDAPinned Memory, we need call
// cudaHostAlloc, that is a CUDA API, calling CUDA API need load
// cuda lib into device, it will cost hundreds of MB of GPU memory.
// If we don't set Device here, which will use CUDAPlace(0) default.
Expand All @@ -126,18 +125,21 @@ void BufferedReader::ReadAsync(size_t i) {
if (platform::is_cpu_place(cpu[i].place())) {
cuda[i].Resize(cpu[i].dims());
cuda[i].set_layout(cpu[i].layout());
cuda_pinned_ptrs.emplace_back(
cuda[i].mutable_data(cuda_pinned_place, cpu[i].type()));
cuda_pinned_ptrs[i] =
cuda[i].mutable_data(cuda_pinned_place, cpu[i].type());
auto size =
cpu[i].numel() * paddle::framework::SizeOfType(cpu[i].type());

memory::Copy(cuda_pinned_place, cuda_pinned_ptrs[i],
BOOST_GET_CONST(platform::CPUPlace, cpu[i].place()),
cpu[i].data<void>(), size);

cuda[i].set_lod(cpu[i].lod());
} else {
// we set same place flag & use cpu[i] directly
is_same_place_ = true;
// Here the cpu[i]'s place may be CUDAPlace, CUDAPinnedPlace, or
// others, we don't copy the memory of it to CUDAPinnedPlace, but
// we should share tensor data to cuda[i]
cuda[i].ShareDataWith(cpu[i]);
}
}
} else {
Expand Down Expand Up @@ -296,9 +298,9 @@ void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) {
return;
}

if (platform::is_gpu_place(place_) && !is_same_place_) {
if (platform::is_gpu_place(place_)) {
*out = std::move(cuda_buffer_[i]);
} else if (platform::is_npu_place(place_) && !is_same_place_) {
} else if (platform::is_npu_place(place_)) {
*out = std::move(npu_buffer_[i]);
} else {
*out = std::move(cpu_buffer_[i]);
Expand Down
1 change: 0 additions & 1 deletion paddle/fluid/operators/reader/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ class BufferedReader : public framework::DecoratedReader {
// buffer, just read async and create futures as buffer size. However, to
// malloc tensors every time is extremely slow. Here we store all data in
// buffers and prevent alloc every time.
bool is_same_place_;
std::vector<TensorVec> cpu_buffer_;
std::vector<TensorVec> cuda_buffer_;
std::vector<TensorVec> npu_buffer_;
Expand Down
46 changes: 46 additions & 0 deletions python/paddle/fluid/tests/unittests/test_dataloader_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@

from __future__ import division

import sys
import unittest
import numpy as np

import paddle
import paddle.vision.transforms as transforms
import paddle.fluid as fluid
from paddle.io import *

Expand All @@ -37,5 +40,48 @@ def test_main(self):
pass


class TestDatasetWithDiffOutputPlace(unittest.TestCase):
    """Check tensor placement of DataLoader outputs when dataset samples
    live on different places (single- vs multi-process loading)."""

    def get_dataloader(self, num_workers):
        """Build a shuffled MNIST(test) DataLoader with the given worker count."""
        dataset = paddle.vision.datasets.MNIST(
            mode='test', transform=transforms.ToTensor())
        loader = paddle.io.DataLoader(
            dataset, batch_size=32, num_workers=num_workers, shuffle=True)
        return loader

    def run_check_on_cpu(self):
        """With device 'cpu', both image and label stay on CPUPlace."""
        paddle.set_device('cpu')
        loader = self.get_dataloader(0)
        for image, label in loader:
            self.assertTrue(image.place.is_cpu_place())
            self.assertTrue(label.place.is_cpu_place())
            break

    def test_single_process(self):
        self.run_check_on_cpu()
        if paddle.is_compiled_with_cuda():
            # The dataset yields image on CUDAPlace and label on CPUPlace;
            # the single-process loader keeps the image on the GPU and moves
            # the CPU label to CUDAPinnedPlace.
            paddle.set_device('gpu')
            loader = self.get_dataloader(0)
            for image, label in loader:
                self.assertTrue(image.place.is_gpu_place())
                self.assertTrue(label.place.is_cuda_pinned_place())
                break

    def test_multi_process(self):
        # DataLoader with multi-process mode is not supported on MacOs and Windows currently
        if sys.platform != 'darwin' and sys.platform != 'win32':
            self.run_check_on_cpu()
            if paddle.is_compiled_with_cuda():
                # In multi-process mode worker subprocesses return samples on
                # CPUPlace; the loader then stages both image and label in
                # CUDAPinnedPlace for the GPU pipeline.
                paddle.set_device('gpu')
                loader = self.get_dataloader(1)
                for image, label in loader:
                    self.assertTrue(image.place.is_cuda_pinned_place())
                    self.assertTrue(label.place.is_cuda_pinned_place())
                    break


# Run the test suite when this file is executed directly.
if __name__ == '__main__':
    unittest.main()