diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 945794c23b4d2b..aade7e83be3511 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -277,7 +277,7 @@ function submitRstStream(code) { // point, close them. If there is an open fd for file send, close that also. // At this point the underlying node::http2:Http2Stream handle is no // longer usable so destroy it also. -function onStreamClose(code, hasData) { +function onStreamClose(code) { const stream = this[kOwner]; if (stream.destroyed) return; @@ -285,8 +285,7 @@ function onStreamClose(code, hasData) { const state = stream[kState]; debug(`Http2Stream ${stream[kID]} [Http2Session ` + - `${sessionName(stream[kSession][kType])}]: closed with code ${code}` + - ` [has data? ${hasData}]`); + `${sessionName(stream[kSession][kType])}]: closed with code ${code}`); if (!stream.closed) { // Unenroll from timeouts @@ -304,13 +303,14 @@ function onStreamClose(code, hasData) { if (state.fd !== undefined) tryClose(state.fd); - stream[kMaybeDestroy](null, code, hasData); + stream.push(null); + stream[kMaybeDestroy](null, code); } // Receives a chunk of data for a given stream and forwards it on // to the Http2Stream Duplex for processing. -function onStreamRead(nread, buf, handle) { - const stream = handle[kOwner]; +function onStreamRead(nread, buf) { + const stream = this[kOwner]; if (nread >= 0 && !stream.destroyed) { debug(`Http2Stream ${stream[kID]} [Http2Session ` + `${sessionName(stream[kSession][kType])}]: receiving data chunk ` + @@ -318,7 +318,7 @@ function onStreamRead(nread, buf, handle) { stream[kUpdateTimer](); if (!stream.push(buf)) { if (!stream.destroyed) // we have to check a second time - handle.readStop(); + this.readStop(); } return; } @@ -1427,13 +1427,8 @@ function streamOnResume() { } function streamOnPause() { - // if (!this.destroyed && !this.pending) - // this[kHandle].readStop(); -} - -function handleFlushData(self) { if (!this.destroyed && !this.pending) - this[kHandle].flushData(); + this[kHandle].readStop(); } // If the writable side of the Http2Stream is still open, emit the @@ -1679,11 +1674,10 @@ class Http2Stream extends Duplex { this.push(null); return; } - const flushfn = handleFlushData.bind(this); if (!this.pending) { - flushfn(); + streamOnResume.call(this); } else { - this.once('ready', flushfn); + this.once('ready', streamOnResume); } } @@ -1822,10 +1816,10 @@ class Http2Stream extends Duplex { // The Http2Stream can be destroyed if it has closed and if the readable // side has received the final chunk. - [kMaybeDestroy](error, code = NGHTTP2_NO_ERROR, hasData = true) { + [kMaybeDestroy](error, code = NGHTTP2_NO_ERROR) { if (error == null) { if (code === NGHTTP2_NO_ERROR && - ((!this._readableState.ended && hasData) || + (!this._readableState.ended || !this._writableState.ended || this._writableState.pendingcb > 0 || !this.closed)) { diff --git a/src/node_http2.cc b/src/node_http2.cc index cf346efba45c2b..cd28f57ffe2d06 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -9,6 +9,7 @@ namespace node { +using v8::ArrayBuffer; using v8::Boolean; using v8::Context; using v8::Float64Array; @@ -978,7 +979,6 @@ inline int Http2Session::OnStreamClose(nghttp2_session* handle, // Intentionally ignore the callback if the stream does not exist or has // already been destroyed if (stream != nullptr && !stream->IsDestroyed()) { - stream->AddChunk(nullptr, 0); stream->Close(code); // It is possible for the stream close to occur before the stream is // ever passed on to the javascript side. If that happens, skip straight @@ -989,9 +989,8 @@ inline int Http2Session::OnStreamClose(nghttp2_session* handle, stream->object()->Get(context, env->onstreamclose_string()) .ToLocalChecked(); if (fn->IsFunction()) { - Local argv[2] = { - Integer::NewFromUnsigned(isolate, code), - Boolean::New(isolate, stream->HasDataChunks(true)) + Local argv[] = { + Integer::NewFromUnsigned(isolate, code) }; stream->MakeCallback(fn.As(), arraysize(argv), argv); } else { @@ -1028,6 +1027,8 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle, Http2Session* session = static_cast(user_data); DEBUG_HTTP2SESSION2(session, "buffering data chunk for stream %d, size: " "%d, flags: %d", id, len, flags); + Environment* env = session->env(); + HandleScope scope(env->isolate()); // We should never actually get a 0-length chunk so this check is // only a precaution at this point. if (len > 0) { @@ -1039,8 +1040,25 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle, // If the stream has been destroyed, ignore this chunk if (stream->IsDestroyed()) return 0; + stream->statistics_.received_bytes += len; - stream->AddChunk(data, len); + + // There is a single large array buffer for the entire data read from the + // network; create a slice of that array buffer and emit it as the + // received data buffer. + CHECK(!session->stream_buf_ab_.IsEmpty()); + size_t offset = reinterpret_cast(data) - session->stream_buf_; + // Verify that the data offset is inside the current read buffer. + CHECK_LE(offset, session->stream_buf_size_); + + Local buf = + Buffer::New(env, session->stream_buf_ab_, offset, len).ToLocalChecked(); + + stream->EmitData(len, buf, Local()); + if (!stream->IsReading()) + stream->inbound_consumed_data_while_paused_ += len; + else + nghttp2_session_consume_stream(handle, id, len); } return 0; } @@ -1226,9 +1244,8 @@ inline void Http2Session::HandlePriorityFrame(const nghttp2_frame* frame) { // Called by OnFrameReceived when a complete DATA frame has been received. -// If we know that this is the last DATA frame (because the END_STREAM flag -// is set), then we'll terminate the readable side of the StreamBase. If -// the StreamBase is flowing, we'll push the chunks of data out to JS land. +// If we know that this was the last DATA frame (because the END_STREAM flag +// is set), then we'll terminate the readable side of the StreamBase. inline void Http2Session::HandleDataFrame(const nghttp2_frame* frame) { int32_t id = GetFrameID(frame); DEBUG_HTTP2SESSION2(this, "handling data frame for stream %d", id); @@ -1239,11 +1256,8 @@ inline void Http2Session::HandleDataFrame(const nghttp2_frame* frame) { return; if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) { - stream->AddChunk(nullptr, 0); + stream->EmitData(UV_EOF, Local(), Local()); } - - if (stream->IsReading()) - stream->FlushDataChunks(); } @@ -1618,45 +1632,67 @@ void Http2Session::OnStreamAllocImpl(size_t suggested_size, uv_buf_t* buf, void* ctx) { Http2Session* session = static_cast(ctx); - buf->base = session->stream_alloc(); - buf->len = kAllocBufferSize; + CHECK_EQ(session->stream_buf_, nullptr); + CHECK_EQ(session->stream_buf_size_, 0); + buf->base = session->stream_buf_ = Malloc(suggested_size); + buf->len = session->stream_buf_size_ = suggested_size; + session->IncrementCurrentSessionMemory(suggested_size); } // Callback used to receive inbound data from the i/o stream void Http2Session::OnStreamReadImpl(ssize_t nread, - const uv_buf_t* bufs, + const uv_buf_t* buf, uv_handle_type pending, void* ctx) { Http2Session* session = static_cast(ctx); Http2Scope h2scope(session); CHECK_NE(session->stream_, nullptr); DEBUG_HTTP2SESSION2(session, "receiving %d bytes", nread); - if (nread < 0) { - uv_buf_t tmp_buf; - tmp_buf.base = nullptr; - tmp_buf.len = 0; - session->prev_read_cb_.fn(nread, - &tmp_buf, - pending, - session->prev_read_cb_.ctx); - return; - } - if (bufs->len > 0) { + if (nread <= 0) { + free(session->stream_buf_); + if (nread < 0) { + uv_buf_t tmp_buf = uv_buf_init(nullptr, 0); + session->prev_read_cb_.fn(nread, + &tmp_buf, + pending, + session->prev_read_cb_.ctx); + } + } else { // Only pass data on if nread > 0 - uv_buf_t buf[] { uv_buf_init((*bufs).base, nread) }; + + // Verify that currently: There is memory allocated into which + // the data has been read, and that memory buffer is at least as large + // as the amount of data we have read, but we have not yet made an + // ArrayBuffer out of it. + CHECK_NE(session->stream_buf_, nullptr); + CHECK_EQ(session->stream_buf_, buf->base); + CHECK_EQ(session->stream_buf_size_, buf->len); + CHECK_GE(session->stream_buf_size_, static_cast(nread)); + CHECK(session->stream_buf_ab_.IsEmpty()); + + Environment* env = session->env(); + Isolate* isolate = env->isolate(); + HandleScope scope(isolate); + Local context = env->context(); + Context::Scope context_scope(context); + + // Create an array buffer for the read data. DATA frames will be emitted + // as slices of this array buffer to avoid having to copy memory. + session->stream_buf_ab_ = + ArrayBuffer::New(isolate, + session->stream_buf_, + session->stream_buf_size_, + v8::ArrayBufferCreationMode::kInternalized); + + uv_buf_t buf_ = uv_buf_init(buf->base, nread); session->statistics_.data_received += nread; - ssize_t ret = session->Write(buf, 1); + ssize_t ret = session->Write(&buf_, 1); // Note: if ssize_t is not defined (e.g. on Win32), nghttp2 will typedef // ssize_t to int. Cast here so that the < 0 check actually works on // Windows. if (static_cast(ret) < 0) { DEBUG_HTTP2SESSION2(session, "fatal error receiving data: %d", ret); - Environment* env = session->env(); - Isolate* isolate = env->isolate(); - HandleScope scope(isolate); - Local context = env->context(); - Context::Scope context_scope(context); Local argv[1] = { Integer::New(isolate, ret), @@ -1667,6 +1703,13 @@ void Http2Session::OnStreamReadImpl(ssize_t nread, nghttp2_session_want_read(**session)); } } + + // Since we are finished handling this write, reset the stream buffer. + // The memory has either been free()d or was handed over to V8. + session->DecrementCurrentSessionMemory(session->stream_buf_size_); + session->stream_buf_ = nullptr; + session->stream_buf_size_ = 0; + session->stream_buf_ab_ = Local(); } void Http2Session::OnStreamDestructImpl(void* ctx) { @@ -1781,30 +1824,6 @@ void Http2Stream::OnTrailers(const SubmitTrailers& submit_trailers) { } } -inline bool Http2Stream::HasDataChunks(bool ignore_eos) { - return data_chunks_.size() > (ignore_eos ? 1 : 0); -} - -// Appends a chunk of received DATA frame data to this Http2Streams internal -// queue. Note that we must memcpy each chunk because of the way that nghttp2 -// handles it's internal memory`. -inline void Http2Stream::AddChunk(const uint8_t* data, size_t len) { - CHECK(!this->IsDestroyed()); - if (this->statistics_.first_byte == 0) - this->statistics_.first_byte = uv_hrtime(); - if (flags_ & NGHTTP2_STREAM_FLAG_EOS) - return; - char* buf = nullptr; - if (len > 0 && data != nullptr) { - buf = Malloc(len); - memcpy(buf, data, len); - } else if (data == nullptr) { - flags_ |= NGHTTP2_STREAM_FLAG_EOS; - } - data_chunks_.emplace(uv_buf_init(buf, len)); -} - - inline void Http2Stream::Close(int32_t code) { CHECK(!this->IsDestroyed()); flags_ |= NGHTTP2_STREAM_FLAG_CLOSED; @@ -1841,13 +1860,6 @@ inline void Http2Stream::Destroy() { DEBUG_HTTP2STREAM(this, "destroying stream"); - // Free any remaining incoming data chunks. - while (!data_chunks_.empty()) { - uv_buf_t buf = data_chunks_.front(); - free(buf.base); - data_chunks_.pop(); - } - // Wait until the start of the next loop to delete because there // may still be some pending operations queued for this stream. env()->SetImmediate([](Environment* env, void* data) { @@ -1873,39 +1885,6 @@ inline void Http2Stream::Destroy() { } -// Uses the StreamBase API to push a single chunk of queued inbound DATA -// to JS land. -void Http2Stream::OnDataChunk(uv_buf_t* chunk) { - CHECK(!this->IsDestroyed()); - Isolate* isolate = env()->isolate(); - HandleScope scope(isolate); - ssize_t len = -1; - Local buf; - if (chunk != nullptr) { - len = chunk->len; - buf = Buffer::New(isolate, chunk->base, len).ToLocalChecked(); - } - EmitData(len, buf, this->object()); -} - - -inline void Http2Stream::FlushDataChunks() { - CHECK(!this->IsDestroyed()); - Http2Scope h2scope(this); - if (!data_chunks_.empty()) { - uv_buf_t buf = data_chunks_.front(); - data_chunks_.pop(); - if (buf.len > 0) { - CHECK_EQ(nghttp2_session_consume_stream(session_->session(), - id_, buf.len), 0); - OnDataChunk(&buf); - } else { - OnDataChunk(nullptr); - } - } -} - - // Initiates a response on the Http2Stream using data provided via the // StreamBase Streams API. inline int Http2Stream::SubmitResponse(nghttp2_nv* nva, @@ -2012,13 +1991,20 @@ inline Http2Stream* Http2Stream::SubmitPushPromise(nghttp2_nv* nva, // Switch the StreamBase into flowing mode to begin pushing chunks of data // out to JS land. inline int Http2Stream::ReadStart() { + Http2Scope h2scope(this); CHECK(!this->IsDestroyed()); flags_ |= NGHTTP2_STREAM_FLAG_READ_START; flags_ &= ~NGHTTP2_STREAM_FLAG_READ_PAUSED; - // Flush any queued data chunks immediately out to the JS layer - FlushDataChunks(); DEBUG_HTTP2STREAM(this, "reading starting"); + + // Tell nghttp2 about our consumption of the data that was handed + // off to JS land. + nghttp2_session_consume_stream(session_->session(), + id_, + inbound_consumed_data_while_paused_); + inbound_consumed_data_while_paused_ = 0; + return 0; } diff --git a/src/node_http2.h b/src/node_http2.h index 4ed06c95970039..9027ed7feb7dad 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -550,12 +550,6 @@ class Http2Stream : public AsyncWrap, inline void EmitStatistics(); - inline bool HasDataChunks(bool ignore_eos = false); - - inline void AddChunk(const uint8_t* data, size_t len); - - inline void FlushDataChunks(); - // Process a Data Chunk void OnDataChunk(uv_buf_t* chunk); @@ -740,8 +734,11 @@ class Http2Stream : public AsyncWrap, uint32_t current_headers_length_ = 0; // total number of octets std::vector current_headers_; - // Inbound Data... This is the data received via DATA frames for this stream. - std::queue data_chunks_; + // This keeps track of the amount of data read from the socket while the + // socket was in paused mode. When `ReadStart()` is called (and not before + // then), we tell nghttp2 that we consumed that data to get proper + // backpressure handling. + size_t inbound_consumed_data_while_paused_ = 0; // Outbound Data... This is the data written by the JS layer that is // waiting to be written out to the socket. @@ -1085,8 +1082,9 @@ class Http2Session : public AsyncWrap { // use this to allow timeout tracking during long-lasting writes uint32_t chunks_sent_since_last_write_ = 0; - uv_prepare_t* prep_ = nullptr; - char stream_buf_[kAllocBufferSize]; + char* stream_buf_ = nullptr; + size_t stream_buf_size_ = 0; + v8::Local stream_buf_ab_; size_t max_outstanding_pings_ = DEFAULT_MAX_PINGS; std::queue outstanding_pings_; diff --git a/test/common/README.md b/test/common/README.md index e629c490fc2c12..da38de742e4805 100644 --- a/test/common/README.md +++ b/test/common/README.md @@ -268,6 +268,17 @@ fail. If `fn` is not provided, an empty function will be used. +### mustCallAsync([fn][, exact]) +* `fn` [<Function>] +* `exact` [<Number>] default = 1 +* return [<Function>] + +The same as `mustCall()`, except that it is also checked that the Promise +returned by the function is fulfilled for each invocation of the function. + +The return value of the wrapped function is the return value of the original +function, if necessary wrapped as a promise. + ### mustCallAtLeast([fn][, minimum]) * `fn` [<Function>] default = () => {} * `minimum` [<Number>] default = 1 diff --git a/test/common/index.js b/test/common/index.js index 5d8a18e96a3868..9ae27e232e3ca8 100644 --- a/test/common/index.js +++ b/test/common/index.js @@ -501,6 +501,12 @@ exports.mustCallAtLeast = function(fn, minimum) { return _mustCallInner(fn, minimum, 'minimum'); }; +exports.mustCallAsync = function(fn, exact) { + return exports.mustCall((...args) => { + return Promise.resolve(fn(...args)).then(exports.mustCall((val) => val)); + }, exact); +}; + function _mustCallInner(fn, criteria = 1, field) { if (process._exiting) throw new Error('Cannot use common.mustCall*() in process exit handler'); diff --git a/test/parallel/test-http2-backpressure.js b/test/parallel/test-http2-backpressure.js new file mode 100644 index 00000000000000..9b69dddbfd2e26 --- /dev/null +++ b/test/parallel/test-http2-backpressure.js @@ -0,0 +1,49 @@ +'use strict'; + +// Verifies that a full HTTP2 pipeline handles backpressure. + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const assert = require('assert'); +const http2 = require('http2'); +const makeDuplexPair = require('../common/duplexpair'); + +common.crashOnUnhandledRejection(); + +{ + let req; + const server = http2.createServer(); + server.on('stream', common.mustCallAsync(async (stream, headers) => { + stream.respond({ + 'content-type': 'text/html', + ':status': 200 + }); + req._readableState.highWaterMark = 20; + stream._writableState.highWaterMark = 20; + assert.strictEqual(stream.write('A'.repeat(5)), true); + assert.strictEqual(stream.write('A'.repeat(40)), false); + assert.strictEqual(await event(req, 'data'), 'A'.repeat(5)); + assert.strictEqual(await event(req, 'data'), 'A'.repeat(40)); + await event(stream, 'drain'); + assert.strictEqual(stream.write('A'.repeat(5)), true); + assert.strictEqual(stream.write('A'.repeat(40)), false); + })); + + const { clientSide, serverSide } = makeDuplexPair(); + server.emit('connection', serverSide); + + const client = http2.connect('http://localhost:80', { + createConnection: common.mustCall(() => clientSide) + }); + + req = client.request({ ':path': '/' }); + req.setEncoding('utf8'); + req.end(); +} + +function event(ee, eventName) { + return new Promise((resolve) => { + ee.once(eventName, common.mustCall(resolve)); + }); +} diff --git a/test/parallel/test-http2-misbehaving-flow-control-paused.js b/test/parallel/test-http2-misbehaving-flow-control-paused.js index 0b7299d5ac80a8..d69e0fd802979a 100644 --- a/test/parallel/test-http2-misbehaving-flow-control-paused.js +++ b/test/parallel/test-http2-misbehaving-flow-control-paused.js @@ -56,6 +56,9 @@ let client; const server = h2.createServer({ settings: { initialWindowSize: 36 } }); server.on('stream', (stream) => { + // Set the high water mark to zero, since otherwise we still accept + // reads from the source stream (if we can consume them). + stream._readableState.highWaterMark = 0; stream.pause(); stream.on('error', common.expectsError({ code: 'ERR_HTTP2_STREAM_ERROR',