Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Evaluate Schedule initialization via Callable #4585

Merged
merged 13 commits into from
Sep 26, 2016
117 changes: 77 additions & 40 deletions src/main/java/io/reactivex/plugins/RxJavaPlugins.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
package io.reactivex.plugins;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.Callable;

import io.reactivex.internal.functions.ObjectHelper;
import org.reactivestreams.Subscriber;

import io.reactivex.*;
Expand All @@ -31,13 +33,13 @@ public final class RxJavaPlugins {

static volatile Function<Runnable, Runnable> onScheduleHandler;

static volatile Function<Scheduler, Scheduler> onInitComputationHandler;
static volatile Function<Callable<Scheduler>, Scheduler> onInitComputationHandler;

static volatile Function<Scheduler, Scheduler> onInitSingleHandler;
static volatile Function<Callable<Scheduler>, Scheduler> onInitSingleHandler;

static volatile Function<Scheduler, Scheduler> onInitIoHandler;
static volatile Function<Callable<Scheduler>, Scheduler> onInitIoHandler;

static volatile Function<Scheduler, Scheduler> onInitNewThreadHandler;
static volatile Function<Callable<Scheduler>, Scheduler> onInitNewThreadHandler;

static volatile Function<Scheduler, Scheduler> onComputationHandler;

Expand Down Expand Up @@ -121,31 +123,31 @@ public static Consumer<Throwable> getErrorHandler() {
* Returns the current hook function.
* @return the hook function, may be null
*/
public static Function<Scheduler, Scheduler> getInitComputationSchedulerHandler() {
public static Function<Callable<Scheduler>, Scheduler> getInitComputationSchedulerHandler() {
return onInitComputationHandler;
}

/**
* Returns the current hook function.
* @return the hook function, may be null
*/
public static Function<Scheduler, Scheduler> getInitIoSchedulerHandler() {
public static Function<Callable<Scheduler>, Scheduler> getInitIoSchedulerHandler() {
return onInitIoHandler;
}

/**
* Returns the current hook function.
* @return the hook function, may be null
*/
public static Function<Scheduler, Scheduler> getInitNewThreadSchedulerHandler() {
public static Function<Callable<Scheduler>, Scheduler> getInitNewThreadSchedulerHandler() {
return onInitNewThreadHandler;
}

/**
* Returns the current hook function.
* @return the hook function, may be null
*/
public static Function<Scheduler, Scheduler> getInitSingleSchedulerHandler() {
public static Function<Callable<Scheduler>, Scheduler> getInitSingleSchedulerHandler() {
return onInitSingleHandler;
}

Expand Down Expand Up @@ -183,54 +185,62 @@ public static Function<Scheduler, Scheduler> getSingleSchedulerHandler() {

/**
* Calls the associated hook function.
* @param defaultScheduler the hook's input value
* @return the value returned by the hook
* @param defaultScheduler a {@link Callable} which returns the hook's input value
* @return the value returned by the hook, not null
* @throws NullPointerException if the callable parameter or its result are null
*/
public static Scheduler initComputationScheduler(Scheduler defaultScheduler) {
Function<Scheduler, Scheduler> f = onInitComputationHandler;
public static Scheduler initComputationScheduler(Callable<Scheduler> defaultScheduler) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should not it throw on a null callable? What's the point of calling with null?

Copy link
Contributor Author

@peter-tackage peter-tackage Sep 22, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was done for consistency with the existing expectations in RxJavaPluginsTest.clearIsPassthrough(), specifically:

assertNull(RxJavaPlugins.initComputationScheduler(null));
assertNull(RxJavaPlugins.initIoScheduler(null));
assertNull(RxJavaPlugins.initNewThreadScheduler(null));
assertNull(RxJavaPlugins.initSingleScheduler(null));

Should this be changed to only return null if the Callable returns null (and throw if the Callable itself is null)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Null should not be allowed as a return value from the Callable nor from the init Function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be a change, because previously the Scheduler value for RxJavaPlugins.init* was allowed to be null, as per - assertNull(RxJavaPlugins.initSingleScheduler(null));.

I will add an additional set of tests for the new behavior (something along the lines of assemblyHookCrashes).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
Function<Callable<Scheduler>, Scheduler> f = onInitComputationHandler;
if (f == null) {
return defaultScheduler;
return callRequireNonNull(defaultScheduler);
}
return apply(f, defaultScheduler); // JIT will skip this
return applyRequireNonNull(f, defaultScheduler); // JIT will skip this
}

/**
* Calls the associated hook function.
* @param defaultScheduler the hook's input value
* @return the value returned by the hook
* @param defaultScheduler a {@link Callable} which returns the hook's input value
* @return the value returned by the hook, not null
* @throws NullPointerException if the callable parameter or its result are null
*/
public static Scheduler initIoScheduler(Scheduler defaultScheduler) {
Function<Scheduler, Scheduler> f = onInitIoHandler;
public static Scheduler initIoScheduler(Callable<Scheduler> defaultScheduler) {
ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
Function<Callable<Scheduler>, Scheduler> f = onInitIoHandler;
if (f == null) {
return defaultScheduler;
return callRequireNonNull(defaultScheduler);
}
return apply(f, defaultScheduler);
return applyRequireNonNull(f, defaultScheduler);
}

/**
* Calls the associated hook function.
* @param defaultScheduler the hook's input value
* @return the value returned by the hook
* @param defaultScheduler a {@link Callable} which returns the hook's input value
* @return the value returned by the hook, not null
* @throws NullPointerException if the callable parameter or its result are null
*/
public static Scheduler initNewThreadScheduler(Scheduler defaultScheduler) {
Function<Scheduler, Scheduler> f = onInitNewThreadHandler;
public static Scheduler initNewThreadScheduler(Callable<Scheduler> defaultScheduler) {
ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
Function<Callable<Scheduler>, Scheduler> f = onInitNewThreadHandler;
if (f == null) {
return defaultScheduler;
return callRequireNonNull(defaultScheduler);
}
return apply(f, defaultScheduler);
return applyRequireNonNull(f, defaultScheduler);
}

/**
* Calls the associated hook function.
* @param defaultScheduler the hook's input value
* @return the value returned by the hook
* @param defaultScheduler a {@link Callable} which returns the hook's input value
* @return the value returned by the hook, not null
* @throws NullPointerException if the callable parameter or its result are null
*/
public static Scheduler initSingleScheduler(Scheduler defaultScheduler) {
Function<Scheduler, Scheduler> f = onInitSingleHandler;
public static Scheduler initSingleScheduler(Callable<Scheduler> defaultScheduler) {
ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
Function<Callable<Scheduler>, Scheduler> f = onInitSingleHandler;
if (f == null) {
return defaultScheduler;
return callRequireNonNull(defaultScheduler);
}
return apply(f, defaultScheduler);
return applyRequireNonNull(f, defaultScheduler);
}

/**
Expand Down Expand Up @@ -393,9 +403,9 @@ public static void setErrorHandler(Consumer<Throwable> handler) {

/**
* Sets the specific hook function.
* @param handler the hook function to set, null allowed
* @param handler the hook function to set, null allowed, but the function may not return null
*/
public static void setInitComputationSchedulerHandler(Function<Scheduler, Scheduler> handler) {
public static void setInitComputationSchedulerHandler(Function<Callable<Scheduler>, Scheduler> handler) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
Expand All @@ -404,9 +414,9 @@ public static void setInitComputationSchedulerHandler(Function<Scheduler, Schedu

/**
* Sets the specific hook function.
* @param handler the hook function to set, null allowed
* @param handler the hook function to set, null allowed, but the function may not return null
*/
public static void setInitIoSchedulerHandler(Function<Scheduler, Scheduler> handler) {
public static void setInitIoSchedulerHandler(Function<Callable<Scheduler>, Scheduler> handler) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
Expand All @@ -415,9 +425,9 @@ public static void setInitIoSchedulerHandler(Function<Scheduler, Scheduler> hand

/**
* Sets the specific hook function.
* @param handler the hook function to set, null allowed
* @param handler the hook function to set, null allowed, but the function may not return null
*/
public static void setInitNewThreadSchedulerHandler(Function<Scheduler, Scheduler> handler) {
public static void setInitNewThreadSchedulerHandler(Function<Callable<Scheduler>, Scheduler> handler) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
Expand All @@ -426,9 +436,9 @@ public static void setInitNewThreadSchedulerHandler(Function<Scheduler, Schedule

/**
* Sets the specific hook function.
* @param handler the hook function to set, null allowed
* @param handler the hook function to set, null allowed, but the function may not return null
*/
public static void setInitSingleSchedulerHandler(Function<Scheduler, Scheduler> handler) {
public static void setInitSingleSchedulerHandler(Function<Callable<Scheduler>, Scheduler> handler) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
Expand Down Expand Up @@ -953,6 +963,33 @@ static <T, U, R> R apply(BiFunction<T, U, R> f, T t, U u) {
}
}

/**
* Wraps the call to the Scheduler creation callable in try-catch and propagates thrown
* checked exceptions as RuntimeException and enforces that result is not null.
* @param s the {@link Callable} which returns a {@link Scheduler}, not null (not verified). Cannot return null
* @return the result of the callable call, not null
* @throws NullPointerException if the callable parameter returns null
*/
static Scheduler callRequireNonNull(Callable<Scheduler> s) {
try {
return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}

/**
* Wraps the call to the Scheduler creation function in try-catch and propagates thrown
* checked exceptions as RuntimeException and enforces that result is not null.
* @param f the function to call, not null (not verified). Cannot return null
* @param s the parameter value to the function
* @return the result of the function call, not null
* @throws NullPointerException if the function parameter returns null
*/
static Scheduler applyRequireNonNull(Function<Callable<Scheduler>, Scheduler> f, Callable<Scheduler> s) {
return ObjectHelper.requireNonNull(apply(f, s), "Scheduler Callable result can't be null");
}

/** Helper class, no instances. */
private RxJavaPlugins() {
throw new IllegalStateException("No instances!");
Expand Down
47 changes: 42 additions & 5 deletions src/main/java/io/reactivex/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.schedulers;

import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

import io.reactivex.Scheduler;
Expand Down Expand Up @@ -44,16 +45,52 @@ public final class Schedulers {

static final Scheduler NEW_THREAD;

static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleScheduler());
static final class SingleHolder {
static final Scheduler DEFAULT = new SingleScheduler();
}

static final class ComputationHolder {
static final Scheduler DEFAULT = new ComputationScheduler();
}

COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationScheduler());
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}

static final class NewThreadHolder {
static final Scheduler DEFAULT = NewThreadScheduler.instance();
}

IO = RxJavaPlugins.initIoScheduler(new IoScheduler());
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return SingleHolder.DEFAULT;
}
});

COMPUTATION = RxJavaPlugins.initComputationScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return ComputationHolder.DEFAULT;
}
});

IO = RxJavaPlugins.initIoScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
});

TRAMPOLINE = TrampolineScheduler.instance();

NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(NewThreadScheduler.instance());
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
});
}

/** Utility class. */
Expand Down
Loading