Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http2: fix responses to long payload reqs #20084

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 98 additions & 83 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ const STREAM_FLAGS_CLOSED = 0x2;
const STREAM_FLAGS_HEADERS_SENT = 0x4;
const STREAM_FLAGS_HEAD_REQUEST = 0x8;
const STREAM_FLAGS_ABORTED = 0x10;
const STREAM_FLAGS_HAS_TRAILERS = 0x20;

const SESSION_FLAGS_PENDING = 0x0;
const SESSION_FLAGS_READY = 0x1;
Expand Down Expand Up @@ -330,26 +331,13 @@ function onStreamClose(code) {
if (stream.destroyed)
return;

const state = stream[kState];

debug(`Http2Stream ${stream[kID]} [Http2Session ` +
`${sessionName(stream[kSession][kType])}]: closed with code ${code}`);

if (!stream.closed) {
// Clear timeout and remove timeout listeners
stream.setTimeout(0);
stream.removeAllListeners('timeout');
if (!stream.closed)
closeStream(stream, code, false);

// Set the state flags
state.flags |= STREAM_FLAGS_CLOSED;
state.rstCode = code;

// Close the writable side of the stream
abort(stream);
stream.end();
}

state.fd = -1;
stream[kState].fd = -1;
// Defer destroy we actually emit end.
if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) {
// If errored or ended, we can destroy immediately.
Expand Down Expand Up @@ -504,7 +492,7 @@ function requestOnConnect(headers, options) {

// At this point, the stream should have already been destroyed during
// the session.destroy() method. Do nothing else.
if (session.destroyed)
if (session === undefined || session.destroyed)
return;

// If the session was closed while waiting for the connect, destroy
Expand Down Expand Up @@ -1412,6 +1400,9 @@ class ClientHttp2Session extends Http2Session {
if (options.endStream)
stream.end();

if (options.waitForTrailers)
stream[kState].flags |= STREAM_FLAGS_HAS_TRAILERS;

const onConnect = requestOnConnect.bind(stream, headersList, options);
if (this.connecting) {
this.on('connect', onConnect);
Expand Down Expand Up @@ -1445,32 +1436,70 @@ function afterDoStreamWrite(status, handle) {
}

function streamOnResume() {
if (!this.destroyed && !this.pending)
if (!this.destroyed && !this.pending) {
if (!this[kState].didRead)
this[kState].didRead = true;
this[kHandle].readStart();
}
}

function streamOnPause() {
if (!this.destroyed && !this.pending)
this[kHandle].readStop();
}

// If the writable side of the Http2Stream is still open, emit the
// 'aborted' event and set the aborted flag.
function abort(stream) {
if (!stream.aborted &&
!(stream._writableState.ended || stream._writableState.ending)) {
stream[kState].flags |= STREAM_FLAGS_ABORTED;
stream.emit('aborted');
}
}

function afterShutdown() {
this.callback();
const stream = this.handle[kOwner];
if (stream)
stream[kMaybeDestroy]();
}

function closeStream(stream, code, shouldSubmitRstStream = true) {
const state = stream[kState];
state.flags |= STREAM_FLAGS_CLOSED;
state.rstCode = code;

// Clear timeout and remove timeout listeners
stream.setTimeout(0);
stream.removeAllListeners('timeout');

const { ending, finished } = stream._writableState;

if (!ending) {
// If the writable side of the Http2Stream is still open, emit the
// 'aborted' event and set the aborted flag.
if (!stream.aborted) {
state.flags |= STREAM_FLAGS_ABORTED;
stream.emit('aborted');
}

// Close the writable side.
stream.end();
}

if (shouldSubmitRstStream) {
const finishFn = finishCloseStream.bind(stream, code);
if (!ending || finished || code !== NGHTTP2_NO_ERROR)
finishFn();
else
stream.once('finish', finishFn);
}
}

function finishCloseStream(code) {
const rstStreamFn = submitRstStream.bind(this, code);
// If the handle has not yet been assigned, queue up the request to
// ensure that the RST_STREAM frame is sent after the stream ID has
// been determined.
if (this.pending) {
this.push(null);
this.once('ready', rstStreamFn);
return;
}
rstStreamFn();
}

// An Http2Stream is a Duplex stream that is backed by a
// node::http2::Http2Stream handle implementing StreamBase.
class Http2Stream extends Duplex {
Expand All @@ -1490,6 +1519,7 @@ class Http2Stream extends Duplex {
this[kTimeout] = null;

this[kState] = {
didRead: false,
flags: STREAM_FLAGS_PENDING,
rstCode: NGHTTP2_NO_ERROR,
writeQueueSize: 0,
Expand Down Expand Up @@ -1756,6 +1786,8 @@ class Http2Stream extends Duplex {
throw headersList;
this[kSentTrailers] = headers;

this[kState].flags &= ~STREAM_FLAGS_HAS_TRAILERS;

const ret = this[kHandle].trailers(headersList);
if (ret < 0)
this.destroy(new NghttpError(ret));
Expand Down Expand Up @@ -1786,38 +1818,13 @@ class Http2Stream extends Duplex {
if (callback !== undefined && typeof callback !== 'function')
throw new ERR_INVALID_CALLBACK();

// Clear timeout and remove timeout listeners
this.setTimeout(0);
this.removeAllListeners('timeout');

// Close the writable
abort(this);
this.end();

if (this.closed)
return;

const state = this[kState];
state.flags |= STREAM_FLAGS_CLOSED;
state.rstCode = code;

if (callback !== undefined) {
if (callback !== undefined)
this.once('close', callback);
}

if (this[kHandle] === undefined)
return;

const rstStreamFn = submitRstStream.bind(this, code);
// If the handle has not yet been assigned, queue up the request to
// ensure that the RST_STREAM frame is sent after the stream ID has
// been determined.
if (this.pending) {
this.push(null);
this.once('ready', rstStreamFn);
return;
}
rstStreamFn();
closeStream(this, code);
}

// Called by this.destroy().
Expand All @@ -1832,26 +1839,19 @@ class Http2Stream extends Duplex {
debug(`Http2Stream ${this[kID] || '<pending>'} [Http2Session ` +
`${sessionName(session[kType])}]: destroying stream`);
const state = this[kState];
const code = state.rstCode =
err != null ?
NGHTTP2_INTERNAL_ERROR :
state.rstCode || NGHTTP2_NO_ERROR;
if (handle !== undefined) {
// If the handle exists, we need to close, then destroy the handle
this.close(code);
if (!this._readableState.ended && !this._readableState.ending)
this.push(null);
const code = err != null ?
NGHTTP2_INTERNAL_ERROR : (state.rstCode || NGHTTP2_NO_ERROR);

const hasHandle = handle !== undefined;

if (!this.closed)
closeStream(this, code, hasHandle);
this.push(null);

if (hasHandle) {
handle.destroy();
session[kState].streams.delete(id);
} else {
// Clear timeout and remove timeout listeners
this.setTimeout(0);
this.removeAllListeners('timeout');

state.flags |= STREAM_FLAGS_CLOSED;
abort(this);
this.end();
this.push(null);
session[kState].pendingStreams.delete(this);
}

Expand Down Expand Up @@ -1884,13 +1884,23 @@ class Http2Stream extends Duplex {
}

// TODO(mcollina): remove usage of _*State properties
if (this._readableState.ended &&
this._writableState.ended &&
this._writableState.pendingcb === 0 &&
this.closed) {
this.destroy();
// This should return, but eslint complains.
// return
if (this._writableState.ended && this._writableState.pendingcb === 0) {
if (this._readableState.ended && this.closed) {
this.destroy();
return;
}

// We've submitted a response from our server session, have not attempted
// to process any incoming data, and have no trailers. This means we can
// attempt to gracefully close the session.
const state = this[kState];
if (this.headersSent &&
this[kSession][kType] === NGHTTP2_SESSION_SERVER &&
!(state.flags & STREAM_FLAGS_HAS_TRAILERS) &&
!state.didRead &&
!this._readableState.resumeScheduled) {
this.close();
}
}
}
}
Expand Down Expand Up @@ -2095,7 +2105,6 @@ function afterOpen(session, options, headers, streamOptions, err, fd) {
}
if (this.destroyed || this.closed) {
tryClose(fd);
abort(this);
return;
}
state.fd = fd;
Expand Down Expand Up @@ -2224,8 +2233,10 @@ class ServerHttp2Stream extends Http2Stream {
if (options.endStream)
streamOptions |= STREAM_OPTION_EMPTY_PAYLOAD;

if (options.waitForTrailers)
if (options.waitForTrailers) {
streamOptions |= STREAM_OPTION_GET_TRAILERS;
state.flags |= STREAM_FLAGS_HAS_TRAILERS;
}

headers = processHeaders(headers);
const statusCode = headers[HTTP2_HEADER_STATUS] |= 0;
Expand Down Expand Up @@ -2285,8 +2296,10 @@ class ServerHttp2Stream extends Http2Stream {
}

let streamOptions = 0;
if (options.waitForTrailers)
if (options.waitForTrailers) {
streamOptions |= STREAM_OPTION_GET_TRAILERS;
this[kState].flags |= STREAM_FLAGS_HAS_TRAILERS;
}

if (typeof fd !== 'number')
throw new ERR_INVALID_ARG_TYPE('fd', 'number', fd);
Expand Down Expand Up @@ -2346,8 +2359,10 @@ class ServerHttp2Stream extends Http2Stream {
}

let streamOptions = 0;
if (options.waitForTrailers)
if (options.waitForTrailers) {
streamOptions |= STREAM_OPTION_GET_TRAILERS;
this[kState].flags |= STREAM_FLAGS_HAS_TRAILERS;
}

const session = this[kSession];
debug(`Http2Stream ${this[kID]} [Http2Session ` +
Expand Down
Loading