diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 9287e105de..8e94b3fc0d 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -8330,7 +8330,7 @@ public final Observable subscribeOn(Scheduler scheduler) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable)this).scalarScheduleOn(scheduler); } - return nest().lift(new OperatorSubscribeOn(scheduler)); + return create(new OperatorSubscribeOn(this, scheduler)); } /** diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index 64f8e8b584..fb1fed35d7 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -1736,8 +1736,43 @@ public void onNext(T t) { * @see RxJava Threading Examples * @see #observeOn */ - public final Single subscribeOn(Scheduler scheduler) { - return nest().lift(new OperatorSubscribeOn(scheduler)); + public final Single subscribeOn(final Scheduler scheduler) { + return create(new OnSubscribe() { + @Override + public void call(final SingleSubscriber t) { + final Scheduler.Worker w = scheduler.createWorker(); + t.add(w); + + w.schedule(new Action0() { + @Override + public void call() { + SingleSubscriber ssub = new SingleSubscriber() { + @Override + public void onSuccess(T value) { + try { + t.onSuccess(value); + } finally { + w.unsubscribe(); + } + } + + @Override + public void onError(Throwable error) { + try { + t.onError(error); + } finally { + w.unsubscribe(); + } + } + }; + + t.add(ssub); + + Single.this.subscribe(ssub); + } + }); + } + }); } /** diff --git a/src/main/java/rx/internal/operators/OperatorSubscribeOn.java b/src/main/java/rx/internal/operators/OperatorSubscribeOn.java index 152bc504e4..70bc2fa592 100644 --- a/src/main/java/rx/internal/operators/OperatorSubscribeOn.java +++ b/src/main/java/rx/internal/operators/OperatorSubscribeOn.java @@ -15,96 +15,84 @@ */ package rx.internal.operators; -import rx.Observable; -import rx.Observable.Operator; -import rx.Producer; -import rx.Scheduler; +import rx.*; +import rx.Observable.OnSubscribe; import rx.Scheduler.Worker; -import rx.Subscriber; import rx.functions.Action0; /** * Subscribes Observers on the specified {@code Scheduler}. *

* + * + * @param the value type of the actual source */ -public class OperatorSubscribeOn implements Operator> { +public final class OperatorSubscribeOn implements OnSubscribe { - private final Scheduler scheduler; + final Scheduler scheduler; + final Observable source; - public OperatorSubscribeOn(Scheduler scheduler) { + public OperatorSubscribeOn(Observable source, Scheduler scheduler) { this.scheduler = scheduler; + this.source = source; } @Override - public Subscriber> call(final Subscriber subscriber) { + public void call(final Subscriber subscriber) { final Worker inner = scheduler.createWorker(); subscriber.add(inner); - return new Subscriber>(subscriber) { - - @Override - public void onCompleted() { - // ignore because this is a nested Observable and we expect only 1 Observable emitted to onNext - } - - @Override - public void onError(Throwable e) { - subscriber.onError(e); - } - + + inner.schedule(new Action0() { @Override - public void onNext(final Observable o) { - inner.schedule(new Action0() { - + public void call() { + final Thread t = Thread.currentThread(); + + Subscriber s = new Subscriber(subscriber) { @Override - public void call() { - final Thread t = Thread.currentThread(); - o.unsafeSubscribe(new Subscriber(subscriber) { - - @Override - public void onCompleted() { - subscriber.onCompleted(); - } - - @Override - public void onError(Throwable e) { - subscriber.onError(e); - } - - @Override - public void onNext(T t) { - subscriber.onNext(t); - } - + public void onNext(T t) { + subscriber.onNext(t); + } + + @Override + public void onError(Throwable e) { + try { + subscriber.onError(e); + } finally { + inner.unsubscribe(); + } + } + + @Override + public void onCompleted() { + try { + subscriber.onCompleted(); + } finally { + inner.unsubscribe(); + } + } + + @Override + public void setProducer(final Producer p) { + subscriber.setProducer(new Producer() { @Override - public void setProducer(final Producer producer) { - subscriber.setProducer(new Producer() { - - @Override - public void request(final long n) { - if (Thread.currentThread() == t) { - // don't schedule if we're already on the thread (primarily for first setProducer call) - // see unit test 'testSetProducerSynchronousRequest' for more context on this - producer.request(n); - } else { - inner.schedule(new Action0() { - - @Override - public void call() { - producer.request(n); - } - }); + public void request(final long n) { + if (t == Thread.currentThread()) { + p.request(n); + } else { + inner.schedule(new Action0() { + @Override + public void call() { + p.request(n); } - } - - }); + }); + } } - }); } - }); + }; + + source.unsafeSubscribe(s); } - - }; + }); } -} +} \ No newline at end of file diff --git a/src/main/java/rx/internal/operators/OperatorTimeoutBase.java b/src/main/java/rx/internal/operators/OperatorTimeoutBase.java index d4700bcb9b..823831bc3a 100644 --- a/src/main/java/rx/internal/operators/OperatorTimeoutBase.java +++ b/src/main/java/rx/internal/operators/OperatorTimeoutBase.java @@ -16,16 +16,11 @@ package rx.internal.operators; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import rx.Observable; +import rx.*; import rx.Observable.Operator; -import rx.Scheduler; -import rx.Subscriber; -import rx.Subscription; -import rx.functions.Func3; -import rx.functions.Func4; +import rx.functions.*; +import rx.internal.producers.ProducerArbiter; import rx.observers.SerializedSubscriber; import rx.subscriptions.SerialSubscription; @@ -49,10 +44,10 @@ class OperatorTimeoutBase implements Operator { Func4, Long, T, Scheduler.Worker, Subscription> { } - private final FirstTimeoutStub firstTimeoutStub; - private final TimeoutStub timeoutStub; - private final Observable other; - private final Scheduler scheduler; + final FirstTimeoutStub firstTimeoutStub; + final TimeoutStub timeoutStub; + final Observable other; + final Scheduler scheduler; /* package-private */OperatorTimeoutBase(FirstTimeoutStub firstTimeoutStub, TimeoutStub timeoutStub, Observable other, Scheduler scheduler) { this.firstTimeoutStub = firstTimeoutStub; @@ -65,67 +60,86 @@ class OperatorTimeoutBase implements Operator { public Subscriber call(Subscriber subscriber) { Scheduler.Worker inner = scheduler.createWorker(); subscriber.add(inner); - final SerialSubscription serial = new SerialSubscription(); - subscriber.add(serial); // Use SynchronizedSubscriber for safe memory access // as the subscriber will be accessed in the current thread or the // scheduler or other Observables. final SerializedSubscriber synchronizedSubscriber = new SerializedSubscriber(subscriber); + final SerialSubscription serial = new SerialSubscription(); + synchronizedSubscriber.add(serial); + TimeoutSubscriber timeoutSubscriber = new TimeoutSubscriber(synchronizedSubscriber, timeoutStub, serial, other, inner); + + synchronizedSubscriber.add(timeoutSubscriber); + synchronizedSubscriber.setProducer(timeoutSubscriber.arbiter); + serial.set(firstTimeoutStub.call(timeoutSubscriber, 0L, inner)); + return timeoutSubscriber; } /* package-private */static final class TimeoutSubscriber extends Subscriber { - private final SerialSubscription serial; - private final Object gate = new Object(); + final SerialSubscription serial; - private final SerializedSubscriber serializedSubscriber; + final SerializedSubscriber serializedSubscriber; - private final TimeoutStub timeoutStub; + final TimeoutStub timeoutStub; - private final Observable other; - private final Scheduler.Worker inner; + final Observable other; + + final Scheduler.Worker inner; + + final ProducerArbiter arbiter; + + /** Guarded by this. */ + boolean terminated; + /** Guarded by this. */ + long actual; - final AtomicInteger terminated = new AtomicInteger(); - final AtomicLong actual = new AtomicLong(); - TimeoutSubscriber( SerializedSubscriber serializedSubscriber, TimeoutStub timeoutStub, SerialSubscription serial, Observable other, Scheduler.Worker inner) { - super(serializedSubscriber); this.serializedSubscriber = serializedSubscriber; this.timeoutStub = timeoutStub; this.serial = serial; this.other = other; this.inner = inner; + this.arbiter = new ProducerArbiter(); } + @Override + public void setProducer(Producer p) { + arbiter.setProducer(p); + } + @Override public void onNext(T value) { boolean onNextWins = false; - synchronized (gate) { - if (terminated.get() == 0) { - actual.incrementAndGet(); + long a; + synchronized (this) { + if (!terminated) { + a = ++actual; onNextWins = true; + } else { + a = actual; } } if (onNextWins) { serializedSubscriber.onNext(value); - serial.set(timeoutStub.call(this, actual.get(), value, inner)); + serial.set(timeoutStub.call(this, a, value, inner)); } } @Override public void onError(Throwable error) { boolean onErrorWins = false; - synchronized (gate) { - if (terminated.getAndSet(1) == 0) { + synchronized (this) { + if (!terminated) { + terminated = true; onErrorWins = true; } } @@ -138,8 +152,9 @@ public void onError(Throwable error) { @Override public void onCompleted() { boolean onCompletedWins = false; - synchronized (gate) { - if (terminated.getAndSet(1) == 0) { + synchronized (this) { + if (!terminated) { + terminated = true; onCompletedWins = true; } } @@ -152,8 +167,9 @@ public void onCompleted() { public void onTimeout(long seqId) { long expected = seqId; boolean timeoutWins = false; - synchronized (gate) { - if (expected == actual.get() && terminated.getAndSet(1) == 0) { + synchronized (this) { + if (expected == actual && !terminated) { + terminated = true; timeoutWins = true; } } @@ -161,10 +177,31 @@ public void onTimeout(long seqId) { if (other == null) { serializedSubscriber.onError(new TimeoutException()); } else { - other.unsafeSubscribe(serializedSubscriber); - serial.set(serializedSubscriber); + Subscriber second = new Subscriber() { + @Override + public void onNext(T t) { + serializedSubscriber.onNext(t); + } + + @Override + public void onError(Throwable e) { + serializedSubscriber.onError(e); + } + + @Override + public void onCompleted() { + serializedSubscriber.onCompleted(); + } + + @Override + public void setProducer(Producer p) { + arbiter.setProducer(p); + } + }; + other.unsafeSubscribe(second); + serial.set(second); } } } } -} +} \ No newline at end of file diff --git a/src/main/java/rx/internal/util/ScalarSynchronousObservable.java b/src/main/java/rx/internal/util/ScalarSynchronousObservable.java index 9a1dce56d7..797a4e4406 100644 --- a/src/main/java/rx/internal/util/ScalarSynchronousObservable.java +++ b/src/main/java/rx/internal/util/ScalarSynchronousObservable.java @@ -15,20 +15,74 @@ */ package rx.internal.util; -import rx.Observable; -import rx.Scheduler; -import rx.Scheduler.Worker; -import rx.Subscriber; -import rx.functions.Action0; -import rx.functions.Func1; +import java.util.concurrent.atomic.AtomicBoolean; + +import rx.*; +import rx.exceptions.Exceptions; +import rx.functions.*; +import rx.internal.producers.SingleProducer; import rx.internal.schedulers.EventLoopsScheduler; +import rx.observers.Subscribers; +import rx.schedulers.Schedulers; +/** + * An Observable that emits a single constant scalar value to Subscribers. + *

+ * This is a direct implementation of the Observable class to allow identifying it + * in flatMap and bypass the subscription to it altogether. + * + * @param the value type + */ public final class ScalarSynchronousObservable extends Observable { + /** + * We expect the Schedulers.computation() to return an EventLoopsScheduler all the time. + */ + static final Func1 COMPUTATION_ONSCHEDULE = new Func1() { + final EventLoopsScheduler els = (EventLoopsScheduler)Schedulers.computation(); + + @Override + public Subscription call(Action0 t) { + return els.scheduleDirect(t); + } + }; + + /** + * Indicates that the Producer used by this Observable should be fully + * threadsafe. It is possible, but unlikely that multiple concurrent + * requests will arrive to just(). + */ + static final boolean STRONG_MODE; + static { + String wp = System.getProperty("rx.just.strong-mode", "false"); + STRONG_MODE = Boolean.valueOf(wp); + } + + /** + * Creates a scalar producer depending on the state of STRONG_MODE. + * @param the type of the scalar value + * @param s the target subscriber + * @param v the value to emit + * @return the created Producer + */ + static Producer createProducer(Subscriber s, T v) { + if (STRONG_MODE) { + return new SingleProducer(s, v); + } + return new WeakSingleProducer(s, v); + } + + /** + * Constructs a ScalarSynchronousObservable with the given constant value. + * @param the value type + * @param t the value to emit when requested + * @return the new Observable + */ public static final ScalarSynchronousObservable create(T t) { return new ScalarSynchronousObservable(t); } + /** The constant scalar value to emit on request. */ final T t; protected ScalarSynchronousObservable(final T t) { @@ -36,116 +90,198 @@ protected ScalarSynchronousObservable(final T t) { @Override public void call(Subscriber s) { - /* - * We don't check isUnsubscribed as it is a significant performance impact in the fast-path use cases. - * See PerfBaseline tests and https://github.com/ReactiveX/RxJava/issues/1383 for more information. - * The assumption here is that when asking for a single item we should emit it and not concern ourselves with - * being unsubscribed already. If the Subscriber unsubscribes at 0, they shouldn't have subscribed, or it will - * filter it out (such as take(0)). This prevents us from paying the price on every subscription. - */ - s.onNext(t); - s.onCompleted(); + s.setProducer(createProducer(s, t)); } }); this.t = t; } + /** + * Returns the scalar constant value directly. + * @return the scalar constant value directly + */ public T get() { return t; } + + /** * Customized observeOn/subscribeOn implementation which emits the scalar * value directly or with less overhead on the specified scheduler. * @param scheduler the target scheduler * @return the new observable */ - public Observable scalarScheduleOn(Scheduler scheduler) { + public Observable scalarScheduleOn(final Scheduler scheduler) { + final Func1 onSchedule; if (scheduler instanceof EventLoopsScheduler) { - EventLoopsScheduler es = (EventLoopsScheduler) scheduler; - return create(new DirectScheduledEmission(es, t)); + onSchedule = COMPUTATION_ONSCHEDULE; + } else { + onSchedule = new Func1() { + @Override + public Subscription call(final Action0 a) { + final Scheduler.Worker w = scheduler.createWorker(); + w.schedule(new Action0() { + @Override + public void call() { + try { + a.call(); + } finally { + w.unsubscribe(); + } + } + }); + return w; + } + }; } - return create(new NormalScheduledEmission(scheduler, t)); + + return create(new ScalarAsyncOnSubscribe(t, onSchedule)); } - /** Optimized observeOn for scalar value observed on the EventLoopsScheduler. */ - static final class DirectScheduledEmission implements OnSubscribe { - private final EventLoopsScheduler es; - private final T value; - DirectScheduledEmission(EventLoopsScheduler es, T value) { - this.es = es; - this.value = value; - } - @Override - public void call(final Subscriber child) { - child.add(es.scheduleDirect(new ScalarSynchronousAction(child, value))); - } - } - /** Emits a scalar value on a general scheduler. */ - static final class NormalScheduledEmission implements OnSubscribe { - private final Scheduler scheduler; - private final T value; + /** + * The OnSubscribe implementation that creates the ScalarAsyncProducer for each + * incoming subscriber. + * + * @param the value type + */ + static final class ScalarAsyncOnSubscribe implements OnSubscribe { + final T value; + final Func1 onSchedule; - NormalScheduledEmission(Scheduler scheduler, T value) { - this.scheduler = scheduler; + ScalarAsyncOnSubscribe(T value, Func1 onSchedule) { this.value = value; + this.onSchedule = onSchedule; } - + @Override - public void call(final Subscriber subscriber) { - Worker worker = scheduler.createWorker(); - subscriber.add(worker); - worker.schedule(new ScalarSynchronousAction(subscriber, value)); + public void call(Subscriber s) { + s.setProducer(new ScalarAsyncProducer(s, value, onSchedule)); } } - /** Action that emits a single value when called. */ - static final class ScalarSynchronousAction implements Action0 { - private final Subscriber subscriber; - private final T value; - ScalarSynchronousAction(Subscriber subscriber, - T value) { - this.subscriber = subscriber; + /** + * Represents a producer which schedules the emission of a scalar value on + * the first positive request via the given scheduler callback. + * + * @param the value type + */ + static final class ScalarAsyncProducer extends AtomicBoolean implements Producer, Action0 { + /** */ + private static final long serialVersionUID = -2466317989629281651L; + final Subscriber actual; + final T value; + final Func1 onSchedule; + + public ScalarAsyncProducer(Subscriber actual, T value, Func1 onSchedule) { + this.actual = actual; this.value = value; + this.onSchedule = onSchedule; } + @Override + public void request(long n) { + if (n < 0L) { + throw new IllegalArgumentException("n >= 0 required but it was " + n); + } + if (n != 0 && compareAndSet(false, true)) { + actual.add(onSchedule.call(this)); + } + } + @Override public void call() { + Subscriber a = actual; + if (a.isUnsubscribed()) { + return; + } + T v = value; try { - subscriber.onNext(value); - } catch (Throwable t) { - subscriber.onError(t); + a.onNext(v); + } catch (Throwable e) { + Exceptions.throwOrReport(e, a, v); + return; + } + if (a.isUnsubscribed()) { return; } - subscriber.onCompleted(); + a.onCompleted(); + } + + @Override + public String toString() { + return "ScalarAsyncProducer[" + value + ", " + get() + "]"; } } + /** + * Given this scalar source as input to a flatMap, avoid one step of subscription + * and subscribes to the single Observable returned by the function. + *

+ * If the functions returns another scalar, no subscription happens and this inner + * scalar value will be emitted once requested. + * @param the result type + * @param func the mapper function that returns an Observable for the scalar value of this + * @return the new observable + */ public Observable scalarFlatMap(final Func1> func) { return create(new OnSubscribe() { @Override public void call(final Subscriber child) { Observable o = func.call(t); - if (o.getClass() == ScalarSynchronousObservable.class) { - child.onNext(((ScalarSynchronousObservable)o).t); - child.onCompleted(); + if (o instanceof ScalarSynchronousObservable) { + child.setProducer(createProducer(child, ((ScalarSynchronousObservable)o).t)); } else { - o.unsafeSubscribe(new Subscriber(child) { - @Override - public void onNext(R v) { - child.onNext(v); - } - @Override - public void onError(Throwable e) { - child.onError(e); - } - @Override - public void onCompleted() { - child.onCompleted(); - } - }); + o.unsafeSubscribe(Subscribers.wrap(child)); } } }); } -} + + /** + * This is the weak version of SingleProducer that uses plain fields + * to avoid reentrancy and as such is not threadsafe for concurrent + * request() calls. + * + * @param the value type + */ + static final class WeakSingleProducer implements Producer { + final Subscriber actual; + final T value; + boolean once; + + public WeakSingleProducer(Subscriber actual, T value) { + this.actual = actual; + this.value = value; + } + + @Override + public void request(long n) { + if (once) { + return; + } + if (n < 0L) { + throw new IllegalStateException("n >= required but it was " + n); + } + if (n != 0L) { + once = true; + Subscriber a = actual; + if (a.isUnsubscribed()) { + return; + } + T v = value; + try { + a.onNext(v); + } catch (Throwable e) { + Exceptions.throwOrReport(e, a, v); + return; + } + + if (a.isUnsubscribed()) { + return; + } + a.onCompleted(); + } + } + } +} \ No newline at end of file diff --git a/src/test/java/rx/internal/operators/OperatorReplayTest.java b/src/test/java/rx/internal/operators/OperatorReplayTest.java index 3da35b83b8..b05a6f3a72 100644 --- a/src/test/java/rx/internal/operators/OperatorReplayTest.java +++ b/src/test/java/rx/internal/operators/OperatorReplayTest.java @@ -623,7 +623,8 @@ public void testIssue2191_SchedulerUnsubscribe() throws Exception { verifyObserverMock(mockObserverAfterConnect, 2, 6); verify(spiedWorker, times(1)).isUnsubscribed(); - verify(spiedWorker, times(1)).unsubscribe(); + // subscribeOn didn't unsubscribe the worker before but it should have + verify(spiedWorker, times(2)).unsubscribe(); verify(sourceUnsubscribed, times(1)).call(); verifyNoMoreInteractions(sourceNext); @@ -684,7 +685,8 @@ public void testIssue2191_SchedulerUnsubscribeOnError() throws Exception { verifyObserver(mockObserverAfterConnect, 2, 2, illegalArgumentException); verify(spiedWorker, times(1)).isUnsubscribed(); - verify(spiedWorker, times(1)).unsubscribe(); + // subscribeOn didn't unsubscribe the worker before but it should have + verify(spiedWorker, times(2)).unsubscribe(); verify(sourceUnsubscribed, times(1)).call(); verifyNoMoreInteractions(sourceNext); diff --git a/src/test/java/rx/internal/operators/OperatorUnsubscribeOnTest.java b/src/test/java/rx/internal/operators/OperatorUnsubscribeOnTest.java index 08bce82609..4be8b96298 100644 --- a/src/test/java/rx/internal/operators/OperatorUnsubscribeOnTest.java +++ b/src/test/java/rx/internal/operators/OperatorUnsubscribeOnTest.java @@ -20,7 +20,7 @@ import static org.junit.Assert.assertTrue; import java.util.Arrays; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; @@ -31,13 +31,14 @@ import rx.Subscriber; import rx.Subscription; import rx.functions.Action0; +import rx.internal.util.RxThreadFactory; import rx.observers.TestObserver; import rx.schedulers.Schedulers; import rx.subscriptions.Subscriptions; public class OperatorUnsubscribeOnTest { - @Test + @Test(timeout = 1000) public void testUnsubscribeWhenSubscribeOnAndUnsubscribeOnAreOnSameThread() throws InterruptedException { UIEventLoopScheduler UI_EVENT_LOOP = new UIEventLoopScheduler(); try { @@ -56,7 +57,11 @@ public void call(Subscriber t1) { }); TestObserver observer = new TestObserver(); - w.subscribeOn(UI_EVENT_LOOP).observeOn(Schedulers.computation()).unsubscribeOn(UI_EVENT_LOOP).subscribe(observer); + w + .subscribeOn(UI_EVENT_LOOP) + .observeOn(Schedulers.computation()) + .unsubscribeOn(UI_EVENT_LOOP) + .subscribe(observer); Thread unsubscribeThread = subscription.getThread(); @@ -78,7 +83,7 @@ public void call(Subscriber t1) { } } - @Test + @Test(timeout = 1000) public void testUnsubscribeWhenSubscribeOnAndUnsubscribeOnAreOnDifferentThreads() throws InterruptedException { UIEventLoopScheduler UI_EVENT_LOOP = new UIEventLoopScheduler(); try { @@ -97,7 +102,11 @@ public void call(Subscriber t1) { }); TestObserver observer = new TestObserver(); - w.subscribeOn(Schedulers.newThread()).observeOn(Schedulers.computation()).unsubscribeOn(UI_EVENT_LOOP).subscribe(observer); + w + .subscribeOn(Schedulers.newThread()) + .observeOn(Schedulers.computation()) + .unsubscribeOn(UI_EVENT_LOOP) + .subscribe(observer); Thread unsubscribeThread = subscription.getThread(); @@ -110,7 +119,10 @@ public void call(Subscriber t1) { System.out.println("unsubscribeThread: " + unsubscribeThread); System.out.println("subscribeThread.get(): " + subscribeThread.get()); - assertTrue(unsubscribeThread == UI_EVENT_LOOP.getThread()); + Thread uiThread = UI_EVENT_LOOP.getThread(); + System.out.println("UI_EVENT_LOOP: " + uiThread); + + assertTrue(unsubscribeThread == uiThread); observer.assertReceivedOnNext(Arrays.asList(1, 2)); observer.assertTerminalEvent(); @@ -153,23 +165,24 @@ public Thread getThread() throws InterruptedException { public static class UIEventLoopScheduler extends Scheduler { - private final Scheduler.Worker eventLoop; - private final Subscription s; + private final ExecutorService eventLoop; + final Scheduler single; private volatile Thread t; public UIEventLoopScheduler() { - eventLoop = Schedulers.newThread().createWorker(); - s = eventLoop; + eventLoop = Executors.newSingleThreadExecutor(new RxThreadFactory("Test-EventLoop")); + single = Schedulers.from(eventLoop); + /* * DON'T DO THIS IN PRODUCTION CODE */ final CountDownLatch latch = new CountDownLatch(1); - eventLoop.schedule(new Action0() { + eventLoop.submit(new Runnable() { @Override - public void call() { + public void run() { t = Thread.currentThread(); latch.countDown(); } @@ -184,11 +197,11 @@ public void call() { @Override public Worker createWorker() { - return eventLoop; + return single.createWorker(); } public void shutdown() { - s.unsubscribe(); + eventLoop.shutdownNow(); } public Thread getThread() { @@ -196,4 +209,4 @@ public Thread getThread() { } } -} +} \ No newline at end of file diff --git a/src/test/java/rx/internal/util/ScalarSynchronousObservableTest.java b/src/test/java/rx/internal/util/ScalarSynchronousObservableTest.java new file mode 100644 index 0000000000..fee7b6f8e1 --- /dev/null +++ b/src/test/java/rx/internal/util/ScalarSynchronousObservableTest.java @@ -0,0 +1,233 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.util; + +import org.junit.Test; + +import rx.Observable; +import rx.exceptions.TestException; +import rx.functions.Func1; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; + +public class ScalarSynchronousObservableTest { + @Test + public void testBackpressure() { + TestSubscriber ts = TestSubscriber.create(0); + + Observable.just(1).subscribe(ts); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ts.requestMore(1); + + ts.assertValue(1); + ts.assertCompleted(); + ts.assertNoErrors(); + + ts.requestMore(1); + + ts.assertValue(1); + ts.assertCompleted(); + ts.assertNoErrors(); + } + + @Test(timeout = 1000) + public void testBackpressureSubscribeOn() throws Exception { + TestSubscriber ts = TestSubscriber.create(0); + + Observable.just(1).subscribeOn(Schedulers.computation()).subscribe(ts); + + Thread.sleep(200); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ts.requestMore(1); + + ts.awaitTerminalEvent(); + + ts.assertValue(1); + ts.assertCompleted(); + ts.assertNoErrors(); + } + + @Test(timeout = 1000) + public void testBackpressureObserveOn() throws Exception { + TestSubscriber ts = TestSubscriber.create(0); + + Observable.just(1).observeOn(Schedulers.computation()).subscribe(ts); + + Thread.sleep(200); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ts.requestMore(1); + + ts.awaitTerminalEvent(); + + ts.assertValue(1); + ts.assertCompleted(); + ts.assertNoErrors(); + } + + @Test + public void testBackpressureFlatMapJust() { + TestSubscriber ts = TestSubscriber.create(0); + + Observable.just(1).flatMap(new Func1>() { + @Override + public Observable call(Integer v) { + return Observable.just(v); + } + }).subscribe(ts); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ts.requestMore(1); + + ts.assertValue(1); + ts.assertCompleted(); + ts.assertNoErrors(); + + ts.requestMore(1); + + ts.assertValue(1); + ts.assertCompleted(); + ts.assertNoErrors(); + } + + @Test + public void testBackpressureFlatMapRange() { + TestSubscriber ts = TestSubscriber.create(0); + + Observable.just(1).flatMap(new Func1>() { + @Override + public Observable call(Integer v) { + return Observable.range(v, 2); + } + }).subscribe(ts); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ts.requestMore(1); + + ts.assertValue(1); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ts.requestMore(1); + ts.assertValues(1, 2); + ts.assertCompleted(); + ts.assertNoErrors(); + + ts.requestMore(1); + + ts.assertValues(1, 2); + ts.assertCompleted(); + ts.assertNoErrors(); + } + + @Test + public void emptiesAndJust() { + TestSubscriber ts = TestSubscriber.create(); + + Observable.just(1) + .flatMap(new Func1>() { + @Override + public Observable call(Integer n) { + return Observable.just(null, null) + .filter(new Func1() { + @Override + public Boolean call(Object o) { + return o != null; + } + }) + .switchIfEmpty(Observable.empty().switchIfEmpty(Observable.just("Hello"))); + } + }).subscribe(ts); + + ts.assertValue("Hello"); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void syncObserverNextThrows() { + TestSubscriber ts = new TestSubscriber() { + @Override + public void onNext(Integer t) { + throw new TestException(); + } + }; + + Observable.just(1).unsafeSubscribe(ts); + + ts.assertNoValues(); + ts.assertError(TestException.class); + ts.assertNotCompleted(); + } + + @Test + public void syncFlatMapJustObserverNextThrows() { + TestSubscriber ts = new TestSubscriber() { + @Override + public void onNext(Integer t) { + throw new TestException(); + } + }; + + Observable.just(1) + .flatMap(new Func1>() { + @Override + public Observable call(Integer v) { + return Observable.just(v); + } + }) + .unsafeSubscribe(ts); + + ts.assertNoValues(); + ts.assertError(TestException.class); + ts.assertNotCompleted(); + } + + @Test(timeout = 1000) + public void asyncObserverNextThrows() { + TestSubscriber ts = new TestSubscriber() { + @Override + public void onNext(Integer t) { + throw new TestException(); + } + }; + + Observable.just(1).subscribeOn(Schedulers.computation()).unsafeSubscribe(ts); + + ts.awaitTerminalEvent(); + ts.assertNoValues(); + ts.assertError(TestException.class); + ts.assertNotCompleted(); + } +} \ No newline at end of file