From d0589d463cbce64f69339b563b58c1393cf65a5d Mon Sep 17 00:00:00 2001 From: lguz Date: Tue, 21 Nov 2017 12:07:33 +0100 Subject: [PATCH 01/23] 2.x: RxJavaPlugins unwrapRunnable --- src/main/java/io/reactivex/Scheduler.java | 44 +++++++---- .../schedulers/SchedulerRunnableWrapper.java | 16 ++++ .../io/reactivex/plugins/RxJavaPlugins.java | 12 +++ .../reactivex/plugins/RxJavaPluginsTest.java | 75 +++++++++++++++---- 4 files changed, 118 insertions(+), 29 deletions(-) create mode 100644 src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableWrapper.java diff --git a/src/main/java/io/reactivex/Scheduler.java b/src/main/java/io/reactivex/Scheduler.java index 136d20d842..98226a9d01 100644 --- a/src/main/java/io/reactivex/Scheduler.java +++ b/src/main/java/io/reactivex/Scheduler.java @@ -13,17 +13,20 @@ package io.reactivex; -import java.util.concurrent.TimeUnit; - -import io.reactivex.annotations.*; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; -import io.reactivex.internal.disposables.*; -import io.reactivex.internal.schedulers.*; +import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.internal.disposables.SequentialDisposable; +import io.reactivex.internal.schedulers.NewThreadWorker; +import io.reactivex.internal.schedulers.SchedulerRunnableWrapper; +import io.reactivex.internal.schedulers.SchedulerWhen; import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.plugins.RxJavaPlugins; +import java.util.concurrent.TimeUnit; + /** * A {@code Scheduler} is an object that specifies an API for scheduling * units of work with or without delays or periodically. @@ -197,7 +200,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial *

* Limit the amount concurrency two at a time without creating a new fix * size thread pool: - * + * *

      * Scheduler limitScheduler = Schedulers.computation().when(workers -> {
      *  // use merge max concurrent to limit the number of concurrent
@@ -215,7 +218,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
      * {@link Flowable#zip(org.reactivestreams.Publisher, org.reactivestreams.Publisher, io.reactivex.functions.BiFunction)} where
      * subscribing to the first {@link Flowable} could deadlock the
      * subscription to the second.
-     * 
+     *
      * 
      * Scheduler limitScheduler = Schedulers.computation().when(workers -> {
      *  // use merge max concurrent to limit the number of concurrent
@@ -223,12 +226,12 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
      *  return Completable.merge(Flowable.merge(workers, 2));
      * });
      * 
- * + * * Slowing down the rate to no more than than 1 a second. This suffers from * the same problem as the one above I could find an {@link Flowable} * operator that limits the rate without dropping the values (aka leaky * bucket algorithm). - * + * *
      * Scheduler slowScheduler = Schedulers.computation().when(workers -> {
      *  // use concatenate to make each worker happen one at a time.
@@ -238,7 +241,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
      *  }));
      * });
      * 
- * + * *

History: 2.0.1 - experimental * @param a Scheduler and a Subscription * @param combine the function that takes a two-level nested Flowable sequence of a Completable and returns @@ -347,7 +350,7 @@ public long now(@NonNull TimeUnit unit) { * Holds state and logic to calculate when the next delayed invocation * of this task has to happen (accounting for clock drifts). */ - final class PeriodicTask implements Runnable { + final class PeriodicTask implements SchedulerRunnableWrapper { @NonNull final Runnable decoratedRun; @NonNull @@ -393,11 +396,16 @@ public void run() { sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS)); } } + + @Override + public Runnable getWrappedRunnable() { + return this.decoratedRun; + } } } static class PeriodicDirectTask - implements Runnable, Disposable { + implements Disposable, SchedulerRunnableWrapper { final Runnable run; @NonNull final Worker worker; @@ -432,9 +440,14 @@ public void dispose() { public boolean isDisposed() { return disposed; } + + @Override + public Runnable getWrappedRunnable() { + return run; + } } - static final class DisposeTask implements Runnable, Disposable { + static final class DisposeTask implements Disposable, SchedulerRunnableWrapper { final Runnable decoratedRun; final Worker w; @@ -469,5 +482,10 @@ public void dispose() { public boolean isDisposed() { return w.isDisposed(); } + + @Override + public Runnable getWrappedRunnable() { + return this.decoratedRun; + } } } diff --git a/src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableWrapper.java b/src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableWrapper.java new file mode 100644 index 0000000000..0888f6549c --- /dev/null +++ b/src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableWrapper.java @@ -0,0 +1,16 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.schedulers; + +public interface SchedulerRunnableWrapper extends Runnable { + + Runnable getWrappedRunnable(); +} diff --git a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java index 5ab0192342..3e50f4c718 100644 --- a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java +++ b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java @@ -620,6 +620,18 @@ public static void setSingleSchedulerHandler(@Nullable Function Date: Tue, 21 Nov 2017 12:57:05 +0100 Subject: [PATCH 02/23] 2.x: RxJavaPlugins unwrapRunnable-javadoc --- src/main/java/io/reactivex/Scheduler.java | 9 ++--- .../schedulers/SchedulerRunnableWrapper.java | 14 ++++++++ .../io/reactivex/plugins/RxJavaPlugins.java | 9 +++-- .../reactivex/plugins/RxJavaPluginsTest.java | 36 +++++++------------ 4 files changed, 37 insertions(+), 31 deletions(-) diff --git a/src/main/java/io/reactivex/Scheduler.java b/src/main/java/io/reactivex/Scheduler.java index 98226a9d01..fee89ddcf6 100644 --- a/src/main/java/io/reactivex/Scheduler.java +++ b/src/main/java/io/reactivex/Scheduler.java @@ -13,15 +13,12 @@ package io.reactivex; -import io.reactivex.annotations.NonNull; +import io.reactivex.annotations.*;; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; -import io.reactivex.internal.disposables.EmptyDisposable; -import io.reactivex.internal.disposables.SequentialDisposable; -import io.reactivex.internal.schedulers.NewThreadWorker; -import io.reactivex.internal.schedulers.SchedulerRunnableWrapper; -import io.reactivex.internal.schedulers.SchedulerWhen; +import io.reactivex.internal.disposables.*; +import io.reactivex.internal.schedulers.*; import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.plugins.RxJavaPlugins; diff --git a/src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableWrapper.java b/src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableWrapper.java index 0888f6549c..2209e9378b 100644 --- a/src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableWrapper.java +++ b/src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableWrapper.java @@ -10,7 +10,21 @@ package io.reactivex.internal.schedulers; +import io.reactivex.annotations.*; + +/** + * Represents a wrapped action inside internal scheduler's task. + * + * @since 2.1.7 - experimental + */ +@Experimental public interface SchedulerRunnableWrapper extends Runnable { + /** + * Returns the wrapped action. + * + * @return the wrapped action. Cannot be null. + */ + @NonNull Runnable getWrappedRunnable(); } diff --git a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java index 3e50f4c718..7f38be7ff5 100644 --- a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java +++ b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java @@ -623,9 +623,14 @@ public static void setSingleSchedulerHandler(@Nullable Function Date: Tue, 21 Nov 2017 13:26:52 +0100 Subject: [PATCH 03/23] 2.x: RxJavaPlugins unwrapRunnable-Enhancement and test coverage --- src/main/java/io/reactivex/plugins/RxJavaPlugins.java | 6 +++--- .../java/io/reactivex/plugins/RxJavaPluginsTest.java | 11 ++++++++++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java index 7f38be7ff5..e40144338d 100644 --- a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java +++ b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java @@ -623,14 +623,14 @@ public static void setSingleSchedulerHandler(@Nullable Function Date: Tue, 21 Nov 2017 13:35:55 +0100 Subject: [PATCH 04/23] 2.x: RxJavaPlugins unwrapRunnable-Enhancement and test coverage --- src/main/java/io/reactivex/plugins/RxJavaPlugins.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java index e40144338d..fcb4073ea8 100644 --- a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java +++ b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java @@ -630,7 +630,7 @@ public static void setSingleSchedulerHandler(@Nullable Function Date: Tue, 21 Nov 2017 13:40:56 +0100 Subject: [PATCH 05/23] 2.x: RxJavaPlugins unwrapRunnable-Enhancement and test coverage --- .../internal/schedulers/SchedulerRunnableWrapper.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableWrapper.java b/src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableWrapper.java index 2209e9378b..6f9581a2a7 100644 --- a/src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableWrapper.java +++ b/src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableWrapper.java @@ -23,8 +23,8 @@ public interface SchedulerRunnableWrapper extends Runnable { /** * Returns the wrapped action. * - * @return the wrapped action. Cannot be null. + * @return the wrapped action, may be null. */ - @NonNull + @Nullable Runnable getWrappedRunnable(); } From 31ff1e93f3d00ae6355d778d62b254df09334a56 Mon Sep 17 00:00:00 2001 From: lguz Date: Tue, 21 Nov 2017 14:04:48 +0100 Subject: [PATCH 06/23] 2.x: RxJavaPlugins unwrapRunnable-Enhancement and test coverage --- .../java/io/reactivex/schedulers/SchedulerTest.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/test/java/io/reactivex/schedulers/SchedulerTest.java b/src/test/java/io/reactivex/schedulers/SchedulerTest.java index 2845cf3a91..a00eaef357 100644 --- a/src/test/java/io/reactivex/schedulers/SchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/SchedulerTest.java @@ -16,9 +16,10 @@ import static org.junit.Assert.*; import java.util.List; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import io.reactivex.annotations.NonNull; +import io.reactivex.internal.schedulers.SchedulerRunnableWrapper; import org.junit.Test; import io.reactivex.*; @@ -44,6 +45,7 @@ public void run() { } }, 100, 100, TimeUnit.MILLISECONDS); + assertTrue(d instanceof SchedulerRunnableWrapper); assertEquals(0, count[0]); assertFalse(d.isDisposed()); @@ -87,6 +89,7 @@ public void run() { } }, 100, 100, TimeUnit.MILLISECONDS); + assertTrue(d instanceof SchedulerRunnableWrapper); d.dispose(); assertEquals(0, count[0]); @@ -134,6 +137,7 @@ public void run() { } }, 100, 100, TimeUnit.MILLISECONDS); + assertTrue(d instanceof SchedulerRunnableWrapper); sd.set(d); assertEquals(0, count[0]); @@ -198,7 +202,7 @@ public void run() { scheduler.advanceTimeBy(1, TimeUnit.SECONDS); } }; - + assertTrue(d instanceof SchedulerRunnableWrapper); TestHelper.race(r1, r2, Schedulers.io()); } @@ -214,7 +218,7 @@ public void periodicDirectTaskRaceIO() throws Exception { Functions.EMPTY_RUNNABLE, 0, 0, TimeUnit.MILLISECONDS); Thread.sleep(1); - + assertTrue(d instanceof SchedulerRunnableWrapper); d.dispose(); } @@ -302,6 +306,7 @@ public void customScheduleDirectDisposed() { Disposable d = scheduler.scheduleDirect(Functions.EMPTY_RUNNABLE, 1, TimeUnit.MINUTES); assertFalse(d.isDisposed()); + assertTrue(d instanceof SchedulerRunnableWrapper); d.dispose(); From d5c21666aa9593c5f98052338b25907f69325966 Mon Sep 17 00:00:00 2001 From: lguz Date: Tue, 21 Nov 2017 15:08:15 +0100 Subject: [PATCH 07/23] 2.x: RxJavaPlugins unwrapRunnable-Enhancement and test coverage --- src/main/java/io/reactivex/Scheduler.java | 6 +-- .../reactivex/schedulers/SchedulerTest.java | 43 ++++++++----------- 2 files changed, 21 insertions(+), 28 deletions(-) diff --git a/src/main/java/io/reactivex/Scheduler.java b/src/main/java/io/reactivex/Scheduler.java index fee89ddcf6..e700012987 100644 --- a/src/main/java/io/reactivex/Scheduler.java +++ b/src/main/java/io/reactivex/Scheduler.java @@ -368,7 +368,7 @@ final class PeriodicTask implements SchedulerRunnableWrapper { @Override public void run() { - decoratedRun.run(); + getWrappedRunnable().run(); if (!sd.isDisposed()) { @@ -418,7 +418,7 @@ static class PeriodicDirectTask public void run() { if (!disposed) { try { - run.run(); + getWrappedRunnable().run(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); worker.dispose(); @@ -459,7 +459,7 @@ static final class DisposeTask implements Disposable, SchedulerRunnableWrapper { public void run() { runner = Thread.currentThread(); try { - decoratedRun.run(); + getWrappedRunnable().run(); } finally { dispose(); runner = null; diff --git a/src/test/java/io/reactivex/schedulers/SchedulerTest.java b/src/test/java/io/reactivex/schedulers/SchedulerTest.java index a00eaef357..5c4824f06f 100644 --- a/src/test/java/io/reactivex/schedulers/SchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/SchedulerTest.java @@ -1,11 +1,11 @@ /** * Copyright (c) 2016-present, RxJava Contributors. - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. @@ -13,39 +13,37 @@ package io.reactivex.schedulers; -import static org.junit.Assert.*; - -import java.util.List; -import java.util.concurrent.*; - -import io.reactivex.annotations.NonNull; -import io.reactivex.internal.schedulers.SchedulerRunnableWrapper; -import org.junit.Test; - import io.reactivex.*; import io.reactivex.Scheduler.Worker; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.TestException; import io.reactivex.internal.disposables.*; import io.reactivex.internal.functions.Functions; import io.reactivex.plugins.RxJavaPlugins; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; public class SchedulerTest { @Test public void defaultPeriodicTask() { - final int[] count = { 0 }; + final int[] count = {0}; TestScheduler scheduler = new TestScheduler(); - Disposable d = scheduler.schedulePeriodicallyDirect(new Runnable() { + Runnable action = new Runnable() { @Override public void run() { count[0]++; } - }, 100, 100, TimeUnit.MILLISECONDS); + }; + Disposable d = scheduler.schedulePeriodicallyDirect(action, 100, 100, TimeUnit.MILLISECONDS); - assertTrue(d instanceof SchedulerRunnableWrapper); assertEquals(0, count[0]); assertFalse(d.isDisposed()); @@ -78,7 +76,7 @@ public void run() { @Test public void disposePeriodicDirect() { - final int[] count = { 0 }; + final int[] count = {0}; TestScheduler scheduler = new TestScheduler(); @@ -89,7 +87,6 @@ public void run() { } }, 100, 100, TimeUnit.MILLISECONDS); - assertTrue(d instanceof SchedulerRunnableWrapper); d.dispose(); assertEquals(0, count[0]); @@ -103,7 +100,7 @@ public void run() { @Test public void scheduleDirect() { - final int[] count = { 0 }; + final int[] count = {0}; TestScheduler scheduler = new TestScheduler(); @@ -123,7 +120,7 @@ public void run() { @Test public void disposeSelfPeriodicDirect() { - final int[] count = { 0 }; + final int[] count = {0}; TestScheduler scheduler = new TestScheduler(); @@ -137,7 +134,6 @@ public void run() { } }, 100, 100, TimeUnit.MILLISECONDS); - assertTrue(d instanceof SchedulerRunnableWrapper); sd.set(d); assertEquals(0, count[0]); @@ -151,7 +147,7 @@ public void run() { @Test public void disposeSelfPeriodic() { - final int[] count = { 0 }; + final int[] count = {0}; TestScheduler scheduler = new TestScheduler(); @@ -202,7 +198,6 @@ public void run() { scheduler.advanceTimeBy(1, TimeUnit.SECONDS); } }; - assertTrue(d instanceof SchedulerRunnableWrapper); TestHelper.race(r1, r2, Schedulers.io()); } @@ -218,7 +213,6 @@ public void periodicDirectTaskRaceIO() throws Exception { Functions.EMPTY_RUNNABLE, 0, 0, TimeUnit.MILLISECONDS); Thread.sleep(1); - assertTrue(d instanceof SchedulerRunnableWrapper); d.dispose(); } @@ -306,7 +300,6 @@ public void customScheduleDirectDisposed() { Disposable d = scheduler.scheduleDirect(Functions.EMPTY_RUNNABLE, 1, TimeUnit.MINUTES); assertFalse(d.isDisposed()); - assertTrue(d instanceof SchedulerRunnableWrapper); d.dispose(); From a75a7f1f798cf97b017df24376d96466fe0708da Mon Sep 17 00:00:00 2001 From: lguz Date: Wed, 22 Nov 2017 08:42:34 +0100 Subject: [PATCH 08/23] 2.x: RxJavaPlugins unwrapRunnable --- .../io/reactivex/schedulers/SchedulerTest.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/test/java/io/reactivex/schedulers/SchedulerTest.java b/src/test/java/io/reactivex/schedulers/SchedulerTest.java index 5c4824f06f..1d37a1ae44 100644 --- a/src/test/java/io/reactivex/schedulers/SchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/SchedulerTest.java @@ -1,11 +1,11 @@ /** * Copyright (c) 2016-present, RxJava Contributors. - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at - *

+ * * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * * Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. @@ -32,7 +32,7 @@ public class SchedulerTest { @Test public void defaultPeriodicTask() { - final int[] count = {0}; + final int[] count = { 0 }; TestScheduler scheduler = new TestScheduler(); @@ -76,7 +76,7 @@ public void run() { @Test public void disposePeriodicDirect() { - final int[] count = {0}; + final int[] count = { 0 }; TestScheduler scheduler = new TestScheduler(); @@ -100,7 +100,7 @@ public void run() { @Test public void scheduleDirect() { - final int[] count = {0}; + final int[] count = { 0 }; TestScheduler scheduler = new TestScheduler(); @@ -120,7 +120,7 @@ public void run() { @Test public void disposeSelfPeriodicDirect() { - final int[] count = {0}; + final int[] count = { 0 }; TestScheduler scheduler = new TestScheduler(); From b47fcd8e7e0b1a1faa21713959ca3566150a8b3f Mon Sep 17 00:00:00 2001 From: lguz Date: Wed, 22 Nov 2017 08:51:59 +0100 Subject: [PATCH 09/23] 2.x: RxJavaPlugins unwrapRunnable --- src/main/java/io/reactivex/Scheduler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/Scheduler.java b/src/main/java/io/reactivex/Scheduler.java index e700012987..31b2ef7499 100644 --- a/src/main/java/io/reactivex/Scheduler.java +++ b/src/main/java/io/reactivex/Scheduler.java @@ -13,7 +13,7 @@ package io.reactivex; -import io.reactivex.annotations.*;; +import io.reactivex.annotations.*; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; From c0d6c787f647d1d745cb2a4723e7c709dda93ea3 Mon Sep 17 00:00:00 2001 From: lguz Date: Wed, 22 Nov 2017 08:59:14 +0100 Subject: [PATCH 10/23] 2.x: RxJavaPlugins unwrapRunnable --- src/test/java/io/reactivex/schedulers/SchedulerTest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/test/java/io/reactivex/schedulers/SchedulerTest.java b/src/test/java/io/reactivex/schedulers/SchedulerTest.java index 1d37a1ae44..359b8774ad 100644 --- a/src/test/java/io/reactivex/schedulers/SchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/SchedulerTest.java @@ -36,13 +36,12 @@ public void defaultPeriodicTask() { TestScheduler scheduler = new TestScheduler(); - Runnable action = new Runnable() { + Disposable d = scheduler.schedulePeriodicallyDirect(new Runnable() { @Override public void run() { count[0]++; } - }; - Disposable d = scheduler.schedulePeriodicallyDirect(action, 100, 100, TimeUnit.MILLISECONDS); + }, 100, 100, TimeUnit.MILLISECONDS); assertEquals(0, count[0]); assertFalse(d.isDisposed()); From f63a799f5c694f257f889e1bc96abe246d242cbf Mon Sep 17 00:00:00 2001 From: lguz Date: Wed, 22 Nov 2017 09:14:58 +0100 Subject: [PATCH 11/23] 2.x: RxJavaPlugins unwrapRunnable --- src/test/java/io/reactivex/schedulers/SchedulerTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/test/java/io/reactivex/schedulers/SchedulerTest.java b/src/test/java/io/reactivex/schedulers/SchedulerTest.java index 359b8774ad..7dccd8135e 100644 --- a/src/test/java/io/reactivex/schedulers/SchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/SchedulerTest.java @@ -146,7 +146,7 @@ public void run() { @Test public void disposeSelfPeriodic() { - final int[] count = {0}; + final int[] count = { 0 }; TestScheduler scheduler = new TestScheduler(); @@ -197,6 +197,7 @@ public void run() { scheduler.advanceTimeBy(1, TimeUnit.SECONDS); } }; + TestHelper.race(r1, r2, Schedulers.io()); } @@ -212,6 +213,7 @@ public void periodicDirectTaskRaceIO() throws Exception { Functions.EMPTY_RUNNABLE, 0, 0, TimeUnit.MILLISECONDS); Thread.sleep(1); + d.dispose(); } From 1db27f8b3fcb44bbb45dd2961526b9ecc1bfe844 Mon Sep 17 00:00:00 2001 From: lguz Date: Wed, 22 Nov 2017 10:17:20 +0100 Subject: [PATCH 12/23] 2.x: RxJavaPlugins unwrapRunnable --- src/main/java/io/reactivex/Scheduler.java | 12 +-- .../schedulers/SchedulerRunnableWrapper.java | 2 +- .../io/reactivex/plugins/RxJavaPlugins.java | 4 +- .../reactivex/plugins/RxJavaPluginsTest.java | 11 +-- .../reactivex/schedulers/SchedulerTest.java | 79 +++++++++++++++++-- 5 files changed, 83 insertions(+), 25 deletions(-) diff --git a/src/main/java/io/reactivex/Scheduler.java b/src/main/java/io/reactivex/Scheduler.java index 31b2ef7499..8358287e39 100644 --- a/src/main/java/io/reactivex/Scheduler.java +++ b/src/main/java/io/reactivex/Scheduler.java @@ -347,7 +347,7 @@ public long now(@NonNull TimeUnit unit) { * Holds state and logic to calculate when the next delayed invocation * of this task has to happen (accounting for clock drifts). */ - final class PeriodicTask implements SchedulerRunnableWrapper { + final class PeriodicTask implements Runnable, SchedulerRunnableWrapper { @NonNull final Runnable decoratedRun; @NonNull @@ -368,7 +368,7 @@ final class PeriodicTask implements SchedulerRunnableWrapper { @Override public void run() { - getWrappedRunnable().run(); + decoratedRun.run(); if (!sd.isDisposed()) { @@ -402,7 +402,7 @@ public Runnable getWrappedRunnable() { } static class PeriodicDirectTask - implements Disposable, SchedulerRunnableWrapper { + implements Disposable, Runnable, SchedulerRunnableWrapper { final Runnable run; @NonNull final Worker worker; @@ -418,7 +418,7 @@ static class PeriodicDirectTask public void run() { if (!disposed) { try { - getWrappedRunnable().run(); + run.run(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); worker.dispose(); @@ -444,7 +444,7 @@ public Runnable getWrappedRunnable() { } } - static final class DisposeTask implements Disposable, SchedulerRunnableWrapper { + static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableWrapper { final Runnable decoratedRun; final Worker w; @@ -459,7 +459,7 @@ static final class DisposeTask implements Disposable, SchedulerRunnableWrapper { public void run() { runner = Thread.currentThread(); try { - getWrappedRunnable().run(); + decoratedRun.run(); } finally { dispose(); runner = null; diff --git a/src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableWrapper.java b/src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableWrapper.java index 6f9581a2a7..6a94673c82 100644 --- a/src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableWrapper.java +++ b/src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableWrapper.java @@ -18,7 +18,7 @@ * @since 2.1.7 - experimental */ @Experimental -public interface SchedulerRunnableWrapper extends Runnable { +public interface SchedulerRunnableWrapper { /** * Returns the wrapped action. diff --git a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java index fcb4073ea8..327e54f067 100644 --- a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java +++ b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java @@ -630,11 +630,11 @@ public static void setSingleSchedulerHandler(@Nullable Function Date: Fri, 24 Nov 2017 09:20:06 +0100 Subject: [PATCH 13/23] 2.x: RxJavaPlugins unwrapRunnable --- src/main/java/io/reactivex/Scheduler.java | 6 +-- ...va => SchedulerRunnableIntrospection.java} | 9 ++++- .../io/reactivex/plugins/RxJavaPlugins.java | 17 --------- .../reactivex/plugins/RxJavaPluginsTest.java | 37 ------------------- .../reactivex/schedulers/SchedulerTest.java | 13 ++++--- 5 files changed, 17 insertions(+), 65 deletions(-) rename src/main/java/io/reactivex/internal/schedulers/{SchedulerRunnableWrapper.java => SchedulerRunnableIntrospection.java} (71%) diff --git a/src/main/java/io/reactivex/Scheduler.java b/src/main/java/io/reactivex/Scheduler.java index 8358287e39..10b63bab13 100644 --- a/src/main/java/io/reactivex/Scheduler.java +++ b/src/main/java/io/reactivex/Scheduler.java @@ -347,7 +347,7 @@ public long now(@NonNull TimeUnit unit) { * Holds state and logic to calculate when the next delayed invocation * of this task has to happen (accounting for clock drifts). */ - final class PeriodicTask implements Runnable, SchedulerRunnableWrapper { + final class PeriodicTask implements Runnable, SchedulerRunnableIntrospection { @NonNull final Runnable decoratedRun; @NonNull @@ -402,7 +402,7 @@ public Runnable getWrappedRunnable() { } static class PeriodicDirectTask - implements Disposable, Runnable, SchedulerRunnableWrapper { + implements Disposable, Runnable, SchedulerRunnableIntrospection { final Runnable run; @NonNull final Worker worker; @@ -444,7 +444,7 @@ public Runnable getWrappedRunnable() { } } - static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableWrapper { + static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection { final Runnable decoratedRun; final Worker w; diff --git a/src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableWrapper.java b/src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableIntrospection.java similarity index 71% rename from src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableWrapper.java rename to src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableIntrospection.java index 6a94673c82..5efd5adb22 100644 --- a/src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableWrapper.java +++ b/src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableIntrospection.java @@ -10,15 +10,20 @@ package io.reactivex.internal.schedulers; +import io.reactivex.Scheduler; import io.reactivex.annotations.*; +import io.reactivex.plugins.RxJavaPlugins; /** - * Represents a wrapped action inside internal scheduler's task. + * Marker interface to indicate wrapped action inside internal scheduler's task. + * + * Inside of the {@link RxJavaPlugins#onSchedule(Runnable)}, you can unwrap runnable from internal wrappers like + * {@link Scheduler.DisposeTask}. * * @since 2.1.7 - experimental */ @Experimental -public interface SchedulerRunnableWrapper { +public interface SchedulerRunnableIntrospection { /** * Returns the wrapped action. diff --git a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java index 327e54f067..5ab0192342 100644 --- a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java +++ b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java @@ -620,23 +620,6 @@ public static void setSingleSchedulerHandler(@Nullable Function Date: Fri, 24 Nov 2017 10:02:59 +0100 Subject: [PATCH 14/23] 2.x: RxJavaPlugins unwrapRunnable --- src/main/java/io/reactivex/Scheduler.java | 1 + .../schedulers/SchedulerRunnableIntrospection.java | 2 +- src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java | 2 +- src/test/java/io/reactivex/schedulers/SchedulerTest.java | 1 - 4 files changed, 3 insertions(+), 3 deletions(-) rename src/main/java/io/reactivex/{internal => }/schedulers/SchedulerRunnableIntrospection.java (96%) diff --git a/src/main/java/io/reactivex/Scheduler.java b/src/main/java/io/reactivex/Scheduler.java index 10b63bab13..724974cbbb 100644 --- a/src/main/java/io/reactivex/Scheduler.java +++ b/src/main/java/io/reactivex/Scheduler.java @@ -21,6 +21,7 @@ import io.reactivex.internal.schedulers.*; import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.SchedulerRunnableIntrospection; import java.util.concurrent.TimeUnit; diff --git a/src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableIntrospection.java b/src/main/java/io/reactivex/schedulers/SchedulerRunnableIntrospection.java similarity index 96% rename from src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableIntrospection.java rename to src/main/java/io/reactivex/schedulers/SchedulerRunnableIntrospection.java index 5efd5adb22..4156212c9f 100644 --- a/src/main/java/io/reactivex/internal/schedulers/SchedulerRunnableIntrospection.java +++ b/src/main/java/io/reactivex/schedulers/SchedulerRunnableIntrospection.java @@ -8,7 +8,7 @@ * the License for the specific language governing permissions and limitations under the License. */ -package io.reactivex.internal.schedulers; +package io.reactivex.schedulers; import io.reactivex.Scheduler; import io.reactivex.annotations.*; diff --git a/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java b/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java index cfb4c0c9cb..40a7152156 100644 --- a/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java +++ b/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java @@ -32,7 +32,7 @@ import io.reactivex.internal.operators.observable.ObservableRange; import io.reactivex.internal.operators.parallel.ParallelFromPublisher; import io.reactivex.internal.operators.single.SingleJust; -import io.reactivex.internal.schedulers.*; +import io.reactivex.internal.schedulers.ImmediateThinScheduler; import io.reactivex.internal.subscriptions.ScalarSubscription; import io.reactivex.observables.ConnectableObservable; import io.reactivex.parallel.ParallelFlowable; diff --git a/src/test/java/io/reactivex/schedulers/SchedulerTest.java b/src/test/java/io/reactivex/schedulers/SchedulerTest.java index a86449e1e4..564cdf86b9 100644 --- a/src/test/java/io/reactivex/schedulers/SchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/SchedulerTest.java @@ -19,7 +19,6 @@ import java.util.concurrent.TimeUnit; import io.reactivex.annotations.NonNull; -import io.reactivex.internal.schedulers.SchedulerRunnableIntrospection; import org.junit.Test; import io.reactivex.*; From ebe2ca826cca502294df23c1a751c75f4baad438 Mon Sep 17 00:00:00 2001 From: lguz Date: Fri, 24 Nov 2017 10:17:19 +0100 Subject: [PATCH 15/23] 2.x: RxJavaPlugins unwrapRunnable --- .../reactivex/plugins/RxJavaPluginsTest.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java b/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java index 40a7152156..a8a998285e 100644 --- a/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java +++ b/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java @@ -16,6 +16,17 @@ package io.reactivex.plugins; +import static org.junit.Assert.*; + +import java.io.*; +import java.lang.Thread.UncaughtExceptionHandler; +import java.lang.reflect.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import org.junit.*; +import org.reactivestreams.*; import io.reactivex.*; import io.reactivex.Observable; @@ -37,17 +48,6 @@ import io.reactivex.observables.ConnectableObservable; import io.reactivex.parallel.ParallelFlowable; import io.reactivex.schedulers.Schedulers; -import org.junit.*; -import org.reactivestreams.*; - -import java.io.*; -import java.lang.Thread.UncaughtExceptionHandler; -import java.lang.reflect.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.junit.Assert.*; public class RxJavaPluginsTest { From b2f0c86a746f408923375fb4dd51ca02e327826b Mon Sep 17 00:00:00 2001 From: lguz Date: Tue, 28 Nov 2017 11:41:50 +0100 Subject: [PATCH 16/23] 2.x: RxJavaPlugins unwrapRunnable --- src/test/java/io/reactivex/schedulers/SchedulerTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/test/java/io/reactivex/schedulers/SchedulerTest.java b/src/test/java/io/reactivex/schedulers/SchedulerTest.java index 564cdf86b9..714482e5cb 100644 --- a/src/test/java/io/reactivex/schedulers/SchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/SchedulerTest.java @@ -319,7 +319,7 @@ public void run() { }; SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) scheduler.schedulePeriodicallyDirect(runnable, 100, 100, TimeUnit.MILLISECONDS); - assertEquals(runnable, wrapper.getWrappedRunnable()); + assertSame(runnable, wrapper.getWrappedRunnable()); } @Test @@ -332,7 +332,7 @@ public void run() { } }; SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) scheduler.scheduleDirect(runnable, 100, TimeUnit.MILLISECONDS); - assertEquals(runnable, wrapper.getWrappedRunnable()); + assertSame(runnable, wrapper.getWrappedRunnable()); } @Test @@ -351,7 +351,7 @@ public Worker createWorker() { public Disposable schedule(Runnable run, long delay, TimeUnit unit) { SchedulerRunnableIntrospection outerWrapper = (SchedulerRunnableIntrospection) run; SchedulerRunnableIntrospection innerWrapper = (SchedulerRunnableIntrospection) outerWrapper.getWrappedRunnable(); - assertEquals(runnable, innerWrapper.getWrappedRunnable()); + assertSame(runnable, innerWrapper.getWrappedRunnable()); return (Disposable) innerWrapper; } From bf72b36dc714c6ea0810066960c1919164dd2e54 Mon Sep 17 00:00:00 2001 From: lguz Date: Thu, 30 Nov 2017 08:53:01 +0100 Subject: [PATCH 17/23] 2.x: RxJavaPlugins unwrapRunnable --- .../SchedulerRunnableIntrospection.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/reactivex/schedulers/SchedulerRunnableIntrospection.java b/src/main/java/io/reactivex/schedulers/SchedulerRunnableIntrospection.java index 4156212c9f..b83d1643a5 100644 --- a/src/main/java/io/reactivex/schedulers/SchedulerRunnableIntrospection.java +++ b/src/main/java/io/reactivex/schedulers/SchedulerRunnableIntrospection.java @@ -10,15 +10,16 @@ package io.reactivex.schedulers; -import io.reactivex.Scheduler; -import io.reactivex.annotations.*; +import com.sun.istack.internal.NotNull; +import io.reactivex.annotations.Experimental; +import io.reactivex.functions.Function; import io.reactivex.plugins.RxJavaPlugins; /** - * Marker interface to indicate wrapped action inside internal scheduler's task. + * Interface to wrap an action inside internal scheduler's task. * - * Inside of the {@link RxJavaPlugins#onSchedule(Runnable)}, you can unwrap runnable from internal wrappers like - * {@link Scheduler.DisposeTask}. + * You can check if runnable implements this interface and unwrap original runnable. + * For example inside of the {@link RxJavaPlugins#setScheduleHandler(Function)} * * @since 2.1.7 - experimental */ @@ -28,8 +29,8 @@ public interface SchedulerRunnableIntrospection { /** * Returns the wrapped action. * - * @return the wrapped action, may be null. + * @return the wrapped action. Cannot be null. */ - @Nullable + @NotNull Runnable getWrappedRunnable(); } From 171ac8037a7e808c74161a600183e6ac8dd34a6e Mon Sep 17 00:00:00 2001 From: lguz Date: Thu, 30 Nov 2017 08:57:29 +0100 Subject: [PATCH 18/23] 2.x: RxJavaPlugins unwrapRunnable --- .../reactivex/schedulers/SchedulerRunnableIntrospection.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/reactivex/schedulers/SchedulerRunnableIntrospection.java b/src/main/java/io/reactivex/schedulers/SchedulerRunnableIntrospection.java index b83d1643a5..4e558c8343 100644 --- a/src/main/java/io/reactivex/schedulers/SchedulerRunnableIntrospection.java +++ b/src/main/java/io/reactivex/schedulers/SchedulerRunnableIntrospection.java @@ -10,8 +10,7 @@ package io.reactivex.schedulers; -import com.sun.istack.internal.NotNull; -import io.reactivex.annotations.Experimental; +import io.reactivex.annotations.*; import io.reactivex.functions.Function; import io.reactivex.plugins.RxJavaPlugins; @@ -31,6 +30,6 @@ public interface SchedulerRunnableIntrospection { * * @return the wrapped action. Cannot be null. */ - @NotNull + @NonNull Runnable getWrappedRunnable(); } From 2a8e6f0486d74982a336e1949c0621e59270835e Mon Sep 17 00:00:00 2001 From: lguz Date: Fri, 1 Dec 2017 09:54:09 +0100 Subject: [PATCH 19/23] 2.x: RxJavaPlugins unwrapRunnable --- .../schedulers/AbstractDirectTask.java | 8 +++- .../schedulers/ExecutorScheduler.java | 10 ++++- .../schedulers/AbstractSchedulerTests.java | 42 ++++++++++++++++++- 3 files changed, 56 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/reactivex/internal/schedulers/AbstractDirectTask.java b/src/main/java/io/reactivex/internal/schedulers/AbstractDirectTask.java index cd278fe590..2bf3c454f4 100644 --- a/src/main/java/io/reactivex/internal/schedulers/AbstractDirectTask.java +++ b/src/main/java/io/reactivex/internal/schedulers/AbstractDirectTask.java @@ -21,6 +21,7 @@ import io.reactivex.disposables.Disposable; import io.reactivex.internal.functions.Functions; +import io.reactivex.schedulers.SchedulerRunnableIntrospection; /** * Base functionality for direct tasks that manage a runnable and cancellation/completion. @@ -28,7 +29,7 @@ */ abstract class AbstractDirectTask extends AtomicReference> -implements Disposable { +implements Disposable, SchedulerRunnableIntrospection { private static final long serialVersionUID = 1811839108042568751L; @@ -77,4 +78,9 @@ public final void setFuture(Future future) { } } } + + @Override + public Runnable getWrappedRunnable() { + return runnable; + } } diff --git a/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java b/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java index e050fed1f7..01cf01ebe4 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java @@ -23,7 +23,7 @@ import io.reactivex.internal.queue.MpscLinkedQueue; import io.reactivex.internal.schedulers.ExecutorScheduler.ExecutorWorker.BooleanRunnable; import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.schedulers.Schedulers; +import io.reactivex.schedulers.*; /** * Wraps an Executor and provides the Scheduler API over it. @@ -290,7 +290,8 @@ public void run() { } } - static final class DelayedRunnable extends AtomicReference implements Runnable, Disposable { + static final class DelayedRunnable extends AtomicReference + implements Runnable, Disposable, SchedulerRunnableIntrospection { private static final long serialVersionUID = -4101336210206799084L; @@ -330,6 +331,11 @@ public void dispose() { direct.dispose(); } } + + @Override + public Runnable getWrappedRunnable() { + return get(); + } } final class DelayedDispose implements Runnable { diff --git a/src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java b/src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java index b81036a7e0..71289b48ee 100644 --- a/src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java +++ b/src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java @@ -698,4 +698,44 @@ public void run() { disposable.get().dispose(); } } -} + + @Test(timeout = 5000) + public void unwrapDefaultPeriodicTask() throws InterruptedException { + Scheduler s = getScheduler(); + if (s instanceof TrampolineScheduler) { + // Can't properly stop a trampolined periodic task. + return; + } + + + final CountDownLatch cdl = new CountDownLatch(1); + Runnable countDownRunnable = new Runnable() { + @Override + public void run() { + cdl.countDown(); + } + }; + SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) s.schedulePeriodicallyDirect(countDownRunnable, 100, 100, TimeUnit.MILLISECONDS); + + assertSame(countDownRunnable, wrapper.getWrappedRunnable()); + assertTrue(cdl.await(5, TimeUnit.SECONDS)); + } + + @Test + public void unwrapScheduleDirectTask() { + Scheduler scheduler = getScheduler(); + if (scheduler instanceof TrampolineScheduler) { + // Can't properly stop a trampolined periodic task. + return; + } + final CountDownLatch cdl = new CountDownLatch(1); + Runnable countDownRunnable = new Runnable() { + @Override + public void run() { + cdl.countDown(); + } + }; + SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) scheduler.scheduleDirect(countDownRunnable, 100, TimeUnit.MILLISECONDS); + assertSame(countDownRunnable, wrapper.getWrappedRunnable()); + } +} \ No newline at end of file From f1af3f7d58c94dfe192b5828d23395e9976654a1 Mon Sep 17 00:00:00 2001 From: lguz Date: Fri, 1 Dec 2017 10:29:25 +0100 Subject: [PATCH 20/23] 2.x: RxJavaPlugins unwrapRunnable --- .../io/reactivex/schedulers/AbstractSchedulerTests.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java b/src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java index 71289b48ee..deed224496 100644 --- a/src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java +++ b/src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java @@ -715,10 +715,12 @@ public void run() { cdl.countDown(); } }; - SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) s.schedulePeriodicallyDirect(countDownRunnable, 100, 100, TimeUnit.MILLISECONDS); + Disposable disposable = s.schedulePeriodicallyDirect(countDownRunnable, 100, 100, TimeUnit.MILLISECONDS); + SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) disposable; assertSame(countDownRunnable, wrapper.getWrappedRunnable()); assertTrue(cdl.await(5, TimeUnit.SECONDS)); + disposable.dispose(); } @Test @@ -735,7 +737,9 @@ public void run() { cdl.countDown(); } }; - SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) scheduler.scheduleDirect(countDownRunnable, 100, TimeUnit.MILLISECONDS); + Disposable disposable = scheduler.scheduleDirect(countDownRunnable, 100, TimeUnit.MILLISECONDS); + SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) disposable; assertSame(countDownRunnable, wrapper.getWrappedRunnable()); + disposable.dispose(); } } \ No newline at end of file From a1c5ad14c5490f2520fb2aef0ba03977b672446e Mon Sep 17 00:00:00 2001 From: lguz Date: Fri, 1 Dec 2017 10:48:40 +0100 Subject: [PATCH 21/23] 2.x: RxJavaPlugins unwrapRunnable --- .../java/io/reactivex/schedulers/AbstractSchedulerTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java b/src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java index deed224496..8d78e3756f 100644 --- a/src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java +++ b/src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java @@ -703,7 +703,7 @@ public void run() { public void unwrapDefaultPeriodicTask() throws InterruptedException { Scheduler s = getScheduler(); if (s instanceof TrampolineScheduler) { - // Can't properly stop a trampolined periodic task. + // TrampolineScheduler always return EmptyDisposable return; } @@ -727,7 +727,7 @@ public void run() { public void unwrapScheduleDirectTask() { Scheduler scheduler = getScheduler(); if (scheduler instanceof TrampolineScheduler) { - // Can't properly stop a trampolined periodic task. + // TrampolineScheduler always return EmptyDisposable return; } final CountDownLatch cdl = new CountDownLatch(1); From ac5c3f5d0c6bc4a94b1859b23944b5f1922af183 Mon Sep 17 00:00:00 2001 From: lguz Date: Fri, 1 Dec 2017 12:44:57 +0100 Subject: [PATCH 22/23] 2.x: RxJavaPlugins unwrapRunnable --- .../io/reactivex/internal/schedulers/ExecutorScheduler.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java b/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java index 01cf01ebe4..b18e18d16d 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java @@ -20,6 +20,7 @@ import io.reactivex.annotations.NonNull; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.queue.MpscLinkedQueue; import io.reactivex.internal.schedulers.ExecutorScheduler.ExecutorWorker.BooleanRunnable; import io.reactivex.plugins.RxJavaPlugins; @@ -334,7 +335,8 @@ public void dispose() { @Override public Runnable getWrappedRunnable() { - return get(); + Runnable r = get(); + return r != null ? r : Functions.EMPTY_RUNNABLE; } } From 3c8aefd69fe8142824a4c4ab113142babd84b8fa Mon Sep 17 00:00:00 2001 From: lguz Date: Fri, 1 Dec 2017 13:27:21 +0100 Subject: [PATCH 23/23] 2.x: RxJavaPlugins unwrapRunnable --- .../schedulers/ExecutorSchedulerTest.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java b/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java index c0a19efa5e..4e53fbcf42 100644 --- a/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java @@ -561,4 +561,22 @@ public void runnableDisposedAsyncTimed2() throws Exception { executorScheduler.shutdownNow(); } } + + @Test + public void unwrapScheduleDirectTaskAfterDispose() { + Scheduler scheduler = getScheduler(); + final CountDownLatch cdl = new CountDownLatch(1); + Runnable countDownRunnable = new Runnable() { + @Override + public void run() { + cdl.countDown(); + } + }; + Disposable disposable = scheduler.scheduleDirect(countDownRunnable, 100, TimeUnit.MILLISECONDS); + SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) disposable; + assertSame(countDownRunnable, wrapper.getWrappedRunnable()); + disposable.dispose(); + + assertSame(Functions.EMPTY_RUNNABLE, wrapper.getWrappedRunnable()); + } }