Skip to content

Commit

Permalink
Merge pull request #4 from zsxwing/pr166
Browse files Browse the repository at this point in the history
Update the signature of flatMapWith and fix the Subject doc
  • Loading branch information
jbripley committed May 29, 2015
2 parents b48ccc7 + 6d329f0 commit faa17cf
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ object ExperimentalAPIExamples {
@Test def flatMapWithMaxConcurrentExample(): Unit = {
(1 to 1000000).toObservable
.doOnNext(v => println(s"Emitted Value: $v"))
.flatMap(maxConcurrent = 10, (v: Int) => Observable.just(v).doOnNext(_ => Thread.sleep(1)).subscribeOn(IOScheduler()))
.flatMap(maxConcurrent = 10, v => Observable.just(v).doOnNext(_ => Thread.sleep(1)).subscribeOn(IOScheduler()))
.toBlocking.foreach(v => System.out.println("Received: " + v))
}

Expand All @@ -137,13 +137,23 @@ object ExperimentalAPIExamples {
.doOnNext(v => println(s"Emitted Value: $v"))
.flatMap(
maxConcurrent = 10,
(v: Int) => Observable.just(v).doOnNext(_ => Thread.sleep(1)).subscribeOn(IOScheduler()),
v => Observable.just(v).doOnNext(_ => Thread.sleep(1)).subscribeOn(IOScheduler()),
e => Observable.just(-1).doOnNext(_ => Thread.sleep(1)).subscribeOn(IOScheduler()),
() => Observable.just(Int.MaxValue).doOnNext(_ => Thread.sleep(1)).subscribeOn(IOScheduler())
)
.toBlocking.foreach(v => System.out.println("Received: " + v))
}

@Test def flatMapWithMaxConcurrentExample3() {
(1 to 1000000).toObservable
.doOnNext(v => println(s"Emitted Value: $v"))
.flatMapWith(
maxConcurrent = 10,
v => Observable.just(v).doOnNext(_ => Thread.sleep(1)).subscribeOn(IOScheduler())
)(_ * _).subscribeOn(IOScheduler())
.toBlocking.foreach(v => System.out.println("Received: " + v))
}

@Test def onBackpressureDropDoExample(): Unit = {
Observable[Int](subscriber => {
(1 to 200).foreach(subscriber.onNext)
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/rx/lang/scala/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1073,7 +1073,7 @@ trait Observable[+T]
* source Observable and the collection Observable
*/
@Beta
def flatMapWith[U, R](maxConcurrent: Int)(collectionSelector: T => Observable[U])(resultSelector: (T, U) => R): Observable[R] = {
def flatMapWith[U, R](maxConcurrent: Int, collectionSelector: T => Observable[U])(resultSelector: (T, U) => R): Observable[R] = {
val jCollectionSelector = new Func1[T, rx.Observable[_ <: U]] {
override def call(t: T): rx.Observable[_ <: U] = collectionSelector(t).asJavaObservable
}
Expand Down
57 changes: 33 additions & 24 deletions src/main/scala/rx/lang/scala/Subject.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,67 +65,76 @@ trait Subject[T] extends Observable[T] with Observer[T] {
}

/**
* $experimental Check if the Subject has terminated with an exception.
* <p>The operation is threadsafe.
* $experimental Check if the [[Subject]] has terminated with an exception.
*
* @return `true` if the subject has received a throwable through { @code onError}.
* The operation is threadsafe.
*
* @return `true` if the [[Subject]] has received a throwable through `onError`.
* @since (If this graduates from being an Experimental class method, replace this parenthetical with the release number)
*/
@Experimental
def hasThrowable: Boolean = asJavaSubject.hasThrowable

/**
* $experimental Check if the Subject has terminated normally.
* <p>The operation is threadsafe.
* $experimental Check if the [[Subject]] has terminated normally.
*
* The operation is threadsafe.
*
* @return `true` if the subject completed normally via { @code onCompleted}
* @return `true` if the [[Subject]] completed normally via `onCompleted`
* @since (If this graduates from being an Experimental class method, replace this parenthetical with the release number)
*/
@Experimental
def hasCompleted: Boolean = asJavaSubject.hasCompleted

/**
* $experimental Returns the Throwable that terminated the Subject.
* <p>The operation is threadsafe.
* $experimental Returns the `Throwable` that terminated the [[Subject]].
*
* The operation is threadsafe.
*
* @return the Throwable that terminated the Subject or { @code null} if the subject hasn't terminated yet or
* if it terminated normally.
* @return the `Throwable` that terminated the [[Subject]] or `null` if the subject hasn't terminated yet or
* if it terminated normally.
* @since (If this graduates from being an Experimental class method, replace this parenthetical with the release number)
*/
@Experimental
def getThrowable: Throwable = asJavaSubject.getThrowable

/**
* $experimental Check if the Subject has any value.
* <p>Use the `#getValue()` method to retrieve such a value.
* <p>Note that unless `#hasCompleted()` or `#hasThrowable()` returns true, the value
* retrieved by `getValue()` may get outdated.
* <p>The operation is threadsafe.
* $experimental Check if the [[Subject]] has any value.
*
* Use the [[Subject.getValue]] method to retrieve such a value.
*
* Note that unless [[Subject.hasCompleted]] or [[Subject.hasThrowable]] returns true, the value
* retrieved by [[Subject.getValue]] may get outdated.
*
* @return { @code true} if and only if the subject has some value but not an error
* The operation is threadsafe.
*
* @return `true` if and only if the [[Subject]] has some value but not an error
* @since (If this graduates from being an Experimental class method, replace this parenthetical with the release number)
*/
@Experimental
def hasValue: Boolean = asJavaSubject.hasValue

/**
* $experimental Returns the current or latest value of the Subject if there is such a value and
* $experimental Returns the current or latest value of the [[Subject]] if there is such a value and
* the subject hasn't terminated with an exception.
* <p>The method can return `null` for various reasons. Use `#hasValue()`, `#hasThrowable()`
* and `#hasCompleted()` to determine if such `null` is a valid value, there was an
* exception or the Subject terminated without receiving any value.
* <p>The operation is threadsafe.
*
* @return the current value or { @code null} if the Subject doesn't have a value, has terminated with an
* exception or has an actual { @code null} as a value.
* The method can return `null` for various reasons. Use [[Subject.hasValue]], [[Subject.hasThrowable]]
* and [[Subject.hasCompleted]] to determine if such `null` is a valid value, there was an
* exception or the [[Subject]] terminated without receiving any value.
*
* The operation is threadsafe.
*
* @return the current value or `null` if the [[Subject]] doesn't have a value, has terminated with an
* exception or has an actual `null` as a value.
* @since (If this graduates from being an Experimental class method, replace this parenthetical with the release number)
*/
@Experimental
def getValue: T = asJavaSubject.getValue.asInstanceOf[T]

/**
* $experimental Returns a snapshot of the currently buffered non-terminal events.
* <p>The operation is threadsafe.
*
* The operation is threadsafe.
*
* @return a snapshot of the currently buffered non-terminal events.
* @since (If this graduates from being an Experimental class method, replace this parenthetical with the release number)
Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/rx/lang/scala/CompletenessTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class CompletenessTest extends JUnitSuite {
"limit(Int)" -> "take(Int)",
"flatMap(Func1[_ >: T, _ <: Observable[_ <: U]], Func2[_ >: T, _ >: U, _ <: R])" -> "flatMapWith(T => Observable[U])((T, U) => R)",
"flatMapIterable(Func1[_ >: T, _ <: Iterable[_ <: U]], Func2[_ >: T, _ >: U, _ <: R])" -> "flatMapIterableWith(T => Iterable[U])((T, U) => R)",
"flatMap(Func1[_ >: T, _ <: Observable[_ <: U]], Func2[_ >: T, _ >: U, _ <: R], Int)" -> "flatMapWith(Int)(T => Observable[U])((T, U) => R)",
"flatMap(Func1[_ >: T, _ <: Observable[_ <: U]], Func2[_ >: T, _ >: U, _ <: R], Int)" -> "flatMapWith(Int, T => Observable[U])((T, U) => R)",
"flatMap(Func1[_ >: T, _ <: Observable[_ <: R]], Int)" -> "flatMap(Int, T => Observable[R])",
"flatMap(Func1[_ >: T, _ <: Observable[_ <: R]], Func1[_ >: Throwable, _ <: Observable[_ <: R]], Func0[_ <: Observable[_ <: R]], Int)" -> "flatMap(Int, T => Observable[R], Throwable => Observable[R], () => Observable[R])",
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "groupBy(T => K, T => V)",
Expand Down

0 comments on commit faa17cf

Please sign in to comment.