Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spec from() static converter #160

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 182 additions & 3 deletions spec.bs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ WPT Display: open
urlPrefix: https://tc39.es/ecma262/#; spec: ECMASCRIPT
type: dfn
text: current realm
text: Object; url: sec-object-type
text: normal completion; url: sec-normalcompletion
text: throw completion; url: sec-throwcompletion
url: sec-returnifabrupt-shorthands
text: ?
type: abstract-op
text: Type; url: sec-ecmascript-data-types-and-values
urlPrefix: https://dom.spec.whatwg.org; spec: DOM
type: dfn
for: event listener
Expand All @@ -38,6 +45,11 @@ urlPrefix: https://dom.spec.whatwg.org; spec: DOM
text: dependent signals; url: abortsignal-dependent-signals
text: signal abort; url:abortsignal-signal-abort
text: abort reason; url:abortsignal-abort-reason
urlPrefix: https://webidl.spec.whatwg.org; spec: WEBIDL
type: dfn
text: a promise rejected with
type: dfn
text: react
</pre>

<style>
Expand Down Expand Up @@ -371,7 +383,7 @@ interface Observable {
//
// takeUntil() can consume promises, iterables, async iterables, and other
// observables.
Observable takeUntil(any notifier);
Observable takeUntil(any value);
Observable map(Mapper mapper);
Observable filter(Predicate predicate);
Observable take(unsigned long long amount);
Expand Down Expand Up @@ -461,6 +473,165 @@ An <dfn>internal observer</dfn> is a [=struct=] with the following [=struct/item
[[#promise-returning-operators]] that make use of this, for example.</p>
</div>

<div algorithm>
To <dfn for=Observable>convert to an Observable</dfn> an {{any}} |value|, run these steps:

Note: We split this algorithm out from the Web IDL {{Observable/from()}} method, so that
spec prose can <a for=Observable lt="convert to an observable">convert</a> values to without
going through the Web IDL bindings.

1. If [$Type$](|value|) is not [=Object=], [=exception/throw=] a {{TypeError}}.

Note: This prevents primitive types from being coerced into iterables (e.g., String).

Issue: See if this is even the behavior we want. See <a
href=https://github.com/WICG/observable/issues/125>WICG/observable#125</a>

1. <i id=from-observable-conversion><b>From Observable</b></i>: If |value|'s [=specific type=]
is an {{Observable}}, then return |value|.

1. <i id=from-async-iterable-conversion><b>From async iterator</b></i>: Let
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's important in here to make sure that we add a check and call to the async iterator's return() method, if it has one. Because if it's a generator, I'd expect gen.return() to be called so it hits the finally block:

function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

async function* infiniteGenerator() {
  let n = 0;
  try {
    while (true) {
      await sleep(100);
      yield n++;
    }
  } finally {
    console.log('this must be called!');
  }
}

const ac = new AbortController();

Observable.from(infiniteGenerator())
  .subscribe(console.log, { signal: ac.signal });
  
setTimeout(() => {
  ac.abort(); // should cause log of "this must be called!"
}, 333);

|asyncIteratorMethodRecord| be [=?=] [$GetMethod$](|value|, {{%Symbol.asyncIterator%}}).

Note: We have to use [$GetMethod$] directly instead of [$GetIterator$], because
[$GetIterator$] makes it impossible to distinguish between (a) the case where |value| simply
doesn't implement any iterable protocol, and (b) the case where |value| implements one of the
iterator protocols, and its method getter throws an exception.

Note: This step is web-observable, and re-throws any errors that the {{%Symbol.iterator%}}
method *getter* might have thrown.

1. If |asyncIteratorMethodRecord|'s \[[Value]] is undefined or null, then jump to the step
labeled <a href=#from-iterable-conversion>From iterable</a>.

1. If [$IsCallable$](|asyncIteratorMethodRecord|'s \[[Value]]) is false, then
[=exception/throw=] a {{TypeError}}.

1. Let |nextAlgorithm| be the following steps, given a {{Subscriber}} |subscriber| and an
Iterator Record |iteratorRecord|:

1. Let |nextPromise| be a {{Promise}}-or-undefined, initially undefined.

1. Let |nextRecord| be [$IteratorNext$](|iteratorRecord|), and process it as follows:

1. If |nextRecord| is a [=throw completion=], then:

1. [=Assert=]: |iteratorRecord|'s \[[Done]] is true.

1. Set |nextPromise| to [=a promise rejected with=] |nextRecord|'s \[[Value]].

1. Otherwise, if |nextRecord| is [=normal completion=], then set |nextPromise| to [=a
promise resolved with=] |nextRecord|'s \[[Value]].

Note: This is done in case |nextRecord|'s \[[Value]] is not *itself* already a
{{Promise}}.

1. [=React=] to |nextPromise|:

1. If |nextPromise| was fulfilled with value |iteratorResult|, then:

1. If [$Type$](|iteratorResult|) is not Object, then run |subscriber|'s
{{Subscriber/error()}} method with a {{TypeError}} and abort these steps.

1. Let |done| be [$IteratorComplete$](|iteratorResult|).

1. If |done| is a [=throw completion=], then run |subscriber|'s
{{Subscriber/error()}} method with |done|'s \[[Value]] and abort these steps.

Otherwise, if |done|'s \[[Value]] is true, then run |subscriber|'s
{{Subscriber/complete()}} and abort these steps.

1. Let |value| be [$IteratorValue$](|iteratorResult|).

1. If |value| is a [=throw completion=], then run |subscriber|'s
{{Subscriber/error()}} method with |value|'s \[[Value]] and abort these steps.

1. Run |subscriber|'s {{Subscriber/next()}} method, given |value|'s \[[Value]].

1. Run |nextAlgorithm|, given |subscriber| and |iteratorRecord|.

1. If |nextPromise| was rejected with reason |r|, then run |subscriber|'s
{{Subscriber/error()}} method, given |r|.

1. Return a [=new=] {{Observable}} whose [=Observable/subscribe callback=] is an algorithm that
takes a {{Subscriber}} |subscriber| and does the following:

1. Let |iteratorRecord| be [$GetIterator$](|value|, async).

Note: This re-invokes any method getters for the {{%Symbol.iterator%}} method on |value|.
Whether or not this is desirable is an extreme corner case, but this behavior currently
matches what is expected by tests. See <a
href=https://github.com/WICG/observable/issues/127>issue#127</a> for discussion.

1. If |iteratorRecord| is a [=throw completion=], then run |subscriber|'s
{{Subscriber/error()}} method with |iteratorRecord|'s \[[Value]].

Note: This means we invoke the {{Subscriber/error()}} method synchronously with respect to
subscription, which is the only time this can happen for async iterables that are
converted to {{Observable}}s. In all other cases, errors are propagated to the observer
asynchronously, with microtask timing, by virtue of being wrapped in a rejected
{{Promise}} that |nextAlgorithm| [=reacts=] to. This synchronous-error-propagation
behavior is hopefully OK, since it is consistent with language constructs (i.e.,
**for-await of** loops) that invoke {{%Symbol.asyncIterator%}} and synchronously re-throw
exceptions to catch blocks outside the loop, before any [$Await$]ing takes place.

1. [=Assert=]: |iteratorRecord| is an Iterator Record.

1. Run |nextAlgorithm| given |subscriber| and |iteratorRecord|.

1. <i id=from-iterable-conversion><b>From iterable</b></i>: Let |iteratorMethodRecord| be [=?=]
[$GetMethod$](|value|, {{%Symbol.iterator%}}).

1. If |iteratorMethodRecord|'s \[[Value]] is undefined or null, then jump to the step labeled <a
href=#from-promise-conversion>From Promise</a>.

1. If [$IsCallable$](|iteratorMethodRecord|'s \[[Value]]) is false, then [=exception/throw=] a
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to what I said above about async generators. This has to accommodate generators.

function* infiniteGenerator() {
  let n = 0;
  try {
    while (true) {
      yield n++;
    }
  } finally {
    console.log('this must be logged!');
  }
}

const ac = new AbortController();

Observable.from(infiniteGenerator())
  .subscribe((n) => {
    console.log(n);
    if (n === 4) ac.abort(); // Should force the logging of "this must be logged!"
  }, { signal: ac.signal })

{{TypeError}}.

Otherwise, return a [=new=] {{Observable}} whose [=Observable/subscribe callback=] is an
algorithm that takes a {{Subscriber}} |subscriber| and does the following:

1. Let |iteratorRecord| be [$GetIterator$](|value|, sync).

1. If |iteratorRecord| is a [=throw completion=], then run |subscriber|'s
{{Subscriber/error()}} method, given |iteratorRecord|'s \[[Value]], and abort these
steps.

1. Let |iterator| be |iteratorRecord|'s \[[Value]].

1. [=iteration/While=] true:

1. If |iterator|'s \[[Done]] is true, then run |subscriber|'s {{Subscriber/complete()}}
method, and [=iteration/break=].

1. Let |nextRecord| be [$IteratorStepValue$](|iterator|).

1. If |nextRecord| is a [=throw completion=], then run |subscriber|'s
{{Subscriber/error()}} method, given |nextRecord|'s \[[Value]], and
[=iteration/break=].

Otherwise, run |subscriber|'s {{Subscriber/next()}} given |nextRecord|'s \[[Value]].

1. <i id=from-promise-conversion><b>From Promise</b></i>: If [$IsPromise$](|value|) is true,
then:

1. Return a [=new=] {{Observable}} whose [=Observable/subscribe callback=] is an algorithm
that takes a {{Subscriber}} |subscriber| and does the following:

1. [=React=] to |value|:

1. If |value| was fulfilled with value |v|, then:

1. Run |subscriber|'s {{Subscriber/next()}} method, given |v|.

1. Run |subscriber|'s {{Subscriber/complete()}} method.

1. If |value| was rejected with reason |r|, then run |subscriber|'s
{{Subscriber/error()}} method, given |r|.

1. [=exception/Throw=] a {{TypeError}}.
</div>

<div algorithm>
To <dfn for=Observable>subscribe to an {{Observable}}</dfn> given an
{{ObserverUnion}}-or-[=internal observer=] |observer|, and a {{SubscribeOptions}} |options|, run
Expand Down Expand Up @@ -577,15 +748,23 @@ For now, see [https://github.com/wicg/observable#operators](https://github.com/w

<h4 id=observable-from>{{Observable/from()}}</h4>

<p class=XXX>Spec the exact semantics of {{Observable/from()}} conversion.</p>
<div algorithm>
The <dfn for=Observable method><code>from(|value|)</code></dfn> method steps are:

1. Return the result of <a for=Observable lt="convert to an Observable">converting</a> |value|
to an {{Observable}}. Rethrow any exceptions.
</div>

<h4 id=observable-returning-operators>{{Observable}}-returning operators</h4>

<div algorithm>
The <dfn for=Observable method><code>takeUntil(|notifier|)</code></dfn> method steps are:
The <dfn for=Observable method><code>takeUntil(|value|)</code></dfn> method steps are:

1. Let |sourceObservable| be [=this=].

1. Let |notifier| be the result of <a for=Observable lt="convert to an Observable">
converting</a> |value| to an Observable.

1. Let |observable| be a [=new=] {{Observable}} whose [=Observable/subscribe callback=] is an
algorithm that takes a {{Subscriber}} |subscriber| and does the following:

Expand Down
Loading