Skip to content

Commit

Permalink
tsfn: Implement copy constructor
Browse files Browse the repository at this point in the history
Removes the `unique_ptr` from `ThreadSafeFunction`, thereby allowing
copies.

Ref: nodejs#524
  • Loading branch information
KevinEady committed Sep 18, 2019
1 parent 6192e70 commit 1b42bfc
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 23 deletions.
35 changes: 18 additions & 17 deletions napi-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -3970,31 +3970,32 @@ inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
}

inline ThreadSafeFunction::ThreadSafeFunction()
: _tsfn(new napi_threadsafe_function(nullptr), _d) {
: _tsfn() {
}

inline ThreadSafeFunction::ThreadSafeFunction(
napi_threadsafe_function tsfn)
: _tsfn(new napi_threadsafe_function(tsfn), _d) {
: _tsfn(tsfn) {
}

inline ThreadSafeFunction::ThreadSafeFunction(ThreadSafeFunction&& other)
: _tsfn(std::move(other._tsfn)) {
other._tsfn.reset();
}

inline ThreadSafeFunction::ThreadSafeFunction(const ThreadSafeFunction& other)
: _tsfn(other._tsfn) {
}

inline ThreadSafeFunction& ThreadSafeFunction::operator =(
ThreadSafeFunction&& other) {
if (*_tsfn != nullptr) {
Error::Fatal("ThreadSafeFunction::operator =",
"You cannot assign a new TSFN because existing one is still alive.");
return *this;
}
_tsfn = std::move(other._tsfn);
other._tsfn.reset();
_tsfn = other._tsfn;
return *this;
}

inline ThreadSafeFunction::operator napi_threadsafe_function() const {
return _tsfn;
}

inline napi_status ThreadSafeFunction::BlockingCall() const {
return CallInternal(nullptr, napi_tsfn_blocking);
}
Expand Down Expand Up @@ -4034,21 +4035,21 @@ inline napi_status ThreadSafeFunction::NonBlockingCall(
}

inline napi_status ThreadSafeFunction::Acquire() const {
return napi_acquire_threadsafe_function(*_tsfn);
return napi_acquire_threadsafe_function(_tsfn);
}

inline napi_status ThreadSafeFunction::Release() {
return napi_release_threadsafe_function(*_tsfn, napi_tsfn_release);
return napi_release_threadsafe_function(_tsfn, napi_tsfn_release);
}

inline napi_status ThreadSafeFunction::Abort() {
return napi_release_threadsafe_function(*_tsfn, napi_tsfn_abort);
return napi_release_threadsafe_function(_tsfn, napi_tsfn_abort);
}

inline ThreadSafeFunction::ConvertibleContext
ThreadSafeFunction::GetContext() const {
void* context;
napi_get_threadsafe_function_context(*_tsfn, &context);
napi_get_threadsafe_function_context(_tsfn, &context);
return ConvertibleContext({ context });
}

Expand All @@ -4071,10 +4072,10 @@ inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,

ThreadSafeFunction tsfn;
auto* finalizeData = new details::ThreadSafeFinalize<ContextType, Finalizer,
FinalizerDataType>({ data, finalizeCallback, tsfn._tsfn.get() });
FinalizerDataType>({ data, finalizeCallback, &tsfn._tsfn });
napi_status status = napi_create_threadsafe_function(env, callback, resource,
Value::From(env, resourceName), maxQueueSize, initialThreadCount,
finalizeData, wrapper, context, CallJS, tsfn._tsfn.get());
finalizeData, wrapper, context, CallJS, &tsfn._tsfn);
if (status != napi_ok) {
delete finalizeData;
NAPI_THROW_IF_FAILED(env, status, ThreadSafeFunction());
Expand All @@ -4087,7 +4088,7 @@ inline napi_status ThreadSafeFunction::CallInternal(
CallbackWrapper* callbackWrapper,
napi_threadsafe_function_call_mode mode) const {
napi_status status = napi_call_threadsafe_function(
*_tsfn, callbackWrapper, mode);
_tsfn, callbackWrapper, mode);
if (status != napi_ok && callbackWrapper != nullptr) {
delete callbackWrapper;
}
Expand Down
10 changes: 4 additions & 6 deletions napi.h
Original file line number Diff line number Diff line change
Expand Up @@ -1987,8 +1987,11 @@ namespace Napi {
ThreadSafeFunction(napi_threadsafe_function tsFunctionValue);

ThreadSafeFunction(ThreadSafeFunction&& other);
ThreadSafeFunction(const ThreadSafeFunction& other);
ThreadSafeFunction& operator=(ThreadSafeFunction&& other);

operator napi_threadsafe_function() const;

// This API may be called from any thread.
napi_status BlockingCall() const;

Expand Down Expand Up @@ -2053,13 +2056,8 @@ namespace Napi {
napi_value jsCallback,
void* context,
void* data);
struct Deleter {
// napi_threadsafe_function is managed by Node.js, leave it alone.
void operator()(napi_threadsafe_function*) const {};
};

std::unique_ptr<napi_threadsafe_function, Deleter> _tsfn;
Deleter _d;
napi_threadsafe_function _tsfn;
};
#endif

Expand Down
2 changes: 2 additions & 0 deletions test/binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Object InitObjectDeprecated(Env env);
Object InitPromise(Env env);
#if (NAPI_VERSION > 3)
Object InitThreadSafeFunctionPtr(Env env);
Object InitThreadSafeFunctionSum(Env env);
Object InitThreadSafeFunction(Env env);
#endif
Object InitTypedArray(Env env);
Expand Down Expand Up @@ -83,6 +84,7 @@ Object Init(Env env, Object exports) {
exports.Set("promise", InitPromise(env));
#if (NAPI_VERSION > 3)
exports.Set("threadsafe_function_ptr", InitThreadSafeFunctionPtr(env));
exports.Set("threadsafe_function_sum", InitThreadSafeFunctionSum(env));
exports.Set("threadsafe_function", InitThreadSafeFunction(env));
#endif
exports.Set("typedarray", InitTypedArray(env));
Expand Down
1 change: 1 addition & 0 deletions test/binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
'object/set_property.cc',
'promise.cc',
'threadsafe_function/threadsafe_function_ptr.cc',
'threadsafe_function/threadsafe_function_sum.cc',
'threadsafe_function/threadsafe_function.cc',
'typedarray.cc',
'objectwrap.cc',
Expand Down
1 change: 1 addition & 0 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ let testModules = [
'object/set_property',
'promise',
'threadsafe_function/threadsafe_function_ptr',
'threadsafe_function/threadsafe_function_sum',
'threadsafe_function/threadsafe_function',
'typedarray',
'typedarray-bigint',
Expand Down
152 changes: 152 additions & 0 deletions test/threadsafe_function/threadsafe_function_sum.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
#include "napi.h"
#include <thread>
#include <future>

#if (NAPI_VERSION > 3)

using namespace Napi;
using namespace std;

namespace {

struct TestData {
// Native Promise returned to JavaScript
Promise::Deferred deferred;

// List of threads created for test. This list only ever accessed via main
// thread.
vector<thread> threads = {};

ThreadSafeFunction tsfn = ThreadSafeFunction();
};

// The no-context with finalizer + finalizer data overload is
// ambiguous. Workaround: use a lambda...

#define FinalizerCallback [](Napi::Env env, TestData* finalizeData){ \
for (size_t i = 0; i < finalizeData->threads.size(); ++i) { \
finalizeData->threads[i].join(); \
} \
finalizeData->deferred.Resolve(Boolean::New(env,true)); \
delete finalizeData; \
}

/**
* See threadsafe_function_sum.js for descriptions of the tests in this file
*/

void entryWithTSFN(ThreadSafeFunction tsfn, int threadId) {
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1));
tsfn.BlockingCall( [=](Napi::Env env, Function callback) {
callback.Call( { Number::New(env, static_cast<double>(threadId))});
});
tsfn.Release();
}

static Value TestWithTSFN(const CallbackInfo& info) {
int threadCount = info[0].As<Number>().Int32Value();
Function cb = info[1].As<Function>();

// We pass the test data to the Finalizer for cleanup. The finalizer is
// responsible for deleting this data as well.
TestData *testData = new TestData({
Promise::Deferred::New(info.Env())
});

ThreadSafeFunction tsfn = ThreadSafeFunction::New(info.Env(), cb, "Test", 0, threadCount, FinalizerCallback, testData);

for (int i = 0; i < threadCount; ++i) {
testData->threads.push_back( thread(entryWithTSFN, tsfn, i) );
}

return testData->deferred.Promise();
}


void entryDelayedTSFN(std::future<ThreadSafeFunction> tsfnFuture, int threadId) {
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1));
ThreadSafeFunction tsfn = tsfnFuture.get();
tsfn.BlockingCall( [=](Napi::Env env, Function callback) {
callback.Call( { Number::New(env, static_cast<double>(threadId))});
});
tsfn.Release();
}

static Value TestDelayedTSFN(const CallbackInfo& info) {
int threadCount = info[0].As<Number>().Int32Value();
Function cb = info[1].As<Function>();

TestData *testData = new TestData({
Promise::Deferred::New(info.Env())
});

vector< std::promise<ThreadSafeFunction> > tsfnPromises;

for (int i = 0; i < threadCount; ++i) {
tsfnPromises.emplace_back();
testData->threads.push_back( thread(entryDelayedTSFN, tsfnPromises[i].get_future(), i) );
}

testData->tsfn = ThreadSafeFunction::New(info.Env(), cb, "Test", 0, threadCount, FinalizerCallback, testData);

for (int i = 0; i < threadCount; ++i) {
tsfnPromises[i].set_value(testData->tsfn);
}

return testData->deferred.Promise();
}

void entryAcquire(ThreadSafeFunction tsfn, int threadId) {
tsfn.Acquire();
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1));
tsfn.BlockingCall( [=](Napi::Env env, Function callback) {
callback.Call( { Number::New(env, static_cast<double>(threadId))});
});
tsfn.Release();
}

static Value CreateThread(const CallbackInfo& info) {
TestData* testData = static_cast<TestData*>(info.Data());
ThreadSafeFunction tsfn = testData->tsfn;
int threadId = testData->threads.size();
testData->threads.push_back( thread(entryAcquire, tsfn, threadId) );
return Number::New(info.Env(), threadId);
}

static Value StopThreads(const CallbackInfo& info) {
TestData* testData = static_cast<TestData*>(info.Data());
ThreadSafeFunction tsfn = testData->tsfn;
tsfn.Release();
return info.Env().Undefined();
}

static Value TestAcquire(const CallbackInfo& info) {
Function cb = info[0].As<Function>();
Napi::Env env = info.Env();

// We pass the test data to the Finalizer for cleanup. The finalizer is
// responsible for deleting this data as well.
TestData *testData = new TestData({
Promise::Deferred::New(env)
});

testData->tsfn = ThreadSafeFunction::New(env, cb, "Test", 0, 1, FinalizerCallback, testData);

Object result = Object::New(env);
result["createThread"] = Function::New( env, CreateThread, "createThread", testData);
result["stopThreads"] = Function::New( env, StopThreads, "stopThreads", testData);
result["promise"] = testData->deferred.Promise();

return result;
}
}

Object InitThreadSafeFunctionSum(Env env) {
Object exports = Object::New(env);
exports["testDelayedTSFN"] = Function::New(env, TestDelayedTSFN);
exports["testWithTSFN"] = Function::New(env, TestWithTSFN);
exports["testAcquire"] = Function::New(env, TestAcquire);
return exports;
}

#endif
65 changes: 65 additions & 0 deletions test/threadsafe_function/threadsafe_function_sum.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
'use strict';
const assert = require('assert');
const buildType = process.config.target_defaults.default_configuration;

/**
*
* ThreadSafeFunction Tests: Thread Id Sums
*
* Every native C++ function that utilizes the TSFN will call the registered
* callback with the thread id. Passing Array.prototype.push with a bound array
* will push the thread id to the array. Therefore, starting `N` threads, we
* will expect the sum of all elements in the array to be `(N-1) * (N) / 2` (as
* thread IDs are 0-based)
*
* We check different methods of passing a ThreadSafeFunction around multiple
* threads:
* - `testWithTSFN`: The main thread creates the TSFN. Then, it creates
* threads, passing the TSFN at thread construction. The number of threads is
* static (known at TSFN creation).
* - `testDelayedTSFN`: The main thread creates threads, passing a promise to a
* TSFN at construction. Then, it creates the TSFN, and resolves each
* threads' promise. The number of threads is static.
* - `testAcquire`: The native binding returns a function to start a new. A
* call to this function will return `false` once `N` calls have been made.
* Each thread will acquire its own use of the TSFN, call it, and then
* release.
*/

const THREAD_COUNT = 5;
const EXPECTED_SUM = (THREAD_COUNT - 1) * (THREAD_COUNT) / 2;

module.exports = Promise.all([
test(require(`../build/${buildType}/binding.node`)),
test(require(`../build/${buildType}/binding_noexcept.node`))
]);

/** @param {number[]} N */
const sum = (N) => N.reduce((sum, n) => sum + n, 0);

function test(binding) {
async function check(bindingFunction) {
const calls = [];
const result = await bindingFunction(THREAD_COUNT, Array.prototype.push.bind(calls));
assert.ok(result);
assert.equal(sum(calls), EXPECTED_SUM);
}

async function checkAcquire() {
const calls = [];
const { promise, createThread, stopThreads } = binding.threadsafe_function_sum.testAcquire(Array.prototype.push.bind(calls));
for (let i = 0; i < THREAD_COUNT; i++) {
createThread();
}
stopThreads();
const result = await promise;
assert.ok(result);
assert.equal(sum(calls), EXPECTED_SUM);
}

return Promise.all([
check(binding.threadsafe_function_sum.testDelayedTSFN),
check(binding.threadsafe_function_sum.testWithTSFN),
checkAcquire()
]);
}

0 comments on commit 1b42bfc

Please sign in to comment.