diff --git a/testing/web-platform/tests/dom/observable/tentative/observable-from.any.js b/testing/web-platform/tests/dom/observable/tentative/observable-from.any.js index e47685541931c..b7b9163799602 100644 --- a/testing/web-platform/tests/dom/observable/tentative/observable-from.any.js +++ b/testing/web-platform/tests/dom/observable/tentative/observable-from.any.js @@ -541,3 +541,578 @@ test(() => { + +test(() => { + const sync_iterable = { + [Symbol.asyncIterator]: null, + [Symbol.iterator]() { + return { + value: 0, + next() { + if (this.value === 2) + return {value: undefined, done: true}; + else + return {value: this.value++, done: false}; + } + } + }, + }; + + const results = []; + const source = Observable.from(sync_iterable).subscribe(v => results.push(v)); + assert_array_equals(results, [0, 1]); +}, "from(): Async iterable protocol null, converts as iterator"); + +promise_test(async t => { + const results = []; + const async_iterable = { + [Symbol.asyncIterator]() { + results.push("[Symbol.asyncIterator]() invoked"); + return { + val: 0, + next() { + return new Promise(resolve => { + t.step_timeout(() => { + resolve({ + value: this.val, + done: this.val++ === 4 ? true : false, + }); + }, 400); + }); + }, + }; + }, + }; + + const source = Observable.from(async_iterable); + assert_array_equals(results, []); + + await new Promise(resolve => { + source.subscribe({ + next: v => { + results.push(`Observing ${v}`); + queueMicrotask(() => results.push(`next() microtask interleaving (v=${v})`)); + }, + complete: () => { + results.push('complete()'); + resolve(); + }, + }); + }); + + assert_array_equals(results, [ + "[Symbol.asyncIterator]() invoked", + "Observing 0", + "next() microtask interleaving (v=0)", + "Observing 1", + "next() microtask interleaving (v=1)", + "Observing 2", + "next() microtask interleaving (v=2)", + "Observing 3", + "next() microtask interleaving (v=3)", + "complete()", + ]); +}, "from(): Asynchronous iterable conversion"); + + + + + + + + + + + + + + +promise_test(async t => { + const async_iterable = { + slow: true, + [Symbol.asyncIterator]() { + + + + + const shouldBeSlow = this.slow; + this.slow = false; + + return { + val: 0, + next() { + + + return new Promise(resolve => { + t.step_timeout(() => resolve({ + value: `${this.val}-${shouldBeSlow ? 'slow' : 'fast'}`, + done: this.val++ === 4 ? true : false, + }), shouldBeSlow ? 200 : 0); + }); + }, + }; + }, + }; + + const results = []; + const source = Observable.from(async_iterable); + + const subscribeFunction = function(resolve, reject) { + source.subscribe({ + next: v => results.push(v), + complete: () => resolve(), + }); + + + t.step_timeout(() => reject('TIMEOUT'), 3000); + } + + const slow_promise = new Promise(subscribeFunction); + const fast_promise = new Promise(subscribeFunction); + await Promise.all([slow_promise, fast_promise]); + assert_array_equals(results, [ + '0-fast', + '1-fast', + '2-fast', + '3-fast', + '0-slow', + '1-slow', + '2-slow', + '3-slow', + ]); +}, "from(): Asynchronous iterable multiple in-flight subscriptions competing"); + +promise_test(async () => { + const async_generator = async function*() { + yield 1; + yield 2; + yield 3; + }; + + const results = []; + const source = Observable.from(async_generator()); + + const subscribeFunction = function(resolve) { + source.subscribe({ + next: v => results.push(v), + complete: () => resolve(), + }); + } + await new Promise(subscribeFunction); + assert_array_equals(results, [1, 2, 3]); + await new Promise(subscribeFunction); + assert_array_equals(results, [1, 2, 3]); +}, "from(): Asynchronous generator conversion: can only be used once"); + + + + + + + + + + + + + + +promise_test(async () => { + const results = []; + + const async_iterable = { + [Symbol.asyncIterator]() { + return { + next() { + return { + value: undefined, + get done() { + results.push('done() GETTER called'); + return true; + }, + }; + }, + }; + }, + }; + + const source = Observable.from(async_iterable); + assert_array_equals(results, []); + + queueMicrotask(() => results.push('Microtask queued before subscription')); + source.subscribe(); + assert_array_equals(results, []); + + await Promise.resolve(); + assert_array_equals(results, [ + "Microtask queued before subscription", + "done() GETTER called", + ]); +}, "from(): Promise-wrapping semantics of IteratorResult interface"); + + + + + + +test(() => { + const error = new Error("[Symbol.asyncIterator] error"); + const results = []; + const async_iterable = { + [Symbol.asyncIterator]() { + results.push("[Symbol.asyncIterator]() invoked"); + throw error; + } + }; + + Observable.from(async_iterable).subscribe({ + error: e => results.push(e), + }); + + assert_array_equals(results, [ + "[Symbol.asyncIterator]() invoked", + error, + ]); +}, "from(): Errors thrown in Symbol.asyncIterator() are propagated synchronously"); + + + + +promise_test(async () => { + const nextError = new Error('next error'); + const async_iterable = { + [Symbol.asyncIterator]() { + return { + get next() { + throw nextError; + } + }; + } + }; + + const results = []; + Observable.from(async_iterable).subscribe({ + error: e => results.push(e), + }); + + assert_array_equals(results, []); + + + await Promise.resolve(); + assert_array_equals(results, [nextError]); +}, "from(): Errors thrown in async iterator's next() GETTER are propagated " + + "in a microtask"); +promise_test(async () => { + const nextError = new Error('next error'); + const async_iterable = { + [Symbol.asyncIterator]() { + return { + next() { + throw nextError; + } + }; + } + }; + + const results = []; + Observable.from(async_iterable).subscribe({ + error: e => results.push(e), + }); + + assert_array_equals(results, []); + await Promise.resolve(); + assert_array_equals(results, [nextError]); +}, "from(): Errors thrown in async iterator's next() are propagated in a microtask"); + +test(() => { + const results = []; + const iterable = { + [Symbol.iterator]() { + return { + val: 0, + next() { + results.push(`IteratorRecord#next() pushing ${this.val}`); + return { + value: this.val, + done: this.val++ === 10 ? true : false, + }; + }, + return() { + results.push(`IteratorRecord#return() called with this.val=${this.val}`); + }, + }; + }, + }; + + const ac = new AbortController(); + Observable.from(iterable).subscribe(v => { + results.push(`Observing ${v}`); + if (v === 3) { + ac.abort(); + } + }, {signal: ac.signal}); + + assert_array_equals(results, [ + "IteratorRecord#next() pushing 0", + "Observing 0", + "IteratorRecord#next() pushing 1", + "Observing 1", + "IteratorRecord#next() pushing 2", + "Observing 2", + "IteratorRecord#next() pushing 3", + "Observing 3", + "IteratorRecord#return() called with this.val=4", + ]); +}, "from(): Aborting sync iterable midway through iteration both stops iteration " + + "and invokes `IteratorRecord#return()"); + + + + + + + + + + + + + + +promise_test(async () => { + const results = []; + const async_iterable = { + asyncIteratorGotten: false, + get [Symbol.asyncIterator]() { + results.push("[Symbol.asyncIterator] GETTER"); + if (this.asyncIteratorGotten) { + return null; + } + + this.asyncIteratorGotten = true; + + + return function() {}; + }, + + [Symbol.iterator]() { + results.push('[Symbol.iterator]() invoked as fallback'); + return { + val: 0, + next() { + return { + value: this.val, + done: this.val++ === 4 ? true : false, + }; + }, + }; + }, + }; + + const source = Observable.from(async_iterable); + assert_array_equals(results, [ + "[Symbol.asyncIterator] GETTER", + ]); + + await new Promise((resolve, reject) => { + source.subscribe({ + next: v => { + results.push(`Observing ${v}`); + queueMicrotask(() => results.push(`next() microtask interleaving (v=${v})`)); + }, + error: e => reject(e), + complete: () => { + results.push('complete()'); + resolve(); + }, + }); + }); + + assert_array_equals(results, [ + + "[Symbol.asyncIterator] GETTER", + + "[Symbol.asyncIterator] GETTER", + "[Symbol.iterator]() invoked as fallback", + "Observing 0", + "next() microtask interleaving (v=0)", + "Observing 1", + "next() microtask interleaving (v=1)", + "Observing 2", + "next() microtask interleaving (v=2)", + "Observing 3", + "next() microtask interleaving (v=3)", + "complete()", + ]); +}, "from(): Asynchronous iterable conversion, with synchronous iterable fallback"); + +test(() => { + const results = []; + let generatorFinalized = false; + + const generator = function*() { + try { + for (let n = 0; n < 10; n++) { + yield n; + } + } finally { + generatorFinalized = true; + } + }; + + const observable = Observable.from(generator()); + const abortController = new AbortController(); + + observable.subscribe(n => { + results.push(n); + if (n === 3) { + abortController.abort(); + } + }, {signal: abortController.signal}); + + assert_array_equals(results, [0, 1, 2, 3]); + assert_true(generatorFinalized); +}, "from(): Generator finally block runs when subscription is aborted"); + +test(() => { + const results = []; + let generatorFinalized = false; + + const generator = function*() { + try { + for (let n = 0; n < 10; n++) { + yield n; + } + } catch { + assert_unreached("generator should not be aborted"); + } finally { + generatorFinalized = true; + } + }; + + const observable = Observable.from(generator()); + + observable.subscribe((n) => { + results.push(n); + }); + + assert_array_equals(results, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + assert_true(generatorFinalized); +}, "from(): Generator finally block run when Observable completes"); + +test(() => { + const results = []; + let generatorFinalized = false; + + const generator = function*() { + try { + for (let n = 0; n < 10; n++) { + yield n; + } + throw new Error('from the generator'); + } finally { + generatorFinalized = true; + } + }; + + const observable = Observable.from(generator()); + + observable.subscribe({ + next: n => results.push(n), + error: e => results.push(e.message) + }); + + assert_array_equals(results, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, "from the generator"]); + assert_true(generatorFinalized); +}, "from(): Generator finally block run when Observable errors"); + +promise_test(async t => { + const results = []; + let generatorFinalized = false; + + async function* asyncGenerator() { + try { + for (let n = 0; n < 10; n++) { + yield n; + } + } finally { + generatorFinalized = true; + } + } + + const observable = Observable.from(asyncGenerator()); + const abortController = new AbortController(); + + await new Promise((resolve) => { + observable.subscribe((n) => { + results.push(n); + if (n === 3) { + abortController.abort(); + resolve(); + } + }, {signal: abortController.signal}); + }); + + assert_array_equals(results, [0, 1, 2, 3]); + assert_true(generatorFinalized); +}, "from(): Async generator finally block run when subscription is aborted"); + +promise_test(async t => { + const results = []; + let generatorFinalized = false; + + async function* asyncGenerator() { + try { + for (let n = 0; n < 10; n++) { + yield n; + } + } finally { + generatorFinalized = true; + } + } + + const observable = Observable.from(asyncGenerator()); + + await new Promise(resolve => { + observable.subscribe({ + next: n => results.push(n), + complete: () => resolve(), + }); + }); + + assert_array_equals(results, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + assert_true(generatorFinalized); +}, "from(): Async generator finally block runs when Observable completes"); + +promise_test(async t => { + const results = []; + let generatorFinalized = false; + + async function* asyncGenerator() { + try { + for (let n = 0; n < 10; n++) { + if (n === 4) { + throw new Error('from the async generator'); + } + yield n; + } + } finally { + generatorFinalized = true; + } + } + + const observable = Observable.from(asyncGenerator()); + + await new Promise((resolve) => { + observable.subscribe({ + next: (n) => results.push(n), + error: (e) => { + results.push(e.message); + resolve(); + } + }); + }); + + assert_array_equals(results, [0, 1, 2, 3, "from the async generator"]); + assert_true(generatorFinalized); +}, "from(): Async generator finally block run when Observable errors");