From 3bbcd077d98535f0ce6c2ae14277a62a48b4738c Mon Sep 17 00:00:00 2001 From: Salakar Date: Sat, 2 Jul 2016 18:30:33 +0100 Subject: [PATCH] implemented auto-pipline for all connections --- .editorconfig | 9 ++++ .gitignore | 1 + lib/connectors/connector.js | 89 +++++++++++++++++++++++++++++++------ 3 files changed, 85 insertions(+), 14 deletions(-) create mode 100644 .editorconfig diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 00000000..848b0615 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,9 @@ +# http://editorconfig.org +root = true + +[*] +indent_style = space +indent_size = 2 +charset = utf-8 +trim_trailing_whitespace = true +insert_final_newline = true diff --git a/.gitignore b/.gitignore index 4cd0929d..e189f9a7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ node_modules *.cpuprofile /test.js +.idea diff --git a/lib/connectors/connector.js b/lib/connectors/connector.js index 1b7f9892..70a3c6dc 100644 --- a/lib/connectors/connector.js +++ b/lib/connectors/connector.js @@ -5,14 +5,61 @@ var net = require('net'); var tls = require('tls'); var utils = require('../utils'); +var MAX_BUFFER_SIZE = 4 * 1024 * 1024; // 4 mb +var MAX_QUEUED = 100; // appears to be a good number after testing + function Connector(options) { this.options = options; + + if (options.path) { + this.connectionOptions = _.pick(options, ['path']); + } else { + this.connectionOptions = _.pick(options, ['port', 'host', 'family']); + } + if (options.tls) { + _.assign(this.connectionOptions, options.tls); + } + + // auto pipeline props + this.pipelineBuffer = ''; + this.pipelineQueued = 0; + this.pipelineImmediate = null; } Connector.prototype.check = function () { return true; }; +/** + * Proxy stream writer for auto-pipeline. + * @param str usual stream.write arg + * @param forceWrite set to true when calling write to force pipeline bypass + */ +Connector.prototype.write = function (str, forceWrite) { + this.pipelineBuffer += str; + this.pipelineQueued++; + + // queue the write for the next event loop if this is the first write issued + if (this.pipelineQueued < 2) { + this.pipelineImmediate = setImmediate(this.writePipeline.bind(this)); + } + + // write pipeline if limits have been exceeded or this is a forced write + if (forceWrite || this.pipelineQueued > MAX_QUEUED || this.pipelineBuffer.length > MAX_BUFFER_SIZE) { + clearImmediate(this.pipelineImmediate); + this.writePipeline(); + } +}; + +/** + * Writes the buffered pipelines and resets counts and buffer. + */ +Connector.prototype.writePipeline = function () { + this.stream._stream.write(this.pipelineBuffer); + this.pipelineBuffer = ''; + this.pipelineQueued = 0; +}; + Connector.prototype.disconnect = function () { this.connecting = false; if (this.stream) { @@ -21,31 +68,45 @@ Connector.prototype.disconnect = function () { }; Connector.prototype.connect = function (callback) { + var _this = this; this.connecting = true; - var connectionOptions; - if (this.options.path) { - connectionOptions = _.pick(this.options, ['path']); - } else { - connectionOptions = _.pick(this.options, ['port', 'host', 'family']); - } - if (this.options.tls) { - _.assign(connectionOptions, this.options.tls); - } - var _this = this; process.nextTick(function () { if (!_this.connecting) { callback(new Error(utils.CONNECTION_CLOSED_ERROR_MSG)); return; } + var stream; + if (_this.options.tls) { - stream = tls.connect(connectionOptions); + stream = tls.connect(_this.connectionOptions); } else { - stream = net.createConnection(connectionOptions); + stream = net.createConnection(_this.connectionOptions); } - _this.stream = stream; - callback(null, stream); + + // create a fake stream so we can intercept write without overriding it + // sometimes on tear-down 'stream' is undefined so added a check + if (stream) { + _this.stream = { + _stream: stream, + writable: stream.writable, + on: stream.on.bind(stream), + end: stream.end.bind(stream), + remotePort: stream.remotePort, + once: stream.once.bind(stream), + write: _this.write.bind(_this), + remoteAddress: stream.remoteAddress, + destroy: stream.destroy.bind(stream), + _writableState: stream._writableState, + setTimeout: stream.setTimeout.bind(stream), + setKeepAlive: stream.setKeepAlive.bind(stream), + removeListener: stream.removeListener.bind(stream), + removeAllListeners: stream.removeAllListeners.bind(stream) + }; + } + + callback(null, _this.stream); }); };