diff --git a/src/main/java/io/reactivex/observers/ResourceCompletableObserver.java b/src/main/java/io/reactivex/observers/ResourceCompletableObserver.java
new file mode 100644
index 0000000000..a6007e938e
--- /dev/null
+++ b/src/main/java/io/reactivex/observers/ResourceCompletableObserver.java
@@ -0,0 +1,86 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.observers;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.reactivex.CompletableObserver;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.internal.disposables.DisposableHelper;
+import io.reactivex.internal.disposables.ListCompositeDisposable;
+import io.reactivex.internal.functions.ObjectHelper;
+
+/**
+ * An abstract {@link CompletableObserver} that allows asynchronous cancellation of its subscription and associated resources.
+ *
+ *
All pre-implemented final methods are thread-safe.
+ */
+public abstract class ResourceCompletableObserver implements CompletableObserver, Disposable {
+ /** The active subscription. */
+ private final AtomicReference s = new AtomicReference();
+
+ /** The resource composite, can never be null. */
+ private final ListCompositeDisposable resources = new ListCompositeDisposable();
+
+ /**
+ * Adds a resource to this ResourceObserver.
+ *
+ * @param resource the resource to add
+ *
+ * @throws NullPointerException if resource is null
+ */
+ public final void add(Disposable resource) {
+ ObjectHelper.requireNonNull(resource, "resource is null");
+ resources.add(resource);
+ }
+
+ @Override
+ public final void onSubscribe(Disposable s) {
+ if (DisposableHelper.setOnce(this.s, s)) {
+ onStart();
+ }
+ }
+
+ /**
+ * Called once the upstream sets a Subscription on this ResourceObserver.
+ *
+ * You can perform initialization at this moment. The default
+ * implementation does nothing.
+ */
+ protected void onStart() {
+ }
+
+ /**
+ * Cancels the main disposable (if any) and disposes the resources associated with
+ * this ResourceObserver (if any).
+ *
+ *
This method can be called before the upstream calls onSubscribe at which
+ * case the main Disposable will be immediately disposed.
+ */
+ @Override
+ public final void dispose() {
+ if (DisposableHelper.dispose(s)) {
+ resources.dispose();
+ }
+ }
+
+ /**
+ * Returns true if this ResourceObserver has been disposed/cancelled.
+ * @return true if this ResourceObserver has been disposed/cancelled
+ */
+ @Override
+ public final boolean isDisposed() {
+ return DisposableHelper.isDisposed(s.get());
+ }
+}
diff --git a/src/main/java/io/reactivex/observers/ResourceMaybeObserver.java b/src/main/java/io/reactivex/observers/ResourceMaybeObserver.java
new file mode 100644
index 0000000000..07a76ac111
--- /dev/null
+++ b/src/main/java/io/reactivex/observers/ResourceMaybeObserver.java
@@ -0,0 +1,88 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.observers;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.reactivex.MaybeObserver;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.internal.disposables.DisposableHelper;
+import io.reactivex.internal.disposables.ListCompositeDisposable;
+import io.reactivex.internal.functions.ObjectHelper;
+
+/**
+ * An abstract {@link MaybeObserver} that allows asynchronous cancellation of its subscription and associated resources.
+ *
+ *
All pre-implemented final methods are thread-safe.
+ *
+ * @param the value type
+ */
+public abstract class ResourceMaybeObserver implements MaybeObserver, Disposable {
+ /** The active subscription. */
+ private final AtomicReference s = new AtomicReference();
+
+ /** The resource composite, can never be null. */
+ private final ListCompositeDisposable resources = new ListCompositeDisposable();
+
+ /**
+ * Adds a resource to this ResourceObserver.
+ *
+ * @param resource the resource to add
+ *
+ * @throws NullPointerException if resource is null
+ */
+ public final void add(Disposable resource) {
+ ObjectHelper.requireNonNull(resource, "resource is null");
+ resources.add(resource);
+ }
+
+ @Override
+ public final void onSubscribe(Disposable s) {
+ if (DisposableHelper.setOnce(this.s, s)) {
+ onStart();
+ }
+ }
+
+ /**
+ * Called once the upstream sets a Subscription on this ResourceObserver.
+ *
+ * You can perform initialization at this moment. The default
+ * implementation does nothing.
+ */
+ protected void onStart() {
+ }
+
+ /**
+ * Cancels the main disposable (if any) and disposes the resources associated with
+ * this ResourceObserver (if any).
+ *
+ *
This method can be called before the upstream calls onSubscribe at which
+ * case the main Disposable will be immediately disposed.
+ */
+ @Override
+ public final void dispose() {
+ if (DisposableHelper.dispose(s)) {
+ resources.dispose();
+ }
+ }
+
+ /**
+ * Returns true if this ResourceObserver has been disposed/cancelled.
+ * @return true if this ResourceObserver has been disposed/cancelled
+ */
+ @Override
+ public final boolean isDisposed() {
+ return DisposableHelper.isDisposed(s.get());
+ }
+}
diff --git a/src/main/java/io/reactivex/observers/ResourceObserver.java b/src/main/java/io/reactivex/observers/ResourceObserver.java
index 8e081c56df..5baa8c8cb5 100644
--- a/src/main/java/io/reactivex/observers/ResourceObserver.java
+++ b/src/main/java/io/reactivex/observers/ResourceObserver.java
@@ -21,7 +21,7 @@
import io.reactivex.internal.functions.ObjectHelper;
/**
- * An abstract Observer that allows asynchronous cancellation of its subscription and associated resources.
+ * An abstract {@link Observer} that allows asynchronous cancellation of its subscription and associated resources.
*
*
All pre-implemented final methods are thread-safe.
*
@@ -31,7 +31,7 @@ public abstract class ResourceObserver implements Observer, Disposable {
/** The active subscription. */
private final AtomicReference s = new AtomicReference();
- /** The resource composite, can be null. */
+ /** The resource composite, can never be null. */
private final ListCompositeDisposable resources = new ListCompositeDisposable();
/**
@@ -69,17 +69,13 @@ protected void onStart() {
* This method can be called before the upstream calls onSubscribe at which
* case the main Disposable will be immediately disposed.
*/
- protected final void cancel() {
+ @Override
+ public final void dispose() {
if (DisposableHelper.dispose(s)) {
resources.dispose();
}
}
- @Override
- public final void dispose() {
- cancel();
- }
-
/**
* Returns true if this ResourceObserver has been disposed/cancelled.
* @return true if this ResourceObserver has been disposed/cancelled
diff --git a/src/main/java/io/reactivex/observers/ResourceSingleObserver.java b/src/main/java/io/reactivex/observers/ResourceSingleObserver.java
new file mode 100644
index 0000000000..37c213c56e
--- /dev/null
+++ b/src/main/java/io/reactivex/observers/ResourceSingleObserver.java
@@ -0,0 +1,88 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.observers;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.reactivex.SingleObserver;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.internal.disposables.DisposableHelper;
+import io.reactivex.internal.disposables.ListCompositeDisposable;
+import io.reactivex.internal.functions.ObjectHelper;
+
+/**
+ * An abstract {@link SingleObserver} that allows asynchronous cancellation of its subscription and associated resources.
+ *
+ *
All pre-implemented final methods are thread-safe.
+ *
+ * @param the value type
+ */
+public abstract class ResourceSingleObserver implements SingleObserver, Disposable {
+ /** The active subscription. */
+ private final AtomicReference s = new AtomicReference();
+
+ /** The resource composite, can never be null. */
+ private final ListCompositeDisposable resources = new ListCompositeDisposable();
+
+ /**
+ * Adds a resource to this ResourceObserver.
+ *
+ * @param resource the resource to add
+ *
+ * @throws NullPointerException if resource is null
+ */
+ public final void add(Disposable resource) {
+ ObjectHelper.requireNonNull(resource, "resource is null");
+ resources.add(resource);
+ }
+
+ @Override
+ public final void onSubscribe(Disposable s) {
+ if (DisposableHelper.setOnce(this.s, s)) {
+ onStart();
+ }
+ }
+
+ /**
+ * Called once the upstream sets a Subscription on this ResourceObserver.
+ *
+ * You can perform initialization at this moment. The default
+ * implementation does nothing.
+ */
+ protected void onStart() {
+ }
+
+ /**
+ * Cancels the main disposable (if any) and disposes the resources associated with
+ * this ResourceObserver (if any).
+ *
+ *
This method can be called before the upstream calls onSubscribe at which
+ * case the main Disposable will be immediately disposed.
+ */
+ @Override
+ public final void dispose() {
+ if (DisposableHelper.dispose(s)) {
+ resources.dispose();
+ }
+ }
+
+ /**
+ * Returns true if this ResourceObserver has been disposed/cancelled.
+ * @return true if this ResourceObserver has been disposed/cancelled
+ */
+ @Override
+ public final boolean isDisposed() {
+ return DisposableHelper.isDisposed(s.get());
+ }
+}
diff --git a/src/main/java/io/reactivex/subscribers/ResourceSubscriber.java b/src/main/java/io/reactivex/subscribers/ResourceSubscriber.java
index d996127900..06ec84439f 100644
--- a/src/main/java/io/reactivex/subscribers/ResourceSubscriber.java
+++ b/src/main/java/io/reactivex/subscribers/ResourceSubscriber.java
@@ -13,17 +13,19 @@
package io.reactivex.subscribers;
-import java.util.concurrent.atomic.*;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
-import org.reactivestreams.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
-import io.reactivex.disposables.*;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.internal.disposables.ListCompositeDisposable;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
/**
- * An abstract Subscriber that allows asynchronous cancellation of its
- * subscription.
+ * An abstract Subscriber that allows asynchronous cancellation of its subscription and associated resources.
*
*
This implementation let's you chose if the AsyncObserver manages resources or not,
* thus saving memory on cases where there is no need for that.
@@ -36,8 +38,8 @@ public abstract class ResourceSubscriber implements Subscriber, Disposable
/** The active subscription. */
private final AtomicReference s = new AtomicReference();
- /** The resource composite, can be null. */
- private final CompositeDisposable resources = new CompositeDisposable();
+ /** The resource composite, can never be null. */
+ private final ListCompositeDisposable resources = new ListCompositeDisposable();
/** Remembers the request(n) counts until a subscription arrives. */
private final AtomicLong missedRequested = new AtomicLong();
@@ -90,17 +92,13 @@ protected final void request(long n) {
* This method can be called before the upstream calls onSubscribe at which
* case the Subscription will be immediately cancelled.
*/
- protected final void cancel() {
+ @Override
+ public final void dispose() {
if (SubscriptionHelper.cancel(s)) {
resources.dispose();
}
}
- @Override
- public final void dispose() {
- cancel();
- }
-
/**
* Returns true if this AsyncObserver has been disposed/cancelled.
* @return true if this AsyncObserver has been disposed/cancelled
diff --git a/src/test/java/io/reactivex/flowable/FlowableBackpressureTests.java b/src/test/java/io/reactivex/flowable/FlowableBackpressureTests.java
index fdd17ef09b..05052be5f7 100644
--- a/src/test/java/io/reactivex/flowable/FlowableBackpressureTests.java
+++ b/src/test/java/io/reactivex/flowable/FlowableBackpressureTests.java
@@ -408,7 +408,7 @@ public void onNext(Integer t) {
int total = totalReceived.incrementAndGet();
received.incrementAndGet();
if (total >= 2000) {
- cancel();
+ dispose();
}
if (received.get() == 100) {
batches.incrementAndGet();
@@ -457,7 +457,7 @@ public void onNext(Integer t) {
boolean done = false;
if (total >= 2000) {
done = true;
- cancel();
+ dispose();
}
if (received.get() == 100) {
batches.incrementAndGet();
diff --git a/src/test/java/io/reactivex/observers/ResourceCompletableObserverTest.java b/src/test/java/io/reactivex/observers/ResourceCompletableObserverTest.java
new file mode 100644
index 0000000000..aeea523b72
--- /dev/null
+++ b/src/test/java/io/reactivex/observers/ResourceCompletableObserverTest.java
@@ -0,0 +1,202 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.observers;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.reactivex.Completable;
+import io.reactivex.TestHelper;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.disposables.Disposables;
+import io.reactivex.exceptions.TestException;
+import io.reactivex.plugins.RxJavaPlugins;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ResourceCompletableObserverTest {
+ static final class TestResourceCompletableObserver extends ResourceCompletableObserver {
+ final List errors = new ArrayList();
+
+ int complete;
+
+ int start;
+
+ @Override
+ protected void onStart() {
+ super.onStart();
+
+ start++;
+ }
+
+ @Override
+ public void onComplete() {
+ complete++;
+
+ dispose();
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ errors.add(e);
+
+ dispose();
+ }
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void nullResource() {
+ TestResourceCompletableObserver rco = new TestResourceCompletableObserver();
+ rco.add(null);
+ }
+
+ @Test
+ public void addResources() {
+ TestResourceCompletableObserver rco = new TestResourceCompletableObserver();
+
+ assertFalse(rco.isDisposed());
+
+ Disposable d = Disposables.empty();
+
+ rco.add(d);
+
+ assertFalse(d.isDisposed());
+
+ rco.dispose();
+
+ assertTrue(rco.isDisposed());
+
+ assertTrue(d.isDisposed());
+
+ rco.dispose();
+
+ assertTrue(rco.isDisposed());
+
+ assertTrue(d.isDisposed());
+ }
+
+ @Test
+ public void onCompleteCleansUp() {
+ TestResourceCompletableObserver rco = new TestResourceCompletableObserver();
+
+ assertFalse(rco.isDisposed());
+
+ Disposable d = Disposables.empty();
+
+ rco.add(d);
+
+ assertFalse(d.isDisposed());
+
+ rco.onComplete();
+
+ assertTrue(rco.isDisposed());
+
+ assertTrue(d.isDisposed());
+ }
+
+ @Test
+ public void onErrorCleansUp() {
+ TestResourceCompletableObserver rco = new TestResourceCompletableObserver();
+
+ assertFalse(rco.isDisposed());
+
+ Disposable d = Disposables.empty();
+
+ rco.add(d);
+
+ assertFalse(d.isDisposed());
+
+ rco.onError(new TestException());
+
+ assertTrue(rco.isDisposed());
+
+ assertTrue(d.isDisposed());
+ }
+
+ @Test
+ public void normal() {
+ TestResourceCompletableObserver rco = new TestResourceCompletableObserver();
+
+ assertFalse(rco.isDisposed());
+ assertEquals(0, rco.start);
+ assertTrue(rco.errors.isEmpty());
+
+ Completable.complete().subscribe(rco);
+
+ assertTrue(rco.isDisposed());
+ assertEquals(1, rco.start);
+ assertEquals(1, rco.complete);
+ assertTrue(rco.errors.isEmpty());
+ }
+
+ @Test
+ public void error() {
+ TestResourceCompletableObserver rco = new TestResourceCompletableObserver();
+
+ assertFalse(rco.isDisposed());
+ assertEquals(0, rco.start);
+ assertTrue(rco.errors.isEmpty());
+
+ final RuntimeException error = new RuntimeException("error");
+ Completable.error(error).subscribe(rco);
+
+ assertTrue(rco.isDisposed());
+ assertEquals(1, rco.start);
+ assertEquals(0, rco.complete);
+ assertEquals(1, rco.errors.size());
+ assertTrue(rco.errors.contains(error));
+ }
+
+ @Test
+ public void startOnce() {
+
+ List error = TestHelper.trackPluginErrors();
+
+ try {
+ TestResourceCompletableObserver rco = new TestResourceCompletableObserver();
+
+ rco.onSubscribe(Disposables.empty());
+
+ Disposable d = Disposables.empty();
+
+ rco.onSubscribe(d);
+
+ assertTrue(d.isDisposed());
+
+ assertEquals(1, rco.start);
+
+ TestHelper.assertError(error, 0, IllegalStateException.class, "Disposable already set!");
+ } finally {
+ RxJavaPlugins.reset();
+ }
+ }
+
+ @Test
+ public void dispose() {
+ TestResourceCompletableObserver rco = new TestResourceCompletableObserver();
+ rco.dispose();
+
+ Disposable d = Disposables.empty();
+
+ rco.onSubscribe(d);
+
+ assertTrue(d.isDisposed());
+
+ assertEquals(0, rco.start);
+ }
+}
diff --git a/src/test/java/io/reactivex/observers/ResourceMaybeObserverTest.java b/src/test/java/io/reactivex/observers/ResourceMaybeObserverTest.java
new file mode 100644
index 0000000000..a57946ec98
--- /dev/null
+++ b/src/test/java/io/reactivex/observers/ResourceMaybeObserverTest.java
@@ -0,0 +1,253 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.observers;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.reactivex.Maybe;
+import io.reactivex.TestHelper;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.disposables.Disposables;
+import io.reactivex.exceptions.TestException;
+import io.reactivex.plugins.RxJavaPlugins;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class ResourceMaybeObserverTest {
+ static final class TestResourceMaybeObserver extends ResourceMaybeObserver {
+ T value;
+
+ final List errors = new ArrayList();
+
+ int complete;
+
+ int start;
+
+ @Override
+ protected void onStart() {
+ super.onStart();
+
+ start++;
+ }
+
+ @Override
+ public void onSuccess(final T value) {
+ this.value = value;
+
+ dispose();
+ }
+
+ @Override
+ public void onComplete() {
+ complete++;
+
+ dispose();
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ errors.add(e);
+
+ dispose();
+ }
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void nullResource() {
+ TestResourceMaybeObserver rmo = new TestResourceMaybeObserver();
+ rmo.add(null);
+ }
+
+ @Test
+ public void addResources() {
+ TestResourceMaybeObserver rmo = new TestResourceMaybeObserver();
+
+ assertFalse(rmo.isDisposed());
+
+ Disposable d = Disposables.empty();
+
+ rmo.add(d);
+
+ assertFalse(d.isDisposed());
+
+ rmo.dispose();
+
+ assertTrue(rmo.isDisposed());
+
+ assertTrue(d.isDisposed());
+
+ rmo.dispose();
+
+ assertTrue(rmo.isDisposed());
+
+ assertTrue(d.isDisposed());
+ }
+
+ @Test
+ public void onCompleteCleansUp() {
+ TestResourceMaybeObserver rmo = new TestResourceMaybeObserver();
+
+ assertFalse(rmo.isDisposed());
+
+ Disposable d = Disposables.empty();
+
+ rmo.add(d);
+
+ assertFalse(d.isDisposed());
+
+ rmo.onComplete();
+
+ assertTrue(rmo.isDisposed());
+
+ assertTrue(d.isDisposed());
+ }
+
+ @Test
+ public void onSuccessCleansUp() {
+ TestResourceMaybeObserver rmo = new TestResourceMaybeObserver();
+
+ assertFalse(rmo.isDisposed());
+
+ Disposable d = Disposables.empty();
+
+ rmo.add(d);
+
+ assertFalse(d.isDisposed());
+
+ rmo.onSuccess(1);
+
+ assertTrue(rmo.isDisposed());
+
+ assertTrue(d.isDisposed());
+ }
+
+ @Test
+ public void onErrorCleansUp() {
+ TestResourceMaybeObserver rmo = new TestResourceMaybeObserver();
+
+ assertFalse(rmo.isDisposed());
+
+ Disposable d = Disposables.empty();
+
+ rmo.add(d);
+
+ assertFalse(d.isDisposed());
+
+ rmo.onError(new TestException());
+
+ assertTrue(rmo.isDisposed());
+
+ assertTrue(d.isDisposed());
+ }
+
+ @Test
+ public void normal() {
+ TestResourceMaybeObserver rmo = new TestResourceMaybeObserver();
+
+ assertFalse(rmo.isDisposed());
+ assertEquals(0, rmo.start);
+ assertNull(rmo.value);
+ assertTrue(rmo.errors.isEmpty());
+
+ Maybe.just(1).subscribe(rmo);
+
+ assertTrue(rmo.isDisposed());
+ assertEquals(1, rmo.start);
+ assertEquals(Integer.valueOf(1), rmo.value);
+ assertEquals(0, rmo.complete);
+ assertTrue(rmo.errors.isEmpty());
+ }
+
+ @Test
+ public void empty() {
+ TestResourceMaybeObserver rmo = new TestResourceMaybeObserver();
+
+ assertFalse(rmo.isDisposed());
+ assertEquals(0, rmo.start);
+ assertNull(rmo.value);
+ assertTrue(rmo.errors.isEmpty());
+
+ Maybe.empty().subscribe(rmo);
+
+ assertTrue(rmo.isDisposed());
+ assertEquals(1, rmo.start);
+ assertNull(rmo.value);
+ assertEquals(1, rmo.complete);
+ assertTrue(rmo.errors.isEmpty());
+ }
+
+ @Test
+ public void error() {
+ TestResourceMaybeObserver rmo = new TestResourceMaybeObserver();
+
+ assertFalse(rmo.isDisposed());
+ assertEquals(0, rmo.start);
+ assertNull(rmo.value);
+ assertTrue(rmo.errors.isEmpty());
+
+ final RuntimeException error = new RuntimeException("error");
+ Maybe.error(error).subscribe(rmo);
+
+ assertTrue(rmo.isDisposed());
+ assertEquals(1, rmo.start);
+ assertNull(rmo.value);
+ assertEquals(0, rmo.complete);
+ assertEquals(1, rmo.errors.size());
+ assertTrue(rmo.errors.contains(error));
+ }
+
+ @Test
+ public void startOnce() {
+
+ List error = TestHelper.trackPluginErrors();
+
+ try {
+ TestResourceMaybeObserver rmo = new TestResourceMaybeObserver();
+
+ rmo.onSubscribe(Disposables.empty());
+
+ Disposable d = Disposables.empty();
+
+ rmo.onSubscribe(d);
+
+ assertTrue(d.isDisposed());
+
+ assertEquals(1, rmo.start);
+
+ TestHelper.assertError(error, 0, IllegalStateException.class, "Disposable already set!");
+ } finally {
+ RxJavaPlugins.reset();
+ }
+ }
+
+ @Test
+ public void dispose() {
+ TestResourceMaybeObserver rmo = new TestResourceMaybeObserver();
+ rmo.dispose();
+
+ Disposable d = Disposables.empty();
+
+ rmo.onSubscribe(d);
+
+ assertTrue(d.isDisposed());
+
+ assertEquals(0, rmo.start);
+ }
+}
diff --git a/src/test/java/io/reactivex/observers/ResourceObserverTest.java b/src/test/java/io/reactivex/observers/ResourceObserverTest.java
index ff1b34e536..40cf6a0d1f 100644
--- a/src/test/java/io/reactivex/observers/ResourceObserverTest.java
+++ b/src/test/java/io/reactivex/observers/ResourceObserverTest.java
@@ -13,19 +13,22 @@
package io.reactivex.observers;
-import static org.junit.Assert.*;
-
-import java.util.*;
-
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.List;
+
import io.reactivex.Observable;
import io.reactivex.TestHelper;
-import io.reactivex.disposables.*;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.disposables.Disposables;
import io.reactivex.exceptions.TestException;
-import io.reactivex.observers.ResourceObserver;
import io.reactivex.plugins.RxJavaPlugins;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
public class ResourceObserverTest {
static final class TestResourceObserver extends ResourceObserver {
@@ -62,10 +65,6 @@ public void onComplete() {
dispose();
}
-
- void cancelIt() {
- cancel();
- }
}
@Test(expected = NullPointerException.class)
@@ -86,7 +85,7 @@ public void addResources() {
assertFalse(d.isDisposed());
- ro.cancelIt();
+ ro.dispose();
assertTrue(ro.isDisposed());
@@ -154,6 +153,25 @@ public void normal() {
assertTrue(tc.errors.isEmpty());
}
+ @Test
+ public void error() {
+ TestResourceObserver tc = new TestResourceObserver();
+
+ assertFalse(tc.isDisposed());
+ assertEquals(0, tc.start);
+ assertTrue(tc.values.isEmpty());
+ assertTrue(tc.errors.isEmpty());
+
+ final RuntimeException error = new RuntimeException("error");
+ Observable.error(error).subscribe(tc);
+
+ assertTrue(tc.isDisposed());
+ assertEquals(1, tc.start);
+ assertTrue(tc.values.isEmpty());
+ assertEquals(1, tc.errors.size());
+ assertTrue(tc.errors.contains(error));
+ }
+
@Test
public void startOnce() {
diff --git a/src/test/java/io/reactivex/observers/ResourceSingleObserverTest.java b/src/test/java/io/reactivex/observers/ResourceSingleObserverTest.java
new file mode 100644
index 0000000000..7e47901662
--- /dev/null
+++ b/src/test/java/io/reactivex/observers/ResourceSingleObserverTest.java
@@ -0,0 +1,205 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.observers;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.reactivex.Single;
+import io.reactivex.TestHelper;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.disposables.Disposables;
+import io.reactivex.exceptions.TestException;
+import io.reactivex.plugins.RxJavaPlugins;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class ResourceSingleObserverTest {
+ static final class TestResourceSingleObserver extends ResourceSingleObserver {
+ T value;
+
+ final List errors = new ArrayList();
+
+ int start;
+
+ @Override
+ protected void onStart() {
+ super.onStart();
+
+ start++;
+ }
+
+ @Override
+ public void onSuccess(final T value) {
+ this.value = value;
+
+ dispose();
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ errors.add(e);
+
+ dispose();
+ }
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void nullResource() {
+ TestResourceSingleObserver rso = new TestResourceSingleObserver();
+ rso.add(null);
+ }
+
+ @Test
+ public void addResources() {
+ TestResourceSingleObserver rso = new TestResourceSingleObserver();
+
+ assertFalse(rso.isDisposed());
+
+ Disposable d = Disposables.empty();
+
+ rso.add(d);
+
+ assertFalse(d.isDisposed());
+
+ rso.dispose();
+
+ assertTrue(rso.isDisposed());
+
+ assertTrue(d.isDisposed());
+
+ rso.dispose();
+
+ assertTrue(rso.isDisposed());
+
+ assertTrue(d.isDisposed());
+ }
+
+ @Test
+ public void onSuccessCleansUp() {
+ TestResourceSingleObserver rso = new TestResourceSingleObserver();
+
+ assertFalse(rso.isDisposed());
+
+ Disposable d = Disposables.empty();
+
+ rso.add(d);
+
+ assertFalse(d.isDisposed());
+
+ rso.onSuccess(1);
+
+ assertTrue(rso.isDisposed());
+
+ assertTrue(d.isDisposed());
+ }
+
+ @Test
+ public void onErrorCleansUp() {
+ TestResourceSingleObserver rso = new TestResourceSingleObserver();
+
+ assertFalse(rso.isDisposed());
+
+ Disposable d = Disposables.empty();
+
+ rso.add(d);
+
+ assertFalse(d.isDisposed());
+
+ rso.onError(new TestException());
+
+ assertTrue(rso.isDisposed());
+
+ assertTrue(d.isDisposed());
+ }
+
+ @Test
+ public void normal() {
+ TestResourceSingleObserver rso = new TestResourceSingleObserver();
+
+ assertFalse(rso.isDisposed());
+ assertEquals(0, rso.start);
+ assertNull(rso.value);
+ assertTrue(rso.errors.isEmpty());
+
+ Single.just(1).subscribe(rso);
+
+ assertTrue(rso.isDisposed());
+ assertEquals(1, rso.start);
+ assertEquals(Integer.valueOf(1), rso.value);
+ assertTrue(rso.errors.isEmpty());
+ }
+
+ @Test
+ public void error() {
+ TestResourceSingleObserver rso = new TestResourceSingleObserver();
+
+ assertFalse(rso.isDisposed());
+ assertEquals(0, rso.start);
+ assertNull(rso.value);
+ assertTrue(rso.errors.isEmpty());
+
+ final RuntimeException error = new RuntimeException("error");
+ Single.error(error).subscribe(rso);
+
+ assertTrue(rso.isDisposed());
+ assertEquals(1, rso.start);
+ assertNull(rso.value);
+ assertEquals(1, rso.errors.size());
+ assertTrue(rso.errors.contains(error));
+ }
+
+ @Test
+ public void startOnce() {
+
+ List error = TestHelper.trackPluginErrors();
+
+ try {
+ TestResourceSingleObserver rso = new TestResourceSingleObserver();
+
+ rso.onSubscribe(Disposables.empty());
+
+ Disposable d = Disposables.empty();
+
+ rso.onSubscribe(d);
+
+ assertTrue(d.isDisposed());
+
+ assertEquals(1, rso.start);
+
+ TestHelper.assertError(error, 0, IllegalStateException.class, "Disposable already set!");
+ } finally {
+ RxJavaPlugins.reset();
+ }
+ }
+
+ @Test
+ public void dispose() {
+ TestResourceSingleObserver rso = new TestResourceSingleObserver();
+ rso.dispose();
+
+ Disposable d = Disposables.empty();
+
+ rso.onSubscribe(d);
+
+ assertTrue(d.isDisposed());
+
+ assertEquals(0, rso.start);
+ }
+}
diff --git a/src/test/java/io/reactivex/subscribers/ResourceSubscriberTest.java b/src/test/java/io/reactivex/subscribers/ResourceSubscriberTest.java
index 5bf4923700..5b7de925f5 100644
--- a/src/test/java/io/reactivex/subscribers/ResourceSubscriberTest.java
+++ b/src/test/java/io/reactivex/subscribers/ResourceSubscriberTest.java
@@ -62,10 +62,6 @@ public void onComplete() {
dispose();
}
- void cancelIt() {
- cancel();
- }
-
void requestMore(long n) {
request(n);
}
@@ -89,7 +85,7 @@ public void addResources() {
assertFalse(d.isDisposed());
- ro.cancelIt();
+ ro.dispose();
assertTrue(ro.isDisposed());