Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a processor with backpressure #5999

Closed
nicolaferraro opened this issue May 7, 2018 · 4 comments
Closed

Add a processor with backpressure #5999

nicolaferraro opened this issue May 7, 2018 · 4 comments

Comments

@nicolaferraro
Copy link

I'm working on a rxjava 2 integration with Apache Camel and I've found difficult to find a processor supporting backpressure.

All processors in https://github.com/ReactiveX/RxJava/tree/2.x/src/main/java/io/reactivex/processors (except SerializedProcessor that has a special purpose) do the following call during onSubscribe:

    @Override
    public void onSubscribe(Subscription s) {
        // ...
        s.request(Long.MAX_VALUE); // <-- i.e. no backpressure
    }

So if we put any of those processor between a publisher and a subscriber, there's no way to slow down the publisher in case the subscriber is slow.

We need a processor because our API expect that publishers and subscribers connect to the library in independent moments. First the publisher then the subscriber(s) or the opposite, both scenarios are allowed.

So we've used with other implementations a backpressure-aware "connector" in the middle: the connector is subscribed to the publisher when the publisher is available and the subscriber subscribe to the connector when the subscriber is available. When both are connected the flow starts.

I don't know if there there is a way to create such a backpressure-aware "connector" in rx-java 2, but doesn't seem so.

@akarnokd
Copy link
Member

akarnokd commented May 7, 2018

Hi. The standard RxJava 2 Processors are all unbounded-in, but there exists the MulticastProcessor in the extensions project that does support backpressure coordination between its upstream and its downstream Subscribers.

@nicolaferraro
Copy link
Author

Seems to be perfect for our case. Thanks.

Any chance that it gets included in the main lib?

@akarnokd
Copy link
Member

akarnokd commented May 8, 2018

Probably.

@nicolaferraro
Copy link
Author

Thanks a lot 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants