diff --git a/src/main/java/io/reactivex/processors/BehaviorProcessor.java b/src/main/java/io/reactivex/processors/BehaviorProcessor.java index 91aa77b942..55fb95fe8b 100644 --- a/src/main/java/io/reactivex/processors/BehaviorProcessor.java +++ b/src/main/java/io/reactivex/processors/BehaviorProcessor.java @@ -189,6 +189,7 @@ public final class BehaviorProcessor extends FlowableProcessor { * @return the constructed {@link BehaviorProcessor} */ @CheckReturnValue + @NonNull public static BehaviorProcessor create() { return new BehaviorProcessor(); } @@ -205,6 +206,7 @@ public static BehaviorProcessor create() { * @return the constructed {@link BehaviorProcessor} */ @CheckReturnValue + @NonNull public static BehaviorProcessor createDefault(T defaultValue) { ObjectHelper.requireNonNull(defaultValue, "defaultValue is null"); return new BehaviorProcessor(defaultValue); diff --git a/src/main/java/io/reactivex/processors/PublishProcessor.java b/src/main/java/io/reactivex/processors/PublishProcessor.java index d299bffea2..217d9c3a1e 100644 --- a/src/main/java/io/reactivex/processors/PublishProcessor.java +++ b/src/main/java/io/reactivex/processors/PublishProcessor.java @@ -76,6 +76,7 @@ public final class PublishProcessor extends FlowableProcessor { * @return the new PublishProcessor */ @CheckReturnValue + @NonNull public static PublishProcessor create() { return new PublishProcessor(); } diff --git a/src/main/java/io/reactivex/processors/ReplayProcessor.java b/src/main/java/io/reactivex/processors/ReplayProcessor.java index 839170fa27..dd47b9be24 100644 --- a/src/main/java/io/reactivex/processors/ReplayProcessor.java +++ b/src/main/java/io/reactivex/processors/ReplayProcessor.java @@ -117,6 +117,7 @@ public final class ReplayProcessor extends FlowableProcessor { * @return the created ReplayProcessor */ @CheckReturnValue + @NonNull public static ReplayProcessor create() { return new ReplayProcessor(new UnboundedReplayBuffer(16)); } @@ -137,6 +138,7 @@ public static ReplayProcessor create() { * @return the created subject */ @CheckReturnValue + @NonNull public static ReplayProcessor create(int capacityHint) { return new ReplayProcessor(new UnboundedReplayBuffer(capacityHint)); } @@ -162,6 +164,7 @@ public static ReplayProcessor create(int capacityHint) { * @return the created subject */ @CheckReturnValue + @NonNull public static ReplayProcessor createWithSize(int maxSize) { return new ReplayProcessor(new SizeBoundReplayBuffer(maxSize)); } @@ -216,6 +219,7 @@ public static ReplayProcessor createWithSize(int maxSize) { * @return the created subject */ @CheckReturnValue + @NonNull public static ReplayProcessor createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler) { return new ReplayProcessor(new SizeAndTimeBoundReplayBuffer(Integer.MAX_VALUE, maxAge, unit, scheduler)); } @@ -255,6 +259,7 @@ public static ReplayProcessor createWithTime(long maxAge, TimeUnit unit, * @return the created subject */ @CheckReturnValue + @NonNull public static ReplayProcessor createWithTimeAndSize(long maxAge, TimeUnit unit, Scheduler scheduler, int maxSize) { return new ReplayProcessor(new SizeAndTimeBoundReplayBuffer(maxSize, maxAge, unit, scheduler)); } diff --git a/src/main/java/io/reactivex/processors/UnicastProcessor.java b/src/main/java/io/reactivex/processors/UnicastProcessor.java index 847060feb8..e712f1f5c8 100644 --- a/src/main/java/io/reactivex/processors/UnicastProcessor.java +++ b/src/main/java/io/reactivex/processors/UnicastProcessor.java @@ -18,6 +18,7 @@ import io.reactivex.annotations.Experimental; import io.reactivex.annotations.Nullable; +import io.reactivex.annotations.NonNull; import org.reactivestreams.*; import io.reactivex.internal.functions.ObjectHelper; @@ -74,6 +75,7 @@ public final class UnicastProcessor extends FlowableProcessor { * @return an UnicastSubject instance */ @CheckReturnValue + @NonNull public static UnicastProcessor create() { return new UnicastProcessor(bufferSize()); } @@ -85,6 +87,7 @@ public static UnicastProcessor create() { * @return an UnicastProcessor instance */ @CheckReturnValue + @NonNull public static UnicastProcessor create(int capacityHint) { return new UnicastProcessor(capacityHint); } @@ -98,6 +101,7 @@ public static UnicastProcessor create(int capacityHint) { */ @CheckReturnValue @Experimental + @NonNull public static UnicastProcessor create(boolean delayError) { return new UnicastProcessor(bufferSize(), null, delayError); } @@ -115,6 +119,7 @@ public static UnicastProcessor create(boolean delayError) { * @return an UnicastProcessor instance */ @CheckReturnValue + @NonNull public static UnicastProcessor create(int capacityHint, Runnable onCancelled) { ObjectHelper.requireNonNull(onCancelled, "onTerminate"); return new UnicastProcessor(capacityHint, onCancelled); @@ -136,6 +141,7 @@ public static UnicastProcessor create(int capacityHint, Runnable onCancel */ @CheckReturnValue @Experimental + @NonNull public static UnicastProcessor create(int capacityHint, Runnable onCancelled, boolean delayError) { ObjectHelper.requireNonNull(onCancelled, "onTerminate"); return new UnicastProcessor(capacityHint, onCancelled, delayError); diff --git a/src/main/java/io/reactivex/subjects/AsyncSubject.java b/src/main/java/io/reactivex/subjects/AsyncSubject.java index 519811a803..af5b6e5fdd 100644 --- a/src/main/java/io/reactivex/subjects/AsyncSubject.java +++ b/src/main/java/io/reactivex/subjects/AsyncSubject.java @@ -14,6 +14,7 @@ package io.reactivex.subjects; import io.reactivex.annotations.Nullable; +import io.reactivex.annotations.NonNull; import java.util.Arrays; import java.util.concurrent.atomic.AtomicReference; @@ -129,6 +130,7 @@ public final class AsyncSubject extends Subject { * @return the new AsyncProcessor instance */ @CheckReturnValue + @NonNull public static AsyncSubject create() { return new AsyncSubject(); } diff --git a/src/main/java/io/reactivex/subjects/BehaviorSubject.java b/src/main/java/io/reactivex/subjects/BehaviorSubject.java index 3c006beb53..7199ba2d0c 100644 --- a/src/main/java/io/reactivex/subjects/BehaviorSubject.java +++ b/src/main/java/io/reactivex/subjects/BehaviorSubject.java @@ -15,6 +15,7 @@ import io.reactivex.annotations.CheckReturnValue; import io.reactivex.annotations.Nullable; +import io.reactivex.annotations.NonNull; import java.lang.reflect.Array; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.*; @@ -180,6 +181,7 @@ public final class BehaviorSubject extends Subject { * @return the constructed {@link BehaviorSubject} */ @CheckReturnValue + @NonNull public static BehaviorSubject create() { return new BehaviorSubject(); } @@ -196,6 +198,7 @@ public static BehaviorSubject create() { * @return the constructed {@link BehaviorSubject} */ @CheckReturnValue + @NonNull public static BehaviorSubject createDefault(T defaultValue) { return new BehaviorSubject(defaultValue); } diff --git a/src/main/java/io/reactivex/subjects/CompletableSubject.java b/src/main/java/io/reactivex/subjects/CompletableSubject.java index 93c626baa0..2e1b72ec9c 100644 --- a/src/main/java/io/reactivex/subjects/CompletableSubject.java +++ b/src/main/java/io/reactivex/subjects/CompletableSubject.java @@ -18,6 +18,7 @@ import io.reactivex.*; import io.reactivex.annotations.CheckReturnValue; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.plugins.RxJavaPlugins; @@ -101,6 +102,7 @@ public final class CompletableSubject extends Completable implements Completable * @return the new CompletableSubject instance */ @CheckReturnValue + @NonNull public static CompletableSubject create() { return new CompletableSubject(); } diff --git a/src/main/java/io/reactivex/subjects/MaybeSubject.java b/src/main/java/io/reactivex/subjects/MaybeSubject.java index 1fec546d7c..61b80445af 100644 --- a/src/main/java/io/reactivex/subjects/MaybeSubject.java +++ b/src/main/java/io/reactivex/subjects/MaybeSubject.java @@ -129,6 +129,7 @@ public final class MaybeSubject extends Maybe implements MaybeObserver * @return the new MaybeSubject instance */ @CheckReturnValue + @NonNull public static MaybeSubject create() { return new MaybeSubject(); } diff --git a/src/main/java/io/reactivex/subjects/PublishSubject.java b/src/main/java/io/reactivex/subjects/PublishSubject.java index 7d22a81d2d..0bcfe6452c 100644 --- a/src/main/java/io/reactivex/subjects/PublishSubject.java +++ b/src/main/java/io/reactivex/subjects/PublishSubject.java @@ -15,6 +15,7 @@ import io.reactivex.annotations.CheckReturnValue; import io.reactivex.annotations.Nullable; +import io.reactivex.annotations.NonNull; import java.util.concurrent.atomic.*; import io.reactivex.Observer; @@ -114,6 +115,7 @@ public final class PublishSubject extends Subject { * @return the new PublishSubject */ @CheckReturnValue + @NonNull public static PublishSubject create() { return new PublishSubject(); } diff --git a/src/main/java/io/reactivex/subjects/ReplaySubject.java b/src/main/java/io/reactivex/subjects/ReplaySubject.java index 271a1f7b56..2a553f51d5 100644 --- a/src/main/java/io/reactivex/subjects/ReplaySubject.java +++ b/src/main/java/io/reactivex/subjects/ReplaySubject.java @@ -158,6 +158,7 @@ public final class ReplaySubject extends Subject { * @return the created subject */ @CheckReturnValue + @NonNull public static ReplaySubject create() { return new ReplaySubject(new UnboundedReplayBuffer(16)); } @@ -178,6 +179,7 @@ public static ReplaySubject create() { * @return the created subject */ @CheckReturnValue + @NonNull public static ReplaySubject create(int capacityHint) { return new ReplaySubject(new UnboundedReplayBuffer(capacityHint)); } @@ -203,6 +205,7 @@ public static ReplaySubject create(int capacityHint) { * @return the created subject */ @CheckReturnValue + @NonNull public static ReplaySubject createWithSize(int maxSize) { return new ReplaySubject(new SizeBoundReplayBuffer(maxSize)); } @@ -257,6 +260,7 @@ public static ReplaySubject createWithSize(int maxSize) { * @return the created subject */ @CheckReturnValue + @NonNull public static ReplaySubject createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler) { return new ReplaySubject(new SizeAndTimeBoundReplayBuffer(Integer.MAX_VALUE, maxAge, unit, scheduler)); } @@ -296,6 +300,7 @@ public static ReplaySubject createWithTime(long maxAge, TimeUnit unit, Sc * @return the created subject */ @CheckReturnValue + @NonNull public static ReplaySubject createWithTimeAndSize(long maxAge, TimeUnit unit, Scheduler scheduler, int maxSize) { return new ReplaySubject(new SizeAndTimeBoundReplayBuffer(maxSize, maxAge, unit, scheduler)); } diff --git a/src/main/java/io/reactivex/subjects/UnicastSubject.java b/src/main/java/io/reactivex/subjects/UnicastSubject.java index 330e2de85a..61700dec55 100644 --- a/src/main/java/io/reactivex/subjects/UnicastSubject.java +++ b/src/main/java/io/reactivex/subjects/UnicastSubject.java @@ -15,6 +15,7 @@ import io.reactivex.annotations.Experimental; import io.reactivex.annotations.Nullable; +import io.reactivex.annotations.NonNull; import io.reactivex.plugins.RxJavaPlugins; import java.util.concurrent.atomic.*; @@ -180,6 +181,7 @@ public final class UnicastSubject extends Subject { * @return an UnicastSubject instance */ @CheckReturnValue + @NonNull public static UnicastSubject create() { return new UnicastSubject(bufferSize(), true); } @@ -191,6 +193,7 @@ public static UnicastSubject create() { * @return an UnicastSubject instance */ @CheckReturnValue + @NonNull public static UnicastSubject create(int capacityHint) { return new UnicastSubject(capacityHint, true); } @@ -208,6 +211,7 @@ public static UnicastSubject create(int capacityHint) { * @return an UnicastSubject instance */ @CheckReturnValue + @NonNull public static UnicastSubject create(int capacityHint, Runnable onTerminate) { return new UnicastSubject(capacityHint, onTerminate, true); } @@ -228,6 +232,7 @@ public static UnicastSubject create(int capacityHint, Runnable onTerminat */ @CheckReturnValue @Experimental + @NonNull public static UnicastSubject create(int capacityHint, Runnable onTerminate, boolean delayError) { return new UnicastSubject(capacityHint, onTerminate, delayError); } @@ -245,6 +250,7 @@ public static UnicastSubject create(int capacityHint, Runnable onTerminat */ @CheckReturnValue @Experimental + @NonNull public static UnicastSubject create(boolean delayError) { return new UnicastSubject(bufferSize(), delayError); }