Skip to content
This repository has been archived by the owner on Aug 11, 2020. It is now read-only.

Commit

Permalink
quic: fix client migration
Browse files Browse the repository at this point in the history
PR-URL: #341
Reviewed-By: Anna Henningsen <anna@addaleax.net>
  • Loading branch information
jasnell committed Feb 12, 2020
1 parent e5d2d69 commit 29740b2
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 37 deletions.
2 changes: 2 additions & 0 deletions lib/internal/quic/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,8 @@ class QuicSocket extends EventEmitter {

[kRemoveSession](session) {
this.#sessions.delete(session);
if (this.closing)
this[kMaybeDestroy]();
}

// Bind the UDP socket on demand, only if it hasn't already been bound.
Expand Down
24 changes: 19 additions & 5 deletions src/quic/node_quic_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1503,6 +1503,8 @@ void QuicSession::AckedStreamDataOffset(
// here is that any CID's associated with the session have to
// be associated with the new QuicSocket.
void QuicSession::AddToSocket(QuicSocket* socket) {
CHECK_NOT_NULL(socket);
Debug(this, "Adding QuicSession to %s", socket->diagnostic_name());
socket->AddSession(scid_, BaseObjectPtr<QuicSession>(this));
switch (crypto_context_->side()) {
case NGTCP2_CRYPTO_SIDE_SERVER: {
Expand Down Expand Up @@ -1978,6 +1980,8 @@ bool QuicSession::ReceiveStreamData(
// If the session is removed and there are no other references held,
// the session object will be destroyed automatically.
void QuicSession::RemoveFromSocket() {
CHECK(socket_);
Debug(this, "Removing QuicSession from %s", socket_->diagnostic_name());
if (is_server()) {
socket_->DisassociateCID(rcid_);
socket_->DisassociateCID(pscid_);
Expand Down Expand Up @@ -2241,19 +2245,29 @@ bool QuicSession::set_session(Local<Value> buffer) {
bool QuicSession::set_socket(QuicSocket* socket, bool nat_rebinding) {
CHECK(!is_server());
CHECK(!is_flag_set(QUICSESSION_FLAG_DESTROYED));
CHECK(!is_flag_set(QUICSESSION_FLAG_GRACEFUL_CLOSING));

if (is_flag_set(QUICSESSION_FLAG_GRACEFUL_CLOSING))
return false;

if (socket == nullptr || socket == socket_.get())
return true;

Debug(this, "Migrating to %s", socket->diagnostic_name());

SendSessionScope send(this);

// Step 1: Add this Session to the given Socket
AddToSocket(socket);
// Ensure that we maintain a reference to keep this from being
// destroyed while we are starting the migration.
BaseObjectPtr<QuicSession> ptr(this);

// Step 2: Remove this Session from the current Socket
// Step 1: Remove the session from the current socket
RemoveFromSocket();

// Step 3: Update the internal references
// Step 2: Add this Session to the given Socket
AddToSocket(socket);

// Step 3: Update the internal references and make sure
// we are listening.
socket_.reset(socket);
socket->ReceiveStart();

Expand Down
2 changes: 1 addition & 1 deletion src/quic/node_quic_util-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ size_t QuicCID::Hash::operator()(const QuicCID& token) const {
}

bool QuicCID::operator==(const QuicCID& other) const {
return memcmp(cid(), other.cid(), sizeof(ngtcp2_cid)) == 0;
return memcmp(cid()->data, other.cid()->data, cid()->datalen) == 0;
}

bool QuicCID::operator!=(const QuicCID& other) const {
Expand Down
60 changes: 29 additions & 31 deletions test/parallel/test-quic-simple-client-migrate.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,79 +15,77 @@ const {
} = require('../common/quic');

const { createSocket } = require('quic');
const { pipeline } = require('stream');

let req;
let client;
let client2;
const server = createSocket();
const kServerName = 'agent1';
const kALPN = 'zzz';

const options = { key, cert, ca, alpn: 'zzz' };
const countdown = new Countdown(2, () => {
debug('Countdown expired. Destroying sockets');
req.close();
server.close();
client2.close();
});

server.listen({ key, cert, ca, alpn: kALPN });
server.listen(options);

server.on('session', common.mustCall((session) => {
debug('QuicServerSession Created');

session.on('stream', common.mustCall((stream) => {
debug('Bidirectional, Client-initiated stream %d received', stream.id);
stream.pipe(stream);
pipeline(stream, stream, common.mustCall());

const uni = session.openStream({ halfOpen: true });
uni.end('Hello from the server');
session.openStream({ halfOpen: true }).end('Hello from the server');
}));

}));

server.on('ready', common.mustCall(() => {
debug('Server is listening on port %d', server.endpoints[0].address.port);
const options = { key, cert, ca, alpn: kALPN };

client = createSocket({ client: options });
client2 = createSocket({ client: options });

const req = client.connect({
address: 'localhost',
req = client.connect({
address: common.localhostIPv4,
port: server.endpoints[0].address.port,
servername: kServerName,
});

client.on('close', () => debug('Client closing'));
client.on('close', common.mustCall());

req.on('secure', common.mustCall((servername, alpn, cipher) => {
req.on('secure', common.mustCall(() => {
debug('QuicClientSession TLS Handshake Complete');

let data = '';

const stream = req.openStream();
debug('Bidirectional, Client-initiated stream %d opened', stream.id);
stream.setEncoding('utf8');
stream.on('data', (chunk) => data += chunk);
stream.on('end', common.mustCall(() => {
assert.strictEqual(data, 'Hello from the client');
debug('Client received expected data for stream %d', stream.id);
}));
stream.on('close', common.mustCall(() => {
debug('Bidirectional, Client-initiated stream %d closed', stream.id);
countdown.dec();
}));
// Send some data on one connection...
stream.write('Hello ');

// Wait just a bit, then migrate to a different
// QuicSocket and continue sending.
setTimeout(() => {
setTimeout(common.mustCall(() => {
req.setSocket(client2, (err) => {
assert(!err);
debug('Client 1 port is %d', client.endpoints[0].address.port);
debug('Client 2 port is %d', client2.endpoints[0].address.port);
client.close();

stream.end('from the client');
let data = '';
stream.resume();
stream.setEncoding('utf8');
stream.on('data', (chunk) => data += chunk);
stream.on('end', common.mustCall(() => {
assert.strictEqual(data, 'Hello from the client');
debug('Client received expected data for stream %d', stream.id);
}));
stream.on('close', common.mustCall(() => {
debug('Bidirectional, Client-initiated stream %d closed', stream.id);
countdown.dec();
}));
debug('Bidirectional, Client-initiated stream %d opened', stream.id);
});
}, common.platformTimeout(100));
}), common.platformTimeout(100));
}));

req.on('stream', common.mustCall((stream) => {
Expand All @@ -107,4 +105,4 @@ server.on('ready', common.mustCall(() => {
}));

server.on('listening', common.mustCall());
server.on('close', () => debug('Server closing'));
server.on('close', common.mustCall());

0 comments on commit 29740b2

Please sign in to comment.