Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add @NonNull annotations to create methods of Subjects and Processors #5930

Merged
merged 1 commit into from
Mar 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/processors/BehaviorProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ public final class BehaviorProcessor<T> extends FlowableProcessor<T> {
* @return the constructed {@link BehaviorProcessor}
*/
@CheckReturnValue
@NonNull
public static <T> BehaviorProcessor<T> create() {
return new BehaviorProcessor<T>();
}
Expand All @@ -205,6 +206,7 @@ public static <T> BehaviorProcessor<T> create() {
* @return the constructed {@link BehaviorProcessor}
*/
@CheckReturnValue
@NonNull
public static <T> BehaviorProcessor<T> createDefault(T defaultValue) {
ObjectHelper.requireNonNull(defaultValue, "defaultValue is null");
return new BehaviorProcessor<T>(defaultValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public final class PublishProcessor<T> extends FlowableProcessor<T> {
* @return the new PublishProcessor
*/
@CheckReturnValue
@NonNull
public static <T> PublishProcessor<T> create() {
return new PublishProcessor<T>();
}
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/io/reactivex/processors/ReplayProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public final class ReplayProcessor<T> extends FlowableProcessor<T> {
* @return the created ReplayProcessor
*/
@CheckReturnValue
@NonNull
public static <T> ReplayProcessor<T> create() {
return new ReplayProcessor<T>(new UnboundedReplayBuffer<T>(16));
}
Expand All @@ -137,6 +138,7 @@ public static <T> ReplayProcessor<T> create() {
* @return the created subject
*/
@CheckReturnValue
@NonNull
public static <T> ReplayProcessor<T> create(int capacityHint) {
return new ReplayProcessor<T>(new UnboundedReplayBuffer<T>(capacityHint));
}
Expand All @@ -162,6 +164,7 @@ public static <T> ReplayProcessor<T> create(int capacityHint) {
* @return the created subject
*/
@CheckReturnValue
@NonNull
public static <T> ReplayProcessor<T> createWithSize(int maxSize) {
return new ReplayProcessor<T>(new SizeBoundReplayBuffer<T>(maxSize));
}
Expand Down Expand Up @@ -216,6 +219,7 @@ public static <T> ReplayProcessor<T> createWithSize(int maxSize) {
* @return the created subject
*/
@CheckReturnValue
@NonNull
public static <T> ReplayProcessor<T> createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler) {
return new ReplayProcessor<T>(new SizeAndTimeBoundReplayBuffer<T>(Integer.MAX_VALUE, maxAge, unit, scheduler));
}
Expand Down Expand Up @@ -255,6 +259,7 @@ public static <T> ReplayProcessor<T> createWithTime(long maxAge, TimeUnit unit,
* @return the created subject
*/
@CheckReturnValue
@NonNull
public static <T> ReplayProcessor<T> createWithTimeAndSize(long maxAge, TimeUnit unit, Scheduler scheduler, int maxSize) {
return new ReplayProcessor<T>(new SizeAndTimeBoundReplayBuffer<T>(maxSize, maxAge, unit, scheduler));
}
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/io/reactivex/processors/UnicastProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.reactivex.annotations.Experimental;
import io.reactivex.annotations.Nullable;
import io.reactivex.annotations.NonNull;
import org.reactivestreams.*;

import io.reactivex.internal.functions.ObjectHelper;
Expand Down Expand Up @@ -74,6 +75,7 @@ public final class UnicastProcessor<T> extends FlowableProcessor<T> {
* @return an UnicastSubject instance
*/
@CheckReturnValue
@NonNull
public static <T> UnicastProcessor<T> create() {
return new UnicastProcessor<T>(bufferSize());
}
Expand All @@ -85,6 +87,7 @@ public static <T> UnicastProcessor<T> create() {
* @return an UnicastProcessor instance
*/
@CheckReturnValue
@NonNull
public static <T> UnicastProcessor<T> create(int capacityHint) {
return new UnicastProcessor<T>(capacityHint);
}
Expand All @@ -98,6 +101,7 @@ public static <T> UnicastProcessor<T> create(int capacityHint) {
*/
@CheckReturnValue
@Experimental
@NonNull
public static <T> UnicastProcessor<T> create(boolean delayError) {
return new UnicastProcessor<T>(bufferSize(), null, delayError);
}
Expand All @@ -115,6 +119,7 @@ public static <T> UnicastProcessor<T> create(boolean delayError) {
* @return an UnicastProcessor instance
*/
@CheckReturnValue
@NonNull
public static <T> UnicastProcessor<T> create(int capacityHint, Runnable onCancelled) {
ObjectHelper.requireNonNull(onCancelled, "onTerminate");
return new UnicastProcessor<T>(capacityHint, onCancelled);
Expand All @@ -136,6 +141,7 @@ public static <T> UnicastProcessor<T> create(int capacityHint, Runnable onCancel
*/
@CheckReturnValue
@Experimental
@NonNull
public static <T> UnicastProcessor<T> create(int capacityHint, Runnable onCancelled, boolean delayError) {
ObjectHelper.requireNonNull(onCancelled, "onTerminate");
return new UnicastProcessor<T>(capacityHint, onCancelled, delayError);
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/subjects/AsyncSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.reactivex.subjects;

import io.reactivex.annotations.Nullable;
import io.reactivex.annotations.NonNull;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -129,6 +130,7 @@ public final class AsyncSubject<T> extends Subject<T> {
* @return the new AsyncProcessor instance
*/
@CheckReturnValue
@NonNull
public static <T> AsyncSubject<T> create() {
return new AsyncSubject<T>();
}
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/reactivex/subjects/BehaviorSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.reactivex.annotations.CheckReturnValue;
import io.reactivex.annotations.Nullable;
import io.reactivex.annotations.NonNull;
import java.lang.reflect.Array;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.*;
Expand Down Expand Up @@ -180,6 +181,7 @@ public final class BehaviorSubject<T> extends Subject<T> {
* @return the constructed {@link BehaviorSubject}
*/
@CheckReturnValue
@NonNull
public static <T> BehaviorSubject<T> create() {
return new BehaviorSubject<T>();
}
Expand All @@ -196,6 +198,7 @@ public static <T> BehaviorSubject<T> create() {
* @return the constructed {@link BehaviorSubject}
*/
@CheckReturnValue
@NonNull
public static <T> BehaviorSubject<T> createDefault(T defaultValue) {
return new BehaviorSubject<T>(defaultValue);
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/subjects/CompletableSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.reactivex.*;
import io.reactivex.annotations.CheckReturnValue;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.plugins.RxJavaPlugins;
Expand Down Expand Up @@ -101,6 +102,7 @@ public final class CompletableSubject extends Completable implements Completable
* @return the new CompletableSubject instance
*/
@CheckReturnValue
@NonNull
public static CompletableSubject create() {
return new CompletableSubject();
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/reactivex/subjects/MaybeSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public final class MaybeSubject<T> extends Maybe<T> implements MaybeObserver<T>
* @return the new MaybeSubject instance
*/
@CheckReturnValue
@NonNull
public static <T> MaybeSubject<T> create() {
return new MaybeSubject<T>();
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/subjects/PublishSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.reactivex.annotations.CheckReturnValue;
import io.reactivex.annotations.Nullable;
import io.reactivex.annotations.NonNull;
import java.util.concurrent.atomic.*;

import io.reactivex.Observer;
Expand Down Expand Up @@ -114,6 +115,7 @@ public final class PublishSubject<T> extends Subject<T> {
* @return the new PublishSubject
*/
@CheckReturnValue
@NonNull
public static <T> PublishSubject<T> create() {
return new PublishSubject<T>();
}
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/io/reactivex/subjects/ReplaySubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public final class ReplaySubject<T> extends Subject<T> {
* @return the created subject
*/
@CheckReturnValue
@NonNull
public static <T> ReplaySubject<T> create() {
return new ReplaySubject<T>(new UnboundedReplayBuffer<T>(16));
}
Expand All @@ -178,6 +179,7 @@ public static <T> ReplaySubject<T> create() {
* @return the created subject
*/
@CheckReturnValue
@NonNull
public static <T> ReplaySubject<T> create(int capacityHint) {
return new ReplaySubject<T>(new UnboundedReplayBuffer<T>(capacityHint));
}
Expand All @@ -203,6 +205,7 @@ public static <T> ReplaySubject<T> create(int capacityHint) {
* @return the created subject
*/
@CheckReturnValue
@NonNull
public static <T> ReplaySubject<T> createWithSize(int maxSize) {
return new ReplaySubject<T>(new SizeBoundReplayBuffer<T>(maxSize));
}
Expand Down Expand Up @@ -257,6 +260,7 @@ public static <T> ReplaySubject<T> createWithSize(int maxSize) {
* @return the created subject
*/
@CheckReturnValue
@NonNull
public static <T> ReplaySubject<T> createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler) {
return new ReplaySubject<T>(new SizeAndTimeBoundReplayBuffer<T>(Integer.MAX_VALUE, maxAge, unit, scheduler));
}
Expand Down Expand Up @@ -296,6 +300,7 @@ public static <T> ReplaySubject<T> createWithTime(long maxAge, TimeUnit unit, Sc
* @return the created subject
*/
@CheckReturnValue
@NonNull
public static <T> ReplaySubject<T> createWithTimeAndSize(long maxAge, TimeUnit unit, Scheduler scheduler, int maxSize) {
return new ReplaySubject<T>(new SizeAndTimeBoundReplayBuffer<T>(maxSize, maxAge, unit, scheduler));
}
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/io/reactivex/subjects/UnicastSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.reactivex.annotations.Experimental;
import io.reactivex.annotations.Nullable;
import io.reactivex.annotations.NonNull;
import io.reactivex.plugins.RxJavaPlugins;

import java.util.concurrent.atomic.*;
Expand Down Expand Up @@ -180,6 +181,7 @@ public final class UnicastSubject<T> extends Subject<T> {
* @return an UnicastSubject instance
*/
@CheckReturnValue
@NonNull
public static <T> UnicastSubject<T> create() {
return new UnicastSubject<T>(bufferSize(), true);
}
Expand All @@ -191,6 +193,7 @@ public static <T> UnicastSubject<T> create() {
* @return an UnicastSubject instance
*/
@CheckReturnValue
@NonNull
public static <T> UnicastSubject<T> create(int capacityHint) {
return new UnicastSubject<T>(capacityHint, true);
}
Expand All @@ -208,6 +211,7 @@ public static <T> UnicastSubject<T> create(int capacityHint) {
* @return an UnicastSubject instance
*/
@CheckReturnValue
@NonNull
public static <T> UnicastSubject<T> create(int capacityHint, Runnable onTerminate) {
return new UnicastSubject<T>(capacityHint, onTerminate, true);
}
Expand All @@ -228,6 +232,7 @@ public static <T> UnicastSubject<T> create(int capacityHint, Runnable onTerminat
*/
@CheckReturnValue
@Experimental
@NonNull
public static <T> UnicastSubject<T> create(int capacityHint, Runnable onTerminate, boolean delayError) {
return new UnicastSubject<T>(capacityHint, onTerminate, delayError);
}
Expand All @@ -245,6 +250,7 @@ public static <T> UnicastSubject<T> create(int capacityHint, Runnable onTerminat
*/
@CheckReturnValue
@Experimental
@NonNull
public static <T> UnicastSubject<T> create(boolean delayError) {
return new UnicastSubject<T>(bufferSize(), delayError);
}
Expand Down