diff --git a/src/main/java/io/reactivex/Scheduler.java b/src/main/java/io/reactivex/Scheduler.java
index 0d63eec808..a6f852ee8b 100644
--- a/src/main/java/io/reactivex/Scheduler.java
+++ b/src/main/java/io/reactivex/Scheduler.java
@@ -15,9 +15,12 @@
import java.util.concurrent.TimeUnit;
+import io.reactivex.annotations.Experimental;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
+import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.*;
+import io.reactivex.internal.schedulers.SchedulerWhen;
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.plugins.RxJavaPlugins;
@@ -171,6 +174,85 @@ public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, lo
return periodicTask;
}
+ /**
+ * Allows the use of operators for controlling the timing around when
+ * actions scheduled on workers are actually done. This makes it possible to
+ * layer additional behavior on this {@link Scheduler}. The only parameter
+ * is a function that flattens an {@link Flowable} of {@link Flowable}
+ * of {@link Completable}s into just one {@link Completable}. There must be
+ * a chain of operators connecting the returned value to the source
+ * {@link Flowable} otherwise any work scheduled on the returned
+ * {@link Scheduler} will not be executed.
+ *
+ * When {@link Scheduler#createWorker()} is invoked a {@link Flowable} of
+ * {@link Completable}s is onNext'd to the combinator to be flattened. If
+ * the inner {@link Flowable} is not immediately subscribed to an calls to
+ * {@link Worker#schedule} are buffered. Once the {@link Flowable} is
+ * subscribed to actions are then onNext'd as {@link Completable}s.
+ *
+ * Finally the actions scheduled on the parent {@link Scheduler} when the
+ * inner most {@link Completable}s are subscribed to.
+ *
+ * When the {@link Worker} is unsubscribed the {@link Completable} emits an
+ * onComplete and triggers any behavior in the flattening operator. The
+ * {@link Flowable} and all {@link Completable}s give to the flattening
+ * function never onError.
+ *
+ * Limit the amount concurrency two at a time without creating a new fix
+ * size thread pool:
+ *
+ *
+ * Scheduler limitSched = Schedulers.computation().when(workers -> {
+ * // use merge max concurrent to limit the number of concurrent
+ * // callbacks two at a time
+ * return Completable.merge(Flowable.merge(workers), 2);
+ * });
+ *
+ *
+ * This is a slightly different way to limit the concurrency but it has some
+ * interesting benefits and drawbacks to the method above. It works by
+ * limited the number of concurrent {@link Worker}s rather than individual
+ * actions. Generally each {@link Flowable} uses its own {@link Worker}.
+ * This means that this will essentially limit the number of concurrent
+ * subscribes. The danger comes from using operators like
+ * {@link Flowable#zip(Flowable, Flowable, rx.functions.Func2)} where
+ * subscribing to the first {@link Flowable} could deadlock the
+ * subscription to the second.
+ *
+ *
+ * Scheduler limitSched = Schedulers.computation().when(workers -> {
+ * // use merge max concurrent to limit the number of concurrent
+ * // Flowables two at a time
+ * return Completable.merge(Flowable.merge(workers, 2));
+ * });
+ *
+ *
+ * Slowing down the rate to no more than than 1 a second. This suffers from
+ * the same problem as the one above I could find an {@link Flowable}
+ * operator that limits the rate without dropping the values (aka leaky
+ * bucket algorithm).
+ *
+ *
+ * Scheduler slowSched = Schedulers.computation().when(workers -> {
+ * // use concatenate to make each worker happen one at a time.
+ * return Completable.concat(workers.map(actions -> {
+ * // delay the starting of the next worker by 1 second.
+ * return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
+ * }));
+ * });
+ *
+ *
+ * @param a Scheduler and a Subscription
+ * @param combine the function that takes a two-level nested Flowable sequence of a Completable and returns
+ * the Completable that will be subscribed to and should trigger the execution of the scheduled Actions.
+ * @return the Scheduler with the customized execution behavior
+ */
+ @SuppressWarnings("unchecked")
+ @Experimental
+ public S when(Function>, Completable> combine) {
+ return (S) new SchedulerWhen(combine, this);
+ }
+
/**
* Sequential Scheduler for executing actions on a single thread or event loop.
*
diff --git a/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java b/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java
new file mode 100644
index 0000000000..3f2e724fa0
--- /dev/null
+++ b/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java
@@ -0,0 +1,319 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.reactivex.internal.schedulers;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.reactivex.Completable;
+import io.reactivex.CompletableObserver;
+import io.reactivex.Flowable;
+import io.reactivex.Observable;
+import io.reactivex.Scheduler;
+import io.reactivex.annotations.Experimental;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.disposables.Disposables;
+import io.reactivex.exceptions.Exceptions;
+import io.reactivex.functions.Function;
+import io.reactivex.processors.FlowableProcessor;
+import io.reactivex.processors.UnicastProcessor;
+
+/**
+ * Allows the use of operators for controlling the timing around when actions
+ * scheduled on workers are actually done. This makes it possible to layer
+ * additional behavior on this {@link Scheduler}. The only parameter is a
+ * function that flattens an {@link Observable} of {@link Observable} of
+ * {@link Completable}s into just one {@link Completable}. There must be a chain
+ * of operators connecting the returned value to the source {@link Observable}
+ * otherwise any work scheduled on the returned {@link Scheduler} will not be
+ * executed.
+ *
+ * When {@link Scheduler#createWorker()} is invoked a {@link Observable} of
+ * {@link Completable}s is onNext'd to the combinator to be flattened. If the
+ * inner {@link Observable} is not immediately subscribed to an calls to
+ * {@link Worker#schedule} are buffered. Once the {@link Observable} is
+ * subscribed to actions are then onNext'd as {@link Completable}s.
+ *
+ * Finally the actions scheduled on the parent {@link Scheduler} when the inner
+ * most {@link Completable}s are subscribed to.
+ *
+ * When the {@link Worker} is unsubscribed the {@link Completable} emits an
+ * onComplete and triggers any behavior in the flattening operator. The
+ * {@link Observable} and all {@link Completable}s give to the flattening
+ * function never onError.
+ *
+ * Limit the amount concurrency two at a time without creating a new fix size
+ * thread pool:
+ *
+ *
+ * Scheduler limitSched = Schedulers.computation().when(workers -> {
+ * // use merge max concurrent to limit the number of concurrent
+ * // callbacks two at a time
+ * return Completable.merge(Observable.merge(workers), 2);
+ * });
+ *
+ *
+ * This is a slightly different way to limit the concurrency but it has some
+ * interesting benefits and drawbacks to the method above. It works by limited
+ * the number of concurrent {@link Worker}s rather than individual actions.
+ * Generally each {@link Observable} uses its own {@link Worker}. This means
+ * that this will essentially limit the number of concurrent subscribes. The
+ * danger comes from using operators like
+ * {@link Observable#zip(Observable, Observable, rx.functions.Func2)} where
+ * subscribing to the first {@link Observable} could deadlock the subscription
+ * to the second.
+ *
+ *
+ * Scheduler limitSched = Schedulers.computation().when(workers -> {
+ * // use merge max concurrent to limit the number of concurrent
+ * // Observables two at a time
+ * return Completable.merge(Observable.merge(workers, 2));
+ * });
+ *
+ *
+ * Slowing down the rate to no more than than 1 a second. This suffers from the
+ * same problem as the one above I could find an {@link Observable} operator
+ * that limits the rate without dropping the values (aka leaky bucket
+ * algorithm).
+ *
+ *
+ * Scheduler slowSched = Schedulers.computation().when(workers -> {
+ * // use concatenate to make each worker happen one at a time.
+ * return Completable.concat(workers.map(actions -> {
+ * // delay the starting of the next worker by 1 second.
+ * return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
+ * }));
+ * });
+ *
+ *
+ * @param combine
+ * @return
+ */
+@Experimental
+public class SchedulerWhen extends Scheduler implements Disposable {
+ private final Scheduler actualScheduler;
+ private final FlowableProcessor> workerProcessor;
+ private Disposable disposable;
+
+ public SchedulerWhen(Function>, Completable> combine, Scheduler actualScheduler) {
+ this.actualScheduler = actualScheduler;
+ // workers are converted into completables and put in this queue.
+ this.workerProcessor = UnicastProcessor.>create().toSerialized();
+ // send it to a custom combinator to pick the order and rate at which
+ // workers are processed.
+ try {
+ disposable = combine.apply(workerProcessor).subscribe();
+ } catch (Throwable e) {
+ Exceptions.propagate(e);
+ }
+ }
+
+ @Override
+ public void dispose() {
+ disposable.dispose();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return disposable.isDisposed();
+ }
+
+ @Override
+ public Worker createWorker() {
+ final Worker actualWorker = actualScheduler.createWorker();
+ // a queue for the actions submitted while worker is waiting to get to
+ // the subscribe to off the workerQueue.
+ final FlowableProcessor actionProcessor = UnicastProcessor.create().toSerialized();
+ // convert the work of scheduling all the actions into a completable
+ Flowable actions = actionProcessor.map(new Function() {
+ @Override
+ public Completable apply(final ScheduledAction action) {
+ return new Completable() {
+ @Override
+ protected void subscribeActual(CompletableObserver actionCompletable) {
+ actionCompletable.onSubscribe(action);
+ action.call(actualWorker, actionCompletable);
+ }
+ };
+ }
+ });
+
+ // a worker that queues the action to the actionQueue subject.
+ Worker worker = new Worker() {
+ private final AtomicBoolean unsubscribed = new AtomicBoolean();
+
+ @Override
+ public void dispose() {
+ // complete the actionQueue when worker is unsubscribed to make
+ // room for the next worker in the workerQueue.
+ if (unsubscribed.compareAndSet(false, true)) {
+ actualWorker.dispose();
+ actionProcessor.onComplete();
+ }
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return unsubscribed.get();
+ }
+
+ @Override
+ public Disposable schedule(final Runnable action, final long delayTime, final TimeUnit unit) {
+ // send a scheduled action to the actionQueue
+ DelayedAction delayedAction = new DelayedAction(action, delayTime, unit);
+ actionProcessor.onNext(delayedAction);
+ return delayedAction;
+ }
+
+ @Override
+ public Disposable schedule(final Runnable action) {
+ // send a scheduled action to the actionQueue
+ ImmediateAction immediateAction = new ImmediateAction(action);
+ actionProcessor.onNext(immediateAction);
+ return immediateAction;
+ }
+ };
+
+ // enqueue the completable that process actions put in reply subject
+ workerProcessor.onNext(actions);
+
+ // return the worker that adds actions to the reply subject
+ return worker;
+ }
+
+ static final Disposable SUBSCRIBED = new Disposable() {
+ @Override
+ public void dispose() {
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return false;
+ }
+ };
+
+ static final Disposable DISPOSED = Disposables.disposed();
+
+ @SuppressWarnings("serial")
+ static abstract class ScheduledAction extends AtomicReferenceimplements Disposable {
+ public ScheduledAction() {
+ super(SUBSCRIBED);
+ }
+
+ private final void call(Worker actualWorker, CompletableObserver actionCompletable) {
+ Disposable oldState = get();
+ // either SUBSCRIBED or UNSUBSCRIBED
+ if (oldState == DISPOSED) {
+ // no need to schedule return
+ return;
+ }
+ if (oldState != SUBSCRIBED) {
+ // has already been scheduled return
+ // should not be able to get here but handle it anyway by not
+ // rescheduling.
+ return;
+ }
+
+ Disposable newState = callActual(actualWorker, actionCompletable);
+
+ if (!compareAndSet(SUBSCRIBED, newState)) {
+ // set would only fail if the new current state is some other
+ // subscription from a concurrent call to this method.
+ // Unsubscribe from the action just scheduled because it lost
+ // the race.
+ newState.dispose();
+ }
+ }
+
+ protected abstract Disposable callActual(Worker actualWorker, CompletableObserver actionCompletable);
+
+ @Override
+ public boolean isDisposed() {
+ return get().isDisposed();
+ }
+
+ @Override
+ public void dispose() {
+ Disposable oldState;
+ // no matter what the current state is the new state is going to be
+ Disposable newState = DISPOSED;
+ do {
+ oldState = get();
+ if (oldState == DISPOSED) {
+ // the action has already been unsubscribed
+ return;
+ }
+ } while (!compareAndSet(oldState, newState));
+
+ if (oldState != SUBSCRIBED) {
+ // the action was scheduled. stop it.
+ oldState.dispose();
+ }
+ }
+ }
+
+ @SuppressWarnings("serial")
+ static class ImmediateAction extends ScheduledAction {
+ private final Runnable action;
+
+ public ImmediateAction(Runnable action) {
+ this.action = action;
+ }
+
+ @Override
+ protected Disposable callActual(Worker actualWorker, CompletableObserver actionCompletable) {
+ return actualWorker.schedule(new OnCompletedAction(action, actionCompletable));
+ }
+ }
+
+ @SuppressWarnings("serial")
+ static class DelayedAction extends ScheduledAction {
+ private final Runnable action;
+ private final long delayTime;
+ private final TimeUnit unit;
+
+ public DelayedAction(Runnable action, long delayTime, TimeUnit unit) {
+ this.action = action;
+ this.delayTime = delayTime;
+ this.unit = unit;
+ }
+
+ @Override
+ protected Disposable callActual(Worker actualWorker, CompletableObserver actionCompletable) {
+ return actualWorker.schedule(new OnCompletedAction(action, actionCompletable), delayTime, unit);
+ }
+ }
+
+ static class OnCompletedAction implements Runnable {
+ private CompletableObserver actionCompletable;
+ private Runnable action;
+
+ public OnCompletedAction(Runnable action, CompletableObserver actionCompletable) {
+ this.action = action;
+ this.actionCompletable = actionCompletable;
+ }
+
+ @Override
+ public void run() {
+ try {
+ action.run();
+ } finally {
+ actionCompletable.onComplete();
+ }
+ }
+ }
+}
diff --git a/src/test/java/io/reactivex/internal/schedulers/SchedulerWhenTest.java b/src/test/java/io/reactivex/internal/schedulers/SchedulerWhenTest.java
new file mode 100644
index 0000000000..129ee61502
--- /dev/null
+++ b/src/test/java/io/reactivex/internal/schedulers/SchedulerWhenTest.java
@@ -0,0 +1,209 @@
+package io.reactivex.internal.schedulers;
+
+import static io.reactivex.Flowable.just;
+import static io.reactivex.Flowable.merge;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.concurrent.Callable;
+
+import org.junit.Test;
+
+import io.reactivex.Completable;
+import io.reactivex.Flowable;
+import io.reactivex.Scheduler;
+import io.reactivex.functions.Function;
+import io.reactivex.schedulers.Schedulers;
+import io.reactivex.schedulers.TestScheduler;
+import io.reactivex.subscribers.TestSubscriber;
+
+public class SchedulerWhenTest {
+ @Test
+ public void testAsyncMaxConcurrent() {
+ TestScheduler tSched = new TestScheduler();
+ SchedulerWhen sched = maxConcurrentScheduler(tSched);
+ TestSubscriber tSub = TestSubscriber.create();
+
+ asyncWork(sched).subscribe(tSub);
+
+ tSub.assertValueCount(0);
+
+ tSched.advanceTimeBy(0, SECONDS);
+ tSub.assertValueCount(0);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(2);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(4);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(5);
+ tSub.assertComplete();
+
+ sched.dispose();
+ }
+
+ @Test
+ public void testAsyncDelaySubscription() {
+ final TestScheduler tSched = new TestScheduler();
+ SchedulerWhen sched = throttleScheduler(tSched);
+ TestSubscriber tSub = TestSubscriber.create();
+
+ asyncWork(sched).subscribe(tSub);
+
+ tSub.assertValueCount(0);
+
+ tSched.advanceTimeBy(0, SECONDS);
+ tSub.assertValueCount(0);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(1);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(1);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(2);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(2);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(3);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(3);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(4);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(4);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(5);
+ tSub.assertComplete();
+
+ sched.dispose();
+ }
+
+ @Test
+ public void testSyncMaxConcurrent() {
+ TestScheduler tSched = new TestScheduler();
+ SchedulerWhen sched = maxConcurrentScheduler(tSched);
+ TestSubscriber tSub = TestSubscriber.create();
+
+ syncWork(sched).subscribe(tSub);
+
+ tSub.assertValueCount(0);
+ tSched.advanceTimeBy(0, SECONDS);
+
+ // since all the work is synchronous nothing is blocked and its all done
+ tSub.assertValueCount(5);
+ tSub.assertComplete();
+
+ sched.dispose();
+ }
+
+ @Test
+ public void testSyncDelaySubscription() {
+ final TestScheduler tSched = new TestScheduler();
+ SchedulerWhen sched = throttleScheduler(tSched);
+ TestSubscriber tSub = TestSubscriber.create();
+
+ syncWork(sched).subscribe(tSub);
+
+ tSub.assertValueCount(0);
+
+ tSched.advanceTimeBy(0, SECONDS);
+ tSub.assertValueCount(1);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(2);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(3);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(4);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(5);
+ tSub.assertComplete();
+
+ sched.dispose();
+ }
+
+ private Flowable asyncWork(final Scheduler sched) {
+ return Flowable.range(1, 5).flatMap(new Function>() {
+ @Override
+ public Flowable apply(Integer t) {
+ return Flowable.timer(1, SECONDS, sched);
+ }
+ });
+ }
+
+ private Flowable syncWork(final Scheduler sched) {
+ return Flowable.range(1, 5).flatMap(new Function>() {
+ @Override
+ public Flowable apply(Integer t) {
+ return Flowable.defer(new Callable>() {
+ @Override
+ public Flowable call() {
+ return Flowable.just(0l);
+ }
+ }).subscribeOn(sched);
+ }
+ });
+ }
+
+ private SchedulerWhen maxConcurrentScheduler(TestScheduler tSched) {
+ SchedulerWhen sched = new SchedulerWhen(new Function>, Completable>() {
+ @Override
+ public Completable apply(Flowable> workerActions) {
+ Flowable workers = workerActions.map(new Function, Completable>() {
+ @Override
+ public Completable apply(Flowable actions) {
+ return Completable.concat(actions);
+ }
+ });
+ return Completable.merge(workers, 2);
+ }
+ }, tSched);
+ return sched;
+ }
+
+ private SchedulerWhen throttleScheduler(final TestScheduler tSched) {
+ SchedulerWhen sched = new SchedulerWhen(new Function>, Completable>() {
+ @Override
+ public Completable apply(Flowable> workerActions) {
+ Flowable workers = workerActions.map(new Function, Completable>() {
+ @Override
+ public Completable apply(Flowable actions) {
+ return Completable.concat(actions);
+ }
+ });
+ return Completable.concat(workers.map(new Function() {
+ @Override
+ public Completable apply(Completable worker) {
+ return worker.delay(1, SECONDS, tSched);
+ }
+ }));
+ }
+ }, tSched);
+ return sched;
+ }
+
+ @Test(timeout = 1000)
+ public void testRaceConditions() {
+ Scheduler comp = Schedulers.computation();
+ Scheduler limited = comp.when(new Function>, Completable>() {
+ @Override
+ public Completable apply(Flowable> t) {
+ return Completable.merge(Flowable.merge(t, 10));
+ }
+ });
+
+ merge(just(just(1).subscribeOn(limited).observeOn(comp)).repeat(1000)).blockingSubscribe();
+ }
+}