diff --git a/src/main/java/io/reactivex/observers/ResourceCompletableObserver.java b/src/main/java/io/reactivex/observers/ResourceCompletableObserver.java new file mode 100644 index 0000000000..a6007e938e --- /dev/null +++ b/src/main/java/io/reactivex/observers/ResourceCompletableObserver.java @@ -0,0 +1,86 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.observers; + +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.CompletableObserver; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.disposables.ListCompositeDisposable; +import io.reactivex.internal.functions.ObjectHelper; + +/** + * An abstract {@link CompletableObserver} that allows asynchronous cancellation of its subscription and associated resources. + * + *

All pre-implemented final methods are thread-safe. + */ +public abstract class ResourceCompletableObserver implements CompletableObserver, Disposable { + /** The active subscription. */ + private final AtomicReference s = new AtomicReference(); + + /** The resource composite, can never be null. */ + private final ListCompositeDisposable resources = new ListCompositeDisposable(); + + /** + * Adds a resource to this ResourceObserver. + * + * @param resource the resource to add + * + * @throws NullPointerException if resource is null + */ + public final void add(Disposable resource) { + ObjectHelper.requireNonNull(resource, "resource is null"); + resources.add(resource); + } + + @Override + public final void onSubscribe(Disposable s) { + if (DisposableHelper.setOnce(this.s, s)) { + onStart(); + } + } + + /** + * Called once the upstream sets a Subscription on this ResourceObserver. + * + *

You can perform initialization at this moment. The default + * implementation does nothing. + */ + protected void onStart() { + } + + /** + * Cancels the main disposable (if any) and disposes the resources associated with + * this ResourceObserver (if any). + * + *

This method can be called before the upstream calls onSubscribe at which + * case the main Disposable will be immediately disposed. + */ + @Override + public final void dispose() { + if (DisposableHelper.dispose(s)) { + resources.dispose(); + } + } + + /** + * Returns true if this ResourceObserver has been disposed/cancelled. + * @return true if this ResourceObserver has been disposed/cancelled + */ + @Override + public final boolean isDisposed() { + return DisposableHelper.isDisposed(s.get()); + } +} diff --git a/src/main/java/io/reactivex/observers/ResourceMaybeObserver.java b/src/main/java/io/reactivex/observers/ResourceMaybeObserver.java new file mode 100644 index 0000000000..07a76ac111 --- /dev/null +++ b/src/main/java/io/reactivex/observers/ResourceMaybeObserver.java @@ -0,0 +1,88 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.observers; + +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.MaybeObserver; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.disposables.ListCompositeDisposable; +import io.reactivex.internal.functions.ObjectHelper; + +/** + * An abstract {@link MaybeObserver} that allows asynchronous cancellation of its subscription and associated resources. + * + *

All pre-implemented final methods are thread-safe. + * + * @param the value type + */ +public abstract class ResourceMaybeObserver implements MaybeObserver, Disposable { + /** The active subscription. */ + private final AtomicReference s = new AtomicReference(); + + /** The resource composite, can never be null. */ + private final ListCompositeDisposable resources = new ListCompositeDisposable(); + + /** + * Adds a resource to this ResourceObserver. + * + * @param resource the resource to add + * + * @throws NullPointerException if resource is null + */ + public final void add(Disposable resource) { + ObjectHelper.requireNonNull(resource, "resource is null"); + resources.add(resource); + } + + @Override + public final void onSubscribe(Disposable s) { + if (DisposableHelper.setOnce(this.s, s)) { + onStart(); + } + } + + /** + * Called once the upstream sets a Subscription on this ResourceObserver. + * + *

You can perform initialization at this moment. The default + * implementation does nothing. + */ + protected void onStart() { + } + + /** + * Cancels the main disposable (if any) and disposes the resources associated with + * this ResourceObserver (if any). + * + *

This method can be called before the upstream calls onSubscribe at which + * case the main Disposable will be immediately disposed. + */ + @Override + public final void dispose() { + if (DisposableHelper.dispose(s)) { + resources.dispose(); + } + } + + /** + * Returns true if this ResourceObserver has been disposed/cancelled. + * @return true if this ResourceObserver has been disposed/cancelled + */ + @Override + public final boolean isDisposed() { + return DisposableHelper.isDisposed(s.get()); + } +} diff --git a/src/main/java/io/reactivex/observers/ResourceObserver.java b/src/main/java/io/reactivex/observers/ResourceObserver.java index 8e081c56df..5baa8c8cb5 100644 --- a/src/main/java/io/reactivex/observers/ResourceObserver.java +++ b/src/main/java/io/reactivex/observers/ResourceObserver.java @@ -21,7 +21,7 @@ import io.reactivex.internal.functions.ObjectHelper; /** - * An abstract Observer that allows asynchronous cancellation of its subscription and associated resources. + * An abstract {@link Observer} that allows asynchronous cancellation of its subscription and associated resources. * *

All pre-implemented final methods are thread-safe. * @@ -31,7 +31,7 @@ public abstract class ResourceObserver implements Observer, Disposable { /** The active subscription. */ private final AtomicReference s = new AtomicReference(); - /** The resource composite, can be null. */ + /** The resource composite, can never be null. */ private final ListCompositeDisposable resources = new ListCompositeDisposable(); /** @@ -69,17 +69,13 @@ protected void onStart() { *

This method can be called before the upstream calls onSubscribe at which * case the main Disposable will be immediately disposed. */ - protected final void cancel() { + @Override + public final void dispose() { if (DisposableHelper.dispose(s)) { resources.dispose(); } } - @Override - public final void dispose() { - cancel(); - } - /** * Returns true if this ResourceObserver has been disposed/cancelled. * @return true if this ResourceObserver has been disposed/cancelled diff --git a/src/main/java/io/reactivex/observers/ResourceSingleObserver.java b/src/main/java/io/reactivex/observers/ResourceSingleObserver.java new file mode 100644 index 0000000000..37c213c56e --- /dev/null +++ b/src/main/java/io/reactivex/observers/ResourceSingleObserver.java @@ -0,0 +1,88 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.observers; + +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.SingleObserver; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.disposables.ListCompositeDisposable; +import io.reactivex.internal.functions.ObjectHelper; + +/** + * An abstract {@link SingleObserver} that allows asynchronous cancellation of its subscription and associated resources. + * + *

All pre-implemented final methods are thread-safe. + * + * @param the value type + */ +public abstract class ResourceSingleObserver implements SingleObserver, Disposable { + /** The active subscription. */ + private final AtomicReference s = new AtomicReference(); + + /** The resource composite, can never be null. */ + private final ListCompositeDisposable resources = new ListCompositeDisposable(); + + /** + * Adds a resource to this ResourceObserver. + * + * @param resource the resource to add + * + * @throws NullPointerException if resource is null + */ + public final void add(Disposable resource) { + ObjectHelper.requireNonNull(resource, "resource is null"); + resources.add(resource); + } + + @Override + public final void onSubscribe(Disposable s) { + if (DisposableHelper.setOnce(this.s, s)) { + onStart(); + } + } + + /** + * Called once the upstream sets a Subscription on this ResourceObserver. + * + *

You can perform initialization at this moment. The default + * implementation does nothing. + */ + protected void onStart() { + } + + /** + * Cancels the main disposable (if any) and disposes the resources associated with + * this ResourceObserver (if any). + * + *

This method can be called before the upstream calls onSubscribe at which + * case the main Disposable will be immediately disposed. + */ + @Override + public final void dispose() { + if (DisposableHelper.dispose(s)) { + resources.dispose(); + } + } + + /** + * Returns true if this ResourceObserver has been disposed/cancelled. + * @return true if this ResourceObserver has been disposed/cancelled + */ + @Override + public final boolean isDisposed() { + return DisposableHelper.isDisposed(s.get()); + } +} diff --git a/src/main/java/io/reactivex/subscribers/ResourceSubscriber.java b/src/main/java/io/reactivex/subscribers/ResourceSubscriber.java index d996127900..06ec84439f 100644 --- a/src/main/java/io/reactivex/subscribers/ResourceSubscriber.java +++ b/src/main/java/io/reactivex/subscribers/ResourceSubscriber.java @@ -13,17 +13,19 @@ package io.reactivex.subscribers; -import java.util.concurrent.atomic.*; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; -import org.reactivestreams.*; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; -import io.reactivex.disposables.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.ListCompositeDisposable; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.subscriptions.SubscriptionHelper; /** - * An abstract Subscriber that allows asynchronous cancellation of its - * subscription. + * An abstract Subscriber that allows asynchronous cancellation of its subscription and associated resources. * *

This implementation let's you chose if the AsyncObserver manages resources or not, * thus saving memory on cases where there is no need for that. @@ -36,8 +38,8 @@ public abstract class ResourceSubscriber implements Subscriber, Disposable /** The active subscription. */ private final AtomicReference s = new AtomicReference(); - /** The resource composite, can be null. */ - private final CompositeDisposable resources = new CompositeDisposable(); + /** The resource composite, can never be null. */ + private final ListCompositeDisposable resources = new ListCompositeDisposable(); /** Remembers the request(n) counts until a subscription arrives. */ private final AtomicLong missedRequested = new AtomicLong(); @@ -90,17 +92,13 @@ protected final void request(long n) { *

This method can be called before the upstream calls onSubscribe at which * case the Subscription will be immediately cancelled. */ - protected final void cancel() { + @Override + public final void dispose() { if (SubscriptionHelper.cancel(s)) { resources.dispose(); } } - @Override - public final void dispose() { - cancel(); - } - /** * Returns true if this AsyncObserver has been disposed/cancelled. * @return true if this AsyncObserver has been disposed/cancelled diff --git a/src/test/java/io/reactivex/flowable/FlowableBackpressureTests.java b/src/test/java/io/reactivex/flowable/FlowableBackpressureTests.java index fdd17ef09b..05052be5f7 100644 --- a/src/test/java/io/reactivex/flowable/FlowableBackpressureTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableBackpressureTests.java @@ -408,7 +408,7 @@ public void onNext(Integer t) { int total = totalReceived.incrementAndGet(); received.incrementAndGet(); if (total >= 2000) { - cancel(); + dispose(); } if (received.get() == 100) { batches.incrementAndGet(); @@ -457,7 +457,7 @@ public void onNext(Integer t) { boolean done = false; if (total >= 2000) { done = true; - cancel(); + dispose(); } if (received.get() == 100) { batches.incrementAndGet(); diff --git a/src/test/java/io/reactivex/observers/ResourceCompletableObserverTest.java b/src/test/java/io/reactivex/observers/ResourceCompletableObserverTest.java new file mode 100644 index 0000000000..aeea523b72 --- /dev/null +++ b/src/test/java/io/reactivex/observers/ResourceCompletableObserverTest.java @@ -0,0 +1,202 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.observers; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import io.reactivex.Completable; +import io.reactivex.TestHelper; +import io.reactivex.disposables.Disposable; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.TestException; +import io.reactivex.plugins.RxJavaPlugins; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ResourceCompletableObserverTest { + static final class TestResourceCompletableObserver extends ResourceCompletableObserver { + final List errors = new ArrayList(); + + int complete; + + int start; + + @Override + protected void onStart() { + super.onStart(); + + start++; + } + + @Override + public void onComplete() { + complete++; + + dispose(); + } + + @Override + public void onError(Throwable e) { + errors.add(e); + + dispose(); + } + } + + @Test(expected = NullPointerException.class) + public void nullResource() { + TestResourceCompletableObserver rco = new TestResourceCompletableObserver(); + rco.add(null); + } + + @Test + public void addResources() { + TestResourceCompletableObserver rco = new TestResourceCompletableObserver(); + + assertFalse(rco.isDisposed()); + + Disposable d = Disposables.empty(); + + rco.add(d); + + assertFalse(d.isDisposed()); + + rco.dispose(); + + assertTrue(rco.isDisposed()); + + assertTrue(d.isDisposed()); + + rco.dispose(); + + assertTrue(rco.isDisposed()); + + assertTrue(d.isDisposed()); + } + + @Test + public void onCompleteCleansUp() { + TestResourceCompletableObserver rco = new TestResourceCompletableObserver(); + + assertFalse(rco.isDisposed()); + + Disposable d = Disposables.empty(); + + rco.add(d); + + assertFalse(d.isDisposed()); + + rco.onComplete(); + + assertTrue(rco.isDisposed()); + + assertTrue(d.isDisposed()); + } + + @Test + public void onErrorCleansUp() { + TestResourceCompletableObserver rco = new TestResourceCompletableObserver(); + + assertFalse(rco.isDisposed()); + + Disposable d = Disposables.empty(); + + rco.add(d); + + assertFalse(d.isDisposed()); + + rco.onError(new TestException()); + + assertTrue(rco.isDisposed()); + + assertTrue(d.isDisposed()); + } + + @Test + public void normal() { + TestResourceCompletableObserver rco = new TestResourceCompletableObserver(); + + assertFalse(rco.isDisposed()); + assertEquals(0, rco.start); + assertTrue(rco.errors.isEmpty()); + + Completable.complete().subscribe(rco); + + assertTrue(rco.isDisposed()); + assertEquals(1, rco.start); + assertEquals(1, rco.complete); + assertTrue(rco.errors.isEmpty()); + } + + @Test + public void error() { + TestResourceCompletableObserver rco = new TestResourceCompletableObserver(); + + assertFalse(rco.isDisposed()); + assertEquals(0, rco.start); + assertTrue(rco.errors.isEmpty()); + + final RuntimeException error = new RuntimeException("error"); + Completable.error(error).subscribe(rco); + + assertTrue(rco.isDisposed()); + assertEquals(1, rco.start); + assertEquals(0, rco.complete); + assertEquals(1, rco.errors.size()); + assertTrue(rco.errors.contains(error)); + } + + @Test + public void startOnce() { + + List error = TestHelper.trackPluginErrors(); + + try { + TestResourceCompletableObserver rco = new TestResourceCompletableObserver(); + + rco.onSubscribe(Disposables.empty()); + + Disposable d = Disposables.empty(); + + rco.onSubscribe(d); + + assertTrue(d.isDisposed()); + + assertEquals(1, rco.start); + + TestHelper.assertError(error, 0, IllegalStateException.class, "Disposable already set!"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void dispose() { + TestResourceCompletableObserver rco = new TestResourceCompletableObserver(); + rco.dispose(); + + Disposable d = Disposables.empty(); + + rco.onSubscribe(d); + + assertTrue(d.isDisposed()); + + assertEquals(0, rco.start); + } +} diff --git a/src/test/java/io/reactivex/observers/ResourceMaybeObserverTest.java b/src/test/java/io/reactivex/observers/ResourceMaybeObserverTest.java new file mode 100644 index 0000000000..a57946ec98 --- /dev/null +++ b/src/test/java/io/reactivex/observers/ResourceMaybeObserverTest.java @@ -0,0 +1,253 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.observers; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import io.reactivex.Maybe; +import io.reactivex.TestHelper; +import io.reactivex.disposables.Disposable; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.TestException; +import io.reactivex.plugins.RxJavaPlugins; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class ResourceMaybeObserverTest { + static final class TestResourceMaybeObserver extends ResourceMaybeObserver { + T value; + + final List errors = new ArrayList(); + + int complete; + + int start; + + @Override + protected void onStart() { + super.onStart(); + + start++; + } + + @Override + public void onSuccess(final T value) { + this.value = value; + + dispose(); + } + + @Override + public void onComplete() { + complete++; + + dispose(); + } + + @Override + public void onError(Throwable e) { + errors.add(e); + + dispose(); + } + } + + @Test(expected = NullPointerException.class) + public void nullResource() { + TestResourceMaybeObserver rmo = new TestResourceMaybeObserver(); + rmo.add(null); + } + + @Test + public void addResources() { + TestResourceMaybeObserver rmo = new TestResourceMaybeObserver(); + + assertFalse(rmo.isDisposed()); + + Disposable d = Disposables.empty(); + + rmo.add(d); + + assertFalse(d.isDisposed()); + + rmo.dispose(); + + assertTrue(rmo.isDisposed()); + + assertTrue(d.isDisposed()); + + rmo.dispose(); + + assertTrue(rmo.isDisposed()); + + assertTrue(d.isDisposed()); + } + + @Test + public void onCompleteCleansUp() { + TestResourceMaybeObserver rmo = new TestResourceMaybeObserver(); + + assertFalse(rmo.isDisposed()); + + Disposable d = Disposables.empty(); + + rmo.add(d); + + assertFalse(d.isDisposed()); + + rmo.onComplete(); + + assertTrue(rmo.isDisposed()); + + assertTrue(d.isDisposed()); + } + + @Test + public void onSuccessCleansUp() { + TestResourceMaybeObserver rmo = new TestResourceMaybeObserver(); + + assertFalse(rmo.isDisposed()); + + Disposable d = Disposables.empty(); + + rmo.add(d); + + assertFalse(d.isDisposed()); + + rmo.onSuccess(1); + + assertTrue(rmo.isDisposed()); + + assertTrue(d.isDisposed()); + } + + @Test + public void onErrorCleansUp() { + TestResourceMaybeObserver rmo = new TestResourceMaybeObserver(); + + assertFalse(rmo.isDisposed()); + + Disposable d = Disposables.empty(); + + rmo.add(d); + + assertFalse(d.isDisposed()); + + rmo.onError(new TestException()); + + assertTrue(rmo.isDisposed()); + + assertTrue(d.isDisposed()); + } + + @Test + public void normal() { + TestResourceMaybeObserver rmo = new TestResourceMaybeObserver(); + + assertFalse(rmo.isDisposed()); + assertEquals(0, rmo.start); + assertNull(rmo.value); + assertTrue(rmo.errors.isEmpty()); + + Maybe.just(1).subscribe(rmo); + + assertTrue(rmo.isDisposed()); + assertEquals(1, rmo.start); + assertEquals(Integer.valueOf(1), rmo.value); + assertEquals(0, rmo.complete); + assertTrue(rmo.errors.isEmpty()); + } + + @Test + public void empty() { + TestResourceMaybeObserver rmo = new TestResourceMaybeObserver(); + + assertFalse(rmo.isDisposed()); + assertEquals(0, rmo.start); + assertNull(rmo.value); + assertTrue(rmo.errors.isEmpty()); + + Maybe.empty().subscribe(rmo); + + assertTrue(rmo.isDisposed()); + assertEquals(1, rmo.start); + assertNull(rmo.value); + assertEquals(1, rmo.complete); + assertTrue(rmo.errors.isEmpty()); + } + + @Test + public void error() { + TestResourceMaybeObserver rmo = new TestResourceMaybeObserver(); + + assertFalse(rmo.isDisposed()); + assertEquals(0, rmo.start); + assertNull(rmo.value); + assertTrue(rmo.errors.isEmpty()); + + final RuntimeException error = new RuntimeException("error"); + Maybe.error(error).subscribe(rmo); + + assertTrue(rmo.isDisposed()); + assertEquals(1, rmo.start); + assertNull(rmo.value); + assertEquals(0, rmo.complete); + assertEquals(1, rmo.errors.size()); + assertTrue(rmo.errors.contains(error)); + } + + @Test + public void startOnce() { + + List error = TestHelper.trackPluginErrors(); + + try { + TestResourceMaybeObserver rmo = new TestResourceMaybeObserver(); + + rmo.onSubscribe(Disposables.empty()); + + Disposable d = Disposables.empty(); + + rmo.onSubscribe(d); + + assertTrue(d.isDisposed()); + + assertEquals(1, rmo.start); + + TestHelper.assertError(error, 0, IllegalStateException.class, "Disposable already set!"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void dispose() { + TestResourceMaybeObserver rmo = new TestResourceMaybeObserver(); + rmo.dispose(); + + Disposable d = Disposables.empty(); + + rmo.onSubscribe(d); + + assertTrue(d.isDisposed()); + + assertEquals(0, rmo.start); + } +} diff --git a/src/test/java/io/reactivex/observers/ResourceObserverTest.java b/src/test/java/io/reactivex/observers/ResourceObserverTest.java index ff1b34e536..40cf6a0d1f 100644 --- a/src/test/java/io/reactivex/observers/ResourceObserverTest.java +++ b/src/test/java/io/reactivex/observers/ResourceObserverTest.java @@ -13,19 +13,22 @@ package io.reactivex.observers; -import static org.junit.Assert.*; - -import java.util.*; - import org.junit.Test; +import java.util.ArrayList; +import java.util.List; + import io.reactivex.Observable; import io.reactivex.TestHelper; -import io.reactivex.disposables.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.disposables.Disposables; import io.reactivex.exceptions.TestException; -import io.reactivex.observers.ResourceObserver; import io.reactivex.plugins.RxJavaPlugins; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + public class ResourceObserverTest { static final class TestResourceObserver extends ResourceObserver { @@ -62,10 +65,6 @@ public void onComplete() { dispose(); } - - void cancelIt() { - cancel(); - } } @Test(expected = NullPointerException.class) @@ -86,7 +85,7 @@ public void addResources() { assertFalse(d.isDisposed()); - ro.cancelIt(); + ro.dispose(); assertTrue(ro.isDisposed()); @@ -154,6 +153,25 @@ public void normal() { assertTrue(tc.errors.isEmpty()); } + @Test + public void error() { + TestResourceObserver tc = new TestResourceObserver(); + + assertFalse(tc.isDisposed()); + assertEquals(0, tc.start); + assertTrue(tc.values.isEmpty()); + assertTrue(tc.errors.isEmpty()); + + final RuntimeException error = new RuntimeException("error"); + Observable.error(error).subscribe(tc); + + assertTrue(tc.isDisposed()); + assertEquals(1, tc.start); + assertTrue(tc.values.isEmpty()); + assertEquals(1, tc.errors.size()); + assertTrue(tc.errors.contains(error)); + } + @Test public void startOnce() { diff --git a/src/test/java/io/reactivex/observers/ResourceSingleObserverTest.java b/src/test/java/io/reactivex/observers/ResourceSingleObserverTest.java new file mode 100644 index 0000000000..7e47901662 --- /dev/null +++ b/src/test/java/io/reactivex/observers/ResourceSingleObserverTest.java @@ -0,0 +1,205 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.observers; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import io.reactivex.Single; +import io.reactivex.TestHelper; +import io.reactivex.disposables.Disposable; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.TestException; +import io.reactivex.plugins.RxJavaPlugins; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class ResourceSingleObserverTest { + static final class TestResourceSingleObserver extends ResourceSingleObserver { + T value; + + final List errors = new ArrayList(); + + int start; + + @Override + protected void onStart() { + super.onStart(); + + start++; + } + + @Override + public void onSuccess(final T value) { + this.value = value; + + dispose(); + } + + @Override + public void onError(Throwable e) { + errors.add(e); + + dispose(); + } + } + + @Test(expected = NullPointerException.class) + public void nullResource() { + TestResourceSingleObserver rso = new TestResourceSingleObserver(); + rso.add(null); + } + + @Test + public void addResources() { + TestResourceSingleObserver rso = new TestResourceSingleObserver(); + + assertFalse(rso.isDisposed()); + + Disposable d = Disposables.empty(); + + rso.add(d); + + assertFalse(d.isDisposed()); + + rso.dispose(); + + assertTrue(rso.isDisposed()); + + assertTrue(d.isDisposed()); + + rso.dispose(); + + assertTrue(rso.isDisposed()); + + assertTrue(d.isDisposed()); + } + + @Test + public void onSuccessCleansUp() { + TestResourceSingleObserver rso = new TestResourceSingleObserver(); + + assertFalse(rso.isDisposed()); + + Disposable d = Disposables.empty(); + + rso.add(d); + + assertFalse(d.isDisposed()); + + rso.onSuccess(1); + + assertTrue(rso.isDisposed()); + + assertTrue(d.isDisposed()); + } + + @Test + public void onErrorCleansUp() { + TestResourceSingleObserver rso = new TestResourceSingleObserver(); + + assertFalse(rso.isDisposed()); + + Disposable d = Disposables.empty(); + + rso.add(d); + + assertFalse(d.isDisposed()); + + rso.onError(new TestException()); + + assertTrue(rso.isDisposed()); + + assertTrue(d.isDisposed()); + } + + @Test + public void normal() { + TestResourceSingleObserver rso = new TestResourceSingleObserver(); + + assertFalse(rso.isDisposed()); + assertEquals(0, rso.start); + assertNull(rso.value); + assertTrue(rso.errors.isEmpty()); + + Single.just(1).subscribe(rso); + + assertTrue(rso.isDisposed()); + assertEquals(1, rso.start); + assertEquals(Integer.valueOf(1), rso.value); + assertTrue(rso.errors.isEmpty()); + } + + @Test + public void error() { + TestResourceSingleObserver rso = new TestResourceSingleObserver(); + + assertFalse(rso.isDisposed()); + assertEquals(0, rso.start); + assertNull(rso.value); + assertTrue(rso.errors.isEmpty()); + + final RuntimeException error = new RuntimeException("error"); + Single.error(error).subscribe(rso); + + assertTrue(rso.isDisposed()); + assertEquals(1, rso.start); + assertNull(rso.value); + assertEquals(1, rso.errors.size()); + assertTrue(rso.errors.contains(error)); + } + + @Test + public void startOnce() { + + List error = TestHelper.trackPluginErrors(); + + try { + TestResourceSingleObserver rso = new TestResourceSingleObserver(); + + rso.onSubscribe(Disposables.empty()); + + Disposable d = Disposables.empty(); + + rso.onSubscribe(d); + + assertTrue(d.isDisposed()); + + assertEquals(1, rso.start); + + TestHelper.assertError(error, 0, IllegalStateException.class, "Disposable already set!"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void dispose() { + TestResourceSingleObserver rso = new TestResourceSingleObserver(); + rso.dispose(); + + Disposable d = Disposables.empty(); + + rso.onSubscribe(d); + + assertTrue(d.isDisposed()); + + assertEquals(0, rso.start); + } +} diff --git a/src/test/java/io/reactivex/subscribers/ResourceSubscriberTest.java b/src/test/java/io/reactivex/subscribers/ResourceSubscriberTest.java index 5bf4923700..5b7de925f5 100644 --- a/src/test/java/io/reactivex/subscribers/ResourceSubscriberTest.java +++ b/src/test/java/io/reactivex/subscribers/ResourceSubscriberTest.java @@ -62,10 +62,6 @@ public void onComplete() { dispose(); } - void cancelIt() { - cancel(); - } - void requestMore(long n) { request(n); } @@ -89,7 +85,7 @@ public void addResources() { assertFalse(d.isDisposed()); - ro.cancelIt(); + ro.dispose(); assertTrue(ro.isDisposed());