From bc6ad010149592a3e868ffd0c6e7be5f9a2dbb80 Mon Sep 17 00:00:00 2001 From: Matthias Kaeppler Date: Sun, 6 Apr 2014 11:42:04 +0200 Subject: [PATCH] OperatorWeakBinding to not use WeakReferences anymore related issues: https://github.com/Netflix/RxJava/pull/1008 https://github.com/Netflix/RxJava/issues/1006 https://github.com/Netflix/RxJava/issues/979 --- .../samples/src/main/AndroidManifest.xml | 9 ++ .../android/samples/ListenInOutActivity.java | 87 ++++++++++++++ .../samples/ListeningFragmentActivity.java | 6 +- .../samples/RetainedFragmentActivity.java | 9 +- .../res/layout/listen_in_out_activity.xml | 28 +++++ .../observables/AndroidObservable.java | 30 +++-- .../rx/operators/OperatorWeakBinding.java | 108 +++++++----------- .../rx/operators/OperatorWeakBindingTest.java | 75 +++++------- 8 files changed, 222 insertions(+), 130 deletions(-) create mode 100644 rxjava-contrib/rxjava-android-samples/samples/src/main/java/com/netflix/rxjava/android/samples/ListenInOutActivity.java create mode 100644 rxjava-contrib/rxjava-android-samples/samples/src/main/res/layout/listen_in_out_activity.xml diff --git a/rxjava-contrib/rxjava-android-samples/samples/src/main/AndroidManifest.xml b/rxjava-contrib/rxjava-android-samples/samples/src/main/AndroidManifest.xml index f307d55bb0..b4a1d24633 100644 --- a/rxjava-contrib/rxjava-android-samples/samples/src/main/AndroidManifest.xml +++ b/rxjava-contrib/rxjava-android-samples/samples/src/main/AndroidManifest.xml @@ -29,6 +29,15 @@ + + + + + + + + diff --git a/rxjava-contrib/rxjava-android-samples/samples/src/main/java/com/netflix/rxjava/android/samples/ListenInOutActivity.java b/rxjava-contrib/rxjava-android-samples/samples/src/main/java/com/netflix/rxjava/android/samples/ListenInOutActivity.java new file mode 100644 index 0000000000..893124b2f2 --- /dev/null +++ b/rxjava-contrib/rxjava-android-samples/samples/src/main/java/com/netflix/rxjava/android/samples/ListenInOutActivity.java @@ -0,0 +1,87 @@ +package com.netflix.rxjava.android.samples; + +import android.app.Activity; +import android.os.Bundle; +import android.view.View; +import android.widget.TextView; +import android.widget.Toast; +import android.widget.ToggleButton; + +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.observables.ConnectableObservable; + +import static rx.android.observables.AndroidObservable.bindActivity; + +/** + * Activity that binds to a counting sequence and is able to listen in and out to that + * sequence by pressing a toggle button. The button disables itself once the sequence + * finishes. + */ +public class ListenInOutActivity extends Activity implements Observer { + + private Observable source; + private Subscription subscription; + private TextView textView; + + @Override + protected void onCreate(Bundle savedInstanceState) { + super.onCreate(savedInstanceState); + + setContentView(R.layout.listen_in_out_activity); + + textView = (TextView) findViewById(android.R.id.text1); + + // in a production app, you would use dependency injection, fragments, or other + // means to preserve the observable, but this will suffice here + source = (Observable) getLastNonConfigurationInstance(); + if (source == null) { + source = SampleObservables.numberStrings(1, 100, 200).publish(); + ((ConnectableObservable) source).connect(); + } + + subscribeToSequence(); + } + + private void subscribeToSequence() { + subscription = bindActivity(this, source).subscribe(this); + } + + @Override + public Object onRetainNonConfigurationInstance() { + return source; + } + + @Override + protected void onDestroy() { + subscription.unsubscribe(); + super.onDestroy(); + } + + @Override + public void onCompleted() { + TextView button = (TextView) findViewById(R.id.toggle_button); + button.setText("Completed"); + button.setEnabled(false); + } + + @Override + public void onError(Throwable e) { + e.printStackTrace(); + Toast.makeText(this, "Error: " + e, Toast.LENGTH_SHORT).show(); + } + + @Override + public void onNext(String s) { + textView.setText(s); + } + + public void onSequenceToggleClicked(View view) { + if (((ToggleButton) view).isChecked()) { + subscription.unsubscribe(); + } else { + subscribeToSequence(); + } + } +} diff --git a/rxjava-contrib/rxjava-android-samples/samples/src/main/java/com/netflix/rxjava/android/samples/ListeningFragmentActivity.java b/rxjava-contrib/rxjava-android-samples/samples/src/main/java/com/netflix/rxjava/android/samples/ListeningFragmentActivity.java index b402135bc1..966568fd15 100644 --- a/rxjava-contrib/rxjava-android-samples/samples/src/main/java/com/netflix/rxjava/android/samples/ListeningFragmentActivity.java +++ b/rxjava-contrib/rxjava-android-samples/samples/src/main/java/com/netflix/rxjava/android/samples/ListeningFragmentActivity.java @@ -14,7 +14,7 @@ import rx.observables.ConnectableObservable; import rx.subscriptions.Subscriptions; -import static rx.android.schedulers.AndroidSchedulers.mainThread; +import static rx.android.observables.AndroidObservable.bindFragment; /** * Problem: @@ -52,7 +52,7 @@ public ListeningFragment() { public void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); - strings = SampleObservables.numberStrings(1, 50, 250).observeOn(mainThread()).publish(); + strings = SampleObservables.numberStrings(1, 50, 250).publish(); strings.connect(); // trigger the sequence } @@ -74,7 +74,7 @@ public void onViewCreated(final View view, Bundle savedInstanceState) { final TextView textView = (TextView) view.findViewById(android.R.id.text1); // re-connect to sequence - subscription = strings.subscribe(new Subscriber() { + subscription = bindFragment(this, strings).subscribe(new Subscriber() { @Override public void onCompleted() { diff --git a/rxjava-contrib/rxjava-android-samples/samples/src/main/java/com/netflix/rxjava/android/samples/RetainedFragmentActivity.java b/rxjava-contrib/rxjava-android-samples/samples/src/main/java/com/netflix/rxjava/android/samples/RetainedFragmentActivity.java index 82c5225101..dc8706a24a 100644 --- a/rxjava-contrib/rxjava-android-samples/samples/src/main/java/com/netflix/rxjava/android/samples/RetainedFragmentActivity.java +++ b/rxjava-contrib/rxjava-android-samples/samples/src/main/java/com/netflix/rxjava/android/samples/RetainedFragmentActivity.java @@ -14,11 +14,12 @@ import rx.Observable; import rx.Subscription; -import rx.android.schedulers.AndroidSchedulers; import rx.functions.Action1; import rx.functions.Func1; import rx.subscriptions.Subscriptions; +import static rx.android.observables.AndroidObservable.bindFragment; + /** * Problem: * You have a data source (where that data is potentially expensive to obtain), and you want to @@ -68,9 +69,7 @@ public void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); // simulate fetching a JSON document with a latency of 2 seconds - strings = SampleObservables.fakeApiCall(2000).map(PARSE_JSON) - .observeOn(AndroidSchedulers.mainThread()) - .cache(); + strings = SampleObservables.fakeApiCall(2000).map(PARSE_JSON).cache(); } @Override @@ -93,7 +92,7 @@ public void onViewCreated(final View view, Bundle savedInstanceState) { // (re-)subscribe to the sequence, which either emits the cached result or simply re- // attaches the subscriber to wait for it to arrive - subscription = strings.subscribe(new Action1() { + subscription = bindFragment(this, strings).subscribe(new Action1() { @Override public void call(String result) { textView.setText(result); diff --git a/rxjava-contrib/rxjava-android-samples/samples/src/main/res/layout/listen_in_out_activity.xml b/rxjava-contrib/rxjava-android-samples/samples/src/main/res/layout/listen_in_out_activity.xml new file mode 100644 index 0000000000..bc2c1e1a99 --- /dev/null +++ b/rxjava-contrib/rxjava-android-samples/samples/src/main/res/layout/listen_in_out_activity.xml @@ -0,0 +1,28 @@ + + + + + + + + diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java b/rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java index 46d0e48285..e0fafb14bc 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java @@ -51,7 +51,7 @@ public Boolean call(Activity activity) { private static final Func1 FRAGMENT_VALIDATOR = new Func1() { @Override public Boolean call(Fragment fragment) { - return fragment.isAdded(); + return fragment.isAdded() && !fragment.getActivity().isFinishing(); } }; @@ -59,7 +59,7 @@ public Boolean call(Fragment fragment) { new Func1() { @Override public Boolean call(android.support.v4.app.Fragment fragment) { - return fragment.isAdded(); + return fragment.isAdded() && !fragment.getActivity().isFinishing(); } }; @@ -131,11 +131,15 @@ public static Observable fromFragment(Object fragment, Observable sour } /** - * Binds the given source sequence to the life-cycle of an activity. + * Binds the given source sequence to an activity. *

* This helper will schedule the given sequence to be observed on the main UI thread and ensure - * that no notifications will be forwarded to the activity in case it gets destroyed by the Android runtime - * or garbage collected by the VM. + * that no notifications will be forwarded to the activity in case it is scheduled to finish. + *

+ * You should unsubscribe from the returned Observable in onDestroy at the latest, in order to not + * leak the activity or an inner subscriber. Conversely, when the source sequence can outlive the activity, + * make sure to bind to new instances of the activity again, e.g. after going through configuration changes. + * Refer to the samples project for actual examples. * * @param activity the activity to bind the source sequence to * @param source the source sequence @@ -146,24 +150,28 @@ public static Observable bindActivity(Activity activity, Observable so } /** - * Binds the given source sequence to the life-cycle of a fragment (native or support-v4). + * Binds the given source sequence to a fragment (native or support-v4). *

* This helper will schedule the given sequence to be observed on the main UI thread and ensure * that no notifications will be forwarded to the fragment in case it gets detached from its - * activity or garbage collected by the VM. + * activity or the activity is scheduled to finish. + *

+ * You should unsubscribe from the returned Observable in onDestroy for normal fragments, or in onDestroyView + * for retained fragments, in order to not leak any references to the host activity or the fragment. + * Refer to the samples project for actual examples. * * @param fragment the fragment to bind the source sequence to * @param source the source sequence */ - public static Observable bindFragment(Object fragment, Observable cachedSequence) { + public static Observable bindFragment(Object fragment, Observable source) { Assertions.assertUiThread(); - final Observable source = cachedSequence.observeOn(mainThread()); + final Observable o = source.observeOn(mainThread()); if (USES_SUPPORT_FRAGMENTS && fragment instanceof android.support.v4.app.Fragment) { android.support.v4.app.Fragment f = (android.support.v4.app.Fragment) fragment; - return source.lift(new OperatorWeakBinding(f, FRAGMENTV4_VALIDATOR)); + return o.lift(new OperatorWeakBinding(f, FRAGMENTV4_VALIDATOR)); } else if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.HONEYCOMB && fragment instanceof Fragment) { Fragment f = (Fragment) fragment; - return source.lift(new OperatorWeakBinding(f, FRAGMENT_VALIDATOR)); + return o.lift(new OperatorWeakBinding(f, FRAGMENT_VALIDATOR)); } else { throw new IllegalArgumentException("Target fragment is neither a native nor support library Fragment"); } diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java index 3cc8258302..d0e37fdab5 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorWeakBinding.java @@ -7,12 +7,9 @@ import android.util.Log; -import java.lang.ref.WeakReference; - /** - * Ties a source sequence to the life-cycle of the given target object, and/or the subscriber - * using weak references. When either object is gone, this operator automatically unsubscribes - * from the source sequence. + * Ties a source sequence to the given target object using a predicate. If the predicate fails + * to validate, the sequence unsubscribes itself and releases the bound reference. *

* You can also pass in an optional predicate function, which whenever it evaluates to false * on the target object, will also result in the operator unsubscribing from the sequence. @@ -24,88 +21,71 @@ public final class OperatorWeakBinding implements Observable.Operator boundRef; + private R boundRef; private final Func1 predicate; public OperatorWeakBinding(R bound, Func1 predicate) { - boundRef = new WeakReference(bound); + boundRef = bound; this.predicate = predicate; } public OperatorWeakBinding(R bound) { - boundRef = new WeakReference(bound); + boundRef = bound; this.predicate = Functions.alwaysTrue(); } @Override public Subscriber call(final Subscriber child) { - return new WeakSubscriber(child); - } - - final class WeakSubscriber extends Subscriber { - - final WeakReference> subscriberRef; + return new Subscriber(child) { - private WeakSubscriber(Subscriber source) { - super(source); - subscriberRef = new WeakReference>(source); - } + @Override + public void onCompleted() { + if (shouldForwardNotification()) { + child.onCompleted(); + } else { + handleLostBinding("onCompleted"); + } + } - @Override - public void onCompleted() { - final Subscriber sub = subscriberRef.get(); - if (shouldForwardNotification(sub)) { - sub.onCompleted(); - } else { - handleLostBinding(sub, "onCompleted"); + @Override + public void onError(Throwable e) { + if (shouldForwardNotification()) { + child.onError(e); + } else { + handleLostBinding("onError"); + } } - } - @Override - public void onError(Throwable e) { - final Subscriber sub = subscriberRef.get(); - if (shouldForwardNotification(sub)) { - sub.onError(e); - } else { - handleLostBinding(sub, "onError"); + @Override + public void onNext(T t) { + if (shouldForwardNotification()) { + child.onNext(t); + } else { + handleLostBinding("onNext"); + } } - } - @Override - public void onNext(T t) { - final Subscriber sub = subscriberRef.get(); - if (shouldForwardNotification(sub)) { - sub.onNext(t); - } else { - handleLostBinding(sub, "onNext"); + private boolean shouldForwardNotification() { + return boundRef != null && predicate.call(boundRef); } - } - private boolean shouldForwardNotification(Subscriber sub) { - final R target = boundRef.get(); - return sub != null && target != null && predicate.call(target); - } + private void handleLostBinding(String context) { + log("bound object has become invalid; skipping " + context); + log("unsubscribing..."); + boundRef = null; + unsubscribe(); + } - private void handleLostBinding(Subscriber sub, String context) { - if (sub == null) { - log("subscriber gone; skipping " + context); - } else { - final R r = boundRef.get(); - if (r != null) { - // the predicate failed to validate - log("bound component has become invalid; skipping " + context); - } else { - log("bound component gone; skipping " + context); + private void log(String message) { + if (Log.isLoggable(LOG_TAG, Log.DEBUG)) { + Log.d(LOG_TAG, message); } } - log("unsubscribing..."); - unsubscribe(); - } + }; + } - private void log(String message) { - if (Log.isLoggable(LOG_TAG, Log.DEBUG)) { - Log.d(LOG_TAG, message); - } - } + /* Visible for testing */ + R getBoundRef() { + return boundRef; } } diff --git a/rxjava-contrib/rxjava-android/src/test/java/rx/operators/OperatorWeakBindingTest.java b/rxjava-contrib/rxjava-android/src/test/java/rx/operators/OperatorWeakBindingTest.java index 6dd72c606d..c27f207467 100644 --- a/rxjava-contrib/rxjava-android/src/test/java/rx/operators/OperatorWeakBindingTest.java +++ b/rxjava-contrib/rxjava-android/src/test/java/rx/operators/OperatorWeakBindingTest.java @@ -1,16 +1,19 @@ package rx.operators; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.MockitoAnnotations; import org.robolectric.RobolectricTestRunner; +import rx.Subscriber; +import rx.functions.Func1; import rx.functions.Functions; import rx.observers.TestSubscriber; -import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; @RunWith(RobolectricTestRunner.class) public class OperatorWeakBindingTest { @@ -23,49 +26,27 @@ public void setUp() throws Exception { } @Test - public void shouldForwardAllNotificationsWhenSubscriberAndTargetAlive() { - OperatorWeakBinding op = new OperatorWeakBinding(new Object()); - OperatorWeakBinding.WeakSubscriber weakSub = (OperatorWeakBinding.WeakSubscriber) op.call(subscriber); - weakSub.onNext("one"); - weakSub.onNext("two"); - weakSub.onCompleted(); - weakSub.onError(new Exception()); - - subscriber.assertReceivedOnNext(Arrays.asList("one", "two")); - assertEquals(1, subscriber.getOnCompletedEvents().size()); - assertEquals(1, subscriber.getOnErrorEvents().size()); - } - - @Test - public void shouldUnsubscribeFromSourceSequenceWhenSubscriberReleased() { - OperatorWeakBinding op = new OperatorWeakBinding(new Object()); - - OperatorWeakBinding.WeakSubscriber weakSub = (OperatorWeakBinding.WeakSubscriber) op.call(subscriber); - weakSub.onNext("one"); - weakSub.subscriberRef.clear(); - weakSub.onNext("two"); - weakSub.onCompleted(); - weakSub.onError(new Exception()); - - subscriber.assertReceivedOnNext(Arrays.asList("one")); - assertEquals(0, subscriber.getOnCompletedEvents().size()); - assertEquals(0, subscriber.getOnErrorEvents().size()); - } - - @Test - public void shouldUnsubscribeFromSourceSequenceWhenTargetObjectReleased() { - OperatorWeakBinding op = new OperatorWeakBinding(new Object()); - - OperatorWeakBinding.WeakSubscriber weakSub = (OperatorWeakBinding.WeakSubscriber) op.call(subscriber); - weakSub.onNext("one"); - op.boundRef.clear(); - weakSub.onNext("two"); - weakSub.onCompleted(); - weakSub.onError(new Exception()); - - subscriber.assertReceivedOnNext(Arrays.asList("one")); + public void shouldReleaseBoundReferenceIfPredicateFailsToPass() { + final AtomicBoolean toggle = new AtomicBoolean(true); + OperatorWeakBinding op = new OperatorWeakBinding( + new Object(), new Func1() { + @Override + public Boolean call(Object o) { + return toggle.get(); + } + }); + + Subscriber sub = op.call(subscriber); + sub.onNext("one"); + toggle.set(false); + sub.onNext("two"); + sub.onCompleted(); + sub.onError(new Exception()); + + assertEquals(1, subscriber.getOnNextEvents().size()); assertEquals(0, subscriber.getOnCompletedEvents().size()); assertEquals(0, subscriber.getOnErrorEvents().size()); + assertNull(op.getBoundRef()); } @Test @@ -73,11 +54,11 @@ public void shouldUnsubscribeFromSourceSequenceWhenPredicateFailsToPass() { OperatorWeakBinding op = new OperatorWeakBinding( new Object(), Functions.alwaysFalse()); - OperatorWeakBinding.WeakSubscriber weakSub = (OperatorWeakBinding.WeakSubscriber) op.call(subscriber); - weakSub.onNext("one"); - weakSub.onNext("two"); - weakSub.onCompleted(); - weakSub.onError(new Exception()); + Subscriber sub = op.call(subscriber); + sub.onNext("one"); + sub.onNext("two"); + sub.onCompleted(); + sub.onError(new Exception()); assertEquals(0, subscriber.getOnNextEvents().size()); assertEquals(0, subscriber.getOnCompletedEvents().size());