From 3b8ea56f4589d871d89d841bb6b35c6b0eae8a50 Mon Sep 17 00:00:00 2001 From: Domenic Denicola Date: Mon, 26 Jan 2015 20:37:12 -0500 Subject: [PATCH] Convert readable stream pull() to promise-returning See discussion in #185 for background. This allows more convenient integration with promise-returning underlying source functions, removing the need for `.catch(error)` as seen in the modified example. It also rationalizes the slightly-strange way in which pull() is mutexed based on whether enqueue() has been called: instead, it is now mutexed so as to not be called again until the previous iteration's promise has fulfilled. This gives more direct control to the underlying source, and allows them to deal more easily with cases where there are just no chunks to enqueue. Although superficially it seems this may reduce performance for cases where underlying sources can enqueue many chunks synchronously, this is not actually the case, as can be seen from the modified test illustrating such a scenario. If an underlying source can enqueue multiple chunks synchronously, then it should just do so! It shouldn't wait to be pull()ed, as the previous test was doing. --- index.bs | 50 ++++++++------- .../lib/readable-stream-abstract-ops.js | 23 +++---- .../lib/readable-stream.js | 2 +- .../test/bad-underlying-sources.js | 6 +- reference-implementation/test/pipe-to.js | 8 +-- .../test/readable-stream.js | 63 +++++++++++++++++-- .../test/utils/sequential-rs.js | 32 +++++----- 7 files changed, 121 insertions(+), 63 deletions(-) diff --git a/index.bs b/index.bs index a81fa69bf..2661c154a 100644 --- a/index.bs +++ b/index.bs @@ -273,8 +273,9 @@ Instances of ReadableStream are created with the internal slots des "errored" state - \[[pulling]] - A boolean flag indicating whether data is currently being pulled from the underlying sink + \[[pullingPromise]] + A promise returned by the underlying source's pull method, stored so that the stream can + re-pull when it fulfills \[[queue]] @@ -318,9 +319,10 @@ Instances of ReadableStream are created with the internal slots des
  • start(enqueue, close, error) is called immediately, and is typically used to adapt a push source by setting up relevant event listeners, or to acquire access to a pull source. If this process is asynchronous, it can return a promise to signal success or failure. -
  • pull(enqueue, close, error) is called when the stream's internal queue of chunks is depleted, and - the consumer has signaled that they wish to consume more data. Once it is called, it will not be called again - until the passed enqueue callback is called. +
  • pull(enqueue, close) is called when the stream's internal queue of chunks is depleted, and the + consumer has signaled that they wish to consume more data. If pull returns a promise, then it will + not be called again until the promise returned by the previous call has fulfilled; if the promise rejects, the + stream will become errored.
  • cancel(reason) is called when the consumer signals that they are no longer interested in the stream. It should perform any actions necessary to release access to the underlying source. If this process is asynchronous, it can return a promise to signal success or failure. @@ -342,7 +344,7 @@ Instances of ReadableStream are created with the internal slots des
  • Set this@\[[readyPromise]] and this@\[[closedPromise]] to new promises.
  • Set this@\[[queue]] to a new empty List.
  • Set this@\[[state]] to "waiting". -
  • Set this@\[[started]], this@\[[draining]], and this@\[[pulling]] to false. +
  • Set this@\[[started]] and this@\[[draining]] to false.
  • Set this@\[[readableStreamReader]] to undefined.
  • Set this@\[[enqueue]] to CreateReadableStreamEnqueueFunction(this).
  • Set this@\[[close]] to CreateReadableStreamCloseFunction(this). @@ -780,20 +782,24 @@ The length property of the cancel method is 1.

    CallReadableStreamPull ( stream )

      -
    1. If stream@\[[pulling]] is true or stream@\[[draining]] is true or - stream@\[[started]] is false or stream@\[[state]] is "closed" or - stream@\[[state]] is "errored", return undefined. -
    2. Let shouldApplyBackpressure be ShouldReadableStreamApplyBackpressure(stream). -
    3. If shouldApplyBackpressure is true, return undefined. -
    4. Set stream@\[[pulling]] to true. -
    5. Let pullResult be InvokeOrNoop(stream@\[[underlyingSource]], "pull", - (stream@\[[enqueue]], stream@\[[close]], stream@\[[error]]). -
    6. If pullResult is an abrupt completion, +
    7. If stream@\[[draining]] is true or stream@\[[started]] is false or + stream@\[[state]] is "closed" or stream@\[[state]] is "errored", + return undefined. +
    8. If stream@\[[pullingPromise]] is not undefined,
        -
      1. Call-with-rethrow stream@\[[error]](pullResult.\[[value]]). -
      2. Return pullResult. +
      3. Upon fulfillment of stream@\[[pullingPromise]], call-with-rethrow + CallReadableStreamPull(stream). +
      4. Return undefined.
      -
    9. Otherwise, return undefined. +
    10. Let shouldApplyBackpressure be ShouldReadableStreamApplyBackpressure(stream). +
    11. If shouldApplyBackpressure is true, return undefined. +
    12. Set stream@\[[pullingPromise]] to PromiseInvokeOrNoop(stream@\[[underlyingSource]], + "pull", (stream@\[[enqueue]], stream@\[[close]])). +
    13. Upon fulfillment of stream@\[[pullingPromise]], set stream@\[[pullingPromise]] to + undefined. +
    14. Upon rejection of stream@\[[pullingPromise]] with reason e, call-with-rethrow + stream@\[[error]](e). +
    15. Return undefined.

    CloseReadableStream ( stream )

    @@ -860,7 +866,6 @@ closing over a variable stream, that performs the following steps:
  • Call-with-rethrow EnqueueValueWithSize(stream@\[[queue]], chunk, chunkSize.\[[value]]). -
  • Set stream@\[[pulling]] to false.
  • Let shouldApplyBackpressure be ShouldReadableStreamApplyBackpressure(stream).
  • If stream@\[[state]] is "waiting",
      @@ -1914,18 +1919,17 @@ sources. Note how in contrast to the examples with push sources, most of the }); }, - pull(enqueue, close, error) { + pull(enqueue, close) { const buffer = new ArrayBuffer(CHUNK_SIZE); - fs.read(fd, buffer, 0, CHUNK_SIZE, position).then(bytesRead => { + return fs.read(fd, buffer, 0, CHUNK_SIZE, position).then(bytesRead => { if (bytesRead === 0) { return fs.close(fd).then(close); } else { position += bytesRead; enqueue(buffer); } - }) - .catch(error); + }); }, cancel() { diff --git a/reference-implementation/lib/readable-stream-abstract-ops.js b/reference-implementation/lib/readable-stream-abstract-ops.js index 24aadd3fe..3ae9172c0 100644 --- a/reference-implementation/lib/readable-stream-abstract-ops.js +++ b/reference-implementation/lib/readable-stream-abstract-ops.js @@ -1,7 +1,7 @@ var assert = require('assert'); import ExclusiveStreamReader from './exclusive-stream-reader'; import { DequeueValue, EnqueueValueWithSize, GetTotalQueueSize } from './queue-with-sizes'; -import { InvokeOrNoop, typeIsObject } from './helpers'; +import { PromiseInvokeOrNoop, typeIsObject } from './helpers'; export function AcquireExclusiveStreamReader(stream) { if (stream._state === 'closed') { @@ -15,24 +15,26 @@ export function AcquireExclusiveStreamReader(stream) { } export function CallReadableStreamPull(stream) { - if (stream._pulling === true || stream._draining === true || stream._started === false || + if (stream._draining === true || stream._started === false || stream._state === 'closed' || stream._state === 'errored') { return undefined; } + if (stream._pullingPromise !== undefined) { + stream._pullingPromise.then(() => CallReadableStreamPull(stream)); + return undefined; + } + var shouldApplyBackpressure = ShouldReadableStreamApplyBackpressure(stream); if (shouldApplyBackpressure === true) { return undefined; } - stream._pulling = true; - - try { - InvokeOrNoop(stream._underlyingSource, 'pull', [stream._enqueue, stream._close, stream._error]); - } catch (pullResultE) { - stream._error(pullResultE); - throw pullResultE; - } + stream._pullingPromise = PromiseInvokeOrNoop(stream._underlyingSource, 'pull', [stream._enqueue, stream._close]); + stream._pullingPromise.then( + () => { stream._pullingPromise = undefined; }, + e => { stream._error(e); } + ); return undefined; } @@ -94,7 +96,6 @@ export function CreateReadableStreamEnqueueFunction(stream) { } EnqueueValueWithSize(stream._queue, chunk, chunkSize); - stream._pulling = false; var shouldApplyBackpressure = ShouldReadableStreamApplyBackpressure(stream); diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index 0e2f85943..0fc150986 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -13,7 +13,7 @@ export default class ReadableStream { this._state = 'waiting'; this._started = false; this._draining = false; - this._pulling = false; + this._pullingPromise = undefined; this._readableStreamReader = undefined; this._enqueue = CreateReadableStreamEnqueueFunction(this); diff --git a/reference-implementation/test/bad-underlying-sources.js b/reference-implementation/test/bad-underlying-sources.js index 91b0367d0..19abcd913 100644 --- a/reference-implementation/test/bad-underlying-sources.js +++ b/reference-implementation/test/bad-underlying-sources.js @@ -78,8 +78,7 @@ test('Throwing underlying source pull getter (second pull)', t => { rs.ready.then(() => { t.equal(rs.state, 'readable', 'sanity check: the stream becomes readable without issue'); - - t.throws(() => rs.read(), /a unique string/, 'reading triggers a pull, and the error is re-thrown'); + t.equal(rs.read(), 'a', 'the initially-enqueued chunk can be read from the stream'); }); rs.closed.then( @@ -106,8 +105,7 @@ test('Throwing underlying source pull method (second pull)', t => { rs.ready.then(() => { t.equal(rs.state, 'readable', 'sanity check: the stream becomes readable without issue'); - - t.throws(() => rs.read(), /a unique string/, 'reading triggers a pull, and the error is re-thrown'); + t.equal(rs.read(), 'a', 'the initially-enqueued chunk can be read from the stream'); }); rs.closed.then( diff --git a/reference-implementation/test/pipe-to.js b/reference-implementation/test/pipe-to.js index 7f81ced6c..bf223748c 100644 --- a/reference-implementation/test/pipe-to.js +++ b/reference-implementation/test/pipe-to.js @@ -238,7 +238,7 @@ test('Piping from a ReadableStream in readable state which becomes closed after }, close() { t.assert(writeCalled); - t.equal(pullCount, 1); + t.equal(pullCount, 2); t.end(); }, @@ -296,7 +296,7 @@ test('Piping from a ReadableStream in readable state which becomes errored after abort(reason) { t.equal(reason, passedError); t.assert(writeCalled); - t.equal(pullCount, 1); + t.equal(pullCount, 2); t.end(); } @@ -332,7 +332,7 @@ test('Piping from a ReadableStream in waiting state which becomes readable after var ws = new WritableStream({ write(chunk) { t.equal(chunk, 'Hello'); - t.equal(pullCount, 1); + t.equal(pullCount, 2); t.end(); }, close() { @@ -473,7 +473,7 @@ test('Piping from a ReadableStream in readable state to a WritableStream in wait } else { t.equal(chunk, 'World'); - t.equal(pullCount, 1); + t.equal(pullCount, 2); t.end(); } diff --git a/reference-implementation/test/readable-stream.js b/reference-implementation/test/readable-stream.js index fa4039361..53afc625b 100644 --- a/reference-implementation/test/readable-stream.js +++ b/reference-implementation/test/readable-stream.js @@ -220,12 +220,11 @@ test('ReadableStream adapting a push source', t => { randomSource.onerror = error; }, - pull(enqueue, close, error) { + pull(enqueue, close) { if (!pullChecked) { pullChecked = true; t.equal(typeof enqueue, 'function', 'enqueue is a function in pull'); t.equal(typeof close, 'function', 'close is a function in pull'); - t.equal(typeof error, 'function', 'error is a function in pull'); } randomSource.readStart(); @@ -267,15 +266,15 @@ test('ReadableStream adapting an async pull source', t => { }); }); -test('ReadableStream is able to pull data repeatedly if it\'s available synchronously', t => { +test('ReadableStream is able to enqueue lots of data in a single pull, making it available synchronously', t => { var i = 0; var rs = new ReadableStream({ pull(enqueue, close) { - if (++i <= 10) { + while (++i <= 10) { enqueue(i); - } else { - close(); } + + close(); } }); @@ -290,6 +289,58 @@ test('ReadableStream is able to pull data repeatedly if it\'s available synchron }); }); +test('ReadableStream does not call pull until previous pull\'s promise fulfills', t => { + var resolve; + var returnedPromise; + var timesCalled = 0; + var rs = new ReadableStream({ + pull(enqueue) { + ++timesCalled; + enqueue(timesCalled); + returnedPromise = new Promise(r => { resolve = r; }); + return returnedPromise; + } + }); + + t.equal(rs.state, 'waiting', 'stream starts out waiting'); + + rs.ready.then(() => { + t.equal(rs.state, 'readable', 'stream becomes readable (even before promise fulfills)'); + t.equal(timesCalled, 1, 'pull is not yet called a second time'); + t.equal(rs.read(), 1, 'read() returns enqueued value'); + + setTimeout(() => { + t.equal(timesCalled, 1, 'after 30 ms, pull has still only been called once'); + + resolve(); + + returnedPromise.then(() => { + t.equal(timesCalled, 2, 'after the promise is fulfilled, pull is called a second time'); + t.equal(rs.read(), 2, 'read() returns the second enqueued value'); + t.end(); + }); + }, 30); + }); +}); + +test('ReadableStream pull rejection makes stream errored', t => { + t.plan(2); + + var theError = new Error('pull failure'); + var rs = new ReadableStream({ + pull() { + return Promise.reject(theError); + } + }); + + t.equal(rs.state, 'waiting', 'stream starts out waiting'); + + rs.closed.then( + () => t.fail('.closed should not fulfill'), + e => t.equal(e, theError, '.closed should reject with the error') + ); +}); + test('ReadableStream ready does not error when no more data is available', t => { // https://github.com/whatwg/streams/issues/80 diff --git a/reference-implementation/test/utils/sequential-rs.js b/reference-implementation/test/utils/sequential-rs.js index 169c1f41b..e10234a59 100644 --- a/reference-implementation/test/utils/sequential-rs.js +++ b/reference-implementation/test/utils/sequential-rs.js @@ -16,20 +16,24 @@ export default function sequentialReadableStream(limit, options) { }); }, - pull(enqueue, finish, error) { - sequentialSource.read((err, done, chunk) => { - if (err) { - error(err); - } else if (done) { - sequentialSource.close(err => { - if (err) { - error(err); - } - finish(); - }); - } else { - enqueue(chunk); - } + pull(enqueue, close) { + return new Promise((resolve, reject) => { + sequentialSource.read((err, done, chunk) => { + if (err) { + reject(err); + } else if (done) { + sequentialSource.close(err => { + if (err) { + reject(err); + } + close(); + resolve(); + }); + } else { + enqueue(chunk); + resolve(); + } + }); }); } });