Scheduler.schedulePeriodically - memory leak #782

Closed
alexandru opened this issue Jan 23, 2014 · 8 comments

@alexandru

Folks, this is a pretty bad memory leak. The schedulePeriodically methods as currently defined are leaking.

For example, let's start with the current code in rx.Scheduler, line 93.

The method in question is this:

    public <T> Subscription schedulePeriodically(T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long initialDelay, long period, TimeUnit unit) {
        final long periodInNanos = unit.toNanos(period);
        final AtomicBoolean complete = new AtomicBoolean();

        final Func2<Scheduler, T, Subscription> recursiveAction = new Func2<Scheduler, T, Subscription>() {
            @Override
            public Subscription call(Scheduler scheduler, T state0) {
                if (!complete.get()) {
                    long startedAt = now();
                    final Subscription sub1 = action.call(scheduler, state0);
                    long timeTakenByActionInNanos = TimeUnit.MILLISECONDS.toNanos(now() - startedAt);
                    final Subscription sub2 = schedule(state0, this, periodInNanos - timeTakenByActionInNanos, TimeUnit.NANOSECONDS);
                    return Subscriptions.create(new Action0() {
                        @Override
                        public void call() {
                            sub1.unsubscribe();
                            sub2.unsubscribe();
                        }
                    });
                }
                return Subscriptions.empty();
            }
        };
        final Subscription sub = schedule(state, recursiveAction, initialDelay, unit);
        return Subscriptions.create(new Action0() {
            @Override
            public void call() {
                complete.set(true);
                sub.unsubscribe();
            }
        });
    }

The problem is that every periodic execution of the given action creates a new Subscription that captures a reference to the previous subscription. None of these subscriptions can be garbage collected until unsubscribe() is called.
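
To make the retention pattern concrete, here is a minimal sketch in plain Java. It is not RxJava code: `Link` is a hypothetical stand-in for the wrapping Subscription, and `retainedAfter` is an illustrative helper. The sketch assumes, as described above, that each run's handle holds a strong reference to the handle of the previous run.

```java
public class ChainLeakSketch {
    /** Hypothetical stand-in for a Subscription that wraps the previous one. */
    static final class Link {
        final Link previous; // strong reference keeps the older handle reachable

        Link(Link previous) {
            this.previous = previous;
        }
    }

    /** Simulates `ticks` periodic runs and counts how many handles stay reachable. */
    static int retainedAfter(int ticks) {
        Link head = null;
        for (int i = 0; i < ticks; i++) {
            head = new Link(head); // one new wrapper per run, chained to the last
        }
        int reachable = 0;
        for (Link l = head; l != null; l = l.previous) {
            reachable++;
        }
        return reachable;
    }

    public static void main(String[] args) {
        System.out.println(retainedAfter(1000)); // prints 1000
    }
}
```

The number of reachable handles grows linearly with the number of runs, so a 1 ns period exhausts the heap quickly.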

It's easy to reproduce the effect (i.e. OutOfMemoryError) with a simple test that will end up with an error in a matter of seconds:

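  import java.util.concurrent.{Executors, TimeUnit}
  import java.util.concurrent.atomic.AtomicInteger
  import rx.util.functions.Action0  // assumed package for 0.16.x

  val sch = rx.schedulers.Schedulers.executor(Executors.newFixedThreadPool(2))
  val counter = new AtomicInteger(0)

  val action = new Action0 {
    def call(): Unit = {
      counter.incrementAndGet()  // AtomicInteger has no increment() method
    }
  }

  val sub = sch.schedulePeriodically(action, 1, 1, TimeUnit.NANOSECONDS)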

Here's what that looks like in a profiler:
[image: profiler screenshot]

The test above doesn't use a ScheduledExecutorService, so it goes through the default Scheduler.schedulePeriodically that I pasted above, which is pretty heavy. But even with a ScheduledExecutorService it still leaks, because the ExecutorScheduler still uses a CompositeSubscription, and when scheduling an Action0 this basically pushes Subscriptions.empty over and over again into an ArrayList. So the code leaks memory more slowly, but it still leaks.
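
The list growth described here can be sketched without RxJava. In the following, `Handle` and `Composite` are hypothetical stand-ins for Subscription and CompositeSubscription, and removing a child once its run has finished is just one possible remedy, shown for contrast. It is not a claim about what the library does.

```java
import java.util.ArrayList;
import java.util.List;

public class CompositeGrowthSketch {
    /** Hypothetical stand-in for rx.Subscription. */
    interface Handle {
        void cancel();
    }

    /** Hypothetical stand-in for a composite that tracks child handles in a list. */
    static final class Composite {
        private final List<Handle> children = new ArrayList<>();

        synchronized void add(Handle h) {
            children.add(h);
        }

        synchronized void remove(Handle h) {
            children.remove(h);
        }

        synchronized int size() {
            return children.size();
        }
    }

    /** Simulates `ticks` runs; optionally drops each run's handle once the run is done. */
    static int childrenAfter(int ticks, boolean removeOnCompletion) {
        Composite composite = new Composite();
        for (int i = 0; i < ticks; i++) {
            Handle perRun = new Handle() {
                @Override
                public void cancel() {
                    // nothing to cancel, like an empty subscription
                }
            };
            composite.add(perRun);
            // ... the scheduled action would run here ...
            if (removeOnCompletion) {
                composite.remove(perRun);
            }
        }
        return composite.size();
    }

    public static void main(String[] args) {
        System.out.println(childrenAfter(10000, false)); // prints 10000
        System.out.println(childrenAfter(10000, true));  // prints 0
    }
}
```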

From what I can see, this affects all schedulePeriodically implementations and as a consequence, things like Observable.interval are anything but infinite.

UPDATE: I now see that pull request #712 addressed some of these issues; in particular, it changed the behavior of ExecutorScheduler. I haven't tested that version yet; I hope to do so today.

However, the default Scheduler.schedulePeriodically is still there and is something to consider: if something that does periodic scheduling leaks, it probably shouldn't be there.
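
For comparison, here is one way to keep memory constant, sketched on a plain ScheduledExecutorService rather than on rx.Scheduler. `PeriodicSketch` and its `schedulePeriodically` are hypothetical names, not the library's API and not necessarily how a fix would look there. The idea is to re-schedule recursively but keep only the most recent Future in a single slot, with a flag that stops further runs. Unlike the method quoted above, this sketch does not subtract the action's running time from the period.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class PeriodicSketch {
    /** Runs `task` repeatedly; the returned Runnable cancels all further runs. */
    static Runnable schedulePeriodically(final ScheduledExecutorService executor,
                                         final Runnable task,
                                         long initialDelay,
                                         final long period,
                                         final TimeUnit unit) {
        final AtomicBoolean cancelled = new AtomicBoolean();
        // Single slot: only the latest Future is kept, older ones become garbage.
        final AtomicReference<Future<?>> slot = new AtomicReference<>();

        Runnable tick = new Runnable() {
            @Override
            public void run() {
                if (cancelled.get()) {
                    return;
                }
                task.run();
                if (!cancelled.get()) {
                    try {
                        slot.set(executor.schedule(this, period, unit));
                    } catch (RejectedExecutionException e) {
                        // executor was shut down; stop rescheduling
                    }
                }
            }
        };
        slot.set(executor.schedule(tick, initialDelay, unit));

        return new Runnable() {
            @Override
            public void run() {
                // The flag is what guarantees no further runs; cancelling the
                // Future only avoids waiting for a pending delay.
                cancelled.set(true);
                Future<?> pending = slot.get();
                if (pending != null) {
                    pending.cancel(false);
                }
            }
        };
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger counter = new AtomicInteger();
        Runnable cancel = schedulePeriodically(
                executor, counter::incrementAndGet, 1, 1, TimeUnit.MILLISECONDS);
        Thread.sleep(100);
        cancel.run();
        executor.shutdown();
        System.out.println("runs: " + counter.get());
    }
}
```

Because each run overwrites the slot instead of wrapping the previous handle, the number of live handles stays at one no matter how long the task runs.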

@alexandru
Author

I updated the issue multiple times; hopefully it's helpful.

@akarnokd
Member

Thanks. @benjchristensen are you working on this?

@samueltardieu
Contributor

I've just been hit by that while implementing a short-period periodic task using an Action0 (hence without explicitly adding new subscriptions to the party) for cgeo.

I'm using 0.16.1 on Android.

@benjchristensen
Member

Yes, I can, as schedulers are on my list for 0.17. I already fixed memory leaks in schedulers but obviously missed this one; I completely ignored the 'schedulePeriodically' methods.

@alexandru
Author

Hey, I believe this can be closed, no? I saw that Schedulers changed the interface.

@benjchristensen
Member

Yup it can be ... I need to do a pass through all of the issues, I bet 1/3 of them are probably ready for me to close.

@akarnokd
Member

akarnokd commented May 8, 2014

I think the current version doesn't leak but can't be cancelled either. This program prints 1 through 6.

    public static void main(String[] args) throws Exception {
        Worker w = Schedulers.computation().createWorker();

        CompositeSubscription cs = new CompositeSubscription();

        AtomicInteger i = new AtomicInteger();

        cs.add(w.schedulePeriodically(() -> {
            int j = i.incrementAndGet();
            if (j == 3) {
                cs.unsubscribe();
            }
            System.out.println(j);
            if (j == 6) {
                w.unsubscribe();
            }
        }, 200, 200, TimeUnit.MILLISECONDS));

        TimeUnit.SECONDS.sleep(2);
    }

This doesn't seem to be an issue in practice because most operators don't mix schedule with schedulePeriodically.

@benjchristensen
Member

Memory leak is fixed. The cancellation issue is being solved elsewhere.
