Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add drop(skip) and dropRight(skipLast) to rxscala #1056

Merged
merged 1 commit into from
Apr 21, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,32 @@ class RxScalaDemo extends JUnitSuite {
assertEquals(1, List(1).toObservable.toBlockingObservable.single)
}

@Test def dropExample() {
val o = List(1, 2, 3, 4).toObservable
assertEquals(List(3, 4), o.drop(2).toBlockingObservable.toList)
}

@Test def dropWithTimeExample() {
val o = List(1, 2, 3, 4).toObservable.zip(
Observable.interval(500 millis, IOScheduler())).map(_._1) // emit every 500 millis
println(
o.drop(1250 millis, IOScheduler()).toBlockingObservable.toList // output List(3, 4)
)
}

@Test def dropRightExample() {
val o = List(1, 2, 3, 4).toObservable
assertEquals(List(1, 2), o.dropRight(2).toBlockingObservable.toList)
}

@Test def dropRightWithTimeExample() {
val o = List(1, 2, 3, 4).toObservable.zip(
Observable.interval(500 millis, IOScheduler())).map(_._1) // emit every 500 millis
println(
o.dropRight(750 millis, IOScheduler()).toBlockingObservable.toList // output List(1, 2)
)
}

def square(x: Int): Int = {
println(s"$x*$x is being calculated on thread ${Thread.currentThread().getId}")
Thread.sleep(100) // calculating a square is heavy work :)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1231,6 +1231,35 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.skip(n))
}

/**
* Returns an Observable that drops values emitted by the source Observable before a specified time window
* elapses.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/skip.t.png">
*
* @param time the length of the time window to drop
* @return an Observable that drops values emitted by the source Observable before the time window defined
* by `time` elapses and emits the remainder
*/
def drop(time: Duration): Observable[T] = {
toScalaObservable(asJavaObservable.skip(time.length, time.unit))
}

/**
* Returns an Observable that drops values emitted by the source Observable before a specified time window
* elapses.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/skip.t.png">
*
* @param time the length of the time window to drop
* @param scheduler the `Scheduler` on which the timed wait happens
* @return an Observable that drops values emitted by the source Observable before the time window defined
* by `time` elapses and emits the remainder
*/
def drop(time: Duration, scheduler: Scheduler): Observable[T] = {
toScalaObservable(asJavaObservable.skip(time.length, time.unit, scheduler))
}

/**
* Returns an Observable that bypasses all items from the source Observable as long as the specified
* condition holds true. Emits all further source items as soon as the condition becomes false.
Expand All @@ -1246,6 +1275,58 @@ trait Observable[+T]
toScalaObservable(asJavaObservable.skipWhile(predicate))
}

/**
* Returns an Observable that drops a specified number of items from the end of the sequence emitted by the
* source Observable.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/skipLast.png">
* <p>
* This Observer accumulates a queue long enough to store the first `n` items. As more items are
* received, items are taken from the front of the queue and emitted by the returned Observable. This causes
* such items to be delayed.
*
* @param n number of items to drop from the end of the source sequence
* @return an Observable that emits the items emitted by the source Observable except for the dropped ones
* at the end
* @throws IndexOutOfBoundsException if `n` is less than zero
*/
def dropRight(n: Int): Observable[T] = {
toScalaObservable(asJavaObservable.skipLast(n))
}

/**
* Returns an Observable that drops items emitted by the source Observable during a specified time window
* before the source completes.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/skipLast.t.png">
*
* Note: this action will cache all items until "onCompleted" arrives. So don't use it on an infinite Observable.
*
* @param time the length of the time window
* @return an Observable that drops those items emitted by the source Observable in a time window before the
* source completes defined by `time`
*/
def dropRight(time: Duration): Observable[T] = {
toScalaObservable(asJavaObservable.skipLast(time.length, time.unit))
}

/**
* Returns an Observable that drops items emitted by the source Observable during a specified time window
* (defined on a specified scheduler) before the source completes.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/skipLast.ts.png">
*
* Note: this action will cache all items until "onCompleted" arrives. So don't use it on an infinite Observable.
*
* @param time the length of the time window
* @param scheduler the scheduler used as the time source
* @return an Observable that drops those items emitted by the source Observable in a time window before the
* source completes defined by `time` and `scheduler`
*/
def dropRight(time: Duration, scheduler: Scheduler): Observable[T] = {
toScalaObservable(asJavaObservable.skipLast(time.length, time.unit, scheduler))
}

/**
* Returns an Observable that emits only the first `num` items emitted by the source
* Observable.
Expand Down
6 changes: 5 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5566,6 +5566,8 @@ public final Observable<T> skipLast(int count) {
* before the source completes.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/skipLast.t.png">
*
* Note: this action will cache all items until "onCompleted" arrives. So don't use it on an infinite Observable.
*
* @param time
* the length of the time window
Expand All @@ -5585,7 +5587,9 @@ public final Observable<T> skipLast(long time, TimeUnit unit) {
* (defined on a specified scheduler) before the source completes.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/skipLast.ts.png">
*
*
* Note: this action will cache all items until "onCompleted" arrives. So don't use it on an infinite Observable.
*
* @param time
* the length of the time window
* @param unit
Expand Down