Skip to content

Commit

Permalink
Merge pull request #905 from benjchristensen/scheduler-defaults-plugin
Browse files Browse the repository at this point in the history
RxJavaSchedulers Plugin
  • Loading branch information
benjchristensen committed Feb 19, 2014
2 parents 44b015f + 9178d14 commit bccac64
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 8 deletions.
43 changes: 43 additions & 0 deletions rxjava-core/src/main/java/rx/plugins/RxJavaDefaultSchedulers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.plugins;

import rx.Scheduler;
import rx.functions.Func0;

/**
* Define alternate Scheduler implementations to be returned by the `Schedulers` factory methods.
* <p>
* See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins: <a
* href="https://github.com/Netflix/RxJava/wiki/Plugins">https://github.com/Netflix/RxJava/wiki/Plugins</a>.
*/
public abstract class RxJavaDefaultSchedulers {

/**
* Factory of Scheduler to return from {@link Schedulers.computation()} or null if default should be used.
*/
public abstract Func0<Scheduler> getComputationSchedulerFactory();

/**
* Factory of Scheduler to return from {@link Schedulers.io()} or null if default should be used.
*/
public abstract Func0<Scheduler> getIOSchedulerFactory();

/**
* Factory of Scheduler to return from {@link Schedulers.newThread()} or null if default should be used.
*/
public abstract Func0<Scheduler> getNewThreadSchedulerFactory();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.plugins;

import rx.Scheduler;
import rx.functions.Func0;

/**
* Default implementation of {@link RxJavaErrorHandler} that does nothing.
*
* @ExcludeFromJavadoc
*/
public class RxJavaDefaultSchedulersDefault extends RxJavaDefaultSchedulers {

private static RxJavaDefaultSchedulersDefault INSTANCE = new RxJavaDefaultSchedulersDefault();

public Func0<Scheduler> getComputationSchedulerFactory() {
return null;
}

public Func0<Scheduler> getIOSchedulerFactory() {
return null;
}

public Func0<Scheduler> getNewThreadSchedulerFactory() {
return null;
}

public static RxJavaDefaultSchedulers getInstance() {
return INSTANCE;
}

}
40 changes: 40 additions & 0 deletions rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class RxJavaPlugins {

private final AtomicReference<RxJavaErrorHandler> errorHandler = new AtomicReference<RxJavaErrorHandler>();
private final AtomicReference<RxJavaObservableExecutionHook> observableExecutionHook = new AtomicReference<RxJavaObservableExecutionHook>();
private final AtomicReference<RxJavaDefaultSchedulers> schedulerOverrides = new AtomicReference<RxJavaDefaultSchedulers>();

public static RxJavaPlugins getInstance() {
return INSTANCE;
Expand Down Expand Up @@ -149,4 +150,43 @@ private static Object getPluginImplementationViaProperty(Class<?> pluginClass) {
return null;
}
}

/**
* Retrieve instance of {@link RxJavaDefaultSchedulers} to use based on order of precedence as defined in {@link RxJavaPlugins} class header.
* <p>
* Override default by using {@link #registerDefaultSchedulers(RxJavaDefaultSchedulers)} or setting property: <code>rxjava.plugin.RxJavaDefaultSchedulers.implementation</code> with the full
* classname to
* load.
*
* @return {@link RxJavaErrorHandler} implementation to use
*/
public RxJavaDefaultSchedulers getDefaultSchedulers() {
if (schedulerOverrides.get() == null) {
// check for an implementation from System.getProperty first
Object impl = getPluginImplementationViaProperty(RxJavaDefaultSchedulers.class);
if (impl == null) {
// nothing set via properties so initialize with default
schedulerOverrides.compareAndSet(null, RxJavaDefaultSchedulersDefault.getInstance());
// we don't return from here but call get() again in case of thread-race so the winner will always get returned
} else {
// we received an implementation from the system property so use it
schedulerOverrides.compareAndSet(null, (RxJavaDefaultSchedulers) impl);
}
}
return schedulerOverrides.get();
}

/**
* Register a {@link RxJavaDefaultSchedulers} implementation as a global override of any injected or default implementations.
*
* @param impl
* {@link RxJavaDefaultSchedulers} implementation
* @throws IllegalStateException
* if called more than once or after the default was initialized (if usage occurs before trying to register)
*/
public void registerDefaultSchedulers(RxJavaDefaultSchedulers impl) {
if (!schedulerOverrides.compareAndSet(null, impl)) {
throw new IllegalStateException("Another strategy was already registered: " + schedulerOverrides.get());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,14 @@ public Thread newThread(Runnable r) {
}
};

@Deprecated
public static NewThreadScheduler getInstance() {
return INSTANCE;
}

/* package */ static NewThreadScheduler instance() {
return INSTANCE;
}

private NewThreadScheduler() {

Expand Down
63 changes: 55 additions & 8 deletions rxjava-core/src/main/java/rx/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,62 @@
import java.util.concurrent.atomic.AtomicLong;

import rx.Scheduler;
import rx.functions.Func0;
import rx.plugins.RxJavaPlugins;

/**
* Static factory methods for creating Schedulers.
*/
public class Schedulers {
private static final ScheduledExecutorService COMPUTATION_EXECUTOR = createComputationExecutor();
private static final Executor IO_EXECUTOR = createIOExecutor();

private final Func0<Scheduler> computationScheduler;
private final Func0<Scheduler> ioScheduler;
private final Func0<Scheduler> newThreadScheduler;

private static final Schedulers INSTANCE = new Schedulers();

private Schedulers() {
Func0<Scheduler> c = RxJavaPlugins.getInstance().getDefaultSchedulers().getComputationSchedulerFactory();
if (c != null) {
computationScheduler = c;
} else {
computationScheduler = new Func0<Scheduler>() {

@Override
public Scheduler call() {
return executor(createComputationExecutor());
}

};
}

Func0<Scheduler> io = RxJavaPlugins.getInstance().getDefaultSchedulers().getIOSchedulerFactory();
if (io != null) {
ioScheduler = io;
} else {
ioScheduler = new Func0<Scheduler>() {

@Override
public Scheduler call() {
return executor(createIOExecutor());
}

};
}

Func0<Scheduler> nt = RxJavaPlugins.getInstance().getDefaultSchedulers().getNewThreadSchedulerFactory();
if (nt != null) {
newThreadScheduler = nt;
} else {
newThreadScheduler = new Func0<Scheduler>() {

@Override
public Scheduler call() {
return NewThreadScheduler.instance();
}

};
}

}

Expand Down Expand Up @@ -63,14 +110,14 @@ public static Scheduler currentThread() {
public static Scheduler trampoline() {
return TrampolineScheduler.getInstance();
}

/**
* {@link Scheduler} that creates a new {@link Thread} for each unit of work.
*
* @return {@link NewThreadScheduler} instance
*/
public static Scheduler newThread() {
return NewThreadScheduler.getInstance();
return INSTANCE.newThreadScheduler.call();
}

/**
Expand Down Expand Up @@ -107,7 +154,7 @@ public static Scheduler executor(ScheduledExecutorService executor) {
*/
@Deprecated
public static Scheduler threadPoolForComputation() {
return executor(COMPUTATION_EXECUTOR);
return computation();
}

/**
Expand All @@ -120,7 +167,7 @@ public static Scheduler threadPoolForComputation() {
* @return {@link Scheduler} for computation-bound work.
*/
public static Scheduler computation() {
return executor(COMPUTATION_EXECUTOR);
return INSTANCE.computationScheduler.call();
}

/**
Expand All @@ -137,7 +184,7 @@ public static Scheduler computation() {
*/
@Deprecated
public static Scheduler threadPoolForIO() {
return executor(IO_EXECUTOR);
return io();
}

/**
Expand All @@ -152,7 +199,7 @@ public static Scheduler threadPoolForIO() {
* @return {@link ExecutorScheduler} for IO-bound work.
*/
public static Scheduler io() {
return executor(IO_EXECUTOR);
return INSTANCE.ioScheduler.call();
}

private static ScheduledExecutorService createComputationExecutor() {
Expand Down

0 comments on commit bccac64

Please sign in to comment.