From c5bea5f71277cd9fe464f8c6f7cc96d84ff19361 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Tue, 11 Feb 2020 14:14:57 -0800 Subject: [PATCH 1/7] quic: fix client migration --- lib/internal/quic/core.js | 2 + src/quic/node_quic_session.cc | 24 ++++++-- src/quic/node_quic_util-inl.h | 2 +- .../test-quic-simple-client-migrate.js | 60 +++++++++---------- 4 files changed, 51 insertions(+), 37 deletions(-) diff --git a/lib/internal/quic/core.js b/lib/internal/quic/core.js index 77189ce097..def6082012 100644 --- a/lib/internal/quic/core.js +++ b/lib/internal/quic/core.js @@ -1046,6 +1046,8 @@ class QuicSocket extends EventEmitter { [kRemoveSession](session) { this.#sessions.delete(session); + if (this.closing) + this[kMaybeDestroy](); } // Bind the UDP socket on demand, only if it hasn't already been bound. diff --git a/src/quic/node_quic_session.cc b/src/quic/node_quic_session.cc index 171fc43bf5..83e32e6834 100644 --- a/src/quic/node_quic_session.cc +++ b/src/quic/node_quic_session.cc @@ -1503,6 +1503,8 @@ void QuicSession::AckedStreamDataOffset( // here is that any CID's associated with the session have to // be associated with the new QuicSocket. void QuicSession::AddToSocket(QuicSocket* socket) { + CHECK_NOT_NULL(socket); + Debug(this, "Adding QuicSession to %s", socket->diagnostic_name()); socket->AddSession(scid_, BaseObjectPtr(this)); switch (crypto_context_->side()) { case NGTCP2_CRYPTO_SIDE_SERVER: { @@ -1978,6 +1980,8 @@ bool QuicSession::ReceiveStreamData( // If the session is removed and there are no other references held, // the session object will be destroyed automatically. void QuicSession::RemoveFromSocket() { + CHECK(socket_); + Debug(this, "Removing QuicSession from %s", socket_->diagnostic_name()); if (is_server()) { socket_->DisassociateCID(rcid_); socket_->DisassociateCID(pscid_); @@ -2241,19 +2245,29 @@ bool QuicSession::set_session(Local buffer) { bool QuicSession::set_socket(QuicSocket* socket, bool nat_rebinding) { CHECK(!is_server()); CHECK(!is_flag_set(QUICSESSION_FLAG_DESTROYED)); - CHECK(!is_flag_set(QUICSESSION_FLAG_GRACEFUL_CLOSING)); + + if (is_flag_set(QUICSESSION_FLAG_GRACEFUL_CLOSING)) + return false; + if (socket == nullptr || socket == socket_.get()) return true; + Debug(this, "Migrating to %s", socket->diagnostic_name()); + SendSessionScope send(this); - // Step 1: Add this Session to the given Socket - AddToSocket(socket); + // Ensure that we maintain a reference to keep this from being + // destroyed while we are starting the migration. + BaseObjectPtr ptr(this); - // Step 2: Remove this Session from the current Socket + // Step 1: Remove the session from the current socket RemoveFromSocket(); - // Step 3: Update the internal references + // Step 2: Add this Session to the given Socket + AddToSocket(socket); + + // Step 3: Update the internal references and make sure + // we are listening. socket_.reset(socket); socket->ReceiveStart(); diff --git a/src/quic/node_quic_util-inl.h b/src/quic/node_quic_util-inl.h index 36bdddabcc..71b5ceab06 100644 --- a/src/quic/node_quic_util-inl.h +++ b/src/quic/node_quic_util-inl.h @@ -43,7 +43,7 @@ size_t QuicCID::Hash::operator()(const QuicCID& token) const { } bool QuicCID::operator==(const QuicCID& other) const { - return memcmp(cid(), other.cid(), sizeof(ngtcp2_cid)) == 0; + return memcmp(cid()->data, other.cid()->data, cid()->datalen) == 0; } bool QuicCID::operator!=(const QuicCID& other) const { diff --git a/test/parallel/test-quic-simple-client-migrate.js b/test/parallel/test-quic-simple-client-migrate.js index f8c675f53b..b25fd43716 100644 --- a/test/parallel/test-quic-simple-client-migrate.js +++ b/test/parallel/test-quic-simple-client-migrate.js @@ -15,79 +15,77 @@ const { } = require('../common/quic'); const { createSocket } = require('quic'); +const { pipeline } = require('stream'); +let req; let client; let client2; const server = createSocket(); -const kServerName = 'agent1'; -const kALPN = 'zzz'; +const options = { key, cert, ca, alpn: 'zzz' }; const countdown = new Countdown(2, () => { debug('Countdown expired. Destroying sockets'); + req.close(); server.close(); client2.close(); }); -server.listen({ key, cert, ca, alpn: kALPN }); +server.listen(options); + server.on('session', common.mustCall((session) => { debug('QuicServerSession Created'); session.on('stream', common.mustCall((stream) => { debug('Bidirectional, Client-initiated stream %d received', stream.id); - stream.pipe(stream); + pipeline(stream, stream, common.mustCall()); - const uni = session.openStream({ halfOpen: true }); - uni.end('Hello from the server'); + session.openStream({ halfOpen: true }).end('Hello from the server'); })); })); server.on('ready', common.mustCall(() => { debug('Server is listening on port %d', server.endpoints[0].address.port); - const options = { key, cert, ca, alpn: kALPN }; + client = createSocket({ client: options }); client2 = createSocket({ client: options }); - const req = client.connect({ - address: 'localhost', + req = client.connect({ + address: common.localhostIPv4, port: server.endpoints[0].address.port, - servername: kServerName, }); - client.on('close', () => debug('Client closing')); + client.on('close', common.mustCall()); - req.on('secure', common.mustCall((servername, alpn, cipher) => { + req.on('secure', common.mustCall(() => { debug('QuicClientSession TLS Handshake Complete'); + let data = ''; + const stream = req.openStream(); + debug('Bidirectional, Client-initiated stream %d opened', stream.id); + stream.setEncoding('utf8'); + stream.on('data', (chunk) => data += chunk); + stream.on('end', common.mustCall(() => { + assert.strictEqual(data, 'Hello from the client'); + debug('Client received expected data for stream %d', stream.id); + })); + stream.on('close', common.mustCall(() => { + debug('Bidirectional, Client-initiated stream %d closed', stream.id); + countdown.dec(); + })); // Send some data on one connection... stream.write('Hello '); // Wait just a bit, then migrate to a different // QuicSocket and continue sending. - setTimeout(() => { + setTimeout(common.mustCall(() => { req.setSocket(client2, (err) => { assert(!err); - debug('Client 1 port is %d', client.endpoints[0].address.port); - debug('Client 2 port is %d', client2.endpoints[0].address.port); client.close(); - stream.end('from the client'); - let data = ''; - stream.resume(); - stream.setEncoding('utf8'); - stream.on('data', (chunk) => data += chunk); - stream.on('end', common.mustCall(() => { - assert.strictEqual(data, 'Hello from the client'); - debug('Client received expected data for stream %d', stream.id); - })); - stream.on('close', common.mustCall(() => { - debug('Bidirectional, Client-initiated stream %d closed', stream.id); - countdown.dec(); - })); - debug('Bidirectional, Client-initiated stream %d opened', stream.id); }); - }, common.platformTimeout(100)); + }), common.platformTimeout(100)); })); req.on('stream', common.mustCall((stream) => { @@ -107,4 +105,4 @@ server.on('ready', common.mustCall(() => { })); server.on('listening', common.mustCall()); -server.on('close', () => debug('Server closing')); +server.on('close', common.mustCall()); From 436e2c73a44957dda0f845802efc5413e4f3a445 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Tue, 11 Feb 2020 16:14:08 -0800 Subject: [PATCH 2/7] quic: fixup ResetStream cancel stream buffer after sending reset to avoid segfault --- src/quic/node_quic_stream-inl.h | 3 ++- test/parallel/test-quic-quicstream-close-early.js | 3 --- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/quic/node_quic_stream-inl.h b/src/quic/node_quic_stream-inl.h index 3ba16e60a0..9dbc567629 100644 --- a/src/quic/node_quic_stream-inl.h +++ b/src/quic/node_quic_stream-inl.h @@ -133,10 +133,11 @@ void QuicStream::ResetStream(uint64_t app_error_code) { // streambuf_ will be canceled, and all data pending // to be acknowledged at the ngtcp2 level will be // abandoned. + BaseObjectPtr ptr(session_); set_flag(QUICSTREAM_FLAG_READ_CLOSED); + session_->ResetStream(stream_id_, app_error_code); streambuf_.Cancel(); streambuf_.End(); - session_->ResetStream(stream_id_, app_error_code); } void QuicStream::Schedule(Queue* queue) { diff --git a/test/parallel/test-quic-quicstream-close-early.js b/test/parallel/test-quic-quicstream-close-early.js index 60c0aab54a..aaacd75435 100644 --- a/test/parallel/test-quic-quicstream-close-early.js +++ b/test/parallel/test-quic-quicstream-close-early.js @@ -86,9 +86,6 @@ server.on('ready', common.mustCall(() => { const stream = req.openStream(); - // TODO(@jasnell): The close happens synchronously, before any - // data for the stream is actually flushed out to the connected - // peer. stream.write('hello', common.mustCall()); stream.close(1); From faf78fa3dda7225b93b77c8e373001a655cbdc37 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Wed, 12 Feb 2020 08:28:00 -0800 Subject: [PATCH 3/7] quic: fix push stream closing --- lib/internal/quic/core.js | 3 ++- src/quic/node_quic_default_application.cc | 2 ++ src/quic/node_quic_http3_application.cc | 9 ++++++--- src/quic/node_quic_session.cc | 6 ++---- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/lib/internal/quic/core.js b/lib/internal/quic/core.js index def6082012..a7417fcce9 100644 --- a/lib/internal/quic/core.js +++ b/lib/internal/quic/core.js @@ -2408,7 +2408,8 @@ class QuicStream extends Duplex { ...options, allowHalfOpen: true, decodeStrings: true, - emitClose: true + emitClose: true, + autoDestroy: false, }); this.#session = session; this.#push_id = push_id; diff --git a/src/quic/node_quic_default_application.cc b/src/quic/node_quic_default_application.cc index add832d58c..8f3f0349cf 100644 --- a/src/quic/node_quic_default_application.cc +++ b/src/quic/node_quic_default_application.cc @@ -67,6 +67,8 @@ void DefaultApplication::UnscheduleStream(int64_t stream_id) { void DefaultApplication::StreamClose( int64_t stream_id, uint64_t app_error_code) { + if (!session()->HasStream(stream_id)) + return; if (app_error_code == 0) app_error_code = NGTCP2_APP_NOERROR; UnscheduleStream(stream_id); diff --git a/src/quic/node_quic_http3_application.cc b/src/quic/node_quic_http3_application.cc index 0752ffbeda..89f88fffdf 100644 --- a/src/quic/node_quic_http3_application.cc +++ b/src/quic/node_quic_http3_application.cc @@ -682,9 +682,12 @@ void Http3Application::ReceiveData( void Http3Application::DeferredConsume( int64_t stream_id, size_t consumed) { - BaseObjectPtr stream = session()->FindStream(stream_id); - CHECK(stream); - session()->ExtendStreamOffset(stream->id(), consumed); + // Do nothing here for now. nghttp3 uses the on_deferred_consume + // callback to notify when stream data that had previously been + // deferred has been delivered to the application so that the + // stream data offset can be extended. However, we extend the + // data offset from within QuicStream when the data is delivered + // so we don't have to do it here. } // Called when a nghttp3 detects that a new block of headers diff --git a/src/quic/node_quic_session.cc b/src/quic/node_quic_session.cc index 83e32e6834..165003b3f1 100644 --- a/src/quic/node_quic_session.cc +++ b/src/quic/node_quic_session.cc @@ -1264,7 +1264,7 @@ void QuicApplication::MaybeSetFin(const StreamData& stream_data) { } void QuicApplication::StreamOpen(int64_t stream_id) { - Debug(session(), "QUIC Stream %" PRId64 " is open."); + Debug(session(), "QUIC Stream %" PRId64 " is open.", stream_id); } void QuicApplication::StreamHeaders( @@ -2385,9 +2385,7 @@ void QuicSession::StreamClose(int64_t stream_id, uint64_t app_error_code) { stream_id, app_error_code); - // If the stream does not actually exist, just ignore - if (HasStream(stream_id)) - application_->StreamClose(stream_id, app_error_code); + application_->StreamClose(stream_id, app_error_code); } // Called by ngtcp2 when a stream has been opened. All we do is log From a18b0464f5ec08f096d907e5325ed3286ee143a8 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Wed, 12 Feb 2020 10:20:56 -0800 Subject: [PATCH 4/7] quic: prevent multiple cancels --- src/quic/node_quic_buffer-inl.h | 5 ++++- src/quic/node_quic_buffer.h | 1 + 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/quic/node_quic_buffer-inl.h b/src/quic/node_quic_buffer-inl.h index cc68817205..e03378a331 100644 --- a/src/quic/node_quic_buffer-inl.h +++ b/src/quic/node_quic_buffer-inl.h @@ -81,7 +81,10 @@ size_t QuicBuffer::Consume(size_t amount) { } size_t QuicBuffer::Cancel(int status) { - return Consume(status, length()); + if (canceled_) return 0; + canceled_ = true; + size_t t = Consume(status, length()); + return t; } void QuicBuffer::Push(uv_buf_t buf, DoneCB done) { diff --git a/src/quic/node_quic_buffer.h b/src/quic/node_quic_buffer.h index e7e501b0af..17f59a7e75 100644 --- a/src/quic/node_quic_buffer.h +++ b/src/quic/node_quic_buffer.h @@ -223,6 +223,7 @@ class QuicBuffer : public bob::SourceImpl, QuicBufferChunk* head_ = nullptr; // Current Read Position QuicBufferChunk* tail_ = nullptr; // Current Write Position + bool canceled_ = false; bool ended_ = false; size_t length_ = 0; size_t remaining_ = 0; From 7a682354fd6d7f04a1f3a9444607e5d7681f965e Mon Sep 17 00:00:00 2001 From: James M Snell Date: Wed, 12 Feb 2020 11:36:39 -0800 Subject: [PATCH 5/7] quic: fix test --- test/parallel/test-quic-quicsession-openstream-pending.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/parallel/test-quic-quicsession-openstream-pending.js b/test/parallel/test-quic-quicsession-openstream-pending.js index 1812cb8a1b..23a6ae254d 100644 --- a/test/parallel/test-quic-quicsession-openstream-pending.js +++ b/test/parallel/test-quic-quicsession-openstream-pending.js @@ -25,8 +25,6 @@ const options = { key, cert, ca, alpn: 'meow' }; stream.on('data', (chunk) => data += chunk); await once(stream, 'end'); assert.strictEqual(data, 'Hello!'); - server.close(); - client.close(); })); })); @@ -48,6 +46,9 @@ const options = { key, cert, ca, alpn: 'meow' }; await once(stream, 'close'); + server.close(); + client.close(); + await Promise.allSettled([ once(server, 'close'), once(client, 'close') From 9496bc8a4159e8881e9c4d83edcf5e551273fb06 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Wed, 12 Feb 2020 11:36:59 -0800 Subject: [PATCH 6/7] quic: temporary fixup for test --- .../test-quic-quicstream-close-early.js | 28 ++++++++----------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/test/parallel/test-quic-quicstream-close-early.js b/test/parallel/test-quic-quicstream-close-early.js index aaacd75435..358e8705e4 100644 --- a/test/parallel/test-quic-quicstream-close-early.js +++ b/test/parallel/test-quic-quicstream-close-early.js @@ -41,25 +41,19 @@ server.on('session', common.mustCall((session) => { session.on('secure', common.mustCall((servername, alpn, cipher) => { const uni = session.openStream({ halfOpen: true }); - // TODO(@jasnell): There's still a bug in here somewhere. If we - // comment out the following line and close without writing - // anything, the test hangs. - uni.write('hi', common.mustCall()); - uni.close(3); + uni.write('hi', common.expectsError()); - uni.on('data', common.mustNotCall()); + uni.close(3); - uni.on('end', common.mustCall(() => { - debug('Undirectional, Server-initiated stream %d ended on server', - uni.id); + uni.on('error', common.mustCall(() => { + assert.strictEqual(uni.aborted, true); })); + + uni.on('data', common.mustNotCall()); uni.on('close', common.mustCall(() => { debug('Unidirectional, Server-initiated stream %d closed on server', uni.id); })); - uni.on('error', common.mustCall(() => { - assert.strictEqual(uni.aborted, true); - })); debug('Unidirectional, Server-initiated stream %d opened', uni.id); })); @@ -86,18 +80,18 @@ server.on('ready', common.mustCall(() => { const stream = req.openStream(); - stream.write('hello', common.mustCall()); - stream.close(1); + stream.write('hello', common.expectsError()); + stream.write('there', common.expectsError()); - stream.on('end', common.mustNotCall()); + stream.close(1); stream.on('error', common.mustCall(() => { assert.strictEqual(stream.aborted, true); })); + stream.on('end', common.mustNotCall()); + stream.on('close', common.mustCall(() => { - debug('Bidirectional, Client-initiated stream %d closed on client', - stream.id); countdown.dec(); })); From f72a03a389377340dced6709f54f3f7f95314292 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Wed, 12 Feb 2020 14:12:23 -0800 Subject: [PATCH 7/7] quic: call the _destroy callback in a nextTick --- lib/internal/quic/core.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/internal/quic/core.js b/lib/internal/quic/core.js index a7417fcce9..982935e094 100644 --- a/lib/internal/quic/core.js +++ b/lib/internal/quic/core.js @@ -2773,7 +2773,8 @@ class QuicStream extends Duplex { this.#stats = new BigInt64Array(handle.stats); handle.destroy(); } - callback(error); + // The destroy callback must be invoked in a nextTick + process.nextTick(() => callback(error)); } _onTimeout() {