Skip to content

Commit

Permalink
Be more conservative about calling pull()
Browse files Browse the repository at this point in the history
Per @tyoshino's comment at #272 (comment)
  • Loading branch information
domenic committed Feb 2, 2015
1 parent 5803279 commit 8b2b726
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 7 deletions.
19 changes: 14 additions & 5 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,11 @@ Instances of <code>ReadableStream</code> are created with the internal slots des
<td>A <a>Readable Stream Error Function</a> created with the ability to move this stream to an
<code>"errored"</code> state
</tr>
<tr>
<td>\[[pullScheduled]]
<td>A boolean flag set to <b>true</b> when the underlying source's <code>pull</code> method is scheduled to be
called again after the current call to it finishes
</tr>
<tr>
<td>\[[pullingPromise]]
<td>A promise returned by the <a>underlying source</a>'s <code>pull</code> method, stored so that the stream can
Expand Down Expand Up @@ -346,7 +351,7 @@ Instances of <code>ReadableStream</code> are created with the internal slots des
<li> Set <b>this</b>@\[[readyPromise]] and <b>this</b>@\[[closedPromise]] to new promises.
<li> Set <b>this</b>@\[[queue]] to a new empty List.
<li> Set <b>this</b>@\[[state]] to <code>"waiting"</code>.
<li> Set <b>this</b>@\[[started]] and <b>this</b>@\[[draining]] to <b>false</b>.
<li> Set <b>this</b>@\[[started]], <b>this</b>@\[[draining]], and <b>this</b>@\[[pulLScheduled]] to <b>false</b>.
<li> Set <b>this</b>@\[[readableStreamReader]] to <b>undefined</b>.
<li> Set <b>this</b>@\[[enqueue]] to CreateReadableStreamEnqueueFunction(<b>this</b>).
<li> Set <b>this</b>@\[[close]] to CreateReadableStreamCloseFunction(<b>this</b>).
Expand Down Expand Up @@ -784,12 +789,16 @@ The <code>length</code> property of the <code>cancel</code> method is <b>1</b>.

<ol>
<li> If <var>stream</var>@\[[draining]] is <b>true</b> or <var>stream</var>@\[[started]] is <b>false</b> or
<var>stream</var>@\[[state]] is <code>"closed"</code> or <var>stream</var>@\[[state]] is <code>"errored"</code>,
return <b>undefined</b>.
<var>stream</var>@\[[state]] is <code>"closed"</code> or <var>stream</var>@\[[state]] is <code>"errored"</code> or
<var>stream</var>@\[[pullScheduled]] is <b>true</b>, return <b>undefined</b>.
<li> If <var>stream</var>@\[[pullingPromise]] is not <b>undefined</b>,
<ol>
<li> Upon fulfillment of <var>stream</var>@\[[pullingPromise]], call-with-rethrow
CallReadableStreamPull(<var>stream</var>).
<li> Set <var>stream</var>@\[[pullScheduled]] to <b>true</b>.
<li> Upon fulfillment of <var>stream</var>@\[[pullingPromise]],
<ol>
<li> Set <var>stream</var>@\[[pulLScheduled]] to <b>false</b>.
<li> Call-with-rethrow CallReadableStreamPull(<var>stream</var>).
</ol>
<li> Return <b>undefined</b>.
</ol>
<li> Let <var>shouldApplyBackpressure</var> be ShouldReadableStreamApplyBackpressure(<var>stream</var>).
Expand Down
9 changes: 7 additions & 2 deletions reference-implementation/lib/readable-stream-abstract-ops.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ export function AcquireExclusiveStreamReader(stream) {

export function CallReadableStreamPull(stream) {
if (stream._draining === true || stream._started === false ||
stream._state === 'closed' || stream._state === 'errored') {
stream._state === 'closed' || stream._state === 'errored' ||
stream._pullScheduled === true) {
return undefined;
}

if (stream._pullingPromise !== undefined) {
stream._pullingPromise.then(() => CallReadableStreamPull(stream));
stream._pullScheduled = true;
stream._pullingPromise.then(() => {
stream._pullScheduled = false;
CallReadableStreamPull(stream);
});
return undefined;
}

Expand Down
1 change: 1 addition & 0 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export default class ReadableStream {
this._state = 'waiting';
this._started = false;
this._draining = false;
this._pullScheduled = false;
this._pullingPromise = undefined;
this._readableStreamReader = undefined;

Expand Down
38 changes: 38 additions & 0 deletions reference-implementation/test/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,44 @@ test('ReadableStream does not call pull until previous pull\'s promise fulfills'
});
});

test('ReadableStream does not call pull multiple times after previous pull finishes', t => {
var timesCalled = 0;

var rs = new ReadableStream({
start(enqueue) {
enqueue('a');
enqueue('b');
enqueue('c');
},
pull() {
++timesCalled;
},
strategy: {
size() {
return 1;
},
shouldApplyBackpressure() {
return false;
}
}
});

t.equal(rs.state, 'readable', 'since start() synchronously enqueued chunks, the stream is readable');

// Wait for start to finish
rs.ready.then(() => {
t.equal(rs.read(), 'a', 'first chunk should be as expected');
t.equal(rs.read(), 'b', 'second chunk should be as expected');
t.equal(rs.read(), 'c', 'second chunk should be as expected');

setTimeout(() => {
// Once for after start, and once for after rs.read() === 'a'.
t.equal(timesCalled, 2, 'pull() should only be called twice');
t.end();
}, 50);
});
});

test('ReadableStream pull rejection makes stream errored', t => {
t.plan(2);

Expand Down

0 comments on commit 8b2b726

Please sign in to comment.