API for Creating Safe Back Pressure Observables #3003

Closed
stealthcode opened this issue Jun 2, 2015 · 17 comments

Comments

@stealthcode

The Problem:

Creating back-pressure-enabled Observables that produce data from a non-blocking source is hard, and our current APIs for creating arbitrary observables do not lend themselves to creating observables that respect the back pressure rules. Users can either call one of the Observable static methods to generate standard-case observables using well-known patterns, or attempt to implement a responsible observable out of an OnSubscribe<T>. I will define a "responsible Observable" to be an observable that does the following:

  • calls subscriber.onNext(t) only when the subscriberCapacity > 0 where subscriberCapacity is the sum of the integer arguments for all invocations of producer.request(n) minus the number of calls made to subscriber.onNext(t).
  • calls subscriber.onCompleted() or subscriber.onError(e) only once.
  • does not call subscriber.onNext(t) concurrently.
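
The first rule can be sketched as a small counter. This is only an illustration: CapacityTracker is a hypothetical helper, not an RxJava class, and it assumes that a capacity of Long.MAX_VALUE should be treated as unbounded, which is how request(Long.MAX_VALUE) is conventionally interpreted.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not part of RxJava): tracks subscriberCapacity as
// (sum of all request(n) arguments) minus (number of onNext calls).
final class CapacityTracker {
    private final AtomicLong capacity = new AtomicLong();

    // Called from producer.request(n); saturates at Long.MAX_VALUE on overflow.
    void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive: " + n);
        }
        for (;;) {
            long current = capacity.get();
            long next = current + n;
            if (next < 0) {
                next = Long.MAX_VALUE; // overflow: treat as unbounded
            }
            if (capacity.compareAndSet(current, next)) {
                return;
            }
        }
    }

    // Returns true and consumes one unit of capacity if onNext may be called now.
    boolean tryConsume() {
        for (;;) {
            long current = capacity.get();
            if (current == 0) {
                return false;
            }
            // An unbounded capacity is never decremented.
            long next = current == Long.MAX_VALUE ? current : current - 1;
            if (capacity.compareAndSet(current, next)) {
                return true;
            }
        }
    }

    long remaining() {
        return capacity.get();
    }
}
```

A producer would call tryConsume() before each subscriber.onNext(t) and hold the value back when it returns false.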

The OnSubscribe of a responsible observable should follow this lifecycle.

  1. The onSubscribe.call(Subscriber<T>) method is invoked by the Observable.subscribe(Subscriber<T>) method.
  2. The OnSubscribe func creates a Producer via the subscriber.setProducer(Producer<T>) method. At some point in the (potentially distant) future the subscriber may call producer.request(n). This signals to the observable that capacity is allocated in the subscriber.
    3a. Values are produced via a call to subscriber.onNext(t).
    3b. At some point a terminal event is emitted via onCompleted or onError.

This allows for many different variations of control flow given that onNexts could possibly be scheduled, data could be pre-fetched, excess data could be batched or dropped, etc...

Proposal

With this groundwork in mind I would like to start a discussion about a general framework that correctly models the various use cases, particularly with respect to the following requirements:

  • capable of paginating data across multiple requests
  • batching requests
  • hot and cold data sources
  • async and blocking onNexts

Naive Implementation

The user provides callbacks that the framework calls between the steps above. That is:

  1. Observable calls onSubscribe.call(s)
  2. The framework's onSubscribe calls: S dataState = onSubscribeGenerator.call()
  3. Next the onSubscribe calls: subscriber.setProducer(p) setting a framework Producer
  4. At some point the request is received by our producer p.request(n)
  5. Then the framework will call the user provided request callback and store the new resultant dataState for future requests. dataState = onRequestCallback.call(dataState, n, s)
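
A minimal sketch of steps 2 to 5, under stated assumptions: OnRequestCallback and NaiveProducer are hypothetical names, a plain java.util.function.Consumer stands in for the Subscriber, and terminal events and unsubscription are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for the user callback in step 5:
// dataState = onRequestCallback.call(dataState, n, s)
interface OnRequestCallback<S, T> {
    S call(S dataState, long n, Consumer<T> subscriber);
}

// Hypothetical stand-in for the framework Producer set in step 3.
final class NaiveProducer<S, T> {
    private final OnRequestCallback<S, T> onRequest;
    private final Consumer<T> subscriber;
    private S dataState;

    NaiveProducer(Supplier<S> onSubscribeGenerator,
                  OnRequestCallback<S, T> onRequest,
                  Consumer<T> subscriber) {
        this.dataState = onSubscribeGenerator.get(); // step 2
        this.onRequest = onRequest;
        this.subscriber = subscriber;
    }

    // Steps 4 and 5: synchronized so that concurrent requests do not race on dataState.
    synchronized void request(long n) {
        dataState = onRequest.call(dataState, n, subscriber);
    }
}

final class NaiveProducerDemo {
    // Emits consecutive integers; the state is the next value to emit.
    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        NaiveProducer<Integer, Integer> p = new NaiveProducer<Integer, Integer>(
            () -> 0,
            (next, n, s) -> {
                for (long i = 0; i < n; i++) {
                    s.accept(next++);
                }
                return next;
            },
            received::add);
        p.request(2);
        p.request(3);
        return received;
    }
}
```

The sketch only stays ordered because the callback emits synchronously inside request(n); a callback that schedules its emissions elsewhere would escape the lock.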

Obvious shortcomings of this approach: order may not be preserved across multiple requests if the onRequestCallback schedules the onNext. Also, onNexts may be concurrent, so the subscriber must be wrapped with a serializing subscriber. A case could also be made that many created observables should specify their backpressure strategy. Any thoughts are welcome.

@akarnokd
Member

akarnokd commented Jun 2, 2015

What's wrong with AbstractOnSubscribe?

In addition, there is a PR #2813 which features a batching-capable producer-construction.

Also you can read my blog about Advanced RxJava which explains the concepts and shows examples about how one can build producers and thus operators and Observables.

@stealthcode
Author

@akarnokd The purpose of this issue is to initiate a discussion and refine the concepts in AbstractOnSubscribe and the AbstractProducer. Both of these facilities are currently annotated Experimental. The goal here is to refine them to something that supports the various use cases in an approachable manner.

Some items that need some work on the AbstractOnSubscribe:

  • does not support batching multiple requests
  • generating data in a paginated way is difficult with the API, since for each request there is an invocation of the user callback. I suspect this is where the phases are intended to help.
  • the decoupling of the onNext to the SubscriptionState and the onNext to the subscriber makes stacktraces and errors harder to reason about

The AbstractProducer is clearly a better approach to me. For every request(n) call there is a call to the onRequest callback. This makes it easier to paginate data and also to respond to requests asynchronously. However I think that async responses to a request would cause interleaving batches to be processed in the naive implementation. @benjchristensen and I have been discussing this problem and believe that the user would have to account for this by appending to a queue and then draining. This could probably be made first class for some use cases if that's what's needed for responsible observables from non-blocking sources.
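
The "append to a queue and then drain" idea can be sketched roughly as follows. QueueDrain is a hypothetical class, a Consumer stands in for the subscriber, and overflow of the requested counter and terminal events are deliberately omitted.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical sketch: values arriving asynchronously are queued, and a single
// drain loop emits them in arrival order while downstream capacity remains.
final class QueueDrain<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicLong requested = new AtomicLong();
    private final AtomicInteger wip = new AtomicInteger(); // work-in-progress counter
    private final Consumer<T> downstream;

    QueueDrain(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    // Called from producer.request(n); overflow handling is omitted in this sketch.
    void request(long n) {
        requested.addAndGet(n);
        drain();
    }

    // Called by the asynchronous source whenever a value becomes available.
    void offer(T value) {
        queue.offer(value);
        drain();
    }

    private void drain() {
        if (wip.getAndIncrement() != 0) {
            return; // another caller is draining and will observe this increment
        }
        int missed = 1;
        for (;;) {
            while (requested.get() > 0) {
                T value = queue.poll();
                if (value == null) {
                    break;
                }
                downstream.accept(value); // never called concurrently
                requested.decrementAndGet();
            }
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }
}
```

Only the thread that moves wip from 0 to 1 emits, so onNext calls are serialized without holding a lock while calling downstream.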

@stealthcode changed the title from "Framework for Creating Safe Back Pressure Observables" to "API for Creating Safe Back Pressure Observables" on Jun 4, 2015

@stealthcode
Author

@benjchristensen and I worked on the following examples and we would like input on the following APIs. We have broken this issue apart into what we think are 2 primary use cases. Firstly, there is the case where data is generated asynchronously from a non-blocking source. Secondly, there is an API for writing responsible observables from synchronous sources. This looks very similar to the AbstractOnSubscribe.

The async case is illustrated in code below. We are considering a system which calls your function every time there is a call to producer.request(n), and the user's function would return an Observable<T>. The returned observable would then be eagerly subscribed and concatenated (to ensure ordering is maintained).

// to illustrate method signature in below example
class ObservableAsync {
    /**
     * Has an overload for handling request(Long.MAX_VALUE).
     * It is possible to chunk an infinite request and make a requestFiniteFunc
     * play nicely.
     *
     * Note: this assumes serialized request calls per subscriber. This means we
     * will have to do thread stealing (essentially merge).
     */
    public static <S, T> Observable<T> createAsynchronous(
        Func0<S> generatorFunc,
        Func2<S, Long, Observable<T>> requestFiniteFunc, // (state, requested) -> Observable<T>
        Func1<S, Observable<T>> requestInfiniteFunc,
        Action1<S> onTerminate) {/*...*/}
}

// Data state class used in the below example
public class RestClientState<T> {
    final Client client;
    long lastIndex = 0; // long, because requested amounts are Longs

    RestClientState(String url) {
        this.client = /* your favorite async client for url */ null;
    }
}

// ASYNC EXAMPLE
Observable.<RestClientState<Foo>, Foo>createAsynchronous(
    () -> new RestClientState<Foo>(url),
    (RestClientState<Foo> state, Long requested) -> {
        long i = state.lastIndex;
        state.lastIndex = i + requested;
        // fetch only the requested range
        Observable<Foo> obs = state.client.getRange(i, i + requested);
        return obs;
    },
    (RestClientState<Foo> state) -> {
        // request(Long.MAX_VALUE): fetch everything
        Observable<Foo> allOfThem = state.client.getAll();
        return allOfThem;
    },
    (RestClientState<Foo> state) -> {
        state.client.free();
    });
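
To illustrate the "eagerly subscribed and concatenated" behavior without depending on RxJava, here is a rough sketch in which java.util.concurrent.CompletableFuture<List<T>> stands in for the returned Observable<T>. OrderedAsyncRequests is a hypothetical name, and error handling and request accounting are omitted.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.LongFunction;

// Hypothetical sketch: every request(n) starts its fetch immediately, but the
// results are delivered downstream strictly in request order.
final class OrderedAsyncRequests<T> {
    private final LongFunction<CompletableFuture<List<T>>> fetch;
    private final Consumer<T> downstream;
    // Completes once every batch requested so far has been delivered.
    private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);

    OrderedAsyncRequests(LongFunction<CompletableFuture<List<T>>> fetch,
                         Consumer<T> downstream) {
        this.fetch = fetch;
        this.downstream = downstream;
    }

    synchronized void request(long n) {
        CompletableFuture<List<T>> batch = fetch.apply(n); // eager: starts now
        // Deliver this batch only after all earlier batches have been delivered.
        tail = tail.thenAcceptBoth(batch, (ignored, items) -> items.forEach(downstream));
    }
}
```

A later batch that finishes early simply waits on the chain, which is the ordering guarantee that concatenation would provide.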

The synchronous case (below) is a modification of the existing designs embodied in the AbstractOnSubscribe. The highlights of the changes are as follows.

  • Changed the SubscriptionState to be agnostic of the data state. This results in the handler passing in the datastate and returning a datastate.
  • Changed the data generator func to call onNext, onCompleted, and onError straight into an object that wraps the subscriber. This object would prevent calling onNext more than once per invocation of the user's func.
  • Generator func no longer has access to the Subscriber

class ObservableSync {
    public static <S, T> Observable<T> createSynchronous(
        Func0<S> generatorFunc,
        Func2<S, Subscriber<T>, S> nextFunc,
        Action1<S> onTerminate) {/*...*/}

    // Functionally equivalent overload, but no need to return the state
    public static <S, T> Observable<T> createSynchronousStateful(
        Func0<S> generatorFunc,
        Action2<S, Subscriber<T>> nextFunc,
        Action1<S> onTerminate) {/*...*/}
}

List<Foo> myList = null; // ....

// SYNC Example
Observable.<Iterator<Foo>, Foo>createSynchronous(
    () -> myList.iterator(),
    (Iterator<Foo> state, Subscriber<Foo> child) -> {
        if (!state.hasNext()) {
            child.onCompleted();
            return state; // nothing left to emit
        }
        // cannot onNext more than once from func
        child.onNext(state.next());
        return state;
    },
    (it) -> { /* release any resources held by the state here */ }
    ).subscribe();
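
One way the framework side of the synchronous case could look is sketched below. SingleEmissionObserver and SyncGeneratorLoop are hypothetical names; the wrapper collects values into a list in place of a real downstream Subscriber, and the sketch assumes that an invocation which neither emits nor completes should be treated as an error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical wrapper handed to nextFunc in place of the raw Subscriber.
final class SingleEmissionObserver<T> {
    final List<T> received = new ArrayList<>(); // stands in for the downstream subscriber
    boolean completed;
    private boolean emitted;

    void onNext(T value) {
        if (completed) {
            throw new IllegalStateException("onNext after onCompleted");
        }
        if (emitted) {
            throw new IllegalStateException("onNext called more than once per invocation");
        }
        emitted = true;
        received.add(value);
    }

    void onCompleted() {
        completed = true;
    }

    // Framework-only: clears the per-invocation flag and reports whether a value was emitted.
    boolean resetEmitted() {
        boolean was = emitted;
        emitted = false;
        return was;
    }
}

final class SyncGeneratorLoop {
    // Invokes nextFunc until n values were emitted or the source completed.
    static <S, T> S drain(S state, long n, SingleEmissionObserver<T> observer,
                          BiFunction<S, SingleEmissionObserver<T>, S> nextFunc) {
        long emittedCount = 0;
        while (emittedCount < n && !observer.completed) {
            state = nextFunc.apply(state, observer);
            if (observer.resetEmitted()) {
                emittedCount++;
            } else if (!observer.completed) {
                throw new IllegalStateException("nextFunc must call onNext or onCompleted");
            }
        }
        return state;
    }
}
```

Because the loop runs on the requesting thread and emits at most n values per request(n), the user's function never needs to see the requested amount.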

@akarnokd
Member

"there is a call to producer.request(n) and the user's function would return an Observable<T>"

How will you make sure the observable contains exactly n elements? What kind of observable would you create and return?

"Func2<S, Integer requested, Observable<T>>"

Requested amounts are Longs.

@stealthcode
Author

@akarnokd The return type would be a plain ole rx.Observable (kinda like a POJO but... without the J). We should be able to eagerly subscribe to all emitted observables then serialize their outputs using buffers. I imagine these would be calls using RxNetty, RxJdbc, wrapped futures, etc.

Do we have to ensure that the observable contains exactly n elements? If they write an irresponsible observable then they get a MissingBackpressureException. It's possible to subscribe a take(n) to the observable, but then we are getting into the business of enforcing that all observables play nicely when it's relatively easy to follow the documentation.

I have edited the comment to correct the types.

@stealthcode
Author

I think that an implementation for #3017 should help serialize the output from multiple observables.

@stealthcode
Author

@akarnokd Do you disagree with anything here in principle? If not, are you already working on refactoring the synchronous AbstractOnSubscribe or working on the async case? I'd like to coordinate our efforts to make sure we aren't both working on the same piece.

@akarnokd
Member

I'm not working on any of the cases and I'm not planning it either.

@benjchristensen
Member

I strongly agree that we need to improve how Observable.create can be used to generate correct Observables with and without backpressure.

These "abstract" classes are the base, but I suggest we need to incorporate them into the Observable API such as this:

Non-backpressured Push Sources

This is for pure push sources that don't have backpressure.

Observable.createEventStream(..., DefaultBackpressureStrategy)

For example:

// mouse event listener with default backpressure strategy to DROP
Observable.createEventStream(o -> {
          mouseEventListener.addListener(l -> o.onNext(l));
}, OnBackpressure.DROP);
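
What a DROP strategy does with such a push source can be sketched as follows. DroppingEmitter is a hypothetical class (createEventStream and OnBackpressure above are proposals, not existing APIs); the sketch assumes the source calls onNext serially and it omits overflow handling for the requested counter.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical sketch of a DROP strategy: events are forwarded while the
// subscriber has outstanding capacity and discarded otherwise.
final class DroppingEmitter<T> {
    private final AtomicLong requested = new AtomicLong();
    private final Consumer<T> downstream;
    private long dropped;

    DroppingEmitter(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    // Called from producer.request(n); overflow handling is omitted in this sketch.
    void request(long n) {
        requested.addAndGet(n);
    }

    // Called by the push source (for example a listener); assumed to be serial.
    void onNext(T event) {
        if (requested.get() > 0) {
            requested.decrementAndGet();
            downstream.accept(event);
        } else {
            dropped++; // no capacity: the event is lost by design
        }
    }

    long dropped() {
        return dropped;
    }
}
```

The source is never slowed down; the subscriber simply misses whatever arrives while it has no capacity.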

Backpressured Synchronous Pull Sources (Batched Stream)

This is for data sources that can be pulled synchronously, such as an Iterable or other similar data structure.

Observable.createSync(...)

Backpressured Asynchronous Pull Sources (Batched Stream)

This is for data sources that can be pulled asynchronously, such as network IO, async File IO, or retrieving from queues across thread-boundaries, etc.

This is very common, and very hard to do right.

Observable.createAsync(...)

Backpressured Asynchronous Pull Sources (Single Item)

This is actually for returning a Single, but I'm suggesting we have a factory method on the Observable because that is the common place to start from. Or we still return an Observable but it is only a single item.

This is the same behavior as the createAsync, except that it is far easier since it only fetches a single item. This is the most common case for async request/response.

Single<T> Observable.createFetch(...)

Summary

If we can improve how these all work, it will greatly simplify creation and use of Observable.

Are there use cases or variants I'm missing? What should we name them? What are the method signatures we should have for each?

@akarnokd
Member

akarnokd commented Feb 9, 2016

We have SyncOnSubscribe and AsyncOnSubscribe. Is there anything else?

@benjchristensen
Member

I think we should come up with factory methods on Observable to expose these abilities alongside the default 'create'. What do you think the right names are?

@akarnokd
Member

akarnokd commented Feb 9, 2016

generate and generateAsync

@benjchristensen
Member

How about something starting with 'create' so that people find it alongside the 'create' method in Javadocs and IDE code-completion?

We want to nudge people toward using these instead of normal 'create'. Also, I don't want to cause confusion with the Rx.Net/RxJS 'generate', as they aren't exactly the same (though the sync one is similar).

@davidmoten
Collaborator

A quick note that I don't think the names SyncOnSubscribe and AsyncOnSubscribe are great in that the names do not characterize their functionality. To my mind they are not natural duals which is suggested by the naming. I also note that AsyncOnSubscribe is not practical at the moment without a request batching operator (a downstream filter for example may cause async requests for one element at a time at the source).

@stealthcode
Author

It'd be a good idea if we fixed these inefficient producer.request(1) cases and batched them more intelligently, for instance when observeOn request behavior devolves into request(1) patterns.

Initial names that come to mind... createSynchronousSource and createAsynchronousSource

But considering @davidmoten's input, perhaps the existing names are not indicative of their usage. SyncOnSubscribe is intended for generating data from a synchronous data source, while the AsyncOnSubscribe is intended to consume data from a potentially asynchronous data source. The confusion may come from a lack of clarity in their usage.

That said, would you agree with Observable.createFromHotProducer(...) (which would use a SyncOnSubscribe) and Observable.createFromColdProducer(...) (AsyncOnSubscribe)?

@akarnokd
Member

We have create(SyncOnSubscribe) and create(AsyncOnSubscribe) in 1.x. Do you want to pursue the remaining generator modes for 1.x?

  • Signal 0 or 1 element or an error in push fashion. (for Observable and Single) Use case: working with classical callback-based APIs.
  • Signal 0 to N elements, followed by an error or a complete signal in a push fashion. Use case: callback-based APIs and listeners that produce multiple values.

@akarnokd modified the milestones: 1.3, 1.2, 1.0.x (Jun 17, 2016 and Sep 15, 2016)
@akarnokd added the 1.x label on Nov 12, 2016

@akarnokd
Member

I'm closing this issue due to inactivity. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.
