From 5b3c4f958e9cf3e196d7d418d9cd0aa766c3d164 Mon Sep 17 00:00:00 2001 From: Joakim Bodin Date: Sat, 31 May 2014 13:06:46 +0200 Subject: [PATCH 1/3] Add convenience method for adding unsubscription callback --- .../main/scala/rx/lang/scala/Subscriber.scala | 21 +++++++++++++++++++ .../scala/rx/lang/scala/SubscriberTests.scala | 15 +++++++++++++ 2 files changed, 36 insertions(+) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala index 2b5d714076..e71c5d7d23 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala @@ -1,5 +1,7 @@ package rx.lang.scala +import java.util.concurrent.atomic.AtomicBoolean + trait Subscriber[-T] extends Observer[T] with Subscription { self => @@ -20,6 +22,25 @@ trait Subscriber[-T] extends Observer[T] with Subscription { asJavaSubscriber.add(s.asJavaSubscription) } + /** + * Register a callback to be run when Subscriber is unsubscribed + * + * @param unsubscriptionCallback callback to run when unsubscribed + */ + final def add(unsubscriptionCallback: => Unit): Unit = { + asJavaSubscriber.add(new rx.Subscription { + val unsubscribed = new AtomicBoolean(false) + + override def unsubscribe() { + if (unsubscribed.compareAndSet(false, true)) { + unsubscriptionCallback + } + } + + override def isUnsubscribed: Boolean = { unsubscribed.get() } + }) + } + override final def unsubscribe(): Unit = { asJavaSubscriber.unsubscribe() } diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriberTests.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriberTests.scala index d1c03227fe..60eb9fecd7 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriberTests.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriberTests.scala @@ -42,4 +42,19 @@ class SubscriberTests extends JUnitSuite { assertTrue(innerSubscriber.isUnsubscribed) } + @Test def testBlockCallbackOnlyOnce() { + var called = false + val o = Observable[Int](subscriber => { + subscriber.add({ called = !called }) + }) + + val subscription = o.subscribe() + subscription.unsubscribe() + subscription.unsubscribe() + + // Even if called multiple times, callback is only called once + assertTrue(called) + assertTrue(subscription.isUnsubscribed) + } + } From 796a12a811483a2a8a041929b21d5b66bf097e62 Mon Sep 17 00:00:00 2001 From: Joakim Bodin Date: Sat, 31 May 2014 17:55:29 +0200 Subject: [PATCH 2/3] Use Subscription.apply() instead of accidentally re-inventing it --- .../main/scala/rx/lang/scala/Subscriber.scala | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala index e71c5d7d23..6a5d0cf48a 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala @@ -25,20 +25,10 @@ trait Subscriber[-T] extends Observer[T] with Subscription { /** * Register a callback to be run when Subscriber is unsubscribed * - * @param unsubscriptionCallback callback to run when unsubscribed + * @param u callback to run when unsubscribed */ - final def add(unsubscriptionCallback: => Unit): Unit = { - asJavaSubscriber.add(new rx.Subscription { - val unsubscribed = new AtomicBoolean(false) - - override def unsubscribe() { - if (unsubscribed.compareAndSet(false, true)) { - unsubscriptionCallback - } - } - - override def isUnsubscribed: Boolean = { unsubscribed.get() } - }) + final def add(u: => Unit): Unit = { + asJavaSubscriber.add(Subscription(u).asJavaSubscription) } override final def unsubscribe(): Unit = { From 8533fc0a5d03d1f3fc4db724fa600a4bcb4e692a Mon Sep 17 00:00:00 2001 From: Joakim Bodin Date: Sat, 31 May 2014 13:06:46 +0200 Subject: [PATCH 3/3] Add convenience method for adding unsubscription callback --- .../rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala index 6a5d0cf48a..06ae4d7fa5 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala @@ -1,7 +1,5 @@ package rx.lang.scala -import java.util.concurrent.atomic.AtomicBoolean - trait Subscriber[-T] extends Observer[T] with Subscription { self =>