imitate() should be lazy to add and to remove #5

Closed
milankinen opened this issue Apr 25, 2016 · 44 comments

Comments

@milankinen

It seems that imitate does not provide any way to "dispose" the imitation, thus it leaves the imitated stream open, even though the imitating stream closes.

Let's take a look at the following code:

import xs from "xstream"

const state = xs.never()
  .map(x => x + 1)
  .fold((y, x) => y + x, 0)

const actions = xs.periodic(100).debug(console.log.bind(console))
state.imitate(actions)

state.take(10).addListener({
  next: x => console.log("next", x),
  complete: () => console.log("complete"),
  error: e => console.error(e)
})

Now the state stream gets closed after ten events, but actions never gets disposed, even though it should.

Because imitate is meant to replace the Rx Subject as a way to model circular dependencies, I'd like to propose a slightly different API that handles the same problem but at the same time is automatically garbage collected: Stream.looped() : LoopedStream and Stream.prototype.loopTo(LoopedStream) : void

The idea is that only values get looped, so it would be almost trivial to write an implementation that receives the source stream's events and also disposes the source stream during its own disposal.

An (imaginary) API example:

const loopedActions = xs.looped()
const state = loopedActions
  .map(x => x + 1)
  .fold((y, x) => y + x, 0)

xs.periodic(100).debug(console.log.bind(console)).loopTo(loopedActions)

state.take(10).addListener({
  next: x => console.log("next", x),
  complete: () => console.log("complete"),
  error: e => console.error(e)
})

Any thoughts?

@staltz
Owner

staltz commented Apr 25, 2016

There's no problem here. This is what you got wrong:

 import xs from "xstream"

 const state = xs.never()
   .map(x => x + 1)
   .fold((y, x) => y + x, 0)

-const actions = xs.periodic(100).debug(console.log.bind(console))
+const actions = xs.periodic(100).take(10).debug(console.log.bind(console))
 state.imitate(actions)

-state.take(10).addListener({
+state.addListener({
   next: x => console.log("next", x),
   complete: () => console.log("complete"),
   error: e => console.error(e)
 })

@staltz staltz closed this as completed Apr 25, 2016
@milankinen
Author

milankinen commented Apr 25, 2016

Now you missed my point. 😞

Of course that toy example can be written so that it works (or doesn't even have to use imitate) but there will come situations where you have to dispose the imitating stream when you're doing real apps.

@TylorS
Collaborator

TylorS commented Apr 25, 2016

@staltz I sort of see @milankinen's point here. I think it would be pretty easy to solve this. Though we would need to implement it internally.

import xs from "xstream"

const state = xs.never()
  .map(x => x + 1)
  .fold((y, x) => y + x, 0)

const actions = xs.periodic(100).debug(console.log.bind(console))
state.imitate(actions)

state.take(10).addListener({
  next: x => console.log("next", x),
  complete: () => {
    console.log("complete")
+   setTimeout(() => actions.removeListener(state)) // but allow to be cancelled like other async unsubscriptions?
  },
  error: e => console.error(e)
})

@staltz
Owner

staltz commented Apr 26, 2016

@TylorS Hmm, was the point that imitate should not immediately subscribe, but instead do lazy subscription? Then I see something too.

@milankinen
Author

No, it's not about lazy subscription but about disposing the imitated stream when the imitating stream gets disposed.

Lazy subscription would also be nice 'cause the current implementation loses the initial action stream event if the stream emits it synchronously (not sure how realistic/common use case this would be in practice though).

import xs from "xstream"

const state = xs.never()
  .map(x => x + x)

const actions = xs.of("initial").merge(xs.periodic(1000)).debug(console.log.bind(console))
state.imitate(actions)

state.addListener({
  next: x => console.log("next", x),
  complete: () => console.log("complete"),
  error: e => console.error(e)
})
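
A minimal sketch of why the synchronous event is lost, using a hand-rolled stream rather than xstream itself. TinyStream, imitateEagerly and imitateLazily are hypothetical names invented for this illustration; the only behaviour assumed from the thread is that a producer starts when the first listener arrives.

```javascript
// Minimal multicast stream: the producer starts when the first listener arrives.
// (Hypothetical stand-in, not xstream's Stream.)
class TinyStream {
  constructor(start) {
    this.start = start; // hypothetical producer: (stream) => void, or null
    this.listeners = [];
  }
  add(listener) {
    this.listeners.push(listener);
    if (this.listeners.length === 1 && this.start) this.start(this);
  }
  next(value) {
    // Iterate over a copy so the listener list may change during emission.
    this.listeners.slice().forEach(l => l.next(value));
  }
}

// Emits "initial" synchronously as soon as it is started.
const makeActions = () => new TinyStream(s => s.next("initial"));

// Eager: the proxy subscribes at imitate time, before it has any listeners.
function imitateEagerly(proxy, target) {
  target.add(proxy);
}

// Lazy: the proxy subscribes only once it gets its own first listener.
function imitateLazily(proxy, target) {
  const originalAdd = proxy.add.bind(proxy);
  proxy.add = listener => {
    originalAdd(listener);
    if (proxy.listeners.length === 1) target.add(proxy);
  };
}

// Wires up a proxy with the given strategy and records what a listener sees.
function collect(imitate) {
  const seen = [];
  const proxy = new TinyStream(null);
  imitate(proxy, makeActions());
  proxy.add({ next: x => seen.push(x) });
  return seen;
}

const eagerSeen = collect(imitateEagerly); // "initial" was emitted into an empty proxy
const lazySeen = collect(imitateLazily);   // "initial" reaches the listener
```

With the eager variant the source has already fired by the time the listener is attached, so nothing arrives. The lazy variant defers the subscription until there is someone to receive the event.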

@staltz staltz changed the title from "imitated streams never get disposed" to "imitate() should be lazy to add and to remove" Apr 26, 2016
@staltz staltz reopened this Apr 26, 2016
@TylorS
Collaborator

TylorS commented Apr 26, 2016

I think it has to do with a codependency on each other. a shouldn't emit anything if there are essentially no listeners. If b is imitating a and no one is listening to b, then a should also stop.

What I see as a potential issue, is that when no one is listening to b, b should be removed from the listeners of a as well.

@staltz
Owner

staltz commented Apr 26, 2016

I think I see it as, given

var proxy$ = xs.create();
var real$ = // ...
proxy$.imitate(real$);

proxy$ should start only when real$ starts, and stop only when real$ stops, regardless of how many listeners proxy$ has. We may need to add some hooks to stream.start and stream.stop, or change the logic for refCounting to auto-start.

@TylorS
Collaborator

TylorS commented Apr 26, 2016

That doesn't make sense for how the rest of the library works, though. As it stands, a Stream only begins when the listeners go from 0 -> 1. If you rewrite this example to show what is really happening, it may make more sense.

var proxy$ = xs.create()
var real$ = // ...
real$.addListener(proxy$)

proxy$ is actually a listener to real$, so if proxy$ is the only listener to real$ and proxy$ itself has 0 listeners, why waste resources letting real$ push to, effectively, no listeners?

@TylorS
Collaborator

TylorS commented Apr 26, 2016

Also imagine if you switch contexts (routes, views), and now there is quite probably a memory leak.

@milankinen
Author

milankinen commented Apr 26, 2016

When real$ starts, it should add listener to proxy$. When real$ ends, it should remove that listener from the proxy$. If proxy$ has more listeners, then it can continue normally.

@staltz
Owner

staltz commented Apr 26, 2016

> When real$ starts, it should add listener to proxy$. When real$ ends, it should remove that listener from the proxy$. If proxy$ has more listeners, then it can continue normally.

Yep, I agree.

@TylorS essentially proxy$ imitation should be a transparent operation that doesn't cause starting or stopping. proxy$ just does whatever happens to real$.

@TylorS
Collaborator

TylorS commented Apr 26, 2016

But what happens in the case where proxy$ is the only listener to real$? Nothing would be emitted when you would expect something.

@milankinen
Author

> But what happens in the case where proxy$ is the only listener to real$? Nothing would be emitted when you would expect something.

Why is that? I'd rather expect no side effects unless I explicitly use addListener.

@TylorS
Collaborator

TylorS commented Apr 26, 2016

To the best of my understanding, the whole point of imitate() was to solve the issue of memory leaks when creating a proxy subject (see: cyclejs/cyclejs#257).

// Rx
const proxy$ = new Rx.Subject()
const real$ = // ...
real$.subscribe(proxy$)

// xstream
const proxy$ = xs.create()
const real$ = // ...
proxy$.imitate(real$) // real$.addListener(proxy$)

With this (possibly wrong) understanding, proxy$ should be added as a listener to real$, but that subscription was to be dependent on whether or not proxy$ still had listeners. IOW, things should get cleaned up appropriately.

@staltz
Owner

staltz commented Apr 26, 2016

> But what happens in the case where proxy$ is the only listener to real$? Nothing would be emitted when you would expect something.

Hmm, you've got a point there. To clarify, let's try to remember what will be the typical use case of this in Cycle.js:

proxy$-->x$;
x$-->y$;
y$-->real$;
real$-->proxy$;
y$-->sink;

or

proxy$-->x$;
x$-->y$;
y$-->real$;
real$-->proxy$;
real$-->sink;

In use case 1, the proxy is the only listener to the real$, and you wouldn't get any events, which is a bug. In use case 2, it would work. I think we need to support both use cases.

@staltz
Owner

staltz commented Apr 26, 2016

Here's a proposal:

proxy$ starts when:

  • real$ starts
  • OR
  • proxy$ gets the first listener (refCount start)

proxy$ stops when:

  • real$ stops
  • OR
  • proxy$ loses its last listener (async refCount stop)
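
The four rules above can be read as a small state machine. The following is only a sketch of that reading: the event names are made up for the illustration, and the asynchronous refCount stop is modelled as an ordinary event rather than a timer.

```javascript
// Event names are hypothetical labels for the four bullets in the proposal.
const START_EVENTS = new Set(["real-start", "first-listener"]);
const STOP_EVENTS = new Set(["real-stop", "last-listener-removed"]);

// Returns whether proxy$ is running after `event` has happened.
function proxyRunning(running, event) {
  if (START_EVENTS.has(event)) return true;
  if (STOP_EVENTS.has(event)) return false;
  return running; // unrelated events leave the state unchanged
}

// Replay a short history, keeping every intermediate state.
const trace = ["first-listener", "real-start", "last-listener-removed"].reduce(
  (states, event) => states.concat(proxyRunning(states[states.length - 1], event)),
  [false]
);
// trace is [false, true, true, false]
```

Read this way, proxy$ stops when its last listener leaves even if real$ is still running for other listeners.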

@TylorS
Collaborator

TylorS commented Apr 26, 2016

That's what I would expect 👍

@staltz
Owner

staltz commented Apr 26, 2016

Is there any corner case where that would break down? Like is there some case where proxy$ stops when real$ stops is bad because proxy$ still has active listeners?

(Hmm now that I just said that, I don't think it makes sense to continue the execution of the proxy$ if the real$ stops, because there isn't anything else left after the moment the real$ stops)

@TylorS
Collaborator

TylorS commented Apr 26, 2016

I think that if an ImitationStream were to be created, it could keep track of what it is imitating (real$), and when itself (proxy$) has 0 listeners, it would remove itself as a listener to what it is imitating (real$). This way, if the ImitationStream is the only listener, the real stream will also end, but if the real stream still has other listeners it will continue.
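
A rough, self-contained sketch of that idea, not xstream code. CountedStream and ImitationSketch are hypothetical classes. The sketch also assumes the proxy subscribes on its first listener, and it removes itself synchronously, whereas the library defers stopping with a timeout.

```javascript
// Hypothetical ref-counted stream; logs when its listener count crosses 0 <-> 1.
class CountedStream {
  constructor(name, log) {
    this.name = name;
    this.log = log;
    this.listeners = [];
  }
  add(listener) {
    this.listeners.push(listener);
    if (this.listeners.length === 1) this.log.push(this.name + " start");
  }
  remove(listener) {
    const i = this.listeners.indexOf(listener);
    if (i === -1) return;
    this.listeners.splice(i, 1);
    if (this.listeners.length === 0) this.log.push(this.name + " stop");
  }
  next(value) {
    this.listeners.slice().forEach(l => l.next(value));
  }
}

// Remembers what it imitates and mirrors its own ref count onto it.
class ImitationSketch extends CountedStream {
  imitate(real) {
    this.real = real;
  }
  add(listener) {
    super.add(listener);
    if (this.real && this.listeners.length === 1) this.real.add(this);
  }
  remove(listener) {
    super.remove(listener);
    if (this.real && this.listeners.length === 0) this.real.remove(this);
  }
}

const noop = { next() {} };

// Case 1: proxy$ is the only listener of real$, so real$ stops with it.
const soloLog = [];
const soloReal = new CountedStream("real$", soloLog);
const soloProxy = new ImitationSketch("proxy$", soloLog);
soloProxy.imitate(soloReal);
soloProxy.add(noop);
soloProxy.remove(noop);

// Case 2: real$ has another listener, so it keeps running.
const sharedLog = [];
const sharedReal = new CountedStream("real$", sharedLog);
const sharedProxy = new ImitationSketch("proxy$", sharedLog);
sharedProxy.imitate(sharedReal);
sharedReal.add({ next() {} });
sharedProxy.add(noop);
sharedProxy.remove(noop);
```

In the first case the log ends with "real$ stop". In the second it never contains it, because the other listener keeps real$ alive.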

@TylorS
Collaborator

TylorS commented Apr 26, 2016

Nahh scratch that, I don't think an ImitationStream is the answer. But I still think the rest of it makes sense.

@staltz
Owner

staltz commented Apr 26, 2016

Why not? I also thought about ImitationStream.

@TylorS
Collaborator

TylorS commented Apr 26, 2016

I was trying to write an ImitationStream out as an example to what it might look like, and it didn't seem to fit.

@staltz
Owner

staltz commented Apr 26, 2016

I can give it a try, let's see.

@TylorS
Collaborator

TylorS commented Apr 26, 2016

class Stream<T> {
+  private _imitating: Stream<T>;

  _remove(il: InternalListener<T>): void {
    const a = this._ils;
    const i = a.indexOf(il);
    if (i > -1) {
      a.splice(i, 1);
      const p = this._prod;
+     const im = this._imitating; // named `im` because `i` is already the listener index
      if (p && a.length <= 0) {
        this._stopID = setTimeout(() => {
          p._stop();
+         if (im) {
+           im.removeListener(this);
+         }
        });
      }
    }
  }

  imitate(other: Stream<T>) {
+    this._imitating = other;
     other._add(this);
   }
}

@TylorS
Collaborator

TylorS commented Apr 26, 2016

I was thinking something like that could work, which might be the basis of what an ImitationStream would look like.

@milankinen
Author

> To clarify, let's try to remember what will be the typical use case of this in Cycle.js:

I'm just trying to translate those into concrete names. Please correct me if I got it wrong.

proxy$-->action$;
action$-->state$;
state$-->viewAndChildState$;
viewAndChildState$-->stateToProxy$;
stateToProxy$-->proxy$;
viewAndChildState$-->view$-->sink;

> In use case 1, the proxy is the only listener to the real$, and you wouldn't get any events, which is a bug.

But now when sink gets subscribed (addListener), the start of subscription goes down to proxy$, right?

But now because proxy$ (should) know that it depends on stateToProxy$, proxy can start the underlying stateToProxy$ which eventually goes down to viewAndChildState$ right?

And because of the multicast nature of xstream, the start should not propagate any further because viewAndChildState$ is already started?

@milankinen
Author

milankinen commented Apr 26, 2016

So instead of directly calling other._add(this), the subscription could be lazy and it should work correctly?

_add(il: InternalListener<T>): void {
    const a = this._ils;
    a.push(il);
    if (a.length === 1) {
      if (this._stopID !== empty) {
        clearTimeout(this._stopID);
        this._stopID = empty;
      }
+     const i = this._imitating; // field name as in the sketch above
+     if (i) i._add(this); // subscribe to the imitated stream lazily
      const p = this._prod;
      if (p) p._start(this);
    }
  }

@TylorS
Collaborator

TylorS commented Apr 26, 2016

> proxy$ can be technically any stream

Really good point.

@staltz
Owner

staltz commented Apr 26, 2016

> But now when sink gets subscribed (addListener), the start of subscription goes down to proxy$ right?

Yes, it goes up to the proxy$.

> But now because proxy$ (should) know that it depends on stateToProxy$, proxy can start the underlying stateToProxy$ which eventually goes down to viewAndChildState$ right?

Starting the proxy$ will indeed make stateToProxy$ start too, but viewAndChildState$ started because of sink, not because of stateToProxy$.

> And because of the multicast nature of xstream, the start should not propagate any further because viewAndChildState$ is already started?

Yeah, I guess; once a stream has started, you can't start it twice.

> What makes the implementation hard (IMHO) is that the proxy$ can be technically any stream (e.g. proxy = xs.create(...).map().filter())

I would say that's an anti-pattern. A proxy$ should always be just an xs.create(). We could even bake this into the API, like xs.createProxy(); (<-- is the only one that has imitate()), but I don't know about that yet.

@staltz
Owner

staltz commented Apr 26, 2016

I mean, we could make ProxyStream extends Stream, and only have imitate in ProxyStream.

@milankinen
Author

> What makes the implementation problematic (IMHO) is that the proxy$ can be technically any stream (e.g. proxy = xs.create(...).map().filter()).

Is this the actual use case xstream wants to support? If it's not, then it'd be possible to simplify the implementation so that the imitating member is not needed anymore but the imitate method could modify _prod instead.

@TylorS
Collaborator

TylorS commented Apr 26, 2016

Then just use xs.createProxy() ?

@milankinen
Author

milankinen commented Apr 26, 2016

> I mean, we could make ProxyStream extends Stream, and only have imitate in ProxyStream.

Perhaps you understand my proposal now? 😉 (naming still open though)

@TylorS
Collaborator

TylorS commented Apr 26, 2016

Then we might also need a ProxyMemoryStream extends MemoryStream and xs.createProxyWithMemory()

@staltz
Owner

staltz commented Apr 26, 2016

If that's what you originally meant with loopedStream, I apologize.

@staltz
Owner

staltz commented Apr 26, 2016

> Then we might also need a ProxyMemoryStream extends MemoryStream and xs.createProxyWithMemory()

Ugh, would we? :(

How about the real$ having remember() at the end of the operator chain. Doesn't that help?

@milankinen
Author

> Then we might also need a ProxyMemoryStream extends MemoryStream and xs.createProxyWithMemory()

Is createProxyWithMemory serving any real use case?

@TylorS
Collaborator

TylorS commented Apr 26, 2016

> How about the real$ having remember() at the end of the operator chain. Doesn't that help?

Yeah, probably.

@staltz
Owner

staltz commented Apr 26, 2016

So all good, we could make ProxyStream extends Stream and put logic there. I hope it still works with the implementation you were sketching @TylorS.

@TylorS
Collaborator

TylorS commented Apr 26, 2016

Well actually, I don't know. If proxy$ has multiple listeners that attach at different times, only the first will be given the remember()ed event, because proxy$ would be added as a listener to real$, be given the remembered event, then a subsequent listener to proxy$ would have already missed it, since proxy$ doesn't have memory of it.
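
A small sketch of that scenario with hand-rolled stand-ins. PlainSketch and MemorySketch are hypothetical classes. MemorySketch only mimics the replay-last-value behaviour described here and is not xstream's MemoryStream.

```javascript
// Stream without memory: new listeners only see future events.
class PlainSketch {
  constructor() {
    this.listeners = [];
  }
  add(listener) {
    this.listeners.push(listener);
  }
  next(value) {
    this.listeners.slice().forEach(l => l.next(value));
  }
}

// Stream with memory: replays the last value to every new listener.
class MemorySketch extends PlainSketch {
  constructor() {
    super();
    this.hasLast = false;
    this.last = undefined;
  }
  next(value) {
    this.hasLast = true;
    this.last = value;
    super.next(value);
  }
  add(listener) {
    super.add(listener);
    if (this.hasLast) listener.next(this.last);
  }
}

const remembered = new MemorySketch(); // stands in for real$ ending in remember()
remembered.next("state-0");

const plainProxy = new PlainSketch(); // stands in for a proxy$ without memory
const firstSeen = [];
const secondSeen = [];

plainProxy.add({ next: v => firstSeen.push(v) });
remembered.add(plainProxy); // proxy subscribes; the replay reaches the first listener
plainProxy.add({ next: v => secondSeen.push(v) }); // too late: nothing is replayed
remembered.next("state-1"); // live events still reach both listeners
```

The first listener ends up with both values, while the second only sees "state-1".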

@staltz
Owner

staltz commented Apr 26, 2016

Ugh... yeah

@staltz
Owner

staltz commented May 2, 2016

I think the proxy stream could detect whether the real is Memory or not, and react accordingly.

@staltz
Owner

staltz commented May 2, 2016

I'm also thinking of renaming createProxy.

Since the distinctive feature of a proxy Stream is imitate, we could name it createMimic() that returns a MimicStream.

Any other suggestions?

staltz added a commit that referenced this issue Jun 1, 2016
Fix imitate method to mitigate issue #5. Now we must use xs.createMimic() to get a MimicStream, and
the MimicStream is the only type that has the imitate() method. Adding a listener to the MimicStream
literally just adds that listener to the imitated stream.

Fixes issue #5.

BREAKING CHANGE:
imitate() method on Stream removed. New type introduced: MimicStream,
which can be created through xs.createMimic(). A MimicStream has the
method imitate(), which has the same API as before, but imitate does not
trigger any Stream/Producer to start.
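
The forwarding behaviour described in this commit message can be sketched roughly as follows. This is an illustration only, not the 3.0.0 source: CountingTarget and MimicSketch are hypothetical names, and ignoring listeners that are added before imitate() is a simplification made for this sketch.

```javascript
// Hypothetical ref-counted target: runs while it has at least one listener.
class CountingTarget {
  constructor() {
    this.listeners = [];
    this.running = false;
  }
  add(listener) {
    this.listeners.push(listener);
    this.running = true;
  }
  remove(listener) {
    const i = this.listeners.indexOf(listener);
    if (i !== -1) this.listeners.splice(i, 1);
    if (this.listeners.length === 0) this.running = false;
  }
  next(value) {
    this.listeners.slice().forEach(l => l.next(value));
  }
}

// Keeps no listeners of its own: add/remove are handed straight to the target.
class MimicSketch {
  constructor() {
    this.target = null;
  }
  imitate(target) {
    this.target = target; // only stores the reference, starts nothing
  }
  add(listener) {
    if (this.target) this.target.add(listener); // assumption: ignored before imitate()
  }
  remove(listener) {
    if (this.target) this.target.remove(listener);
  }
}

const target = new CountingTarget();
const mimic = new MimicSketch();
mimic.imitate(target);
const runningAfterImitate = target.running;

const received = [];
const listener = { next: v => received.push(v) };
mimic.add(listener);
const runningWithListener = target.running;
target.next(1);
mimic.remove(listener);
const runningAfterRemove = target.running;
```

Because the mimic never registers itself as a listener, the target's listener count reflects only real listeners, so the target stops as soon as the last one is removed.
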
@staltz
Owner

staltz commented Jun 2, 2016

This should be fixed in 3.0.0.

@staltz staltz closed this as completed Jun 2, 2016