From ad5ab3a72bd04591d05b55f7b084dcca0621f067 Mon Sep 17 00:00:00 2001 From: George Campbell Date: Wed, 9 Nov 2016 14:25:52 -0800 Subject: [PATCH] Porting the Scheduler.when operator from 1.x to 2.x --- src/main/java/io/reactivex/Scheduler.java | 82 +++++ .../internal/schedulers/SchedulerWhen.java | 319 ++++++++++++++++++ .../schedulers/SchedulerWhenTest.java | 209 ++++++++++++ 3 files changed, 610 insertions(+) create mode 100644 src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java create mode 100644 src/test/java/io/reactivex/internal/schedulers/SchedulerWhenTest.java diff --git a/src/main/java/io/reactivex/Scheduler.java b/src/main/java/io/reactivex/Scheduler.java index 0d63eec808..a6f852ee8b 100644 --- a/src/main/java/io/reactivex/Scheduler.java +++ b/src/main/java/io/reactivex/Scheduler.java @@ -15,9 +15,12 @@ import java.util.concurrent.TimeUnit; +import io.reactivex.annotations.Experimental; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; import io.reactivex.internal.disposables.*; +import io.reactivex.internal.schedulers.SchedulerWhen; import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.plugins.RxJavaPlugins; @@ -171,6 +174,85 @@ public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, lo return periodicTask; } + /** + * Allows the use of operators for controlling the timing around when + * actions scheduled on workers are actually done. This makes it possible to + * layer additional behavior on this {@link Scheduler}. The only parameter + * is a function that flattens an {@link Flowable} of {@link Flowable} + * of {@link Completable}s into just one {@link Completable}. There must be + * a chain of operators connecting the returned value to the source + * {@link Flowable} otherwise any work scheduled on the returned + * {@link Scheduler} will not be executed. + *

+ * When {@link Scheduler#createWorker()} is invoked a {@link Flowable} of + * {@link Completable}s is onNext'd to the combinator to be flattened. If + * the inner {@link Flowable} is not immediately subscribed to an calls to + * {@link Worker#schedule} are buffered. Once the {@link Flowable} is + * subscribed to actions are then onNext'd as {@link Completable}s. + *

+ * Finally the actions scheduled on the parent {@link Scheduler} when the + * inner most {@link Completable}s are subscribed to. + *

+ * When the {@link Worker} is unsubscribed the {@link Completable} emits an + * onComplete and triggers any behavior in the flattening operator. The + * {@link Flowable} and all {@link Completable}s give to the flattening + * function never onError. + *

+ * Limit the amount concurrency two at a time without creating a new fix + * size thread pool: + * + *

+     * Scheduler limitSched = Schedulers.computation().when(workers -> {
+     *  // use merge max concurrent to limit the number of concurrent
+     *  // callbacks two at a time
+     *  return Completable.merge(Flowable.merge(workers), 2);
+     * });
+     * 
+ *

+ * This is a slightly different way to limit the concurrency but it has some + * interesting benefits and drawbacks to the method above. It works by + * limited the number of concurrent {@link Worker}s rather than individual + * actions. Generally each {@link Flowable} uses its own {@link Worker}. + * This means that this will essentially limit the number of concurrent + * subscribes. The danger comes from using operators like + * {@link Flowable#zip(Flowable, Flowable, rx.functions.Func2)} where + * subscribing to the first {@link Flowable} could deadlock the + * subscription to the second. + * + *

+     * Scheduler limitSched = Schedulers.computation().when(workers -> {
+     *  // use merge max concurrent to limit the number of concurrent
+     *  // Flowables two at a time
+     *  return Completable.merge(Flowable.merge(workers, 2));
+     * });
+     * 
+ * + * Slowing down the rate to no more than than 1 a second. This suffers from + * the same problem as the one above I could find an {@link Flowable} + * operator that limits the rate without dropping the values (aka leaky + * bucket algorithm). + * + *
+     * Scheduler slowSched = Schedulers.computation().when(workers -> {
+     *  // use concatenate to make each worker happen one at a time.
+     *  return Completable.concat(workers.map(actions -> {
+     *      // delay the starting of the next worker by 1 second.
+     *      return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
+     *  }));
+     * });
+     * 
+ * + * @param a Scheduler and a Subscription + * @param combine the function that takes a two-level nested Flowable sequence of a Completable and returns + * the Completable that will be subscribed to and should trigger the execution of the scheduled Actions. + * @return the Scheduler with the customized execution behavior + */ + @SuppressWarnings("unchecked") + @Experimental + public S when(Function>, Completable> combine) { + return (S) new SchedulerWhen(combine, this); + } + /** * Sequential Scheduler for executing actions on a single thread or event loop. *

diff --git a/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java b/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java new file mode 100644 index 0000000000..3f2e724fa0 --- /dev/null +++ b/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java @@ -0,0 +1,319 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivex.internal.schedulers; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.Completable; +import io.reactivex.CompletableObserver; +import io.reactivex.Flowable; +import io.reactivex.Observable; +import io.reactivex.Scheduler; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.processors.FlowableProcessor; +import io.reactivex.processors.UnicastProcessor; + +/** + * Allows the use of operators for controlling the timing around when actions + * scheduled on workers are actually done. This makes it possible to layer + * additional behavior on this {@link Scheduler}. The only parameter is a + * function that flattens an {@link Observable} of {@link Observable} of + * {@link Completable}s into just one {@link Completable}. There must be a chain + * of operators connecting the returned value to the source {@link Observable} + * otherwise any work scheduled on the returned {@link Scheduler} will not be + * executed. + *

+ * When {@link Scheduler#createWorker()} is invoked a {@link Observable} of + * {@link Completable}s is onNext'd to the combinator to be flattened. If the + * inner {@link Observable} is not immediately subscribed to an calls to + * {@link Worker#schedule} are buffered. Once the {@link Observable} is + * subscribed to actions are then onNext'd as {@link Completable}s. + *

+ * Finally the actions scheduled on the parent {@link Scheduler} when the inner + * most {@link Completable}s are subscribed to. + *

+ * When the {@link Worker} is unsubscribed the {@link Completable} emits an + * onComplete and triggers any behavior in the flattening operator. The + * {@link Observable} and all {@link Completable}s give to the flattening + * function never onError. + *

+ * Limit the amount concurrency two at a time without creating a new fix size + * thread pool: + * + *

+ * Scheduler limitSched = Schedulers.computation().when(workers -> {
+ *  // use merge max concurrent to limit the number of concurrent
+ *  // callbacks two at a time
+ *  return Completable.merge(Observable.merge(workers), 2);
+ * });
+ * 
+ *

+ * This is a slightly different way to limit the concurrency but it has some + * interesting benefits and drawbacks to the method above. It works by limited + * the number of concurrent {@link Worker}s rather than individual actions. + * Generally each {@link Observable} uses its own {@link Worker}. This means + * that this will essentially limit the number of concurrent subscribes. The + * danger comes from using operators like + * {@link Observable#zip(Observable, Observable, rx.functions.Func2)} where + * subscribing to the first {@link Observable} could deadlock the subscription + * to the second. + * + *

+ * Scheduler limitSched = Schedulers.computation().when(workers -> {
+ *  // use merge max concurrent to limit the number of concurrent
+ *  // Observables two at a time
+ *  return Completable.merge(Observable.merge(workers, 2));
+ * });
+ * 
+ * + * Slowing down the rate to no more than than 1 a second. This suffers from the + * same problem as the one above I could find an {@link Observable} operator + * that limits the rate without dropping the values (aka leaky bucket + * algorithm). + * + *
+ * Scheduler slowSched = Schedulers.computation().when(workers -> {
+ *  // use concatenate to make each worker happen one at a time.
+ *  return Completable.concat(workers.map(actions -> {
+ *      // delay the starting of the next worker by 1 second.
+ *      return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
+ *  }));
+ * });
+ * 
+ * + * @param combine + * @return + */ +@Experimental +public class SchedulerWhen extends Scheduler implements Disposable { + private final Scheduler actualScheduler; + private final FlowableProcessor> workerProcessor; + private Disposable disposable; + + public SchedulerWhen(Function>, Completable> combine, Scheduler actualScheduler) { + this.actualScheduler = actualScheduler; + // workers are converted into completables and put in this queue. + this.workerProcessor = UnicastProcessor.>create().toSerialized(); + // send it to a custom combinator to pick the order and rate at which + // workers are processed. + try { + disposable = combine.apply(workerProcessor).subscribe(); + } catch (Throwable e) { + Exceptions.propagate(e); + } + } + + @Override + public void dispose() { + disposable.dispose(); + } + + @Override + public boolean isDisposed() { + return disposable.isDisposed(); + } + + @Override + public Worker createWorker() { + final Worker actualWorker = actualScheduler.createWorker(); + // a queue for the actions submitted while worker is waiting to get to + // the subscribe to off the workerQueue. + final FlowableProcessor actionProcessor = UnicastProcessor.create().toSerialized(); + // convert the work of scheduling all the actions into a completable + Flowable actions = actionProcessor.map(new Function() { + @Override + public Completable apply(final ScheduledAction action) { + return new Completable() { + @Override + protected void subscribeActual(CompletableObserver actionCompletable) { + actionCompletable.onSubscribe(action); + action.call(actualWorker, actionCompletable); + } + }; + } + }); + + // a worker that queues the action to the actionQueue subject. + Worker worker = new Worker() { + private final AtomicBoolean unsubscribed = new AtomicBoolean(); + + @Override + public void dispose() { + // complete the actionQueue when worker is unsubscribed to make + // room for the next worker in the workerQueue. + if (unsubscribed.compareAndSet(false, true)) { + actualWorker.dispose(); + actionProcessor.onComplete(); + } + } + + @Override + public boolean isDisposed() { + return unsubscribed.get(); + } + + @Override + public Disposable schedule(final Runnable action, final long delayTime, final TimeUnit unit) { + // send a scheduled action to the actionQueue + DelayedAction delayedAction = new DelayedAction(action, delayTime, unit); + actionProcessor.onNext(delayedAction); + return delayedAction; + } + + @Override + public Disposable schedule(final Runnable action) { + // send a scheduled action to the actionQueue + ImmediateAction immediateAction = new ImmediateAction(action); + actionProcessor.onNext(immediateAction); + return immediateAction; + } + }; + + // enqueue the completable that process actions put in reply subject + workerProcessor.onNext(actions); + + // return the worker that adds actions to the reply subject + return worker; + } + + static final Disposable SUBSCRIBED = new Disposable() { + @Override + public void dispose() { + } + + @Override + public boolean isDisposed() { + return false; + } + }; + + static final Disposable DISPOSED = Disposables.disposed(); + + @SuppressWarnings("serial") + static abstract class ScheduledAction extends AtomicReferenceimplements Disposable { + public ScheduledAction() { + super(SUBSCRIBED); + } + + private final void call(Worker actualWorker, CompletableObserver actionCompletable) { + Disposable oldState = get(); + // either SUBSCRIBED or UNSUBSCRIBED + if (oldState == DISPOSED) { + // no need to schedule return + return; + } + if (oldState != SUBSCRIBED) { + // has already been scheduled return + // should not be able to get here but handle it anyway by not + // rescheduling. + return; + } + + Disposable newState = callActual(actualWorker, actionCompletable); + + if (!compareAndSet(SUBSCRIBED, newState)) { + // set would only fail if the new current state is some other + // subscription from a concurrent call to this method. + // Unsubscribe from the action just scheduled because it lost + // the race. + newState.dispose(); + } + } + + protected abstract Disposable callActual(Worker actualWorker, CompletableObserver actionCompletable); + + @Override + public boolean isDisposed() { + return get().isDisposed(); + } + + @Override + public void dispose() { + Disposable oldState; + // no matter what the current state is the new state is going to be + Disposable newState = DISPOSED; + do { + oldState = get(); + if (oldState == DISPOSED) { + // the action has already been unsubscribed + return; + } + } while (!compareAndSet(oldState, newState)); + + if (oldState != SUBSCRIBED) { + // the action was scheduled. stop it. + oldState.dispose(); + } + } + } + + @SuppressWarnings("serial") + static class ImmediateAction extends ScheduledAction { + private final Runnable action; + + public ImmediateAction(Runnable action) { + this.action = action; + } + + @Override + protected Disposable callActual(Worker actualWorker, CompletableObserver actionCompletable) { + return actualWorker.schedule(new OnCompletedAction(action, actionCompletable)); + } + } + + @SuppressWarnings("serial") + static class DelayedAction extends ScheduledAction { + private final Runnable action; + private final long delayTime; + private final TimeUnit unit; + + public DelayedAction(Runnable action, long delayTime, TimeUnit unit) { + this.action = action; + this.delayTime = delayTime; + this.unit = unit; + } + + @Override + protected Disposable callActual(Worker actualWorker, CompletableObserver actionCompletable) { + return actualWorker.schedule(new OnCompletedAction(action, actionCompletable), delayTime, unit); + } + } + + static class OnCompletedAction implements Runnable { + private CompletableObserver actionCompletable; + private Runnable action; + + public OnCompletedAction(Runnable action, CompletableObserver actionCompletable) { + this.action = action; + this.actionCompletable = actionCompletable; + } + + @Override + public void run() { + try { + action.run(); + } finally { + actionCompletable.onComplete(); + } + } + } +} diff --git a/src/test/java/io/reactivex/internal/schedulers/SchedulerWhenTest.java b/src/test/java/io/reactivex/internal/schedulers/SchedulerWhenTest.java new file mode 100644 index 0000000000..129ee61502 --- /dev/null +++ b/src/test/java/io/reactivex/internal/schedulers/SchedulerWhenTest.java @@ -0,0 +1,209 @@ +package io.reactivex.internal.schedulers; + +import static io.reactivex.Flowable.just; +import static io.reactivex.Flowable.merge; +import static java.util.concurrent.TimeUnit.SECONDS; + +import java.util.concurrent.Callable; + +import org.junit.Test; + +import io.reactivex.Completable; +import io.reactivex.Flowable; +import io.reactivex.Scheduler; +import io.reactivex.functions.Function; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.schedulers.TestScheduler; +import io.reactivex.subscribers.TestSubscriber; + +public class SchedulerWhenTest { + @Test + public void testAsyncMaxConcurrent() { + TestScheduler tSched = new TestScheduler(); + SchedulerWhen sched = maxConcurrentScheduler(tSched); + TestSubscriber tSub = TestSubscriber.create(); + + asyncWork(sched).subscribe(tSub); + + tSub.assertValueCount(0); + + tSched.advanceTimeBy(0, SECONDS); + tSub.assertValueCount(0); + + tSched.advanceTimeBy(1, SECONDS); + tSub.assertValueCount(2); + + tSched.advanceTimeBy(1, SECONDS); + tSub.assertValueCount(4); + + tSched.advanceTimeBy(1, SECONDS); + tSub.assertValueCount(5); + tSub.assertComplete(); + + sched.dispose(); + } + + @Test + public void testAsyncDelaySubscription() { + final TestScheduler tSched = new TestScheduler(); + SchedulerWhen sched = throttleScheduler(tSched); + TestSubscriber tSub = TestSubscriber.create(); + + asyncWork(sched).subscribe(tSub); + + tSub.assertValueCount(0); + + tSched.advanceTimeBy(0, SECONDS); + tSub.assertValueCount(0); + + tSched.advanceTimeBy(1, SECONDS); + tSub.assertValueCount(1); + + tSched.advanceTimeBy(1, SECONDS); + tSub.assertValueCount(1); + + tSched.advanceTimeBy(1, SECONDS); + tSub.assertValueCount(2); + + tSched.advanceTimeBy(1, SECONDS); + tSub.assertValueCount(2); + + tSched.advanceTimeBy(1, SECONDS); + tSub.assertValueCount(3); + + tSched.advanceTimeBy(1, SECONDS); + tSub.assertValueCount(3); + + tSched.advanceTimeBy(1, SECONDS); + tSub.assertValueCount(4); + + tSched.advanceTimeBy(1, SECONDS); + tSub.assertValueCount(4); + + tSched.advanceTimeBy(1, SECONDS); + tSub.assertValueCount(5); + tSub.assertComplete(); + + sched.dispose(); + } + + @Test + public void testSyncMaxConcurrent() { + TestScheduler tSched = new TestScheduler(); + SchedulerWhen sched = maxConcurrentScheduler(tSched); + TestSubscriber tSub = TestSubscriber.create(); + + syncWork(sched).subscribe(tSub); + + tSub.assertValueCount(0); + tSched.advanceTimeBy(0, SECONDS); + + // since all the work is synchronous nothing is blocked and its all done + tSub.assertValueCount(5); + tSub.assertComplete(); + + sched.dispose(); + } + + @Test + public void testSyncDelaySubscription() { + final TestScheduler tSched = new TestScheduler(); + SchedulerWhen sched = throttleScheduler(tSched); + TestSubscriber tSub = TestSubscriber.create(); + + syncWork(sched).subscribe(tSub); + + tSub.assertValueCount(0); + + tSched.advanceTimeBy(0, SECONDS); + tSub.assertValueCount(1); + + tSched.advanceTimeBy(1, SECONDS); + tSub.assertValueCount(2); + + tSched.advanceTimeBy(1, SECONDS); + tSub.assertValueCount(3); + + tSched.advanceTimeBy(1, SECONDS); + tSub.assertValueCount(4); + + tSched.advanceTimeBy(1, SECONDS); + tSub.assertValueCount(5); + tSub.assertComplete(); + + sched.dispose(); + } + + private Flowable asyncWork(final Scheduler sched) { + return Flowable.range(1, 5).flatMap(new Function>() { + @Override + public Flowable apply(Integer t) { + return Flowable.timer(1, SECONDS, sched); + } + }); + } + + private Flowable syncWork(final Scheduler sched) { + return Flowable.range(1, 5).flatMap(new Function>() { + @Override + public Flowable apply(Integer t) { + return Flowable.defer(new Callable>() { + @Override + public Flowable call() { + return Flowable.just(0l); + } + }).subscribeOn(sched); + } + }); + } + + private SchedulerWhen maxConcurrentScheduler(TestScheduler tSched) { + SchedulerWhen sched = new SchedulerWhen(new Function>, Completable>() { + @Override + public Completable apply(Flowable> workerActions) { + Flowable workers = workerActions.map(new Function, Completable>() { + @Override + public Completable apply(Flowable actions) { + return Completable.concat(actions); + } + }); + return Completable.merge(workers, 2); + } + }, tSched); + return sched; + } + + private SchedulerWhen throttleScheduler(final TestScheduler tSched) { + SchedulerWhen sched = new SchedulerWhen(new Function>, Completable>() { + @Override + public Completable apply(Flowable> workerActions) { + Flowable workers = workerActions.map(new Function, Completable>() { + @Override + public Completable apply(Flowable actions) { + return Completable.concat(actions); + } + }); + return Completable.concat(workers.map(new Function() { + @Override + public Completable apply(Completable worker) { + return worker.delay(1, SECONDS, tSched); + } + })); + } + }, tSched); + return sched; + } + + @Test(timeout = 1000) + public void testRaceConditions() { + Scheduler comp = Schedulers.computation(); + Scheduler limited = comp.when(new Function>, Completable>() { + @Override + public Completable apply(Flowable> t) { + return Completable.merge(Flowable.merge(t, 10)); + } + }); + + merge(just(just(1).subscribeOn(limited).observeOn(comp)).repeat(1000)).blockingSubscribe(); + } +}