diff --git a/src/main/java/io/reactivex/Scheduler.java b/src/main/java/io/reactivex/Scheduler.java index 136d20d842..724974cbbb 100644 --- a/src/main/java/io/reactivex/Scheduler.java +++ b/src/main/java/io/reactivex/Scheduler.java @@ -13,8 +13,6 @@ package io.reactivex; -import java.util.concurrent.TimeUnit; - import io.reactivex.annotations.*; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; @@ -23,6 +21,9 @@ import io.reactivex.internal.schedulers.*; import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.SchedulerRunnableIntrospection; + +import java.util.concurrent.TimeUnit; /** * A {@code Scheduler} is an object that specifies an API for scheduling @@ -197,7 +198,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial *

* Limit the amount concurrency two at a time without creating a new fix * size thread pool: - * + * *

      * Scheduler limitScheduler = Schedulers.computation().when(workers -> {
      *  // use merge max concurrent to limit the number of concurrent
@@ -215,7 +216,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
      * {@link Flowable#zip(org.reactivestreams.Publisher, org.reactivestreams.Publisher, io.reactivex.functions.BiFunction)} where
      * subscribing to the first {@link Flowable} could deadlock the
      * subscription to the second.
-     * 
+     *
      * 
      * Scheduler limitScheduler = Schedulers.computation().when(workers -> {
      *  // use merge max concurrent to limit the number of concurrent
@@ -223,12 +224,12 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
      *  return Completable.merge(Flowable.merge(workers, 2));
      * });
      * 
- * + * * Slowing down the rate to no more than than 1 a second. This suffers from * the same problem as the one above I could find an {@link Flowable} * operator that limits the rate without dropping the values (aka leaky * bucket algorithm). - * + * *
      * Scheduler slowScheduler = Schedulers.computation().when(workers -> {
      *  // use concatenate to make each worker happen one at a time.
@@ -238,7 +239,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
      *  }));
      * });
      * 
- * + * *

History: 2.0.1 - experimental * @param a Scheduler and a Subscription * @param combine the function that takes a two-level nested Flowable sequence of a Completable and returns @@ -347,7 +348,7 @@ public long now(@NonNull TimeUnit unit) { * Holds state and logic to calculate when the next delayed invocation * of this task has to happen (accounting for clock drifts). */ - final class PeriodicTask implements Runnable { + final class PeriodicTask implements Runnable, SchedulerRunnableIntrospection { @NonNull final Runnable decoratedRun; @NonNull @@ -393,11 +394,16 @@ public void run() { sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS)); } } + + @Override + public Runnable getWrappedRunnable() { + return this.decoratedRun; + } } } static class PeriodicDirectTask - implements Runnable, Disposable { + implements Disposable, Runnable, SchedulerRunnableIntrospection { final Runnable run; @NonNull final Worker worker; @@ -432,9 +438,14 @@ public void dispose() { public boolean isDisposed() { return disposed; } + + @Override + public Runnable getWrappedRunnable() { + return run; + } } - static final class DisposeTask implements Runnable, Disposable { + static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection { final Runnable decoratedRun; final Worker w; @@ -469,5 +480,10 @@ public void dispose() { public boolean isDisposed() { return w.isDisposed(); } + + @Override + public Runnable getWrappedRunnable() { + return this.decoratedRun; + } } } diff --git a/src/main/java/io/reactivex/internal/schedulers/AbstractDirectTask.java b/src/main/java/io/reactivex/internal/schedulers/AbstractDirectTask.java index cd278fe590..2bf3c454f4 100644 --- a/src/main/java/io/reactivex/internal/schedulers/AbstractDirectTask.java +++ b/src/main/java/io/reactivex/internal/schedulers/AbstractDirectTask.java @@ -21,6 +21,7 @@ import io.reactivex.disposables.Disposable; import io.reactivex.internal.functions.Functions; +import io.reactivex.schedulers.SchedulerRunnableIntrospection; /** * Base functionality for direct tasks that manage a runnable and cancellation/completion. @@ -28,7 +29,7 @@ */ abstract class AbstractDirectTask extends AtomicReference> -implements Disposable { +implements Disposable, SchedulerRunnableIntrospection { private static final long serialVersionUID = 1811839108042568751L; @@ -77,4 +78,9 @@ public final void setFuture(Future future) { } } } + + @Override + public Runnable getWrappedRunnable() { + return runnable; + } } diff --git a/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java b/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java index e050fed1f7..b18e18d16d 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java @@ -20,10 +20,11 @@ import io.reactivex.annotations.NonNull; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.queue.MpscLinkedQueue; import io.reactivex.internal.schedulers.ExecutorScheduler.ExecutorWorker.BooleanRunnable; import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.schedulers.Schedulers; +import io.reactivex.schedulers.*; /** * Wraps an Executor and provides the Scheduler API over it. @@ -290,7 +291,8 @@ public void run() { } } - static final class DelayedRunnable extends AtomicReference implements Runnable, Disposable { + static final class DelayedRunnable extends AtomicReference + implements Runnable, Disposable, SchedulerRunnableIntrospection { private static final long serialVersionUID = -4101336210206799084L; @@ -330,6 +332,12 @@ public void dispose() { direct.dispose(); } } + + @Override + public Runnable getWrappedRunnable() { + Runnable r = get(); + return r != null ? r : Functions.EMPTY_RUNNABLE; + } } final class DelayedDispose implements Runnable { diff --git a/src/main/java/io/reactivex/schedulers/SchedulerRunnableIntrospection.java b/src/main/java/io/reactivex/schedulers/SchedulerRunnableIntrospection.java new file mode 100644 index 0000000000..4e558c8343 --- /dev/null +++ b/src/main/java/io/reactivex/schedulers/SchedulerRunnableIntrospection.java @@ -0,0 +1,35 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.schedulers; + +import io.reactivex.annotations.*; +import io.reactivex.functions.Function; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Interface to wrap an action inside internal scheduler's task. + * + * You can check if runnable implements this interface and unwrap original runnable. + * For example inside of the {@link RxJavaPlugins#setScheduleHandler(Function)} + * + * @since 2.1.7 - experimental + */ +@Experimental +public interface SchedulerRunnableIntrospection { + + /** + * Returns the wrapped action. + * + * @return the wrapped action. Cannot be null. + */ + @NonNull + Runnable getWrappedRunnable(); +} diff --git a/src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java b/src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java index b81036a7e0..8d78e3756f 100644 --- a/src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java +++ b/src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java @@ -698,4 +698,48 @@ public void run() { disposable.get().dispose(); } } -} + + @Test(timeout = 5000) + public void unwrapDefaultPeriodicTask() throws InterruptedException { + Scheduler s = getScheduler(); + if (s instanceof TrampolineScheduler) { + // TrampolineScheduler always return EmptyDisposable + return; + } + + + final CountDownLatch cdl = new CountDownLatch(1); + Runnable countDownRunnable = new Runnable() { + @Override + public void run() { + cdl.countDown(); + } + }; + Disposable disposable = s.schedulePeriodicallyDirect(countDownRunnable, 100, 100, TimeUnit.MILLISECONDS); + SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) disposable; + + assertSame(countDownRunnable, wrapper.getWrappedRunnable()); + assertTrue(cdl.await(5, TimeUnit.SECONDS)); + disposable.dispose(); + } + + @Test + public void unwrapScheduleDirectTask() { + Scheduler scheduler = getScheduler(); + if (scheduler instanceof TrampolineScheduler) { + // TrampolineScheduler always return EmptyDisposable + return; + } + final CountDownLatch cdl = new CountDownLatch(1); + Runnable countDownRunnable = new Runnable() { + @Override + public void run() { + cdl.countDown(); + } + }; + Disposable disposable = scheduler.scheduleDirect(countDownRunnable, 100, TimeUnit.MILLISECONDS); + SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) disposable; + assertSame(countDownRunnable, wrapper.getWrappedRunnable()); + disposable.dispose(); + } +} \ No newline at end of file diff --git a/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java b/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java index c0a19efa5e..4e53fbcf42 100644 --- a/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java @@ -561,4 +561,22 @@ public void runnableDisposedAsyncTimed2() throws Exception { executorScheduler.shutdownNow(); } } + + @Test + public void unwrapScheduleDirectTaskAfterDispose() { + Scheduler scheduler = getScheduler(); + final CountDownLatch cdl = new CountDownLatch(1); + Runnable countDownRunnable = new Runnable() { + @Override + public void run() { + cdl.countDown(); + } + }; + Disposable disposable = scheduler.scheduleDirect(countDownRunnable, 100, TimeUnit.MILLISECONDS); + SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) disposable; + assertSame(countDownRunnable, wrapper.getWrappedRunnable()); + disposable.dispose(); + + assertSame(Functions.EMPTY_RUNNABLE, wrapper.getWrappedRunnable()); + } } diff --git a/src/test/java/io/reactivex/schedulers/SchedulerTest.java b/src/test/java/io/reactivex/schedulers/SchedulerTest.java index 2845cf3a91..714482e5cb 100644 --- a/src/test/java/io/reactivex/schedulers/SchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/SchedulerTest.java @@ -307,4 +307,66 @@ public void customScheduleDirectDisposed() { assertTrue(d.isDisposed()); } + + @Test + public void unwrapDefaultPeriodicTask() { + TestScheduler scheduler = new TestScheduler(); + + Runnable runnable = new Runnable() { + @Override + public void run() { + } + }; + SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) scheduler.schedulePeriodicallyDirect(runnable, 100, 100, TimeUnit.MILLISECONDS); + + assertSame(runnable, wrapper.getWrappedRunnable()); + } + + @Test + public void unwrapScheduleDirectTask() { + TestScheduler scheduler = new TestScheduler(); + + Runnable runnable = new Runnable() { + @Override + public void run() { + } + }; + SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) scheduler.scheduleDirect(runnable, 100, TimeUnit.MILLISECONDS); + assertSame(runnable, wrapper.getWrappedRunnable()); + } + + @Test + public void unwrapWorkerPeriodicTask() { + final Runnable runnable = new Runnable() { + @Override + public void run() { + } + }; + + Scheduler scheduler = new Scheduler() { + @Override + public Worker createWorker() { + return new Worker() { + @Override + public Disposable schedule(Runnable run, long delay, TimeUnit unit) { + SchedulerRunnableIntrospection outerWrapper = (SchedulerRunnableIntrospection) run; + SchedulerRunnableIntrospection innerWrapper = (SchedulerRunnableIntrospection) outerWrapper.getWrappedRunnable(); + assertSame(runnable, innerWrapper.getWrappedRunnable()); + return (Disposable) innerWrapper; + } + + @Override + public void dispose() { + } + + @Override + public boolean isDisposed() { + return false; + } + }; + } + }; + + scheduler.schedulePeriodicallyDirect(runnable, 100, 100, TimeUnit.MILLISECONDS); + } }