diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala index 2b5d714076..06ae4d7fa5 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala @@ -20,6 +20,15 @@ trait Subscriber[-T] extends Observer[T] with Subscription { asJavaSubscriber.add(s.asJavaSubscription) } + /** + * Register a callback to be run when Subscriber is unsubscribed + * + * @param u callback to run when unsubscribed + */ + final def add(u: => Unit): Unit = { + asJavaSubscriber.add(Subscription(u).asJavaSubscription) + } + override final def unsubscribe(): Unit = { asJavaSubscriber.unsubscribe() } diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriberTests.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriberTests.scala index d1c03227fe..60eb9fecd7 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriberTests.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriberTests.scala @@ -42,4 +42,19 @@ class SubscriberTests extends JUnitSuite { assertTrue(innerSubscriber.isUnsubscribed) } + @Test def testBlockCallbackOnlyOnce() { + var called = false + val o = Observable[Int](subscriber => { + subscriber.add({ called = !called }) + }) + + val subscription = o.subscribe() + subscription.unsubscribe() + subscription.unsubscribe() + + // Even if called multiple times, callback is only called once + assertTrue(called) + assertTrue(subscription.isUnsubscribed) + } + }