Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async Observable.from(future) #307

Closed
samhendley opened this issue Jul 18, 2013 · 8 comments
Closed

Async Observable.from(future) #307

samhendley opened this issue Jul 18, 2013 · 8 comments

Comments

@samhendley
Copy link

I was surprised that Observable.from(future) doesn't return a truly async future. I wrote one for my own purposes and thought it might be useful as a core part of rxJava.

https://gist.github.com/samhendley/6030565

Observable<T> obs = Observable.future(future);

// would be converted to

// class level or static members probably
FutureWatcher futureWatcher = new FutureWatcher();
Executor callbackPool = new ThreadPoolExecutor(5);

Observable<T> obs = futureWatcher.watchFuture(future, callbackPool);

I imagine the most straightforward implementation would be something like Observable.from(future, executor), a bit like the scheduleOn() method from Hystrix.

I would also suggest that you update the documentation on Observable.from(future) to reflect that the observable are effectively blocking, I had assumed otherwise and was very disappointed in the performance of my system as a whole when I integrated with a library that returned futures (hystrix in this case). A co-worker was asking some probing questions and it made me actually look at the implementation and once I integrated the FutureWatcher our performance was back to where I expected it to be.

@benjchristensen
Copy link
Member

Thanks for getting involved!

A Java Future does not support callbacks thus it can only block when dereferencing the value. The only way to make this asynchronous is wrapping yet another thread around it - as you are doing - which means a thread waiting on a thread. It's a legit thing to do when you must deal with a Java Future, but obviously not ideal.

For this enhancement, instead of accepting an Executor in the overload it should use a Scheduler which is the Rx abstraction for concurrency.

Regarding Hystrix, you'll appreciate the work being done here: Netflix/Hystrix#151

@samhendley
Copy link
Author

Yeah I saw that ticket a while ago, Can't wait to see it! I just needed a temporary shim in my code until that support is added.

I am trying to convince my group here to build some systems based on rxJava and hystrix and am building some samples/demos to first convince myself that it will be workable. So far I am very impressed with both of them, keep up the good work. I am actually trying to move our group entirely onto the netflix stack for all of our SOA oriented components. Wish you guys had an office out in North Carolina, your infrastructure team seems to really have its act together, looks like it would be a fun place to work.

@benjchristensen
Copy link
Member

I understand the need for the shim, that's where the Observable.from(future) came from actually - the implementor just chose to only do it in a blocking manner and not wrap a scheduler around it, but that should definitely exist.

Thanks for the kind words about our teams, Hystrix and RxJava, I'm glad they are serving you well.

Do you want to submit a pull request with an Observable.from(Future f, Scheduler s) overload?

@samhendley
Copy link
Author

Yes I can do that, I am hoping to start contributing soon, but haven't
actually checked out the project yet, so far been getting just browsing the
downloaded sources.

There are a few more helpers that we are working on "SettableFuture" that
you may also find useful.

Sam

On Fri, Jul 19, 2013 at 4:07 PM, Ben Christensen
notifications@github.comwrote:

I understand the need for the shim, that's where the
Observable.from(future) came from actually - the implementor just chose
to only do it in a blocking manner and not wrap a scheduler around it, but
that should definitely exist.

Thanks for the kind words about our teams, Hystrix and RxJava, I'm glad
they are serving you well.

Do you want to submit a pull request with an Observable.from(Future f,
Scheduler s) overload?


Reply to this email directly or view it on GitHubhttps://github.com//issues/307#issuecomment-21274113
.

@benjchristensen
Copy link
Member

This has been implemented for several of the from methods. For example:

from(java.util.concurrent.Future<? extends T> future, Scheduler scheduler)
from(java.lang.Iterable<? extends T> iterable, Scheduler scheduler)

@thelastbirth
Copy link

thelastbirth commented Dec 2, 2016

Isn't this new fromFuture(java.util.concurrent.Future<? extends T> future, Scheduler scheduler) in 2.x still going to return a blocking Observable? Because if you trace the code path of this function, in ObservableFromFuture class, there's this line:

v = ObjectHelper.requireNonNull(unit != null ? future.get(timeout, unit) : future.get(), "Future returned null");

it still uses future.get(), which is a blocking call.

@JakeWharton
Copy link
Contributor

JakeWharton commented Dec 3, 2016 via email

@akarnokd
Copy link
Member

akarnokd commented Dec 3, 2016

That overload applies the subscribeOn and when subscribed to, the code blocks on that Scheduler. For example, using the io() scheduler, when you subscribe to the returned Observable, the subscribe() call returns promplty without blocking. Note also that there is no non-blocking way to listen to a java.util.concurrent.Future: we are limited to Java 6.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants