Skip to content
This repository has been archived by the owner on Aug 11, 2020. It is now read-only.

quic: fix client migration and stream reset #341

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions lib/internal/quic/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,8 @@ class QuicSocket extends EventEmitter {

[kRemoveSession](session) {
this.#sessions.delete(session);
if (this.closing)
this[kMaybeDestroy]();
}

// Bind the UDP socket on demand, only if it hasn't already been bound.
Expand Down Expand Up @@ -2406,7 +2408,8 @@ class QuicStream extends Duplex {
...options,
allowHalfOpen: true,
decodeStrings: true,
emitClose: true
emitClose: true,
autoDestroy: false,
});
this.#session = session;
this.#push_id = push_id;
Expand Down Expand Up @@ -2770,7 +2773,8 @@ class QuicStream extends Duplex {
this.#stats = new BigInt64Array(handle.stats);
handle.destroy();
}
callback(error);
// The destroy callback must be invoked in a nextTick
process.nextTick(() => callback(error));
}

_onTimeout() {
Expand Down
5 changes: 4 additions & 1 deletion src/quic/node_quic_buffer-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ size_t QuicBuffer::Consume(size_t amount) {
}

size_t QuicBuffer::Cancel(int status) {
return Consume(status, length());
if (canceled_) return 0;
canceled_ = true;
size_t t = Consume(status, length());
return t;
}

void QuicBuffer::Push(uv_buf_t buf, DoneCB done) {
Expand Down
1 change: 1 addition & 0 deletions src/quic/node_quic_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ class QuicBuffer : public bob::SourceImpl<ngtcp2_vec>,
QuicBufferChunk* head_ = nullptr; // Current Read Position
QuicBufferChunk* tail_ = nullptr; // Current Write Position

bool canceled_ = false;
bool ended_ = false;
size_t length_ = 0;
size_t remaining_ = 0;
Expand Down
2 changes: 2 additions & 0 deletions src/quic/node_quic_default_application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ void DefaultApplication::UnscheduleStream(int64_t stream_id) {
void DefaultApplication::StreamClose(
int64_t stream_id,
uint64_t app_error_code) {
if (!session()->HasStream(stream_id))
return;
if (app_error_code == 0)
app_error_code = NGTCP2_APP_NOERROR;
UnscheduleStream(stream_id);
Expand Down
9 changes: 6 additions & 3 deletions src/quic/node_quic_http3_application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -682,9 +682,12 @@ void Http3Application::ReceiveData(
void Http3Application::DeferredConsume(
int64_t stream_id,
size_t consumed) {
BaseObjectPtr<QuicStream> stream = session()->FindStream(stream_id);
CHECK(stream);
session()->ExtendStreamOffset(stream->id(), consumed);
// Do nothing here for now. nghttp3 uses the on_deferred_consume
// callback to notify when stream data that had previously been
// deferred has been delivered to the application so that the
// stream data offset can be extended. However, we extend the
// data offset from within QuicStream when the data is delivered
// so we don't have to do it here.
}

// Called when a nghttp3 detects that a new block of headers
Expand Down
30 changes: 21 additions & 9 deletions src/quic/node_quic_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1264,7 +1264,7 @@ void QuicApplication::MaybeSetFin(const StreamData& stream_data) {
}

void QuicApplication::StreamOpen(int64_t stream_id) {
Debug(session(), "QUIC Stream %" PRId64 " is open.");
Debug(session(), "QUIC Stream %" PRId64 " is open.", stream_id);
}

void QuicApplication::StreamHeaders(
Expand Down Expand Up @@ -1503,6 +1503,8 @@ void QuicSession::AckedStreamDataOffset(
// here is that any CID's associated with the session have to
// be associated with the new QuicSocket.
void QuicSession::AddToSocket(QuicSocket* socket) {
CHECK_NOT_NULL(socket);
Debug(this, "Adding QuicSession to %s", socket->diagnostic_name());
socket->AddSession(scid_, BaseObjectPtr<QuicSession>(this));
switch (crypto_context_->side()) {
case NGTCP2_CRYPTO_SIDE_SERVER: {
Expand Down Expand Up @@ -1978,6 +1980,8 @@ bool QuicSession::ReceiveStreamData(
// If the session is removed and there are no other references held,
// the session object will be destroyed automatically.
void QuicSession::RemoveFromSocket() {
CHECK(socket_);
Debug(this, "Removing QuicSession from %s", socket_->diagnostic_name());
if (is_server()) {
socket_->DisassociateCID(rcid_);
socket_->DisassociateCID(pscid_);
Expand Down Expand Up @@ -2241,19 +2245,29 @@ bool QuicSession::set_session(Local<Value> buffer) {
bool QuicSession::set_socket(QuicSocket* socket, bool nat_rebinding) {
CHECK(!is_server());
CHECK(!is_flag_set(QUICSESSION_FLAG_DESTROYED));
CHECK(!is_flag_set(QUICSESSION_FLAG_GRACEFUL_CLOSING));

if (is_flag_set(QUICSESSION_FLAG_GRACEFUL_CLOSING))
return false;

if (socket == nullptr || socket == socket_.get())
return true;

Debug(this, "Migrating to %s", socket->diagnostic_name());

SendSessionScope send(this);

// Step 1: Add this Session to the given Socket
AddToSocket(socket);
// Ensure that we maintain a reference to keep this from being
// destroyed while we are starting the migration.
BaseObjectPtr<QuicSession> ptr(this);

// Step 2: Remove this Session from the current Socket
// Step 1: Remove the session from the current socket
RemoveFromSocket();

// Step 3: Update the internal references
// Step 2: Add this Session to the given Socket
AddToSocket(socket);

// Step 3: Update the internal references and make sure
// we are listening.
socket_.reset(socket);
socket->ReceiveStart();

Expand Down Expand Up @@ -2371,9 +2385,7 @@ void QuicSession::StreamClose(int64_t stream_id, uint64_t app_error_code) {
stream_id,
app_error_code);

// If the stream does not actually exist, just ignore
if (HasStream(stream_id))
application_->StreamClose(stream_id, app_error_code);
application_->StreamClose(stream_id, app_error_code);
}

// Called by ngtcp2 when a stream has been opened. All we do is log
Expand Down
3 changes: 2 additions & 1 deletion src/quic/node_quic_stream-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,11 @@ void QuicStream::ResetStream(uint64_t app_error_code) {
// streambuf_ will be canceled, and all data pending
// to be acknowledged at the ngtcp2 level will be
// abandoned.
BaseObjectPtr<QuicSession> ptr(session_);
set_flag(QUICSTREAM_FLAG_READ_CLOSED);
session_->ResetStream(stream_id_, app_error_code);
streambuf_.Cancel();
streambuf_.End();
session_->ResetStream(stream_id_, app_error_code);
}

void QuicStream::Schedule(Queue* queue) {
Expand Down
2 changes: 1 addition & 1 deletion src/quic/node_quic_util-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ size_t QuicCID::Hash::operator()(const QuicCID& token) const {
}

bool QuicCID::operator==(const QuicCID& other) const {
return memcmp(cid(), other.cid(), sizeof(ngtcp2_cid)) == 0;
return memcmp(cid()->data, other.cid()->data, cid()->datalen) == 0;
}

bool QuicCID::operator!=(const QuicCID& other) const {
Expand Down
5 changes: 3 additions & 2 deletions test/parallel/test-quic-quicsession-openstream-pending.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ const options = { key, cert, ca, alpn: 'meow' };
stream.on('data', (chunk) => data += chunk);
await once(stream, 'end');
assert.strictEqual(data, 'Hello!');
server.close();
client.close();
}));
}));

Expand All @@ -48,6 +46,9 @@ const options = { key, cert, ca, alpn: 'meow' };

await once(stream, 'close');

server.close();
client.close();

await Promise.allSettled([
once(server, 'close'),
once(client, 'close')
Expand Down
31 changes: 11 additions & 20 deletions test/parallel/test-quic-quicstream-close-early.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,19 @@ server.on('session', common.mustCall((session) => {
session.on('secure', common.mustCall((servername, alpn, cipher) => {
const uni = session.openStream({ halfOpen: true });

// TODO(@jasnell): There's still a bug in here somewhere. If we
// comment out the following line and close without writing
// anything, the test hangs.
uni.write('hi', common.mustCall());
uni.close(3);
uni.write('hi', common.expectsError());

uni.on('data', common.mustNotCall());
uni.close(3);

uni.on('end', common.mustCall(() => {
debug('Undirectional, Server-initiated stream %d ended on server',
uni.id);
uni.on('error', common.mustCall(() => {
assert.strictEqual(uni.aborted, true);
}));

uni.on('data', common.mustNotCall());
uni.on('close', common.mustCall(() => {
debug('Unidirectional, Server-initiated stream %d closed on server',
uni.id);
}));
uni.on('error', common.mustCall(() => {
assert.strictEqual(uni.aborted, true);
}));

debug('Unidirectional, Server-initiated stream %d opened', uni.id);
}));
Expand All @@ -86,21 +80,18 @@ server.on('ready', common.mustCall(() => {

const stream = req.openStream();

// TODO(@jasnell): The close happens synchronously, before any
// data for the stream is actually flushed out to the connected
// peer.
stream.write('hello', common.mustCall());
stream.close(1);
stream.write('hello', common.expectsError());
stream.write('there', common.expectsError());

stream.on('end', common.mustNotCall());
stream.close(1);

stream.on('error', common.mustCall(() => {
assert.strictEqual(stream.aborted, true);
}));

stream.on('end', common.mustNotCall());

stream.on('close', common.mustCall(() => {
debug('Bidirectional, Client-initiated stream %d closed on client',
stream.id);
countdown.dec();
}));

Expand Down
60 changes: 29 additions & 31 deletions test/parallel/test-quic-simple-client-migrate.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,79 +15,77 @@ const {
} = require('../common/quic');

const { createSocket } = require('quic');
const { pipeline } = require('stream');

let req;
let client;
let client2;
const server = createSocket();
const kServerName = 'agent1';
const kALPN = 'zzz';

const options = { key, cert, ca, alpn: 'zzz' };
const countdown = new Countdown(2, () => {
debug('Countdown expired. Destroying sockets');
req.close();
server.close();
client2.close();
});

server.listen({ key, cert, ca, alpn: kALPN });
server.listen(options);

server.on('session', common.mustCall((session) => {
debug('QuicServerSession Created');

session.on('stream', common.mustCall((stream) => {
debug('Bidirectional, Client-initiated stream %d received', stream.id);
stream.pipe(stream);
pipeline(stream, stream, common.mustCall());

const uni = session.openStream({ halfOpen: true });
uni.end('Hello from the server');
session.openStream({ halfOpen: true }).end('Hello from the server');
}));

}));

server.on('ready', common.mustCall(() => {
debug('Server is listening on port %d', server.endpoints[0].address.port);
const options = { key, cert, ca, alpn: kALPN };

client = createSocket({ client: options });
client2 = createSocket({ client: options });

const req = client.connect({
address: 'localhost',
req = client.connect({
address: common.localhostIPv4,
port: server.endpoints[0].address.port,
servername: kServerName,
});

client.on('close', () => debug('Client closing'));
client.on('close', common.mustCall());

req.on('secure', common.mustCall((servername, alpn, cipher) => {
req.on('secure', common.mustCall(() => {
debug('QuicClientSession TLS Handshake Complete');

let data = '';

const stream = req.openStream();
debug('Bidirectional, Client-initiated stream %d opened', stream.id);
stream.setEncoding('utf8');
stream.on('data', (chunk) => data += chunk);
stream.on('end', common.mustCall(() => {
assert.strictEqual(data, 'Hello from the client');
debug('Client received expected data for stream %d', stream.id);
}));
stream.on('close', common.mustCall(() => {
debug('Bidirectional, Client-initiated stream %d closed', stream.id);
countdown.dec();
}));
// Send some data on one connection...
stream.write('Hello ');

// Wait just a bit, then migrate to a different
// QuicSocket and continue sending.
setTimeout(() => {
setTimeout(common.mustCall(() => {
req.setSocket(client2, (err) => {
assert(!err);
debug('Client 1 port is %d', client.endpoints[0].address.port);
debug('Client 2 port is %d', client2.endpoints[0].address.port);
client.close();

stream.end('from the client');
let data = '';
stream.resume();
stream.setEncoding('utf8');
stream.on('data', (chunk) => data += chunk);
stream.on('end', common.mustCall(() => {
assert.strictEqual(data, 'Hello from the client');
debug('Client received expected data for stream %d', stream.id);
}));
stream.on('close', common.mustCall(() => {
debug('Bidirectional, Client-initiated stream %d closed', stream.id);
countdown.dec();
}));
debug('Bidirectional, Client-initiated stream %d opened', stream.id);
});
}, common.platformTimeout(100));
}), common.platformTimeout(100));
}));

req.on('stream', common.mustCall((stream) => {
Expand All @@ -107,4 +105,4 @@ server.on('ready', common.mustCall(() => {
}));

server.on('listening', common.mustCall());
server.on('close', () => debug('Server closing'));
server.on('close', common.mustCall());