-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathnapi-thread-safe-callback-impl.hpp
276 lines (243 loc) · 8.59 KB
/
napi-thread-safe-callback-impl.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
#pragma once
// Needs to go first because of winsock issues
#include <uv.h>
#include <mutex>
#include <iostream>
class ThreadSafeCallback::Impl
{
public:
Impl(Napi::Reference<Napi::Value> &&receiver, Napi::FunctionReference &&callback)
: receiver_(std::move(receiver)), callback_(std::move(callback)), close_(false)
{
if (receiver_.IsEmpty())
receiver_ = Napi::Persistent(static_cast<Napi::Value>(Napi::Object::New(callback_.Env())));
uv_async_init(uv_default_loop(), &handle_, &static_async_callback);
handle_.data = this;
}
void unref()
{
uv_unref(reinterpret_cast<uv_handle_t *>(&handle_));
}
void call(arg_func_t arg_function, completion_func_t completion_function)
{
std::lock_guard<std::mutex> lock(mutex_);
function_pairs_.push_back({arg_function, completion_function});
uv_async_send(&handle_);
}
void close()
{
std::lock_guard<std::mutex> lock(mutex_);
close_ = true;
uv_async_send(&handle_);
}
protected:
using func_pair_t = std::pair<arg_func_t, completion_func_t>;
static void static_async_callback(uv_async_t *handle)
{
try
{
static_cast<Impl *>(handle->data)->async_callback();
}
catch (std::exception& e)
{
Napi::Error::Fatal("", e.what());
}
catch (...)
{
Napi::Error::Fatal("", "ERROR: Unknown exception during async callback");
}
}
void async_callback()
{
auto env = callback_.Env();
while (true)
{
std::vector<func_pair_t> func_pairs;
{
std::lock_guard<std::mutex> lock(mutex_);
if (function_pairs_.empty())
break;
else
func_pairs.swap(function_pairs_);
}
for (const auto &function_pair : func_pairs)
{
Napi::HandleScope scope(env);
std::vector<napi_value> args;
if (function_pair.first)
function_pair.first(env, args);
Napi::Value result(env, nullptr);
Napi::Error error(env, nullptr);
try
{
result = callback_.MakeCallback(receiver_.Value(), args);
}
catch (Napi::Error& err)
{
error = std::move(err);
}
if (function_pair.second)
function_pair.second(result, error);
else if (!error.IsEmpty())
throw std::runtime_error(error.Message());
}
}
if (close_)
uv_close(reinterpret_cast<uv_handle_t *>(&handle_), [](uv_handle_t *handle) {
delete static_cast<Impl *>(handle->data);
});
}
Napi::Reference<Napi::Value> receiver_;
Napi::FunctionReference callback_;
uv_async_t handle_;
std::mutex mutex_;
std::vector<func_pair_t> function_pairs_;
bool close_;
};
// public API
inline ThreadSafeCallback::ThreadSafeCallback(const Napi::Function &callback)
: ThreadSafeCallback(Napi::Value(), callback)
{}
inline ThreadSafeCallback::ThreadSafeCallback(const Napi::Value& receiver, const Napi::Function& callback)
: impl(nullptr)
{
if (!receiver.IsEmpty() && !(receiver.IsObject() || receiver.IsFunction()))
throw Napi::Error::New(callback.Env(), "Callback receiver must be an object or function");
if (!callback.IsFunction())
throw Napi::Error::New(callback.Env(), "Callback must be a function");
impl = new Impl(Napi::Persistent(receiver), Napi::Persistent(callback));
}
inline void ThreadSafeCallback::unref()
{
impl->unref();
}
inline ThreadSafeCallback::ThreadSafeCallback(ThreadSafeCallback&& other)
: impl(other.impl)
{
other.impl = nullptr;
}
inline ThreadSafeCallback::~ThreadSafeCallback()
{
// Destruction of the impl is defered because:
// 1) uv_async_close may only be called on nodejs main thread
// 2) uv_async_t memory may only be freed in close callback
if (impl)
impl->close();
}
inline std::future<void> ThreadSafeCallback::operator()()
{
return operator()(arg_func_t(nullptr));
}
inline std::future<void> ThreadSafeCallback::operator()(arg_func_t arg_function)
{
auto promise = std::make_shared<std::promise<void>>();
operator()(arg_function, [promise](const Napi::Value &value, const Napi::Error &error)
{
try
{
if (error.IsEmpty())
promise->set_value();
else
throw std::runtime_error(error.Message());
}
catch (...)
{
try
{
promise->set_exception(std::current_exception());
}
catch (...)
{
Napi::Error::Fatal("", "Unable to set exception on promise");
}
}
});
return promise->get_future();
}
inline std::future<void> ThreadSafeCallback::error(const std::string& message)
{
return operator()([message](napi_env env, std::vector<napi_value>& args) {
args.push_back(Napi::Error::New(env, message).Value());
});
}
inline void ThreadSafeCallback::operator()(completion_func_t completion_function)
{
operator()(nullptr, completion_function);
}
inline void ThreadSafeCallback::operator()(arg_func_t arg_function, completion_func_t completion_function)
{
if (impl)
impl->call(arg_function, completion_function);
else
throw std::runtime_error("Callback called after move");
}
inline void ThreadSafeCallback::error(const std::string& message, completion_func_t completion_function)
{
operator()([message](napi_env env, std::vector<napi_value>& args) {
args.push_back(Napi::Error::New(env, message).Value());
}, completion_function);
}
inline std::future<std::string> ThreadSafeCallback::callStringify()
{
return callStringify(nullptr);
}
inline std::future<std::string> ThreadSafeCallback::callStringify(arg_func_t arg_function)
{
return call<std::string>(arg_function, [](const Napi::Value& value)
{
auto JSON = value.Env().Global().Get("JSON").As<Napi::Object>();
auto stringify = JSON.Get("stringify").As<Napi::Function>();
return stringify.Call(JSON, {value}).As<Napi::String>().Utf8Value();
});
}
inline std::future<std::string> ThreadSafeCallback::errorStringify(const std::string& message)
{
return callStringify([message](napi_env env, std::vector<napi_value>& args)
{
args.push_back(Napi::Error::New(env, message).Value());
});
}
template <typename T>
inline std::future<T> ThreadSafeCallback::call(std::function<T(const Napi::Value &)> completion_function)
{
return call<T>(nullptr, completion_function);
}
template <typename T>
inline std::future<T> ThreadSafeCallback::call(arg_func_t arg_function, std::function<T(const Napi::Value &)> completion_function)
{
auto promise = std::make_shared<std::promise<T>>();
operator()(arg_function, [promise, completion_function](const Napi::Value &value, const Napi::Error& error)
{
try
{
if (error.IsEmpty())
promise->set_value(completion_function(value));
else
throw std::runtime_error(error.Message());
}
catch (...)
{
try
{
promise->set_exception(std::current_exception());
}
catch (...)
{
Napi::Error::Fatal("", "Unable to set exception on promise");
}
}
});
return promise->get_future();
}
inline void ThreadSafeCallback::call()
{
operator()(nullptr, nullptr);
}
inline void ThreadSafeCallback::call(arg_func_t arg_function)
{
operator()(arg_function, nullptr);
}
inline void ThreadSafeCallback::callError(const std::string& message)
{
error(message, nullptr);
}