Skip to content

Commit

Permalink
Implement AsyncProgressQueueWorker
Browse files Browse the repository at this point in the history
  • Loading branch information
legendecas committed Nov 2, 2019
1 parent 295e560 commit dc63ed8
Show file tree
Hide file tree
Showing 7 changed files with 438 additions and 61 deletions.
260 changes: 219 additions & 41 deletions napi-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -3699,8 +3699,8 @@ inline AsyncWorker::AsyncWorker(const Object& receiver,
_env, resource_name, NAPI_AUTO_LENGTH, &resource_id);
NAPI_THROW_IF_FAILED_VOID(_env, status);

status = napi_create_async_work(_env, resource, resource_id, OnExecute,
OnWorkComplete, this, &_work);
status = napi_create_async_work(_env, resource, resource_id, OnAsyncWorkExecute,
OnAsyncWorkComplete, this, &_work);
NAPI_THROW_IF_FAILED_VOID(_env, status);
}

Expand All @@ -3725,8 +3725,8 @@ inline AsyncWorker::AsyncWorker(Napi::Env env,
_env, resource_name, NAPI_AUTO_LENGTH, &resource_id);
NAPI_THROW_IF_FAILED_VOID(_env, status);

status = napi_create_async_work(_env, resource, resource_id, OnExecute,
OnWorkComplete, this, &_work);
status = napi_create_async_work(_env, resource, resource_id, OnAsyncWorkExecute,
OnAsyncWorkComplete, this, &_work);
NAPI_THROW_IF_FAILED_VOID(_env, status);
}

Expand Down Expand Up @@ -3813,40 +3813,51 @@ inline void AsyncWorker::SetError(const std::string& error) {
inline std::vector<napi_value> AsyncWorker::GetResult(Napi::Env /*env*/) {
return {};
}
// The OnAsyncWorkExecute method receives an napi_env argument. However, do NOT
// use it within this method, as it does not run on the main thread and must
// not run any method that would cause JavaScript to run. In practice, this
// means that almost any use of napi_env will be incorrect.
inline void OnAsyncWorkExecute(napi_env env, void* asyncworker) {
AsyncWorker* self = static_cast<AsyncWorker*>(asyncworker);
self->OnExecute(env);
}
// The OnExecute method receives an napi_env argument. However, do NOT
// use it within this method, as it does not run on the main thread and must
// not run any method that would cause JavaScript to run. In practice, this
// means that almost any use of napi_env will be incorrect.
inline void AsyncWorker::OnExecute(napi_env /*DO_NOT_USE*/, void* this_pointer) {
AsyncWorker* self = static_cast<AsyncWorker*>(this_pointer);
inline void AsyncWorker::OnExecute(napi_env /*DO_NOT_USE*/) {
#ifdef NAPI_CPP_EXCEPTIONS
try {
self->Execute();
this->Execute();
} catch (const std::exception& e) {
self->SetError(e.what());
this->SetError(e.what());
}
#else // NAPI_CPP_EXCEPTIONS
self->Execute();
this->Execute();
#endif // NAPI_CPP_EXCEPTIONS
}

inline void AsyncWorker::OnWorkComplete(
napi_env /*env*/, napi_status status, void* this_pointer) {
AsyncWorker* self = static_cast<AsyncWorker*>(this_pointer);
inline void OnAsyncWorkComplete(napi_env env,
napi_status status,
void* asyncworker) {
AsyncWorker* self = static_cast<AsyncWorker*>(asyncworker);
self->OnWorkComplete(env, status);
}
inline void AsyncWorker::OnWorkComplete(napi_env /*env*/, napi_status status) {
if (status != napi_cancelled) {
HandleScope scope(self->_env);
HandleScope scope(this->_env);
details::WrapCallback([&] {
if (self->_error.size() == 0) {
self->OnOK();
if (this->_error.size() == 0) {
this->OnOK();
}
else {
self->OnError(Error::New(self->_env, self->_error));
this->OnError(Error::New(this->_env, this->_error));
}
return nullptr;
});
}
if (!self->_suppress_destruct) {
self->Destroy();
if (!this->_suppress_destruct) {
this->Destroy();
}
}

Expand Down Expand Up @@ -4172,9 +4183,38 @@ inline void ThreadSafeFunction::CallJS(napi_env env,
}

////////////////////////////////////////////////////////////////////////////////
// Async Progress Worker class
// Async Progress Worker Base class
////////////////////////////////////////////////////////////////////////////////
inline AsyncProgressWorkerBase::AsyncProgressWorkerBase(const Object& receiver,
const Function& callback,
const char* resource_name,
const Object& resource)
: AsyncWorker(receiver, callback, resource_name, resource) {
_tsfn = ThreadSafeFunction::New(callback.Env(), callback, resource_name, 1, 1);
}

#if NAPI_VERSION > 4
inline AsyncProgressWorkerBase::AsyncProgressWorkerBase(Napi::Env env,
const char* resource_name,
const Object& resource)
: AsyncWorker(env, resource_name, resource) {
// TODO: Once the changes to make the callback optional for threadsafe
// functions are no longer optional we can remove the dummy Function here.
Function callback;
_tsfn = ThreadSafeFunction::New(env, callback, resource_name, 1, 1);
}
#endif

inline void OnAsyncWorkProgress(Napi::Env /* env */,
Napi::Function /* jsCallback */,
void* asyncworker) {
AsyncProgressWorkerBase* asyncprogressworker = static_cast<AsyncProgressWorkerBase*>(asyncworker);
asyncprogressworker->OnWorkProgress();
}

////////////////////////////////////////////////////////////////////////////////
// Async Progress Worker class
////////////////////////////////////////////////////////////////////////////////
template<class T>
inline AsyncProgressWorker<T>::AsyncProgressWorker(const Function& callback)
: AsyncProgressWorker(callback, "generic") {
Expand Down Expand Up @@ -4217,10 +4257,9 @@ inline AsyncProgressWorker<T>::AsyncProgressWorker(const Object& receiver,
const Function& callback,
const char* resource_name,
const Object& resource)
: AsyncWorker(receiver, callback, resource_name, resource),
: AsyncProgressWorkerBase(receiver, callback, resource_name, resource),
_asyncdata(nullptr),
_asyncsize(0) {
_tsfn = ThreadSafeFunction::New(callback.Env(), callback, resource_name, 1, 1);
}

#if NAPI_VERSION > 4
Expand All @@ -4239,27 +4278,23 @@ template<class T>
inline AsyncProgressWorker<T>::AsyncProgressWorker(Napi::Env env,
const char* resource_name,
const Object& resource)
: AsyncWorker(env, resource_name, resource),
: AsyncProgressWorkerBase(env, resource_name, resource),
_asyncdata(nullptr),
_asyncsize(0) {
// TODO: Once the changes to make the callback optional for threadsafe
// functions are no longer optional we can remove the dummy Function here.
Function callback;
_tsfn = ThreadSafeFunction::New(env, callback, resource_name, 1, 1);
}
#endif

template<class T>
inline AsyncProgressWorker<T>::~AsyncProgressWorker() {
// Abort pending tsfn call.
// Don't send progress events after we've already completed.
_tsfn.Abort();
this->_tsfn.Abort();
{
std::lock_guard<std::mutex> lock(_mutex);
std::lock_guard<std::mutex> lock(this->_mutex);
_asyncdata = nullptr;
_asyncsize = 0;
}
_tsfn.Release();
this->_tsfn.Release();
}

template<class T>
Expand All @@ -4269,20 +4304,18 @@ inline void AsyncProgressWorker<T>::Execute() {
}

template<class T>
inline void AsyncProgressWorker<T>::WorkProgress_(Napi::Env /* env */, Napi::Function /* jsCallback */, void* _data) {
AsyncProgressWorker* self = static_cast<AsyncProgressWorker*>(_data);

inline void AsyncProgressWorker<T>::OnWorkProgress() {
T* data;
size_t size;
{
std::lock_guard<std::mutex> lock(self->_mutex);
data = self->_asyncdata;
size = self->_asyncsize;
self->_asyncdata = nullptr;
self->_asyncsize = 0;
std::lock_guard<std::mutex> lock(this->_mutex);
data = this->_asyncdata;
size = this->_asyncsize;
this->_asyncdata = nullptr;
this->_asyncsize = 0;
}

self->OnProgress(data, size);
this->OnProgress(data, size);
delete[] data;
}

Expand All @@ -4293,19 +4326,19 @@ inline void AsyncProgressWorker<T>::SendProgress_(const T* data, size_t count) {

T* old_data;
{
std::lock_guard<std::mutex> lock(_mutex);
std::lock_guard<std::mutex> lock(this->_mutex);
old_data = _asyncdata;
_asyncdata = new_data;
_asyncsize = count;
}
_tsfn.NonBlockingCall(this, WorkProgress_);
this->_tsfn.NonBlockingCall(this, OnAsyncWorkProgress);

delete[] old_data;
}

template<class T>
inline void AsyncProgressWorker<T>::Signal() const {
_tsfn.NonBlockingCall(this, WorkProgress_);
this->_tsfn.NonBlockingCall(this, OnAsyncWorkProgress);
}

template<class T>
Expand All @@ -4318,6 +4351,151 @@ inline void AsyncProgressWorker<T>::ExecutionProgress::Send(const T* data, size_
_worker->SendProgress_(data, count);
}

////////////////////////////////////////////////////////////////////////////////
// Async Progress Queue Worker class
////////////////////////////////////////////////////////////////////////////////
template<class T>
inline AsyncProgressQueueWorker<T>::AsyncProgressQueueWorker(const Function& callback)
: AsyncProgressQueueWorker(callback, "generic") {
}

template<class T>
inline AsyncProgressQueueWorker<T>::AsyncProgressQueueWorker(const Function& callback,
const char* resource_name)
: AsyncProgressQueueWorker(callback, resource_name, Object::New(callback.Env())) {
}

template<class T>
inline AsyncProgressQueueWorker<T>::AsyncProgressQueueWorker(const Function& callback,
const char* resource_name,
const Object& resource)
: AsyncProgressQueueWorker(Object::New(callback.Env()),
callback,
resource_name,
resource) {
}

template<class T>
inline AsyncProgressQueueWorker<T>::AsyncProgressQueueWorker(const Object& receiver,
const Function& callback)
: AsyncProgressQueueWorker(receiver, callback, "generic") {
}

template<class T>
inline AsyncProgressQueueWorker<T>::AsyncProgressQueueWorker(const Object& receiver,
const Function& callback,
const char* resource_name)
: AsyncProgressQueueWorker(receiver,
callback,
resource_name,
Object::New(callback.Env())) {
}

template<class T>
inline AsyncProgressQueueWorker<T>::AsyncProgressQueueWorker(const Object& receiver,
const Function& callback,
const char* resource_name,
const Object& resource)
: AsyncProgressWorkerBase(receiver, callback, resource_name, resource) {
}

#if NAPI_VERSION > 4
template<class T>
inline AsyncProgressQueueWorker<T>::AsyncProgressQueueWorker(Napi::Env env)
: AsyncProgressQueueWorker(env, "generic") {
}

template<class T>
inline AsyncProgressQueueWorker<T>::AsyncProgressQueueWorker(Napi::Env env,
const char* resource_name)
: AsyncProgressQueueWorker(env, resource_name, Object::New(env)) {
}

template<class T>
inline AsyncProgressQueueWorker<T>::AsyncProgressQueueWorker(Napi::Env env,
const char* resource_name,
const Object& resource)
: AsyncProgressWorkerBase(env, resource_name, resource) {
}
#endif

template<class T>
inline AsyncProgressQueueWorker<T>::~AsyncProgressQueueWorker() {
// Abort pending tsfn call.
// Don't send progress events after we've already completed.
this->_tsfn.Abort();
{
std::lock_guard<std::mutex> lock(this->_mutex);
while (!_asyncdata.empty()) {
std::pair<T*, size_t> &datapair = _asyncdata.front();
T *data = datapair.first;

_asyncdata.pop();

delete[] data;
}
}
this->_tsfn.Release();
}

template<class T>
inline void AsyncProgressQueueWorker<T>::Execute() {
ExecutionProgress progress(this);
Execute(progress);
}

template<class T>
inline void AsyncProgressQueueWorker<T>::OnWorkProgress() {
this->_mutex.lock();
while (!this->_asyncdata.empty()) {
std::pair<T*, size_t> &datapair = this->_asyncdata.front();

T *data = datapair.first;
size_t size = datapair.second;

this->_asyncdata.pop();
this->_mutex.unlock();

this->OnProgress(data, size);
delete[] data;

this->_mutex.lock();
}
this->_mutex.unlock();
}

template<class T>
inline void AsyncProgressQueueWorker<T>::SendProgress_(const T* data, size_t count) {
T* new_data = new T[count];
std::copy(data, data + count, new_data);

{
std::lock_guard<std::mutex> lock(this->_mutex);
_asyncdata.push(std::pair<T*, size_t>(new_data, count));
}
this->_tsfn.NonBlockingCall(this, OnAsyncWorkProgress);
}

template<class T>
inline void AsyncProgressQueueWorker<T>::Signal() const {
this->_tsfn.NonBlockingCall(this, OnAsyncWorkProgress);
}

template<class T>
inline void AsyncProgressQueueWorker<T>::OnWorkComplete(napi_env env, napi_status status) {
this->OnWorkProgress();
AsyncWorker::OnWorkComplete(env, status);
}

template<class T>
inline void AsyncProgressQueueWorker<T>::ExecutionProgress::Signal() const {
_worker->Signal();
}

template<class T>
inline void AsyncProgressQueueWorker<T>::ExecutionProgress::Send(const T* data, size_t count) const {
_worker->SendProgress_(data, count);
}
#endif

////////////////////////////////////////////////////////////////////////////////
Expand Down
Loading

0 comments on commit dc63ed8

Please sign in to comment.