diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 92984eb08eb24b..39eee61460fe61 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -28,6 +28,7 @@ Writable.WritableState = WritableState; var util = require('util'); var Stream = require('stream'); +var debug = util.debuglog('stream'); util.inherits(Writable, Stream); @@ -35,6 +36,7 @@ function WriteReq(chunk, encoding, cb) { this.chunk = chunk; this.encoding = encoding; this.callback = cb; + this.next = null; } function WritableState(options, stream) { @@ -109,7 +111,8 @@ function WritableState(options, stream) { // the amount that is being written when _write is called. this.writelen = 0; - this.buffer = []; + this.bufferedRequest = null; + this.lastBufferedRequest = null; // number of pending user-supplied write callbacks // this must be 0 before 'finish' can be emitted @@ -123,6 +126,23 @@ function WritableState(options, stream) { this.errorEmitted = false; } +WritableState.prototype.getBuffer = function writableStateGetBuffer() { + var current = this.bufferedRequest; + var out = []; + while (current) { + out.push(current); + current = current.next; + } + return out; +}; + +Object.defineProperty(WritableState.prototype, 'buffer', { + get: util.deprecate(function() { + return this.getBuffer(); + }, '_writableState.buffer is deprecated. Use ' + + '_writableState.getBuffer() instead.') +}); + function Writable(options) { // Writable ctor is applied to Duplexes, though they're not // instanceof Writable, they're instanceof Readable. @@ -216,7 +236,7 @@ Writable.prototype.uncork = function() { !state.corked && !state.finished && !state.bufferProcessing && - state.buffer.length) + state.bufferedRequest) clearBuffer(this, state); } }; @@ -255,8 +275,15 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) { if (!ret) state.needDrain = true; - if (state.writing || state.corked) - state.buffer.push(new WriteReq(chunk, encoding, cb)); + if (state.writing || state.corked) { + var last = state.lastBufferedRequest; + state.lastBufferedRequest = new WriteReq(chunk, encoding, cb); + if (last) { + last.next = state.lastBufferedRequest; + } else { + state.bufferedRequest = state.lastBufferedRequest; + } + } else doWrite(stream, state, false, len, chunk, encoding, cb); @@ -313,7 +340,7 @@ function onwrite(stream, er) { if (!finished && !state.corked && !state.bufferProcessing && - state.buffer.length) { + state.bufferedRequest) { clearBuffer(stream, state); } @@ -349,17 +376,23 @@ function onwriteDrain(stream, state) { // if there's something in the buffer waiting, then process it function clearBuffer(stream, state) { state.bufferProcessing = true; + var entry = state.bufferedRequest; - if (stream._writev && state.buffer.length > 1) { + if (stream._writev && entry && entry.next) { // Fast case, write everything using _writev() + var buffer = []; var cbs = []; - for (var c = 0; c < state.buffer.length; c++) - cbs.push(state.buffer[c].callback); + while (entry) { + cbs.push(entry.callback); + buffer.push(entry); + entry = entry.next; + } // count the one we are adding, as well. // TODO(isaacs) clean this up state.pendingcb++; - doWrite(stream, state, true, state.length, state.buffer, '', function(err) { + state.lastBufferedRequest = null; + doWrite(stream, state, true, state.length, buffer, '', function(err) { for (var i = 0; i < cbs.length; i++) { state.pendingcb--; cbs[i](err); @@ -367,34 +400,29 @@ function clearBuffer(stream, state) { }); // Clear buffer - state.buffer = []; } else { // Slow case, write chunks one-by-one - for (var c = 0; c < state.buffer.length; c++) { - var entry = state.buffer[c]; + while (entry) { var chunk = entry.chunk; var encoding = entry.encoding; var cb = entry.callback; var len = state.objectMode ? 1 : chunk.length; doWrite(stream, state, false, len, chunk, encoding, cb); - + entry = entry.next; // if we didn't call the onwrite immediately, then // it means that we need to wait until it does. // also, that means that the chunk and cb are currently // being processed, so move the buffer counter past them. if (state.writing) { - c++; break; } } - if (c < state.buffer.length) - state.buffer = state.buffer.slice(c); - else - state.buffer.length = 0; + if (entry === null) + state.lastBufferedRequest = null; } - + state.bufferedRequest = entry; state.bufferProcessing = false; } @@ -435,7 +463,7 @@ Writable.prototype.end = function(chunk, encoding, cb) { function needFinish(stream, state) { return (state.ending && state.length === 0 && - state.buffer.length === 0 && + state.bufferedRequest === null && !state.finished && !state.writing); } diff --git a/lib/net.js b/lib/net.js index fac78f8c04dcbc..f0075b5a33e5b4 100644 --- a/lib/net.js +++ b/lib/net.js @@ -732,7 +732,7 @@ Socket.prototype.__defineGetter__('bytesWritten', function() { data = this._pendingData, encoding = this._pendingEncoding; - state.buffer.forEach(function(el) { + state.getBuffer().forEach(function(el) { if (util.isBuffer(el.chunk)) bytes += el.chunk.length; else diff --git a/test/simple/test-stream2-transform.js b/test/simple/test-stream2-transform.js index 9c9ddd8efc3141..6064565be0a124 100644 --- a/test/simple/test-stream2-transform.js +++ b/test/simple/test-stream2-transform.js @@ -81,7 +81,7 @@ test('writable side consumption', function(t) { t.equal(tx._readableState.length, 10); t.equal(transformed, 10); t.equal(tx._transformState.writechunk.length, 5); - t.same(tx._writableState.buffer.map(function(c) { + t.same(tx._writableState.getBuffer().map(function(c) { return c.chunk.length; }), [6, 7, 8, 9, 10]);