Skip to content

Commit

Permalink
2.x: Evaluate Schedule initialization via Callable (#4585)
Browse files Browse the repository at this point in the history
* Evaluate Schedule initialization via Callable

* Clarify docs that Schedulers are initialized by the return value of the Callable

* Enforce non-null Callable Scheduler and Scheduler

* Add remaining tests and tidy

* Expand relevant Javadoc

* Make error messages more consistent

* Correct Exception naming

* Add test for Exception message to verify root cause

* Add tests for alternative initialization path

* Simplify statement

* Use holder pattern for default Scheduler instances

* Use correct scheduler when verifying reset

* Make onInitHandler functions lazy and enforce non null.
  • Loading branch information
peter-tackage authored and akarnokd committed Sep 26, 2016
1 parent 2e100d2 commit 62612ab
Show file tree
Hide file tree
Showing 3 changed files with 325 additions and 78 deletions.
117 changes: 77 additions & 40 deletions src/main/java/io/reactivex/plugins/RxJavaPlugins.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
package io.reactivex.plugins;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.Callable;

import io.reactivex.internal.functions.ObjectHelper;
import org.reactivestreams.Subscriber;

import io.reactivex.*;
Expand All @@ -31,13 +33,13 @@ public final class RxJavaPlugins {

static volatile Function<Runnable, Runnable> onScheduleHandler;

static volatile Function<Scheduler, Scheduler> onInitComputationHandler;
static volatile Function<Callable<Scheduler>, Scheduler> onInitComputationHandler;

static volatile Function<Scheduler, Scheduler> onInitSingleHandler;
static volatile Function<Callable<Scheduler>, Scheduler> onInitSingleHandler;

static volatile Function<Scheduler, Scheduler> onInitIoHandler;
static volatile Function<Callable<Scheduler>, Scheduler> onInitIoHandler;

static volatile Function<Scheduler, Scheduler> onInitNewThreadHandler;
static volatile Function<Callable<Scheduler>, Scheduler> onInitNewThreadHandler;

static volatile Function<Scheduler, Scheduler> onComputationHandler;

Expand Down Expand Up @@ -121,31 +123,31 @@ public static Consumer<Throwable> getErrorHandler() {
* Returns the current hook function.
* @return the hook function, may be null
*/
public static Function<Scheduler, Scheduler> getInitComputationSchedulerHandler() {
public static Function<Callable<Scheduler>, Scheduler> getInitComputationSchedulerHandler() {
return onInitComputationHandler;
}

/**
* Returns the current hook function.
* @return the hook function, may be null
*/
public static Function<Scheduler, Scheduler> getInitIoSchedulerHandler() {
public static Function<Callable<Scheduler>, Scheduler> getInitIoSchedulerHandler() {
return onInitIoHandler;
}

/**
* Returns the current hook function.
* @return the hook function, may be null
*/
public static Function<Scheduler, Scheduler> getInitNewThreadSchedulerHandler() {
public static Function<Callable<Scheduler>, Scheduler> getInitNewThreadSchedulerHandler() {
return onInitNewThreadHandler;
}

/**
* Returns the current hook function.
* @return the hook function, may be null
*/
public static Function<Scheduler, Scheduler> getInitSingleSchedulerHandler() {
public static Function<Callable<Scheduler>, Scheduler> getInitSingleSchedulerHandler() {
return onInitSingleHandler;
}

Expand Down Expand Up @@ -183,54 +185,62 @@ public static Function<Scheduler, Scheduler> getSingleSchedulerHandler() {

/**
* Calls the associated hook function.
* @param defaultScheduler the hook's input value
* @return the value returned by the hook
* @param defaultScheduler a {@link Callable} which returns the hook's input value
* @return the value returned by the hook, not null
* @throws NullPointerException if the callable parameter or its result are null
*/
public static Scheduler initComputationScheduler(Scheduler defaultScheduler) {
Function<Scheduler, Scheduler> f = onInitComputationHandler;
public static Scheduler initComputationScheduler(Callable<Scheduler> defaultScheduler) {
ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
Function<Callable<Scheduler>, Scheduler> f = onInitComputationHandler;
if (f == null) {
return defaultScheduler;
return callRequireNonNull(defaultScheduler);
}
return apply(f, defaultScheduler); // JIT will skip this
return applyRequireNonNull(f, defaultScheduler); // JIT will skip this
}

/**
* Calls the associated hook function.
* @param defaultScheduler the hook's input value
* @return the value returned by the hook
* @param defaultScheduler a {@link Callable} which returns the hook's input value
* @return the value returned by the hook, not null
* @throws NullPointerException if the callable parameter or its result are null
*/
public static Scheduler initIoScheduler(Scheduler defaultScheduler) {
Function<Scheduler, Scheduler> f = onInitIoHandler;
public static Scheduler initIoScheduler(Callable<Scheduler> defaultScheduler) {
ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
Function<Callable<Scheduler>, Scheduler> f = onInitIoHandler;
if (f == null) {
return defaultScheduler;
return callRequireNonNull(defaultScheduler);
}
return apply(f, defaultScheduler);
return applyRequireNonNull(f, defaultScheduler);
}

/**
* Calls the associated hook function.
* @param defaultScheduler the hook's input value
* @return the value returned by the hook
* @param defaultScheduler a {@link Callable} which returns the hook's input value
* @return the value returned by the hook, not null
* @throws NullPointerException if the callable parameter or its result are null
*/
public static Scheduler initNewThreadScheduler(Scheduler defaultScheduler) {
Function<Scheduler, Scheduler> f = onInitNewThreadHandler;
public static Scheduler initNewThreadScheduler(Callable<Scheduler> defaultScheduler) {
ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
Function<Callable<Scheduler>, Scheduler> f = onInitNewThreadHandler;
if (f == null) {
return defaultScheduler;
return callRequireNonNull(defaultScheduler);
}
return apply(f, defaultScheduler);
return applyRequireNonNull(f, defaultScheduler);
}

/**
* Calls the associated hook function.
* @param defaultScheduler the hook's input value
* @return the value returned by the hook
* @param defaultScheduler a {@link Callable} which returns the hook's input value
* @return the value returned by the hook, not null
* @throws NullPointerException if the callable parameter or its result are null
*/
public static Scheduler initSingleScheduler(Scheduler defaultScheduler) {
Function<Scheduler, Scheduler> f = onInitSingleHandler;
public static Scheduler initSingleScheduler(Callable<Scheduler> defaultScheduler) {
ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
Function<Callable<Scheduler>, Scheduler> f = onInitSingleHandler;
if (f == null) {
return defaultScheduler;
return callRequireNonNull(defaultScheduler);
}
return apply(f, defaultScheduler);
return applyRequireNonNull(f, defaultScheduler);
}

/**
Expand Down Expand Up @@ -392,9 +402,9 @@ public static void setErrorHandler(Consumer<Throwable> handler) {

/**
* Sets the specific hook function.
* @param handler the hook function to set, null allowed
* @param handler the hook function to set, null allowed, but the function may not return null
*/
public static void setInitComputationSchedulerHandler(Function<Scheduler, Scheduler> handler) {
public static void setInitComputationSchedulerHandler(Function<Callable<Scheduler>, Scheduler> handler) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
Expand All @@ -403,9 +413,9 @@ public static void setInitComputationSchedulerHandler(Function<Scheduler, Schedu

/**
* Sets the specific hook function.
* @param handler the hook function to set, null allowed
* @param handler the hook function to set, null allowed, but the function may not return null
*/
public static void setInitIoSchedulerHandler(Function<Scheduler, Scheduler> handler) {
public static void setInitIoSchedulerHandler(Function<Callable<Scheduler>, Scheduler> handler) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
Expand All @@ -414,9 +424,9 @@ public static void setInitIoSchedulerHandler(Function<Scheduler, Scheduler> hand

/**
* Sets the specific hook function.
* @param handler the hook function to set, null allowed
* @param handler the hook function to set, null allowed, but the function may not return null
*/
public static void setInitNewThreadSchedulerHandler(Function<Scheduler, Scheduler> handler) {
public static void setInitNewThreadSchedulerHandler(Function<Callable<Scheduler>, Scheduler> handler) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
Expand All @@ -425,9 +435,9 @@ public static void setInitNewThreadSchedulerHandler(Function<Scheduler, Schedule

/**
* Sets the specific hook function.
* @param handler the hook function to set, null allowed
* @param handler the hook function to set, null allowed, but the function may not return null
*/
public static void setInitSingleSchedulerHandler(Function<Scheduler, Scheduler> handler) {
public static void setInitSingleSchedulerHandler(Function<Callable<Scheduler>, Scheduler> handler) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
Expand Down Expand Up @@ -952,6 +962,33 @@ static <T, U, R> R apply(BiFunction<T, U, R> f, T t, U u) {
}
}

/**
* Wraps the call to the Scheduler creation callable in try-catch and propagates thrown
* checked exceptions as RuntimeException and enforces that result is not null.
* @param s the {@link Callable} which returns a {@link Scheduler}, not null (not verified). Cannot return null
* @return the result of the callable call, not null
* @throws NullPointerException if the callable parameter returns null
*/
static Scheduler callRequireNonNull(Callable<Scheduler> s) {
try {
return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}

/**
* Wraps the call to the Scheduler creation function in try-catch and propagates thrown
* checked exceptions as RuntimeException and enforces that result is not null.
* @param f the function to call, not null (not verified). Cannot return null
* @param s the parameter value to the function
* @return the result of the function call, not null
* @throws NullPointerException if the function parameter returns null
*/
static Scheduler applyRequireNonNull(Function<Callable<Scheduler>, Scheduler> f, Callable<Scheduler> s) {
return ObjectHelper.requireNonNull(apply(f, s), "Scheduler Callable result can't be null");
}

/** Helper class, no instances. */
private RxJavaPlugins() {
throw new IllegalStateException("No instances!");
Expand Down
47 changes: 42 additions & 5 deletions src/main/java/io/reactivex/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.schedulers;

import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

import io.reactivex.Scheduler;
Expand Down Expand Up @@ -44,16 +45,52 @@ public final class Schedulers {

static final Scheduler NEW_THREAD;

static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleScheduler());
static final class SingleHolder {
static final Scheduler DEFAULT = new SingleScheduler();
}

static final class ComputationHolder {
static final Scheduler DEFAULT = new ComputationScheduler();
}

COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationScheduler());
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}

static final class NewThreadHolder {
static final Scheduler DEFAULT = NewThreadScheduler.instance();
}

IO = RxJavaPlugins.initIoScheduler(new IoScheduler());
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return SingleHolder.DEFAULT;
}
});

COMPUTATION = RxJavaPlugins.initComputationScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return ComputationHolder.DEFAULT;
}
});

IO = RxJavaPlugins.initIoScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
});

TRAMPOLINE = TrampolineScheduler.instance();

NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(NewThreadScheduler.instance());
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
});
}

/** Utility class. */
Expand Down
Loading

0 comments on commit 62612ab

Please sign in to comment.