Skip to content

Commit

Permalink
Convert readable stream pull() to promise-returning
Browse files Browse the repository at this point in the history
See discussion in #185 for background.

This allows more convenient integration with promise-returning underlying source functions, removing the need for `.catch(error)` as seen in the modified example.

It also rationalizes the slightly-strange way in which pull() is mutexed based on whether enqueue() has been called: instead, it is now mutexed so as to not be called again until the previous iteration's promise has fulfilled. This gives more direct control to the underlying source, and allows them to deal more easily with cases where there are just no chunks to enqueue.

Although superficially it seems this may reduce performance for cases where underlying sources can enqueue many chunks synchronously, this is not actually the case, as can be seen from the modified test illustrating such a scenario. If an underlying source can enqueue multiple chunks synchronously, then it should just do so! It shouldn't wait to be pull()ed, as the previous test was doing.
  • Loading branch information
domenic committed Feb 2, 2015
1 parent 64df8a2 commit 5803279
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 63 deletions.
50 changes: 27 additions & 23 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,9 @@ Instances of <code>ReadableStream</code> are created with the internal slots des
<code>"errored"</code> state
</tr>
<tr>
<td>\[[pulling]]
<td>A boolean flag indicating whether data is currently being pulled from the underlying sink
<td>\[[pullingPromise]]
<td>A promise returned by the <a>underlying source</a>'s <code>pull</code> method, stored so that the stream can
re-pull when it fulfills
</tr>
<tr>
<td>\[[queue]]
Expand Down Expand Up @@ -320,9 +321,10 @@ Instances of <code>ReadableStream</code> are created with the internal slots des
<li> <code>start(enqueue, close, error)</code> is called immediately, and is typically used to adapt a <a>push
source</a> by setting up relevant event listeners, or to acquire access to a <a>pull source</a>. If this process
is asynchronous, it can return a promise to signal success or failure.
<li> <code>pull(enqueue, close, error)</code> is called when the stream's internal queue of chunks is depleted, and
the consumer has signaled that they wish to consume more data. Once it is called, it will not be called again
until the passed <code>enqueue</code> callback is called.
<li> <code>pull(enqueue, close)</code> is called when the stream's internal queue of chunks is depleted, and the
consumer has signaled that they wish to consume more data. If <code>pull</code> returns a promise, then it will
not be called again until the promise returned by the previous call has fulfilled; if the promise rejects, the
stream will become errored.
<li> <code>cancel(reason)</code> is called when the consumer signals that they are no longer interested in the
stream. It should perform any actions necessary to release access to the <a>underlying source</a>. If this
process is asynchronous, it can return a promise to signal success or failure.
Expand All @@ -344,7 +346,7 @@ Instances of <code>ReadableStream</code> are created with the internal slots des
<li> Set <b>this</b>@\[[readyPromise]] and <b>this</b>@\[[closedPromise]] to new promises.
<li> Set <b>this</b>@\[[queue]] to a new empty List.
<li> Set <b>this</b>@\[[state]] to <code>"waiting"</code>.
<li> Set <b>this</b>@\[[started]], <b>this</b>@\[[draining]], and <b>this</b>@\[[pulling]] to <b>false</b>.
<li> Set <b>this</b>@\[[started]] and <b>this</b>@\[[draining]] to <b>false</b>.
<li> Set <b>this</b>@\[[readableStreamReader]] to <b>undefined</b>.
<li> Set <b>this</b>@\[[enqueue]] to CreateReadableStreamEnqueueFunction(<b>this</b>).
<li> Set <b>this</b>@\[[close]] to CreateReadableStreamCloseFunction(<b>this</b>).
Expand Down Expand Up @@ -781,20 +783,24 @@ The <code>length</code> property of the <code>cancel</code> method is <b>1</b>.
<h4 id="call-readable-stream-pull">CallReadableStreamPull ( stream )</h4>

<ol>
<li> If <var>stream</var>@\[[pulling]] is <b>true</b> or <var>stream</var>@\[[draining]] is <b>true</b> or
<var>stream</var>@\[[started]] is <b>false</b> or <var>stream</var>@\[[state]] is <code>"closed"</code> or
<var>stream</var>@\[[state]] is <code>"errored"</code>, return <b>undefined</b>.
<li> Let <var>shouldApplyBackpressure</var> be ShouldReadableStreamApplyBackpressure(<var>stream</var>).
<li> If <var>shouldApplyBackpressure</var> is <b>true</b>, return <b>undefined</b>.
<li> Set <var>stream</var>@\[[pulling]] to <b>true</b>.
<li> Let <var>pullResult</var> be InvokeOrNoop(<var>stream</var>@\[[underlyingSource]], <code>"pull"</code>,
(<var>stream</var>@\[[enqueue]], <var>stream</var>@\[[close]], <var>stream</var>@\[[error]]).
<li> If <var>pullResult</var> is an abrupt completion,
<li> If <var>stream</var>@\[[draining]] is <b>true</b> or <var>stream</var>@\[[started]] is <b>false</b> or
<var>stream</var>@\[[state]] is <code>"closed"</code> or <var>stream</var>@\[[state]] is <code>"errored"</code>,
return <b>undefined</b>.
<li> If <var>stream</var>@\[[pullingPromise]] is not <b>undefined</b>,
<ol>
<li> Call-with-rethrow Call(<var>stream</var>@\[[error]], <b>undefined</b>, «‍<var>pullResult</var>.\[[value]]»).
<li> Return <var>pullResult</var>.
<li> Upon fulfillment of <var>stream</var>@\[[pullingPromise]], call-with-rethrow
CallReadableStreamPull(<var>stream</var>).
<li> Return <b>undefined</b>.
</ol>
<li> Otherwise, return <b>undefined</b>.
<li> Let <var>shouldApplyBackpressure</var> be ShouldReadableStreamApplyBackpressure(<var>stream</var>).
<li> If <var>shouldApplyBackpressure</var> is <b>true</b>, return <b>undefined</b>.
<li> Set <var>stream</var>@\[[pullingPromise]] to PromiseInvokeOrNoop(<var>stream</var>@\[[underlyingSource]],
<code>"pull"</code>, (<var>stream</var>@\[[enqueue]], <var>stream</var>@\[[close]])).
<li> Upon fulfillment of <var>stream</var>@\[[pullingPromise]], set <var>stream</var>@\[[pullingPromise]] to
<b>undefined</b>.
<li> Upon rejection of <var>stream</var>@\[[pullingPromise]] with reason <var>e</var>, call-with-rethrow
<var>stream</var>@\[[error]](<var>e</var>).
<li> Return <b>undefined</b>.
</ol>

<h4 id="close-readable-stream">CloseReadableStream ( stream )</h4>
Expand Down Expand Up @@ -862,7 +868,6 @@ closing over a variable <var>stream</var>, that performs the following steps:
</ol>
<li>Call-with-rethrow EnqueueValueWithSize(<var>stream</var>@\[[queue]], <var>chunk</var>,
<var>chunkSize</var>.\[[value]]).
<li> Set <var>stream</var>@\[[pulling]] to <b>false</b>.
<li> Let <var>shouldApplyBackpressure</var> be ShouldReadableStreamApplyBackpressure(<var>stream</var>).
<li> If <var>stream</var>@\[[state]] is <code>"waiting"</code>,
<ol>
Expand Down Expand Up @@ -1947,18 +1952,17 @@ sources</a>. Note how in contrast to the examples with push sources, most of the
});
},

pull(enqueue, close, error) {
pull(enqueue, close) {
const buffer = new ArrayBuffer(CHUNK_SIZE);

fs.read(fd, buffer, 0, CHUNK_SIZE, position).then(bytesRead => {
return fs.read(fd, buffer, 0, CHUNK_SIZE, position).then(bytesRead => {
if (bytesRead === 0) {
return fs.close(fd).then(close);
} else {
position += bytesRead;
enqueue(buffer);
}
})
.catch(error);
});
},

cancel() {
Expand Down
23 changes: 12 additions & 11 deletions reference-implementation/lib/readable-stream-abstract-ops.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
var assert = require('assert');
import ExclusiveStreamReader from './exclusive-stream-reader';
import { DequeueValue, EnqueueValueWithSize, GetTotalQueueSize } from './queue-with-sizes';
import { InvokeOrNoop, typeIsObject } from './helpers';
import { PromiseInvokeOrNoop, typeIsObject } from './helpers';

export function AcquireExclusiveStreamReader(stream) {
if (stream._state === 'closed') {
Expand All @@ -15,24 +15,26 @@ export function AcquireExclusiveStreamReader(stream) {
}

export function CallReadableStreamPull(stream) {
if (stream._pulling === true || stream._draining === true || stream._started === false ||
if (stream._draining === true || stream._started === false ||
stream._state === 'closed' || stream._state === 'errored') {
return undefined;
}

if (stream._pullingPromise !== undefined) {
stream._pullingPromise.then(() => CallReadableStreamPull(stream));
return undefined;
}

var shouldApplyBackpressure = ShouldReadableStreamApplyBackpressure(stream);
if (shouldApplyBackpressure === true) {
return undefined;
}

stream._pulling = true;

try {
InvokeOrNoop(stream._underlyingSource, 'pull', [stream._enqueue, stream._close, stream._error]);
} catch (pullResultE) {
stream._error(pullResultE);
throw pullResultE;
}
stream._pullingPromise = PromiseInvokeOrNoop(stream._underlyingSource, 'pull', [stream._enqueue, stream._close]);
stream._pullingPromise.then(
() => { stream._pullingPromise = undefined; },
e => { stream._error(e); }
);

return undefined;
}
Expand Down Expand Up @@ -94,7 +96,6 @@ export function CreateReadableStreamEnqueueFunction(stream) {
}

EnqueueValueWithSize(stream._queue, chunk, chunkSize);
stream._pulling = false;

var shouldApplyBackpressure = ShouldReadableStreamApplyBackpressure(stream);

Expand Down
2 changes: 1 addition & 1 deletion reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export default class ReadableStream {
this._state = 'waiting';
this._started = false;
this._draining = false;
this._pulling = false;
this._pullingPromise = undefined;
this._readableStreamReader = undefined;

this._enqueue = CreateReadableStreamEnqueueFunction(this);
Expand Down
6 changes: 2 additions & 4 deletions reference-implementation/test/bad-underlying-sources.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ test('Throwing underlying source pull getter (second pull)', t => {

rs.ready.then(() => {
t.equal(rs.state, 'readable', 'sanity check: the stream becomes readable without issue');

t.throws(() => rs.read(), /a unique string/, 'reading triggers a pull, and the error is re-thrown');
t.equal(rs.read(), 'a', 'the initially-enqueued chunk can be read from the stream');
});

rs.closed.then(
Expand All @@ -106,8 +105,7 @@ test('Throwing underlying source pull method (second pull)', t => {

rs.ready.then(() => {
t.equal(rs.state, 'readable', 'sanity check: the stream becomes readable without issue');

t.throws(() => rs.read(), /a unique string/, 'reading triggers a pull, and the error is re-thrown');
t.equal(rs.read(), 'a', 'the initially-enqueued chunk can be read from the stream');
});

rs.closed.then(
Expand Down
8 changes: 4 additions & 4 deletions reference-implementation/test/pipe-to.js
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ test('Piping from a ReadableStream in readable state which becomes closed after
},
close() {
t.assert(writeCalled);
t.equal(pullCount, 1);
t.equal(pullCount, 2);

t.end();
},
Expand Down Expand Up @@ -296,7 +296,7 @@ test('Piping from a ReadableStream in readable state which becomes errored after
abort(reason) {
t.equal(reason, passedError);
t.assert(writeCalled);
t.equal(pullCount, 1);
t.equal(pullCount, 2);

t.end();
}
Expand Down Expand Up @@ -332,7 +332,7 @@ test('Piping from a ReadableStream in waiting state which becomes readable after
var ws = new WritableStream({
write(chunk) {
t.equal(chunk, 'Hello');
t.equal(pullCount, 1);
t.equal(pullCount, 2);
t.end();
},
close() {
Expand Down Expand Up @@ -473,7 +473,7 @@ test('Piping from a ReadableStream in readable state to a WritableStream in wait
} else {
t.equal(chunk, 'World');

t.equal(pullCount, 1);
t.equal(pullCount, 2);

t.end();
}
Expand Down
63 changes: 57 additions & 6 deletions reference-implementation/test/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,11 @@ test('ReadableStream adapting a push source', t => {
randomSource.onerror = error;
},

pull(enqueue, close, error) {
pull(enqueue, close) {
if (!pullChecked) {
pullChecked = true;
t.equal(typeof enqueue, 'function', 'enqueue is a function in pull');
t.equal(typeof close, 'function', 'close is a function in pull');
t.equal(typeof error, 'function', 'error is a function in pull');
}

randomSource.readStart();
Expand Down Expand Up @@ -267,15 +266,15 @@ test('ReadableStream adapting an async pull source', t => {
});
});

test('ReadableStream is able to pull data repeatedly if it\'s available synchronously', t => {
test('ReadableStream is able to enqueue lots of data in a single pull, making it available synchronously', t => {
var i = 0;
var rs = new ReadableStream({
pull(enqueue, close) {
if (++i <= 10) {
while (++i <= 10) {
enqueue(i);
} else {
close();
}

close();
}
});

Expand All @@ -290,6 +289,58 @@ test('ReadableStream is able to pull data repeatedly if it\'s available synchron
});
});

test('ReadableStream does not call pull until previous pull\'s promise fulfills', t => {
var resolve;
var returnedPromise;
var timesCalled = 0;
var rs = new ReadableStream({
pull(enqueue) {
++timesCalled;
enqueue(timesCalled);
returnedPromise = new Promise(r => { resolve = r; });
return returnedPromise;
}
});

t.equal(rs.state, 'waiting', 'stream starts out waiting');

rs.ready.then(() => {
t.equal(rs.state, 'readable', 'stream becomes readable (even before promise fulfills)');
t.equal(timesCalled, 1, 'pull is not yet called a second time');
t.equal(rs.read(), 1, 'read() returns enqueued value');

setTimeout(() => {
t.equal(timesCalled, 1, 'after 30 ms, pull has still only been called once');

resolve();

returnedPromise.then(() => {
t.equal(timesCalled, 2, 'after the promise is fulfilled, pull is called a second time');
t.equal(rs.read(), 2, 'read() returns the second enqueued value');
t.end();
});
}, 30);
});
});

test('ReadableStream pull rejection makes stream errored', t => {
t.plan(2);

var theError = new Error('pull failure');
var rs = new ReadableStream({
pull() {
return Promise.reject(theError);
}
});

t.equal(rs.state, 'waiting', 'stream starts out waiting');

rs.closed.then(
() => t.fail('.closed should not fulfill'),
e => t.equal(e, theError, '.closed should reject with the error')
);
});

test('ReadableStream ready does not error when no more data is available', t => {
// https://github.com/whatwg/streams/issues/80

Expand Down
32 changes: 18 additions & 14 deletions reference-implementation/test/utils/sequential-rs.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,24 @@ export default function sequentialReadableStream(limit, options) {
});
},

pull(enqueue, finish, error) {
sequentialSource.read((err, done, chunk) => {
if (err) {
error(err);
} else if (done) {
sequentialSource.close(err => {
if (err) {
error(err);
}
finish();
});
} else {
enqueue(chunk);
}
pull(enqueue, close) {
return new Promise((resolve, reject) => {
sequentialSource.read((err, done, chunk) => {
if (err) {
reject(err);
} else if (done) {
sequentialSource.close(err => {
if (err) {
reject(err);
}
close();
resolve();
});
} else {
enqueue(chunk);
resolve();
}
});
});
}
});
Expand Down

0 comments on commit 5803279

Please sign in to comment.