
GroupBy with flatMap and observeOn does not seem to support backpressure in 1.2.x #5029

Closed
sbarlabanov opened this issue Jan 27, 2017 · 9 comments

@sbarlabanov commented Jan 27, 2017

There are plenty of issues fixing various backpressure problems in groupBy (e.g. #3428, #3425).
Nevertheless, when I run the following code, it requests all 10000 values from the source at once without waiting for individual items to be processed (example running on RxJava 1.2.x):

import rx.Observable;
import rx.schedulers.Schedulers;

import java.util.Random;

import static rx.Observable.range;

public class GroupByTest {
    public static void main(String[] args) throws Exception {
        Observable<Integer> source = range(1, 10000);
        source
                // prints every value the source emits; with working backpressure this
                // would stay roughly in step with the slow downstream processing
                .doOnNext(i -> System.out.println("Requested " + i))
                .groupBy(v -> v % 5)
                // each group is moved to an io() thread and processed slowly
                .flatMap(g -> g.observeOn(Schedulers.io()).map(GroupByTest::calculation))
                .subscribe(i -> System.out.println("Got " + i));
        Thread.sleep(100000);
    }

    private static Integer calculation(Integer i) {
        sleep();
        System.out.println("Processing " + i);
        return i * 20;
    }

    private static void sleep() {
        try {
            // simulate a slow, variable-length computation
            Thread.sleep(new Random().nextInt(1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

When running the code, I see 10000 "Requested X" messages printed immediately, and only afterwards do the "Processing X" messages start appearing.
With this behavior it does not seem possible to parallelize the processing of individual groups in RxJava in any useful way when working with large or unbounded sources.

(sbarlabanov briefly retitled the issue to cover 2.x as well, then changed it back to "GroupBy with flatMap and observeOn does not seem to support backpressure in 1.2.x" on Jan 27, 2017.)
@sbarlabanov (Author)

It does work correctly with the RxJava 2.x Flowable.
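
For completeness, a minimal sketch of what the same repro might look like on the 2.x Flowable API (the class name GroupByFlowableTest and the delay are just for illustration):

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.Random;

public class GroupByFlowableTest {
    public static void main(String[] args) throws Exception {
        Flowable.range(1, 10000)
                .doOnNext(i -> System.out.println("Requested " + i))
                .groupBy(v -> v % 5)
                // Flowable propagates backpressure through groupBy, so the source
                // is only drained as fast as the downstream buffers request more
                .flatMap(g -> g.observeOn(Schedulers.io()).map(GroupByFlowableTest::calculation))
                .subscribe(i -> System.out.println("Got " + i));
        Thread.sleep(100000);
    }

    private static Integer calculation(Integer i) {
        try {
            Thread.sleep(new Random().nextInt(1000)); // simulate slow, variable-length work
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Processing " + i);
        return i * 20;
    }
}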

@akarnokd (Member)

For historical reasons, flatMap in 1.x requests an unbounded number of source elements in its one-argument overload. There is another overload that takes a maxConcurrent parameter, with which you can limit the number of active inner sources.

@sbarlabanov (Author)

I know ;). But that does not work either :(. If I add maxConcurrent (1, 2, 3, etc.) to the flatMap after groupBy, the behavior stays the same: all 10000 items are requested at once.

    public static void main(String[] args) throws Exception {
        Observable<Integer> source = range(1, 10000);
        source
                .doOnNext(i -> System.out.println("Requested " + i))
                .groupBy(v -> v % 5)
                .flatMap(g -> g.observeOn(Schedulers.io()).map(GroupByTest::calculation), 4) // <---- maxConcurrent
                .subscribe(i -> System.out.println("Got " + i));
        Thread.sleep(100000);
    }

@sbarlabanov (Author)

In RxJava 1.x there is OperatorGroupByTest, which tests some backpressure scenarios.
If I adjust https://github.com/ReactiveX/RxJava/blob/1.x/src/test/java/rx/internal/operators/OperatorGroupByTest.java#L1027 by adding a doOnNext with System.out.println, I observe the same behavior as described above: all items are requested at once.
So OperatorGroupByTest#testGroupByBackpressure does not actually test any backpressure ;).
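
For illustration, a standalone probe in the spirit of that instrumentation (a sketch only, not the actual testGroupByBackpressure code; the class name, item count and delay are made up):

import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

public class GroupByBackpressureProbe {
    public static void main(String[] args) {
        TestSubscriber<String> ts = new TestSubscriber<>();

        Observable.range(1, 4000)
                // with working backpressure these prints would interleave with the
                // slow downstream work instead of appearing all at once
                .doOnNext(i -> System.out.println("Emitted " + i))
                .groupBy(v -> v % 2)
                .flatMap(g -> g
                        .observeOn(Schedulers.computation())
                        .map(v -> {
                            try { Thread.sleep(1); } catch (InterruptedException e) { /* ignore */ }
                            return v + " processed";
                        }))
                .subscribe(ts);

        ts.awaitTerminalEvent();
        ts.assertNoErrors();
    }
}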

@akarnokd (Member)

OperatorGroupBy L247 keeps requesting from upstream and thus consumes it in an unbounded manner. Would you like to work out a fix?
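
As a side note, the unbounded upstream consumption can be observed directly with doOnRequest on the source (a small sketch; the class name and values are illustrative):

import rx.Observable;
import rx.schedulers.Schedulers;

public class GroupByRequestProbe {
    public static void main(String[] args) throws Exception {
        Observable.range(1, 10000)
                // logs every request groupBy sends upstream; an effectively unbounded
                // consumer shows up as request(Long.MAX_VALUE) or as a rapid stream of
                // re-requests that ignores the slow downstream
                .doOnRequest(n -> System.out.println("Upstream request: " + n))
                .groupBy(v -> v % 5)
                .flatMap(g -> g.observeOn(Schedulers.io()).map(v -> v * 20))
                .subscribe(i -> System.out.println("Got " + i));

        Thread.sleep(5000);
    }
}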

@akarnokd (Member)

This is a bug. Fix posted in #5030.

@sbarlabanov (Author)

Looks good. Your fix seems to work :).
I tried my examples from above and also tested a more complex scenario from a customer project (consuming from Kafka, grouping by partition, and processing each partition on its own thread).
Is any action required on my side?

@akarnokd (Member)

Thanks for verifying the fix. No further action is necessary from you. There are a few pending contributions targeting 1.x, but I'll try to release 1.2.6 next Friday.

@akarnokd (Member)

Closing via #5030
