diff --git a/index.bs b/index.bs index 9834876ed..374c16fa6 100644 --- a/index.bs +++ b/index.bs @@ -274,6 +274,11 @@ Instances of ReadableStream are created with the internal slots des A Readable Stream Error Function created with the ability to move this stream to an "errored" state + + \[[pullScheduled]] + A boolean flag set to true when the underlying source's pull method is scheduled to be + called again after the current call to it finishes + \[[pullingPromise]] A promise returned by the underlying source's pull method, stored so that the stream can @@ -346,7 +351,7 @@ Instances of ReadableStream are created with the internal slots des
  • Set this@\[[readyPromise]] and this@\[[closedPromise]] to new promises.
  • Set this@\[[queue]] to a new empty List.
  • Set this@\[[state]] to "waiting". -
  • Set this@\[[started]] and this@\[[draining]] to false. +
  • Set this@\[[started]], this@\[[draining]], and this@\[[pulLScheduled]] to false.
  • Set this@\[[readableStreamReader]] to undefined.
  • Set this@\[[enqueue]] to CreateReadableStreamEnqueueFunction(this).
  • Set this@\[[close]] to CreateReadableStreamCloseFunction(this). @@ -784,12 +789,16 @@ The length property of the cancel method is 1.
    1. If stream@\[[draining]] is true or stream@\[[started]] is false or - stream@\[[state]] is "closed" or stream@\[[state]] is "errored", - return undefined. + stream@\[[state]] is "closed" or stream@\[[state]] is "errored" or + stream@\[[pullScheduled]] is true, return undefined.
    2. If stream@\[[pullingPromise]] is not undefined,
        -
      1. Upon fulfillment of stream@\[[pullingPromise]], call-with-rethrow - CallReadableStreamPull(stream). +
      2. Set stream@\[[pullScheduled]] to true. +
      3. Upon fulfillment of stream@\[[pullingPromise]], +
          +
        1. Set stream@\[[pulLScheduled]] to false. +
        2. Call-with-rethrow CallReadableStreamPull(stream). +
      4. Return undefined.
    3. Let shouldApplyBackpressure be ShouldReadableStreamApplyBackpressure(stream). diff --git a/reference-implementation/lib/readable-stream-abstract-ops.js b/reference-implementation/lib/readable-stream-abstract-ops.js index 3ae9172c0..f28a03428 100644 --- a/reference-implementation/lib/readable-stream-abstract-ops.js +++ b/reference-implementation/lib/readable-stream-abstract-ops.js @@ -16,12 +16,17 @@ export function AcquireExclusiveStreamReader(stream) { export function CallReadableStreamPull(stream) { if (stream._draining === true || stream._started === false || - stream._state === 'closed' || stream._state === 'errored') { + stream._state === 'closed' || stream._state === 'errored' || + stream._pullScheduled === true) { return undefined; } if (stream._pullingPromise !== undefined) { - stream._pullingPromise.then(() => CallReadableStreamPull(stream)); + stream._pullScheduled = true; + stream._pullingPromise.then(() => { + stream._pullScheduled = false; + CallReadableStreamPull(stream); + }); return undefined; } diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index 0fc150986..8df5b5fc3 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -13,6 +13,7 @@ export default class ReadableStream { this._state = 'waiting'; this._started = false; this._draining = false; + this._pullScheduled = false; this._pullingPromise = undefined; this._readableStreamReader = undefined; diff --git a/reference-implementation/test/readable-stream.js b/reference-implementation/test/readable-stream.js index 53afc625b..2fd64c85c 100644 --- a/reference-implementation/test/readable-stream.js +++ b/reference-implementation/test/readable-stream.js @@ -323,6 +323,44 @@ test('ReadableStream does not call pull until previous pull\'s promise fulfills' }); }); +test('ReadableStream does not call pull multiple times after previous pull finishes', t => { + var timesCalled = 0; + + var rs = new ReadableStream({ + start(enqueue) { + enqueue('a'); + enqueue('b'); + enqueue('c'); + }, + pull() { + ++timesCalled; + }, + strategy: { + size() { + return 1; + }, + shouldApplyBackpressure() { + return false; + } + } + }); + + t.equal(rs.state, 'readable', 'since start() synchronously enqueued chunks, the stream is readable'); + + // Wait for start to finish + rs.ready.then(() => { + t.equal(rs.read(), 'a', 'first chunk should be as expected'); + t.equal(rs.read(), 'b', 'second chunk should be as expected'); + t.equal(rs.read(), 'c', 'second chunk should be as expected'); + + setTimeout(() => { + // Once for after start, and once for after rs.read() === 'a'. + t.equal(timesCalled, 2, 'pull() should only be called twice'); + t.end(); + }, 50); + }); +}); + test('ReadableStream pull rejection makes stream errored', t => { t.plan(2);