diff --git a/src/test/java/io/reactivex/exceptions/ExceptionsTest.java b/src/test/java/io/reactivex/exceptions/ExceptionsTest.java index efa24e3b02..88a08e8c70 100644 --- a/src/test/java/io/reactivex/exceptions/ExceptionsTest.java +++ b/src/test/java/io/reactivex/exceptions/ExceptionsTest.java @@ -491,4 +491,26 @@ public void manualPropagate() { } } + @Test + public void errorNotImplementedNull1() { + OnErrorNotImplementedException ex = new OnErrorNotImplementedException(null); + + assertTrue("" + ex.getCause(), ex.getCause() instanceof NullPointerException); + } + + @Test + public void errorNotImplementedNull2() { + OnErrorNotImplementedException ex = new OnErrorNotImplementedException("Message", null); + + assertTrue("" + ex.getCause(), ex.getCause() instanceof NullPointerException); + } + + @Test + public void errorNotImplementedWithCause() { + OnErrorNotImplementedException ex = new OnErrorNotImplementedException("Message", new TestException("Forced failure")); + + assertTrue("" + ex.getCause(), ex.getCause() instanceof TestException); + + assertEquals("" + ex.getCause(), "Forced failure", ex.getCause().getMessage()); + } } diff --git a/src/test/java/io/reactivex/internal/functions/FunctionsTest.java b/src/test/java/io/reactivex/internal/functions/FunctionsTest.java index 872d09df15..ca90162afa 100644 --- a/src/test/java/io/reactivex/internal/functions/FunctionsTest.java +++ b/src/test/java/io/reactivex/internal/functions/FunctionsTest.java @@ -16,13 +16,16 @@ import static org.junit.Assert.*; import java.lang.reflect.Method; +import java.util.List; import org.junit.Test; import io.reactivex.TestHelper; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions.*; import io.reactivex.internal.util.ExceptionHelper; +import io.reactivex.plugins.RxJavaPlugins; public class FunctionsTest { @Test @@ -245,4 +248,16 @@ public void emptyConsumerToString() { assertEquals("EmptyConsumer", Functions.EMPTY_CONSUMER.toString()); } + @Test + public void errorConsumerEmpty() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + Functions.ERROR_CONSUMER.accept(new TestException()); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + assertEquals(errors.toString(), 1, errors.size()); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletablePeekTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletablePeekTest.java index 0094a932a5..3db576fbba 100644 --- a/src/test/java/io/reactivex/internal/operators/completable/CompletablePeekTest.java +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletablePeekTest.java @@ -20,7 +20,9 @@ import io.reactivex.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.Action; +import io.reactivex.internal.functions.Functions; import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subjects.CompletableSubject; public class CompletablePeekTest { @@ -43,4 +45,9 @@ public void run() throws Exception { RxJavaPlugins.reset(); } } + + @Test + public void disposed() { + TestHelper.checkDisposed(CompletableSubject.create().doOnComplete(Functions.EMPTY_ACTION)); + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleInternalHelperTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleInternalHelperTest.java index c2fde721cf..5d4459259b 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleInternalHelperTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleInternalHelperTest.java @@ -15,9 +15,12 @@ import static org.junit.Assert.*; +import java.util.*; + import org.junit.Test; -import io.reactivex.TestHelper; +import io.reactivex.*; + public class SingleInternalHelperTest { @@ -43,4 +46,21 @@ public void toObservableEnum() { assertEquals(1, SingleInternalHelper.ToObservable.values().length); assertNotNull(SingleInternalHelper.ToObservable.valueOf("INSTANCE")); } + + @Test + public void singleIterableToFlowableIterable() { + Iterable> it = SingleInternalHelper.iterableToFlowable( + Collections.singletonList(Single.just(1))); + + Iterator> iter = it.iterator(); + + if (iter.hasNext()) { + iter.next().test().assertResult(1); + if (iter.hasNext()) { + fail("Iterator reports an additional element"); + } + } else { + fail("Iterator was empty"); + } + } } diff --git a/src/test/java/io/reactivex/internal/subscribers/StrictSubscriberTest.java b/src/test/java/io/reactivex/internal/subscribers/StrictSubscriberTest.java new file mode 100644 index 0000000000..0850149811 --- /dev/null +++ b/src/test/java/io/reactivex/internal/subscribers/StrictSubscriberTest.java @@ -0,0 +1,334 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.subscribers; + +import static org.junit.Assert.*; + +import java.util.*; + +import org.junit.Test; +import org.reactivestreams.*; + +import io.reactivex.Flowable; +import io.reactivex.exceptions.TestException; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.subscribers.TestSubscriber; + +public class StrictSubscriberTest { + + @Test + public void strictMode() { + final List list = new ArrayList(); + Subscriber sub = new Subscriber() { + + @Override + public void onSubscribe(Subscription s) { + s.request(10); + } + + @Override + public void onNext(Object t) { + list.add(t); + } + + @Override + public void onError(Throwable t) { + list.add(t); + } + + @Override + public void onComplete() { + list.add("Done"); + } + }; + + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(s); + } + }.subscribe(sub); + + assertTrue(list.toString(), list.get(0) instanceof StrictSubscriber); + } + + static final class SubscriberWrapper implements Subscriber { + final TestSubscriber tester; + + SubscriberWrapper(TestSubscriber tester) { + this.tester = tester; + } + + @Override + public void onSubscribe(Subscription s) { + tester.onSubscribe(s); + } + + @Override + public void onNext(T t) { + tester.onNext(t); + } + + @Override + public void onError(Throwable t) { + tester.onError(t); + } + + @Override + public void onComplete() { + tester.onComplete(); + } + } + + @Test + public void normalOnNext() { + TestSubscriber ts = new TestSubscriber(); + SubscriberWrapper wrapper = new SubscriberWrapper(ts); + + Flowable.range(1, 5).subscribe(wrapper); + + ts.assertResult(1, 2, 3, 4, 5); + } + + @Test + public void normalOnNextBackpressured() { + TestSubscriber ts = new TestSubscriber(0); + SubscriberWrapper wrapper = new SubscriberWrapper(ts); + + Flowable.range(1, 5).subscribe(wrapper); + + ts.assertEmpty() + .requestMore(1) + .assertValue(1) + .requestMore(2) + .assertValues(1, 2, 3) + .requestMore(2) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void normalOnError() { + TestSubscriber ts = new TestSubscriber(); + SubscriberWrapper wrapper = new SubscriberWrapper(ts); + + Flowable.range(1, 5).concatWith(Flowable.error(new TestException())) + .subscribe(wrapper); + + ts.assertFailure(TestException.class, 1, 2, 3, 4, 5); + } + + @Test + public void deferredRequest() { + final List list = new ArrayList(); + Subscriber sub = new Subscriber() { + + @Override + public void onSubscribe(Subscription s) { + s.request(5); + list.add(0); + } + + @Override + public void onNext(Object t) { + list.add(t); + } + + @Override + public void onError(Throwable t) { + list.add(t); + } + + @Override + public void onComplete() { + list.add("Done"); + } + }; + + Flowable.range(1, 5).subscribe(sub); + + assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5, "Done"), list); + } + + @Test + public void requestZero() { + final List list = new ArrayList(); + Subscriber sub = new Subscriber() { + + @Override + public void onSubscribe(Subscription s) { + s.request(0); + } + + @Override + public void onNext(Object t) { + list.add(t); + } + + @Override + public void onError(Throwable t) { + list.add(t); + } + + @Override + public void onComplete() { + list.add("Done"); + } + }; + + Flowable.range(1, 5).subscribe(sub); + + assertTrue(list.toString(), list.get(0) instanceof IllegalArgumentException); + assertTrue(list.toString(), list.get(0).toString().contains("3.9")); + } + + @Test + public void requestNegative() { + final List list = new ArrayList(); + Subscriber sub = new Subscriber() { + + @Override + public void onSubscribe(Subscription s) { + s.request(-99); + } + + @Override + public void onNext(Object t) { + list.add(t); + } + + @Override + public void onError(Throwable t) { + list.add(t); + } + + @Override + public void onComplete() { + list.add("Done"); + } + }; + + Flowable.range(1, 5).subscribe(sub); + + assertTrue(list.toString(), list.get(0) instanceof IllegalArgumentException); + assertTrue(list.toString(), list.get(0).toString().contains("3.9")); + } + + @Test + public void cancelAfterOnComplete() { + final List list = new ArrayList(); + Subscriber sub = new Subscriber() { + + Subscription s; + @Override + public void onSubscribe(Subscription s) { + this.s = s; + } + + @Override + public void onNext(Object t) { + list.add(t); + } + + @Override + public void onError(Throwable t) { + s.cancel(); + list.add(t); + } + + @Override + public void onComplete() { + s.cancel(); + list.add("Done"); + } + }; + + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + BooleanSubscription b = new BooleanSubscription(); + s.onSubscribe(b); + s.onComplete(); + list.add(b.isCancelled()); + } + }.subscribe(sub); + + assertEquals(Arrays.asList("Done", false), list); + } + + @Test + public void cancelAfterOnError() { + final List list = new ArrayList(); + Subscriber sub = new Subscriber() { + + Subscription s; + @Override + public void onSubscribe(Subscription s) { + this.s = s; + } + + @Override + public void onNext(Object t) { + list.add(t); + } + + @Override + public void onError(Throwable t) { + s.cancel(); + list.add(t.getMessage()); + } + + @Override + public void onComplete() { + s.cancel(); + list.add("Done"); + } + }; + + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + BooleanSubscription b = new BooleanSubscription(); + s.onSubscribe(b); + s.onError(new TestException("Forced failure")); + list.add(b.isCancelled()); + } + }.subscribe(sub); + + assertEquals(Arrays.asList("Forced failure", false), list); + } + + @Test + public void doubleOnSubscribe() { + TestSubscriber ts = new TestSubscriber(); + SubscriberWrapper wrapper = new SubscriberWrapper(ts); + + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + BooleanSubscription b1 = new BooleanSubscription(); + s.onSubscribe(b1); + + BooleanSubscription b2 = new BooleanSubscription(); + s.onSubscribe(b2); + + assertTrue(b1.isCancelled()); + assertTrue(b2.isCancelled()); + } + }.subscribe(wrapper); + + ts.assertFailure(IllegalStateException.class); + assertTrue(ts.errors().toString(), ts.errors().get(0).getMessage().contains("2.12")); + } +} diff --git a/src/test/java/io/reactivex/parallel/ParallelMapTryTest.java b/src/test/java/io/reactivex/parallel/ParallelMapTryTest.java index 261eea5e9e..bed3f5eae8 100644 --- a/src/test/java/io/reactivex/parallel/ParallelMapTryTest.java +++ b/src/test/java/io/reactivex/parallel/ParallelMapTryTest.java @@ -348,4 +348,9 @@ public void mapInvalidSourceConditional() { RxJavaPlugins.reset(); } } + + @Test + public void failureHandlingEnum() { + TestHelper.checkEnum(ParallelFailureHandling.class); + } } diff --git a/src/test/java/io/reactivex/parallel/ParallelRunOnTest.java b/src/test/java/io/reactivex/parallel/ParallelRunOnTest.java index 863c80e104..bf0bb0b33b 100644 --- a/src/test/java/io/reactivex/parallel/ParallelRunOnTest.java +++ b/src/test/java/io/reactivex/parallel/ParallelRunOnTest.java @@ -278,4 +278,47 @@ public void run() { TestHelper.race(r1, r2); } } + + @SuppressWarnings("unchecked") + @Test + public void normalCancelAfterRequest1() { + + TestSubscriber ts = new TestSubscriber(1) { + @Override + public void onNext(Integer t) { + super.onNext(t); + cancel(); + onComplete(); + } + }; + + Flowable.range(1, 5) + .parallel(1) + .runOn(ImmediateThinScheduler.INSTANCE) + .subscribe(new Subscriber[] { ts }); + + ts.assertResult(1); + } + + @SuppressWarnings("unchecked") + @Test + public void conditionalCancelAfterRequest1() { + + TestSubscriber ts = new TestSubscriber(1) { + @Override + public void onNext(Integer t) { + super.onNext(t); + cancel(); + onComplete(); + } + }; + + Flowable.range(1, 5) + .parallel(1) + .runOn(ImmediateThinScheduler.INSTANCE) + .filter(Functions.alwaysTrue()) + .subscribe(new Subscriber[] { ts }); + + ts.assertResult(1); + } } diff --git a/src/test/java/io/reactivex/schedulers/SchedulerTest.java b/src/test/java/io/reactivex/schedulers/SchedulerTest.java index 8c2b534991..2845cf3a91 100644 --- a/src/test/java/io/reactivex/schedulers/SchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/SchedulerTest.java @@ -285,4 +285,26 @@ public void holders() { assertNotNull(new Schedulers.SingleHolder()); } + + static final class CustomScheduler extends Scheduler { + + @Override + public Worker createWorker() { + return Schedulers.single().createWorker(); + } + + } + + @Test + public void customScheduleDirectDisposed() { + CustomScheduler scheduler = new CustomScheduler(); + + Disposable d = scheduler.scheduleDirect(Functions.EMPTY_RUNNABLE, 1, TimeUnit.MINUTES); + + assertFalse(d.isDisposed()); + + d.dispose(); + + assertTrue(d.isDisposed()); + } } diff --git a/src/test/java/io/reactivex/single/SingleTest.java b/src/test/java/io/reactivex/single/SingleTest.java index 41971d50c6..3fc650d276 100644 --- a/src/test/java/io/reactivex/single/SingleTest.java +++ b/src/test/java/io/reactivex/single/SingleTest.java @@ -577,5 +577,15 @@ public void fromObservableError() { .assertFailure(RuntimeException.class) .assertErrorMessage("some error"); } + + @Test(expected = NullPointerException.class) + public void implementationThrows() { + new Single() { + @Override + protected void subscribeActual(SingleObserver observer) { + throw new NullPointerException(); + } + }.test(); + } }