From bfb2cd0bfddd716366f1c89637cca9fc1234e592 Mon Sep 17 00:00:00 2001 From: Jackson Tian Date: Fri, 18 Dec 2015 11:29:08 +0800 Subject: [PATCH] stream: add bytesRead property for readable Add a bytesRead property for readable is useful in some use cases. When user want know how many bytes read of readable, need to caculate it in userland. If encoding is specificed, get the value is very slowly. PR-URL: https://github.com/nodejs/node/pull/4372 Reviewed-By: Trevor Norris Reviewed-By: Colin Ihrig Reviewed-By: James M Snell --- doc/api/stream.markdown | 5 + lib/_stream_readable.js | 3 + lib/net.js | 5 - .../test-stream2-readable-bytesread.js | 119 ++++++++++++++++++ 4 files changed, 127 insertions(+), 5 deletions(-) create mode 100644 test/parallel/test-stream2-readable-bytesread.js diff --git a/doc/api/stream.markdown b/doc/api/stream.markdown index 554ddc222d6f48..e0c72bf1ff954c 100644 --- a/doc/api/stream.markdown +++ b/doc/api/stream.markdown @@ -250,6 +250,11 @@ readable: null end ``` + +#### readable.bytesRead + +The amount of read bytes. If `objectMode` is `true`, the value is 0 always. + #### readable.isPaused() * Return: `Boolean` diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index bdc263d6ef28f4..8cad1f1a37d37f 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -83,6 +83,8 @@ function Readable(options) { this._readableState = new ReadableState(options, this); + this.bytesRead = 0; + // legacy this.readable = true; @@ -135,6 +137,7 @@ function readableAddChunk(stream, state, chunk, encoding, addToFront) { var e = new Error('stream.unshift() after end event'); stream.emit('error', e); } else { + stream.bytesRead += state.objectMode ? 0 : chunk.length; if (state.decoder && !addToFront && !encoding) chunk = state.decoder.write(chunk); diff --git a/lib/net.js b/lib/net.js index 0e7bb02bb57dff..1faaa5c13bfdfd 100644 --- a/lib/net.js +++ b/lib/net.js @@ -91,7 +91,6 @@ exports._normalizeConnectArgs = normalizeConnectArgs; // called when creating new Socket, or when re-using a closed Socket function initSocketHandle(self) { self.destroyed = false; - self.bytesRead = 0; self._bytesDispatched = 0; self._sockname = null; @@ -515,10 +514,6 @@ function onread(nread, buffer) { // will prevent this from being called again until _read() gets // called again. - // if it's not enough data, we'll just call handle.readStart() - // again right away. - self.bytesRead += nread; - // Optimization: emit the original buffer with end points var ret = self.push(buffer); diff --git a/test/parallel/test-stream2-readable-bytesread.js b/test/parallel/test-stream2-readable-bytesread.js new file mode 100644 index 00000000000000..6f56af2f4e2c3e --- /dev/null +++ b/test/parallel/test-stream2-readable-bytesread.js @@ -0,0 +1,119 @@ +'use strict'; + +require('../common'); +const assert = require('assert'); +const Readable = require('stream').Readable; +const Duplex = require('stream').Duplex; +const Transform = require('stream').Transform; + +(function() { + const readable = new Readable({ + read: function(n) { + var i = this._index++; + if (i > this._max) + this.push(null); + else + this.push(new Buffer('a')); + } + }); + + readable._max = 1000; + readable._index = 1; + + var total = 0; + readable.on('data', function(chunk) { + total += chunk.length; + }); + + readable.on('end', function() { + assert.equal(total, readable.bytesRead); + }); +})(); + +(function() { + const readable = new Readable({ + read: function(n) { + var i = this._index++; + if (i > this._max) + this.push(null); + else + this.push(new Buffer('a')); + } + }); + + readable._max = 1000; + readable._index = 1; + + var total = 0; + readable.setEncoding('utf8'); + readable.on('data', function(chunk) { + total += Buffer.byteLength(chunk); + }); + + readable.on('end', function() { + assert.equal(total, readable.bytesRead); + }); +})(); + +(function() { + const duplex = new Duplex({ + read: function(n) { + var i = this._index++; + if (i > this._max) + this.push(null); + else + this.push(new Buffer('a')); + }, + write: function(chunk, encoding, next) { + next(); + } + }); + + duplex._max = 1000; + duplex._index = 1; + + var total = 0; + duplex.setEncoding('utf8'); + duplex.on('data', function(chunk) { + total += Buffer.byteLength(chunk); + }); + + duplex.on('end', function() { + assert.equal(total, duplex.bytesRead); + }); +})(); + +(function() { + const readable = new Readable({ + read: function(n) { + var i = this._index++; + if (i > this._max) + this.push(null); + else + this.push(new Buffer('{"key":"value"}')); + } + }); + readable._max = 1000; + readable._index = 1; + + const transform = new Transform({ + readableObjectMode : true, + transform: function(chunk, encoding, next) { + next(null, JSON.parse(chunk)); + }, + flush: function(done) { + done(); + } + }); + + var total = 0; + readable.on('data', function(chunk) { + total += chunk.length; + }); + + transform.on('end', function() { + assert.equal(0, transform.bytesRead); + assert.equal(total, readable.bytesRead); + }); + readable.pipe(transform); +})();