From 0f10874bbebd7b6faa52ef6f48cd0a8e075d97d7 Mon Sep 17 00:00:00 2001 From: Johannes Schneider Date: Thu, 2 Feb 2017 20:55:43 +0100 Subject: [PATCH] add @NonNull annotations to schedulers --- src/main/java/io/reactivex/Scheduler.java | 39 ++++-- .../schedulers/ComputationScheduler.java | 14 +- .../schedulers/ExecutorScheduler.java | 20 ++- .../schedulers/ImmediateThinScheduler.java | 20 ++- .../internal/schedulers/IoScheduler.java | 5 +- .../schedulers/NewThreadScheduler.java | 2 + .../internal/schedulers/NewThreadWorker.java | 7 +- .../internal/schedulers/SchedulerWhen.java | 8 +- .../internal/schedulers/SingleScheduler.java | 11 +- .../schedulers/TrampolineScheduler.java | 14 +- .../io/reactivex/plugins/RxJavaPlugins.java | 122 +++++++++--------- .../io/reactivex/schedulers/Schedulers.java | 14 +- .../reactivex/schedulers/TestScheduler.java | 12 +- .../flowable/FlowableReplayTest.java | 7 +- .../flowable/FlowableSubscribeOnTest.java | 8 +- .../flowable/FlowableUnsubscribeOnTest.java | 2 + .../observable/ObservableReplayTest.java | 7 +- .../observable/ObservableSubscribeOnTest.java | 8 +- .../ObservableUnsubscribeOnTest.java | 2 + .../reactivex/plugins/RxJavaPluginsTest.java | 8 -- .../reactivex/schedulers/SchedulerTest.java | 5 +- .../schedulers/SchedulerWorkerTest.java | 10 +- 22 files changed, 217 insertions(+), 128 deletions(-) diff --git a/src/main/java/io/reactivex/Scheduler.java b/src/main/java/io/reactivex/Scheduler.java index a96fa469cf..310e56b849 100644 --- a/src/main/java/io/reactivex/Scheduler.java +++ b/src/main/java/io/reactivex/Scheduler.java @@ -16,6 +16,7 @@ import java.util.concurrent.TimeUnit; import io.reactivex.annotations.Experimental; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; @@ -61,6 +62,7 @@ public static long clockDriftTolerance() { * * @return a Worker representing a serial queue of actions to be executed */ + @NonNull public abstract Worker createWorker(); /** @@ -69,7 +71,7 @@ public static long clockDriftTolerance() { * @return the 'current time' * @since 2.0 */ - public long now(TimeUnit unit) { + public long now(@NonNull TimeUnit unit) { return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @@ -105,7 +107,8 @@ public void shutdown() { * @return the Disposable instance that let's one cancel this particular task. * @since 2.0 */ - public Disposable scheduleDirect(Runnable run) { + @NonNull + public Disposable scheduleDirect(@NonNull Runnable run) { return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS); } @@ -122,7 +125,8 @@ public Disposable scheduleDirect(Runnable run) { * @return the Disposable that let's one cancel this particular delayed task. * @since 2.0 */ - public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { + @NonNull + public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); @@ -159,7 +163,8 @@ public void run() { * @return the Disposable that let's one cancel this particular delayed task. * @since 2.0 */ - public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) { + @NonNull + public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); @@ -249,7 +254,8 @@ public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, lo */ @SuppressWarnings("unchecked") @Experimental - public S when(Function>, Completable> combine) { + @NonNull + public S when(@NonNull Function>, Completable> combine) { return (S) new SchedulerWhen(combine, this); } @@ -268,7 +274,8 @@ public abstract static class Worker implements Disposable { * Runnable to schedule * @return a Disposable to be able to unsubscribe the action (cancel it if not executed) */ - public Disposable schedule(Runnable run) { + @NonNull + public Disposable schedule(@NonNull Runnable run) { return schedule(run, 0L, TimeUnit.NANOSECONDS); } @@ -287,7 +294,8 @@ public Disposable schedule(Runnable run) { * the time unit of {@code delayTime} * @return a Disposable to be able to unsubscribe the action (cancel it if not executed) */ - public abstract Disposable schedule(Runnable run, long delay, TimeUnit unit); + @NonNull + public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit); /** * Schedules a cancelable action to be executed periodically. This default implementation schedules @@ -309,7 +317,8 @@ public Disposable schedule(Runnable run) { * the time unit of {@code period} * @return a Disposable to be able to unsubscribe the action (cancel it if not executed) */ - public Disposable schedulePeriodically(Runnable run, final long initialDelay, final long period, final TimeUnit unit) { + @NonNull + public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) { final SequentialDisposable first = new SequentialDisposable(); final SequentialDisposable sd = new SequentialDisposable(first); @@ -337,7 +346,7 @@ public Disposable schedulePeriodically(Runnable run, final long initialDelay, fi * @return the 'current time' * @since 2.0 */ - public long now(TimeUnit unit) { + public long now(@NonNull TimeUnit unit) { return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @@ -346,15 +355,17 @@ public long now(TimeUnit unit) { * of this task has to happen (accounting for clock drifts). */ final class PeriodicTask implements Runnable { + @NonNull final Runnable decoratedRun; + @NonNull final SequentialDisposable sd; final long periodInNanoseconds; long count; long lastNowNanoseconds; long startInNanoseconds; - PeriodicTask(long firstStartInNanoseconds, Runnable decoratedRun, - long firstNowNanoseconds, SequentialDisposable sd, long periodInNanoseconds) { + PeriodicTask(long firstStartInNanoseconds, @NonNull Runnable decoratedRun, + long firstNowNanoseconds, @NonNull SequentialDisposable sd, long periodInNanoseconds) { this.decoratedRun = decoratedRun; this.sd = sd; this.periodInNanoseconds = periodInNanoseconds; @@ -395,12 +406,12 @@ public void run() { static class PeriodicDirectTask implements Runnable, Disposable { final Runnable run; - + @NonNull final Worker worker; - + @NonNull volatile boolean disposed; - PeriodicDirectTask(Runnable run, Worker worker) { + PeriodicDirectTask(@NonNull Runnable run, @NonNull Worker worker) { this.run = run; this.worker = worker; } diff --git a/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java b/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java index a74439fc0f..5faaa555a5 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java @@ -16,6 +16,7 @@ package io.reactivex.internal.schedulers; import io.reactivex.Scheduler; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.*; @@ -118,19 +119,22 @@ public ComputationScheduler(ThreadFactory threadFactory) { start(); } + @NonNull @Override public Worker createWorker() { return new EventLoopWorker(pool.get().getEventLoop()); } + @NonNull @Override - public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { + public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) { PoolWorker w = pool.get().getEventLoop(); return w.scheduleDirect(run, delay, unit); } + @NonNull @Override - public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) { + public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) { PoolWorker w = pool.get().getEventLoop(); return w.schedulePeriodicallyDirect(run, initialDelay, period, unit); } @@ -188,16 +192,18 @@ public boolean isDisposed() { return disposed; } + @NonNull @Override - public Disposable schedule(Runnable action) { + public Disposable schedule(@NonNull Runnable action) { if (disposed) { return EmptyDisposable.INSTANCE; } return poolWorker.scheduleActual(action, 0, null, serial); } + @NonNull @Override - public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) { + public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (disposed) { return EmptyDisposable.INSTANCE; } diff --git a/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java b/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java index 3916073a40..6ce46c44f7 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java @@ -17,6 +17,7 @@ import java.util.concurrent.atomic.*; import io.reactivex.Scheduler; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.*; import io.reactivex.internal.queue.MpscLinkedQueue; @@ -29,21 +30,24 @@ */ public final class ExecutorScheduler extends Scheduler { + @NonNull final Executor executor; static final Scheduler HELPER = Schedulers.single(); - public ExecutorScheduler(Executor executor) { + public ExecutorScheduler(@NonNull Executor executor) { this.executor = executor; } + @NonNull @Override public Worker createWorker() { return new ExecutorWorker(executor); } + @NonNull @Override - public Disposable scheduleDirect(Runnable run) { + public Disposable scheduleDirect(@NonNull Runnable run) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); try { if (executor instanceof ExecutorService) { @@ -60,8 +64,9 @@ public Disposable scheduleDirect(Runnable run) { } } + @NonNull @Override - public Disposable scheduleDirect(Runnable run, final long delay, final TimeUnit unit) { + public Disposable scheduleDirect(@NonNull Runnable run, final long delay, final TimeUnit unit) { final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); if (executor instanceof ScheduledExecutorService) { try { @@ -87,8 +92,9 @@ public void run() { return dr; } + @NonNull @Override - public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) { + public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) { if (executor instanceof ScheduledExecutorService) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); try { @@ -118,8 +124,9 @@ public ExecutorWorker(Executor executor) { this.queue = new MpscLinkedQueue(); } + @NonNull @Override - public Disposable schedule(Runnable run) { + public Disposable schedule(@NonNull Runnable run) { if (disposed) { return EmptyDisposable.INSTANCE; } @@ -143,8 +150,9 @@ public Disposable schedule(Runnable run) { return br; } + @NonNull @Override - public Disposable schedule(Runnable run, long delay, TimeUnit unit) { + public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { if (delay <= 0) { return schedule(run); } diff --git a/src/main/java/io/reactivex/internal/schedulers/ImmediateThinScheduler.java b/src/main/java/io/reactivex/internal/schedulers/ImmediateThinScheduler.java index 2dbdeb95a9..990a8cdd14 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ImmediateThinScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/ImmediateThinScheduler.java @@ -16,6 +16,7 @@ import java.util.concurrent.TimeUnit; import io.reactivex.Scheduler; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.*; /** @@ -45,22 +46,26 @@ private ImmediateThinScheduler() { // singleton class } + @NonNull @Override - public Disposable scheduleDirect(Runnable run) { + public Disposable scheduleDirect(@NonNull Runnable run) { run.run(); return DISPOSED; } + @NonNull @Override - public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { + public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) { throw new UnsupportedOperationException("This scheduler doesn't support delayed execution"); } + @NonNull @Override - public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) { + public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) { throw new UnsupportedOperationException("This scheduler doesn't support periodic execution"); } + @NonNull @Override public Worker createWorker() { return WORKER; @@ -78,19 +83,22 @@ public boolean isDisposed() { return false; // dispose() has no effect } + @NonNull @Override - public Disposable schedule(Runnable run) { + public Disposable schedule(@NonNull Runnable run) { run.run(); return DISPOSED; } + @NonNull @Override - public Disposable schedule(Runnable run, long delay, TimeUnit unit) { + public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { throw new UnsupportedOperationException("This scheduler doesn't support delayed execution"); } + @NonNull @Override - public Disposable schedulePeriodically(Runnable run, long initialDelay, long period, TimeUnit unit) { + public Disposable schedulePeriodically(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) { throw new UnsupportedOperationException("This scheduler doesn't support periodic execution"); } } diff --git a/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java b/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java index 8030cd7333..cc22f0d4b1 100644 --- a/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java @@ -17,6 +17,7 @@ package io.reactivex.internal.schedulers; import io.reactivex.Scheduler; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.EmptyDisposable; @@ -180,6 +181,7 @@ public void shutdown() { } } + @NonNull @Override public Worker createWorker() { return new EventLoopWorker(pool.get()); @@ -223,8 +225,9 @@ public boolean isDisposed() { return once.get(); } + @NonNull @Override - public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) { + public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; diff --git a/src/main/java/io/reactivex/internal/schedulers/NewThreadScheduler.java b/src/main/java/io/reactivex/internal/schedulers/NewThreadScheduler.java index e78f897d7e..2513cb300d 100644 --- a/src/main/java/io/reactivex/internal/schedulers/NewThreadScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/NewThreadScheduler.java @@ -17,6 +17,7 @@ package io.reactivex.internal.schedulers; import io.reactivex.Scheduler; +import io.reactivex.annotations.NonNull; import java.util.concurrent.ThreadFactory; @@ -48,6 +49,7 @@ public NewThreadScheduler(ThreadFactory threadFactory) { this.threadFactory = threadFactory; } + @NonNull @Override public Worker createWorker() { return new NewThreadWorker(threadFactory); diff --git a/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java b/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java index a16955b76d..ea45d78e60 100644 --- a/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java +++ b/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java @@ -16,6 +16,7 @@ import java.util.concurrent.*; import io.reactivex.Scheduler; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.*; import io.reactivex.plugins.RxJavaPlugins; @@ -34,13 +35,15 @@ public NewThreadWorker(ThreadFactory threadFactory) { executor = SchedulerPoolFactory.create(threadFactory); } + @NonNull @Override - public Disposable schedule(final Runnable run) { + public Disposable schedule(@NonNull final Runnable run) { return schedule(run, 0, null); } + @NonNull @Override - public Disposable schedule(final Runnable action, long delayTime, TimeUnit unit) { + public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) { if (disposed) { return EmptyDisposable.INSTANCE; } diff --git a/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java b/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java index dea4e07efe..d8190ec30a 100644 --- a/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java +++ b/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java @@ -25,6 +25,7 @@ import io.reactivex.Observable; import io.reactivex.Scheduler; import io.reactivex.annotations.Experimental; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.disposables.Disposables; import io.reactivex.exceptions.Exceptions; @@ -129,6 +130,7 @@ public boolean isDisposed() { return disposable.isDisposed(); } + @NonNull @Override public Worker createWorker() { final Worker actualWorker = actualScheduler.createWorker(); @@ -168,16 +170,18 @@ public boolean isDisposed() { return unsubscribed.get(); } + @NonNull @Override - public Disposable schedule(final Runnable action, final long delayTime, final TimeUnit unit) { + public Disposable schedule(@NonNull final Runnable action, final long delayTime, @NonNull final TimeUnit unit) { // send a scheduled action to the actionQueue DelayedAction delayedAction = new DelayedAction(action, delayTime, unit); actionProcessor.onNext(delayedAction); return delayedAction; } + @NonNull @Override - public Disposable schedule(final Runnable action) { + public Disposable schedule(@NonNull final Runnable action) { // send a scheduled action to the actionQueue ImmediateAction immediateAction = new ImmediateAction(action); actionProcessor.onNext(immediateAction); diff --git a/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java b/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java index 5e0e46cc39..47e7761f89 100644 --- a/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java @@ -13,6 +13,7 @@ package io.reactivex.internal.schedulers; import io.reactivex.Scheduler; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.EmptyDisposable; import io.reactivex.plugins.RxJavaPlugins; @@ -96,13 +97,15 @@ public void shutdown() { } } + @NonNull @Override public Worker createWorker() { return new ScheduledWorker(executor.get()); } + @NonNull @Override - public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { + public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); try { Future f; @@ -118,8 +121,9 @@ public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { } } + @NonNull @Override - public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) { + public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); try { Future f = executor.get().scheduleAtFixedRate(decoratedRun, initialDelay, period, unit); @@ -143,8 +147,9 @@ static final class ScheduledWorker extends Scheduler.Worker { this.tasks = new CompositeDisposable(); } + @NonNull @Override - public Disposable schedule(Runnable run, long delay, TimeUnit unit) { + public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { if (disposed) { return EmptyDisposable.INSTANCE; } diff --git a/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java b/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java index dc55b2a6d3..c63fdf2dac 100644 --- a/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java @@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicInteger; import io.reactivex.Scheduler; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.EmptyDisposable; import io.reactivex.internal.functions.ObjectHelper; @@ -36,6 +37,7 @@ public static TrampolineScheduler instance() { return INSTANCE; } + @NonNull @Override public Worker createWorker() { return new TrampolineWorker(); @@ -44,14 +46,16 @@ public Worker createWorker() { /* package accessible for unit tests */TrampolineScheduler() { } + @NonNull @Override - public Disposable scheduleDirect(Runnable run) { + public Disposable scheduleDirect(@NonNull Runnable run) { run.run(); return EmptyDisposable.INSTANCE; } + @NonNull @Override - public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { + public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) { try { unit.sleep(delay); run.run(); @@ -71,13 +75,15 @@ static final class TrampolineWorker extends Scheduler.Worker implements Disposab volatile boolean disposed; + @NonNull @Override - public Disposable schedule(Runnable action) { + public Disposable schedule(@NonNull Runnable action) { return enqueue(action, now(TimeUnit.MILLISECONDS)); } + @NonNull @Override - public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) { + public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { long execTime = now(TimeUnit.MILLISECONDS) + unit.toMillis(delayTime); return enqueue(new SleepingRunnable(action, this, execTime), execTime); diff --git a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java index 95242bccfb..03a90dcd03 100644 --- a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java +++ b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java @@ -264,8 +264,8 @@ public static Function getSingleSchedulerHandler() { * @return the value returned by the hook, not null * @throws NullPointerException if the callable parameter or its result are null */ - @Nonnull - public static Scheduler initComputationScheduler(@Nonnull Callable defaultScheduler) { + @NonNull + public static Scheduler initComputationScheduler(@NonNull Callable defaultScheduler) { ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null"); Function, Scheduler> f = onInitComputationHandler; if (f == null) { @@ -280,8 +280,8 @@ public static Scheduler initComputationScheduler(@Nonnull Callable de * @return the value returned by the hook, not null * @throws NullPointerException if the callable parameter or its result are null */ - @Nonnull - public static Scheduler initIoScheduler(@Nonnull Callable defaultScheduler) { + @NonNull + public static Scheduler initIoScheduler(@NonNull Callable defaultScheduler) { ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null"); Function, Scheduler> f = onInitIoHandler; if (f == null) { @@ -296,8 +296,8 @@ public static Scheduler initIoScheduler(@Nonnull Callable defaultSche * @return the value returned by the hook, not null * @throws NullPointerException if the callable parameter or its result are null */ - @Nonnull - public static Scheduler initNewThreadScheduler(@Nonnull Callable defaultScheduler) { + @NonNull + public static Scheduler initNewThreadScheduler(@NonNull Callable defaultScheduler) { ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null"); Function, Scheduler> f = onInitNewThreadHandler; if (f == null) { @@ -312,8 +312,8 @@ public static Scheduler initNewThreadScheduler(@Nonnull Callable defa * @return the value returned by the hook, not null * @throws NullPointerException if the callable parameter or its result are null */ - @Nonnull - public static Scheduler initSingleScheduler(@Nonnull Callable defaultScheduler) { + @NonNull + public static Scheduler initSingleScheduler(@NonNull Callable defaultScheduler) { ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null"); Function, Scheduler> f = onInitSingleHandler; if (f == null) { @@ -327,8 +327,8 @@ public static Scheduler initSingleScheduler(@Nonnull Callable default * @param defaultScheduler the hook's input value * @return the value returned by the hook */ - @Nonnull - public static Scheduler onComputationScheduler(@Nonnull Scheduler defaultScheduler) { + @NonNull + public static Scheduler onComputationScheduler(@NonNull Scheduler defaultScheduler) { Function f = onComputationHandler; if (f == null) { return defaultScheduler; @@ -340,7 +340,7 @@ public static Scheduler onComputationScheduler(@Nonnull Scheduler defaultSchedul * Called when an undeliverable error occurs. * @param error the error to report */ - public static void onError(@Nonnull Throwable error) { + public static void onError(@NonNull Throwable error) { Consumer f = errorHandler; if (error == null) { @@ -362,7 +362,7 @@ public static void onError(@Nonnull Throwable error) { uncaught(error); } - static void uncaught(@Nonnull Throwable error) { + static void uncaught(@NonNull Throwable error) { Thread currentThread = Thread.currentThread(); UncaughtExceptionHandler handler = currentThread.getUncaughtExceptionHandler(); handler.uncaughtException(currentThread, error); @@ -373,8 +373,8 @@ static void uncaught(@Nonnull Throwable error) { * @param defaultScheduler the hook's input value * @return the value returned by the hook */ - @Nonnull - public static Scheduler onIoScheduler(@Nonnull Scheduler defaultScheduler) { + @NonNull + public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) { Function f = onIoHandler; if (f == null) { return defaultScheduler; @@ -387,8 +387,8 @@ public static Scheduler onIoScheduler(@Nonnull Scheduler defaultScheduler) { * @param defaultScheduler the hook's input value * @return the value returned by the hook */ - @Nonnull - public static Scheduler onNewThreadScheduler(@Nonnull Scheduler defaultScheduler) { + @NonNull + public static Scheduler onNewThreadScheduler(@NonNull Scheduler defaultScheduler) { Function f = onNewThreadHandler; if (f == null) { return defaultScheduler; @@ -401,8 +401,8 @@ public static Scheduler onNewThreadScheduler(@Nonnull Scheduler defaultScheduler * @param run the runnable instance * @return the replacement runnable */ - @Nonnull - public static Runnable onSchedule(@Nonnull Runnable run) { + @NonNull + public static Runnable onSchedule(@NonNull Runnable run) { Function f = onScheduleHandler; if (f == null) { return run; @@ -415,8 +415,8 @@ public static Runnable onSchedule(@Nonnull Runnable run) { * @param defaultScheduler the hook's input value * @return the value returned by the hook */ - @Nonnull - public static Scheduler onSingleScheduler(@Nonnull Scheduler defaultScheduler) { + @NonNull + public static Scheduler onSingleScheduler(@NonNull Scheduler defaultScheduler) { Function f = onSingleHandler; if (f == null) { return defaultScheduler; @@ -854,8 +854,8 @@ public static void setOnSingleSubscribe(@Nullable BiFunction Subscriber onSubscribe(@Nonnull Flowable source, @Nonnull Subscriber subscriber) { + @NonNull + public static Subscriber onSubscribe(@NonNull Flowable source, @NonNull Subscriber subscriber) { BiFunction f = onFlowableSubscribe; if (f != null) { return apply(f, source, subscriber); @@ -871,8 +871,8 @@ public static Subscriber onSubscribe(@Nonnull Flowable source, * @return the value returned by the hook */ @SuppressWarnings({ "rawtypes", "unchecked" }) - @Nonnull - public static Observer onSubscribe(@Nonnull Observable source, @Nonnull Observer observer) { + @NonNull + public static Observer onSubscribe(@NonNull Observable source, @NonNull Observer observer) { BiFunction f = onObservableSubscribe; if (f != null) { return apply(f, source, observer); @@ -888,8 +888,8 @@ public static Observer onSubscribe(@Nonnull Observable source, * @return the value returned by the hook */ @SuppressWarnings({ "rawtypes", "unchecked" }) - @Nonnull - public static SingleObserver onSubscribe(@Nonnull Single source, @Nonnull SingleObserver observer) { + @NonNull + public static SingleObserver onSubscribe(@NonNull Single source, @NonNull SingleObserver observer) { BiFunction f = onSingleSubscribe; if (f != null) { return apply(f, source, observer); @@ -903,8 +903,8 @@ public static SingleObserver onSubscribe(@Nonnull Single sourc * @param observer the observer * @return the value returned by the hook */ - @Nonnull - public static CompletableObserver onSubscribe(@Nonnull Completable source, @Nonnull CompletableObserver observer) { + @NonNull + public static CompletableObserver onSubscribe(@NonNull Completable source, @NonNull CompletableObserver observer) { BiFunction f = onCompletableSubscribe; if (f != null) { return apply(f, source, observer); @@ -920,8 +920,8 @@ public static CompletableObserver onSubscribe(@Nonnull Completable source, @Nonn * @return the value returned by the hook */ @SuppressWarnings({ "rawtypes", "unchecked" }) - @Nonnull - public static MaybeObserver onSubscribe(@Nonnull Maybe source, @Nonnull MaybeObserver subscriber) { + @NonNull + public static MaybeObserver onSubscribe(@NonNull Maybe source, @NonNull MaybeObserver subscriber) { BiFunction f = onMaybeSubscribe; if (f != null) { return apply(f, source, subscriber); @@ -936,8 +936,8 @@ public static MaybeObserver onSubscribe(@Nonnull Maybe source, * @return the value returned by the hook */ @SuppressWarnings({ "rawtypes", "unchecked" }) - @Nonnull - public static Maybe onAssembly(@Nonnull Maybe source) { + @NonNull + public static Maybe onAssembly(@NonNull Maybe source) { Function f = onMaybeAssembly; if (f != null) { return apply(f, source); @@ -952,8 +952,8 @@ public static Maybe onAssembly(@Nonnull Maybe source) { * @return the value returned by the hook */ @SuppressWarnings({ "rawtypes", "unchecked" }) - @Nonnull - public static Flowable onAssembly(@Nonnull Flowable source) { + @NonNull + public static Flowable onAssembly(@NonNull Flowable source) { Function f = onFlowableAssembly; if (f != null) { return apply(f, source); @@ -968,8 +968,8 @@ public static Flowable onAssembly(@Nonnull Flowable source) { * @return the value returned by the hook */ @SuppressWarnings({ "rawtypes", "unchecked" }) - @Nonnull - public static ConnectableFlowable onAssembly(@Nonnull ConnectableFlowable source) { + @NonNull + public static ConnectableFlowable onAssembly(@NonNull ConnectableFlowable source) { Function f = onConnectableFlowableAssembly; if (f != null) { return apply(f, source); @@ -984,8 +984,8 @@ public static ConnectableFlowable onAssembly(@Nonnull ConnectableFlowable * @return the value returned by the hook */ @SuppressWarnings({ "rawtypes", "unchecked" }) - @Nonnull - public static Observable onAssembly(@Nonnull Observable source) { + @NonNull + public static Observable onAssembly(@NonNull Observable source) { Function f = onObservableAssembly; if (f != null) { return apply(f, source); @@ -1000,8 +1000,8 @@ public static Observable onAssembly(@Nonnull Observable source) { * @return the value returned by the hook */ @SuppressWarnings({ "rawtypes", "unchecked" }) - @Nonnull - public static ConnectableObservable onAssembly(@Nonnull ConnectableObservable source) { + @NonNull + public static ConnectableObservable onAssembly(@NonNull ConnectableObservable source) { Function f = onConnectableObservableAssembly; if (f != null) { return apply(f, source); @@ -1016,8 +1016,8 @@ public static ConnectableObservable onAssembly(@Nonnull ConnectableObserv * @return the value returned by the hook */ @SuppressWarnings({ "rawtypes", "unchecked" }) - @Nonnull - public static Single onAssembly(@Nonnull Single source) { + @NonNull + public static Single onAssembly(@NonNull Single source) { Function f = onSingleAssembly; if (f != null) { return apply(f, source); @@ -1030,8 +1030,8 @@ public static Single onAssembly(@Nonnull Single source) { * @param source the hook's input value * @return the value returned by the hook */ - @Nonnull - public static Completable onAssembly(@Nonnull Completable source) { + @NonNull + public static Completable onAssembly(@NonNull Completable source) { Function f = onCompletableAssembly; if (f != null) { return apply(f, source); @@ -1074,8 +1074,8 @@ public static Function getOnParallelAssembly */ @Experimental @SuppressWarnings({ "rawtypes", "unchecked" }) - @Nonnull - public static ParallelFlowable onAssembly(@Nonnull ParallelFlowable source) { + @NonNull + public static ParallelFlowable onAssembly(@NonNull ParallelFlowable source) { Function f = onParallelAssembly; if (f != null) { return apply(f, source); @@ -1143,8 +1143,8 @@ public static BooleanSupplier getOnBeforeBlocking() { * @since 2.0.5 - experimental */ @Experimental - @Nonnull - public static Scheduler createComputationScheduler(@Nonnull ThreadFactory threadFactory) { + @NonNull + public static Scheduler createComputationScheduler(@NonNull ThreadFactory threadFactory) { return new ComputationScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null")); } @@ -1157,8 +1157,8 @@ public static Scheduler createComputationScheduler(@Nonnull ThreadFactory thread * @since 2.0.5 - experimental */ @Experimental - @Nonnull - public static Scheduler createIoScheduler(@Nonnull ThreadFactory threadFactory) { + @NonNull + public static Scheduler createIoScheduler(@NonNull ThreadFactory threadFactory) { return new IoScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null")); } @@ -1171,8 +1171,8 @@ public static Scheduler createIoScheduler(@Nonnull ThreadFactory threadFactory) * @since 2.0.5 - experimental */ @Experimental - @Nonnull - public static Scheduler createNewThreadScheduler(@Nonnull ThreadFactory threadFactory) { + @NonNull + public static Scheduler createNewThreadScheduler(@NonNull ThreadFactory threadFactory) { return new NewThreadScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null")); } @@ -1185,8 +1185,8 @@ public static Scheduler createNewThreadScheduler(@Nonnull ThreadFactory threadFa * @since 2.0.5 - experimental */ @Experimental - @Nonnull - public static Scheduler createSingleScheduler(@Nonnull ThreadFactory threadFactory) { + @NonNull + public static Scheduler createSingleScheduler(@NonNull ThreadFactory threadFactory) { return new SingleScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null")); } @@ -1200,7 +1200,7 @@ public static Scheduler createSingleScheduler(@Nonnull ThreadFactory threadFacto * @return the result of the function call */ @NonNull - static R apply(@Nonnull Function f, @NonNull T t) { + static R apply(@NonNull Function f, @NonNull T t) { try { return f.apply(t); } catch (Throwable ex) { @@ -1219,8 +1219,8 @@ static R apply(@Nonnull Function f, @NonNull T t) { * @param u the second parameter value to the function * @return the result of the function call */ - @Nonnull - static R apply(@Nonnull BiFunction f, @Nonnull T t, @Nonnull U u) { + @NonNull + static R apply(@NonNull BiFunction f, @NonNull T t, @NonNull U u) { try { return f.apply(t, u); } catch (Throwable ex) { @@ -1235,8 +1235,8 @@ static R apply(@Nonnull BiFunction f, @Nonnull T t, @Nonnull * @return the result of the callable call, not null * @throws NullPointerException if the callable parameter returns null */ - @Nonnull - static Scheduler callRequireNonNull(@Nonnull Callable s) { + @NonNull + static Scheduler callRequireNonNull(@NonNull Callable s) { try { return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null"); } catch (Throwable ex) { @@ -1252,8 +1252,8 @@ static Scheduler callRequireNonNull(@Nonnull Callable s) { * @return the result of the function call, not null * @throws NullPointerException if the function parameter returns null */ - @Nonnull - static Scheduler applyRequireNonNull(@Nonnull Function, Scheduler> f, Callable s) { + @NonNull + static Scheduler applyRequireNonNull(@NonNull Function, Scheduler> f, Callable s) { return ObjectHelper.requireNonNull(apply(f, s), "Scheduler Callable result can't be null"); } diff --git a/src/main/java/io/reactivex/schedulers/Schedulers.java b/src/main/java/io/reactivex/schedulers/Schedulers.java index c5913b70cf..967acce36d 100644 --- a/src/main/java/io/reactivex/schedulers/Schedulers.java +++ b/src/main/java/io/reactivex/schedulers/Schedulers.java @@ -14,6 +14,7 @@ package io.reactivex.schedulers; import io.reactivex.Scheduler; +import io.reactivex.annotations.NonNull; import io.reactivex.internal.schedulers.*; import io.reactivex.plugins.RxJavaPlugins; @@ -34,14 +35,19 @@ * */ public final class Schedulers { + @NonNull static final Scheduler SINGLE; + @NonNull static final Scheduler COMPUTATION; + @NonNull static final Scheduler IO; + @NonNull static final Scheduler TRAMPOLINE; + @NonNull static final Scheduler NEW_THREAD; static final class SingleHolder { @@ -108,6 +114,7 @@ private Schedulers() { * * @return a {@link Scheduler} meant for computation-bound work */ + @NonNull public static Scheduler computation() { return RxJavaPlugins.onComputationScheduler(COMPUTATION); } @@ -125,6 +132,7 @@ public static Scheduler computation() { * * @return a {@link Scheduler} meant for IO-bound work */ + @NonNull public static Scheduler io() { return RxJavaPlugins.onIoScheduler(IO); } @@ -135,6 +143,7 @@ public static Scheduler io() { * * @return a {@link Scheduler} that queues work on the current thread */ + @NonNull public static Scheduler trampoline() { return TRAMPOLINE; } @@ -146,6 +155,7 @@ public static Scheduler trampoline() { * * @return a {@link Scheduler} that creates new threads */ + @NonNull public static Scheduler newThread() { return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD); } @@ -163,6 +173,7 @@ public static Scheduler newThread() { * @return a {@link Scheduler} that shares a single backing thread. * @since 2.0 */ + @NonNull public static Scheduler single() { return RxJavaPlugins.onSingleScheduler(SINGLE); } @@ -174,7 +185,8 @@ public static Scheduler single() { * the executor to wrap * @return the new Scheduler wrapping the Executor */ - public static Scheduler from(Executor executor) { + @NonNull + public static Scheduler from(@NonNull Executor executor) { return new ExecutorScheduler(executor); } diff --git a/src/main/java/io/reactivex/schedulers/TestScheduler.java b/src/main/java/io/reactivex/schedulers/TestScheduler.java index ffb592ff30..d8700e4770 100644 --- a/src/main/java/io/reactivex/schedulers/TestScheduler.java +++ b/src/main/java/io/reactivex/schedulers/TestScheduler.java @@ -17,6 +17,7 @@ import java.util.concurrent.*; import io.reactivex.Scheduler; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.EmptyDisposable; import io.reactivex.internal.functions.ObjectHelper; @@ -63,7 +64,7 @@ public int compareTo(TimedRunnable o) { } @Override - public long now(TimeUnit unit) { + public long now(@NonNull TimeUnit unit) { return unit.convert(time, TimeUnit.NANOSECONDS); } @@ -118,6 +119,7 @@ private void triggerActions(long targetTimeInNanoseconds) { time = targetTimeInNanoseconds; } + @NonNull @Override public Worker createWorker() { return new TestWorker(); @@ -137,8 +139,9 @@ public boolean isDisposed() { return disposed; } + @NonNull @Override - public Disposable schedule(Runnable run, long delayTime, TimeUnit unit) { + public Disposable schedule(@NonNull Runnable run, long delayTime, @NonNull TimeUnit unit) { if (disposed) { return EmptyDisposable.INSTANCE; } @@ -153,8 +156,9 @@ public void run() { }); } + @NonNull @Override - public Disposable schedule(Runnable run) { + public Disposable schedule(@NonNull Runnable run) { if (disposed) { return EmptyDisposable.INSTANCE; } @@ -169,7 +173,7 @@ public void run() { } @Override - public long now(TimeUnit unit) { + public long now(@NonNull TimeUnit unit) { return TestScheduler.this.now(unit); } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java index 358e3b2d2d..ff10a30a23 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java @@ -21,6 +21,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import io.reactivex.annotations.NonNull; import org.junit.*; import org.mockito.InOrder; import org.reactivestreams.*; @@ -708,14 +709,16 @@ private static class InprocessWorker extends Worker { this.mockDisposable = mockDisposable; } + @NonNull @Override - public Disposable schedule(Runnable action) { + public Disposable schedule(@NonNull Runnable action) { action.run(); return mockDisposable; // this subscription is returned but discarded } + @NonNull @Override - public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) { + public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { action.run(); return mockDisposable; } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java index 578a7e8984..c505c7610d 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java @@ -18,6 +18,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import io.reactivex.annotations.NonNull; import org.junit.*; import org.reactivestreams.*; @@ -122,6 +123,7 @@ public SlowScheduler(Scheduler actual, long delay, TimeUnit unit) { this.unit = unit; } + @NonNull @Override public Worker createWorker() { return new SlowInner(actual.createWorker()); @@ -145,13 +147,15 @@ public boolean isDisposed() { return actualInner.isDisposed(); } + @NonNull @Override - public Disposable schedule(final Runnable action) { + public Disposable schedule(@NonNull final Runnable action) { return actualInner.schedule(action, delay, unit); } + @NonNull @Override - public Disposable schedule(final Runnable action, final long delayTime, final TimeUnit delayUnit) { + public Disposable schedule(@NonNull final Runnable action, final long delayTime, @NonNull final TimeUnit delayUnit) { TimeUnit common = delayUnit.compareTo(unit) < 0 ? delayUnit : unit; long t = common.convert(delayTime, delayUnit) + common.convert(delay, unit); return actualInner.schedule(action, t, common); diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableUnsubscribeOnTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableUnsubscribeOnTest.java index 8b309546e3..b6aadd7c8f 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableUnsubscribeOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableUnsubscribeOnTest.java @@ -19,6 +19,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; +import io.reactivex.annotations.NonNull; import org.junit.Test; import org.reactivestreams.*; @@ -177,6 +178,7 @@ public void run() { } } + @NonNull @Override public Worker createWorker() { return eventLoop.createWorker(); diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java index 002d0b086c..0d87564aed 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java @@ -21,6 +21,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import io.reactivex.annotations.NonNull; import org.junit.*; import org.mockito.InOrder; @@ -689,14 +690,16 @@ static class InprocessWorker extends Worker { this.mockDisposable = mockDisposable; } + @NonNull @Override - public Disposable schedule(Runnable action) { + public Disposable schedule(@NonNull Runnable action) { action.run(); return mockDisposable; // this subscription is returned but discarded } + @NonNull @Override - public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) { + public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { action.run(); return mockDisposable; } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableSubscribeOnTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableSubscribeOnTest.java index 241f6ab554..6418c5338f 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableSubscribeOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableSubscribeOnTest.java @@ -18,6 +18,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import io.reactivex.annotations.NonNull; import org.junit.*; import io.reactivex.*; @@ -117,6 +118,7 @@ public SlowScheduler(Scheduler actual, long delay, TimeUnit unit) { this.unit = unit; } + @NonNull @Override public Worker createWorker() { return new SlowInner(actual.createWorker()); @@ -140,13 +142,15 @@ public boolean isDisposed() { return actualInner.isDisposed(); } + @NonNull @Override - public Disposable schedule(final Runnable action) { + public Disposable schedule(@NonNull final Runnable action) { return actualInner.schedule(action, delay, unit); } + @NonNull @Override - public Disposable schedule(final Runnable action, final long delayTime, final TimeUnit delayUnit) { + public Disposable schedule(@NonNull final Runnable action, final long delayTime, @NonNull final TimeUnit delayUnit) { TimeUnit common = delayUnit.compareTo(unit) < 0 ? delayUnit : unit; long t = common.convert(delayTime, delayUnit) + common.convert(delay, unit); return actualInner.schedule(action, t, common); diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableUnsubscribeOnTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableUnsubscribeOnTest.java index 7a55b73350..3003c71ea3 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableUnsubscribeOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableUnsubscribeOnTest.java @@ -19,6 +19,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import io.reactivex.annotations.NonNull; import org.junit.Test; import io.reactivex.*; @@ -179,6 +180,7 @@ public void run() { } } + @NonNull @Override public Worker createWorker() { return eventLoop.createWorker(); diff --git a/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java b/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java index 06bd6ec6e3..26b80f392a 100644 --- a/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java +++ b/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java @@ -1349,14 +1349,6 @@ public void onComplete() { // // assertSame(cop, RxJavaPlugins.onCompletableLift(cop)); - assertNull(RxJavaPlugins.onComputationScheduler(null)); - - assertNull(RxJavaPlugins.onIoScheduler(null)); - - assertNull(RxJavaPlugins.onNewThreadScheduler(null)); - - assertNull(RxJavaPlugins.onSingleScheduler(null)); - final Scheduler s = ImmediateThinScheduler.INSTANCE; Callable c = new Callable() { @Override diff --git a/src/test/java/io/reactivex/schedulers/SchedulerTest.java b/src/test/java/io/reactivex/schedulers/SchedulerTest.java index 165ed2e026..0962d35080 100644 --- a/src/test/java/io/reactivex/schedulers/SchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/SchedulerTest.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import io.reactivex.annotations.NonNull; import org.junit.Test; import io.reactivex.*; @@ -248,11 +249,13 @@ public void schedulersUtility() { @Test public void defaultSchedulePeriodicallyDirectRejects() { Scheduler s = new Scheduler() { + @NonNull @Override public Worker createWorker() { return new Worker() { + @NonNull @Override - public Disposable schedule(Runnable run, long delay, TimeUnit unit) { + public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { return EmptyDisposable.INSTANCE; } diff --git a/src/test/java/io/reactivex/schedulers/SchedulerWorkerTest.java b/src/test/java/io/reactivex/schedulers/SchedulerWorkerTest.java index ecacd4328e..910bf715b0 100644 --- a/src/test/java/io/reactivex/schedulers/SchedulerWorkerTest.java +++ b/src/test/java/io/reactivex/schedulers/SchedulerWorkerTest.java @@ -18,6 +18,7 @@ import java.util.*; import java.util.concurrent.TimeUnit; +import io.reactivex.annotations.NonNull; import org.junit.Test; import io.reactivex.Scheduler; @@ -27,6 +28,7 @@ public class SchedulerWorkerTest { static final class CustomDriftScheduler extends Scheduler { public volatile long drift; + @NonNull @Override public Worker createWorker() { final Worker w = Schedulers.computation().createWorker(); @@ -42,13 +44,15 @@ public boolean isDisposed() { return w.isDisposed(); } + @NonNull @Override - public Disposable schedule(Runnable action) { + public Disposable schedule(@NonNull Runnable action) { return w.schedule(action); } + @NonNull @Override - public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) { + public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { return w.schedule(action, delayTime, unit); } @@ -60,7 +64,7 @@ public long now(TimeUnit unit) { } @Override - public long now(TimeUnit unit) { + public long now(@NonNull TimeUnit unit) { return super.now(unit) + unit.convert(drift, TimeUnit.NANOSECONDS); } }