From 59b91f1447095cae373e6f66765f9877b6b3d830 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Tue, 11 Aug 2015 20:02:22 -0700 Subject: [PATCH 1/4] http_parser: consume StreamBase instance Consume StreamBase instance and operate on incoming data directly without allocating Buffer instances. Improves performance. PR-URL: https://github.com/nodejs/node/pull/2355 Reviewed-By: Trevor Norris --- lib/_http_server.js | 53 ++++- src/env-inl.h | 11 ++ src/env.h | 5 + src/node_http_parser.cc | 202 ++++++++++++++++---- test/parallel/test-http-server-unconsume.js | 30 +++ 5 files changed, 264 insertions(+), 37 deletions(-) create mode 100644 test/parallel/test-http-server-unconsume.js diff --git a/lib/_http_server.js b/lib/_http_server.js index 6769f4a1521bd9..3f6d2a702892ed 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -79,6 +79,8 @@ const STATUS_CODES = exports.STATUS_CODES = { 511 : 'Network Authentication Required' // RFC 6585 }; +const kOnExecute = HTTPParser.kOnExecute | 0; + function ServerResponse(req) { OutgoingMessage.call(this); @@ -317,6 +319,18 @@ function connectionListener(socket) { socket.on('end', socketOnEnd); socket.on('data', socketOnData); + // We are consuming socket, so it won't get any actual data + socket.on('resume', onSocketResume); + socket.on('pause', onSocketPause); + + socket.on('drain', socketOnDrain); + + // Override on to unconsume on `data`, `readable` listeners + socket.on = socketOnWrap; + + parser.consume(socket._handle._externalStream); + parser[kOnExecute] = onParserExecute; + // TODO(isaacs): Move all these functions out of here function socketOnError(e) { self.emit('clientError', e, this); @@ -326,6 +340,19 @@ function connectionListener(socket) { assert(!socket._paused); debug('SERVER socketOnData %d', d.length); var ret = parser.execute(d); + + onParserExecuteCommon(ret, d); + } + + function onParserExecute(ret, d) { + debug('SERVER socketOnParserExecute %d', ret); + onParserExecuteCommon(ret, undefined); + + // Kick-off next ticks + setImmediate(function() {}); + } + + function onParserExecuteCommon(ret, d) { if (ret instanceof Error) { debug('parse error'); socket.destroy(ret); @@ -335,9 +362,13 @@ function connectionListener(socket) { var req = parser.incoming; debug('SERVER upgrade or connect', req.method); + if (!d) + d = parser.getCurrentBuffer(); + socket.removeListener('data', socketOnData); socket.removeListener('end', socketOnEnd); socket.removeListener('close', serverSocketCloseListener); + parser.unconsume(socket._handle._externalStream); parser.finish(); freeParser(parser, req, null); parser = null; @@ -400,7 +431,6 @@ function connectionListener(socket) { socket.resume(); } } - socket.on('drain', socketOnDrain); function parserOnIncoming(req, shouldKeepAlive) { incoming.push(req); @@ -480,3 +510,24 @@ function connectionListener(socket) { } } exports._connectionListener = connectionListener; + +function onSocketResume() { + this._handle.readStart(); +} + +function onSocketPause() { + this._handle.readStop(); +} + +function socketOnWrap(ev, fn) { + var res = net.Socket.prototype.on.call(this, ev, fn); + if (!this.parser) { + this.on = net.Socket.prototype.on; + return res; + } + + if (ev === 'data' || ev === 'readable') + this.parser.unconsume(this._handle._externalStream); + + return res; +} diff --git a/src/env-inl.h b/src/env-inl.h index 369bc2f780eb90..cbc8c4ff1f5eaf 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -178,6 +178,7 @@ inline Environment::Environment(v8::Local context, printed_error_(false), trace_sync_io_(false), debugger_agent_(this), + http_parser_buffer_(nullptr), context_(context->GetIsolate(), context) { // We'll be creating new objects so make sure we've entered the context. v8::HandleScope handle_scope(isolate()); @@ -200,6 +201,7 @@ inline Environment::~Environment() { isolate_data()->Put(); delete[] heap_statistics_buffer_; + delete[] http_parser_buffer_; } inline void Environment::CleanupHandles() { @@ -338,6 +340,15 @@ inline void Environment::set_heap_statistics_buffer(uint32_t* pointer) { heap_statistics_buffer_ = pointer; } +inline char* Environment::http_parser_buffer() const { + return http_parser_buffer_; +} + +inline void Environment::set_http_parser_buffer(char* buffer) { + CHECK_EQ(http_parser_buffer_, nullptr); // Should be set only once. + http_parser_buffer_ = buffer; +} + inline Environment* Environment::from_cares_timer_handle(uv_timer_t* handle) { return ContainerOf(&Environment::cares_timer_handle_, handle); } diff --git a/src/env.h b/src/env.h index 1801ffecd3ccea..3115c5fe538d63 100644 --- a/src/env.h +++ b/src/env.h @@ -427,6 +427,9 @@ class Environment { inline uint32_t* heap_statistics_buffer() const; inline void set_heap_statistics_buffer(uint32_t* pointer); + inline char* http_parser_buffer() const; + inline void set_http_parser_buffer(char* buffer); + inline void ThrowError(const char* errmsg); inline void ThrowTypeError(const char* errmsg); inline void ThrowRangeError(const char* errmsg); @@ -524,6 +527,8 @@ class Environment { uint32_t* heap_statistics_buffer_ = nullptr; + char* http_parser_buffer_; + #define V(PropertyName, TypeName) \ v8::Persistent PropertyName ## _; ENVIRONMENT_STRONG_PERSISTENT_PROPERTIES(V) diff --git a/src/node_http_parser.cc b/src/node_http_parser.cc index 6c5d76ecf6cc95..b08bc0f88acb9e 100644 --- a/src/node_http_parser.cc +++ b/src/node_http_parser.cc @@ -6,6 +6,8 @@ #include "base-object-inl.h" #include "env.h" #include "env-inl.h" +#include "stream_base.h" +#include "stream_base-inl.h" #include "util.h" #include "util-inl.h" #include "v8.h" @@ -36,6 +38,7 @@ namespace node { using v8::Array; using v8::Boolean; using v8::Context; +using v8::EscapableHandleScope; using v8::Exception; using v8::Function; using v8::FunctionCallbackInfo; @@ -54,6 +57,7 @@ const uint32_t kOnHeaders = 0; const uint32_t kOnHeadersComplete = 1; const uint32_t kOnBody = 2; const uint32_t kOnMessageComplete = 3; +const uint32_t kOnExecute = 4; #define HTTP_CB(name) \ @@ -295,7 +299,7 @@ class Parser : public BaseObject { HTTP_DATA_CB(on_body) { - HandleScope scope(env()->isolate()); + EscapableHandleScope scope(env()->isolate()); Local obj = object(); Local cb = obj->Get(kOnBody); @@ -303,6 +307,15 @@ class Parser : public BaseObject { if (!cb->IsFunction()) return 0; + // We came from consumed stream + if (current_buffer_.IsEmpty()) { + // Make sure Buffer will be in parent HandleScope + current_buffer_ = scope.Escape(Buffer::Copy( + env()->isolate(), + current_buffer_data_, + current_buffer_len_).ToLocalChecked()); + } + Local argv[3] = { current_buffer_, Integer::NewFromUnsigned(env()->isolate(), at - current_buffer_data_), @@ -374,8 +387,6 @@ class Parser : public BaseObject { // var bytesParsed = parser->execute(buffer); static void Execute(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - Parser* parser = Unwrap(args.Holder()); CHECK(parser->current_buffer_.IsEmpty()); CHECK_EQ(parser->current_buffer_len_, 0); @@ -390,40 +401,11 @@ class Parser : public BaseObject { // amount of overhead. Nothing else will run while http_parser_execute() // runs, therefore this pointer can be set and used for the execution. parser->current_buffer_ = buffer_obj; - parser->current_buffer_len_ = buffer_len; - parser->current_buffer_data_ = buffer_data; - parser->got_exception_ = false; - size_t nparsed = - http_parser_execute(&parser->parser_, &settings, buffer_data, buffer_len); - - parser->Save(); + Local ret = parser->Execute(buffer_data, buffer_len); - // Unassign the 'buffer_' variable - parser->current_buffer_.Clear(); - parser->current_buffer_len_ = 0; - parser->current_buffer_data_ = nullptr; - - // If there was an exception in one of the callbacks - if (parser->got_exception_) - return; - - Local nparsed_obj = Integer::New(env->isolate(), nparsed); - // If there was a parse error in one of the callbacks - // TODO(bnoordhuis) What if there is an error on EOF? - if (!parser->parser_.upgrade && nparsed != buffer_len) { - enum http_errno err = HTTP_PARSER_ERRNO(&parser->parser_); - - Local e = Exception::Error(env->parse_error_string()); - Local obj = e->ToObject(env->isolate()); - obj->Set(env->bytes_parsed_string(), nparsed_obj); - obj->Set(env->code_string(), - OneByteString(env->isolate(), http_errno_name(err))); - - args.GetReturnValue().Set(e); - } else { - args.GetReturnValue().Set(nparsed_obj); - } + if (!ret.IsEmpty()) + args.GetReturnValue().Set(ret); } @@ -478,7 +460,148 @@ class Parser : public BaseObject { } - private: + static void Consume(const FunctionCallbackInfo& args) { + Parser* parser = Unwrap(args.Holder()); + Local stream_obj = args[0].As(); + StreamBase* stream = static_cast(stream_obj->Value()); + CHECK_NE(stream, nullptr); + + stream->Consume(); + + parser->prev_alloc_cb_ = stream->alloc_cb(); + parser->prev_read_cb_ = stream->read_cb(); + + stream->set_alloc_cb({ OnAllocImpl, parser }); + stream->set_read_cb({ OnReadImpl, parser }); + } + + + static void Unconsume(const FunctionCallbackInfo& args) { + Parser* parser = Unwrap(args.Holder()); + + // Already unconsumed + if (parser->prev_alloc_cb_.is_empty()) + return; + + CHECK(args[0]->IsExternal()); + Local stream_obj = args[0].As(); + StreamBase* stream = static_cast(stream_obj->Value()); + CHECK_NE(stream, nullptr); + + stream->set_alloc_cb(parser->prev_alloc_cb_); + stream->set_read_cb(parser->prev_read_cb_); + } + + + static void GetCurrentBuffer(const FunctionCallbackInfo& args) { + Parser* parser = Unwrap(args.Holder()); + + Local ret = Buffer::Copy( + parser->env(), + parser->current_buffer_data_, + parser->current_buffer_len_).ToLocalChecked(); + + args.GetReturnValue().Set(ret); + } + + protected: + static const size_t kAllocBufferSize = 64 * 1024; + + static void OnAllocImpl(size_t suggested_size, uv_buf_t* buf, void* ctx) { + Parser* parser = static_cast(ctx); + Environment* env = parser->env(); + + if (env->http_parser_buffer() == nullptr) + env->set_http_parser_buffer(new char[kAllocBufferSize]); + + buf->base = env->http_parser_buffer(); + buf->len = kAllocBufferSize; + } + + + static void OnReadImpl(ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending, + void* ctx) { + Parser* parser = static_cast(ctx); + HandleScope scope(parser->env()->isolate()); + + if (nread < 0) { + uv_buf_t tmp_buf; + tmp_buf.base = nullptr; + tmp_buf.len = 0; + parser->prev_read_cb_.fn(nread, + &tmp_buf, + pending, + parser->prev_read_cb_.ctx); + return; + } + + // Ignore, empty reads have special meaning in http parser + if (nread == 0) + return; + + parser->current_buffer_.Clear(); + Local ret = parser->Execute(buf->base, nread); + + // Exception + if (ret.IsEmpty()) + return; + + Local obj = parser->object(); + Local cb = obj->Get(kOnExecute); + + if (!cb->IsFunction()) + return; + + // Hooks for GetCurrentBuffer + parser->current_buffer_len_ = nread; + parser->current_buffer_data_ = buf->base; + + cb.As()->Call(obj, 1, &ret); + + parser->current_buffer_len_ = 0; + parser->current_buffer_data_ = nullptr; + } + + + Local Execute(char* data, size_t len) { + EscapableHandleScope scope(env()->isolate()); + + current_buffer_len_ = len; + current_buffer_data_ = data; + got_exception_ = false; + + size_t nparsed = + http_parser_execute(&parser_, &settings, data, len); + + Save(); + + // Unassign the 'buffer_' variable + current_buffer_.Clear(); + current_buffer_len_ = 0; + current_buffer_data_ = nullptr; + + // If there was an exception in one of the callbacks + if (got_exception_) + return scope.Escape(Local()); + + Local nparsed_obj = Integer::New(env()->isolate(), nparsed); + // If there was a parse error in one of the callbacks + // TODO(bnoordhuis) What if there is an error on EOF? + if (!parser_.upgrade && nparsed != len) { + enum http_errno err = HTTP_PARSER_ERRNO(&parser_); + + Local e = Exception::Error(env()->parse_error_string()); + Local obj = e->ToObject(env()->isolate()); + obj->Set(env()->bytes_parsed_string(), nparsed_obj); + obj->Set(env()->code_string(), + OneByteString(env()->isolate(), http_errno_name(err))); + + return scope.Escape(e); + } + return scope.Escape(nparsed_obj); + } Local CreateHeaders() { // num_values_ is either -1 or the entry # of the last header @@ -542,6 +665,8 @@ class Parser : public BaseObject { Local current_buffer_; size_t current_buffer_len_; char* current_buffer_data_; + StreamResource::Callback prev_alloc_cb_; + StreamResource::Callback prev_read_cb_; static const struct http_parser_settings settings; }; @@ -581,6 +706,8 @@ void InitHttpParser(Handle target, Integer::NewFromUnsigned(env->isolate(), kOnBody)); t->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "kOnMessageComplete"), Integer::NewFromUnsigned(env->isolate(), kOnMessageComplete)); + t->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "kOnExecute"), + Integer::NewFromUnsigned(env->isolate(), kOnExecute)); Local methods = Array::New(env->isolate()); #define V(num, name, string) \ @@ -595,6 +722,9 @@ void InitHttpParser(Handle target, env->SetProtoMethod(t, "reinitialize", Parser::Reinitialize); env->SetProtoMethod(t, "pause", Parser::Pause); env->SetProtoMethod(t, "resume", Parser::Pause); + env->SetProtoMethod(t, "consume", Parser::Consume); + env->SetProtoMethod(t, "unconsume", Parser::Unconsume); + env->SetProtoMethod(t, "getCurrentBuffer", Parser::GetCurrentBuffer); target->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "HTTPParser"), t->GetFunction()); diff --git a/test/parallel/test-http-server-unconsume.js b/test/parallel/test-http-server-unconsume.js new file mode 100644 index 00000000000000..0d33263ad6ffa6 --- /dev/null +++ b/test/parallel/test-http-server-unconsume.js @@ -0,0 +1,30 @@ +'use strict'; +var common = require('../common'); +var assert = require('assert'); +var http = require('http'); +var net = require('net'); + +var received = ''; + +var server = http.createServer(function(req, res) { + res.writeHead(200); + res.end(); + + req.socket.on('data', function(data) { + received += data; + }); + + server.close(); +}).listen(common.PORT, function() { + var socket = net.connect(common.PORT, function() { + socket.write('PUT / HTTP/1.1\r\n\r\n'); + + socket.once('data', function() { + socket.end('hello world'); + }); + }); +}); + +process.on('exit', function() { + assert.equal(received, 'hello world'); +}); From bc821080b2034662bdb1c4fdf83abcafb19331e5 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Wed, 26 Aug 2015 02:29:25 -0700 Subject: [PATCH 2/4] improvement --- lib/_http_server.js | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/lib/_http_server.js b/lib/_http_server.js index 3f6d2a702892ed..690e76f8f10145 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -255,6 +255,10 @@ Server.prototype.setTimeout = function(msecs, callback) { exports.Server = Server; +function noop() { +} + + function connectionListener(socket) { var self = this; var outgoing = []; @@ -328,7 +332,10 @@ function connectionListener(socket) { // Override on to unconsume on `data`, `readable` listeners socket.on = socketOnWrap; - parser.consume(socket._handle._externalStream); + var external = socket._handle._externalStream; + if (external) + parser.consume(external); + external = null; parser[kOnExecute] = onParserExecute; // TODO(isaacs): Move all these functions out of here @@ -349,7 +356,7 @@ function connectionListener(socket) { onParserExecuteCommon(ret, undefined); // Kick-off next ticks - setImmediate(function() {}); + setImmediate(noop); } function onParserExecuteCommon(ret, d) { From d94f7f4d3a6d7daf63b3a499c83fd06d94782558 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Wed, 26 Aug 2015 04:37:50 -0700 Subject: [PATCH 3/4] env: introduce `KickNextTick` There might be a need to "kick off" the next tick queue and execute events on it. Normally it is done through the `MakeCallback` interface, but in case when it is not - we need a way to "kick them off" manually. --- src/env.cc | 35 +++++++++++++++++++++++++++++++++++ src/env.h | 2 ++ src/node.cc | 27 +-------------------------- src/node_internals.h | 2 ++ 4 files changed, 40 insertions(+), 26 deletions(-) diff --git a/src/env.cc b/src/env.cc index 3deb4db09e6d92..e28866efd06894 100644 --- a/src/env.cc +++ b/src/env.cc @@ -10,6 +10,7 @@ using v8::Local; using v8::Message; using v8::StackFrame; using v8::StackTrace; +using v8::TryCatch; void Environment::PrintSyncTrace() const { if (!trace_sync_io_) @@ -55,4 +56,38 @@ void Environment::PrintSyncTrace() const { fflush(stderr); } + +bool Environment::KickNextTick() { + TickInfo* info = tick_info(); + + if (info->in_tick()) { + return true; + } + + if (info->length() == 0) { + isolate()->RunMicrotasks(); + } + + if (info->length() == 0) { + info->set_index(0); + return true; + } + + info->set_in_tick(true); + + // process nextTicks after call + TryCatch try_catch; + try_catch.SetVerbose(true); + tick_callback_function()->Call(process_object(), 0, nullptr); + + info->set_in_tick(false); + + if (try_catch.HasCaught()) { + info->set_last_threw(true); + return false; + } + + return true; +} + } // namespace node diff --git a/src/env.h b/src/env.h index 3115c5fe538d63..ce972d598edf99 100644 --- a/src/env.h +++ b/src/env.h @@ -424,6 +424,8 @@ class Environment { void PrintSyncTrace() const; inline void set_trace_sync_io(bool value); + bool KickNextTick(); + inline uint32_t* heap_statistics_buffer() const; inline void set_heap_statistics_buffer(uint32_t* pointer); diff --git a/src/node.cc b/src/node.cc index 3c6441a6f2531e..084fe900cd35d4 100644 --- a/src/node.cc +++ b/src/node.cc @@ -1026,7 +1026,6 @@ Handle MakeCallback(Environment* env, // If you hit this assertion, you forgot to enter the v8::Context first. CHECK_EQ(env->context(), env->isolate()->GetCurrentContext()); - Local process = env->process_object(); Local object, domain; bool has_async_queue = false; bool has_domain = false; @@ -1092,32 +1091,8 @@ Handle MakeCallback(Environment* env, return Undefined(env->isolate()); } - Environment::TickInfo* tick_info = env->tick_info(); - - if (tick_info->in_tick()) { - return ret; - } - - if (tick_info->length() == 0) { - env->isolate()->RunMicrotasks(); - } - - if (tick_info->length() == 0) { - tick_info->set_index(0); - return ret; - } - - tick_info->set_in_tick(true); - - // process nextTicks after call - env->tick_callback_function()->Call(process, 0, nullptr); - - tick_info->set_in_tick(false); - - if (try_catch.HasCaught()) { - tick_info->set_last_threw(true); + if (!env->KickNextTick()) return Undefined(env->isolate()); - } return ret; } diff --git a/src/node_internals.h b/src/node_internals.h index 8f35433b2f85c3..ffb5ec7ad96242 100644 --- a/src/node_internals.h +++ b/src/node_internals.h @@ -69,6 +69,8 @@ v8::Handle MakeCallback(Environment* env, int argc = 0, v8::Handle* argv = nullptr); +bool KickNextTick(); + // Convert a struct sockaddr to a { address: '1.2.3.4', port: 1234 } JS object. // Sets address and port properties on the info object and returns it. // If |info| is omitted, a new object is returned. From 5cb0a0ec7da0cd546c288327b70c5164b7bcbd88 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Wed, 26 Aug 2015 04:38:43 -0700 Subject: [PATCH 4/4] http_parser: follow-up fixes --- lib/_http_server.js | 7 ------- src/node_http_parser.cc | 2 ++ 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/lib/_http_server.js b/lib/_http_server.js index 690e76f8f10145..dd787fa2f7ad99 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -255,10 +255,6 @@ Server.prototype.setTimeout = function(msecs, callback) { exports.Server = Server; -function noop() { -} - - function connectionListener(socket) { var self = this; var outgoing = []; @@ -354,9 +350,6 @@ function connectionListener(socket) { function onParserExecute(ret, d) { debug('SERVER socketOnParserExecute %d', ret); onParserExecuteCommon(ret, undefined); - - // Kick-off next ticks - setImmediate(noop); } function onParserExecuteCommon(ret, d) { diff --git a/src/node_http_parser.cc b/src/node_http_parser.cc index b08bc0f88acb9e..d95083d44ad36e 100644 --- a/src/node_http_parser.cc +++ b/src/node_http_parser.cc @@ -562,6 +562,8 @@ class Parser : public BaseObject { parser->current_buffer_len_ = 0; parser->current_buffer_data_ = nullptr; + + parser->env()->KickNextTick(); }