Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RxJavaSchedulers Plugin #905

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions rxjava-core/src/main/java/rx/plugins/RxJavaDefaultSchedulers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.plugins;

import rx.Scheduler;
import rx.functions.Func0;

/**
* Define alternate Scheduler implementations to be returned by the `Schedulers` factory methods.
* <p>
* See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins: <a
* href="https://github.com/Netflix/RxJava/wiki/Plugins">https://github.com/Netflix/RxJava/wiki/Plugins</a>.
*/
public abstract class RxJavaDefaultSchedulers {

/**
* Factory of Scheduler to return from {@link Schedulers.computation()} or null if default should be used.
*/
public abstract Func0<Scheduler> getComputationSchedulerFactory();

/**
* Factory of Scheduler to return from {@link Schedulers.io()} or null if default should be used.
*/
public abstract Func0<Scheduler> getIOSchedulerFactory();

/**
* Factory of Scheduler to return from {@link Schedulers.newThread()} or null if default should be used.
*/
public abstract Func0<Scheduler> getNewThreadSchedulerFactory();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.plugins;

import rx.Scheduler;
import rx.functions.Func0;

/**
* Default implementation of {@link RxJavaErrorHandler} that does nothing.
*
* @ExcludeFromJavadoc
*/
public class RxJavaDefaultSchedulersDefault extends RxJavaDefaultSchedulers {

private static RxJavaDefaultSchedulersDefault INSTANCE = new RxJavaDefaultSchedulersDefault();

public Func0<Scheduler> getComputationSchedulerFactory() {
return null;
}

public Func0<Scheduler> getIOSchedulerFactory() {
return null;
}

public Func0<Scheduler> getNewThreadSchedulerFactory() {
return null;
}

public static RxJavaDefaultSchedulers getInstance() {
return INSTANCE;
}

}
40 changes: 40 additions & 0 deletions rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class RxJavaPlugins {

private final AtomicReference<RxJavaErrorHandler> errorHandler = new AtomicReference<RxJavaErrorHandler>();
private final AtomicReference<RxJavaObservableExecutionHook> observableExecutionHook = new AtomicReference<RxJavaObservableExecutionHook>();
private final AtomicReference<RxJavaDefaultSchedulers> schedulerOverrides = new AtomicReference<RxJavaDefaultSchedulers>();

public static RxJavaPlugins getInstance() {
return INSTANCE;
Expand Down Expand Up @@ -149,4 +150,43 @@ private static Object getPluginImplementationViaProperty(Class<?> pluginClass) {
return null;
}
}

/**
* Retrieve instance of {@link RxJavaDefaultSchedulers} to use based on order of precedence as defined in {@link RxJavaPlugins} class header.
* <p>
* Override default by using {@link #registerDefaultSchedulers(RxJavaDefaultSchedulers)} or setting property: <code>rxjava.plugin.RxJavaDefaultSchedulers.implementation</code> with the full
* classname to
* load.
*
* @return {@link RxJavaErrorHandler} implementation to use
*/
public RxJavaDefaultSchedulers getDefaultSchedulers() {
if (schedulerOverrides.get() == null) {
// check for an implementation from System.getProperty first
Object impl = getPluginImplementationViaProperty(RxJavaDefaultSchedulers.class);
if (impl == null) {
// nothing set via properties so initialize with default
schedulerOverrides.compareAndSet(null, RxJavaDefaultSchedulersDefault.getInstance());
// we don't return from here but call get() again in case of thread-race so the winner will always get returned
} else {
// we received an implementation from the system property so use it
schedulerOverrides.compareAndSet(null, (RxJavaDefaultSchedulers) impl);
}
}
return schedulerOverrides.get();
}

/**
* Register a {@link RxJavaDefaultSchedulers} implementation as a global override of any injected or default implementations.
*
* @param impl
* {@link RxJavaDefaultSchedulers} implementation
* @throws IllegalStateException
* if called more than once or after the default was initialized (if usage occurs before trying to register)
*/
public void registerDefaultSchedulers(RxJavaDefaultSchedulers impl) {
if (!schedulerOverrides.compareAndSet(null, impl)) {
throw new IllegalStateException("Another strategy was already registered: " + schedulerOverrides.get());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,14 @@ public Thread newThread(Runnable r) {
}
};

@Deprecated
public static NewThreadScheduler getInstance() {
return INSTANCE;
}

/* package */ static NewThreadScheduler instance() {
return INSTANCE;
}

private NewThreadScheduler() {

Expand Down
63 changes: 55 additions & 8 deletions rxjava-core/src/main/java/rx/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,62 @@
import java.util.concurrent.atomic.AtomicLong;

import rx.Scheduler;
import rx.functions.Func0;
import rx.plugins.RxJavaPlugins;

/**
* Static factory methods for creating Schedulers.
*/
public class Schedulers {
private static final ScheduledExecutorService COMPUTATION_EXECUTOR = createComputationExecutor();
private static final Executor IO_EXECUTOR = createIOExecutor();

private final Func0<Scheduler> computationScheduler;
private final Func0<Scheduler> ioScheduler;
private final Func0<Scheduler> newThreadScheduler;

private static final Schedulers INSTANCE = new Schedulers();

private Schedulers() {
Func0<Scheduler> c = RxJavaPlugins.getInstance().getDefaultSchedulers().getComputationSchedulerFactory();
if (c != null) {
computationScheduler = c;
} else {
computationScheduler = new Func0<Scheduler>() {

@Override
public Scheduler call() {
return executor(createComputationExecutor());
}

};
}

Func0<Scheduler> io = RxJavaPlugins.getInstance().getDefaultSchedulers().getIOSchedulerFactory();
if (io != null) {
ioScheduler = io;
} else {
ioScheduler = new Func0<Scheduler>() {

@Override
public Scheduler call() {
return executor(createIOExecutor());
}

};
}

Func0<Scheduler> nt = RxJavaPlugins.getInstance().getDefaultSchedulers().getNewThreadSchedulerFactory();
if (nt != null) {
newThreadScheduler = nt;
} else {
newThreadScheduler = new Func0<Scheduler>() {

@Override
public Scheduler call() {
return NewThreadScheduler.instance();
}

};
}

}

Expand Down Expand Up @@ -63,14 +110,14 @@ public static Scheduler currentThread() {
public static Scheduler trampoline() {
return TrampolineScheduler.getInstance();
}

/**
* {@link Scheduler} that creates a new {@link Thread} for each unit of work.
*
* @return {@link NewThreadScheduler} instance
*/
public static Scheduler newThread() {
return NewThreadScheduler.getInstance();
return INSTANCE.newThreadScheduler.call();
}

/**
Expand Down Expand Up @@ -107,7 +154,7 @@ public static Scheduler executor(ScheduledExecutorService executor) {
*/
@Deprecated
public static Scheduler threadPoolForComputation() {
return executor(COMPUTATION_EXECUTOR);
return computation();
}

/**
Expand All @@ -120,7 +167,7 @@ public static Scheduler threadPoolForComputation() {
* @return {@link Scheduler} for computation-bound work.
*/
public static Scheduler computation() {
return executor(COMPUTATION_EXECUTOR);
return INSTANCE.computationScheduler.call();
}

/**
Expand All @@ -137,7 +184,7 @@ public static Scheduler computation() {
*/
@Deprecated
public static Scheduler threadPoolForIO() {
return executor(IO_EXECUTOR);
return io();
}

/**
Expand All @@ -152,7 +199,7 @@ public static Scheduler threadPoolForIO() {
* @return {@link ExecutorScheduler} for IO-bound work.
*/
public static Scheduler io() {
return executor(IO_EXECUTOR);
return INSTANCE.ioScheduler.call();
}

private static ScheduledExecutorService createComputationExecutor() {
Expand Down