Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement auto-pipelining for all connections [~100% op/s increase] #343

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# http://editorconfig.org
root = true

[*]
indent_style = space
indent_size = 2
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
node_modules
*.cpuprofile
/test.js
.idea
89 changes: 75 additions & 14 deletions lib/connectors/connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,61 @@ var net = require('net');
var tls = require('tls');
var utils = require('../utils');

var MAX_BUFFER_SIZE = 4 * 1024 * 1024; // 4 mb
var MAX_QUEUED = 100; // appears to be a good number after testing

function Connector(options) {
this.options = options;

if (options.path) {
this.connectionOptions = _.pick(options, ['path']);
} else {
this.connectionOptions = _.pick(options, ['port', 'host', 'family']);
}
if (options.tls) {
_.assign(this.connectionOptions, options.tls);
}

// auto pipeline props
this.pipelineBuffer = '';
this.pipelineQueued = 0;
this.pipelineImmediate = null;
}

Connector.prototype.check = function () {
return true;
};

/**
* Proxy stream writer for auto-pipeline.
* @param str usual stream.write arg
* @param forceWrite set to true when calling write to force pipeline bypass
*/
Connector.prototype.write = function (str, forceWrite) {
this.pipelineBuffer += str;
this.pipelineQueued++;

// queue the write for the next event loop if this is the first write issued
if (this.pipelineQueued < 2) {
this.pipelineImmediate = setImmediate(this.writePipeline.bind(this));
}

// write pipeline if limits have been exceeded or this is a forced write
if (forceWrite || this.pipelineQueued > MAX_QUEUED || this.pipelineBuffer.length > MAX_BUFFER_SIZE) {
clearImmediate(this.pipelineImmediate);
this.writePipeline();
}
};

/**
* Writes the buffered pipelines and resets counts and buffer.
*/
Connector.prototype.writePipeline = function () {
this.stream._stream.write(this.pipelineBuffer);
this.pipelineBuffer = '';
this.pipelineQueued = 0;
};

Connector.prototype.disconnect = function () {
this.connecting = false;
if (this.stream) {
Expand All @@ -21,31 +68,45 @@ Connector.prototype.disconnect = function () {
};

Connector.prototype.connect = function (callback) {
var _this = this;
this.connecting = true;
var connectionOptions;
if (this.options.path) {
connectionOptions = _.pick(this.options, ['path']);
} else {
connectionOptions = _.pick(this.options, ['port', 'host', 'family']);
}
if (this.options.tls) {
_.assign(connectionOptions, this.options.tls);
}

var _this = this;
process.nextTick(function () {
if (!_this.connecting) {
callback(new Error(utils.CONNECTION_CLOSED_ERROR_MSG));
return;
}

var stream;

if (_this.options.tls) {
stream = tls.connect(connectionOptions);
stream = tls.connect(_this.connectionOptions);
} else {
stream = net.createConnection(connectionOptions);
stream = net.createConnection(_this.connectionOptions);
}
_this.stream = stream;
callback(null, stream);

// create a fake stream so we can intercept write without overriding it
// sometimes on tear-down 'stream' is undefined so added a check
if (stream) {
_this.stream = {
_stream: stream,
writable: stream.writable,
on: stream.on.bind(stream),
end: stream.end.bind(stream),
remotePort: stream.remotePort,
once: stream.once.bind(stream),
write: _this.write.bind(_this),
remoteAddress: stream.remoteAddress,
destroy: stream.destroy.bind(stream),
_writableState: stream._writableState,
setTimeout: stream.setTimeout.bind(stream),
setKeepAlive: stream.setKeepAlive.bind(stream),
removeListener: stream.removeListener.bind(stream),
removeAllListeners: stream.removeAllListeners.bind(stream)
};
}

callback(null, _this.stream);
});
};

Expand Down