From 6b4d8f57a03c06214cd1d516b66e4301dfb886cc Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 20 Apr 2014 21:05:58 +0800 Subject: [PATCH] Add drop and dropRight to rxscala --- .../rx/lang/scala/examples/RxScalaDemo.scala | 26 ++++++ .../main/scala/rx/lang/scala/Observable.scala | 81 +++++++++++++++++++ rxjava-core/src/main/java/rx/Observable.java | 6 +- 3 files changed, 112 insertions(+), 1 deletion(-) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index e5746fc2a7..8d04e9f053 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -400,6 +400,32 @@ class RxScalaDemo extends JUnitSuite { assertEquals(1, List(1).toObservable.toBlockingObservable.single) } + @Test def dropExample() { + val o = List(1, 2, 3, 4).toObservable + assertEquals(List(3, 4), o.drop(2).toBlockingObservable.toList) + } + + @Test def dropWithTimeExample() { + val o = List(1, 2, 3, 4).toObservable.zip( + Observable.interval(500 millis, IOScheduler())).map(_._1) // emit every 500 millis + println( + o.drop(1250 millis, IOScheduler()).toBlockingObservable.toList // output List(3, 4) + ) + } + + @Test def dropRightExample() { + val o = List(1, 2, 3, 4).toObservable + assertEquals(List(1, 2), o.dropRight(2).toBlockingObservable.toList) + } + + @Test def dropRightWithTimeExample() { + val o = List(1, 2, 3, 4).toObservable.zip( + Observable.interval(500 millis, IOScheduler())).map(_._1) // emit every 500 millis + println( + o.dropRight(750 millis, IOScheduler()).toBlockingObservable.toList // output List(1, 2) + ) + } + def square(x: Int): Int = { println(s"$x*$x is being calculated on thread ${Thread.currentThread().getId}") Thread.sleep(100) // calculating a square is heavy work :) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 56171b4853..4595127bde 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -1231,6 +1231,35 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.skip(n)) } + /** + * Returns an Observable that drops values emitted by the source Observable before a specified time window + * elapses. + * + * + * + * @param time the length of the time window to drop + * @return an Observable that drops values emitted by the source Observable before the time window defined + * by `time` elapses and emits the remainder + */ + def drop(time: Duration): Observable[T] = { + toScalaObservable(asJavaObservable.skip(time.length, time.unit)) + } + + /** + * Returns an Observable that drops values emitted by the source Observable before a specified time window + * elapses. + * + * + * + * @param time the length of the time window to drop + * @param scheduler the `Scheduler` on which the timed wait happens + * @return an Observable that drops values emitted by the source Observable before the time window defined + * by `time` elapses and emits the remainder + */ + def drop(time: Duration, scheduler: Scheduler): Observable[T] = { + toScalaObservable(asJavaObservable.skip(time.length, time.unit, scheduler)) + } + /** * Returns an Observable that bypasses all items from the source Observable as long as the specified * condition holds true. Emits all further source items as soon as the condition becomes false. @@ -1246,6 +1275,58 @@ trait Observable[+T] toScalaObservable(asJavaObservable.skipWhile(predicate)) } + /** + * Returns an Observable that drops a specified number of items from the end of the sequence emitted by the + * source Observable. + *

+ * + *

+ * This Observer accumulates a queue long enough to store the first `n` items. As more items are + * received, items are taken from the front of the queue and emitted by the returned Observable. This causes + * such items to be delayed. + * + * @param n number of items to drop from the end of the source sequence + * @return an Observable that emits the items emitted by the source Observable except for the dropped ones + * at the end + * @throws IndexOutOfBoundsException if `n` is less than zero + */ + def dropRight(n: Int): Observable[T] = { + toScalaObservable(asJavaObservable.skipLast(n)) + } + + /** + * Returns an Observable that drops items emitted by the source Observable during a specified time window + * before the source completes. + *

+ * + * + * Note: this action will cache all items until "onCompleted" arrives. So don't use it on an infinite Observable. + * + * @param time the length of the time window + * @return an Observable that drops those items emitted by the source Observable in a time window before the + * source completes defined by `time` + */ + def dropRight(time: Duration): Observable[T] = { + toScalaObservable(asJavaObservable.skipLast(time.length, time.unit)) + } + + /** + * Returns an Observable that drops items emitted by the source Observable during a specified time window + * (defined on a specified scheduler) before the source completes. + *

+ * + * + * Note: this action will cache all items until "onCompleted" arrives. So don't use it on an infinite Observable. + * + * @param time the length of the time window + * @param scheduler the scheduler used as the time source + * @return an Observable that drops those items emitted by the source Observable in a time window before the + * source completes defined by `time` and `scheduler` + */ + def dropRight(time: Duration, scheduler: Scheduler): Observable[T] = { + toScalaObservable(asJavaObservable.skipLast(time.length, time.unit, scheduler)) + } + /** * Returns an Observable that emits only the first `num` items emitted by the source * Observable. diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 297d715318..0c7e64802e 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -5566,6 +5566,8 @@ public final Observable skipLast(int count) { * before the source completes. *

* + * + * Note: this action will cache all items until "onCompleted" arrives. So don't use it on an infinite Observable. * * @param time * the length of the time window @@ -5585,7 +5587,9 @@ public final Observable skipLast(long time, TimeUnit unit) { * (defined on a specified scheduler) before the source completes. *

* - * + * + * Note: this action will cache all items until "onCompleted" arrives. So don't use it on an infinite Observable. + * * @param time * the length of the time window * @param unit