From 29740b21747f4c5d917603721513efac23631816 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Tue, 11 Feb 2020 14:14:57 -0800 Subject: [PATCH] quic: fix client migration PR-URL: https://github.com/nodejs/quic/pull/341 Reviewed-By: Anna Henningsen --- lib/internal/quic/core.js | 2 + src/quic/node_quic_session.cc | 24 ++++++-- src/quic/node_quic_util-inl.h | 2 +- .../test-quic-simple-client-migrate.js | 60 +++++++++---------- 4 files changed, 51 insertions(+), 37 deletions(-) diff --git a/lib/internal/quic/core.js b/lib/internal/quic/core.js index 77189ce097..def6082012 100644 --- a/lib/internal/quic/core.js +++ b/lib/internal/quic/core.js @@ -1046,6 +1046,8 @@ class QuicSocket extends EventEmitter { [kRemoveSession](session) { this.#sessions.delete(session); + if (this.closing) + this[kMaybeDestroy](); } // Bind the UDP socket on demand, only if it hasn't already been bound. diff --git a/src/quic/node_quic_session.cc b/src/quic/node_quic_session.cc index 171fc43bf5..83e32e6834 100644 --- a/src/quic/node_quic_session.cc +++ b/src/quic/node_quic_session.cc @@ -1503,6 +1503,8 @@ void QuicSession::AckedStreamDataOffset( // here is that any CID's associated with the session have to // be associated with the new QuicSocket. void QuicSession::AddToSocket(QuicSocket* socket) { + CHECK_NOT_NULL(socket); + Debug(this, "Adding QuicSession to %s", socket->diagnostic_name()); socket->AddSession(scid_, BaseObjectPtr(this)); switch (crypto_context_->side()) { case NGTCP2_CRYPTO_SIDE_SERVER: { @@ -1978,6 +1980,8 @@ bool QuicSession::ReceiveStreamData( // If the session is removed and there are no other references held, // the session object will be destroyed automatically. void QuicSession::RemoveFromSocket() { + CHECK(socket_); + Debug(this, "Removing QuicSession from %s", socket_->diagnostic_name()); if (is_server()) { socket_->DisassociateCID(rcid_); socket_->DisassociateCID(pscid_); @@ -2241,19 +2245,29 @@ bool QuicSession::set_session(Local buffer) { bool QuicSession::set_socket(QuicSocket* socket, bool nat_rebinding) { CHECK(!is_server()); CHECK(!is_flag_set(QUICSESSION_FLAG_DESTROYED)); - CHECK(!is_flag_set(QUICSESSION_FLAG_GRACEFUL_CLOSING)); + + if (is_flag_set(QUICSESSION_FLAG_GRACEFUL_CLOSING)) + return false; + if (socket == nullptr || socket == socket_.get()) return true; + Debug(this, "Migrating to %s", socket->diagnostic_name()); + SendSessionScope send(this); - // Step 1: Add this Session to the given Socket - AddToSocket(socket); + // Ensure that we maintain a reference to keep this from being + // destroyed while we are starting the migration. + BaseObjectPtr ptr(this); - // Step 2: Remove this Session from the current Socket + // Step 1: Remove the session from the current socket RemoveFromSocket(); - // Step 3: Update the internal references + // Step 2: Add this Session to the given Socket + AddToSocket(socket); + + // Step 3: Update the internal references and make sure + // we are listening. socket_.reset(socket); socket->ReceiveStart(); diff --git a/src/quic/node_quic_util-inl.h b/src/quic/node_quic_util-inl.h index 36bdddabcc..71b5ceab06 100644 --- a/src/quic/node_quic_util-inl.h +++ b/src/quic/node_quic_util-inl.h @@ -43,7 +43,7 @@ size_t QuicCID::Hash::operator()(const QuicCID& token) const { } bool QuicCID::operator==(const QuicCID& other) const { - return memcmp(cid(), other.cid(), sizeof(ngtcp2_cid)) == 0; + return memcmp(cid()->data, other.cid()->data, cid()->datalen) == 0; } bool QuicCID::operator!=(const QuicCID& other) const { diff --git a/test/parallel/test-quic-simple-client-migrate.js b/test/parallel/test-quic-simple-client-migrate.js index f8c675f53b..b25fd43716 100644 --- a/test/parallel/test-quic-simple-client-migrate.js +++ b/test/parallel/test-quic-simple-client-migrate.js @@ -15,79 +15,77 @@ const { } = require('../common/quic'); const { createSocket } = require('quic'); +const { pipeline } = require('stream'); +let req; let client; let client2; const server = createSocket(); -const kServerName = 'agent1'; -const kALPN = 'zzz'; +const options = { key, cert, ca, alpn: 'zzz' }; const countdown = new Countdown(2, () => { debug('Countdown expired. Destroying sockets'); + req.close(); server.close(); client2.close(); }); -server.listen({ key, cert, ca, alpn: kALPN }); +server.listen(options); + server.on('session', common.mustCall((session) => { debug('QuicServerSession Created'); session.on('stream', common.mustCall((stream) => { debug('Bidirectional, Client-initiated stream %d received', stream.id); - stream.pipe(stream); + pipeline(stream, stream, common.mustCall()); - const uni = session.openStream({ halfOpen: true }); - uni.end('Hello from the server'); + session.openStream({ halfOpen: true }).end('Hello from the server'); })); })); server.on('ready', common.mustCall(() => { debug('Server is listening on port %d', server.endpoints[0].address.port); - const options = { key, cert, ca, alpn: kALPN }; + client = createSocket({ client: options }); client2 = createSocket({ client: options }); - const req = client.connect({ - address: 'localhost', + req = client.connect({ + address: common.localhostIPv4, port: server.endpoints[0].address.port, - servername: kServerName, }); - client.on('close', () => debug('Client closing')); + client.on('close', common.mustCall()); - req.on('secure', common.mustCall((servername, alpn, cipher) => { + req.on('secure', common.mustCall(() => { debug('QuicClientSession TLS Handshake Complete'); + let data = ''; + const stream = req.openStream(); + debug('Bidirectional, Client-initiated stream %d opened', stream.id); + stream.setEncoding('utf8'); + stream.on('data', (chunk) => data += chunk); + stream.on('end', common.mustCall(() => { + assert.strictEqual(data, 'Hello from the client'); + debug('Client received expected data for stream %d', stream.id); + })); + stream.on('close', common.mustCall(() => { + debug('Bidirectional, Client-initiated stream %d closed', stream.id); + countdown.dec(); + })); // Send some data on one connection... stream.write('Hello '); // Wait just a bit, then migrate to a different // QuicSocket and continue sending. - setTimeout(() => { + setTimeout(common.mustCall(() => { req.setSocket(client2, (err) => { assert(!err); - debug('Client 1 port is %d', client.endpoints[0].address.port); - debug('Client 2 port is %d', client2.endpoints[0].address.port); client.close(); - stream.end('from the client'); - let data = ''; - stream.resume(); - stream.setEncoding('utf8'); - stream.on('data', (chunk) => data += chunk); - stream.on('end', common.mustCall(() => { - assert.strictEqual(data, 'Hello from the client'); - debug('Client received expected data for stream %d', stream.id); - })); - stream.on('close', common.mustCall(() => { - debug('Bidirectional, Client-initiated stream %d closed', stream.id); - countdown.dec(); - })); - debug('Bidirectional, Client-initiated stream %d opened', stream.id); }); - }, common.platformTimeout(100)); + }), common.platformTimeout(100)); })); req.on('stream', common.mustCall((stream) => { @@ -107,4 +105,4 @@ server.on('ready', common.mustCall(() => { })); server.on('listening', common.mustCall()); -server.on('close', () => debug('Server closing')); +server.on('close', common.mustCall());