diff --git a/rxjava-core/src/main/java/rx/plugins/RxJavaDefaultSchedulers.java b/rxjava-core/src/main/java/rx/plugins/RxJavaDefaultSchedulers.java new file mode 100644 index 0000000000..067df7bc2c --- /dev/null +++ b/rxjava-core/src/main/java/rx/plugins/RxJavaDefaultSchedulers.java @@ -0,0 +1,43 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.plugins; + +import rx.Scheduler; +import rx.functions.Func0; + +/** + * Define alternate Scheduler implementations to be returned by the `Schedulers` factory methods. + *

+ * See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins: https://github.com/Netflix/RxJava/wiki/Plugins. + */ +public abstract class RxJavaDefaultSchedulers { + + /** + * Factory of Scheduler to return from {@link Schedulers.computation()} or null if default should be used. + */ + public abstract Func0 getComputationSchedulerFactory(); + + /** + * Factory of Scheduler to return from {@link Schedulers.io()} or null if default should be used. + */ + public abstract Func0 getIOSchedulerFactory(); + + /** + * Factory of Scheduler to return from {@link Schedulers.newThread()} or null if default should be used. + */ + public abstract Func0 getNewThreadSchedulerFactory(); +} diff --git a/rxjava-core/src/main/java/rx/plugins/RxJavaDefaultSchedulersDefault.java b/rxjava-core/src/main/java/rx/plugins/RxJavaDefaultSchedulersDefault.java new file mode 100644 index 0000000000..6e7fa348eb --- /dev/null +++ b/rxjava-core/src/main/java/rx/plugins/RxJavaDefaultSchedulersDefault.java @@ -0,0 +1,46 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.plugins; + +import rx.Scheduler; +import rx.functions.Func0; + +/** + * Default implementation of {@link RxJavaErrorHandler} that does nothing. + * + * @ExcludeFromJavadoc + */ +public class RxJavaDefaultSchedulersDefault extends RxJavaDefaultSchedulers { + + private static RxJavaDefaultSchedulersDefault INSTANCE = new RxJavaDefaultSchedulersDefault(); + + public Func0 getComputationSchedulerFactory() { + return null; + } + + public Func0 getIOSchedulerFactory() { + return null; + } + + public Func0 getNewThreadSchedulerFactory() { + return null; + } + + public static RxJavaDefaultSchedulers getInstance() { + return INSTANCE; + } + +} diff --git a/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java b/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java index a5384fecee..408802acb3 100644 --- a/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java +++ b/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java @@ -31,6 +31,7 @@ public class RxJavaPlugins { private final AtomicReference errorHandler = new AtomicReference(); private final AtomicReference observableExecutionHook = new AtomicReference(); + private final AtomicReference schedulerOverrides = new AtomicReference(); public static RxJavaPlugins getInstance() { return INSTANCE; @@ -149,4 +150,43 @@ private static Object getPluginImplementationViaProperty(Class pluginClass) { return null; } } + + /** + * Retrieve instance of {@link RxJavaDefaultSchedulers} to use based on order of precedence as defined in {@link RxJavaPlugins} class header. + *

+ * Override default by using {@link #registerDefaultSchedulers(RxJavaDefaultSchedulers)} or setting property: rxjava.plugin.RxJavaDefaultSchedulers.implementation with the full + * classname to + * load. + * + * @return {@link RxJavaErrorHandler} implementation to use + */ + public RxJavaDefaultSchedulers getDefaultSchedulers() { + if (schedulerOverrides.get() == null) { + // check for an implementation from System.getProperty first + Object impl = getPluginImplementationViaProperty(RxJavaDefaultSchedulers.class); + if (impl == null) { + // nothing set via properties so initialize with default + schedulerOverrides.compareAndSet(null, RxJavaDefaultSchedulersDefault.getInstance()); + // we don't return from here but call get() again in case of thread-race so the winner will always get returned + } else { + // we received an implementation from the system property so use it + schedulerOverrides.compareAndSet(null, (RxJavaDefaultSchedulers) impl); + } + } + return schedulerOverrides.get(); + } + + /** + * Register a {@link RxJavaDefaultSchedulers} implementation as a global override of any injected or default implementations. + * + * @param impl + * {@link RxJavaDefaultSchedulers} implementation + * @throws IllegalStateException + * if called more than once or after the default was initialized (if usage occurs before trying to register) + */ + public void registerDefaultSchedulers(RxJavaDefaultSchedulers impl) { + if (!schedulerOverrides.compareAndSet(null, impl)) { + throw new IllegalStateException("Another strategy was already registered: " + schedulerOverrides.get()); + } + } } diff --git a/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java index 96db3f7bcf..78239b79ad 100644 --- a/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java @@ -46,9 +46,14 @@ public Thread newThread(Runnable r) { } }; + @Deprecated public static NewThreadScheduler getInstance() { return INSTANCE; } + + /* package */ static NewThreadScheduler instance() { + return INSTANCE; + } private NewThreadScheduler() { diff --git a/rxjava-core/src/main/java/rx/schedulers/Schedulers.java b/rxjava-core/src/main/java/rx/schedulers/Schedulers.java index bb0a75e358..0c607d6bed 100644 --- a/rxjava-core/src/main/java/rx/schedulers/Schedulers.java +++ b/rxjava-core/src/main/java/rx/schedulers/Schedulers.java @@ -23,15 +23,62 @@ import java.util.concurrent.atomic.AtomicLong; import rx.Scheduler; +import rx.functions.Func0; +import rx.plugins.RxJavaPlugins; /** * Static factory methods for creating Schedulers. */ public class Schedulers { - private static final ScheduledExecutorService COMPUTATION_EXECUTOR = createComputationExecutor(); - private static final Executor IO_EXECUTOR = createIOExecutor(); + + private final Func0 computationScheduler; + private final Func0 ioScheduler; + private final Func0 newThreadScheduler; + + private static final Schedulers INSTANCE = new Schedulers(); private Schedulers() { + Func0 c = RxJavaPlugins.getInstance().getDefaultSchedulers().getComputationSchedulerFactory(); + if (c != null) { + computationScheduler = c; + } else { + computationScheduler = new Func0() { + + @Override + public Scheduler call() { + return executor(createComputationExecutor()); + } + + }; + } + + Func0 io = RxJavaPlugins.getInstance().getDefaultSchedulers().getIOSchedulerFactory(); + if (io != null) { + ioScheduler = io; + } else { + ioScheduler = new Func0() { + + @Override + public Scheduler call() { + return executor(createIOExecutor()); + } + + }; + } + + Func0 nt = RxJavaPlugins.getInstance().getDefaultSchedulers().getNewThreadSchedulerFactory(); + if (nt != null) { + newThreadScheduler = nt; + } else { + newThreadScheduler = new Func0() { + + @Override + public Scheduler call() { + return NewThreadScheduler.instance(); + } + + }; + } } @@ -63,14 +110,14 @@ public static Scheduler currentThread() { public static Scheduler trampoline() { return TrampolineScheduler.getInstance(); } - + /** * {@link Scheduler} that creates a new {@link Thread} for each unit of work. * * @return {@link NewThreadScheduler} instance */ public static Scheduler newThread() { - return NewThreadScheduler.getInstance(); + return INSTANCE.newThreadScheduler.call(); } /** @@ -107,7 +154,7 @@ public static Scheduler executor(ScheduledExecutorService executor) { */ @Deprecated public static Scheduler threadPoolForComputation() { - return executor(COMPUTATION_EXECUTOR); + return computation(); } /** @@ -120,7 +167,7 @@ public static Scheduler threadPoolForComputation() { * @return {@link Scheduler} for computation-bound work. */ public static Scheduler computation() { - return executor(COMPUTATION_EXECUTOR); + return INSTANCE.computationScheduler.call(); } /** @@ -137,7 +184,7 @@ public static Scheduler computation() { */ @Deprecated public static Scheduler threadPoolForIO() { - return executor(IO_EXECUTOR); + return io(); } /** @@ -152,7 +199,7 @@ public static Scheduler threadPoolForIO() { * @return {@link ExecutorScheduler} for IO-bound work. */ public static Scheduler io() { - return executor(IO_EXECUTOR); + return INSTANCE.ioScheduler.call(); } private static ScheduledExecutorService createComputationExecutor() {