Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observable creation from Subscriber[T]=>Unit for Scala #923

Merged
merged 6 commits into from
Mar 6, 2014

Conversation

samuelgruetter
Copy link
Contributor

This PR adds the Subscriber type, and Observable.apply[T](Subscriber[T] => Unit).

Additionally, I made some tweaks in RxScalaDemo, and I could remove all comments of the kind TODO something behaves weirdly here, because now the weird behavior was gone. Seems like there was some progress in RxJava core :-)

Sorry that this PR mixes several topics, let me know if you want me to split it by topic.

/cc @headinthebox @vjovanov

@cloudbees-pull-request-builder

RxJava-pull-requests #863 SUCCESS
This pull request looks good

@cloudbees-pull-request-builder

RxJava-pull-requests #864 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member

Seems like there was some progress in RxJava core :-)

Good to hear :-)

@benjchristensen
Copy link
Member

Is this intended as fixes for the 0.17.0 Release Candidate or for 0.17.1?

@headinthebox
Copy link
Contributor

Looking at this, made me wonder why we deprecate public final static <T> Observable<T> create(final OnSubscribeFunc<T> f) {...}.

Observable.create(observer => subscription) is a common case when you hook up to event-based APIs. Deprecating the above overload will force me to basically repeat the implementation:
public void call(Subscriber<? super T> observer) { Subscription s = f.onSubscribe(observer); if (s != null && s != observer) { observer.add(s); } }

@benjchristensen
Copy link
Member

What use case are you referring to?

@headinthebox
Copy link
Contributor

Like this:

def mouseDrag: Observable[MouseEvent] = Observable.create(observer => {
        val handler = EventHandler[MouseEvent](m => {
         observer.onNext(m)
        })
        target.addEventHandler(MouseEvent.MOUSE_DRAGGED, handler)
        Subscription {
          target.removeEventHandler(MouseEvent.MOUSE_DRAGGED, handler)
        }
      })

@benjchristensen
Copy link
Member

It would just be like this where the Subscription is registered instead of returned:

def mouseDrag: Observable[MouseEvent] = Observable.create(subscriber => {
        val handler = EventHandler[MouseEvent](m => {
         subscriber.onNext(m)
        })
        target.addEventHandler(MouseEvent.MOUSE_DRAGGED, handler)
        subscriber.add( {
          target.removeEventHandler(MouseEvent.MOUSE_DRAGGED, handler)
        })
      })

If we do decide we want an overload, it won't be the OnSubscribeFunc overload which is the wrong name. It makes no sense to have OnSubscribe as the Subscriber -> Unit version and OnSubscribeFunc as the Observer -> Subscription one.

@headinthebox
Copy link
Contributor

That looks so imperative to me ...

@headinthebox
Copy link
Contributor

Don't fully understand your comment about the overloads, Java can handle overloading both

Subscription Create(Action<Subscriber<T>>) and Subscription Create(Func<Observer<T>,Subscription)right?

@benjchristensen
Copy link
Member

Not sure why being "imperative" is such a big deal here:

subscriber.add( {
    target.removeEventHandler(MouseEvent.MOUSE_DRAGGED, handler)
})

versus

Subscription {
     target.removeEventHandler(MouseEvent.MOUSE_DRAGGED, handler)
}

... but I can understand a preference one direction versus the other.


Updated ... By the way we have lots of operators where it is "imperative" on the inside while "functional" on the outside. That's part of the nature of RxJava residing on an imperative platform, so I don't see this as being an issue.

@benjchristensen
Copy link
Member

This is the current signature:

public final static <T> Observable<T> create(OnSubscribe<T> f)

And OnSubscribe is defined as:

public static interface OnSubscribe<T> extends Action1<Subscriber<? super T>>

@benjchristensen
Copy link
Member

Thus, with overloads you'd end up with two things looking exactly the same:

public final static <T> Observable<T> create(OnSubscribe<T> f)
public final static <T> Observable<T> create(OnOtherSubscribe<T> f)

Their types would be:

public static interface OnSubscribe<T> extends Action1<Subscriber<? super T>>
public static interface OnOtherSubscribe<T> extends Func1<Observer<? super T>, Subscription>

@headinthebox
Copy link
Contributor

Isn't OnOtherSubscribe already OnSubscribeFunc? Hence

public final static <T> Observable<T> create(OnSubscribeFunc<T> f)

Looks fine with me (and it avoid breaking all existing explanations/articles about Rx that use the legacy signature ...)

@benjchristensen
Copy link
Member

Yes, and we're killing it because the name is wrong (and we intended on having only a single create method). We named OnSubscribe what we did purposefully as OnSubscribeFunc was going away. We're not going to have two methods with different signatures with basically the same name.

@headinthebox
Copy link
Contributor

???? "We're not going to have two methods with different signatures with basically the same name." Isn't that the very definition of overloading?

@benjchristensen
Copy link
Member

Not when they are exactly the same :-)

Put these two next to each other, without reading the docs which one should I use?

public final static <T> Observable<T> create(OnSubscribe<T> f)
public final static <T> Observable<T> create(OnSubscribeFunc<T> f)

@headinthebox
Copy link
Contributor

That does not bother me, just as Add(int,int) vs Add(long,long).

Of course, it would be simpler if the aliases OnSubscribe and OnOtherSubscribe were not needed and we could just have

public final static <T> Observable<T> create(Action1<Subscriber<? super T>> f)
public final static <T> Observable<T> create(Func1<Observer<? super T>, Subscription> f)

@benjchristensen
Copy link
Member

The int and long examples are not good ones as those are primitive types that are clear in their types and all languages can disambiguate between them.

Having both an Action and a Function as overloads causes problems for dynamic languages.

The reason the OnSubscribeFunc behavior has those null checks is because to make Groovy work during the deprecation phase we had to write a plugin for Groovy that forces all executions against create to use the new one and then we look for a return value and if we got one, behave one way, or behave the other if we got null back => https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/RxGroovyExtensionModule.java#L73

As for the aliases, they are done because of the complexity of co/contr-variant generics that people must deal with every time they use create.

If we were to give up on ease of use for that problem and on the challenges of dynamic languages disambiguating between the overloads then yes, the Action and Func overloads work well:

    public final static <T> Observable<T> create(Action1<Subscriber<? super T>> f) {
        return new Observable<T>(f);
    }

    public final static <T> Observable<T> create(final Func1<Observer<? super T>, Subscription> f) {
        return new Observable<T>(new Action1<Subscriber<? super T>>() {

            @Override
            public void call(Subscriber<? super T> subscriber) {
                Subscription s = f.call(subscriber);
                if (s != null && s != subscriber) {
                    subscriber.add(s);
                }
            }

        });
    }

@benjchristensen
Copy link
Member

Another consideration ... my understanding of the event listeners on Android and Swing are that they typically need to be far more involved that this example anyways due to thread scheduling logic.

For example, take a look at this code: https://github.com/Netflix/RxJava/blob/master/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorObserveFromAndroidComponent.java#L68

Line 99 uses subscriber.add: https://github.com/Netflix/RxJava/blob/master/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorObserveFromAndroidComponent.java#L99

Returning that Subscription instead of using add does not seem to significantly alter the fact that inside a function a lot of often imperative behavior happens.

Also, common event listeners end up with cover libraries such as rxjava-android and rxjava-swing because they are non-trivial and need to just be done correctly once.

Therefore, I don't think the need of an overload is worth confusing the API by having two different ways of doing things with nuanced differences between them (how they behave with synchronous sources). For example, in the version with Subscriber passed in, a Subscription can be registered at the beginning so even if the rest of the function is synchronous and never returns, the Subscription is registered and will still work. I think it's better to have a single mechanism for create that pushes people to using the model in a way that works for both sync and async sources. The side-benefit is we don't have ambiguous methods or generics issues as we can stick with the OnSubscribe cover type we have.

@samuelgruetter
Copy link
Contributor Author

Is this intended as fixes for the 0.17.0 Release Candidate or for 0.17.1?

I'd say 0.17.0, the earlier people can use it, the better ;-)
FYI this PR has no breaking changes.

benjchristensen added a commit that referenced this pull request Mar 6, 2014
Observable creation from Subscriber[T]=>Unit for Scala
@benjchristensen benjchristensen merged commit 38a652a into ReactiveX:master Mar 6, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants