imitate() should be lazy to add and to remove #5
Comments
There's no problem here. This is what you got wrong:
import xs from "xstream"
const state = xs.never()
.map(x => x + 1)
.fold((y, x) => y + x, 0)
-const actions = xs.periodic(100).debug(console.log.bind(console))
+const actions = xs.periodic(100).take(10).debug(console.log.bind(console))
state.imitate(actions)
-state.take(10).addListener({
+state.addListener({
next: x => console.log("next", x),
complete: () => console.log("complete"),
error: e => console.error(e)
}) |
Now you missed my point. 😞 Of course that toy example can be written so that it works (or doesn't even have to use |
@staltz I sort of see @milankinen's point here. I think it would be pretty easy to solve this, though we would need to implement it internally.
import xs from "xstream"
const state = xs.never()
.map(x => x + 1)
.fold((y, x) => y + x, 0)
const actions = xs.periodic(100).debug(console.log.bind(console))
state.imitate(actions)
state.take(10).addListener({
next: x => console.log("next", x),
complete: () => {
console.log("complete")
+ setTimeout(() => actions.removeListener(state)) // but allow to be cancelled like other async unsubscriptions?
},
error: e => console.error(e)
}) |
@TylorS Hmm, was the point that imitate should not immediately subscribe, but instead do lazy subscription? Then I see something too. |
No, it's not about lazy subscription but about disposal of the imitated stream when the imitating stream gets disposed. Lazy subscription would also be nice, because the current implementation loses the initial action stream event if the stream emits it synchronously (not sure how realistic or common this use case would be in practice, though).
import xs from "xstream"
const state = xs.never()
.map(x => x + x)
const actions = xs.of("initial").merge(xs.periodic(1000)).debug(console.log.bind(console))
state.imitate(actions)
state.addListener({
next: x => console.log("next", x),
complete: () => console.log("complete"),
error: e => console.error(e)
}) |
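The lost synchronous event described above can be reproduced without xstream. The following is a minimal sketch using hypothetical stand-in objects (`source` and `proxy` are illustrative names, not xstream classes); it only assumes that the source emits as soon as it is subscribed to, the way the xs.of("initial") part of the example does.

```javascript
// Stand-in for a source that emits synchronously on subscription.
const source = {
  addListener(l) { l.next("initial"); },
};

// Stand-in proxy: forwards each event to whoever listens at that moment.
const proxy = {
  listeners: [],
  next(x) { this.listeners.forEach(l => l.next(x)); },
  addListener(l) { this.listeners.push(l); },
};

const received = [];
// Eager imitation: "initial" is forwarded while proxy has zero listeners.
source.addListener(proxy);
// The real listener arrives too late to see it.
proxy.addListener({ next: x => received.push(x) });

console.log(received.length); // 0
```

With a lazy subscription the proxy would subscribe to the source only once it has a listener of its own, so the event would not be dropped.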
I think it has to do with a codependency on each other. What I see as a potential issue, is that when no one is listening to |
I think I see it as, given
var proxy$ = xs.create();
var real$ = // ...
proxy$.imitate(real$);
proxy$ should start only when real$ starts, and stop only when real$ stops, regardless of how many listeners proxy$ has. We may need to add some hooks to stream.start and stream.stop, or change the logic for refCounting to auto-start. |
That doesn't make sense for how the rest of the library works though. As it stands, a Stream only begins when the listeners go from 0 -> 1. If you rewrite this example to show what is really happening it may make more sense.
var proxy$ = xs.create()
var real$ = // ...
real$.addListener(proxy$)
proxy$ is actually a listener to real$, so in the event that proxy$ is the only listener to real$, if there are 0 listeners to proxy$, why should resources be wasted letting real$ push to, effectively, no listeners? |
Also imagine if you switch contexts (routes, views), and now there is quite probably a memory leak. |
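To make the 0 -> 1 start rule and the resulting leak concrete, here is a minimal sketch of a ref-counted stream. `TinyStream` is a hypothetical stand-in, not xstream's implementation, and it stops synchronously for brevity, whereas the discussion above describes an asynchronous stop.

```javascript
// Hypothetical minimal ref-counted stream, NOT xstream's real Stream class.
class TinyStream {
  constructor(producer) {
    this.producer = producer; // { start(listener), stop() } or null
    this.listeners = [];
    this.running = false;
  }
  addListener(l) {
    this.listeners.push(l);
    // Start the producer only on the 0 -> 1 transition.
    if (this.listeners.length === 1 && this.producer) {
      this.running = true;
      this.producer.start(this);
    }
  }
  removeListener(l) {
    const i = this.listeners.indexOf(l);
    if (i === -1) return;
    this.listeners.splice(i, 1);
    // Stop the producer on the 1 -> 0 transition.
    if (this.listeners.length === 0 && this.producer) {
      this.running = false;
      this.producer.stop();
    }
  }
  next(x) {
    // Copy the array so removals during emission do not skip listeners.
    this.listeners.slice().forEach(l => l.next(x));
  }
}

const real$ = new TinyStream({ start() {}, stop() {} });
const proxy$ = new TinyStream(null);

// Eager imitation: proxy$ becomes a listener of real$ right away,
// so real$ keeps running although nobody listens to proxy$.
real$.addListener(proxy$);
console.log(real$.running, proxy$.listeners.length); // true 0
```

Unless something removes proxy$ from real$ again, real$ never sees its listener count drop back to zero, which is the leak discussed here.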
When |
Yep, I agree. @TylorS essentially proxy$ imitation should be a transparent operation that doesn't cause starting or stopping. proxy$ just does whatever happens to real$. |
But what happens in the case where proxy$ is the only listener to real$? Nothing would be emitted when you would expect something. |
Why is that? I'd rather expect no side effects unless I explicitly use |
That's the whole point of the
// Rx
const proxy$ = new Rx.Subject()
const real$ = // ...
real$.subscribe(proxy$)
// xstream
const proxy$ = xs.create()
const real$ = // ...
proxy$.imitate(real$) // real$.addListener(proxy$)
With this (possibly wrong) understanding, proxy$ should be added as a listener to real$, but that subscription should depend on whether or not proxy$ still has listeners. IOW, things should get cleaned up appropriately. |
Hmm, you got a point there. To clarify, let's try to remember what will be the typical use case of this in Cycle.js:
or
In use case 1, the proxy is the only listener to the real$, and you wouldn't get any events, which is a bug. In use case 2, it would work. I think we need to support both use cases. |
Here's a proposal: proxy$ starts when:
proxy$ stops when:
|
That's what I would expect 👍 |
Is there any corner case where that would break down? Like is there some case where (Hmm now that I just said that, I don't think it makes sense to continue the execution of the proxy$ if the real$ stops, because there isn't anything else left after the moment the real$ stops) |
I think that if an |
Nahh scratch that, I don't think an |
Why not? I also thought about ImitationStream. |
I was trying to write an ImitationStream out as an example to what it might look like, and it didn't seem to fit. |
I can give it a try, let's see. |
class Stream<T> {
+ private _imitating: Stream<T>;
_remove(il: InternalListener<T>): void {
const a = this._ils;
const i = a.indexOf(il);
if (i > -1) {
a.splice(i, 1);
const p = this._prod;
+ const imitated = this._imitating; // renamed: `i` is already the listener index above
if (p && a.length <= 0) {
this._stopID = setTimeout(() => {
p._stop();
+ if (imitated) {
+ imitated.removeListener(this);
+ }
})
}
}
}
imitate(other: Stream<T>) {
+ this._imitating = other;
other._add(this);
}
} |
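The idea in the diff above, combined with lazy subscription, can be sketched in a self-contained form. `LazyProxy` and `source` are hypothetical names for illustration only; this is not xstream's code, and it releases the imitated stream synchronously rather than inside a setTimeout.

```javascript
// Hypothetical sketch: a proxy that subscribes lazily and cleans up after itself.
class LazyProxy {
  constructor() {
    this.listeners = [];
    this.imitating = null;
  }
  imitate(other) {
    this.imitating = other;
    // If listeners are already attached, subscribe immediately.
    if (this.listeners.length > 0) other.addListener(this);
  }
  addListener(l) {
    this.listeners.push(l);
    // 0 -> 1: subscribe to the imitated stream only now.
    if (this.listeners.length === 1 && this.imitating) {
      this.imitating.addListener(this);
    }
  }
  removeListener(l) {
    const i = this.listeners.indexOf(l);
    if (i === -1) return;
    this.listeners.splice(i, 1);
    // 1 -> 0: release the imitated stream so it can stop.
    if (this.listeners.length === 0 && this.imitating) {
      this.imitating.removeListener(this);
    }
  }
  next(x) {
    this.listeners.slice().forEach(l => l.next(x));
  }
}

// Stand-in for real$: it only records who is subscribed.
const source = {
  subscribers: [],
  addListener(l) { this.subscribers.push(l); },
  removeListener(l) { this.subscribers = this.subscribers.filter(s => s !== l); },
};

const proxy = new LazyProxy();
proxy.imitate(source);
console.log(source.subscribers.length); // 0

const listener = { next: x => console.log("next", x) };
proxy.addListener(listener);
console.log(source.subscribers.length); // 1

proxy.removeListener(listener);
console.log(source.subscribers.length); // 0
```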
I was thinking something like that could work, which might be the basis of what an |
I'm just trying to translate those into concrete names. Please correct me if I got it wrong.
But now when But now because And because of the multicast nature of |
So instead of directly calling _add(il: InternalListener<T>): void {
const a = this._ils;
a.push(il);
if (a.length === 1) {
if (this._stopID !== empty) {
clearTimeout(this._stopID);
this._stopID = empty;
}
+ const i = this._imitating;
+ if (i) i._start(this);
const p = this._prod;
if (p) p._start(this);
}
} |
Really good point. |
Yes, it goes up to the proxy$.
Starting the proxy$ will indeed make stateToProxy$ start too, but viewAndChildState$ started because of sink, not because of stateToProxy$.
Yeah I guess, once a stream starts you can start it double.
I would say that's an anti-pattern. A proxy$ should always be just a |
I mean, we could make |
What makes the implementation problematic (IMHO) is that the Is this the actual use case |
Then just use |
Perhaps you understand now my proposal? 😉 (naming still open though) |
Then we might also need a |
If that's what you originally meant with loopedStream, I apologize |
Ugh, would we? :( How about the real$ having remember() at the end of the operator chain. Doesn't that help? |
Is |
Yeah, probably. |
So all good, we could make |
Well actually, I don't know. If proxy$ has multiple listeners that attach at different times, only the first will be given the remember()ed event, because proxy$ would be added as a listener to real$, be given the remembered event, then a subsequent listener to proxy$ would have already missed it, since proxy$ doesn't have memory of it. |
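A small sketch of the situation described above, using hypothetical stand-ins (`MemorySource` and `PlainProxy` are not xstream classes). It assumes the source replays its last value to every new listener, as remember() is described here, and that the proxy itself has no memory.

```javascript
// Stand-in for a source ending in remember(): replays its last value on subscribe.
class MemorySource {
  constructor(last) {
    this.last = last;
    this.listeners = [];
  }
  addListener(l) {
    this.listeners.push(l);
    l.next(this.last); // replay the remembered value to each new listener
  }
}

// Stand-in proxy without memory: subscribes to the source on its first listener.
class PlainProxy {
  constructor() {
    this.listeners = [];
    this.source = null;
  }
  imitate(source) { this.source = source; }
  addListener(l) {
    this.listeners.push(l);
    if (this.listeners.length === 1) this.source.addListener(this);
  }
  next(x) { this.listeners.forEach(l => l.next(x)); }
}

const seenByFirst = [];
const seenBySecond = [];
const proxy = new PlainProxy();
proxy.imitate(new MemorySource("remembered"));

proxy.addListener({ next: x => seenByFirst.push(x) });
proxy.addListener({ next: x => seenBySecond.push(x) });

console.log(seenByFirst.length);  // 1
console.log(seenBySecond.length); // 0
```

Only the first listener sees the replayed value, because the replay happens once, when the proxy subscribes to the source.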
Ugh... yeah |
I think the proxy stream could detect whether the real is Memory or not, and react accordingly. |
I'm also thinking of renaming Since the distinct feature of a proxy Stream is Any other suggestions? |
Fix imitate method to mitigate issue #5. Now we must use xs.createMimic() to get a MimicStream, and the MimicStream is the only type that has the imitate() method. Adding a listener to the MimicStream literally just adds that listener to the imitated stream. Fixes issue #5. BREAKING CHANGE: imitate() method on Stream removed. New type introduced: MimicStream, which can be created through xs.createMimic(). A MimicStream has the method imitate(), which has the same API as before, but imitate does not trigger any Stream/Producer to start.
This should be fixed in 3.0.0. |
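The behaviour described in the commit message ("adding a listener to the MimicStream literally just adds that listener to the imitated stream") might look roughly like the sketch below. `MimicSketch` is a hypothetical class, not the real MimicStream; in particular, queuing listeners that arrive before imitate() is an assumption made here for illustration.

```javascript
// Hypothetical delegation sketch, NOT the actual MimicStream from xstream 3.0.0.
class MimicSketch {
  constructor() {
    this.target = null;
    this.pending = []; // assumption: listeners added before imitate() wait here
  }
  imitate(target) {
    this.target = target;
    // Hand over any listeners that arrived early.
    this.pending.forEach(l => target.addListener(l));
    this.pending = [];
  }
  addListener(l) {
    // Delegate: the listener ends up on the imitated stream itself.
    if (this.target) this.target.addListener(l);
    else this.pending.push(l);
  }
  removeListener(l) {
    if (this.target) this.target.removeListener(l);
    else this.pending = this.pending.filter(p => p !== l);
  }
}

// Stand-in for the imitated stream: it only records its listeners.
const real = {
  listeners: [],
  addListener(l) { this.listeners.push(l); },
  removeListener(l) { this.listeners = this.listeners.filter(x => x !== l); },
};

const mimic = new MimicSketch();
const early = { next() {} };
mimic.addListener(early);        // queued, nothing is started
mimic.imitate(real);             // early listener moves to real
const late = { next() {} };
mimic.addListener(late);         // goes straight to real
console.log(real.listeners.length); // 2

mimic.removeListener(early);
mimic.removeListener(late);
console.log(real.listeners.length); // 0
```

Because every listener sits directly on the imitated stream, that stream's own listener counting decides when it starts and stops, and the mimic adds no subscription of its own.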
It seems that imitate does not provide any way to "dispose" the imitation, thus it leaves the imitated stream open, even though the imitating stream closes.
Let's take a look at the following code:
Now the state stream gets closed after ten events, but actions never gets disposed, even though it should.
Because imitate is meant to replace the Rx Subject as a way to model circular dependencies, I'd like to propose a slightly different API that handles the same problem but is also automatically garbage collected: Stream.looped() : LoopedStream and Stream.prototype.loopTo(LoopedStream) : void
The idea would be that only values get looped, so it would be almost trivial to make an implementation that receives the source stream's events and also disposes the source stream during its own disposal.
An (imaginary) API example:
Any thoughts?