diff --git a/README.md b/README.md index 8e7ee391d..c7e40bd92 100644 --- a/README.md +++ b/README.md @@ -121,7 +121,9 @@ The following is the documentation for node-addon-api. - [AsyncWorker](doc/async_worker.md) - [AsyncContext](doc/async_context.md) - [AsyncWorker Variants](doc/async_worker_variants.md) - - [Thread-safe Functions](doc/threadsafe_function.md) + - [Thread-safe Functions](doc/threadsafe.md) + - [ThreadSafeFunction](doc/threadsafe_function.md) + - [TypedThreadSafeFunction](doc/typed_threadsafe_function.md) - [Promises](doc/promises.md) - [Version management](doc/version_management.md) diff --git a/doc/threadsafe.md b/doc/threadsafe.md new file mode 100644 index 000000000..22bb399fc --- /dev/null +++ b/doc/threadsafe.md @@ -0,0 +1,124 @@ +# Thread-safe Functions + +JavaScript functions can normally only be called from a native addon's main +thread. If an addon creates additional threads, then node-addon-api functions +that require a `Napi::Env`, `Napi::Value`, or `Napi::Reference` must not be +called from those threads. + +When an addon has additional threads and JavaScript functions need to be invoked +based on the processing completed by those threads, those threads must +communicate with the addon's main thread so that the main thread can invoke the +JavaScript function on their behalf. The thread-safe function APIs provide an +easy way to do this. These APIs provide two types -- +[`Napi::ThreadSafeFunction`](threadsafe_function.md) and +[`Napi::TypedThreadSafeFunction`](typed_threadsafe_function.md) -- as well as +APIs to create, destroy, and call objects of this type. The differences between +the two are subtle and are [highlighted below](#implementation-differences). +Regardless of which type you choose, the APIs between the two are similar. + +`Napi::[Typed]ThreadSafeFunction::New()` creates a persistent reference that +holds a JavaScript function which can be called from multiple threads. The calls +happen asynchronously. This means that values with which the JavaScript callback +is to be called will be placed in a queue, and, for each value in the queue, a +call will eventually be made to the JavaScript function. + +`Napi::[Typed]ThreadSafeFunction` objects are destroyed when every thread which +uses the object has called `Release()` or has received a return status of +`napi_closing` in response to a call to `BlockingCall()` or `NonBlockingCall()`. +The queue is emptied before the `Napi::[Typed]ThreadSafeFunction` is destroyed. +It is important that `Release()` be the last API call made in conjunction with a +given `Napi::[Typed]ThreadSafeFunction`, because after the call completes, there +is no guarantee that the `Napi::[Typed]ThreadSafeFunction` is still allocated. +For the same reason it is also important that no more use be made of a +thread-safe function after receiving a return value of `napi_closing` in +response to a call to `BlockingCall()` or `NonBlockingCall()`. Data associated +with the `Napi::[Typed]ThreadSafeFunction` can be freed in its `Finalizer` +callback which was passed to `[Typed]ThreadSafeFunction::New()`. + +Once the number of threads making use of a `Napi::[Typed]ThreadSafeFunction` +reaches zero, no further threads can start making use of it by calling +`Acquire()`. In fact, all subsequent API calls associated with it, except +`Release()`, will return an error value of `napi_closing`. + +## Implementation Differences + +The choice between `Napi::ThreadSafeFunction` and +`Napi::TypedThreadSafeFunction` depends largely on how you plan to execute your +native C++ code (the "callback") on the Node.js thread. + +### [`Napi::ThreadSafeFunction`](threadsafe_function.md) + +This API is designed without N-API 5 native support for [the optional JavaScript + function callback feature](https://github.com/nodejs/node/commit/53297e66cb). + `::New` methods that do not have a `Function` parameter will construct a + _new_, no-op `Function` on the environment to pass to the underlying N-API + call. + +This API has some dynamic functionality, in that: +- The `[Non]BlockingCall()` methods provide a `Napi::Function` parameter as the + callback to run when processing the data item on the main thread -- the + `CallJs` callback. Since the callback is a parameter, it can be changed for + every call. +- Different C++ data types may be passed with each call of `[Non]BlockingCall()` + to match the specific data type as specified in the `CallJs` callback. + +Note that this functionality comes with some **additional overhead** and +situational **memory leaks**: +- The API acts as a "broker" between the underlying `napi_threadsafe_function`, + and dynamically constructs a wrapper for your callback on the heap for every + call to `[Non]BlockingCall()`. +- In acting in this "broker" fashion, the API will call the underlying "make + call" N-API method on this packaged item. If the API has determined the + thread-safe function is no longer accessible (eg. all threads have released + yet there are still items on the queue), **the callback passed to + [Non]BlockingCall will not execute**. This means it is impossible to perform + clean-up for calls that never execute their `CallJs` callback. **This may lead + to memory leaks** if you are dynamically allocating memory. +- The `CallJs` does not receive the thread-safe function's context as a + parameter. In order for the callback to access the context, it must have a + reference to either (1) the context directly, or (2) the thread-safe function + to call `GetContext()`. Furthermore, the `GetContext()` method is not + _type-safe_, as the method returns an object that can be "any-casted", instead + of having a static type. + +### [`Napi::TypedThreadSafeFunction`](typed_threadsafe_function.md) + +The `TypedThreadSafeFunction` class is a new implementation to address the +drawbacks listed above. The API is designed with N-API 5's support of an +optional function callback. The API will correctly allow developers to pass +`std::nullptr` instead of a `const Function&` for the callback function +specified in `::New`. It also provides helper APIs to _target_ N-API 4 and +construct a no-op `Function` **or** to target N-API 5 and "construct" a +`std::nullptr` callback. This allows a single codebase to use the same APIs, +with just a switch of the `NAPI_VERSION` compile-time constant. + +The removal of the dynamic call functionality has the following implications: +- The API does _not_ act as a "broker" compared to the + `Napi::ThreadSafeFunction`. Once Node.js finalizes the thread-safe function, + the `CallJs` callback will execute with an empty `Napi::Env` for any remaining + items on the queue. This provides the ability to handle any necessary cleanup + of the item's data. +- The callback _does_ receive the context as a parameter, so a call to + `GetContext()` is _not_ necessary. This context type is specified as the + **first template argument** specified to `::New`, ensuring type safety. +- The `New()` constructor accepts the `CallJs` callback as the **second type + argument**. The callback must be statically defined for the API to access it. + This affords the ability to statically pass the context as the correct type + across all methods. +- Only one C++ data type may be specified to every call to `[Non]BlockingCall()` + -- the **third template argument** specified to `::New`. Any "dynamic call + data" must be implemented by the user. + + +### Usage Suggestions + +In summary, it may be best to use `Napi::TypedThreadSafeFunction` if: + +- static, compile-time support for targeting N-API 4 or 5+ with an optional + JavaScript callback feature is desired; +- the callback can have `static` storage class and will not change across calls + to `[Non]BlockingCall()`; +- cleanup of items' data is required (eg. deleting dynamically-allocated data + that is created at the caller level). + +Otherwise, `Napi::ThreadSafeFunction` may be a better choice. diff --git a/doc/threadsafe_function.md b/doc/threadsafe_function.md index da4c2af52..cb4e7a374 100644 --- a/doc/threadsafe_function.md +++ b/doc/threadsafe_function.md @@ -1,41 +1,10 @@ # ThreadSafeFunction -JavaScript functions can normally only be called from a native addon's main -thread. If an addon creates additional threads, then node-addon-api functions -that require a `Napi::Env`, `Napi::Value`, or `Napi::Reference` must not be -called from those threads. - -When an addon has additional threads and JavaScript functions need to be invoked -based on the processing completed by those threads, those threads must -communicate with the addon's main thread so that the main thread can invoke the -JavaScript function on their behalf. The thread-safe function APIs provide an -easy way to do this. - -These APIs provide the type `Napi::ThreadSafeFunction` as well as APIs to -create, destroy, and call objects of this type. -`Napi::ThreadSafeFunction::New()` creates a persistent reference that holds a -JavaScript function which can be called from multiple threads. The calls happen -asynchronously. This means that values with which the JavaScript callback is to -be called will be placed in a queue, and, for each value in the queue, a call -will eventually be made to the JavaScript function. - -`Napi::ThreadSafeFunction` objects are destroyed when every thread which uses -the object has called `Release()` or has received a return status of -`napi_closing` in response to a call to `BlockingCall()` or `NonBlockingCall()`. -The queue is emptied before the `Napi::ThreadSafeFunction` is destroyed. It is -important that `Release()` be the last API call made in conjunction with a given -`Napi::ThreadSafeFunction`, because after the call completes, there is no -guarantee that the `Napi::ThreadSafeFunction` is still allocated. For the same -reason it is also important that no more use be made of a thread-safe function -after receiving a return value of `napi_closing` in response to a call to -`BlockingCall()` or `NonBlockingCall()`. Data associated with the -`Napi::ThreadSafeFunction` can be freed in its `Finalizer` callback which was -passed to `ThreadSafeFunction::New()`. - -Once the number of threads making use of a `Napi::ThreadSafeFunction` reaches -zero, no further threads can start making use of it by calling `Acquire()`. In -fact, all subsequent API calls associated with it, except `Release()`, will -return an error value of `napi_closing`. +The `Napi::ThreadSafeFunction` type provides APIs for threads to communicate +with the addon's main thread to invoke JavaScript functions on their behalf. +Documentation can be found for an [overview of the API](threadsafe.md), as well +as [differences between the two thread-safe function +APIs](threadsafe.md#implementation-differences). ## Methods @@ -92,7 +61,8 @@ New(napi_env env, - `maxQueueSize`: Maximum size of the queue. `0` for no limit. - `initialThreadCount`: The initial number of threads, including the main thread, which will be making use of this function. -- `[optional] context`: Data to attach to the resulting `ThreadSafeFunction`. +- `[optional] context`: Data to attach to the resulting `ThreadSafeFunction`. It + can be retreived by calling `GetContext()`. - `[optional] finalizeCallback`: Function to call when the `ThreadSafeFunction` is being destroyed. This callback will be invoked on the main thread when the thread-safe function is about to be destroyed. It receives the context and the @@ -101,8 +71,8 @@ New(napi_env env, `uv_thread_join()`. It is important that, aside from the main loop thread, there be no threads left using the thread-safe function after the finalize callback completes. Must implement `void operator()(Env env, DataType* data, - Context* hint)`, skipping `data` or `hint` if they are not provided. - Can be retrieved via `GetContext()`. + ContextType* hint)`, skipping `data` or `hint` if they are not provided. Can + be retrieved via `GetContext()`. - `[optional] data`: Data to be passed to `finalizeCallback`. Returns a non-empty `Napi::ThreadSafeFunction` instance. @@ -110,7 +80,7 @@ Returns a non-empty `Napi::ThreadSafeFunction` instance. ### Acquire Add a thread to this thread-safe function object, indicating that a new thread -will start making use of the thread-safe function. +will start making use of the thread-safe function. ```cpp napi_status Napi::ThreadSafeFunction::Acquire() @@ -118,7 +88,7 @@ napi_status Napi::ThreadSafeFunction::Acquire() Returns one of: - `napi_ok`: The thread has successfully acquired the thread-safe function -for its use. +for its use. - `napi_closing`: The thread-safe function has been marked as closing via a previous call to `Abort()`. @@ -136,7 +106,7 @@ napi_status Napi::ThreadSafeFunction::Release() Returns one of: - `napi_ok`: The thread-safe function has been successfully released. - `napi_invalid_arg`: The thread-safe function's thread-count is zero. -- `napi_generic_failure`: A generic error occurred when attemping to release +- `napi_generic_failure`: A generic error occurred when attempting to release the thread-safe function. ### Abort @@ -258,10 +228,10 @@ Value Start( const CallbackInfo& info ) // Create a native thread nativeThread = std::thread( [count] { auto callback = []( Napi::Env env, Function jsCallback, int* value ) { - // Transform native data into JS data, passing it to the provided + // Transform native data into JS data, passing it to the provided // `jsCallback` -- the TSFN's JavaScript function. jsCallback.Call( {Number::New( env, *value )} ); - + // We're finished with the data. delete value; }; diff --git a/doc/typed_threadsafe_function.md b/doc/typed_threadsafe_function.md new file mode 100644 index 000000000..e0d29807f --- /dev/null +++ b/doc/typed_threadsafe_function.md @@ -0,0 +1,307 @@ +# TypedThreadSafeFunction + +The `Napi::TypedThreadSafeFunction` type provides APIs for threads to +communicate with the addon's main thread to invoke JavaScript functions on their +behalf. The type is a three-argument templated class, each argument representing +the type of: +- `ContextType = std::nullptr_t`: The thread-safe function's context. By + default, a TSFN has no context. +- `DataType = void*`: The data to use in the native callback. By default, a TSFN + can accept any data type. +- `Callback = void(*)(Napi::Env, Napi::Function jsCallback, ContextType*, + DataType*)`: The callback to run for each item added to the queue. If no + `Callback` is given, the API will call the function `jsCallback` with no + arguments. + +Documentation can be found for an [overview of the API](threadsafe.md), as well +as [differences between the two thread-safe function +APIs](threadsafe.md#implementation-differences). + +## Methods + +### Constructor + +Creates a new empty instance of `Napi::TypedThreadSafeFunction`. + +```cpp +Napi::Function::TypedThreadSafeFunction::TypedThreadSafeFunction(); +``` + +### Constructor + +Creates a new instance of the `Napi::TypedThreadSafeFunction` object. + +```cpp +Napi::TypedThreadSafeFunction::TypedThreadSafeFunction(napi_threadsafe_function tsfn); +``` + +- `tsfn`: The `napi_threadsafe_function` which is a handle for an existing + thread-safe function. + +Returns a non-empty `Napi::TypedThreadSafeFunction` instance. To ensure the API +statically handles the correct return type for `GetContext()` and +`[Non]BlockingCall()`, pass the proper template arguments to +`Napi::TypedThreadSafeFunction`. + +### New + +Creates a new instance of the `Napi::TypedThreadSafeFunction` object. The `New` +function has several overloads for the various optional parameters: skip the +optional parameter for that specific overload. + +```cpp +New(napi_env env, + CallbackType callback, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context, + Finalizer finalizeCallback, + FinalizerDataType* data = nullptr); +``` + +- `env`: The `napi_env` environment in which to construct the + `Napi::ThreadSafeFunction` object. +- `[optional] callback`: The `Function` to call from another thread. +- `[optional] resource`: An object associated with the async work that will be + passed to possible async_hooks init hooks. +- `resourceName`: A JavaScript string to provide an identifier for the kind of + resource that is being provided for diagnostic information exposed by the + async_hooks API. +- `maxQueueSize`: Maximum size of the queue. `0` for no limit. +- `initialThreadCount`: The initial number of threads, including the main + thread, which will be making use of this function. +- `[optional] context`: Data to attach to the resulting `ThreadSafeFunction`. It + can be retreived via `GetContext()`. +- `[optional] finalizeCallback`: Function to call when the + `TypedThreadSafeFunction` is being destroyed. This callback will be invoked + on the main thread when the thread-safe function is about to be destroyed. It + receives the context and the finalize data given during construction (if + given), and provides an opportunity for cleaning up after the threads e.g. by + calling `uv_thread_join()`. It is important that, aside from the main loop + thread, there be no threads left using the thread-safe function after the + finalize callback completes. Must implement `void operator()(Env env, + FinalizerDataType* data, ContextType* hint)`. +- `[optional] data`: Data to be passed to `finalizeCallback`. + +Returns a non-empty `Napi::TypedThreadSafeFunction` instance. + +Depending on the targetted `NAPI_VERSION`, the API has different implementations +for `CallbackType callback`. + +When targetting version 4, `callback` may be: +- of type `const Function&` +- not provided as a parameter, in which case the API creates a new no-op + `Function` + +When targetting version 5+, `callback` may be: +- of type `const Function&` +- of type `std::nullptr_t` +- not provided as a parameter, in which case the API passes `std::nullptr` + +### Acquire + +Adds a thread to this thread-safe function object, indicating that a new thread +will start making use of the thread-safe function. + +```cpp +napi_status Napi::TypedThreadSafeFunction::Acquire() +``` + +Returns one of: +- `napi_ok`: The thread has successfully acquired the thread-safe function for + its use. +- `napi_closing`: The thread-safe function has been marked as closing via a + previous call to `Abort()`. + +### Release + +Indicates that an existing thread will stop making use of the thread-safe +function. A thread should call this API when it stops making use of this +thread-safe function. Using any thread-safe APIs after having called this API +has undefined results in the current thread, as the thread-safe function may +have been destroyed. + +```cpp +napi_status Napi::TypedThreadSafeFunction::Release() +``` + +Returns one of: +- `napi_ok`: The thread-safe function has been successfully released. +- `napi_invalid_arg`: The thread-safe function's thread-count is zero. +- `napi_generic_failure`: A generic error occurred when attemping to release the + thread-safe function. + +### Abort + +"Aborts" the thread-safe function. This will cause all subsequent APIs +associated with the thread-safe function except `Release()` to return +`napi_closing` even before its reference count reaches zero. In particular, +`BlockingCall` and `NonBlockingCall()` will return `napi_closing`, thus +informing the threads that it is no longer possible to make asynchronous calls +to the thread-safe function. This can be used as a criterion for terminating the +thread. Upon receiving a return value of `napi_closing` from a thread-safe +function call a thread must make no further use of the thread-safe function +because it is no longer guaranteed to be allocated. + +```cpp +napi_status Napi::TypedThreadSafeFunction::Abort() +``` + +Returns one of: +- `napi_ok`: The thread-safe function has been successfully aborted. +- `napi_invalid_arg`: The thread-safe function's thread-count is zero. +- `napi_generic_failure`: A generic error occurred when attemping to abort the + thread-safe function. + +### BlockingCall / NonBlockingCall + +Calls the Javascript function in either a blocking or non-blocking fashion. +- `BlockingCall()`: the API blocks until space becomes available in the queue. + Will never block if the thread-safe function was created with a maximum queue + size of `0`. +- `NonBlockingCall()`: will return `napi_queue_full` if the queue was full, + preventing data from being successfully added to the queue. + +```cpp +napi_status Napi::TypedThreadSafeFunction::BlockingCall(DataType* data = nullptr) const + +napi_status Napi::TypedThreadSafeFunction::NonBlockingCall(DataType* data = nullptr) const +``` + +- `[optional] data`: Data to pass to the callback which was passed to + `TypedThreadSafeFunction::New()`. + +Returns one of: +- `napi_ok`: `data` was successfully added to the queue. +- `napi_queue_full`: The queue was full when trying to call in a non-blocking + method. +- `napi_closing`: The thread-safe function is aborted and no further calls can + be made. +- `napi_invalid_arg`: The thread-safe function is closed. +- `napi_generic_failure`: A generic error occurred when attemping to add to the + queue. + + +## Example + +```cpp +#include +#include +#include + +using namespace Napi; + +using Context = Reference; +using DataType = int; +void CallJs(Napi::Env env, Function callback, Context *context, DataType *data); +using TSFN = TypedThreadSafeFunction; +using FinalizerDataType = void; + +std::thread nativeThread; +TSFN tsfn; + +Value Start(const CallbackInfo &info) { + Napi::Env env = info.Env(); + + if (info.Length() < 2) { + throw TypeError::New(env, "Expected two arguments"); + } else if (!info[0].IsFunction()) { + throw TypeError::New(env, "Expected first arg to be function"); + } else if (!info[1].IsNumber()) { + throw TypeError::New(env, "Expected second arg to be number"); + } + + int count = info[1].As().Int32Value(); + + // Create a new context set to the the receiver (ie, `this`) of the function + // call + Context *context = new Reference(Persistent(info.This())); + + // Create a ThreadSafeFunction + tsfn = TSFN::New( + env, + info[0].As(), // JavaScript function called asynchronously + "Resource Name", // Name + 0, // Unlimited queue + 1, // Only one thread will use this initially + context, + [](Napi::Env, FinalizerDataType *, + Context *ctx) { // Finalizer used to clean threads up + nativeThread.join(); + delete ctx; + }); + + // Create a native thread + nativeThread = std::thread([count] { + for (int i = 0; i < count; i++) { + // Create new data + int *value = new int(clock()); + + // Perform a blocking call + napi_status status = tsfn.BlockingCall(value); + if (status != napi_ok) { + // Handle error + break; + } + + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + // Release the thread-safe function + tsfn.Release(); + }); + + return Boolean::New(env, true); +} + +// Transform native data into JS data, passing it to the provided +// `callback` -- the TSFN's JavaScript function. +void CallJs(Napi::Env env, Function callback, Context *context, + DataType *data) { + // Is the JavaScript environment still available to call into, eg. the TSFN is + // not aborted + if (env != nullptr) { + // On N-API 5+, the `callback` parameter is optional; however, this example + // does ensure a callback is provided. + if (callback != nullptr) { + callback.Call(context->Value(), {Number::New(env, *data)}); + } + } + if (data != nullptr) { + // We're finished with the data. + delete data; + } +} + +Napi::Object Init(Napi::Env env, Object exports) { + exports.Set("start", Function::New(env, Start)); + return exports; +} + +NODE_API_MODULE(clock, Init) +``` + +The above code can be used from JavaScript as follows: + +```js +const { start } = require('bindings')('clock'); + +start.call(new Date(), function (clock) { + const context = this; + console.log(context, clock); +}, 5); +``` + +When executed, the output will show the value of `clock()` five times at one +second intervals, prefixed with the TSFN's context -- `start`'s receiver (ie, +`new Date()`): + +``` +2020-08-18T21:04:25.116Z 49824 +2020-08-18T21:04:25.116Z 62493 +2020-08-18T21:04:25.116Z 62919 +2020-08-18T21:04:25.116Z 63228 +2020-08-18T21:04:25.116Z 63531 +``` diff --git a/napi-inl.h b/napi-inl.h index ab7ece1c8..5a345df53 100644 --- a/napi-inl.h +++ b/napi-inl.h @@ -260,6 +260,45 @@ struct ThreadSafeFinalize { FinalizerDataType* data; Finalizer callback; }; + +template +typename std::enable_if::type static inline CallJsWrapper( + napi_env env, napi_value jsCallback, void* context, void* data) { + call(env, + Function(env, jsCallback), + static_cast(context), + static_cast(data)); +} + +template +typename std::enable_if::type static inline CallJsWrapper( + napi_env env, napi_value jsCallback, void* /*context*/, void* /*data*/) { + if (jsCallback != nullptr) { + Function(env, jsCallback).Call(0, nullptr); + } +} + +#if NAPI_VERSION > 4 + +template +napi_value DefaultCallbackWrapper(napi_env /*env*/, std::nullptr_t /*cb*/) { + return nullptr; +} + +template +napi_value DefaultCallbackWrapper(napi_env /*env*/, Napi::Function cb) { + return cb; +} + +#else +template +napi_value DefaultCallbackWrapper(napi_env env, Napi::Function cb) { + if (cb.IsEmpty()) { + return TSFN::EmptyFunctionFactory(env); + } + return cb; +} +#endif // NAPI_VERSION > 4 #endif // NAPI_VERSION > 3 && !defined(__wasm32__) template @@ -4353,6 +4392,490 @@ inline void AsyncWorker::OnWorkComplete(Napi::Env /*env*/, napi_status status) { } #if (NAPI_VERSION > 3 && !defined(__wasm32__)) +//////////////////////////////////////////////////////////////////////////////// +// TypedThreadSafeFunction class +//////////////////////////////////////////////////////////////////////////////// + +// Starting with NAPI 5, the JavaScript function `func` parameter of +// `napi_create_threadsafe_function` is optional. +#if NAPI_VERSION > 4 +// static, with Callback [missing] Resource [missing] Finalizer [missing] +template +template +inline TypedThreadSafeFunction +TypedThreadSafeFunction::New( + napi_env env, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context) { + TypedThreadSafeFunction tsfn; + + napi_status status = + napi_create_threadsafe_function(env, + nullptr, + nullptr, + String::From(env, resourceName), + maxQueueSize, + initialThreadCount, + nullptr, + nullptr, + context, + CallJsInternal, + &tsfn._tsfn); + if (status != napi_ok) { + NAPI_THROW_IF_FAILED( + env, status, TypedThreadSafeFunction()); + } + + return tsfn; +} + +// static, with Callback [missing] Resource [passed] Finalizer [missing] +template +template +inline TypedThreadSafeFunction +TypedThreadSafeFunction::New( + napi_env env, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context) { + TypedThreadSafeFunction tsfn; + + napi_status status = + napi_create_threadsafe_function(env, + nullptr, + resource, + String::From(env, resourceName), + maxQueueSize, + initialThreadCount, + nullptr, + nullptr, + context, + CallJsInternal, + &tsfn._tsfn); + if (status != napi_ok) { + NAPI_THROW_IF_FAILED( + env, status, TypedThreadSafeFunction()); + } + + return tsfn; +} + +// static, with Callback [missing] Resource [missing] Finalizer [passed] +template +template +inline TypedThreadSafeFunction +TypedThreadSafeFunction::New( + napi_env env, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context, + Finalizer finalizeCallback, + FinalizerDataType* data) { + TypedThreadSafeFunction tsfn; + + auto* finalizeData = new details:: + ThreadSafeFinalize( + {data, finalizeCallback}); + napi_status status = napi_create_threadsafe_function( + env, + nullptr, + nullptr, + String::From(env, resourceName), + maxQueueSize, + initialThreadCount, + finalizeData, + details::ThreadSafeFinalize:: + FinalizeFinalizeWrapperWithDataAndContext, + context, + CallJsInternal, + &tsfn._tsfn); + if (status != napi_ok) { + delete finalizeData; + NAPI_THROW_IF_FAILED( + env, status, TypedThreadSafeFunction()); + } + + return tsfn; +} + +// static, with Callback [missing] Resource [passed] Finalizer [passed] +template +template +inline TypedThreadSafeFunction +TypedThreadSafeFunction::New( + napi_env env, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context, + Finalizer finalizeCallback, + FinalizerDataType* data) { + TypedThreadSafeFunction tsfn; + + auto* finalizeData = new details:: + ThreadSafeFinalize( + {data, finalizeCallback}); + napi_status status = napi_create_threadsafe_function( + env, + nullptr, + resource, + String::From(env, resourceName), + maxQueueSize, + initialThreadCount, + finalizeData, + details::ThreadSafeFinalize:: + FinalizeFinalizeWrapperWithDataAndContext, + context, + CallJsInternal, + &tsfn._tsfn); + if (status != napi_ok) { + delete finalizeData; + NAPI_THROW_IF_FAILED( + env, status, TypedThreadSafeFunction()); + } + + return tsfn; +} +#endif + +// static, with Callback [passed] Resource [missing] Finalizer [missing] +template +template +inline TypedThreadSafeFunction +TypedThreadSafeFunction::New( + napi_env env, + const Function& callback, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context) { + TypedThreadSafeFunction tsfn; + + napi_status status = + napi_create_threadsafe_function(env, + callback, + nullptr, + String::From(env, resourceName), + maxQueueSize, + initialThreadCount, + nullptr, + nullptr, + context, + CallJsInternal, + &tsfn._tsfn); + if (status != napi_ok) { + NAPI_THROW_IF_FAILED( + env, status, TypedThreadSafeFunction()); + } + + return tsfn; +} + +// static, with Callback [passed] Resource [passed] Finalizer [missing] +template +template +inline TypedThreadSafeFunction +TypedThreadSafeFunction::New( + napi_env env, + const Function& callback, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context) { + TypedThreadSafeFunction tsfn; + + napi_status status = + napi_create_threadsafe_function(env, + callback, + resource, + String::From(env, resourceName), + maxQueueSize, + initialThreadCount, + nullptr, + nullptr, + context, + CallJsInternal, + &tsfn._tsfn); + if (status != napi_ok) { + NAPI_THROW_IF_FAILED( + env, status, TypedThreadSafeFunction()); + } + + return tsfn; +} + +// static, with Callback [passed] Resource [missing] Finalizer [passed] +template +template +inline TypedThreadSafeFunction +TypedThreadSafeFunction::New( + napi_env env, + const Function& callback, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context, + Finalizer finalizeCallback, + FinalizerDataType* data) { + TypedThreadSafeFunction tsfn; + + auto* finalizeData = new details:: + ThreadSafeFinalize( + {data, finalizeCallback}); + napi_status status = napi_create_threadsafe_function( + env, + callback, + nullptr, + String::From(env, resourceName), + maxQueueSize, + initialThreadCount, + finalizeData, + details::ThreadSafeFinalize:: + FinalizeFinalizeWrapperWithDataAndContext, + context, + CallJsInternal, + &tsfn._tsfn); + if (status != napi_ok) { + delete finalizeData; + NAPI_THROW_IF_FAILED( + env, status, TypedThreadSafeFunction()); + } + + return tsfn; +} + +// static, with: Callback [passed] Resource [passed] Finalizer [passed] +template +template +inline TypedThreadSafeFunction +TypedThreadSafeFunction::New( + napi_env env, + CallbackType callback, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context, + Finalizer finalizeCallback, + FinalizerDataType* data) { + TypedThreadSafeFunction tsfn; + + auto* finalizeData = new details:: + ThreadSafeFinalize( + {data, finalizeCallback}); + napi_status status = napi_create_threadsafe_function( + env, + details::DefaultCallbackWrapper< + CallbackType, + TypedThreadSafeFunction>(env, + callback), + resource, + String::From(env, resourceName), + maxQueueSize, + initialThreadCount, + finalizeData, + details::ThreadSafeFinalize:: + FinalizeFinalizeWrapperWithDataAndContext, + context, + CallJsInternal, + &tsfn._tsfn); + if (status != napi_ok) { + delete finalizeData; + NAPI_THROW_IF_FAILED( + env, status, TypedThreadSafeFunction()); + } + + return tsfn; +} + +template +inline TypedThreadSafeFunction:: + TypedThreadSafeFunction() + : _tsfn() {} + +template +inline TypedThreadSafeFunction:: + TypedThreadSafeFunction(napi_threadsafe_function tsfn) + : _tsfn(tsfn) {} + +template +inline TypedThreadSafeFunction:: +operator napi_threadsafe_function() const { + return _tsfn; +} + +template +inline napi_status +TypedThreadSafeFunction::BlockingCall( + DataType* data) const { + return napi_call_threadsafe_function(_tsfn, data, napi_tsfn_blocking); +} + +template +inline napi_status +TypedThreadSafeFunction::NonBlockingCall( + DataType* data) const { + return napi_call_threadsafe_function(_tsfn, data, napi_tsfn_nonblocking); +} + +template +inline void TypedThreadSafeFunction::Ref( + napi_env env) const { + if (_tsfn != nullptr) { + napi_status status = napi_ref_threadsafe_function(env, _tsfn); + NAPI_THROW_IF_FAILED_VOID(env, status); + } +} + +template +inline void TypedThreadSafeFunction::Unref( + napi_env env) const { + if (_tsfn != nullptr) { + napi_status status = napi_unref_threadsafe_function(env, _tsfn); + NAPI_THROW_IF_FAILED_VOID(env, status); + } +} + +template +inline napi_status +TypedThreadSafeFunction::Acquire() const { + return napi_acquire_threadsafe_function(_tsfn); +} + +template +inline napi_status +TypedThreadSafeFunction::Release() { + return napi_release_threadsafe_function(_tsfn, napi_tsfn_release); +} + +template +inline napi_status +TypedThreadSafeFunction::Abort() { + return napi_release_threadsafe_function(_tsfn, napi_tsfn_abort); +} + +template +inline ContextType* +TypedThreadSafeFunction::GetContext() const { + void* context; + napi_status status = napi_get_threadsafe_function_context(_tsfn, &context); + NAPI_FATAL_IF_FAILED(status, + "TypedThreadSafeFunction::GetContext", + "napi_get_threadsafe_function_context"); + return static_cast(context); +} + +// static +template +void TypedThreadSafeFunction::CallJsInternal( + napi_env env, napi_value jsCallback, void* context, void* data) { + details::CallJsWrapper( + env, jsCallback, context, data); +} + +#if NAPI_VERSION == 4 +// static +template +Napi::Function +TypedThreadSafeFunction::EmptyFunctionFactory( + Napi::Env env) { + return Napi::Function::New(env, [](const CallbackInfo& cb) {}); +} + +// static +template +Napi::Function +TypedThreadSafeFunction::FunctionOrEmpty( + Napi::Env env, Napi::Function& callback) { + if (callback.IsEmpty()) { + return EmptyFunctionFactory(env); + } + return callback; +} + +#else +// static +template +std::nullptr_t +TypedThreadSafeFunction::EmptyFunctionFactory( + Napi::Env /*env*/) { + return nullptr; +} + +// static +template +Napi::Function +TypedThreadSafeFunction::FunctionOrEmpty( + Napi::Env /*env*/, Napi::Function& callback) { + return callback; +} + +#endif + //////////////////////////////////////////////////////////////////////////////// // ThreadSafeFunction class //////////////////////////////////////////////////////////////////////////////// diff --git a/napi.h b/napi.h index 945aac55e..cf4410bd9 100644 --- a/napi.h +++ b/napi.h @@ -2249,6 +2249,196 @@ namespace Napi { napi_threadsafe_function _tsfn; }; + // A TypedThreadSafeFunction by default has no context (nullptr) and can + // accept any type (void) to its CallJs. + template + class TypedThreadSafeFunction { + public: + // This API may only be called from the main thread. + // Helper function that returns nullptr if running N-API 5+, otherwise a + // non-empty, no-op Function. This provides the ability to specify at + // compile-time a callback parameter to `New` that safely does no action + // when targeting _any_ N-API version. +#if NAPI_VERSION > 4 + static std::nullptr_t EmptyFunctionFactory(Napi::Env env); +#else + static Napi::Function EmptyFunctionFactory(Napi::Env env); +#endif + static Napi::Function FunctionOrEmpty(Napi::Env env, + Napi::Function& callback); + +#if NAPI_VERSION > 4 + // This API may only be called from the main thread. + // Creates a new threadsafe function with: + // Callback [missing] Resource [missing] Finalizer [missing] + template + static TypedThreadSafeFunction New( + napi_env env, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context = nullptr); + + // This API may only be called from the main thread. + // Creates a new threadsafe function with: + // Callback [missing] Resource [passed] Finalizer [missing] + template + static TypedThreadSafeFunction New( + napi_env env, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context = nullptr); + + // This API may only be called from the main thread. + // Creates a new threadsafe function with: + // Callback [missing] Resource [missing] Finalizer [passed] + template + static TypedThreadSafeFunction New( + napi_env env, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context, + Finalizer finalizeCallback, + FinalizerDataType* data = nullptr); + + // This API may only be called from the main thread. + // Creates a new threadsafe function with: + // Callback [missing] Resource [passed] Finalizer [passed] + template + static TypedThreadSafeFunction New( + napi_env env, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context, + Finalizer finalizeCallback, + FinalizerDataType* data = nullptr); +#endif + + // This API may only be called from the main thread. + // Creates a new threadsafe function with: + // Callback [passed] Resource [missing] Finalizer [missing] + template + static TypedThreadSafeFunction New( + napi_env env, + const Function& callback, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context = nullptr); + + // This API may only be called from the main thread. + // Creates a new threadsafe function with: + // Callback [passed] Resource [passed] Finalizer [missing] + template + static TypedThreadSafeFunction New( + napi_env env, + const Function& callback, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context = nullptr); + + // This API may only be called from the main thread. + // Creates a new threadsafe function with: + // Callback [passed] Resource [missing] Finalizer [passed] + template + static TypedThreadSafeFunction New( + napi_env env, + const Function& callback, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context, + Finalizer finalizeCallback, + FinalizerDataType* data = nullptr); + + // This API may only be called from the main thread. + // Creates a new threadsafe function with: + // Callback [passed] Resource [passed] Finalizer [passed] + template + static TypedThreadSafeFunction New( + napi_env env, + CallbackType callback, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context, + Finalizer finalizeCallback, + FinalizerDataType* data = nullptr); + + TypedThreadSafeFunction(); + TypedThreadSafeFunction( + napi_threadsafe_function tsFunctionValue); + + operator napi_threadsafe_function() const; + + // This API may be called from any thread. + napi_status BlockingCall(DataType* data = nullptr) const; + + // This API may be called from any thread. + napi_status NonBlockingCall(DataType* data = nullptr) const; + + // This API may only be called from the main thread. + void Ref(napi_env env) const; + + // This API may only be called from the main thread. + void Unref(napi_env env) const; + + // This API may be called from any thread. + napi_status Acquire() const; + + // This API may be called from any thread. + napi_status Release(); + + // This API may be called from any thread. + napi_status Abort(); + + // This API may be called from any thread. + ContextType* GetContext() const; + + private: + template + static TypedThreadSafeFunction New( + napi_env env, + const Function& callback, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context, + Finalizer finalizeCallback, + FinalizerDataType* data, + napi_finalize wrapper); + + static void CallJsInternal(napi_env env, + napi_value jsCallback, + void* context, + void* data); + + protected: + napi_threadsafe_function _tsfn; + }; template class AsyncProgressWorkerBase : public AsyncWorker { public: diff --git a/test/binding.cc b/test/binding.cc index ebfe5e5db..03661da35 100644 --- a/test/binding.cc +++ b/test/binding.cc @@ -49,6 +49,12 @@ Object InitThreadSafeFunctionPtr(Env env); Object InitThreadSafeFunctionSum(Env env); Object InitThreadSafeFunctionUnref(Env env); Object InitThreadSafeFunction(Env env); +Object InitTypedThreadSafeFunctionCtx(Env env); +Object InitTypedThreadSafeFunctionExistingTsfn(Env env); +Object InitTypedThreadSafeFunctionPtr(Env env); +Object InitTypedThreadSafeFunctionSum(Env env); +Object InitTypedThreadSafeFunctionUnref(Env env); +Object InitTypedThreadSafeFunction(Env env); #endif Object InitTypedArray(Env env); Object InitObjectWrap(Env env); @@ -108,7 +114,18 @@ Object Init(Env env, Object exports) { exports.Set("threadsafe_function_ptr", InitThreadSafeFunctionPtr(env)); exports.Set("threadsafe_function_sum", InitThreadSafeFunctionSum(env)); exports.Set("threadsafe_function_unref", InitThreadSafeFunctionUnref(env)); - exports.Set("threadsafe_function", InitThreadSafeFunction(env)); + exports.Set("threadsafe_function", InitTypedThreadSafeFunction(env)); + exports.Set("typed_threadsafe_function_ctx", + InitTypedThreadSafeFunctionCtx(env)); + exports.Set("typed_threadsafe_function_existing_tsfn", + InitTypedThreadSafeFunctionExistingTsfn(env)); + exports.Set("typed_threadsafe_function_ptr", + InitTypedThreadSafeFunctionPtr(env)); + exports.Set("typed_threadsafe_function_sum", + InitTypedThreadSafeFunctionSum(env)); + exports.Set("typed_threadsafe_function_unref", + InitTypedThreadSafeFunctionUnref(env)); + exports.Set("typed_threadsafe_function", InitTypedThreadSafeFunction(env)); #endif exports.Set("typedarray", InitTypedArray(env)); exports.Set("objectwrap", InitObjectWrap(env)); diff --git a/test/binding.gyp b/test/binding.gyp index 288051633..90dee4565 100644 --- a/test/binding.gyp +++ b/test/binding.gyp @@ -42,6 +42,12 @@ 'threadsafe_function/threadsafe_function_sum.cc', 'threadsafe_function/threadsafe_function_unref.cc', 'threadsafe_function/threadsafe_function.cc', + 'typed_threadsafe_function/typed_threadsafe_function_ctx.cc', + 'typed_threadsafe_function/typed_threadsafe_function_existing_tsfn.cc', + 'typed_threadsafe_function/typed_threadsafe_function_ptr.cc', + 'typed_threadsafe_function/typed_threadsafe_function_sum.cc', + 'typed_threadsafe_function/typed_threadsafe_function_unref.cc', + 'typed_threadsafe_function/typed_threadsafe_function.cc', 'typedarray.cc', 'objectwrap.cc', 'objectwrap_constructor_exception.cc', diff --git a/test/index.js b/test/index.js index ac95d88aa..3f8549999 100644 --- a/test/index.js +++ b/test/index.js @@ -50,6 +50,12 @@ let testModules = [ 'threadsafe_function/threadsafe_function_sum', 'threadsafe_function/threadsafe_function_unref', 'threadsafe_function/threadsafe_function', + 'typed_threadsafe_function/typed_threadsafe_function_ctx', + 'typed_threadsafe_function/typed_threadsafe_function_existing_tsfn', + 'typed_threadsafe_function/typed_threadsafe_function_ptr', + 'typed_threadsafe_function/typed_threadsafe_function_sum', + 'typed_threadsafe_function/typed_threadsafe_function_unref', + 'typed_threadsafe_function/typed_threadsafe_function', 'typedarray', 'typedarray-bigint', 'objectwrap', diff --git a/test/typed_threadsafe_function/typed_threadsafe_function.cc b/test/typed_threadsafe_function/typed_threadsafe_function.cc new file mode 100644 index 000000000..f9896db86 --- /dev/null +++ b/test/typed_threadsafe_function/typed_threadsafe_function.cc @@ -0,0 +1,201 @@ +#include +#include +#include "napi.h" + +#if (NAPI_VERSION > 3) + +using namespace Napi; + +constexpr size_t ARRAY_LENGTH = 10; +constexpr size_t MAX_QUEUE_SIZE = 2; + +static std::thread threads[2]; + +static struct ThreadSafeFunctionInfo { + enum CallType { DEFAULT, BLOCKING, NON_BLOCKING } type; + bool abort; + bool startSecondary; + FunctionReference jsFinalizeCallback; + uint32_t maxQueueSize; +} tsfnInfo; + +static void TSFNCallJS(Env env, + Function jsCallback, + ThreadSafeFunctionInfo* /* context */, + int* data) { + // A null environment signifies the threadsafe function has been finalized. + if (!(env == nullptr || jsCallback == nullptr)) { + // If called with no data + if (data == nullptr) { + jsCallback.Call({}); + } else { + jsCallback.Call({Number::New(env, *data)}); + } + } +} + +using TSFN = TypedThreadSafeFunction; +static TSFN tsfn; + +// Thread data to transmit to JS +static int ints[ARRAY_LENGTH]; + +static void SecondaryThread() { + if (tsfn.Release() != napi_ok) { + Error::Fatal("SecondaryThread", "ThreadSafeFunction.Release() failed"); + } +} + +// Source thread producing the data +static void DataSourceThread() { + ThreadSafeFunctionInfo* info = tsfn.GetContext(); + + if (info->startSecondary) { + if (tsfn.Acquire() != napi_ok) { + Error::Fatal("DataSourceThread", "ThreadSafeFunction.Acquire() failed"); + } + + threads[1] = std::thread(SecondaryThread); + } + + bool queueWasFull = false; + bool queueWasClosing = false; + for (int index = ARRAY_LENGTH - 1; index > -1 && !queueWasClosing; index--) { + napi_status status = napi_generic_failure; + + switch (info->type) { + case ThreadSafeFunctionInfo::DEFAULT: + status = tsfn.BlockingCall(); + break; + case ThreadSafeFunctionInfo::BLOCKING: + status = tsfn.BlockingCall(&ints[index]); + break; + case ThreadSafeFunctionInfo::NON_BLOCKING: + status = tsfn.NonBlockingCall(&ints[index]); + break; + } + + if (info->maxQueueSize == 0) { + // Let's make this thread really busy for 200 ms to give the main thread a + // chance to abort. + auto start = std::chrono::high_resolution_clock::now(); + constexpr auto MS_200 = std::chrono::milliseconds(200); + for (; std::chrono::high_resolution_clock::now() - start < MS_200;) + ; + } + + switch (status) { + case napi_queue_full: + queueWasFull = true; + index++; + // fall through + + case napi_ok: + continue; + + case napi_closing: + queueWasClosing = true; + break; + + default: + Error::Fatal("DataSourceThread", "ThreadSafeFunction.*Call() failed"); + } + } + + if (info->type == ThreadSafeFunctionInfo::NON_BLOCKING && !queueWasFull) { + Error::Fatal("DataSourceThread", "Queue was never full"); + } + + if (info->abort && !queueWasClosing) { + Error::Fatal("DataSourceThread", "Queue was never closing"); + } + + if (!queueWasClosing && tsfn.Release() != napi_ok) { + Error::Fatal("DataSourceThread", "ThreadSafeFunction.Release() failed"); + } +} + +static Value StopThread(const CallbackInfo& info) { + tsfnInfo.jsFinalizeCallback = Napi::Persistent(info[0].As()); + bool abort = info[1].As(); + if (abort) { + tsfn.Abort(); + } else { + tsfn.Release(); + } + return Value(); +} + +// Join the thread and inform JS that we're done. +static void JoinTheThreads(Env /* env */, + std::thread* theThreads, + ThreadSafeFunctionInfo* info) { + theThreads[0].join(); + if (info->startSecondary) { + theThreads[1].join(); + } + + info->jsFinalizeCallback.Call({}); + info->jsFinalizeCallback.Reset(); +} + +static Value StartThreadInternal(const CallbackInfo& info, + ThreadSafeFunctionInfo::CallType type) { + tsfnInfo.type = type; + tsfnInfo.abort = info[1].As(); + tsfnInfo.startSecondary = info[2].As(); + tsfnInfo.maxQueueSize = info[3].As().Uint32Value(); + + tsfn = TSFN::New(info.Env(), + info[0].As(), + Object::New(info.Env()), + "Test", + tsfnInfo.maxQueueSize, + 2, + &tsfnInfo, + JoinTheThreads, + threads); + + threads[0] = std::thread(DataSourceThread); + + return Value(); +} + +static Value Release(const CallbackInfo& /* info */) { + if (tsfn.Release() != napi_ok) { + Error::Fatal("Release", "ThreadSafeFunction.Release() failed"); + } + return Value(); +} + +static Value StartThread(const CallbackInfo& info) { + return StartThreadInternal(info, ThreadSafeFunctionInfo::BLOCKING); +} + +static Value StartThreadNonblocking(const CallbackInfo& info) { + return StartThreadInternal(info, ThreadSafeFunctionInfo::NON_BLOCKING); +} + +static Value StartThreadNoNative(const CallbackInfo& info) { + return StartThreadInternal(info, ThreadSafeFunctionInfo::DEFAULT); +} + +Object InitTypedThreadSafeFunction(Env env) { + for (size_t index = 0; index < ARRAY_LENGTH; index++) { + ints[index] = index; + } + + Object exports = Object::New(env); + exports["ARRAY_LENGTH"] = Number::New(env, ARRAY_LENGTH); + exports["MAX_QUEUE_SIZE"] = Number::New(env, MAX_QUEUE_SIZE); + exports["startThread"] = Function::New(env, StartThread); + exports["startThreadNoNative"] = Function::New(env, StartThreadNoNative); + exports["startThreadNonblocking"] = + Function::New(env, StartThreadNonblocking); + exports["stopThread"] = Function::New(env, StopThread); + exports["release"] = Function::New(env, Release); + + return exports; +} + +#endif diff --git a/test/typed_threadsafe_function/typed_threadsafe_function.js b/test/typed_threadsafe_function/typed_threadsafe_function.js new file mode 100644 index 000000000..7aa8cc2ad --- /dev/null +++ b/test/typed_threadsafe_function/typed_threadsafe_function.js @@ -0,0 +1,194 @@ +'use strict'; + +const buildType = process.config.target_defaults.default_configuration; +const assert = require('assert'); +const common = require('../common'); + +module.exports = (async function () { + await test(require(`../build/${buildType}/binding.node`)); + await test(require(`../build/${buildType}/binding_noexcept.node`)); +})(); + +async function test(binding) { + const expectedArray = (function (arrayLength) { + const result = []; + for (let index = 0; index < arrayLength; index++) { + result.push(arrayLength - 1 - index); + } + return result; + })(binding.typed_threadsafe_function.ARRAY_LENGTH); + + function testWithJSMarshaller({ + threadStarter, + quitAfter, + abort, + maxQueueSize, + launchSecondary }) { + return new Promise((resolve) => { + const array = []; + binding.typed_threadsafe_function[threadStarter](function testCallback(value) { + array.push(value); + if (array.length === quitAfter) { + setImmediate(() => { + binding.typed_threadsafe_function.stopThread(common.mustCall(() => { + resolve(array); + }), !!abort); + }); + } + }, !!abort, !!launchSecondary, maxQueueSize); + if (threadStarter === 'startThreadNonblocking') { + // Let's make this thread really busy for a short while to ensure that + // the queue fills and the thread receives a napi_queue_full. + const start = Date.now(); + while (Date.now() - start < 200); + } + }); + } + + await new Promise(function testWithoutJSMarshaller(resolve) { + let callCount = 0; + binding.typed_threadsafe_function.startThreadNoNative(function testCallback() { + callCount++; + + // The default call-into-JS implementation passes no arguments. + assert.strictEqual(arguments.length, 0); + if (callCount === binding.typed_threadsafe_function.ARRAY_LENGTH) { + setImmediate(() => { + binding.typed_threadsafe_function.stopThread(common.mustCall(() => { + resolve(); + }), false); + }); + } + }, false /* abort */, false /* launchSecondary */, + binding.typed_threadsafe_function.MAX_QUEUE_SIZE); + }); + + // Start the thread in blocking mode, and assert that all values are passed. + // Quit after it's done. + assert.deepStrictEqual( + await testWithJSMarshaller({ + threadStarter: 'startThread', + maxQueueSize: binding.typed_threadsafe_function.MAX_QUEUE_SIZE, + quitAfter: binding.typed_threadsafe_function.ARRAY_LENGTH + }), + expectedArray, + ); + + // Start the thread in blocking mode with an infinite queue, and assert that + // all values are passed. Quit after it's done. + assert.deepStrictEqual( + await testWithJSMarshaller({ + threadStarter: 'startThread', + maxQueueSize: 0, + quitAfter: binding.typed_threadsafe_function.ARRAY_LENGTH + }), + expectedArray, + ); + + // Start the thread in non-blocking mode, and assert that all values are + // passed. Quit after it's done. + assert.deepStrictEqual( + await testWithJSMarshaller({ + threadStarter: 'startThreadNonblocking', + maxQueueSize: binding.typed_threadsafe_function.MAX_QUEUE_SIZE, + quitAfter: binding.typed_threadsafe_function.ARRAY_LENGTH + }), + expectedArray, + ); + + // Start the thread in blocking mode, and assert that all values are passed. + // Quit early, but let the thread finish. + assert.deepStrictEqual( + await testWithJSMarshaller({ + threadStarter: 'startThread', + maxQueueSize: binding.typed_threadsafe_function.MAX_QUEUE_SIZE, + quitAfter: 1 + }), + expectedArray, + ); + + // Start the thread in blocking mode with an infinite queue, and assert that + // all values are passed. Quit early, but let the thread finish. + assert.deepStrictEqual( + await testWithJSMarshaller({ + threadStarter: 'startThread', + maxQueueSize: 0, + quitAfter: 1 + }), + expectedArray, + ); + + + // Start the thread in non-blocking mode, and assert that all values are + // passed. Quit early, but let the thread finish. + assert.deepStrictEqual( + await testWithJSMarshaller({ + threadStarter: 'startThreadNonblocking', + maxQueueSize: binding.typed_threadsafe_function.MAX_QUEUE_SIZE, + quitAfter: 1 + }), + expectedArray, + ); + + // Start the thread in blocking mode, and assert that all values are passed. + // Quit early, but let the thread finish. Launch a secondary thread to test + // the reference counter incrementing functionality. + assert.deepStrictEqual( + await testWithJSMarshaller({ + threadStarter: 'startThread', + quitAfter: 1, + maxQueueSize: binding.typed_threadsafe_function.MAX_QUEUE_SIZE, + launchSecondary: true + }), + expectedArray, + ); + + // Start the thread in non-blocking mode, and assert that all values are + // passed. Quit early, but let the thread finish. Launch a secondary thread + // to test the reference counter incrementing functionality. + assert.deepStrictEqual( + await testWithJSMarshaller({ + threadStarter: 'startThreadNonblocking', + quitAfter: 1, + maxQueueSize: binding.typed_threadsafe_function.MAX_QUEUE_SIZE, + launchSecondary: true + }), + expectedArray, + ); + + // Start the thread in blocking mode, and assert that it could not finish. + // Quit early by aborting. + assert.strictEqual( + (await testWithJSMarshaller({ + threadStarter: 'startThread', + quitAfter: 1, + maxQueueSize: binding.typed_threadsafe_function.MAX_QUEUE_SIZE, + abort: true + })).indexOf(0), + -1, + ); + + // Start the thread in blocking mode with an infinite queue, and assert that + // it could not finish. Quit early by aborting. + assert.strictEqual( + (await testWithJSMarshaller({ + threadStarter: 'startThread', + quitAfter: 1, + maxQueueSize: 0, + abort: true + })).indexOf(0), + -1, + ); + + // Start the thread in non-blocking mode, and assert that it could not finish. + // Quit early and aborting. + assert.strictEqual( + (await testWithJSMarshaller({ + threadStarter: 'startThreadNonblocking', + quitAfter: 1, + maxQueueSize: binding.typed_threadsafe_function.MAX_QUEUE_SIZE, + abort: true + })).indexOf(0), + -1, + ); +} diff --git a/test/typed_threadsafe_function/typed_threadsafe_function_ctx.cc b/test/typed_threadsafe_function/typed_threadsafe_function_ctx.cc new file mode 100644 index 000000000..ee70bb352 --- /dev/null +++ b/test/typed_threadsafe_function/typed_threadsafe_function_ctx.cc @@ -0,0 +1,68 @@ +#include "napi.h" + +#if (NAPI_VERSION > 3) + +using namespace Napi; + +using ContextType = Reference; +using TSFN = TypedThreadSafeFunction; + +namespace { + +class TSFNWrap : public ObjectWrap { + public: + static Object Init(Napi::Env env, Object exports); + TSFNWrap(const CallbackInfo& info); + + Napi::Value GetContext(const CallbackInfo& /*info*/) { + ContextType* ctx = _tsfn.GetContext(); + return ctx->Value(); + }; + + Napi::Value Release(const CallbackInfo& info) { + Napi::Env env = info.Env(); + _deferred = std::unique_ptr(new Promise::Deferred(env)); + _tsfn.Release(); + return _deferred->Promise(); + }; + + private: + TSFN _tsfn; + std::unique_ptr _deferred; +}; + +Object TSFNWrap::Init(Napi::Env env, Object exports) { + Function func = + DefineClass(env, + "TSFNWrap", + {InstanceMethod("getContext", &TSFNWrap::GetContext), + InstanceMethod("release", &TSFNWrap::Release)}); + + exports.Set("TSFNWrap", func); + return exports; +} + +TSFNWrap::TSFNWrap(const CallbackInfo& info) : ObjectWrap(info) { + ContextType* _ctx = new ContextType; + *_ctx = Persistent(info[0]); + + _tsfn = TSFN::New(info.Env(), + this->Value(), + "Test", + 1, + 1, + _ctx, + [this](Napi::Env env, void*, ContextType* ctx) { + _deferred->Resolve(env.Undefined()); + ctx->Reset(); + delete ctx; + }); +} + +} // namespace + +Object InitTypedThreadSafeFunctionCtx(Env env) { + return TSFNWrap::Init(env, Object::New(env)); +} + +#endif diff --git a/test/typed_threadsafe_function/typed_threadsafe_function_ctx.js b/test/typed_threadsafe_function/typed_threadsafe_function_ctx.js new file mode 100644 index 000000000..2651586a0 --- /dev/null +++ b/test/typed_threadsafe_function/typed_threadsafe_function_ctx.js @@ -0,0 +1,14 @@ +'use strict'; + +const assert = require('assert'); +const buildType = process.config.target_defaults.default_configuration; + +module.exports = test(require(`../build/${buildType}/binding.node`)) + .then(() => test(require(`../build/${buildType}/binding_noexcept.node`))); + +async function test(binding) { + const ctx = { }; + const tsfn = new binding.threadsafe_function_ctx.TSFNWrap(ctx); + assert(tsfn.getContext() === ctx); + await tsfn.release(); +} diff --git a/test/typed_threadsafe_function/typed_threadsafe_function_existing_tsfn.cc b/test/typed_threadsafe_function/typed_threadsafe_function_existing_tsfn.cc new file mode 100644 index 000000000..eccf87c93 --- /dev/null +++ b/test/typed_threadsafe_function/typed_threadsafe_function_existing_tsfn.cc @@ -0,0 +1,124 @@ +#include +#include "napi.h" + +#if (NAPI_VERSION > 3) + +using namespace Napi; + +namespace { + +struct TestContext { + TestContext(Promise::Deferred&& deferred) + : deferred(std::move(deferred)), callData(nullptr){}; + + napi_threadsafe_function tsfn; + Promise::Deferred deferred; + double* callData; + + ~TestContext() { + if (callData != nullptr) delete callData; + }; +}; + +using TSFN = TypedThreadSafeFunction; + +void FinalizeCB(napi_env env, void* /*finalizeData */, void* context) { + TestContext* testContext = static_cast(context); + if (testContext->callData != nullptr) { + testContext->deferred.Resolve(Number::New(env, *testContext->callData)); + } else { + testContext->deferred.Resolve(Napi::Env(env).Undefined()); + } + delete testContext; +} + +void CallJSWithData(napi_env env, + napi_value /* callback */, + void* context, + void* data) { + TestContext* testContext = static_cast(context); + testContext->callData = static_cast(data); + + napi_status status = + napi_release_threadsafe_function(testContext->tsfn, napi_tsfn_release); + + NAPI_THROW_IF_FAILED_VOID(env, status); +} + +void CallJSNoData(napi_env env, + napi_value /* callback */, + void* context, + void* /*data*/) { + TestContext* testContext = static_cast(context); + testContext->callData = nullptr; + + napi_status status = + napi_release_threadsafe_function(testContext->tsfn, napi_tsfn_release); + + NAPI_THROW_IF_FAILED_VOID(env, status); +} + +static Value TestCall(const CallbackInfo& info) { + Napi::Env env = info.Env(); + bool isBlocking = false; + bool hasData = false; + if (info.Length() > 0) { + Object opts = info[0].As(); + if (opts.Has("blocking")) { + isBlocking = opts.Get("blocking").ToBoolean(); + } + if (opts.Has("data")) { + hasData = opts.Get("data").ToBoolean(); + } + } + + // Allow optional callback passed from JS. Useful for testing. + Function cb = Function::New(env, [](const CallbackInfo& /*info*/) {}); + + TestContext* testContext = new TestContext(Napi::Promise::Deferred(env)); + + napi_status status = + napi_create_threadsafe_function(env, + cb, + Object::New(env), + String::New(env, "Test"), + 0, + 1, + nullptr, /*finalize data*/ + FinalizeCB, + testContext, + hasData ? CallJSWithData : CallJSNoData, + &testContext->tsfn); + + NAPI_THROW_IF_FAILED(env, status, Value()); + + TSFN wrapped = TSFN(testContext->tsfn); + + // Test the four napi_threadsafe_function direct-accessing calls + if (isBlocking) { + if (hasData) { + wrapped.BlockingCall(new double(std::rand())); + } else { + wrapped.BlockingCall(nullptr); + } + } else { + if (hasData) { + wrapped.NonBlockingCall(new double(std::rand())); + } else { + wrapped.NonBlockingCall(nullptr); + } + } + + return testContext->deferred.Promise(); +} + +} // namespace + +Object InitTypedThreadSafeFunctionExistingTsfn(Env env) { + Object exports = Object::New(env); + exports["testCall"] = Function::New(env, TestCall); + + return exports; +} + +#endif diff --git a/test/typed_threadsafe_function/typed_threadsafe_function_existing_tsfn.js b/test/typed_threadsafe_function/typed_threadsafe_function_existing_tsfn.js new file mode 100644 index 000000000..b6df669d4 --- /dev/null +++ b/test/typed_threadsafe_function/typed_threadsafe_function_existing_tsfn.js @@ -0,0 +1,17 @@ +'use strict'; + +const assert = require('assert'); + +const buildType = process.config.target_defaults.default_configuration; + +module.exports = test(require(`../build/${buildType}/binding.node`)) + .then(() => test(require(`../build/${buildType}/binding_noexcept.node`))); + +async function test(binding) { + const testCall = binding.typed_threadsafe_function_existing_tsfn.testCall; + + assert.strictEqual(typeof await testCall({ blocking: true, data: true }), "number"); + assert.strictEqual(typeof await testCall({ blocking: true, data: false }), "undefined"); + assert.strictEqual(typeof await testCall({ blocking: false, data: true }), "number"); + assert.strictEqual(typeof await testCall({ blocking: false, data: false }), "undefined"); +} diff --git a/test/typed_threadsafe_function/typed_threadsafe_function_ptr.cc b/test/typed_threadsafe_function/typed_threadsafe_function_ptr.cc new file mode 100644 index 000000000..891fd560c --- /dev/null +++ b/test/typed_threadsafe_function/typed_threadsafe_function_ptr.cc @@ -0,0 +1,28 @@ +#include "napi.h" + +#if (NAPI_VERSION > 3) + +using namespace Napi; + +namespace { + +using TSFN = TypedThreadSafeFunction<>; + +static Value Test(const CallbackInfo& info) { + Object resource = info[0].As(); + Function cb = info[1].As(); + TSFN tsfn = TSFN::New(info.Env(), cb, resource, "Test", 1, 1); + tsfn.Release(); + return info.Env().Undefined(); +} + +} // namespace + +Object InitTypedThreadSafeFunctionPtr(Env env) { + Object exports = Object::New(env); + exports["test"] = Function::New(env, Test); + + return exports; +} + +#endif diff --git a/test/typed_threadsafe_function/typed_threadsafe_function_ptr.js b/test/typed_threadsafe_function/typed_threadsafe_function_ptr.js new file mode 100644 index 000000000..47b187761 --- /dev/null +++ b/test/typed_threadsafe_function/typed_threadsafe_function_ptr.js @@ -0,0 +1,10 @@ +'use strict'; + +const buildType = process.config.target_defaults.default_configuration; + +test(require(`../build/${buildType}/binding.node`)); +test(require(`../build/${buildType}/binding_noexcept.node`)); + +function test(binding) { + binding.typed_threadsafe_function_ptr.test({}, () => {}); +} diff --git a/test/typed_threadsafe_function/typed_threadsafe_function_sum.cc b/test/typed_threadsafe_function/typed_threadsafe_function_sum.cc new file mode 100644 index 000000000..9add259c4 --- /dev/null +++ b/test/typed_threadsafe_function/typed_threadsafe_function_sum.cc @@ -0,0 +1,237 @@ +#include +#include +#include +#include +#include "napi.h" + +#if (NAPI_VERSION > 3) + +using namespace Napi; + +namespace { + +struct TestData { + TestData(Promise::Deferred&& deferred) : deferred(std::move(deferred)){}; + + // Native Promise returned to JavaScript + Promise::Deferred deferred; + + // List of threads created for test. This list only ever accessed via main + // thread. + std::vector threads = {}; + + // These variables are only accessed from the main thread. + bool mainWantsRelease = false; + size_t expected_calls = 0; + + static void CallJs(Napi::Env env, + Function callback, + TestData* testData, + double* data) { + // This lambda runs on the main thread so it's OK to access the variables + // `expected_calls` and `mainWantsRelease`. + testData->expected_calls--; + if (testData->expected_calls == 0 && testData->mainWantsRelease) + testData->tsfn.Release(); + callback.Call({Number::New(env, *data)}); + delete data; + } + + TypedThreadSafeFunction tsfn; +}; + +using TSFN = TypedThreadSafeFunction; + +void FinalizerCallback(Napi::Env env, void*, TestData* finalizeData) { + for (size_t i = 0; i < finalizeData->threads.size(); ++i) { + finalizeData->threads[i].join(); + } + finalizeData->deferred.Resolve(Boolean::New(env, true)); + delete finalizeData; +} + +/** + * See threadsafe_function_sum.js for descriptions of the tests in this file + */ + +void entryWithTSFN(TSFN tsfn, int threadId) { + std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1)); + tsfn.BlockingCall(new double(threadId)); + tsfn.Release(); +} + +static Value TestWithTSFN(const CallbackInfo& info) { + int threadCount = info[0].As().Int32Value(); + Function cb = info[1].As(); + + // We pass the test data to the Finalizer for cleanup. The finalizer is + // responsible for deleting this data as well. + TestData* testData = new TestData(Promise::Deferred::New(info.Env())); + + TSFN tsfn = + TSFN::New(info.Env(), + cb, + "Test", + 0, + threadCount, + testData, + std::function(FinalizerCallback), + testData); + + for (int i = 0; i < threadCount; ++i) { + // A copy of the ThreadSafeFunction will go to the thread entry point + testData->threads.push_back(std::thread(entryWithTSFN, tsfn, i)); + } + + return testData->deferred.Promise(); +} + +// Task instance created for each new std::thread +class DelayedTSFNTask { + public: + // Each instance has its own tsfn + TSFN tsfn; + + // Thread-safety + std::mutex mtx; + std::condition_variable cv; + + // Entry point for std::thread + void entryDelayedTSFN(int threadId) { + std::unique_lock lk(mtx); + cv.wait(lk); + tsfn.BlockingCall(new double(threadId)); + tsfn.Release(); + }; +}; + +struct TestDataDelayed : TestData { + TestDataDelayed(Promise::Deferred&& deferred) + : TestData(std::move(deferred)){}; + ~TestDataDelayed() { taskInsts.clear(); }; + + // List of DelayedTSFNThread instances + std::vector> taskInsts = {}; +}; + +void FinalizerCallbackDelayed(Napi::Env env, + TestDataDelayed* finalizeData, + TestData*) { + for (size_t i = 0; i < finalizeData->threads.size(); ++i) { + finalizeData->threads[i].join(); + } + finalizeData->deferred.Resolve(Boolean::New(env, true)); + delete finalizeData; +} + +static Value TestDelayedTSFN(const CallbackInfo& info) { + int threadCount = info[0].As().Int32Value(); + Function cb = info[1].As(); + + TestDataDelayed* testData = + new TestDataDelayed(Promise::Deferred::New(info.Env())); + + testData->tsfn = TSFN::New(info.Env(), + cb, + "Test", + 0, + threadCount, + testData, + std::function( + FinalizerCallbackDelayed), + testData); + + for (int i = 0; i < threadCount; ++i) { + testData->taskInsts.push_back( + std::unique_ptr(new DelayedTSFNTask())); + testData->threads.push_back(std::thread(&DelayedTSFNTask::entryDelayedTSFN, + testData->taskInsts.back().get(), + i)); + } + std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1)); + + for (auto& task : testData->taskInsts) { + std::lock_guard lk(task->mtx); + task->tsfn = testData->tsfn; + task->cv.notify_all(); + } + + return testData->deferred.Promise(); +} + +void AcquireFinalizerCallback(Napi::Env env, + TestData* finalizeData, + TestData* context) { + (void)context; + for (size_t i = 0; i < finalizeData->threads.size(); ++i) { + finalizeData->threads[i].join(); + } + finalizeData->deferred.Resolve(Boolean::New(env, true)); + delete finalizeData; +} + +void entryAcquire(TSFN tsfn, int threadId) { + tsfn.Acquire(); + std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1)); + tsfn.BlockingCall(new double(threadId)); + tsfn.Release(); +} + +static Value CreateThread(const CallbackInfo& info) { + TestData* testData = static_cast(info.Data()); + // Counting expected calls like this only works because on the JS side this + // binding is called from a synchronous loop. This means the main loop has no + // chance to run the tsfn JS callback before we've counted how many threads + // the JS intends to create. + testData->expected_calls++; + TSFN tsfn = testData->tsfn; + int threadId = testData->threads.size(); + // A copy of the ThreadSafeFunction will go to the thread entry point + testData->threads.push_back(std::thread(entryAcquire, tsfn, threadId)); + return Number::New(info.Env(), threadId); +} + +static Value StopThreads(const CallbackInfo& info) { + TestData* testData = static_cast(info.Data()); + testData->mainWantsRelease = true; + return info.Env().Undefined(); +} + +static Value TestAcquire(const CallbackInfo& info) { + Function cb = info[0].As(); + Napi::Env env = info.Env(); + + // We pass the test data to the Finalizer for cleanup. The finalizer is + // responsible for deleting this data as well. + TestData* testData = new TestData(Promise::Deferred::New(info.Env())); + + testData->tsfn = TSFN::New(env, + cb, + "Test", + 0, + 1, + testData, + std::function( + AcquireFinalizerCallback), + testData); + + Object result = Object::New(env); + result["createThread"] = + Function::New(env, CreateThread, "createThread", testData); + result["stopThreads"] = + Function::New(env, StopThreads, "stopThreads", testData); + result["promise"] = testData->deferred.Promise(); + + return result; +} +} // namespace + +Object InitTypedThreadSafeFunctionSum(Env env) { + Object exports = Object::New(env); + exports["testDelayedTSFN"] = Function::New(env, TestDelayedTSFN); + exports["testWithTSFN"] = Function::New(env, TestWithTSFN); + exports["testAcquire"] = Function::New(env, TestAcquire); + return exports; +} + +#endif diff --git a/test/typed_threadsafe_function/typed_threadsafe_function_sum.js b/test/typed_threadsafe_function/typed_threadsafe_function_sum.js new file mode 100644 index 000000000..8f10476f6 --- /dev/null +++ b/test/typed_threadsafe_function/typed_threadsafe_function_sum.js @@ -0,0 +1,61 @@ +'use strict'; +const assert = require('assert'); +const buildType = process.config.target_defaults.default_configuration; + +/** + * + * ThreadSafeFunction Tests: Thread Id Sums + * + * Every native C++ function that utilizes the TSFN will call the registered + * callback with the thread id. Passing Array.prototype.push with a bound array + * will push the thread id to the array. Therefore, starting `N` threads, we + * will expect the sum of all elements in the array to be `(N-1) * (N) / 2` (as + * thread IDs are 0-based) + * + * We check different methods of passing a ThreadSafeFunction around multiple + * threads: + * - `testWithTSFN`: The main thread creates the TSFN. Then, it creates + * threads, passing the TSFN at thread construction. The number of threads is + * static (known at TSFN creation). + * - `testDelayedTSFN`: The main thread creates threads, passing a promise to a + * TSFN at construction. Then, it creates the TSFN, and resolves each + * threads' promise. The number of threads is static. + * - `testAcquire`: The native binding returns a function to start a new. A + * call to this function will return `false` once `N` calls have been made. + * Each thread will acquire its own use of the TSFN, call it, and then + * release. + */ + +const THREAD_COUNT = 5; +const EXPECTED_SUM = (THREAD_COUNT - 1) * (THREAD_COUNT) / 2; + +module.exports = test(require(`../build/${buildType}/binding.node`)) + .then(() => test(require(`../build/${buildType}/binding_noexcept.node`))); + +/** @param {number[]} N */ +const sum = (N) => N.reduce((sum, n) => sum + n, 0); + +function test(binding) { + async function check(bindingFunction) { + const calls = []; + const result = await bindingFunction(THREAD_COUNT, Array.prototype.push.bind(calls)); + assert.ok(result); + assert.equal(sum(calls), EXPECTED_SUM); + } + + async function checkAcquire() { + const calls = []; + const { promise, createThread, stopThreads } = binding.typed_threadsafe_function_sum.testAcquire(Array.prototype.push.bind(calls)); + for (let i = 0; i < THREAD_COUNT; i++) { + createThread(); + } + stopThreads(); + const result = await promise; + assert.ok(result); + assert.equal(sum(calls), EXPECTED_SUM); + } + + return check(binding.typed_threadsafe_function_sum.testDelayedTSFN) + .then(() => check(binding.typed_threadsafe_function_sum.testWithTSFN)) + .then(() => checkAcquire()); +} diff --git a/test/typed_threadsafe_function/typed_threadsafe_function_unref.cc b/test/typed_threadsafe_function/typed_threadsafe_function_unref.cc new file mode 100644 index 000000000..35345568d --- /dev/null +++ b/test/typed_threadsafe_function/typed_threadsafe_function_unref.cc @@ -0,0 +1,52 @@ +#include "napi.h" + +#if (NAPI_VERSION > 3) + +using namespace Napi; + +namespace { + +using TSFN = TypedThreadSafeFunction<>; +using ContextType = std::nullptr_t; +using FinalizerDataType = void; +static Value TestUnref(const CallbackInfo& info) { + Napi::Env env = info.Env(); + Object global = env.Global(); + Object resource = info[0].As(); + Function cb = info[1].As(); + Function setTimeout = global.Get("setTimeout").As(); + TSFN* tsfn = new TSFN; + + *tsfn = TSFN::New( + info.Env(), + cb, + resource, + "Test", + 1, + 1, + nullptr, + [tsfn](Napi::Env /* env */, FinalizerDataType*, ContextType*) { + delete tsfn; + }, + static_cast(nullptr)); + + tsfn->BlockingCall(); + + setTimeout.Call( + global, + {Function::New( + env, [tsfn](const CallbackInfo& info) { tsfn->Unref(info.Env()); }), + Number::New(env, 100)}); + + return info.Env().Undefined(); +} + +} // namespace + +Object InitTypedThreadSafeFunctionUnref(Env env) { + Object exports = Object::New(env); + exports["testUnref"] = Function::New(env, TestUnref); + return exports; +} + +#endif diff --git a/test/typed_threadsafe_function/typed_threadsafe_function_unref.js b/test/typed_threadsafe_function/typed_threadsafe_function_unref.js new file mode 100644 index 000000000..55b42a553 --- /dev/null +++ b/test/typed_threadsafe_function/typed_threadsafe_function_unref.js @@ -0,0 +1,55 @@ +'use strict'; + +const assert = require('assert'); +const buildType = process.config.target_defaults.default_configuration; + +const isMainProcess = process.argv[1] != __filename; + +/** + * In order to test that the event loop exits even with an active TSFN, we need + * to spawn a new process for the test. + * - Main process: spawns new node instance, executing this script + * - Child process: creates TSFN. Native module Unref's via setTimeout after some time but does NOT call Release. + * + * Main process should expect child process to exit. + */ + +if (isMainProcess) { + module.exports = test(`../build/${buildType}/binding.node`) + .then(() => test(`../build/${buildType}/binding_noexcept.node`)); +} else { + test(process.argv[2]); +} + +function test(bindingFile) { + if (isMainProcess) { + // Main process + return new Promise((resolve, reject) => { + const child = require('../napi_child').spawn(process.argv[0], [ + '--expose-gc', __filename, bindingFile + ], { stdio: 'inherit' }); + + let timeout = setTimeout( function() { + child.kill(); + timeout = 0; + reject(new Error("Expected child to die")); + }, 5000); + + child.on("error", (err) => { + clearTimeout(timeout); + timeout = 0; + reject(new Error(err)); + }) + + child.on("close", (code) => { + if (timeout) clearTimeout(timeout); + assert.strictEqual(code, 0, "Expected return value 0"); + resolve(); + }); + }); + } else { + // Child process + const binding = require(bindingFile); + binding.typed_threadsafe_function_unref.testUnref({}, () => { }); + } +}