Skip to content

Commit

Permalink
Merge pull request #1721 from abersnaze/onBackpressure-request-max-0.20
Browse files Browse the repository at this point in the history
Bug in the onBackpressure operators
  • Loading branch information
benjchristensen committed Oct 4, 2014
2 parents 0e80124 + b36d833 commit ab85374
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public void request(long n) {
// don't pass through subscriber as we are async and doing queue draining
// a parent being unsubscribed should not affect the children
Subscriber<T> parent = new Subscriber<T>() {
@Override
public void onStart() {
request(Long.MAX_VALUE);
}

@Override
public void onCompleted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ public void request(long n) {

});
return new Subscriber<T>(child) {
@Override
public void onStart() {
request(Long.MAX_VALUE);
}

@Override
public void onCompleted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@

import static org.junit.Assert.assertEquals;

import java.util.concurrent.CountDownLatch;

import org.junit.Test;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.internal.util.RxRingBuffer;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

import java.util.concurrent.CountDownLatch;

public class OperatorOnBackpressureDropTest {

@Test
Expand All @@ -42,6 +43,13 @@ public void testNoBackpressureSupport() {
ts.assertNoErrors();
}

@Test(timeout = 500)
public void testWithObserveOn() throws InterruptedException {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Observable.range(0, RxRingBuffer.SIZE * 10).onBackpressureDrop().onBackpressureDrop().observeOn(Schedulers.io()).subscribe(ts);
ts.awaitTerminalEvent();
}

@Test(timeout = 500)
public void testFixBackpressureWithBuffer() throws InterruptedException {
final CountDownLatch l1 = new CountDownLatch(100);
Expand Down

0 comments on commit ab85374

Please sign in to comment.