Skip to content

Commit

Permalink
2.x: RxJavaPlugins unwrapRunnable (#5734)
Browse files Browse the repository at this point in the history
* 2.x: RxJavaPlugins unwrapRunnable

* 2.x: RxJavaPlugins unwrapRunnable-javadoc

* 2.x: RxJavaPlugins unwrapRunnable-Enhancement and test coverage

* 2.x: RxJavaPlugins unwrapRunnable-Enhancement and test coverage

* 2.x: RxJavaPlugins unwrapRunnable-Enhancement and test coverage

* 2.x: RxJavaPlugins unwrapRunnable-Enhancement and test coverage

* 2.x: RxJavaPlugins unwrapRunnable-Enhancement and test coverage

* 2.x: RxJavaPlugins unwrapRunnable

* 2.x: RxJavaPlugins unwrapRunnable

* 2.x: RxJavaPlugins unwrapRunnable

* 2.x: RxJavaPlugins unwrapRunnable

* 2.x: RxJavaPlugins unwrapRunnable

* 2.x: RxJavaPlugins unwrapRunnable

* 2.x: RxJavaPlugins unwrapRunnable

* 2.x: RxJavaPlugins unwrapRunnable

* 2.x: RxJavaPlugins unwrapRunnable

* 2.x: RxJavaPlugins unwrapRunnable

* 2.x: RxJavaPlugins unwrapRunnable

* 2.x: RxJavaPlugins unwrapRunnable

* 2.x: RxJavaPlugins unwrapRunnable

* 2.x: RxJavaPlugins unwrapRunnable

* 2.x: RxJavaPlugins unwrapRunnable

* 2.x: RxJavaPlugins unwrapRunnable
  • Loading branch information
lukaszguz authored and akarnokd committed Dec 4, 2017
1 parent 30905fc commit 4d2e821
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 14 deletions.
36 changes: 26 additions & 10 deletions src/main/java/io/reactivex/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@

package io.reactivex;

import java.util.concurrent.TimeUnit;

import io.reactivex.annotations.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
Expand All @@ -23,6 +21,9 @@
import io.reactivex.internal.schedulers.*;
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.SchedulerRunnableIntrospection;

import java.util.concurrent.TimeUnit;

/**
* A {@code Scheduler} is an object that specifies an API for scheduling
Expand Down Expand Up @@ -197,7 +198,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
* <p>
* Limit the amount concurrency two at a time without creating a new fix
* size thread pool:
*
*
* <pre>
* Scheduler limitScheduler = Schedulers.computation().when(workers -&gt; {
* // use merge max concurrent to limit the number of concurrent
Expand All @@ -215,20 +216,20 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
* {@link Flowable#zip(org.reactivestreams.Publisher, org.reactivestreams.Publisher, io.reactivex.functions.BiFunction)} where
* subscribing to the first {@link Flowable} could deadlock the
* subscription to the second.
*
*
* <pre>
* Scheduler limitScheduler = Schedulers.computation().when(workers -&gt; {
* // use merge max concurrent to limit the number of concurrent
* // Flowables two at a time
* return Completable.merge(Flowable.merge(workers, 2));
* });
* </pre>
*
*
* Slowing down the rate to no more than than 1 a second. This suffers from
* the same problem as the one above I could find an {@link Flowable}
* operator that limits the rate without dropping the values (aka leaky
* bucket algorithm).
*
*
* <pre>
* Scheduler slowScheduler = Schedulers.computation().when(workers -&gt; {
* // use concatenate to make each worker happen one at a time.
Expand All @@ -238,7 +239,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
* }));
* });
* </pre>
*
*
* <p>History: 2.0.1 - experimental
* @param <S> a Scheduler and a Subscription
* @param combine the function that takes a two-level nested Flowable sequence of a Completable and returns
Expand Down Expand Up @@ -347,7 +348,7 @@ public long now(@NonNull TimeUnit unit) {
* Holds state and logic to calculate when the next delayed invocation
* of this task has to happen (accounting for clock drifts).
*/
final class PeriodicTask implements Runnable {
final class PeriodicTask implements Runnable, SchedulerRunnableIntrospection {
@NonNull
final Runnable decoratedRun;
@NonNull
Expand Down Expand Up @@ -393,11 +394,16 @@ public void run() {
sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
}
}

@Override
public Runnable getWrappedRunnable() {
return this.decoratedRun;
}
}
}

static class PeriodicDirectTask
implements Runnable, Disposable {
implements Disposable, Runnable, SchedulerRunnableIntrospection {
final Runnable run;
@NonNull
final Worker worker;
Expand Down Expand Up @@ -432,9 +438,14 @@ public void dispose() {
public boolean isDisposed() {
return disposed;
}

@Override
public Runnable getWrappedRunnable() {
return run;
}
}

static final class DisposeTask implements Runnable, Disposable {
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
final Runnable decoratedRun;
final Worker w;

Expand Down Expand Up @@ -469,5 +480,10 @@ public void dispose() {
public boolean isDisposed() {
return w.isDisposed();
}

@Override
public Runnable getWrappedRunnable() {
return this.decoratedRun;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@

import io.reactivex.disposables.Disposable;
import io.reactivex.internal.functions.Functions;
import io.reactivex.schedulers.SchedulerRunnableIntrospection;

/**
* Base functionality for direct tasks that manage a runnable and cancellation/completion.
* @since 2.0.8
*/
abstract class AbstractDirectTask
extends AtomicReference<Future<?>>
implements Disposable {
implements Disposable, SchedulerRunnableIntrospection {

private static final long serialVersionUID = 1811839108042568751L;

Expand Down Expand Up @@ -77,4 +78,9 @@ public final void setFuture(Future<?> future) {
}
}
}

@Override
public Runnable getWrappedRunnable() {
return runnable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.*;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.queue.MpscLinkedQueue;
import io.reactivex.internal.schedulers.ExecutorScheduler.ExecutorWorker.BooleanRunnable;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.schedulers.*;

/**
* Wraps an Executor and provides the Scheduler API over it.
Expand Down Expand Up @@ -290,7 +291,8 @@ public void run() {
}
}

static final class DelayedRunnable extends AtomicReference<Runnable> implements Runnable, Disposable {
static final class DelayedRunnable extends AtomicReference<Runnable>
implements Runnable, Disposable, SchedulerRunnableIntrospection {

private static final long serialVersionUID = -4101336210206799084L;

Expand Down Expand Up @@ -330,6 +332,12 @@ public void dispose() {
direct.dispose();
}
}

@Override
public Runnable getWrappedRunnable() {
Runnable r = get();
return r != null ? r : Functions.EMPTY_RUNNABLE;
}
}

final class DelayedDispose implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.schedulers;

import io.reactivex.annotations.*;
import io.reactivex.functions.Function;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Interface to wrap an action inside internal scheduler's task.
*
* You can check if runnable implements this interface and unwrap original runnable.
* For example inside of the {@link RxJavaPlugins#setScheduleHandler(Function)}
*
* @since 2.1.7 - experimental
*/
@Experimental
public interface SchedulerRunnableIntrospection {

/**
* Returns the wrapped action.
*
* @return the wrapped action. Cannot be null.
*/
@NonNull
Runnable getWrappedRunnable();
}
Original file line number Diff line number Diff line change
Expand Up @@ -698,4 +698,48 @@ public void run() {
disposable.get().dispose();
}
}
}

@Test(timeout = 5000)
public void unwrapDefaultPeriodicTask() throws InterruptedException {
Scheduler s = getScheduler();
if (s instanceof TrampolineScheduler) {
// TrampolineScheduler always return EmptyDisposable
return;
}


final CountDownLatch cdl = new CountDownLatch(1);
Runnable countDownRunnable = new Runnable() {
@Override
public void run() {
cdl.countDown();
}
};
Disposable disposable = s.schedulePeriodicallyDirect(countDownRunnable, 100, 100, TimeUnit.MILLISECONDS);
SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) disposable;

assertSame(countDownRunnable, wrapper.getWrappedRunnable());
assertTrue(cdl.await(5, TimeUnit.SECONDS));
disposable.dispose();
}

@Test
public void unwrapScheduleDirectTask() {
Scheduler scheduler = getScheduler();
if (scheduler instanceof TrampolineScheduler) {
// TrampolineScheduler always return EmptyDisposable
return;
}
final CountDownLatch cdl = new CountDownLatch(1);
Runnable countDownRunnable = new Runnable() {
@Override
public void run() {
cdl.countDown();
}
};
Disposable disposable = scheduler.scheduleDirect(countDownRunnable, 100, TimeUnit.MILLISECONDS);
SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) disposable;
assertSame(countDownRunnable, wrapper.getWrappedRunnable());
disposable.dispose();
}
}
18 changes: 18 additions & 0 deletions src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -561,4 +561,22 @@ public void runnableDisposedAsyncTimed2() throws Exception {
executorScheduler.shutdownNow();
}
}

@Test
public void unwrapScheduleDirectTaskAfterDispose() {
Scheduler scheduler = getScheduler();
final CountDownLatch cdl = new CountDownLatch(1);
Runnable countDownRunnable = new Runnable() {
@Override
public void run() {
cdl.countDown();
}
};
Disposable disposable = scheduler.scheduleDirect(countDownRunnable, 100, TimeUnit.MILLISECONDS);
SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) disposable;
assertSame(countDownRunnable, wrapper.getWrappedRunnable());
disposable.dispose();

assertSame(Functions.EMPTY_RUNNABLE, wrapper.getWrappedRunnable());
}
}
62 changes: 62 additions & 0 deletions src/test/java/io/reactivex/schedulers/SchedulerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -307,4 +307,66 @@ public void customScheduleDirectDisposed() {

assertTrue(d.isDisposed());
}

@Test
public void unwrapDefaultPeriodicTask() {
TestScheduler scheduler = new TestScheduler();

Runnable runnable = new Runnable() {
@Override
public void run() {
}
};
SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) scheduler.schedulePeriodicallyDirect(runnable, 100, 100, TimeUnit.MILLISECONDS);

assertSame(runnable, wrapper.getWrappedRunnable());
}

@Test
public void unwrapScheduleDirectTask() {
TestScheduler scheduler = new TestScheduler();

Runnable runnable = new Runnable() {
@Override
public void run() {
}
};
SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) scheduler.scheduleDirect(runnable, 100, TimeUnit.MILLISECONDS);
assertSame(runnable, wrapper.getWrappedRunnable());
}

@Test
public void unwrapWorkerPeriodicTask() {
final Runnable runnable = new Runnable() {
@Override
public void run() {
}
};

Scheduler scheduler = new Scheduler() {
@Override
public Worker createWorker() {
return new Worker() {
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
SchedulerRunnableIntrospection outerWrapper = (SchedulerRunnableIntrospection) run;
SchedulerRunnableIntrospection innerWrapper = (SchedulerRunnableIntrospection) outerWrapper.getWrappedRunnable();
assertSame(runnable, innerWrapper.getWrappedRunnable());
return (Disposable) innerWrapper;
}

@Override
public void dispose() {
}

@Override
public boolean isDisposed() {
return false;
}
};
}
};

scheduler.schedulePeriodicallyDirect(runnable, 100, 100, TimeUnit.MILLISECONDS);
}
}

0 comments on commit 4d2e821

Please sign in to comment.