Skip to content

Commit

Permalink
Merge pull request #1321 from dpsm/master
Browse files Browse the repository at this point in the history
Ensuring Runnables posted with delay to a Handler are removed when unsub...
  • Loading branch information
benjchristensen committed Jun 12, 2014
2 parents 473a567 + b5655d2 commit 56d76bb
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2014 Netflix, Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -20,8 +20,8 @@
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.subscriptions.BooleanSubscription;
import rx.internal.schedulers.ScheduledAction;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import android.os.Handler;

Expand All @@ -34,7 +34,7 @@ public class HandlerThreadScheduler extends Scheduler {

/**
* Constructs a {@link HandlerThreadScheduler} using the given {@link Handler}
*
*
* @param handler
* {@link Handler} to use when scheduling actions
*/
Expand All @@ -46,47 +46,42 @@ public HandlerThreadScheduler(Handler handler) {
public Worker createWorker() {
return new InnerHandlerThreadScheduler(handler);
}

private static class InnerHandlerThreadScheduler extends Worker {

private final Handler handler;
private BooleanSubscription innerSubscription = new BooleanSubscription();

private final CompositeSubscription compositeSubscription = new CompositeSubscription();

public InnerHandlerThreadScheduler(Handler handler) {
this.handler = handler;
}

@Override
public void unsubscribe() {
innerSubscription.unsubscribe();
compositeSubscription.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
return innerSubscription.isUnsubscribed();
return compositeSubscription.isUnsubscribed();
}

@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
final Runnable runnable = new Runnable() {
@Override
public void run() {
if (isUnsubscribed()) {
return;
}
action.call();
}
};
handler.postDelayed(runnable, unit.toMillis(delayTime));
return Subscriptions.create(new Action0() {

final ScheduledAction scheduledAction = new ScheduledAction(action);
scheduledAction.add(Subscriptions.create(new Action0() {
@Override
public void call() {
handler.removeCallbacks(runnable);

handler.removeCallbacks(scheduledAction);
}

});
}));
scheduledAction.addParent(compositeSubscription);
compositeSubscription.add(scheduledAction);

handler.postDelayed(scheduledAction, unit.toMillis(delayTime));

return scheduledAction;
}

@Override
Expand All @@ -95,5 +90,4 @@ public Subscription schedule(final Action0 action) {
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,26 @@
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.robolectric.Robolectric;
import org.robolectric.RobolectricTestRunner;
import org.robolectric.annotation.Config;

import rx.Observable;
import rx.Scheduler;
import rx.Scheduler.Worker;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import android.os.Handler;
Expand Down Expand Up @@ -75,4 +83,46 @@ public void shouldScheduleDelayedActionOnHandlerThread() {
runnable.getValue().run();
verify(action).call();
}

@Test
public void shouldRemoveCallbacksFromHandlerWhenUnsubscribedSubscription() {
final Handler handler = spy(new Handler());
final Observable.OnSubscribe<Integer> onSubscribe = mock(Observable.OnSubscribe.class);
final Subscription subscription = Observable.create(onSubscribe).subscribeOn(
new HandlerThreadScheduler(handler)).subscribe();

verify(onSubscribe).call(Matchers.any(Subscriber.class));

subscription.unsubscribe();

verify(handler).removeCallbacks(Matchers.any(Runnable.class));
}

@Test
public void shouldNotCallOnSubscribeWhenSubscriptionUnsubscribedBeforeDelay() {
final Observable.OnSubscribe<Integer> onSubscribe = mock(Observable.OnSubscribe.class);
final Handler handler = spy(new Handler());

final Scheduler scheduler = new HandlerThreadScheduler(handler);
final Worker worker = spy(scheduler.createWorker());

final Scheduler spyScheduler = spy(scheduler);
when(spyScheduler.createWorker()).thenReturn(worker);

final Subscription subscription = Observable.create(onSubscribe)
.delaySubscription(1, TimeUnit.MINUTES, spyScheduler)
.subscribe();

verify(worker).schedule(Matchers.any(Action0.class),
Matchers.eq(1L), Matchers.eq(TimeUnit.MINUTES));
verify(handler).postDelayed(Matchers.any(Runnable.class),
Matchers.eq(TimeUnit.MINUTES.toMillis(1L)));

subscription.unsubscribe();

Robolectric.runUiThreadTasksIncludingDelayedTasks();

verify(onSubscribe, never()).call(Matchers.any(Subscriber.class));
verify(handler).removeCallbacks(Matchers.any(Runnable.class));
}
}

0 comments on commit 56d76bb

Please sign in to comment.