From 1b42bfc62217edb20b093aadfd46a712e1d4a026 Mon Sep 17 00:00:00 2001 From: Kevin Eady <8634912+KevinEady@users.noreply.github.com> Date: Wed, 18 Sep 2019 23:27:23 +0200 Subject: [PATCH 01/10] tsfn: Implement copy constructor Removes the `unique_ptr` from `ThreadSafeFunction`, thereby allowing copies. Ref: https://github.com/nodejs/node-addon-api/issues/524 --- napi-inl.h | 35 ++-- napi.h | 10 +- test/binding.cc | 2 + test/binding.gyp | 1 + test/index.js | 1 + .../threadsafe_function_sum.cc | 152 ++++++++++++++++++ .../threadsafe_function_sum.js | 65 ++++++++ 7 files changed, 243 insertions(+), 23 deletions(-) create mode 100644 test/threadsafe_function/threadsafe_function_sum.cc create mode 100644 test/threadsafe_function/threadsafe_function_sum.js diff --git a/napi-inl.h b/napi-inl.h index 64315aea6..f993fe30e 100644 --- a/napi-inl.h +++ b/napi-inl.h @@ -3970,31 +3970,32 @@ inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env, } inline ThreadSafeFunction::ThreadSafeFunction() - : _tsfn(new napi_threadsafe_function(nullptr), _d) { + : _tsfn() { } inline ThreadSafeFunction::ThreadSafeFunction( napi_threadsafe_function tsfn) - : _tsfn(new napi_threadsafe_function(tsfn), _d) { + : _tsfn(tsfn) { } inline ThreadSafeFunction::ThreadSafeFunction(ThreadSafeFunction&& other) : _tsfn(std::move(other._tsfn)) { - other._tsfn.reset(); +} + +inline ThreadSafeFunction::ThreadSafeFunction(const ThreadSafeFunction& other) + : _tsfn(other._tsfn) { } inline ThreadSafeFunction& ThreadSafeFunction::operator =( ThreadSafeFunction&& other) { - if (*_tsfn != nullptr) { - Error::Fatal("ThreadSafeFunction::operator =", - "You cannot assign a new TSFN because existing one is still alive."); - return *this; - } - _tsfn = std::move(other._tsfn); - other._tsfn.reset(); + _tsfn = other._tsfn; return *this; } +inline ThreadSafeFunction::operator napi_threadsafe_function() const { + return _tsfn; +} + inline napi_status ThreadSafeFunction::BlockingCall() const { return CallInternal(nullptr, napi_tsfn_blocking); } @@ -4034,21 +4035,21 @@ inline napi_status ThreadSafeFunction::NonBlockingCall( } inline napi_status ThreadSafeFunction::Acquire() const { - return napi_acquire_threadsafe_function(*_tsfn); + return napi_acquire_threadsafe_function(_tsfn); } inline napi_status ThreadSafeFunction::Release() { - return napi_release_threadsafe_function(*_tsfn, napi_tsfn_release); + return napi_release_threadsafe_function(_tsfn, napi_tsfn_release); } inline napi_status ThreadSafeFunction::Abort() { - return napi_release_threadsafe_function(*_tsfn, napi_tsfn_abort); + return napi_release_threadsafe_function(_tsfn, napi_tsfn_abort); } inline ThreadSafeFunction::ConvertibleContext ThreadSafeFunction::GetContext() const { void* context; - napi_get_threadsafe_function_context(*_tsfn, &context); + napi_get_threadsafe_function_context(_tsfn, &context); return ConvertibleContext({ context }); } @@ -4071,10 +4072,10 @@ inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env, ThreadSafeFunction tsfn; auto* finalizeData = new details::ThreadSafeFinalize({ data, finalizeCallback, tsfn._tsfn.get() }); + FinalizerDataType>({ data, finalizeCallback, &tsfn._tsfn }); napi_status status = napi_create_threadsafe_function(env, callback, resource, Value::From(env, resourceName), maxQueueSize, initialThreadCount, - finalizeData, wrapper, context, CallJS, tsfn._tsfn.get()); + finalizeData, wrapper, context, CallJS, &tsfn._tsfn); if (status != napi_ok) { delete finalizeData; NAPI_THROW_IF_FAILED(env, status, ThreadSafeFunction()); @@ -4087,7 +4088,7 @@ inline napi_status ThreadSafeFunction::CallInternal( CallbackWrapper* callbackWrapper, napi_threadsafe_function_call_mode mode) const { napi_status status = napi_call_threadsafe_function( - *_tsfn, callbackWrapper, mode); + _tsfn, callbackWrapper, mode); if (status != napi_ok && callbackWrapper != nullptr) { delete callbackWrapper; } diff --git a/napi.h b/napi.h index 921aaa8bc..7da60ea1b 100644 --- a/napi.h +++ b/napi.h @@ -1987,8 +1987,11 @@ namespace Napi { ThreadSafeFunction(napi_threadsafe_function tsFunctionValue); ThreadSafeFunction(ThreadSafeFunction&& other); + ThreadSafeFunction(const ThreadSafeFunction& other); ThreadSafeFunction& operator=(ThreadSafeFunction&& other); + operator napi_threadsafe_function() const; + // This API may be called from any thread. napi_status BlockingCall() const; @@ -2053,13 +2056,8 @@ namespace Napi { napi_value jsCallback, void* context, void* data); - struct Deleter { - // napi_threadsafe_function is managed by Node.js, leave it alone. - void operator()(napi_threadsafe_function*) const {}; - }; - std::unique_ptr _tsfn; - Deleter _d; + napi_threadsafe_function _tsfn; }; #endif diff --git a/test/binding.cc b/test/binding.cc index dca00771a..51a1aa062 100644 --- a/test/binding.cc +++ b/test/binding.cc @@ -38,6 +38,7 @@ Object InitObjectDeprecated(Env env); Object InitPromise(Env env); #if (NAPI_VERSION > 3) Object InitThreadSafeFunctionPtr(Env env); +Object InitThreadSafeFunctionSum(Env env); Object InitThreadSafeFunction(Env env); #endif Object InitTypedArray(Env env); @@ -83,6 +84,7 @@ Object Init(Env env, Object exports) { exports.Set("promise", InitPromise(env)); #if (NAPI_VERSION > 3) exports.Set("threadsafe_function_ptr", InitThreadSafeFunctionPtr(env)); + exports.Set("threadsafe_function_sum", InitThreadSafeFunctionSum(env)); exports.Set("threadsafe_function", InitThreadSafeFunction(env)); #endif exports.Set("typedarray", InitTypedArray(env)); diff --git a/test/binding.gyp b/test/binding.gyp index f8c1cb0a8..c5768d0b8 100644 --- a/test/binding.gyp +++ b/test/binding.gyp @@ -34,6 +34,7 @@ 'object/set_property.cc', 'promise.cc', 'threadsafe_function/threadsafe_function_ptr.cc', + 'threadsafe_function/threadsafe_function_sum.cc', 'threadsafe_function/threadsafe_function.cc', 'typedarray.cc', 'objectwrap.cc', diff --git a/test/index.js b/test/index.js index d03c2e5b7..acd08c3d4 100644 --- a/test/index.js +++ b/test/index.js @@ -38,6 +38,7 @@ let testModules = [ 'object/set_property', 'promise', 'threadsafe_function/threadsafe_function_ptr', + 'threadsafe_function/threadsafe_function_sum', 'threadsafe_function/threadsafe_function', 'typedarray', 'typedarray-bigint', diff --git a/test/threadsafe_function/threadsafe_function_sum.cc b/test/threadsafe_function/threadsafe_function_sum.cc new file mode 100644 index 000000000..f49f44bb2 --- /dev/null +++ b/test/threadsafe_function/threadsafe_function_sum.cc @@ -0,0 +1,152 @@ +#include "napi.h" +#include +#include + +#if (NAPI_VERSION > 3) + +using namespace Napi; +using namespace std; + +namespace { + +struct TestData { + // Native Promise returned to JavaScript + Promise::Deferred deferred; + + // List of threads created for test. This list only ever accessed via main + // thread. + vector threads = {}; + + ThreadSafeFunction tsfn = ThreadSafeFunction(); +}; + +// The no-context with finalizer + finalizer data overload is +// ambiguous. Workaround: use a lambda... + +#define FinalizerCallback [](Napi::Env env, TestData* finalizeData){ \ + for (size_t i = 0; i < finalizeData->threads.size(); ++i) { \ + finalizeData->threads[i].join(); \ + } \ + finalizeData->deferred.Resolve(Boolean::New(env,true)); \ + delete finalizeData; \ +} + +/** + * See threadsafe_function_sum.js for descriptions of the tests in this file + */ + +void entryWithTSFN(ThreadSafeFunction tsfn, int threadId) { + std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1)); + tsfn.BlockingCall( [=](Napi::Env env, Function callback) { + callback.Call( { Number::New(env, static_cast(threadId))}); + }); + tsfn.Release(); +} + +static Value TestWithTSFN(const CallbackInfo& info) { + int threadCount = info[0].As().Int32Value(); + Function cb = info[1].As(); + + // We pass the test data to the Finalizer for cleanup. The finalizer is + // responsible for deleting this data as well. + TestData *testData = new TestData({ + Promise::Deferred::New(info.Env()) + }); + + ThreadSafeFunction tsfn = ThreadSafeFunction::New(info.Env(), cb, "Test", 0, threadCount, FinalizerCallback, testData); + + for (int i = 0; i < threadCount; ++i) { + testData->threads.push_back( thread(entryWithTSFN, tsfn, i) ); + } + + return testData->deferred.Promise(); +} + + +void entryDelayedTSFN(std::future tsfnFuture, int threadId) { + std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1)); + ThreadSafeFunction tsfn = tsfnFuture.get(); + tsfn.BlockingCall( [=](Napi::Env env, Function callback) { + callback.Call( { Number::New(env, static_cast(threadId))}); + }); + tsfn.Release(); +} + +static Value TestDelayedTSFN(const CallbackInfo& info) { + int threadCount = info[0].As().Int32Value(); + Function cb = info[1].As(); + + TestData *testData = new TestData({ + Promise::Deferred::New(info.Env()) + }); + + vector< std::promise > tsfnPromises; + + for (int i = 0; i < threadCount; ++i) { + tsfnPromises.emplace_back(); + testData->threads.push_back( thread(entryDelayedTSFN, tsfnPromises[i].get_future(), i) ); + } + + testData->tsfn = ThreadSafeFunction::New(info.Env(), cb, "Test", 0, threadCount, FinalizerCallback, testData); + + for (int i = 0; i < threadCount; ++i) { + tsfnPromises[i].set_value(testData->tsfn); + } + + return testData->deferred.Promise(); +} + +void entryAcquire(ThreadSafeFunction tsfn, int threadId) { + tsfn.Acquire(); + std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1)); + tsfn.BlockingCall( [=](Napi::Env env, Function callback) { + callback.Call( { Number::New(env, static_cast(threadId))}); + }); + tsfn.Release(); +} + +static Value CreateThread(const CallbackInfo& info) { + TestData* testData = static_cast(info.Data()); + ThreadSafeFunction tsfn = testData->tsfn; + int threadId = testData->threads.size(); + testData->threads.push_back( thread(entryAcquire, tsfn, threadId) ); + return Number::New(info.Env(), threadId); +} + +static Value StopThreads(const CallbackInfo& info) { + TestData* testData = static_cast(info.Data()); + ThreadSafeFunction tsfn = testData->tsfn; + tsfn.Release(); + return info.Env().Undefined(); +} + +static Value TestAcquire(const CallbackInfo& info) { + Function cb = info[0].As(); + Napi::Env env = info.Env(); + + // We pass the test data to the Finalizer for cleanup. The finalizer is + // responsible for deleting this data as well. + TestData *testData = new TestData({ + Promise::Deferred::New(env) + }); + + testData->tsfn = ThreadSafeFunction::New(env, cb, "Test", 0, 1, FinalizerCallback, testData); + + Object result = Object::New(env); + result["createThread"] = Function::New( env, CreateThread, "createThread", testData); + result["stopThreads"] = Function::New( env, StopThreads, "stopThreads", testData); + result["promise"] = testData->deferred.Promise(); + + return result; +} +} + +Object InitThreadSafeFunctionSum(Env env) { + Object exports = Object::New(env); + exports["testDelayedTSFN"] = Function::New(env, TestDelayedTSFN); + exports["testWithTSFN"] = Function::New(env, TestWithTSFN); + exports["testAcquire"] = Function::New(env, TestAcquire); + return exports; +} + +#endif diff --git a/test/threadsafe_function/threadsafe_function_sum.js b/test/threadsafe_function/threadsafe_function_sum.js new file mode 100644 index 000000000..4323dabeb --- /dev/null +++ b/test/threadsafe_function/threadsafe_function_sum.js @@ -0,0 +1,65 @@ +'use strict'; +const assert = require('assert'); +const buildType = process.config.target_defaults.default_configuration; + +/** + * + * ThreadSafeFunction Tests: Thread Id Sums + * + * Every native C++ function that utilizes the TSFN will call the registered + * callback with the thread id. Passing Array.prototype.push with a bound array + * will push the thread id to the array. Therefore, starting `N` threads, we + * will expect the sum of all elements in the array to be `(N-1) * (N) / 2` (as + * thread IDs are 0-based) + * + * We check different methods of passing a ThreadSafeFunction around multiple + * threads: + * - `testWithTSFN`: The main thread creates the TSFN. Then, it creates + * threads, passing the TSFN at thread construction. The number of threads is + * static (known at TSFN creation). + * - `testDelayedTSFN`: The main thread creates threads, passing a promise to a + * TSFN at construction. Then, it creates the TSFN, and resolves each + * threads' promise. The number of threads is static. + * - `testAcquire`: The native binding returns a function to start a new. A + * call to this function will return `false` once `N` calls have been made. + * Each thread will acquire its own use of the TSFN, call it, and then + * release. + */ + +const THREAD_COUNT = 5; +const EXPECTED_SUM = (THREAD_COUNT - 1) * (THREAD_COUNT) / 2; + +module.exports = Promise.all([ + test(require(`../build/${buildType}/binding.node`)), + test(require(`../build/${buildType}/binding_noexcept.node`)) +]); + +/** @param {number[]} N */ +const sum = (N) => N.reduce((sum, n) => sum + n, 0); + +function test(binding) { + async function check(bindingFunction) { + const calls = []; + const result = await bindingFunction(THREAD_COUNT, Array.prototype.push.bind(calls)); + assert.ok(result); + assert.equal(sum(calls), EXPECTED_SUM); + } + + async function checkAcquire() { + const calls = []; + const { promise, createThread, stopThreads } = binding.threadsafe_function_sum.testAcquire(Array.prototype.push.bind(calls)); + for (let i = 0; i < THREAD_COUNT; i++) { + createThread(); + } + stopThreads(); + const result = await promise; + assert.ok(result); + assert.equal(sum(calls), EXPECTED_SUM); + } + + return Promise.all([ + check(binding.threadsafe_function_sum.testDelayedTSFN), + check(binding.threadsafe_function_sum.testWithTSFN), + checkAcquire() + ]); +} From e10e683a078a53be6984d2602e6d3be5da0d5d69 Mon Sep 17 00:00:00 2001 From: Kevin Eady <8634912+KevinEady@users.noreply.github.com> Date: Wed, 2 Oct 2019 21:04:19 +0200 Subject: [PATCH 02/10] change FinalizerCallback to static method --- .../threadsafe_function_sum.cc | 45 ++++++++++--------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/test/threadsafe_function/threadsafe_function_sum.cc b/test/threadsafe_function/threadsafe_function_sum.cc index f49f44bb2..371cfe73c 100644 --- a/test/threadsafe_function/threadsafe_function_sum.cc +++ b/test/threadsafe_function/threadsafe_function_sum.cc @@ -20,40 +20,39 @@ struct TestData { ThreadSafeFunction tsfn = ThreadSafeFunction(); }; -// The no-context with finalizer + finalizer data overload is -// ambiguous. Workaround: use a lambda... - -#define FinalizerCallback [](Napi::Env env, TestData* finalizeData){ \ - for (size_t i = 0; i < finalizeData->threads.size(); ++i) { \ - finalizeData->threads[i].join(); \ - } \ - finalizeData->deferred.Resolve(Boolean::New(env,true)); \ - delete finalizeData; \ +void FinalizerCallback(Napi::Env env, TestData* finalizeData){ + for (size_t i = 0; i < finalizeData->threads.size(); ++i) { + finalizeData->threads[i].join(); + } + finalizeData->deferred.Resolve(Boolean::New(env,true)); + delete finalizeData; } -/** +/** * See threadsafe_function_sum.js for descriptions of the tests in this file */ void entryWithTSFN(ThreadSafeFunction tsfn, int threadId) { - std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1)); + std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1)); tsfn.BlockingCall( [=](Napi::Env env, Function callback) { callback.Call( { Number::New(env, static_cast(threadId))}); }); tsfn.Release(); } -static Value TestWithTSFN(const CallbackInfo& info) { +static Value TestWithTSFN(const CallbackInfo& info) { int threadCount = info[0].As().Int32Value(); Function cb = info[1].As(); - // We pass the test data to the Finalizer for cleanup. The finalizer is + // We pass the test data to the Finalizer for cleanup. The finalizer is // responsible for deleting this data as well. TestData *testData = new TestData({ Promise::Deferred::New(info.Env()) }); - ThreadSafeFunction tsfn = ThreadSafeFunction::New(info.Env(), cb, "Test", 0, threadCount, FinalizerCallback, testData); + ThreadSafeFunction tsfn = ThreadSafeFunction::New( + info.Env(), cb, "Test", 0, threadCount, + std::function(FinalizerCallback), testData); for (int i = 0; i < threadCount; ++i) { testData->threads.push_back( thread(entryWithTSFN, tsfn, i) ); @@ -64,7 +63,7 @@ static Value TestWithTSFN(const CallbackInfo& info) { void entryDelayedTSFN(std::future tsfnFuture, int threadId) { - std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1)); + std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1)); ThreadSafeFunction tsfn = tsfnFuture.get(); tsfn.BlockingCall( [=](Napi::Env env, Function callback) { callback.Call( { Number::New(env, static_cast(threadId))}); @@ -72,7 +71,7 @@ void entryDelayedTSFN(std::future tsfnFuture, int threadId) tsfn.Release(); } -static Value TestDelayedTSFN(const CallbackInfo& info) { +static Value TestDelayedTSFN(const CallbackInfo& info) { int threadCount = info[0].As().Int32Value(); Function cb = info[1].As(); @@ -87,7 +86,9 @@ static Value TestDelayedTSFN(const CallbackInfo& info) { testData->threads.push_back( thread(entryDelayedTSFN, tsfnPromises[i].get_future(), i) ); } - testData->tsfn = ThreadSafeFunction::New(info.Env(), cb, "Test", 0, threadCount, FinalizerCallback, testData); + testData->tsfn = ThreadSafeFunction::New( + info.Env(), cb, "Test", 0, threadCount, + std::function(FinalizerCallback), testData); for (int i = 0; i < threadCount; ++i) { tsfnPromises[i].set_value(testData->tsfn); @@ -98,7 +99,7 @@ static Value TestDelayedTSFN(const CallbackInfo& info) { void entryAcquire(ThreadSafeFunction tsfn, int threadId) { tsfn.Acquire(); - std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1)); + std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1)); tsfn.BlockingCall( [=](Napi::Env env, Function callback) { callback.Call( { Number::New(env, static_cast(threadId))}); }); @@ -120,17 +121,19 @@ static Value StopThreads(const CallbackInfo& info) { return info.Env().Undefined(); } -static Value TestAcquire(const CallbackInfo& info) { +static Value TestAcquire(const CallbackInfo& info) { Function cb = info[0].As(); Napi::Env env = info.Env(); - // We pass the test data to the Finalizer for cleanup. The finalizer is + // We pass the test data to the Finalizer for cleanup. The finalizer is // responsible for deleting this data as well. TestData *testData = new TestData({ Promise::Deferred::New(env) }); - testData->tsfn = ThreadSafeFunction::New(env, cb, "Test", 0, 1, FinalizerCallback, testData); + testData->tsfn = ThreadSafeFunction::New( + env, cb, "Test", 0, 1, + std::function(FinalizerCallback), testData); Object result = Object::New(env); result["createThread"] = Function::New( env, CreateThread, "createThread", testData); From dfa9e5f52d15806d3141ef3ffdede58fd8dd90e6 Mon Sep 17 00:00:00 2001 From: Kevin Eady <8634912+KevinEady@users.noreply.github.com> Date: Wed, 9 Oct 2019 17:50:43 +0200 Subject: [PATCH 03/10] tsfn: add copy ctor comments to test --- test/threadsafe_function/threadsafe_function_sum.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/threadsafe_function/threadsafe_function_sum.cc b/test/threadsafe_function/threadsafe_function_sum.cc index 371cfe73c..81de851f3 100644 --- a/test/threadsafe_function/threadsafe_function_sum.cc +++ b/test/threadsafe_function/threadsafe_function_sum.cc @@ -55,6 +55,7 @@ static Value TestWithTSFN(const CallbackInfo& info) { std::function(FinalizerCallback), testData); for (int i = 0; i < threadCount; ++i) { + // A copy of the ThreadSafeFunction will go to the thread entry point testData->threads.push_back( thread(entryWithTSFN, tsfn, i) ); } @@ -110,6 +111,7 @@ static Value CreateThread(const CallbackInfo& info) { TestData* testData = static_cast(info.Data()); ThreadSafeFunction tsfn = testData->tsfn; int threadId = testData->threads.size(); + // A copy of the ThreadSafeFunction will go to the thread entry point testData->threads.push_back( thread(entryAcquire, tsfn, threadId) ); return Number::New(info.Env(), threadId); } From 815c1fea31162bbb3324c1558ae1793d6a492ae8 Mon Sep 17 00:00:00 2001 From: Kevin Eady <8634912+KevinEady@users.noreply.github.com> Date: Thu, 24 Oct 2019 20:20:48 +0200 Subject: [PATCH 04/10] tsfn: remove unnecessary constructors, operators --- napi-inl.h | 14 -------------- napi.h | 4 ---- 2 files changed, 18 deletions(-) diff --git a/napi-inl.h b/napi-inl.h index 5ef809323..0e417c370 100644 --- a/napi-inl.h +++ b/napi-inl.h @@ -4026,20 +4026,6 @@ inline ThreadSafeFunction::ThreadSafeFunction( : _tsfn(tsfn) { } -inline ThreadSafeFunction::ThreadSafeFunction(ThreadSafeFunction&& other) - : _tsfn(std::move(other._tsfn)) { -} - -inline ThreadSafeFunction::ThreadSafeFunction(const ThreadSafeFunction& other) - : _tsfn(other._tsfn) { -} - -inline ThreadSafeFunction& ThreadSafeFunction::operator =( - ThreadSafeFunction&& other) { - _tsfn = other._tsfn; - return *this; -} - inline ThreadSafeFunction::operator napi_threadsafe_function() const { return _tsfn; } diff --git a/napi.h b/napi.h index e216bf797..2ce60eecc 100644 --- a/napi.h +++ b/napi.h @@ -2008,10 +2008,6 @@ namespace Napi { ThreadSafeFunction(); ThreadSafeFunction(napi_threadsafe_function tsFunctionValue); - ThreadSafeFunction(ThreadSafeFunction&& other); - ThreadSafeFunction(const ThreadSafeFunction& other); - ThreadSafeFunction& operator=(ThreadSafeFunction&& other); - operator napi_threadsafe_function() const; // This API may be called from any thread. From 3219515ace4b37c3cb1a313e93f397ccac81f70b Mon Sep 17 00:00:00 2001 From: Kevin Eady <8634912+KevinEady@users.noreply.github.com> Date: Thu, 24 Oct 2019 20:23:10 +0200 Subject: [PATCH 05/10] test: change tsfn test data construction --- .../threadsafe_function_sum.cc | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/test/threadsafe_function/threadsafe_function_sum.cc b/test/threadsafe_function/threadsafe_function_sum.cc index 81de851f3..94dbf9a98 100644 --- a/test/threadsafe_function/threadsafe_function_sum.cc +++ b/test/threadsafe_function/threadsafe_function_sum.cc @@ -10,6 +10,9 @@ using namespace std; namespace { struct TestData { + + TestData(Promise::Deferred&& deferred) : deferred(std::move(deferred)) {}; + // Native Promise returned to JavaScript Promise::Deferred deferred; @@ -46,9 +49,7 @@ static Value TestWithTSFN(const CallbackInfo& info) { // We pass the test data to the Finalizer for cleanup. The finalizer is // responsible for deleting this data as well. - TestData *testData = new TestData({ - Promise::Deferred::New(info.Env()) - }); + TestData *testData = new TestData(Promise::Deferred::New(info.Env())); ThreadSafeFunction tsfn = ThreadSafeFunction::New( info.Env(), cb, "Test", 0, threadCount, @@ -76,9 +77,7 @@ static Value TestDelayedTSFN(const CallbackInfo& info) { int threadCount = info[0].As().Int32Value(); Function cb = info[1].As(); - TestData *testData = new TestData({ - Promise::Deferred::New(info.Env()) - }); + TestData *testData = new TestData(Promise::Deferred::New(info.Env())); vector< std::promise > tsfnPromises; @@ -129,9 +128,7 @@ static Value TestAcquire(const CallbackInfo& info) { // We pass the test data to the Finalizer for cleanup. The finalizer is // responsible for deleting this data as well. - TestData *testData = new TestData({ - Promise::Deferred::New(env) - }); + TestData *testData = new TestData(Promise::Deferred::New(info.Env())); testData->tsfn = ThreadSafeFunction::New( env, cb, "Test", 0, 1, From 0e9d4aeb6e35835466b00de5a89008e5f2df2101 Mon Sep 17 00:00:00 2001 From: Kevin Eady <8634912+KevinEady@users.noreply.github.com> Date: Thu, 24 Oct 2019 21:04:18 +0200 Subject: [PATCH 06/10] test: fix tsfn tests by napi version --- test/index.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/index.js b/test/index.js index 4c546f75b..0b1e9e11d 100644 --- a/test/index.js +++ b/test/index.js @@ -70,6 +70,8 @@ if ((process.env.npm_config_NAPI_VERSION !== undefined) && if ((process.env.npm_config_NAPI_VERSION !== undefined) && (process.env.npm_config_NAPI_VERSION < 4)) { testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_ptr'), 1); + testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_sum'), 1); + testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_unref'), 1); testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function'), 1); } From cba39bfd0e449c2adf48214c05b8cac65b58bfe8 Mon Sep 17 00:00:00 2001 From: Kevin Eady <8634912+KevinEady@users.noreply.github.com> Date: Fri, 25 Oct 2019 10:44:19 +0200 Subject: [PATCH 07/10] test: use explicit std qualifiers in tsfn test --- test/threadsafe_function/threadsafe_function_sum.cc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/threadsafe_function/threadsafe_function_sum.cc b/test/threadsafe_function/threadsafe_function_sum.cc index 94dbf9a98..30c8a12e3 100644 --- a/test/threadsafe_function/threadsafe_function_sum.cc +++ b/test/threadsafe_function/threadsafe_function_sum.cc @@ -1,11 +1,11 @@ #include "napi.h" #include +#include #include #if (NAPI_VERSION > 3) using namespace Napi; -using namespace std; namespace { @@ -18,7 +18,7 @@ struct TestData { // List of threads created for test. This list only ever accessed via main // thread. - vector threads = {}; + std::vector threads = {}; ThreadSafeFunction tsfn = ThreadSafeFunction(); }; @@ -57,7 +57,7 @@ static Value TestWithTSFN(const CallbackInfo& info) { for (int i = 0; i < threadCount; ++i) { // A copy of the ThreadSafeFunction will go to the thread entry point - testData->threads.push_back( thread(entryWithTSFN, tsfn, i) ); + testData->threads.push_back( std::thread(entryWithTSFN, tsfn, i) ); } return testData->deferred.Promise(); @@ -79,11 +79,11 @@ static Value TestDelayedTSFN(const CallbackInfo& info) { TestData *testData = new TestData(Promise::Deferred::New(info.Env())); - vector< std::promise > tsfnPromises; + std::vector< std::promise > tsfnPromises; for (int i = 0; i < threadCount; ++i) { tsfnPromises.emplace_back(); - testData->threads.push_back( thread(entryDelayedTSFN, tsfnPromises[i].get_future(), i) ); + testData->threads.push_back( std::thread(entryDelayedTSFN, tsfnPromises[i].get_future(), i) ); } testData->tsfn = ThreadSafeFunction::New( @@ -111,7 +111,7 @@ static Value CreateThread(const CallbackInfo& info) { ThreadSafeFunction tsfn = testData->tsfn; int threadId = testData->threads.size(); // A copy of the ThreadSafeFunction will go to the thread entry point - testData->threads.push_back( thread(entryAcquire, tsfn, threadId) ); + testData->threads.push_back( std::thread(entryAcquire, tsfn, threadId) ); return Number::New(info.Env(), threadId); } From 5b83cf2bf128ca744e0fbbba972d4d750075aee9 Mon Sep 17 00:00:00 2001 From: Kevin Eady <8634912+KevinEady@users.noreply.github.com> Date: Tue, 29 Oct 2019 15:13:42 +0100 Subject: [PATCH 08/10] tsfn: use mutex, cv for tests vs promise --- .../threadsafe_function_sum.cc | 80 ++++++++++++++----- 1 file changed, 62 insertions(+), 18 deletions(-) diff --git a/test/threadsafe_function/threadsafe_function_sum.cc b/test/threadsafe_function/threadsafe_function_sum.cc index 30c8a12e3..d42267a49 100644 --- a/test/threadsafe_function/threadsafe_function_sum.cc +++ b/test/threadsafe_function/threadsafe_function_sum.cc @@ -63,35 +63,79 @@ static Value TestWithTSFN(const CallbackInfo& info) { return testData->deferred.Promise(); } +// Task instance created for each new std::thread +class DelayedTSFNTask { +public: + // Each instance has its own tsfn + ThreadSafeFunction tsfn; + + // Thread-safety + std::mutex mtx; + std::condition_variable cv; + + // Entry point for std::thread + void entryDelayedTSFN(int threadId) { + std::unique_lock lk(mtx); + cv.wait(lk); + tsfn.BlockingCall([=](Napi::Env env, Function callback) { + callback.Call({Number::New(env, static_cast(threadId))}); + }); + tsfn.Release(); + }; +}; -void entryDelayedTSFN(std::future tsfnFuture, int threadId) { - std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1)); - ThreadSafeFunction tsfn = tsfnFuture.get(); - tsfn.BlockingCall( [=](Napi::Env env, Function callback) { - callback.Call( { Number::New(env, static_cast(threadId))}); - }); - tsfn.Release(); +struct TestDataDelayed { + + TestDataDelayed(Promise::Deferred &&deferred) + : deferred(std::move(deferred)){}; + ~TestDataDelayed() { taskInsts.clear(); }; + // Native Promise returned to JavaScript + Promise::Deferred deferred; + + // List of threads created for test. This list only ever accessed via main + // thread. + std::vector threads = {}; + + // List of DelayedTSFNThread instances + std::vector> taskInsts = {}; + + ThreadSafeFunction tsfn = ThreadSafeFunction(); +}; + +void FinalizerCallbackDelayed(Napi::Env env, TestDataDelayed *finalizeData) { + for (size_t i = 0; i < finalizeData->threads.size(); ++i) { + finalizeData->threads[i].join(); + } + finalizeData->deferred.Resolve(Boolean::New(env, true)); + delete finalizeData; } -static Value TestDelayedTSFN(const CallbackInfo& info) { +static Value TestDelayedTSFN(const CallbackInfo &info) { int threadCount = info[0].As().Int32Value(); Function cb = info[1].As(); - TestData *testData = new TestData(Promise::Deferred::New(info.Env())); + TestDataDelayed *testData = + new TestDataDelayed(Promise::Deferred::New(info.Env())); - std::vector< std::promise > tsfnPromises; + testData->tsfn = + ThreadSafeFunction::New(info.Env(), cb, "Test", 0, threadCount, + std::function( + FinalizerCallbackDelayed), + testData); for (int i = 0; i < threadCount; ++i) { - tsfnPromises.emplace_back(); - testData->threads.push_back( std::thread(entryDelayedTSFN, tsfnPromises[i].get_future(), i) ); + testData->taskInsts.push_back( + std::unique_ptr(new DelayedTSFNTask())); + testData->threads.push_back(std::thread(&DelayedTSFNTask::entryDelayedTSFN, + testData->taskInsts.back().get(), + i)); } + std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1)); - testData->tsfn = ThreadSafeFunction::New( - info.Env(), cb, "Test", 0, threadCount, - std::function(FinalizerCallback), testData); - - for (int i = 0; i < threadCount; ++i) { - tsfnPromises[i].set_value(testData->tsfn); + for (auto &task : testData->taskInsts) { + std::lock_guard lk(task->mtx); + task->tsfn = testData->tsfn; + task->cv.notify_all(); } return testData->deferred.Promise(); From 4b3d13435671b55d3c4f981446cf12bd72efa37a Mon Sep 17 00:00:00 2001 From: Kevin Eady <8634912+KevinEady@users.noreply.github.com> Date: Wed, 30 Oct 2019 15:32:43 +0100 Subject: [PATCH 09/10] test: remove unnecessary include --- test/threadsafe_function/threadsafe_function_sum.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/test/threadsafe_function/threadsafe_function_sum.cc b/test/threadsafe_function/threadsafe_function_sum.cc index d42267a49..f46a28bd7 100644 --- a/test/threadsafe_function/threadsafe_function_sum.cc +++ b/test/threadsafe_function/threadsafe_function_sum.cc @@ -1,7 +1,6 @@ #include "napi.h" #include #include -#include #if (NAPI_VERSION > 3) From bcdc46c143b81a5c612e3aa6e65bac4ada039d34 Mon Sep 17 00:00:00 2001 From: Kevin Eady <8634912+KevinEady@users.noreply.github.com> Date: Wed, 30 Oct 2019 15:36:08 +0100 Subject: [PATCH 10/10] test: add std includes --- test/threadsafe_function/threadsafe_function_sum.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/threadsafe_function/threadsafe_function_sum.cc b/test/threadsafe_function/threadsafe_function_sum.cc index f46a28bd7..bba57dd47 100644 --- a/test/threadsafe_function/threadsafe_function_sum.cc +++ b/test/threadsafe_function/threadsafe_function_sum.cc @@ -1,6 +1,8 @@ #include "napi.h" #include #include +#include +#include #if (NAPI_VERSION > 3)