Skip to content

Commit

Permalink
More nullability annotations (#5251)
Browse files Browse the repository at this point in the history
* More nullability annotations

* Refactored imports

* Changes based on akarnokd's review

* A few more annotations

* Changes based on akarnokd's 2nd review
  • Loading branch information
mibac138 authored and akarnokd committed Apr 1, 2017
1 parent 5ec4f76 commit fa58d36
Show file tree
Hide file tree
Showing 35 changed files with 187 additions and 87 deletions.
10 changes: 8 additions & 2 deletions src/main/java/io/reactivex/Notification.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex;

import io.reactivex.annotations.*;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.util.NotificationLite;

Expand Down Expand Up @@ -66,6 +67,7 @@ public boolean isOnNext() {
* @see #isOnNext()
*/
@SuppressWarnings("unchecked")
@Nullable
public T getValue() {
Object o = value;
if (o != null && !NotificationLite.isError(o)) {
Expand All @@ -80,6 +82,7 @@ public T getValue() {
* @return the Throwable error contained or null
* @see #isOnError()
*/
@Nullable
public Throwable getError() {
Object o = value;
if (NotificationLite.isError(o)) {
Expand Down Expand Up @@ -122,7 +125,8 @@ public String toString() {
* @return the new Notification instance
* @throws NullPointerException if value is null
*/
public static <T> Notification<T> createOnNext(T value) {
@NonNull
public static <T> Notification<T> createOnNext(@NonNull T value) {
ObjectHelper.requireNonNull(value, "value is null");
return new Notification<T>(value);
}
Expand All @@ -134,7 +138,8 @@ public static <T> Notification<T> createOnNext(T value) {
* @return the new Notification instance
* @throws NullPointerException if error is null
*/
public static <T> Notification<T> createOnError(Throwable error) {
@NonNull
public static <T> Notification<T> createOnError(@NonNull Throwable error) {
ObjectHelper.requireNonNull(error, "error is null");
return new Notification<T>(NotificationLite.error(error));
}
Expand All @@ -146,6 +151,7 @@ public static <T> Notification<T> createOnError(Throwable error) {
* @return the shared Notification instance representing an onComplete signal
*/
@SuppressWarnings("unchecked")
@NonNull
public static <T> Notification<T> createOnComplete() {
return (Notification<T>)COMPLETE;
}
Expand Down
7 changes: 4 additions & 3 deletions src/main/java/io/reactivex/Observer.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex;

import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;

/**
Expand Down Expand Up @@ -40,7 +41,7 @@ public interface Observer<T> {
* be called anytime to cancel the connection
* @since 2.0
*/
void onSubscribe(Disposable d);
void onSubscribe(@NonNull Disposable d);

/**
* Provides the Observer with a new item to observe.
Expand All @@ -53,7 +54,7 @@ public interface Observer<T> {
* @param t
* the item emitted by the Observable
*/
void onNext(T t);
void onNext(@NonNull T t);

/**
* Notifies the Observer that the {@link Observable} has experienced an error condition.
Expand All @@ -64,7 +65,7 @@ public interface Observer<T> {
* @param e
* the exception encountered by the Observable
*/
void onError(Throwable e);
void onError(@NonNull Throwable e);

/**
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import java.util.NoSuchElementException;
import java.util.concurrent.*;

import org.reactivestreams.Publisher;

import io.reactivex.annotations.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
Expand All @@ -34,6 +32,7 @@
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import org.reactivestreams.Publisher;

/**
* The Single class implements the Reactive Pattern for a single value response.
Expand Down Expand Up @@ -2714,7 +2713,7 @@ public final void subscribe(SingleObserver<? super T> subscriber) {
* Override this method in subclasses to handle the incoming SingleObservers.
* @param observer the SingleObserver to handle, not null
*/
protected abstract void subscribeActual(SingleObserver<? super T> observer);
protected abstract void subscribeActual(@NonNull SingleObserver<? super T> observer);

/**
* Subscribes a given SingleObserver (subclass) to this Single and returns the given
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/reactivex/disposables/ActionDisposable.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package io.reactivex.disposables;

import io.reactivex.annotations.NonNull;
import io.reactivex.functions.Action;
import io.reactivex.internal.util.ExceptionHelper;

Expand All @@ -24,7 +25,7 @@ final class ActionDisposable extends ReferenceDisposable<Action> {
}

@Override
protected void onDisposed(Action value) {
protected void onDisposed(@NonNull Action value) {
try {
value.run();
} catch (Throwable ex) {
Expand Down
13 changes: 7 additions & 6 deletions src/main/java/io/reactivex/disposables/CompositeDisposable.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.util.*;

import io.reactivex.annotations.NonNull;
import io.reactivex.exceptions.*;
import io.reactivex.internal.disposables.DisposableContainer;
import io.reactivex.internal.functions.ObjectHelper;
Expand All @@ -39,7 +40,7 @@ public CompositeDisposable() {
* Creates a CompositeDisposables with the given array of initial elements.
* @param resources the array of Disposables to start with
*/
public CompositeDisposable(Disposable... resources) {
public CompositeDisposable(@NonNull Disposable... resources) {
ObjectHelper.requireNonNull(resources, "resources is null");
this.resources = new OpenHashSet<Disposable>(resources.length + 1);
for (Disposable d : resources) {
Expand All @@ -52,7 +53,7 @@ public CompositeDisposable(Disposable... resources) {
* Creates a CompositeDisposables with the given Iterable sequence of initial elements.
* @param resources the Iterable sequence of Disposables to start with
*/
public CompositeDisposable(Iterable<? extends Disposable> resources) {
public CompositeDisposable(@NonNull Iterable<? extends Disposable> resources) {
ObjectHelper.requireNonNull(resources, "resources is null");
this.resources = new OpenHashSet<Disposable>();
for (Disposable d : resources) {
Expand Down Expand Up @@ -85,7 +86,7 @@ public boolean isDisposed() {
}

@Override
public boolean add(Disposable d) {
public boolean add(@NonNull Disposable d) {
ObjectHelper.requireNonNull(d, "d is null");
if (!disposed) {
synchronized (this) {
Expand All @@ -110,7 +111,7 @@ public boolean add(Disposable d) {
* @param ds the array of Disposables
* @return true if the operation was successful, false if the container has been disposed
*/
public boolean addAll(Disposable... ds) {
public boolean addAll(@NonNull Disposable... ds) {
ObjectHelper.requireNonNull(ds, "ds is null");
if (!disposed) {
synchronized (this) {
Expand All @@ -135,7 +136,7 @@ public boolean addAll(Disposable... ds) {
}

@Override
public boolean remove(Disposable d) {
public boolean remove(@NonNull Disposable d) {
if (delete(d)) {
d.dispose();
return true;
Expand All @@ -144,7 +145,7 @@ public boolean remove(Disposable d) {
}

@Override
public boolean delete(Disposable d) {
public boolean delete(@NonNull Disposable d) {
ObjectHelper.requireNonNull(d, "Disposable item is null");
if (disposed) {
return false;
Expand Down
21 changes: 14 additions & 7 deletions src/main/java/io/reactivex/disposables/Disposables.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@

import java.util.concurrent.Future;

import org.reactivestreams.Subscription;

import io.reactivex.annotations.NonNull;
import io.reactivex.functions.Action;
import io.reactivex.internal.disposables.EmptyDisposable;
import io.reactivex.internal.functions.*;
import org.reactivestreams.Subscription;

/**
* Utility class to help create disposables by wrapping
Expand All @@ -38,7 +38,8 @@ private Disposables() {
* @param run the Runnable to wrap
* @return the new Disposable instance
*/
public static Disposable fromRunnable(Runnable run) {
@NonNull
public static Disposable fromRunnable(@NonNull Runnable run) {
ObjectHelper.requireNonNull(run, "run is null");
return new RunnableDisposable(run);
}
Expand All @@ -49,7 +50,8 @@ public static Disposable fromRunnable(Runnable run) {
* @param run the Action to wrap
* @return the new Disposable instance
*/
public static Disposable fromAction(Action run) {
@NonNull
public static Disposable fromAction(@NonNull Action run) {
ObjectHelper.requireNonNull(run, "run is null");
return new ActionDisposable(run);
}
Expand All @@ -60,7 +62,8 @@ public static Disposable fromAction(Action run) {
* @param future the Future to wrap
* @return the new Disposable instance
*/
public static Disposable fromFuture(Future<?> future) {
@NonNull
public static Disposable fromFuture(@NonNull Future<?> future) {
ObjectHelper.requireNonNull(future, "future is null");
return fromFuture(future, true);
}
Expand All @@ -72,7 +75,8 @@ public static Disposable fromFuture(Future<?> future) {
* @param allowInterrupt if true, the future cancel happens via Future.cancel(true)
* @return the new Disposable instance
*/
public static Disposable fromFuture(Future<?> future, boolean allowInterrupt) {
@NonNull
public static Disposable fromFuture(@NonNull Future<?> future, boolean allowInterrupt) {
ObjectHelper.requireNonNull(future, "future is null");
return new FutureDisposable(future, allowInterrupt);
}
Expand All @@ -83,7 +87,8 @@ public static Disposable fromFuture(Future<?> future, boolean allowInterrupt) {
* @param subscription the Runnable to wrap
* @return the new Disposable instance
*/
public static Disposable fromSubscription(Subscription subscription) {
@NonNull
public static Disposable fromSubscription(@NonNull Subscription subscription) {
ObjectHelper.requireNonNull(subscription, "subscription is null");
return new SubscriptionDisposable(subscription);
}
Expand All @@ -92,6 +97,7 @@ public static Disposable fromSubscription(Subscription subscription) {
* Returns a new, non-disposed Disposable instance.
* @return a new, non-disposed Disposable instance
*/
@NonNull
public static Disposable empty() {
return fromRunnable(Functions.EMPTY_RUNNABLE);
}
Expand All @@ -100,6 +106,7 @@ public static Disposable empty() {
* Returns a disposed Disposable instance.
* @return a disposed Disposable instance
*/
@NonNull
public static Disposable disposed() {
return EmptyDisposable.INSTANCE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.annotations.NonNull;
import io.reactivex.internal.functions.ObjectHelper;

/**
Expand All @@ -31,7 +32,7 @@ abstract class ReferenceDisposable<T> extends AtomicReference<T> implements Disp
super(ObjectHelper.requireNonNull(value, "value is null"));
}

protected abstract void onDisposed(T value);
protected abstract void onDisposed(@NonNull T value);

@Override
public final void dispose() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package io.reactivex.disposables;

import io.reactivex.annotations.NonNull;

/**
* A disposable container that manages a Runnable instance.
*/
Expand All @@ -24,7 +26,7 @@ final class RunnableDisposable extends ReferenceDisposable<Runnable> {
}

@Override
protected void onDisposed(Runnable value) {
protected void onDisposed(@NonNull Runnable value) {
value.run();
}

Expand Down
10 changes: 6 additions & 4 deletions src/main/java/io/reactivex/disposables/SerialDisposable.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.internal.disposables.*;
import io.reactivex.annotations.Nullable;
import io.reactivex.internal.disposables.DisposableHelper;

/**
* A Disposable container that allows atomically updating/replacing the contained
Expand All @@ -36,7 +37,7 @@ public SerialDisposable() {
* Constructs a SerialDisposable with the given initial Disposable instance.
* @param initialDisposable the initial Disposable instance to use, null allowed
*/
public SerialDisposable(Disposable initialDisposable) {
public SerialDisposable(@Nullable Disposable initialDisposable) {
this.resource = new AtomicReference<Disposable>(initialDisposable);
}

Expand All @@ -47,7 +48,7 @@ public SerialDisposable(Disposable initialDisposable) {
* @return true if the operation succeeded, false if the container has been disposed
* @see #replace(Disposable)
*/
public boolean set(Disposable next) {
public boolean set(@Nullable Disposable next) {
return DisposableHelper.set(resource, next);
}

Expand All @@ -58,14 +59,15 @@ public boolean set(Disposable next) {
* @return true if the operation succeeded, false if the container has been disposed
* @see #set(Disposable)
*/
public boolean replace(Disposable next) {
public boolean replace(@Nullable Disposable next) {
return DisposableHelper.replace(resource, next);
}

/**
* Returns the currently contained Disposable or null if this container is empty.
* @return the current Disposable, may be null
*/
@Nullable
public Disposable get() {
Disposable d = resource.get();
if (d == DisposableHelper.DISPOSED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package io.reactivex.disposables;

import io.reactivex.annotations.NonNull;
import org.reactivestreams.Subscription;

/**
Expand All @@ -26,7 +27,7 @@ final class SubscriptionDisposable extends ReferenceDisposable<Subscription> {
}

@Override
protected void onDisposed(Subscription value) {
protected void onDisposed(@NonNull Subscription value) {
value.cancel();
}
}
Loading

0 comments on commit fa58d36

Please sign in to comment.