Skip to content

Commit

Permalink
First class source object.
Browse files Browse the repository at this point in the history
Porting #265 to ReadableByteStream
  • Loading branch information
tyoshino committed Feb 10, 2015
1 parent 7a1856d commit 8046aff
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import * as helpers from '../helpers';

export function ErrorReadableByteStream(stream, error) {
if (stream._state === 'errored' || stream._state === 'closed') {
return;
Expand Down Expand Up @@ -30,12 +32,17 @@ export function IsReadableByteStreamLocked(stream) {
}

export function ReadFromReadableByteStream(stream) {
if (stream._readBufferSize === undefined) {
throw new TypeError('readBufferSize is not configured');
var readBufferSizeGetter = stream._underlyingByteSource['readBufferSize'];
if (readBufferSizeGetter === undefined) {
throw new TypeError('readBufferSize getter is not defined on the underlying byte source');
}
var readBufferSize = helpers.toInteger(readBufferSizeGetter.call(stream._underlyingByteSource));
if (readBufferSize < 0) {
throw new RangeError('readBufferSize must be non-negative');
}

var arrayBuffer = new ArrayBuffer(stream._readBufferSize);
var bytesRead = stream.readInto(arrayBuffer, 0, stream._readBufferSize);
var arrayBuffer = new ArrayBuffer(readBufferSize);
var bytesRead = stream.readInto(arrayBuffer, 0, readBufferSize);
// This code should be updated to use ArrayBuffer.prototype.transfer when
// it's ready.
var resizedArrayBuffer = arrayBuffer.slice(0, bytesRead);
Expand Down
42 changes: 10 additions & 32 deletions reference-implementation/lib/experimental/readable-byte-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,12 @@ function notifyReady(stream) {
}

export default class ReadableByteStream {
constructor({
start = () => {},
readInto = () => {},
cancel = () => {},
readBufferSize = undefined,
} = {}) {
if (typeof start !== 'function') {
throw new TypeError('start must be a function or undefined');
}
if (typeof readInto !== 'function') {
throw new TypeError('readInto must be a function or undefined');
}
if (typeof cancel !== 'function') {
throw new TypeError('cancel must be a function or undefined');
}
if (readBufferSize !== undefined) {
readBufferSize = helpers.toInteger(readBufferSize);
if (readBufferSize < 0) {
throw new RangeError('readBufferSize must be non-negative');
}
}
constructor(underlyingByteSource = {}) {
this._underlyingByteSource = underlyingByteSource;

this._readableByteStreamReader = undefined;
this._state = 'waiting';

this._onReadInto = readInto;
this._onCancel = cancel;

this._readBufferSize = readBufferSize;

this._readyPromise = new Promise((resolve, reject) => {
this._readyPromise_resolve = resolve;
this._readyPromise_reject = reject;
Expand All @@ -55,10 +31,8 @@ export default class ReadableByteStream {
this._closedPromise_reject = reject;
});

start(
notifyReady.bind(null, this),
ErrorReadableByteStream.bind(null, this)
);
helpers.InvokeOrNoop(underlyingByteSource, 'start',
[notifyReady.bind(null, this), ErrorReadableByteStream.bind(null, this)])
}

get state() {
Expand Down Expand Up @@ -107,7 +81,11 @@ export default class ReadableByteStream {

var bytesRead;
try {
bytesRead = this._onReadInto.call(undefined, arrayBuffer, offset, size);
var readInto = this._underlyingByteSource['readInto'];
if (readInto === undefined) {
throw new TypeError('readInto is not defiend on the underlying byte source');
}
bytesRead = readInto.call(this._underlyingByteSource, arrayBuffer, offset, size);
} catch (error) {
ErrorReadableByteStream(this, error);
throw error;
Expand Down Expand Up @@ -174,7 +152,7 @@ export default class ReadableByteStream {
this._resolveClosedPromise(undefined);

return new Promise((resolve, reject) => {
var sourceCancelPromise = helpers.promiseCall(this._onCancel, reason);
var sourceCancelPromise = helpers.PromiseInvokeOrNoop(this._underlyingByteSource, 'cancel', [reason]);
sourceCancelPromise.then(
() => {
resolve(undefined);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@ test('ReadableByteStream can be constructed with no arguments', t => {
t.end();
});

test('ReadableByteStream cannot be constructed if readBufferSize is a negative integer', t => {
t.throws(() => new ReadableByteStream({readBufferSize: -1}), /RangeError/);
t.end();
});

test('ReadableByteStream: Call notifyReady() asynchronously to enter readable state', t => {
var notifyReady;
var rbs = new ReadableByteStream({
Expand Down Expand Up @@ -62,7 +57,6 @@ test('ReadableByteStream: read() must throw if constructed with passing undefine
t.fail('Unexpected cancel call');
t.end();
},
readBufferSize: undefined
});

t.throws(() => rbs.read(), /TypeError/);
Expand Down Expand Up @@ -389,7 +383,9 @@ test('ReadableByteStream: read() delegates to readInto()', t => {
t.fail('Unexpected cancel call');
t.end();
},
readBufferSize: 10
readBufferSize() {
return 10;
}
});

var readIntoArrayBuffer, readIntoOffset, readIntoSize;
Expand Down Expand Up @@ -432,7 +428,9 @@ test('ReadableByteStream: ArrayBuffer allocated by read() is partially used', t
t.fail('Unexpected cancel call');
t.end();
},
readBufferSize: 10
readBufferSize() {
return 10;
}
});

t.equal(rbs.state, 'readable');
Expand Down Expand Up @@ -599,7 +597,9 @@ test('ReadableByteStream: Transfer 1kiB using pipeTo()', t => {
t.fail('Source\'s cancel() is called');
t.end();
},
readBufferSize: 10
readBufferSize() {
return 10;
}
});

var verifyCount = 0;
Expand Down

0 comments on commit 8046aff

Please sign in to comment.