Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RxScala: Add more operators to match RxJava #1623

Merged
merged 1 commit into from
Aug 26, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,18 @@ class RxScalaDemo extends JUnitSuite {
waitFor(Olympics.yearTicks)
}

@Test def groupByExample2() {
val medalByYear = Olympics.mountainBikeMedals.groupBy(medal => medal.year, medal => medal.country)

for ((year, countries) <- medalByYear; country <- countries) {
println(s"${year}: ${country}")
}

Olympics.yearTicks.subscribe(year => println(s"\nYear $year starts."))

waitFor(Olympics.yearTicks)
}

@Test def groupByUntilExample() {
val numbers = Observable.interval(250 millis).take(14)
val grouped = numbers.groupByUntil(x => x % 2){ case (key, obs) => obs.filter(x => x == 7) }
Expand Down Expand Up @@ -1510,4 +1522,104 @@ class RxScalaDemo extends JUnitSuite {
o.take(3).toBlocking.foreach(println)
}

@Test def collectExample() {
val o = Observable.just(1, 1.0, "a", 2, 2.0, "b")
o.collect { case s: String => "Item: " + s }.foreach(println(_))
}

@Test def usingExample() {
import scala.io.{Codec, Source}

Observable.using { new java.net.URL("http://rxscala.github.io/").openStream() }(
input => Source.fromInputStream(input)(Codec.UTF8).getLines().toList.toObservable,
input => input.close
).foreach(println(_))
}

def createFastObservable: Observable[Int] = {
Observable {
subscriber: Subscriber[Int] => {
(0 to 2000).takeWhile(_ => !subscriber.isUnsubscribed).foreach(subscriber.onNext(_))
subscriber.onCompleted()
}
}
}

@Test def withoutBackpressureExample() {
val o = createFastObservable
val l = new CountDownLatch(1)
o.observeOn(NewThreadScheduler()).subscribe(new Subscriber[Int] {
override def onStart() {
request(1)
}

override def onNext(n: Int) {
println(n)
Thread.sleep(10) // emulate a slow subscriber
request(1)
}

override def onError(e: Throwable) {
e.printStackTrace()
l.countDown()
}

override def onCompleted() {
l.countDown()
}
})
l.await()
}

@Test def onBackpressureDropExample() {
val o = createFastObservable.onBackpressureDrop
val l = new CountDownLatch(1)
o.observeOn(NewThreadScheduler()).subscribe(new Subscriber[Int] {
override def onStart() {
request(1)
}

override def onNext(n: Int) {
println(n)
Thread.sleep(10) // emulate a slow subscriber
request(1)
}

override def onError(e: Throwable) {
e.printStackTrace()
l.countDown()
}

override def onCompleted() {
l.countDown()
}
})
l.await()
}

@Test def onBackpressureBufferExample() {
val o = createFastObservable.onBackpressureBuffer
val l = new CountDownLatch(1)
o.observeOn(NewThreadScheduler()).subscribe(new Subscriber[Int] {
override def onStart() {
request(1)
}

override def onNext(n: Int) {
println(n)
Thread.sleep(10) // emulate a slow subscriber
request(1)
}

override def onError(e: Throwable) {
e.printStackTrace()
l.countDown()
}

override def onCompleted() {
l.countDown()
}
})
l.await()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,12 @@ object JavaConversions {
}
}
}

implicit def toJavaTransformer[T, R](transformer: Observable[T] => Observable[R]): rx.Observable.Transformer[T, R] = {
new rx.Observable.Transformer[T, R] {
override def call(o: rx.Observable[_ <: T]): rx.Observable[R] = {
transformer(toScalaObservable(o)).asJavaObservable.asInstanceOf[rx.Observable[R]]
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,38 @@ trait Observable[+T]
: Observable[Observable[T]] // SI-7818
}

/**
* Returns an Observable that emits windows of items it collects from the source Observable. The resulting
* Observable starts a new window periodically, as determined by the `timeshift` argument or a maximum
* size as specified by the `count` argument (whichever is reached first). It emits
* each window after a fixed timespan, specified by the `timespan` argument. When the source
* Observable completes or Observable completes or encounters an error, the resulting Observable emits the
* current window and propagates the notification from the source Observable.
*
* <img width="640" height="335" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/window7.s.png" alt="">
*
* ===Backpressure Support:===
* This operator does not support backpressure as it uses time to control data flow.
*
* ===Scheduler:===
* you specify which `Scheduler` this operator will use
*
* @param timespan the period of time each window collects items before it should be emitted
* @param timeshift the period of time after which a new window will be created
* @param count the maximum size of each window before it should be emitted
* @param scheduler the `Scheduler` to use when determining the end and start of a window
* @return an Observable that emits new windows periodically as a fixed timespan elapses
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#window">RxJava wiki: window</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.window.aspx">MSDN: Observable.Window</a>
*/
def sliding(timespan: Duration, timeshift: Duration, count: Int, scheduler: Scheduler): Observable[Observable[T]] = {
val span: Long = timespan.length
val shift: Long = timespan.unit.convert(timeshift.length, timeshift.unit)
val unit: TimeUnit = timespan.unit
Observable.jObsOfJObsToScObsOfScObs(asJavaObservable.window(span, shift, unit, count, scheduler))
: Observable[Observable[T]] // SI-7818
}

/**
* Returns an Observable which only emits those items for which a given predicate holds.
*
Expand Down Expand Up @@ -1577,6 +1609,41 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.cache())
}

/**
* Caches emissions from the source Observable and replays them in order to any subsequent Subscribers.
* This method has similar behavior to [[Observable.replay]] except that this auto-subscribes to the source
* Observable rather than returning a [[ConnectableObservable]] for which you must call
* `connect` to activate the subscription.
* <p>
* <img width="640" height="410" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/cache.png" alt="">
* <p>
* This is useful when you want an Observable to cache responses and you can't control the
* `subscribe/unsubscribe` behavior of all the [[Subscriber]]s.
* <p>
* When you call `cache`, it does not yet subscribe to the source Observable and so does not yet
* begin cacheing items. This only happens when the first Subscriber calls the resulting Observable's
* `subscribe` method.
* <p>
* <em>Note:</em> You sacrifice the ability to unsubscribe from the origin when you use the `cache`
* Observer so be careful not to use this Observer on Observables that emit an infinite or very large number
* of items that will use up memory.
*
* ===Backpressure Support:===
* This operator does not support upstream backpressure as it is purposefully requesting and caching everything emitted.
*
* ===Scheduler:===
* `cache` does not operate by default on a particular `Scheduler`.
*
* @param capacity hint for number of items to cache (for optimizing underlying data structure)
* @return an Observable that, when first subscribed to, caches all of its items and notifications for the
* benefit of subsequent subscribers
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#cache">RxJava wiki: cache</a>
* @since 0.20
*/
def cache(capacity: Int): Observable[T] = {
toScalaObservable[T](asJavaObservable.cache(capacity))
}

/**
* Returns a new [[Observable]] that multicasts (shares) the original [[Observable]]. As long a
* there is more than 1 [[Subscriber]], this [[Observable]] will be subscribed and emitting data.
Expand Down Expand Up @@ -2176,6 +2243,41 @@ trait Observable[+T]
toScalaObservable[(K, Observable[T])](o1.map[(K, Observable[T])](func))
}

/**
* Groups the items emitted by an [[Observable]] according to a specified criterion, and emits these
* grouped items as `(key, observable)` pairs.
*
* <img width="640" height="360" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/groupBy.png" alt="">
*
* Note: A `(key, observable)` will cache the items it is to emit until such time as it
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* `(key, observable)` pairs that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like `take(0)` to them.
*
* ===Backpressure Support:===
* This operator does not support backpressure as splitting a stream effectively turns it into a "hot observable"
* and blocking any one group would block the entire parent stream. If you need backpressure on individual groups
* then you should use operators such as `nBackpressureDrop` or `@link #onBackpressureBuffer`.</dd>
* ===Scheduler:===
* groupBy` does not operate by default on a particular `Scheduler`.
*
* @param keySelector a function that extracts the key for each item
* @param valueSelector a function that extracts the return element for each item
* @tparam K the key type
* @tparam V the value type
* @return an [[Observable]] that emits `(key, observable)` pairs, each of which corresponds to a
* unique key value and each of which emits those items from the source Observable that share that
* key value
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava wiki: groupBy</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.groupby.aspx">MSDN: Observable.GroupBy</a>
*/
def groupBy[K, V](keySelector: T => K, valueSelector: T => V): Observable[(K, Observable[V])] = {
val jo: rx.Observable[rx.observables.GroupedObservable[K, V]] = asJavaObservable.groupBy[K, V](keySelector, valueSelector)
toScalaObservable[rx.observables.GroupedObservable[K, V]](jo).map {
go: rx.observables.GroupedObservable[K, V] => (go.getKey, toScalaObservable[V](go))
}
}

/**
* Groups the items emitted by this Observable according to a specified discriminator function and terminates these groups
* according to a function.
Expand Down Expand Up @@ -4298,6 +4400,75 @@ trait Observable[+T]
def nonEmpty: Observable[Boolean] = {
isEmpty.map(!_)
}

/**
* Transform an Observable by applying a particular Transformer function to it.
*
* This method operates on the Observable itself whereas [[Observable.lift]] operates on the Observable's
* Subscribers or Observers.
*
* If the operator you are creating is designed to act on the individual items emitted by a source
* Observable, use [[Observable.lift]]. If your operator is designed to transform the source Observable as a whole
* (for instance, by applying a particular set of existing RxJava operators to it) use `compose`.
*
* ===Scheduler:===
* `compose` does not operate by default on a particular [[Scheduler]].
*
* @param transformer implements the function that transforms the source Observable
* @return the source Observable, transformed by the transformer function
* @see <a href="https://github.com/Netflix/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
*/
def compose[R](transformer: Observable[T] => Observable[R]): Observable[R] = {
toScalaObservable[R](asJavaObservable.compose(toJavaTransformer(transformer)))
}

/**
* Instructs an Observable that is emitting items faster than its observer can consume them to buffer these
* items indefinitely until they can be emitted.
*
* <img width="640" height="300" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/bp.obp.buffer.png" alt="">
*
* ===Scheduler:===
* `onBackpressureBuffer` does not operate by default on a particular `Scheduler`.
*
* @return the source Observable modified to buffer items to the extent system resources allow
* @see <a href="https://github.com/Netflix/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
*/
def onBackpressureBuffer: Observable[T] = {
toScalaObservable[T](asJavaObservable.onBackpressureBuffer)
}

/**
* Use this operator when the upstream does not natively support backpressure and you wish to drop
* `onNext` when unable to handle further events.
*
* <img width="640" height="245" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/bp.obp.drop.png" alt="">
*
* If the downstream request count hits 0 then `onNext` will be dropped until `request(long n)`
* is invoked again to increase the request count.
*
* ===Scheduler:===
* onBackpressureDrop` does not operate by default on a particular `Scheduler`.
*
* @return the source Observable modified to drop `onNext` notifications on overflow
* @see <a href="https://github.com/Netflix/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
*/
def onBackpressureDrop: Observable[T] = {
toScalaObservable[T](asJavaObservable.onBackpressureDrop)
}

/**
* Return a new [[Observable]] by applying a partial function to all elements of this [[Observable]]
* on which the function is defined.
*
* @tparam R the element type of the returned [[Observable]].
* @param pf the partial function which filters and maps the [[Observable]].
* @return a new [[Observable]] by applying a partial function to all elements of this [[Observable]]
* on which the function is defined.
*/
def collect[R](pf: PartialFunction[T, R]): Observable[R] = {
filter(pf.isDefinedAt(_)).map(pf)
}
}

/**
Expand Down Expand Up @@ -4746,6 +4917,7 @@ object Observable {
* @param observableFactory the factory function to obtain an Observable
* @return the Observable whose lifetime controls the lifetime of the dependent resource object
*/
@deprecated("Use `using(=> Resource)(Resource => Observable[T], Resource => Unit)` instead", "0.20.1")
def using[T, Resource <: Subscription](resourceFactory: () => Resource, observableFactory: Resource => Observable[T]): Observable[T] = {
class ResourceSubscription(val resource: Resource) extends rx.Subscription {
def unsubscribe = resource.unsubscribe
Expand All @@ -4759,6 +4931,32 @@ object Observable {
))
}

/**
* Constructs an Observable that creates a dependent resource object.
*
* <img width="640" height="400" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/using.png" alt="" />
*
* ===Scheduler:===
* `using` does not operate by default on a particular `Scheduler`.
*
* @param resourceFactory the factory function to create a resource object that depends on the Observable.
* Note: this is a by-name parameter.
* @param observableFactory the factory function to create an Observable
* @param dispose the function that will dispose of the resource
* @return the Observable whose lifetime controls the lifetime of the dependent resource object
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#using">RxJava wiki: using</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229585.aspx">MSDN: Observable.Using</a>
*/
def using[T, Resource](resourceFactory: => Resource)(observableFactory: Resource => Observable[T], dispose: Resource => Unit): Observable[T] = {
val jResourceFactory = new rx.functions.Func0[Resource] {
override def call: Resource = resourceFactory
}
val jObservableFactory = new rx.functions.Func1[Resource, rx.Observable[_ <: T]] {
override def call(r: Resource) = observableFactory(r).asJavaObservable
}
toScalaObservable[T](rx.Observable.using[T, Resource](jResourceFactory, jObservableFactory, dispose))
}

/**
* Mirror the one Observable in an Iterable of several Observables that first emits an item.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,7 @@ private[scala] class WithFilter[+T] (p: T => Boolean, asJava: rx.Observable[_ <:
toScalaObservable[T](asJava.filter((x: T) => p(x) && q(x)))
}

// there is no foreach here, that's only available on BlockingObservable
def foreach(onNext: T => Unit): Unit = {
toScalaObservable[T](asJava.filter(p)).foreach(onNext)
}
}
Loading