Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support numpy dense #207

Merged
merged 1 commit into from
Oct 18, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmake/flags.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ set(COMMON_FLAGS
-Wdelete-non-virtual-dtor
-Wno-unused-parameter
-Wno-error=literal-suffix
-Wno-error=unused-local-typedefs)
-Wno-error=unused-local-typedefs
-Wno-error=unused-function # Warnings in Numpy Header.
)

foreach(flag ${COMMON_FLAGS})
safe_set_cflag(CMAKE_C_FLAGS ${flag})
Expand Down
2 changes: 1 addition & 1 deletion demo/mnist/data/get_mnist_data.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env sh
# This script downloads the MNIST data and unzips it.

set -e
DIR="$( cd "$(dirname "$0")" ; pwd -P )"
rm -rf "$DIR/raw_data"
mkdir "$DIR/raw_data"
Expand Down
8 changes: 6 additions & 2 deletions paddle/gserver/dataproviders/DataProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ void BufferBatch::clone(DataBatch* srcBatch, bool useGpu) {
}
}

DoubleBuffer::DoubleBuffer(DataProvider* dataPool, bool useGpu,
DoubleBuffer::DoubleBuffer(DataProvider *dataPool,
bool useGpu,
int64_t batchSize) {
batchSize_ = batchSize;
dataPool_ = dataPool;
Expand Down Expand Up @@ -110,6 +111,9 @@ void DoubleBuffer::removeOneBatch(DataBatch* dataBatch) {
}

void DoubleBuffer::insertOneBatch(DataBatch* batch) {
while (!bufferQueue_->waitNotEmptyFor(2 /* seconds */)) { // time out
if (stopping_) return;
}
BufferBatch* bufBatch = bufferQueue_->dequeue();
// clone and copy the data from a thread-local variable
bufBatch->clone(batch, useGpu_);
Expand Down Expand Up @@ -138,7 +142,7 @@ void DoubleBuffer::asyncLoadBatch() {
actualSize = dataPool_->getNextBatchInternal(batchSize_, &newBatch);
}
insertOneBatch(&newBatch);
} while (actualSize > 0);
} while (actualSize > 0 && !stopping_);
}
}

Expand Down
5 changes: 3 additions & 2 deletions paddle/gserver/dataproviders/DataProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,9 @@ typedef Queue<BufferBatch*> BufferBatchQueue;

class DoubleBuffer {
public:
DoubleBuffer(DataProvider* dataPool, bool useGpu, int64_t batchSize = 0);
DoubleBuffer(DataProvider* dataPool,
bool useGpu,
int64_t batchSize = 0);
virtual ~DoubleBuffer();
void removeOneBatch(DataBatch* dataBatch);

Expand Down Expand Up @@ -349,7 +351,6 @@ class DataProvider {
*/
virtual void reset() {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because PyDataProvider2 always uses the double buffer, and PyDataProvider2 is the only open-source interface for providing data, remove the noisy log line here.

if (doubleBuffer_ != nullptr) {
LOG(INFO) << "the double-buffer is starting ...";
doubleBuffer_->startAsyncLoad();
}
}
Expand Down
36 changes: 30 additions & 6 deletions paddle/gserver/dataproviders/PyDataProvider2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@ limitations under the License. */
#include <stdlib.h>
#include <unordered_set>
#include <list>
#include <Python.h>
#include <numpy/numpyconfig.h>
#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
#include <numpy/ndarrayobject.h>

#include "DataProvider.h"

#include "paddle/utils/PythonUtil.h"
#include "paddle/utils/Locks.h"
#include "paddle/utils/Stat.h"

namespace paddle {

Expand Down Expand Up @@ -202,7 +209,10 @@ class PyDataProvider2 : public DataProvider {
PyDataProvider2(const DataConfig& config,
const ModelConfig& modelConfig,
bool useGpu)
:DataProvider(config, useGpu), callingContextCreated_(2) {
:DataProvider(config, useGpu),
callingContextCreated_(2) {
if (PyArray_API == NULL)
import_array();
auto& args = config.load_data_args();
PyObjectPtr kwargs = PyObjectPtr(PyDict_New());
if (!args.empty()) {
Expand Down Expand Up @@ -454,6 +464,7 @@ class PyDataProvider2 : public DataProvider {
std::condition_variable pushCV_;
std::condition_variable pullCV_;
std::mutex mtx_;

ThreadBarrier callingContextCreated_;
std::unique_ptr<IPyDataProviderCache> cache_;

Expand Down Expand Up @@ -496,8 +507,8 @@ class PyDataProvider2 : public DataProvider {
* Resetting the PyDataProvider. May start reading thread here.
*/
virtual void reset() {
DataProvider::reset();
resetImpl(true);
DataProvider::reset();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DataProvider::reset() should be invoked at the end of reset(), because DataProvider::reset() will invoke getNextBatchInternal on another thread right away.

}

/**
Expand All @@ -518,6 +529,7 @@ class PyDataProvider2 : public DataProvider {
* Loading a batch of data.
*/
int64_t getNextBatchInternal(int64_t size_, DataBatch *batch) {
REGISTER_TIMER("PyDP2.getNextBatchInternal")
CHECK_GE(size_, 0);
size_t size = (size_t) size_;
if (loadThread_) { // loading from thread should wait for data pool ready.
Expand Down Expand Up @@ -698,10 +710,22 @@ class DenseScanner: public IFieldScanner {
*/
virtual void fill(Argument &argument, PyObject *obj) {
  // Destination: the next unfilled row of the dense matrix.
  real* dat = argument.value->getData() + height_ * headerPtr_->dim;
  if (PyArray_Check(obj)) {
    // Fast path: the user yielded a numpy array. Only accept an array whose
    // element type is floating point and whose element size matches `real`
    // (float32 or float64 depending on the build); otherwise fail loudly
    // rather than silently reinterpreting the buffer.
    auto dtype = PyArray_DTYPE((PyArrayObject*)obj);
    if (dtype->type == 'f' && dtype->elsize == sizeof(real)) {
      real * data = (real*)PyArray_DATA((PyArrayObject*)obj);
      auto sz = PyArray_SIZE((PyArrayObject*)obj);
      std::copy(data, data + sz, dat);
    } else {
      LOG(FATAL) << "You should yield float" << sizeof(real) * 8
                 << " array";
    }
  } else {
    // Slow path: a generic Python sequence; convert element by element.
    py::SequenceHelper s(obj);
    // TODO(yuyang18): Here we can use AVX or SSE to accelerate memory copy.
    for (size_t i=0; i < headerPtr_->dim; ++i) {
      dat[i] = (real) s.getDouble(i);
    }
  }
  ++height_;
}
Expand Down
15 changes: 15 additions & 0 deletions paddle/utils/Queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,21 @@ class Queue {
queueCV_.wait(lock, [this]() { return numElements_ == 0; });
}

/**
* @brief wait queue is not empty at most for some seconds.
* @param seconds wait time limit.
* @return true if queue is not empty. false if timeout.
*/
bool waitNotEmptyFor(int seconds) {
std::unique_lock<std::mutex> lock(queueLock_);
return queueCV_.wait_for(
lock,
std::chrono::seconds(seconds),
[this] {
return numElements_ != 0;
});
}

private:
std::deque<T> elements_;
int numElements_;
Expand Down
1 change: 1 addition & 0 deletions python/paddle/trainer_config_helpers/data_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def py_data2(files, load_data_module, load_data_object, load_data_args,
data.load_data_module = load_data_module
data.load_data_object = load_data_object
data.load_data_args = load_data_args
data.async_load_data = True
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Always enable the double buffer.

The "async load" option is confusing: it just enables the double buffer in the DataProvider.

Maybe we should change it later.

return data
data_cls = py_data2

Expand Down