Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: RxJavaPlugins unwrapRunnable #5734

Merged
merged 24 commits into from
Dec 4, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 31 additions & 13 deletions src/main/java/io/reactivex/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,20 @@

package io.reactivex;

import java.util.concurrent.TimeUnit;

import io.reactivex.annotations.*;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.schedulers.*;
import io.reactivex.internal.disposables.EmptyDisposable;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make sure your IDE doesn't unroll star imports to reduce the diff size.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

import io.reactivex.internal.disposables.SequentialDisposable;
import io.reactivex.internal.schedulers.NewThreadWorker;
import io.reactivex.internal.schedulers.SchedulerRunnableWrapper;
import io.reactivex.internal.schedulers.SchedulerWhen;
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.plugins.RxJavaPlugins;

import java.util.concurrent.TimeUnit;

/**
* A {@code Scheduler} is an object that specifies an API for scheduling
* units of work with or without delays or periodically.
Expand Down Expand Up @@ -197,7 +200,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
* <p>
* Limit the amount concurrency two at a time without creating a new fix
* size thread pool:
*
*
* <pre>
* Scheduler limitScheduler = Schedulers.computation().when(workers -&gt; {
* // use merge max concurrent to limit the number of concurrent
Expand All @@ -215,20 +218,20 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
* {@link Flowable#zip(org.reactivestreams.Publisher, org.reactivestreams.Publisher, io.reactivex.functions.BiFunction)} where
* subscribing to the first {@link Flowable} could deadlock the
* subscription to the second.
*
*
* <pre>
* Scheduler limitScheduler = Schedulers.computation().when(workers -&gt; {
* // use merge max concurrent to limit the number of concurrent
* // Flowables two at a time
* return Completable.merge(Flowable.merge(workers, 2));
* });
* </pre>
*
*
* Slowing down the rate to no more than than 1 a second. This suffers from
* the same problem as the one above I could find an {@link Flowable}
* operator that limits the rate without dropping the values (aka leaky
* bucket algorithm).
*
*
* <pre>
* Scheduler slowScheduler = Schedulers.computation().when(workers -&gt; {
* // use concatenate to make each worker happen one at a time.
Expand All @@ -238,7 +241,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
* }));
* });
* </pre>
*
*
* <p>History: 2.0.1 - experimental
* @param <S> a Scheduler and a Subscription
* @param combine the function that takes a two-level nested Flowable sequence of a Completable and returns
Expand Down Expand Up @@ -347,7 +350,7 @@ public long now(@NonNull TimeUnit unit) {
* Holds state and logic to calculate when the next delayed invocation
* of this task has to happen (accounting for clock drifts).
*/
final class PeriodicTask implements Runnable {
final class PeriodicTask implements SchedulerRunnableWrapper {
@NonNull
final Runnable decoratedRun;
@NonNull
Expand Down Expand Up @@ -393,11 +396,16 @@ public void run() {
sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
}
}

@Override
public Runnable getWrappedRunnable() {
return this.decoratedRun;
}
}
}

static class PeriodicDirectTask
implements Runnable, Disposable {
implements Disposable, SchedulerRunnableWrapper {
final Runnable run;
@NonNull
final Worker worker;
Expand Down Expand Up @@ -432,9 +440,14 @@ public void dispose() {
public boolean isDisposed() {
return disposed;
}

@Override
public Runnable getWrappedRunnable() {
return run;
}
}

static final class DisposeTask implements Runnable, Disposable {
static final class DisposeTask implements Disposable, SchedulerRunnableWrapper {
final Runnable decoratedRun;
final Worker w;

Expand Down Expand Up @@ -469,5 +482,10 @@ public void dispose() {
public boolean isDisposed() {
return w.isDisposed();
}

@Override
public Runnable getWrappedRunnable() {
return this.decoratedRun;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.schedulers;

public interface SchedulerRunnableWrapper extends Runnable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please document this interface a bit and include a @since 2.1.7 - experimental tag and the @Experimental class annotation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not certain this should extend Runnable as it may make developers think they should invoke run.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chaged it.


Runnable getWrappedRunnable();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a short description of this method and the @Nullable annotation (some terminated wrappers do null out their wrapped Runnable instance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

}
12 changes: 12 additions & 0 deletions src/main/java/io/reactivex/plugins/RxJavaPlugins.java
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,18 @@ public static void setSingleSchedulerHandler(@Nullable Function<? super Schedule
onSingleHandler = handler;
}

/**
* Unwraps internal scheduler's task.
* @param task the internal task
* @return the unwrapped runnable or task
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add @since 2.1.7 - experimental and @Experimental annotation to the method.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alos please add @Nullable and @CheckReturnValue to the return type, @NonNull to the parameter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

public static Runnable unwrapRunnable(Runnable task) {
if (task instanceof SchedulerRunnableWrapper) {
return ((SchedulerRunnableWrapper) task).getWrappedRunnable();
}
return task;
}

/**
* Revokes the lockdown, only for testing purposes.
*/
Expand Down
75 changes: 59 additions & 16 deletions src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,16 @@

package io.reactivex.plugins;

import static org.junit.Assert.*;

import java.io.*;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import org.junit.*;
import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.Scheduler.Worker;
import io.reactivex.disposables.*;
import io.reactivex.disposables.Disposable;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't unroll star imports.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

import io.reactivex.disposables.Disposables;
import io.reactivex.exceptions.*;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.functions.*;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.BooleanSupplier;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.operators.completable.CompletableError;
import io.reactivex.internal.operators.flowable.FlowableRange;
Expand All @@ -44,10 +34,30 @@
import io.reactivex.internal.operators.parallel.ParallelFromPublisher;
import io.reactivex.internal.operators.single.SingleJust;
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
import io.reactivex.internal.schedulers.SchedulerRunnableWrapper;
import io.reactivex.internal.subscriptions.ScalarSubscription;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.parallel.ParallelFlowable;
import io.reactivex.schedulers.Schedulers;
import org.junit.Ignore;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.*;

public class RxJavaPluginsTest {

Expand Down Expand Up @@ -2255,4 +2265,37 @@ public void isBug() {
assertTrue(RxJavaPlugins.isBug(new CompositeException(new TestException())));
assertTrue(RxJavaPlugins.isBug(new OnErrorNotImplementedException(new TestException())));
}

@Test
public void unwrappedTask() {
class TestSchedulerRunnableWrapper implements SchedulerRunnableWrapper {
private final Runnable decoratedRun;

private TestSchedulerRunnableWrapper(Runnable decoratedRun) {
this.decoratedRun = decoratedRun;
}

@Override
public Runnable getWrappedRunnable() {
return decoratedRun;
}

@Override
public void run() {
throw new NullPointerException();
}
}

final Runnable runnable = new Runnable() {
@Override
public void run() {

}
};
SchedulerRunnableWrapper wrapper = new TestSchedulerRunnableWrapper(runnable);
Runnable unwrappedRunnable = RxJavaPlugins.unwrapRunnable(wrapper);

assertEquals(runnable, unwrappedRunnable);
unwrappedRunnable.run();
}
}