diff --git a/benchmark/net/net-c2s-cork.js b/benchmark/net/net-c2s-cork.js new file mode 100644 index 00000000000000..5f8e0fa4357b99 --- /dev/null +++ b/benchmark/net/net-c2s-cork.js @@ -0,0 +1,96 @@ +// test the speed of .pipe() with sockets + +var common = require('../common.js'); +var PORT = common.PORT; + +var bench = common.createBenchmark(main, { + len: [4, 8, 16, 32, 64, 128, 512, 1024], + type: ['buf'], + dur: [5], +}); + +var dur; +var len; +var type; +var chunk; +var encoding; + +function main(conf) { + dur = +conf.dur; + len = +conf.len; + type = conf.type; + + switch (type) { + case 'buf': + chunk = new Buffer(len); + chunk.fill('x'); + break; + case 'utf': + encoding = 'utf8'; + chunk = new Array(len / 2 + 1).join('ΓΌ'); + break; + case 'asc': + encoding = 'ascii'; + chunk = new Array(len + 1).join('x'); + break; + default: + throw new Error('invalid type: ' + type); + break; + } + + server(); +} + +var net = require('net'); + +function Writer() { + this.received = 0; + this.writable = true; +} + +Writer.prototype.write = function(chunk, encoding, cb) { + this.received += chunk.length; + + if (typeof encoding === 'function') + encoding(); + else if (typeof cb === 'function') + cb(); + + return true; +}; + +// doesn't matter, never emits anything. +Writer.prototype.on = function() {}; +Writer.prototype.once = function() {}; +Writer.prototype.emit = function() {}; + +function server() { + var writer = new Writer(); + + // the actual benchmark. + var server = net.createServer(function(socket) { + socket.pipe(writer); + }); + + server.listen(PORT, function() { + var socket = net.connect(PORT); + socket.on('connect', function() { + bench.start(); + + socket.on('drain', send) + send() + + setTimeout(function() { + var bytes = writer.received; + var gbits = (bytes * 8) / (1024 * 1024 * 1024); + bench.end(gbits); + }, dur * 1000); + + function send() { + socket.cork(); + while(socket.write(chunk, encoding)) {} + socket.uncork(); + } + }); + }); +} diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 9c7e2630161cd9..7bbb97b16aa415 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -108,6 +108,14 @@ function WritableState(options, stream) { // True if the error was already emitted and should not be thrown again this.errorEmitted = false; + + // count buffered requests + this.bufferedRequestCount = 0; + + // create the two objects needed to store the corked requests + // they are not a linked list, as no new elements are inserted in there + this.corkedRequestsFree = new CorkedRequest(this); + this.corkedRequestsFree.next = new CorkedRequest(this); } WritableState.prototype.getBuffer = function writableStateGetBuffer() { @@ -274,6 +282,7 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) { } else { state.bufferedRequest = state.lastBufferedRequest; } + state.bufferedRequestCount += 1; } else { doWrite(stream, state, false, len, chunk, encoding, cb); } @@ -357,7 +366,6 @@ function onwriteDrain(stream, state) { } } - // if there's something in the buffer waiting, then process it function clearBuffer(stream, state) { state.bufferProcessing = true; @@ -365,26 +373,26 @@ function clearBuffer(stream, state) { if (stream._writev && entry && entry.next) { // Fast case, write everything using _writev() - var buffer = []; - var cbs = []; + var l = state.bufferedRequestCount; + var buffer = new Array(l); + var holder = state.corkedRequestsFree; + holder.entry = entry; + + var count = 0; while (entry) { - cbs.push(entry.callback); - buffer.push(entry); + buffer[count] = entry; entry = entry.next; + count += 1; } - // count the one we are adding, as well. - // TODO(isaacs) clean this up + doWrite(stream, state, true, state.length, buffer, '', holder.finish); + + // doWrite is always async, defer these to save a bit of time + // as the hot path ends with doWrite state.pendingcb++; state.lastBufferedRequest = null; - doWrite(stream, state, true, state.length, buffer, '', function(err) { - for (var i = 0; i < cbs.length; i++) { - state.pendingcb--; - cbs[i](err); - } - }); - - // Clear buffer + state.corkedRequestsFree = holder.next; + holder.next = null; } else { // Slow case, write chunks one-by-one while (entry) { @@ -407,6 +415,8 @@ function clearBuffer(stream, state) { if (entry === null) state.lastBufferedRequest = null; } + + state.bufferedRequestCount = 0; state.bufferedRequest = entry; state.bufferProcessing = false; } @@ -484,3 +494,26 @@ function endWritable(stream, state, cb) { } state.ended = true; } + +// It seems a linked list but it is not +// there will be only 2 of these for each stream +function CorkedRequest(state) { + this.next = null; + this.entry = null; + + this.finish = (err) => { + var entry = this.entry; + this.entry = null; + while (entry) { + var cb = entry.callback; + state.pendingcb--; + cb(err); + entry = entry.next; + } + if (state.corkedRequestsFree) { + state.corkedRequestsFree.next = this; + } else { + state.corkedRequestsFree = this; + } + }; +}