Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http2: send GOAWAY properly & don't continue reading unnecessarily #20772

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions doc/api/http2.md
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,10 @@ The `'trailers'` event is emitted when a block of headers associated with
trailing header fields is received. The listener callback is passed the
[HTTP/2 Headers Object][] and flags associated with the headers.

Note that this event might not be emitted if `http2stream.end()` is called
before trailers are received and the incoming data is not being read or
listened for.

```js
stream.on('trailers', (headers, flags) => {
console.log(headers);
Expand Down
110 changes: 66 additions & 44 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -341,20 +341,25 @@ function onStreamClose(code) {

stream[kState].fd = -1;
// Defer destroy we actually emit end.
if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) {
if (!stream.readable || code !== NGHTTP2_NO_ERROR) {
// If errored or ended, we can destroy immediately.
stream[kMaybeDestroy](null, code);
stream[kMaybeDestroy](code);
} else {
// Wait for end to destroy.
stream.on('end', stream[kMaybeDestroy]);
// Push a null so the stream can end whenever the client consumes
// it completely.
stream.push(null);
// If the client hasn't tried to consume the stream and there is no
// resume scheduled (which would indicate they would consume in the future),
// then just dump the incoming data so that the stream can be destroyed.
if (!stream[kState].didRead && !stream._readableState.resumeScheduled)

// If the user hasn't tried to consume the stream (and this is a server
// session) then just dump the incoming data so that the stream can
// be destroyed.
if (stream[kSession][kType] === NGHTTP2_SESSION_SERVER &&
!stream[kState].didRead &&
stream.readableFlowing === null)
stream.resume();
else
stream.read(0);
}
}

Expand All @@ -379,7 +384,7 @@ function onStreamRead(nread, buf) {
`${sessionName(stream[kSession][kType])}]: ending readable.`);

// defer this until we actually emit end
if (stream._readableState.endEmitted) {
if (!stream.readable) {
stream[kMaybeDestroy]();
} else {
stream.on('end', stream[kMaybeDestroy]);
Expand Down Expand Up @@ -469,8 +474,7 @@ function onGoawayData(code, lastStreamID, buf) {
// goaway using NGHTTP2_NO_ERROR because there was no error
// condition on this side of the session that caused the
// shutdown.
session.destroy(new ERR_HTTP2_SESSION_ERROR(code),
{ errorCode: NGHTTP2_NO_ERROR });
session.destroy(new ERR_HTTP2_SESSION_ERROR(code), NGHTTP2_NO_ERROR);
}
}

Expand Down Expand Up @@ -813,6 +817,21 @@ function emitClose(self, error) {
self.emit('close');
}

function finishSessionDestroy(session, error) {
const socket = session[kSocket];
if (!socket.destroyed)
socket.destroy(error);

session[kProxySocket] = undefined;
session[kSocket] = undefined;
session[kHandle] = undefined;
socket[kSession] = undefined;
socket[kServer] = undefined;

// Finally, emit the close and error events (if necessary) on next tick.
process.nextTick(emitClose, session, error);
}

// Upon creation, the Http2Session takes ownership of the socket. The session
// may not be ready to use immediately if the socket is not yet fully connected.
// In that case, the Http2Session will wait for the socket to connect. Once
Expand Down Expand Up @@ -869,6 +888,8 @@ class Http2Session extends EventEmitter {

this[kState] = {
flags: SESSION_FLAGS_PENDING,
goawayCode: null,
goawayLastStreamID: null,
streams: new Map(),
pendingStreams: new Set(),
pendingAck: 0,
Expand Down Expand Up @@ -1171,25 +1192,13 @@ class Http2Session extends EventEmitter {
if (handle !== undefined)
handle.destroy(code, socket.destroyed);

// If there is no error, use setImmediate to destroy the socket on the
// If the socket is alive, use setImmediate to destroy the session on the
// next iteration of the event loop in order to give data time to transmit.
// Otherwise, destroy immediately.
if (!socket.destroyed) {
if (!error) {
setImmediate(socket.destroy.bind(socket));
} else {
socket.destroy(error);
}
}

this[kProxySocket] = undefined;
this[kSocket] = undefined;
this[kHandle] = undefined;
socket[kSession] = undefined;
socket[kServer] = undefined;

// Finally, emit the close and error events (if necessary) on next tick.
process.nextTick(emitClose, this, error);
if (!socket.destroyed)
setImmediate(finishSessionDestroy, this, error);
else
finishSessionDestroy(this, error);
}

// Closing the session will:
Expand Down Expand Up @@ -1441,11 +1450,8 @@ function afterDoStreamWrite(status, handle) {
}

function streamOnResume() {
if (!this.destroyed && !this.pending) {
if (!this[kState].didRead)
this[kState].didRead = true;
if (!this.destroyed)
this[kHandle].readStart();
}
}

function streamOnPause() {
Expand All @@ -1460,6 +1466,16 @@ function afterShutdown() {
stream[kMaybeDestroy]();
}

function finishSendTrailers(stream, headersList) {
stream[kState].flags &= ~STREAM_FLAGS_HAS_TRAILERS;

const ret = stream[kHandle].trailers(headersList);
if (ret < 0)
stream.destroy(new NghttpError(ret));
else
stream[kMaybeDestroy]();
}

function closeStream(stream, code, shouldSubmitRstStream = true) {
const state = stream[kState];
state.flags |= STREAM_FLAGS_CLOSED;
Expand Down Expand Up @@ -1521,6 +1537,10 @@ class Http2Stream extends Duplex {
this[kSession] = session;
session[kState].pendingStreams.add(this);

// Allow our logic for determining whether any reads have happened to
// work in all situations. This is similar to what we do in _http_incoming.
this._readableState.readingMore = true;

this[kTimeout] = null;

this[kState] = {
Expand All @@ -1531,7 +1551,6 @@ class Http2Stream extends Duplex {
trailersReady: false
};

this.on('resume', streamOnResume);
this.on('pause', streamOnPause);
}

Expand Down Expand Up @@ -1725,6 +1744,10 @@ class Http2Stream extends Duplex {
this.push(null);
return;
}
if (!this[kState].didRead) {
this._readableState.readingMore = false;
this[kState].didRead = true;
}
if (!this.pending) {
streamOnResume.call(this);
} else {
Expand Down Expand Up @@ -1773,13 +1796,8 @@ class Http2Stream extends Duplex {
throw headersList;
this[kSentTrailers] = headers;

this[kState].flags &= ~STREAM_FLAGS_HAS_TRAILERS;

const ret = this[kHandle].trailers(headersList);
if (ret < 0)
this.destroy(new NghttpError(ret));
else
this[kMaybeDestroy]();
// Send the trailers in setImmediate so we don't do it on nghttp2 stack.
setImmediate(finishSendTrailers, this, headersList);
}

get closed() {
Expand Down Expand Up @@ -1866,15 +1884,15 @@ class Http2Stream extends Duplex {
}
// The Http2Stream can be destroyed if it has closed and if the readable
// side has received the final chunk.
[kMaybeDestroy](error, code = NGHTTP2_NO_ERROR) {
if (error || code !== NGHTTP2_NO_ERROR) {
this.destroy(error);
[kMaybeDestroy](code = NGHTTP2_NO_ERROR) {
if (code !== NGHTTP2_NO_ERROR) {
this.destroy();
return;
}

// TODO(mcollina): remove usage of _*State properties
if (this._writableState.ended && this._writableState.pendingcb === 0) {
if (this._readableState.ended && this.closed) {
if (!this.writable) {
if (!this.readable && this.closed) {
this.destroy();
return;
}
Expand All @@ -1887,7 +1905,7 @@ class Http2Stream extends Duplex {
this[kSession][kType] === NGHTTP2_SESSION_SERVER &&
!(state.flags & STREAM_FLAGS_HAS_TRAILERS) &&
!state.didRead &&
!this._readableState.resumeScheduled) {
this.readableFlowing === null) {
this.close();
}
}
Expand Down Expand Up @@ -2477,6 +2495,10 @@ Object.defineProperty(Http2Session.prototype, 'setTimeout', setTimeout);
function socketOnError(error) {
const session = this[kSession];
if (session !== undefined) {
// We can ignore ECONNRESET after GOAWAY was received as there's nothing
// we can do and the other side is fully within its rights to do so.
if (error.code === 'ECONNRESET' && session[kState].goawayCode !== null)
return session.destroy();
debug(`Http2Session ${sessionName(session[kType])}: socket error [` +
`${error.message}]`);
session.destroy(error);
Expand Down
50 changes: 33 additions & 17 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -577,26 +577,28 @@ void Http2Session::EmitStatistics() {
void Http2Session::Close(uint32_t code, bool socket_closed) {
DEBUG_HTTP2SESSION(this, "closing session");

if (flags_ & SESSION_STATE_CLOSED)
if (flags_ & SESSION_STATE_CLOSING)
return;
flags_ |= SESSION_STATE_CLOSED;
flags_ |= SESSION_STATE_CLOSING;

// Stop reading on the i/o stream
if (stream_ != nullptr)
stream_->ReadStop();

// If the socket is not closed, then attempt to send a closing GOAWAY
// frame. There is no guarantee that this GOAWAY will be received by
// the peer but the HTTP/2 spec recommends sendinng it anyway. We'll
// the peer but the HTTP/2 spec recommends sending it anyway. We'll
// make a best effort.
if (!socket_closed) {
Http2Scope h2scope(this);
DEBUG_HTTP2SESSION2(this, "terminating session with code %d", code);
CHECK_EQ(nghttp2_session_terminate_session(session_, code), 0);
SendPendingData();
} else if (stream_ != nullptr) {
stream_->RemoveStreamListener(this);
}

flags_ |= SESSION_STATE_CLOSED;

// If there are outstanding pings, those will need to be canceled, do
// so on the next iteration of the event loop to avoid calling out into
// javascript since this may be called during garbage collection.
Expand Down Expand Up @@ -1355,25 +1357,32 @@ void Http2Session::MaybeScheduleWrite() {
}
}

void Http2Session::MaybeStopReading() {
int want_read = nghttp2_session_want_read(session_);
DEBUG_HTTP2SESSION2(this, "wants read? %d", want_read);
if (want_read == 0)
stream_->ReadStop();
}

// Unset the sending state, finish up all current writes, and reset
// storage for data and metadata that was associated with these writes.
void Http2Session::ClearOutgoing(int status) {
CHECK_NE(flags_ & SESSION_STATE_SENDING, 0);

flags_ &= ~SESSION_STATE_SENDING;

if (outgoing_buffers_.size() > 0) {
outgoing_storage_.clear();

for (const nghttp2_stream_write& wr : outgoing_buffers_) {
std::vector<nghttp2_stream_write> current_outgoing_buffers_;
current_outgoing_buffers_.swap(outgoing_buffers_);
for (const nghttp2_stream_write& wr : current_outgoing_buffers_) {
WriteWrap* wrap = wr.req_wrap;
if (wrap != nullptr)
wrap->Done(status);
}

outgoing_buffers_.clear();
}

flags_ &= ~SESSION_STATE_SENDING;

// Now that we've finished sending queued data, if there are any pending
// RstStreams we should try sending again and then flush them one by one.
if (pending_rst_streams_.size() > 0) {
Expand Down Expand Up @@ -1484,8 +1493,7 @@ uint8_t Http2Session::SendPendingData() {
ClearOutgoing(res.err);
}

DEBUG_HTTP2SESSION2(this, "wants data in return? %d",
nghttp2_session_want_read(session_));
MaybeStopReading();

return 0;
}
Expand Down Expand Up @@ -1618,8 +1626,7 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
};
MakeCallback(env()->error_string(), arraysize(argv), argv);
} else {
DEBUG_HTTP2SESSION2(this, "processed %d bytes. wants more? %d", ret,
nghttp2_session_want_read(session_));
MaybeStopReading();
}
}

Expand Down Expand Up @@ -1814,6 +1821,7 @@ void Http2Stream::OnTrailers() {
HandleScope scope(isolate);
Local<Context> context = env()->context();
Context::Scope context_scope(context);
flags_ &= ~NGHTTP2_STREAM_FLAG_TRAILERS;
MakeCallback(env()->ontrailers_string(), 0, nullptr);
}

Expand All @@ -1822,7 +1830,16 @@ int Http2Stream::SubmitTrailers(nghttp2_nv* nva, size_t len) {
CHECK(!this->IsDestroyed());
Http2Scope h2scope(this);
DEBUG_HTTP2STREAM2(this, "sending %d trailers", len);
int ret = nghttp2_submit_trailer(**session_, id_, nva, len);
int ret;
// Sending an empty trailers frame poses problems in Safari, Edge & IE.
// Instead we can just send an empty data frame with NGHTTP2_FLAG_END_STREAM
// to indicate that the stream is ready to be closed.
if (len == 0) {
Http2Stream::Provider::Stream prov(this, 0);
ret = nghttp2_submit_data(**session_, NGHTTP2_FLAG_END_STREAM, id_, *prov);
} else {
ret = nghttp2_submit_trailer(**session_, id_, nva, len);
}
CHECK_NE(ret, NGHTTP2_ERR_NOMEM);
return ret;
}
Expand Down Expand Up @@ -2351,8 +2368,7 @@ void Http2Stream::Info(const FunctionCallbackInfo<Value>& args) {

Headers list(isolate, context, headers);
args.GetReturnValue().Set(stream->SubmitInfo(*list, list.length()));
DEBUG_HTTP2STREAM2(stream, "%d informational headers sent",
headers->Length());
DEBUG_HTTP2STREAM2(stream, "%d informational headers sent", list.length());
}

// Submits trailing headers on the Http2Stream
Expand All @@ -2367,7 +2383,7 @@ void Http2Stream::Trailers(const FunctionCallbackInfo<Value>& args) {

Headers list(isolate, context, headers);
args.GetReturnValue().Set(stream->SubmitTrailers(*list, list.length()));
DEBUG_HTTP2STREAM2(stream, "%d trailing headers sent", headers->Length());
DEBUG_HTTP2STREAM2(stream, "%d trailing headers sent", list.length());
}

// Grab the numeric id of the Http2Stream
Expand Down
Loading