Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Add missing Resource Observer for Maybe, Completable & Single and adjust some Javadoc #4518

Merged
merged 3 commits into from
Sep 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.observers;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.CompletableObserver;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.disposables.ListCompositeDisposable;
import io.reactivex.internal.functions.ObjectHelper;

/**
* An abstract {@link CompletableObserver} that allows asynchronous cancellation of its subscription and associated resources.
*
* <p>All pre-implemented final methods are thread-safe.
*/
public abstract class ResourceCompletableObserver implements CompletableObserver, Disposable {
/** The active subscription. */
private final AtomicReference<Disposable> s = new AtomicReference<Disposable>();

/** The resource composite, can never be null. */
private final ListCompositeDisposable resources = new ListCompositeDisposable();

/**
* Adds a resource to this ResourceObserver.
*
* @param resource the resource to add
*
* @throws NullPointerException if resource is null
*/
public final void add(Disposable resource) {
ObjectHelper.requireNonNull(resource, "resource is null");
resources.add(resource);
}

@Override
public final void onSubscribe(Disposable s) {
if (DisposableHelper.setOnce(this.s, s)) {
onStart();
}
}

/**
* Called once the upstream sets a Subscription on this ResourceObserver.
*
* <p>You can perform initialization at this moment. The default
* implementation does nothing.
*/
protected void onStart() {
}

/**
* Cancels the main disposable (if any) and disposes the resources associated with
* this ResourceObserver (if any).
*
* <p>This method can be called before the upstream calls onSubscribe at which
* case the main Disposable will be immediately disposed.
*/
@Override
public final void dispose() {
if (DisposableHelper.dispose(s)) {
resources.dispose();
}
}

/**
* Returns true if this ResourceObserver has been disposed/cancelled.
* @return true if this ResourceObserver has been disposed/cancelled
*/
@Override
public final boolean isDisposed() {
return DisposableHelper.isDisposed(s.get());
}
}
88 changes: 88 additions & 0 deletions src/main/java/io/reactivex/observers/ResourceMaybeObserver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.observers;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.MaybeObserver;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.disposables.ListCompositeDisposable;
import io.reactivex.internal.functions.ObjectHelper;

/**
* An abstract {@link MaybeObserver} that allows asynchronous cancellation of its subscription and associated resources.
*
* <p>All pre-implemented final methods are thread-safe.
*
* @param <T> the value type
*/
public abstract class ResourceMaybeObserver<T> implements MaybeObserver<T>, Disposable {
/** The active subscription. */
private final AtomicReference<Disposable> s = new AtomicReference<Disposable>();

/** The resource composite, can never be null. */
private final ListCompositeDisposable resources = new ListCompositeDisposable();

/**
* Adds a resource to this ResourceObserver.
*
* @param resource the resource to add
*
* @throws NullPointerException if resource is null
*/
public final void add(Disposable resource) {
ObjectHelper.requireNonNull(resource, "resource is null");
resources.add(resource);
}

@Override
public final void onSubscribe(Disposable s) {
if (DisposableHelper.setOnce(this.s, s)) {
onStart();
}
}

/**
* Called once the upstream sets a Subscription on this ResourceObserver.
*
* <p>You can perform initialization at this moment. The default
* implementation does nothing.
*/
protected void onStart() {
}

/**
* Cancels the main disposable (if any) and disposes the resources associated with
* this ResourceObserver (if any).
*
* <p>This method can be called before the upstream calls onSubscribe at which
* case the main Disposable will be immediately disposed.
*/
@Override
public final void dispose() {
if (DisposableHelper.dispose(s)) {
resources.dispose();
}
}

/**
* Returns true if this ResourceObserver has been disposed/cancelled.
* @return true if this ResourceObserver has been disposed/cancelled
*/
@Override
public final boolean isDisposed() {
return DisposableHelper.isDisposed(s.get());
}
}
12 changes: 4 additions & 8 deletions src/main/java/io/reactivex/observers/ResourceObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.reactivex.internal.functions.ObjectHelper;

/**
* An abstract Observer that allows asynchronous cancellation of its subscription and associated resources.
* An abstract {@link Observer} that allows asynchronous cancellation of its subscription and associated resources.
*
* <p>All pre-implemented final methods are thread-safe.
*
Expand All @@ -31,7 +31,7 @@ public abstract class ResourceObserver<T> implements Observer<T>, Disposable {
/** The active subscription. */
private final AtomicReference<Disposable> s = new AtomicReference<Disposable>();

/** The resource composite, can be null. */
/** The resource composite, can never be null. */
private final ListCompositeDisposable resources = new ListCompositeDisposable();

/**
Expand Down Expand Up @@ -69,17 +69,13 @@ protected void onStart() {
* <p>This method can be called before the upstream calls onSubscribe at which
* case the main Disposable will be immediately disposed.
*/
protected final void cancel() {
@Override
public final void dispose() {
if (DisposableHelper.dispose(s)) {
resources.dispose();
}
}

@Override
public final void dispose() {
cancel();
}

/**
* Returns true if this ResourceObserver has been disposed/cancelled.
* @return true if this ResourceObserver has been disposed/cancelled
Expand Down
88 changes: 88 additions & 0 deletions src/main/java/io/reactivex/observers/ResourceSingleObserver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.observers;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.SingleObserver;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.disposables.ListCompositeDisposable;
import io.reactivex.internal.functions.ObjectHelper;

/**
* An abstract {@link SingleObserver} that allows asynchronous cancellation of its subscription and associated resources.
*
* <p>All pre-implemented final methods are thread-safe.
*
* @param <T> the value type
*/
public abstract class ResourceSingleObserver<T> implements SingleObserver<T>, Disposable {
/** The active subscription. */
private final AtomicReference<Disposable> s = new AtomicReference<Disposable>();

/** The resource composite, can never be null. */
private final ListCompositeDisposable resources = new ListCompositeDisposable();

/**
* Adds a resource to this ResourceObserver.
*
* @param resource the resource to add
*
* @throws NullPointerException if resource is null
*/
public final void add(Disposable resource) {
ObjectHelper.requireNonNull(resource, "resource is null");
resources.add(resource);
}

@Override
public final void onSubscribe(Disposable s) {
if (DisposableHelper.setOnce(this.s, s)) {
onStart();
}
}

/**
* Called once the upstream sets a Subscription on this ResourceObserver.
*
* <p>You can perform initialization at this moment. The default
* implementation does nothing.
*/
protected void onStart() {
}

/**
* Cancels the main disposable (if any) and disposes the resources associated with
* this ResourceObserver (if any).
*
* <p>This method can be called before the upstream calls onSubscribe at which
* case the main Disposable will be immediately disposed.
*/
@Override
public final void dispose() {
if (DisposableHelper.dispose(s)) {
resources.dispose();
}
}

/**
* Returns true if this ResourceObserver has been disposed/cancelled.
* @return true if this ResourceObserver has been disposed/cancelled
*/
@Override
public final boolean isDisposed() {
return DisposableHelper.isDisposed(s.get());
}
}
24 changes: 11 additions & 13 deletions src/main/java/io/reactivex/subscribers/ResourceSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@

package io.reactivex.subscribers;

import java.util.concurrent.atomic.*;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import org.reactivestreams.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.disposables.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.ListCompositeDisposable;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscriptions.SubscriptionHelper;

/**
* An abstract Subscriber that allows asynchronous cancellation of its
* subscription.
* An abstract Subscriber that allows asynchronous cancellation of its subscription and associated resources.
*
* <p>This implementation let's you chose if the AsyncObserver manages resources or not,
* thus saving memory on cases where there is no need for that.
Expand All @@ -36,8 +38,8 @@ public abstract class ResourceSubscriber<T> implements Subscriber<T>, Disposable
/** The active subscription. */
private final AtomicReference<Subscription> s = new AtomicReference<Subscription>();

/** The resource composite, can be null. */
private final CompositeDisposable resources = new CompositeDisposable();
/** The resource composite, can never be null. */
private final ListCompositeDisposable resources = new ListCompositeDisposable();

/** Remembers the request(n) counts until a subscription arrives. */
private final AtomicLong missedRequested = new AtomicLong();
Expand Down Expand Up @@ -90,17 +92,13 @@ protected final void request(long n) {
* <p>This method can be called before the upstream calls onSubscribe at which
* case the Subscription will be immediately cancelled.
*/
protected final void cancel() {
@Override
public final void dispose() {
if (SubscriptionHelper.cancel(s)) {
resources.dispose();
}
}

@Override
public final void dispose() {
cancel();
}

/**
* Returns true if this AsyncObserver has been disposed/cancelled.
* @return true if this AsyncObserver has been disposed/cancelled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ public void onNext(Integer t) {
int total = totalReceived.incrementAndGet();
received.incrementAndGet();
if (total >= 2000) {
cancel();
dispose();
}
if (received.get() == 100) {
batches.incrementAndGet();
Expand Down Expand Up @@ -457,7 +457,7 @@ public void onNext(Integer t) {
boolean done = false;
if (total >= 2000) {
done = true;
cancel();
dispose();
}
if (received.get() == 100) {
batches.incrementAndGet();
Expand Down
Loading