From d946ac4f27f8cd8bddf02bb00a10d7a35397779a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kasper=20Isager=20Dalsgar=C3=B0?= Date: Wed, 1 Jun 2022 16:30:20 +0200 Subject: [PATCH] Test throwing in `data` callback (#60) * Test throwing in `data` callback * Add read buffer reallocation * Skip test for now --- binding.c | 18 ++++++++++++++---- lib/stream.js | 8 +++++++- test/stream.js | 31 +++++++++++++++++++++++++++++++ 3 files changed, 52 insertions(+), 5 deletions(-) diff --git a/binding.c b/binding.c index c8bd9241..47b73319 100644 --- a/binding.c +++ b/binding.c @@ -30,12 +30,20 @@ napi_value fatal_exception; \ napi_get_and_clear_last_exception(env, &fatal_exception); \ napi_fatal_exception(env, fatal_exception); \ - printf("oh no add that realloc\n"); \ + { \ + UDX_NAPI_CALLBACK(self, self->realloc, { \ + NAPI_MAKE_CALLBACK(env, nil, ctx, callback, 0, NULL, &res); \ + UDX_NAPI_SET_READ_BUFFER(self, res); \ + }) \ + } \ } else { \ - napi_get_buffer_info(env, res, (void **) &(self->read_buf), &(self->read_buf_free)); \ - self->read_buf_head = self->read_buf; \ + UDX_NAPI_SET_READ_BUFFER(self, res); \ } +#define UDX_NAPI_SET_READ_BUFFER(self, res) \ + napi_get_buffer_info(env, res, (void **) &(self->read_buf), &(self->read_buf_free)); \ + self->read_buf_head = self->read_buf; + typedef struct { udx_socket_t socket; @@ -65,6 +73,7 @@ typedef struct { napi_ref on_message; napi_ref on_close; napi_ref on_firewall; + napi_ref realloc; } udx_napi_stream_t; inline static void @@ -368,7 +377,7 @@ NAPI_METHOD(udx_napi_socket_close) { } NAPI_METHOD(udx_napi_stream_init) { - NAPI_ARGV(12) + NAPI_ARGV(13) NAPI_ARGV_BUFFER_CAST(udx_t *, udx, 0) NAPI_ARGV_BUFFER_CAST(udx_napi_stream_t *, stream, 1) NAPI_ARGV_UINT32(id, 2) @@ -389,6 +398,7 @@ NAPI_METHOD(udx_napi_stream_init) { napi_create_reference(env, argv[9], 1, &(stream->on_message)); napi_create_reference(env, argv[10], 1, &(stream->on_close)); napi_create_reference(env, argv[11], 1, &(stream->on_firewall)); + napi_create_reference(env, argv[12], 1, &(stream->realloc)); int err = udx_stream_init(udx, (udx_stream_t *) stream, id, on_udx_stream_close); if (err < 0) UDX_NAPI_THROW(err) diff --git a/lib/stream.js b/lib/stream.js index 00c1d7e1..5327e733 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -42,7 +42,8 @@ module.exports = class UDXStream extends streamx.Duplex { this._onsend, this._onmessage, this._onclose, - this._onfirewall + this._onfirewall, + this._realloc ) binding.udx_napi_stream_recv_start(this._handle, this._readBuffer) @@ -270,6 +271,11 @@ module.exports = class UDXStream extends streamx.Duplex { return this._firewall(socket, port, host) ? 1 : 0 } + _realloc () { + this._readBuffer = Buffer.allocUnsafe(BUFFER_SIZE) + return this._readBuffer + } + _allocWrite () { if (this._wfree.length > 0) return this._wfree.pop() const handle = b4a.allocUnsafe(binding.sizeof_udx_stream_write_t) diff --git a/test/stream.js b/test/stream.js index 8402ef84..6b3c8faf 100644 --- a/test/stream.js +++ b/test/stream.js @@ -483,3 +483,34 @@ test('destroy before fully connected', async function (t) { b.destroy() }, 100) // wait for destroy to be processed }) + +// Unskip once https://github.com/nodejs/node/issues/38155 is solved +test.skip('throw in data callback', async function (t) { + t.plan(1) + + const u = new UDX() + + const socket = u.createSocket() + socket.bind(0) + + const a = u.createStream(1) + const b = u.createStream(2) + + a.connect(socket, 2, socket.address().port) + b.connect(socket, 1, socket.address().port) + + a.on('data', function () { + throw new Error('boom') + }) + + b.end(Buffer.from('hello')) + + process.once('uncaughtException', async (err) => { + t.is(err.message, 'boom') + + a.destroy() + b.destroy() + + await socket.close() + }) +})