Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: fixes, cleanup, coverage 8/31-1 #4450

Merged
merged 1 commit into from
Aug 31, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5086,7 +5086,7 @@ public final T blockingSingle(T defaultItem) {
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
*/
public final Future<T> toFuture() {
return FlowableToFuture.toFuture(this);
return subscribeWith(new FutureSubscriber<T>());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool trick!

}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4538,7 +4538,7 @@ public final T blockingSingle(T defaultItem) {
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
*/
public final Future<T> toFuture() {
return ObservableToFuture.toFuture(this);
return subscribeWith(new FutureObserver<T>());
}

/**
Expand Down
67 changes: 64 additions & 3 deletions src/main/java/io/reactivex/disposables/Disposables.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,20 @@ private Disposables() {
* executed exactly once when the Disposable is disposed.
* @param run the Runnable to wrap
* @return the new Disposable instance
* @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity
*/
@Deprecated
public static Disposable from(Runnable run) {
return fromRunnable(run);
}

/**
* Construct a Disposable by wrapping a Runnable that is
* executed exactly once when the Disposable is disposed.
* @param run the Runnable to wrap
* @return the new Disposable instance
*/
public static Disposable fromRunnable(Runnable run) {
ObjectHelper.requireNonNull(run, "run is null");
return new RunnableDisposable(run);
}
Expand All @@ -48,8 +60,20 @@ public static Disposable from(Runnable run) {
* executed exactly once when the Disposable is disposed.
* @param run the Action to wrap
* @return the new Disposable instance
* @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity
*/
@Deprecated
public static Disposable from(Action run) {
return fromAction(run);
}

/**
* Construct a Disposable by wrapping a Action that is
* executed exactly once when the Disposable is disposed.
* @param run the Action to wrap
* @return the new Disposable instance
*/
public static Disposable fromAction(Action run) {
ObjectHelper.requireNonNull(run, "run is null");
return new ActionDisposable(run);
}
Expand All @@ -59,10 +83,11 @@ public static Disposable from(Action run) {
* cancelled exactly once when the Disposable is disposed.
* @param future the Future to wrap
* @return the new Disposable instance
* @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity
*/
@Deprecated
public static Disposable from(Future<?> future) {
ObjectHelper.requireNonNull(future, "future is null");
return from(future, true);
return fromFuture(future, true);
}

/**
Expand All @@ -71,8 +96,32 @@ public static Disposable from(Future<?> future) {
* @param future the Runnable to wrap
* @param allowInterrupt if true, the future cancel happens via Future.cancel(true)
* @return the new Disposable instance
* @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity
*/
@Deprecated
public static Disposable from(Future<?> future, boolean allowInterrupt) {
return fromFuture(future, allowInterrupt);
}

/**
* Construct a Disposable by wrapping a Future that is
* cancelled exactly once when the Disposable is disposed.
* @param future the Future to wrap
* @return the new Disposable instance
*/
public static Disposable fromFuture(Future<?> future) {
ObjectHelper.requireNonNull(future, "future is null");
return fromFuture(future, true);
}

/**
* Construct a Disposable by wrapping a Runnable that is
* executed exactly once when the Disposable is disposed.
* @param future the Runnable to wrap
* @param allowInterrupt if true, the future cancel happens via Future.cancel(true)
* @return the new Disposable instance
*/
public static Disposable fromFuture(Future<?> future, boolean allowInterrupt) {
ObjectHelper.requireNonNull(future, "future is null");
return new FutureDisposable(future, allowInterrupt);
}
Expand All @@ -82,8 +131,20 @@ public static Disposable from(Future<?> future, boolean allowInterrupt) {
* cancelled exactly once when the Disposable is disposed.
* @param subscription the Runnable to wrap
* @return the new Disposable instance
* @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity
*/
@Deprecated
public static Disposable from(Subscription subscription) {
return fromSubscription(subscription);
}

/**
* Construct a Disposable by wrapping a Subscription that is
* cancelled exactly once when the Disposable is disposed.
* @param subscription the Runnable to wrap
* @return the new Disposable instance
*/
public static Disposable fromSubscription(Subscription subscription) {
ObjectHelper.requireNonNull(subscription, "subscription is null");
return new SubscriptionDisposable(subscription);
}
Expand All @@ -93,7 +154,7 @@ public static Disposable from(Subscription subscription) {
* @return a new, non-disposed Disposable instance
*/
public static Disposable empty() {
return from(Functions.EMPTY_RUNNABLE);
return fromRunnable(Functions.EMPTY_RUNNABLE);
}

/**
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/io/reactivex/internal/functions/Functions.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
/**
* Utility methods to convert the Function3..Function9 instances to Function of Object array.
*/
public enum Functions {
;
public final class Functions {

/** Utility class. */
private Functions() {
throw new IllegalStateException("No instances!");
}

@SuppressWarnings("unchecked")
public static <T1, T2, R> Function<Object[], R> toFunction(final BiFunction<? super T1, ? super T2, ? extends R> biFunction) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@
* Utility methods containing the backport of Java 7's Objects utility class.
* <p>Named as such to avoid clash with java.util.Objects.
*/
public enum ObjectHelper {
;
public final class ObjectHelper {

/** Utility class. */
private ObjectHelper() {
throw new IllegalStateException("No instances!");
}

/**
* Verifies if the object is not null and returns it or throws a NullPointerException
* with the given message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.disposables.Disposables;
import io.reactivex.disposables.*;
import io.reactivex.internal.subscriptions.SubscriptionHelper;

public final class CompletableFromPublisher<T> extends Completable {

Expand All @@ -28,30 +29,56 @@ public CompletableFromPublisher(Publisher<T> flowable) {

@Override
protected void subscribeActual(final CompletableObserver cs) {
flowable.subscribe(new Subscriber<T>() {
flowable.subscribe(new FromPublisherSubscriber<T>(cs));
}

static final class FromPublisherSubscriber<T> implements Subscriber<T>, Disposable {

@Override
public void onComplete() {
cs.onComplete();
final CompletableObserver cs;

Subscription s;

public FromPublisherSubscriber(CompletableObserver actual) {
this.cs = actual;
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
this.s = s;

cs.onSubscribe(this);

s.request(Long.MAX_VALUE);
}
}

@Override
public void onError(Throwable t) {
cs.onError(t);
}

@Override
public void onNext(T t) {
// ignored
}
@Override
public void onNext(T t) {
// ignored
}

@Override
public void onSubscribe(Subscription s) {
cs.onSubscribe(Disposables.from(s));
s.request(Long.MAX_VALUE);
}

});
@Override
public void onError(Throwable t) {
cs.onError(t);
}

@Override
public void onComplete() {
cs.onComplete();
}

@Override
public void dispose() {
s.cancel();
s = SubscriptionHelper.CANCELLED;
}

@Override
public boolean isDisposed() {
return s == SubscriptionHelper.CANCELLED;
}
}

}
Loading