From d07d9367911d8ec3d0b65846c8707e0a41d1cf1f Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 19 Feb 2014 12:07:03 -0800 Subject: [PATCH 1/2] RxJavaSchedulers Plugin Allow setting different default schedulers for use by system. --- .../main/java/rx/plugins/RxJavaPlugins.java | 40 ++++++++++++ .../java/rx/plugins/RxJavaSchedulers.java | 43 +++++++++++++ .../rx/plugins/RxJavaSchedulersDefault.java | 46 ++++++++++++++ .../main/java/rx/schedulers/Schedulers.java | 63 ++++++++++++++++--- 4 files changed, 184 insertions(+), 8 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/plugins/RxJavaSchedulers.java create mode 100644 rxjava-core/src/main/java/rx/plugins/RxJavaSchedulersDefault.java diff --git a/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java b/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java index a5384fecee..b09afa7abd 100644 --- a/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java +++ b/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java @@ -31,6 +31,7 @@ public class RxJavaPlugins { private final AtomicReference errorHandler = new AtomicReference(); private final AtomicReference observableExecutionHook = new AtomicReference(); + private final AtomicReference schedulerOverrides = new AtomicReference(); public static RxJavaPlugins getInstance() { return INSTANCE; @@ -149,4 +150,43 @@ private static Object getPluginImplementationViaProperty(Class pluginClass) { return null; } } + + /** + * Retrieve instance of {@link RxJavaSchedulers} to use based on order of precedence as defined in {@link RxJavaPlugins} class header. + *

+ * Override default by using {@link #registerSchedulers(RxJavaSchedulers)} or setting property: rxjava.plugin.RxJavaDefaultSchedulers.implementation with the full + * classname to + * load. + * + * @return {@link RxJavaErrorHandler} implementation to use + */ + public RxJavaSchedulers getSchedulers() { + if (schedulerOverrides.get() == null) { + // check for an implementation from System.getProperty first + Object impl = getPluginImplementationViaProperty(RxJavaSchedulers.class); + if (impl == null) { + // nothing set via properties so initialize with default + schedulerOverrides.compareAndSet(null, RxJavaSchedulersDefault.getInstance()); + // we don't return from here but call get() again in case of thread-race so the winner will always get returned + } else { + // we received an implementation from the system property so use it + schedulerOverrides.compareAndSet(null, (RxJavaSchedulers) impl); + } + } + return schedulerOverrides.get(); + } + + /** + * Register a {@link RxJavaSchedulers} implementation as a global override of any injected or default implementations. + * + * @param impl + * {@link RxJavaSchedulers} implementation + * @throws IllegalStateException + * if called more than once or after the default was initialized (if usage occurs before trying to register) + */ + public void registerSchedulers(RxJavaSchedulers impl) { + if (!schedulerOverrides.compareAndSet(null, impl)) { + throw new IllegalStateException("Another strategy was already registered: " + schedulerOverrides.get()); + } + } } diff --git a/rxjava-core/src/main/java/rx/plugins/RxJavaSchedulers.java b/rxjava-core/src/main/java/rx/plugins/RxJavaSchedulers.java new file mode 100644 index 0000000000..d346a09b3c --- /dev/null +++ b/rxjava-core/src/main/java/rx/plugins/RxJavaSchedulers.java @@ -0,0 +1,43 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.plugins; + +import rx.Scheduler; +import rx.functions.Func0; + +/** + * Define alternate Scheduler implementations to be returned by the `Schedulers` factory methods. + *

+ * See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins: https://github.com/Netflix/RxJava/wiki/Plugins. + */ +public abstract class RxJavaSchedulers { + + /** + * Factory of Scheduler to return from {@link Schedulers.computation()} or null if default should be used. + */ + public abstract Func0 getComputationScheduler(); + + /** + * Factory of Scheduler to return from {@link Schedulers.io()} or null if default should be used. + */ + public abstract Func0 getIOScheduler(); + + /** + * Factory of Scheduler to return from {@link Schedulers.newThread()} or null if default should be used. + */ + public abstract Func0 getNewThreadScheduler(); +} diff --git a/rxjava-core/src/main/java/rx/plugins/RxJavaSchedulersDefault.java b/rxjava-core/src/main/java/rx/plugins/RxJavaSchedulersDefault.java new file mode 100644 index 0000000000..40561a819d --- /dev/null +++ b/rxjava-core/src/main/java/rx/plugins/RxJavaSchedulersDefault.java @@ -0,0 +1,46 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.plugins; + +import rx.Scheduler; +import rx.functions.Func0; + +/** + * Default implementation of {@link RxJavaErrorHandler} that does nothing. + * + * @ExcludeFromJavadoc + */ +public class RxJavaSchedulersDefault extends RxJavaSchedulers { + + private static RxJavaSchedulersDefault INSTANCE = new RxJavaSchedulersDefault(); + + public Func0 getComputationScheduler() { + return null; + } + + public Func0 getIOScheduler() { + return null; + } + + public Func0 getNewThreadScheduler() { + return null; + } + + public static RxJavaSchedulers getInstance() { + return INSTANCE; + } + +} diff --git a/rxjava-core/src/main/java/rx/schedulers/Schedulers.java b/rxjava-core/src/main/java/rx/schedulers/Schedulers.java index bb0a75e358..14fbe08900 100644 --- a/rxjava-core/src/main/java/rx/schedulers/Schedulers.java +++ b/rxjava-core/src/main/java/rx/schedulers/Schedulers.java @@ -23,15 +23,62 @@ import java.util.concurrent.atomic.AtomicLong; import rx.Scheduler; +import rx.functions.Func0; +import rx.plugins.RxJavaPlugins; /** * Static factory methods for creating Schedulers. */ public class Schedulers { - private static final ScheduledExecutorService COMPUTATION_EXECUTOR = createComputationExecutor(); - private static final Executor IO_EXECUTOR = createIOExecutor(); + + private final Func0 computationScheduler; + private final Func0 ioScheduler; + private final Func0 newThreadScheduler; + + private static final Schedulers INSTANCE = new Schedulers(); private Schedulers() { + Func0 c = RxJavaPlugins.getInstance().getSchedulers().getComputationScheduler(); + if (c != null) { + computationScheduler = c; + } else { + computationScheduler = new Func0() { + + @Override + public Scheduler call() { + return executor(createComputationExecutor()); + } + + }; + } + + Func0 io = RxJavaPlugins.getInstance().getSchedulers().getIOScheduler(); + if (io != null) { + ioScheduler = io; + } else { + ioScheduler = new Func0() { + + @Override + public Scheduler call() { + return executor(createIOExecutor()); + } + + }; + } + + Func0 nt = RxJavaPlugins.getInstance().getSchedulers().getNewThreadScheduler(); + if (nt != null) { + newThreadScheduler = nt; + } else { + newThreadScheduler = new Func0() { + + @Override + public Scheduler call() { + return NewThreadScheduler.getInstance(); + } + + }; + } } @@ -63,14 +110,14 @@ public static Scheduler currentThread() { public static Scheduler trampoline() { return TrampolineScheduler.getInstance(); } - + /** * {@link Scheduler} that creates a new {@link Thread} for each unit of work. * * @return {@link NewThreadScheduler} instance */ public static Scheduler newThread() { - return NewThreadScheduler.getInstance(); + return INSTANCE.newThreadScheduler.call(); } /** @@ -107,7 +154,7 @@ public static Scheduler executor(ScheduledExecutorService executor) { */ @Deprecated public static Scheduler threadPoolForComputation() { - return executor(COMPUTATION_EXECUTOR); + return computation(); } /** @@ -120,7 +167,7 @@ public static Scheduler threadPoolForComputation() { * @return {@link Scheduler} for computation-bound work. */ public static Scheduler computation() { - return executor(COMPUTATION_EXECUTOR); + return INSTANCE.computationScheduler.call(); } /** @@ -137,7 +184,7 @@ public static Scheduler computation() { */ @Deprecated public static Scheduler threadPoolForIO() { - return executor(IO_EXECUTOR); + return io(); } /** @@ -152,7 +199,7 @@ public static Scheduler threadPoolForIO() { * @return {@link ExecutorScheduler} for IO-bound work. */ public static Scheduler io() { - return executor(IO_EXECUTOR); + return INSTANCE.ioScheduler.call(); } private static ScheduledExecutorService createComputationExecutor() { From 9178d14d6bb44a1925562c49a2d299b9c9422408 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 19 Feb 2014 13:33:27 -0800 Subject: [PATCH 2/2] Rename RxJavaSchedulers to RxJavaDefaultSchedulers Clearer semantic naming. --- ...lers.java => RxJavaDefaultSchedulers.java} | 8 ++++---- ...va => RxJavaDefaultSchedulersDefault.java} | 12 +++++------ .../main/java/rx/plugins/RxJavaPlugins.java | 20 +++++++++---------- .../rx/schedulers/NewThreadScheduler.java | 5 +++++ .../main/java/rx/schedulers/Schedulers.java | 8 ++++---- 5 files changed, 29 insertions(+), 24 deletions(-) rename rxjava-core/src/main/java/rx/plugins/{RxJavaSchedulers.java => RxJavaDefaultSchedulers.java} (84%) rename rxjava-core/src/main/java/rx/plugins/{RxJavaSchedulersDefault.java => RxJavaDefaultSchedulersDefault.java} (68%) diff --git a/rxjava-core/src/main/java/rx/plugins/RxJavaSchedulers.java b/rxjava-core/src/main/java/rx/plugins/RxJavaDefaultSchedulers.java similarity index 84% rename from rxjava-core/src/main/java/rx/plugins/RxJavaSchedulers.java rename to rxjava-core/src/main/java/rx/plugins/RxJavaDefaultSchedulers.java index d346a09b3c..067df7bc2c 100644 --- a/rxjava-core/src/main/java/rx/plugins/RxJavaSchedulers.java +++ b/rxjava-core/src/main/java/rx/plugins/RxJavaDefaultSchedulers.java @@ -24,20 +24,20 @@ * See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins: https://github.com/Netflix/RxJava/wiki/Plugins. */ -public abstract class RxJavaSchedulers { +public abstract class RxJavaDefaultSchedulers { /** * Factory of Scheduler to return from {@link Schedulers.computation()} or null if default should be used. */ - public abstract Func0 getComputationScheduler(); + public abstract Func0 getComputationSchedulerFactory(); /** * Factory of Scheduler to return from {@link Schedulers.io()} or null if default should be used. */ - public abstract Func0 getIOScheduler(); + public abstract Func0 getIOSchedulerFactory(); /** * Factory of Scheduler to return from {@link Schedulers.newThread()} or null if default should be used. */ - public abstract Func0 getNewThreadScheduler(); + public abstract Func0 getNewThreadSchedulerFactory(); } diff --git a/rxjava-core/src/main/java/rx/plugins/RxJavaSchedulersDefault.java b/rxjava-core/src/main/java/rx/plugins/RxJavaDefaultSchedulersDefault.java similarity index 68% rename from rxjava-core/src/main/java/rx/plugins/RxJavaSchedulersDefault.java rename to rxjava-core/src/main/java/rx/plugins/RxJavaDefaultSchedulersDefault.java index 40561a819d..6e7fa348eb 100644 --- a/rxjava-core/src/main/java/rx/plugins/RxJavaSchedulersDefault.java +++ b/rxjava-core/src/main/java/rx/plugins/RxJavaDefaultSchedulersDefault.java @@ -23,23 +23,23 @@ * * @ExcludeFromJavadoc */ -public class RxJavaSchedulersDefault extends RxJavaSchedulers { +public class RxJavaDefaultSchedulersDefault extends RxJavaDefaultSchedulers { - private static RxJavaSchedulersDefault INSTANCE = new RxJavaSchedulersDefault(); + private static RxJavaDefaultSchedulersDefault INSTANCE = new RxJavaDefaultSchedulersDefault(); - public Func0 getComputationScheduler() { + public Func0 getComputationSchedulerFactory() { return null; } - public Func0 getIOScheduler() { + public Func0 getIOSchedulerFactory() { return null; } - public Func0 getNewThreadScheduler() { + public Func0 getNewThreadSchedulerFactory() { return null; } - public static RxJavaSchedulers getInstance() { + public static RxJavaDefaultSchedulers getInstance() { return INSTANCE; } diff --git a/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java b/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java index b09afa7abd..408802acb3 100644 --- a/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java +++ b/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java @@ -31,7 +31,7 @@ public class RxJavaPlugins { private final AtomicReference errorHandler = new AtomicReference(); private final AtomicReference observableExecutionHook = new AtomicReference(); - private final AtomicReference schedulerOverrides = new AtomicReference(); + private final AtomicReference schedulerOverrides = new AtomicReference(); public static RxJavaPlugins getInstance() { return INSTANCE; @@ -152,39 +152,39 @@ private static Object getPluginImplementationViaProperty(Class pluginClass) { } /** - * Retrieve instance of {@link RxJavaSchedulers} to use based on order of precedence as defined in {@link RxJavaPlugins} class header. + * Retrieve instance of {@link RxJavaDefaultSchedulers} to use based on order of precedence as defined in {@link RxJavaPlugins} class header. *

- * Override default by using {@link #registerSchedulers(RxJavaSchedulers)} or setting property: rxjava.plugin.RxJavaDefaultSchedulers.implementation with the full + * Override default by using {@link #registerDefaultSchedulers(RxJavaDefaultSchedulers)} or setting property: rxjava.plugin.RxJavaDefaultSchedulers.implementation with the full * classname to * load. * * @return {@link RxJavaErrorHandler} implementation to use */ - public RxJavaSchedulers getSchedulers() { + public RxJavaDefaultSchedulers getDefaultSchedulers() { if (schedulerOverrides.get() == null) { // check for an implementation from System.getProperty first - Object impl = getPluginImplementationViaProperty(RxJavaSchedulers.class); + Object impl = getPluginImplementationViaProperty(RxJavaDefaultSchedulers.class); if (impl == null) { // nothing set via properties so initialize with default - schedulerOverrides.compareAndSet(null, RxJavaSchedulersDefault.getInstance()); + schedulerOverrides.compareAndSet(null, RxJavaDefaultSchedulersDefault.getInstance()); // we don't return from here but call get() again in case of thread-race so the winner will always get returned } else { // we received an implementation from the system property so use it - schedulerOverrides.compareAndSet(null, (RxJavaSchedulers) impl); + schedulerOverrides.compareAndSet(null, (RxJavaDefaultSchedulers) impl); } } return schedulerOverrides.get(); } /** - * Register a {@link RxJavaSchedulers} implementation as a global override of any injected or default implementations. + * Register a {@link RxJavaDefaultSchedulers} implementation as a global override of any injected or default implementations. * * @param impl - * {@link RxJavaSchedulers} implementation + * {@link RxJavaDefaultSchedulers} implementation * @throws IllegalStateException * if called more than once or after the default was initialized (if usage occurs before trying to register) */ - public void registerSchedulers(RxJavaSchedulers impl) { + public void registerDefaultSchedulers(RxJavaDefaultSchedulers impl) { if (!schedulerOverrides.compareAndSet(null, impl)) { throw new IllegalStateException("Another strategy was already registered: " + schedulerOverrides.get()); } diff --git a/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java index 96db3f7bcf..78239b79ad 100644 --- a/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java @@ -46,9 +46,14 @@ public Thread newThread(Runnable r) { } }; + @Deprecated public static NewThreadScheduler getInstance() { return INSTANCE; } + + /* package */ static NewThreadScheduler instance() { + return INSTANCE; + } private NewThreadScheduler() { diff --git a/rxjava-core/src/main/java/rx/schedulers/Schedulers.java b/rxjava-core/src/main/java/rx/schedulers/Schedulers.java index 14fbe08900..0c607d6bed 100644 --- a/rxjava-core/src/main/java/rx/schedulers/Schedulers.java +++ b/rxjava-core/src/main/java/rx/schedulers/Schedulers.java @@ -38,7 +38,7 @@ public class Schedulers { private static final Schedulers INSTANCE = new Schedulers(); private Schedulers() { - Func0 c = RxJavaPlugins.getInstance().getSchedulers().getComputationScheduler(); + Func0 c = RxJavaPlugins.getInstance().getDefaultSchedulers().getComputationSchedulerFactory(); if (c != null) { computationScheduler = c; } else { @@ -52,7 +52,7 @@ public Scheduler call() { }; } - Func0 io = RxJavaPlugins.getInstance().getSchedulers().getIOScheduler(); + Func0 io = RxJavaPlugins.getInstance().getDefaultSchedulers().getIOSchedulerFactory(); if (io != null) { ioScheduler = io; } else { @@ -66,7 +66,7 @@ public Scheduler call() { }; } - Func0 nt = RxJavaPlugins.getInstance().getSchedulers().getNewThreadScheduler(); + Func0 nt = RxJavaPlugins.getInstance().getDefaultSchedulers().getNewThreadSchedulerFactory(); if (nt != null) { newThreadScheduler = nt; } else { @@ -74,7 +74,7 @@ public Scheduler call() { @Override public Scheduler call() { - return NewThreadScheduler.getInstance(); + return NewThreadScheduler.instance(); } };