diff --git a/language-adaptors/rxjava-scala/build.gradle b/language-adaptors/rxjava-scala/build.gradle index db194a6d28..8083feaf37 100644 --- a/language-adaptors/rxjava-scala/build.gradle +++ b/language-adaptors/rxjava-scala/build.gradle @@ -23,7 +23,7 @@ sourceSets { srcDir 'src/main/scala' srcDir 'src/test/scala' srcDir 'src/examples/scala' - srcDir 'src/examples/java' + //srcDir 'src/examples/java' } java.srcDirs = [] } @@ -34,7 +34,7 @@ sourceSets { // the scala source set: scala { srcDir 'src/examples/scala' - srcDir 'src/examples/java' + //srcDir 'src/examples/java' } java.srcDirs = [] } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 6db7d41e25..9be58009dd 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -1860,6 +1860,38 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.doOnEach(observer.asJavaObserver)) } + /** + * Invokes an action when the source Observable calls onNext. + * + * @param onNext the action to invoke when the source Observable calls onNext + * @return the source Observable with the side-effecting behavior applied + */ + def doOnNext(onNext: T => Unit): Observable[T] = { + toScalaObservable[T](asJavaObservable.doOnNext(onNext)) + } + + /** + * Invokes an action if the source Observable calls onError. + * + * @param onError the action to invoke if the source Observable calls + * onError + * @return the source Observable with the side-effecting behavior applied + */ + def doOnError(onError: Throwable => Unit): Observable[T] = { + toScalaObservable[T](asJavaObservable.doOnError(onError)) + } + + /** + * Invokes an action when the source Observable calls onCompleted. + * + * @param onCompleted the action to invoke when the source Observable calls + * onCompleted + * @return the source Observable with the side-effecting behavior applied + */ + def doOnCompleted(onCompleted: () => Unit): Observable[T] = { + toScalaObservable[T](asJavaObservable.doOnCompleted(onCompleted)) + } + /** * Returns an Observable that applies the given function to each item emitted by an * Observable. @@ -1869,9 +1901,7 @@ trait Observable[+T] * @return an Observable with the side-effecting behavior applied. */ def doOnEach(onNext: T => Unit): Observable[T] = { - toScalaObservable[T](asJavaObservable.doOnEach( - onNext - )) + toScalaObservable[T](asJavaObservable.doOnNext(onNext)) } /** @@ -1884,10 +1914,7 @@ trait Observable[+T] * @return an Observable with the side-effecting behavior applied. */ def doOnEach(onNext: T => Unit, onError: Throwable => Unit): Observable[T] = { - toScalaObservable[T](asJavaObservable.doOnEach( - onNext, - onError - )) + toScalaObservable[T](asJavaObservable.doOnEach(Observer(onNext, onError, ()=>{}))) } /** @@ -1901,11 +1928,7 @@ trait Observable[+T] * @return an Observable with the side-effecting behavior applied. */ def doOnEach(onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit): Observable[T] = { - toScalaObservable[T](asJavaObservable.doOnEach( - onNext, - onError, - onCompleted - )) + toScalaObservable[T](asJavaObservable.doOnEach(Observer(onNext, onError,onCompleted))) } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/BooleanSubscription.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/BooleanSubscription.scala index d7e0431d27..5f07497ca3 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/BooleanSubscription.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/BooleanSubscription.scala @@ -27,7 +27,11 @@ private [scala] object BooleanSubscription { private [scala] class BooleanSubscription private[scala] (boolean: rx.subscriptions.BooleanSubscription) extends Subscription { - override val asJavaSubscription: rx.subscriptions.BooleanSubscription = new rx.subscriptions.BooleanSubscription() { + override val asJavaSubscription: rx.subscriptions.BooleanSubscription = boolean +} + +/* +new rx.subscriptions.BooleanSubscription() { override def unsubscribe(): Unit = { if(unsubscribed.compareAndSet(false, true)) { if(!boolean.isUnsubscribed) { boolean.unsubscribe() } @@ -35,4 +39,4 @@ private [scala] class BooleanSubscription private[scala] (boolean: rx.subscripti } override def isUnsubscribed(): Boolean = unsubscribed.get() || boolean.isUnsubscribed } -} + */ \ No newline at end of file diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/MultiAssignmentSubscription.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/MultiAssignmentSubscription.scala index 8fa87c3ff5..1cbb0f8d6d 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/MultiAssignmentSubscription.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/MultiAssignmentSubscription.scala @@ -47,7 +47,7 @@ class MultipleAssignmentSubscription private[scala] (override val asJavaSubscrip /** * Gets the underlying subscription. */ - def subscription: Subscription = Subscription(asJavaSubscription.getSubscription) + def subscription: Subscription = Subscription(asJavaSubscription.get) /** * Gets the underlying subscription @@ -55,7 +55,7 @@ class MultipleAssignmentSubscription private[scala] (override val asJavaSubscrip * @return the [[rx.lang.scala.subscriptions.MultipleAssignmentSubscription]] itself. */ def subscription_=(that: Subscription): this.type = { - asJavaSubscription.setSubscription(that.asJavaSubscription) + asJavaSubscription.set(that.asJavaSubscription) this } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/SerialSubscription.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/SerialSubscription.scala index d845b7e5cd..77b52e3b5f 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/SerialSubscription.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/SerialSubscription.scala @@ -41,10 +41,10 @@ class SerialSubscription private[scala] (override val asJavaSubscription: rx.sub override def isUnsubscribed: Boolean = asJavaSubscription.isUnsubscribed def subscription_=(value: Subscription): this.type = { - asJavaSubscription.setSubscription(value.asJavaSubscription) + asJavaSubscription.set(value.asJavaSubscription) this } - def subscription: Subscription = Subscription(asJavaSubscription.getSubscription) + def subscription: Subscription = Subscription(asJavaSubscription.get) } diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 6f3f51f12d..4ce54dedf6 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -6793,7 +6793,7 @@ public void onNext(T args) { } * @see RxJava Wiki: doOnNext() * @see MSDN: Observable.Do */ - public Observable doOnNext(final Action1 onNext) { + public Observable doOnNext(final Action1 onNext) { Observer observer = new Observer() { @Override public void onCompleted() { } @@ -6822,7 +6822,7 @@ public void onNext(T args) { * @see RxJava Wiki: doOnEach() * @see MSDN: Observable.Do */ - public Observable doOnEach(final Action1> onNotification) { + public Observable doOnEach(final Action1> onNotification) { Observer observer = new Observer() { @Override public void onCompleted() { diff --git a/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java b/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java index aeaa7d5103..ebe26dacba 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java @@ -50,10 +50,10 @@ private static List> toLists(Observable> observables) final List> lists = new ArrayList>(); Observable.concat(observables.map(new Func1, Observable>>() { @Override - public Observable> call(Observable xs) { - return xs.toList(); - } - })).toBlockingObservable().forEach(new Action1>() { + public Observable> call(Observable xs) { return xs.toList(); } + })) + .toBlockingObservable() + .forEach(new Action1>() { @Override public void call(List xs) { lists.add(xs);