-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix bug in zipWithIndex
and set zip(that, selector)
public in RxScala
#1226
Conversation
RxJava-pull-requests #1132 SUCCESS |
Fix bug in `zipWithIndex` and set `zip(that, selector)` public in RxScala
Here are some tests that I did on commit 1e07ccc : // passes
@Test def testZipWithIndex2() {
val o = Observable.interval(100 millis).map(_ * 100).take(3).zipWithIndex.map(_._2)
assertEquals(List(0, 1, 2), o.toBlockingObservable.toList)
assertEquals(List(0, 1, 2), o.toBlockingObservable.toList)
}
// 100% CPU usage on all cores until I abort the test
@Test def testZipWithIndex3() {
val o = Observable.interval(100 millis).map(_ * 100).take(3).doOnEach(println(_)).zipWithIndex.map(_._2)
assertEquals(List(0, 1, 2), o.toBlockingObservable.toList)
assertEquals(List(0, 1, 2), o.toBlockingObservable.toList)
}
// fails (as expected)
// java.lang.AssertionError: expected:<List(0, 1, 2)> but was:<List(0, 100, 200)>
@Test def testZipWithIndex4() {
val o = Observable.interval(100 millis).map(_ * 100).take(3).doOnEach(println(_))
assertEquals(List(0, 1, 2), o.toBlockingObservable.toList)
assertEquals(List(0, 1, 2), o.toBlockingObservable.toList)
} I don't know exactly what's happening, but it looks like something weird's going on with |
Weird. Can you try to run tests in gradle CLI? |
My test was on 566e892 |
The problem is that as soon as we have several threads, everything becomes non-deterministic... I made another test: @Test def testZipWithIndex5() {
for (i <- 1 to 10) {
val start = System.currentTimeMillis()
val o = Observable.interval(100 millis).map(_ * 100).take(3).zipWithIndex.map(_._2)
assertEquals(List(0, 1, 2), o.subscribeOn(NewThreadScheduler()).toBlockingObservable.toList)
println(s"iteration took ${System.currentTimeMillis() - start} ms")
}
} And I ran it on 566e892, in gradle CLI using the command
The output I got was this:
As you can see, usually it takes about 3 * 100 millis, as expected, but sometimes it takes much longer. I imagine this might be because the implementation of zipWithIndex feeds a lot of unused numbers to the zip Observer before the thread emitting onComplete and unsubscribing from the (0 until Int.MaxValue) Observable can do its job. And if you replace the body of your zipWithIndex implementation by zip((0 until Int.MaxValue).toObservable.doOnEach(println(_))) you can see that this is indeed the case (I get thousands of numbers printed). |
Agreed. Maybe the best solution is getting |
Or maybe |
Or add def zip[U](other: Iterable[U]): Observable[(T, U)] implemented using public final <T2, R> Observable<R> zip(Iterable<? extends T2> other, Func2<? super T, ? super T2, ? extends R> zipFunction) and then use it to implement |
zipWithIndex
is mutable, which make the Observable cannot be reused. Fixed it usingzip((0 until Int.MaxValue).toObservable)
.zip(that, selector)
public and rename tozipWith
. RxScala zip #1189/cc @headinthebox, @samuelgruetter