diff --git a/language-adaptors/rxjava-scala/build.gradle b/language-adaptors/rxjava-scala/build.gradle
index db194a6d28..8083feaf37 100644
--- a/language-adaptors/rxjava-scala/build.gradle
+++ b/language-adaptors/rxjava-scala/build.gradle
@@ -23,7 +23,7 @@ sourceSets {
srcDir 'src/main/scala'
srcDir 'src/test/scala'
srcDir 'src/examples/scala'
- srcDir 'src/examples/java'
+ //srcDir 'src/examples/java'
}
java.srcDirs = []
}
@@ -34,7 +34,7 @@ sourceSets {
// the scala source set:
scala {
srcDir 'src/examples/scala'
- srcDir 'src/examples/java'
+ //srcDir 'src/examples/java'
}
java.srcDirs = []
}
diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala
index 6db7d41e25..9be58009dd 100644
--- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala
+++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala
@@ -1860,6 +1860,38 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.doOnEach(observer.asJavaObserver))
}
+ /**
+ * Invokes an action when the source Observable calls onNext
.
+ *
+ * @param onNext the action to invoke when the source Observable calls onNext
+ * @return the source Observable with the side-effecting behavior applied
+ */
+ def doOnNext(onNext: T => Unit): Observable[T] = {
+ toScalaObservable[T](asJavaObservable.doOnNext(onNext))
+ }
+
+ /**
+ * Invokes an action if the source Observable calls onError
.
+ *
+ * @param onError the action to invoke if the source Observable calls
+ * onError
+ * @return the source Observable with the side-effecting behavior applied
+ */
+ def doOnError(onError: Throwable => Unit): Observable[T] = {
+ toScalaObservable[T](asJavaObservable.doOnError(onError))
+ }
+
+ /**
+ * Invokes an action when the source Observable calls onCompleted
.
+ *
+ * @param onCompleted the action to invoke when the source Observable calls
+ * onCompleted
+ * @return the source Observable with the side-effecting behavior applied
+ */
+ def doOnCompleted(onCompleted: () => Unit): Observable[T] = {
+ toScalaObservable[T](asJavaObservable.doOnCompleted(onCompleted))
+ }
+
/**
* Returns an Observable that applies the given function to each item emitted by an
* Observable.
@@ -1869,9 +1901,7 @@ trait Observable[+T]
* @return an Observable with the side-effecting behavior applied.
*/
def doOnEach(onNext: T => Unit): Observable[T] = {
- toScalaObservable[T](asJavaObservable.doOnEach(
- onNext
- ))
+ toScalaObservable[T](asJavaObservable.doOnNext(onNext))
}
/**
@@ -1884,10 +1914,7 @@ trait Observable[+T]
* @return an Observable with the side-effecting behavior applied.
*/
def doOnEach(onNext: T => Unit, onError: Throwable => Unit): Observable[T] = {
- toScalaObservable[T](asJavaObservable.doOnEach(
- onNext,
- onError
- ))
+ toScalaObservable[T](asJavaObservable.doOnEach(Observer(onNext, onError, ()=>{})))
}
/**
@@ -1901,11 +1928,7 @@ trait Observable[+T]
* @return an Observable with the side-effecting behavior applied.
*/
def doOnEach(onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit): Observable[T] = {
- toScalaObservable[T](asJavaObservable.doOnEach(
- onNext,
- onError,
- onCompleted
- ))
+ toScalaObservable[T](asJavaObservable.doOnEach(Observer(onNext, onError,onCompleted)))
}
}
diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/BooleanSubscription.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/BooleanSubscription.scala
index d7e0431d27..5f07497ca3 100644
--- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/BooleanSubscription.scala
+++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/BooleanSubscription.scala
@@ -27,7 +27,11 @@ private [scala] object BooleanSubscription {
private [scala] class BooleanSubscription private[scala] (boolean: rx.subscriptions.BooleanSubscription)
extends Subscription {
- override val asJavaSubscription: rx.subscriptions.BooleanSubscription = new rx.subscriptions.BooleanSubscription() {
+ override val asJavaSubscription: rx.subscriptions.BooleanSubscription = boolean
+}
+
+/*
+new rx.subscriptions.BooleanSubscription() {
override def unsubscribe(): Unit = {
if(unsubscribed.compareAndSet(false, true)) {
if(!boolean.isUnsubscribed) { boolean.unsubscribe() }
@@ -35,4 +39,4 @@ private [scala] class BooleanSubscription private[scala] (boolean: rx.subscripti
}
override def isUnsubscribed(): Boolean = unsubscribed.get() || boolean.isUnsubscribed
}
-}
+ */
\ No newline at end of file
diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/MultiAssignmentSubscription.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/MultiAssignmentSubscription.scala
index 8fa87c3ff5..1cbb0f8d6d 100644
--- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/MultiAssignmentSubscription.scala
+++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/MultiAssignmentSubscription.scala
@@ -47,7 +47,7 @@ class MultipleAssignmentSubscription private[scala] (override val asJavaSubscrip
/**
* Gets the underlying subscription.
*/
- def subscription: Subscription = Subscription(asJavaSubscription.getSubscription)
+ def subscription: Subscription = Subscription(asJavaSubscription.get)
/**
* Gets the underlying subscription
@@ -55,7 +55,7 @@ class MultipleAssignmentSubscription private[scala] (override val asJavaSubscrip
* @return the [[rx.lang.scala.subscriptions.MultipleAssignmentSubscription]] itself.
*/
def subscription_=(that: Subscription): this.type = {
- asJavaSubscription.setSubscription(that.asJavaSubscription)
+ asJavaSubscription.set(that.asJavaSubscription)
this
}
diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/SerialSubscription.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/SerialSubscription.scala
index d845b7e5cd..77b52e3b5f 100644
--- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/SerialSubscription.scala
+++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/SerialSubscription.scala
@@ -41,10 +41,10 @@ class SerialSubscription private[scala] (override val asJavaSubscription: rx.sub
override def isUnsubscribed: Boolean = asJavaSubscription.isUnsubscribed
def subscription_=(value: Subscription): this.type = {
- asJavaSubscription.setSubscription(value.asJavaSubscription)
+ asJavaSubscription.set(value.asJavaSubscription)
this
}
- def subscription: Subscription = Subscription(asJavaSubscription.getSubscription)
+ def subscription: Subscription = Subscription(asJavaSubscription.get)
}
diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java
index 6f3f51f12d..4ce54dedf6 100644
--- a/rxjava-core/src/main/java/rx/Observable.java
+++ b/rxjava-core/src/main/java/rx/Observable.java
@@ -6793,7 +6793,7 @@ public void onNext(T args) { }
* @see RxJava Wiki: doOnNext()
* @see MSDN: Observable.Do
*/
- public Observable doOnNext(final Action1 onNext) {
+ public Observable doOnNext(final Action1 super T> onNext) {
Observer observer = new Observer() {
@Override
public void onCompleted() { }
@@ -6822,7 +6822,7 @@ public void onNext(T args) {
* @see RxJava Wiki: doOnEach()
* @see MSDN: Observable.Do
*/
- public Observable doOnEach(final Action1> onNotification) {
+ public Observable doOnEach(final Action1> onNotification) {
Observer observer = new Observer() {
@Override
public void onCompleted() {
diff --git a/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java b/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java
index aeaa7d5103..ebe26dacba 100644
--- a/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java
+++ b/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java
@@ -50,10 +50,10 @@ private static List> toLists(Observable> observables)
final List> lists = new ArrayList>();
Observable.concat(observables.map(new Func1, Observable>>() {
@Override
- public Observable> call(Observable xs) {
- return xs.toList();
- }
- })).toBlockingObservable().forEach(new Action1>() {
+ public Observable> call(Observable xs) { return xs.toList(); }
+ }))
+ .toBlockingObservable()
+ .forEach(new Action1>() {
@Override
public void call(List xs) {
lists.add(xs);