Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds explaination of flatMap semantics #86

Closed
wants to merge 15 commits into from
61 changes: 60 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ methods](https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype) to
- `filter()`
- `take()`
- `drop()`
- `flatMap()`
- `flatMap()` (Because the types are different, there are [some semantics to note](#flatmap-semantics).)
- `reduce()`
- `toArray()`
- `forEach()`
Expand Down Expand Up @@ -554,6 +554,65 @@ Promises whose scheduling differs from that of Observables, which sometimes
means event handlers that call `e.preventDefault()` will run too late. See the
[Concerns](#concerns) section which goes into more detail.

### flatMap semantics

`flatMap` generally behaves like `Iterator.prototype.flatMap`, however, since it's push-based,
there can be a temporal element. Given that, it behaves much like RxJS's `concatMap`, which is
one of the most useful operators from the library.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which is one of the most useful operators from the library.

I think we can remove this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ping


At a high-level, it subscribes to the source, and then maps to, and emits values from "inner
observables", one at a time, ensuring they're subscribed to in sequence.

Given the following example:

```ts
const result = source.flatMap((value, index) =>
getNextInnerObservable(value, index),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the trailing comma here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably aggressive formatting rules. I'll have a look.

);
```

- `flatMap` is a method on `Observable` that is called with a `mapping function`, which takes a
value from the source observable, and a zero-based counter (or "index") of that value, and
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
value from the source observable, and a zero-based counter (or "index") of that value, and
value from the source observable, and a zero-based counter (or `index`) of that value, and

Just to make this consistent with mapping function

returns a value that can be converted to an observable with `Observable.from`. `flatMap` returns
an obeservable we'll call `result`.
- When you subscribe to `result`, it subscribes to `source` immediately.
- Let there be a `queue` of values that is empty.
- Let there be an integer `current index` that is `0`.
- Let there be an `innerSignal` that is either `undefined` or an `AbortSignal`.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initially....? Where does this come from? It comes from the subscribe() method being called on the Observable returned by flatMap(), right? Maybe we should mention that. After reading the rest I'm actually thinking that innerSignal is not the one passed into subscribe. So... I'm not sure :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit of state for the internal algorithm. If innerSignal is undefined, there's no current inner subscription. If innerSignal is an AbortSignal, we have an inner subscription, and we need to wait for it to complete before we start another. So we need to buffer the value.

Basically:

let innerSignal: AbortSignal | undefined;

- Let there be a boolean `isSourceComplete` that is `false`.
- When the `source` emits a `value`:
- If `innerSignal` is `undefined`
- Begin **"mapping step"**:
- Copy the `current index` into an `index` variable.
- Increment the `current index`.
- Call the `mapping function` with `value` and `index`.
- Then pass the return value of the mapping function to `Observable.from()` to convert it to
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- Then pass the return value of the mapping function to `Observable.from()` to convert it to
- Then pass the return value of the `mapping function` to `Observable.from()` to convert it to

"inner observable" if it's not already.
- Then create an `AbortSignal` that is dependent on the subscriber's and set `innerSignal`.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- Then create an `AbortSignal` that is dependent on the subscriber's and set `innerSignal`.
- Then create an `AbortSignal` that is [dependent](https://dom.spec.whatwg.org/#create-a-dependent-abort-signal) on the subscriber's and set `innerSignal`.

- Subscribe to the inner observable, passing `innerSignal`
- Forward all values emitted by the inner observable to the `result` observer.
- If the inner observable completes;
- If there are values in the `queue`
- Take the first one from the `queue` and return to the **"mapping step"**.
- If the `queue` is empty
- If `isSourceComplete` is `true`
- Complete `result`.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sometimes you have a period and sometimes you don't. Can you edit this whole algorithm to be consistent one way or another?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I can.
Maybe I can't

- If `isSourceComplete` is `false`
- Wait
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, what is this supposed to do? Certainly we're not blocking the main thread....

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No... Just "wait for another event"... do nothing, I guess.

- If the inner observable errors
- Forward the error to `result`.
- Otherwise, if `innerSignal` is an `AbortSignal`
- Add the value to the `queue` and wait.
- If the `source` completes:
- If `innerSignal` is `undefined`
- Complete `result`.
- If `innerSignal` is `AbortSignal`
- Set `isSourceComplete` to `true`.
- If the `source` errors:
- Forward the error to `result`.
- If the user aborts the signal passed to the subscription of `result`
- Abort any `innerSignal` that exists, and terminate subscription.

## Background & landscape

To illustrate how Observables fit into the current landscape of other reactive
Expand Down
Loading