Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposal: standardized Subject state-peeking methods. #2883

Merged
merged 1 commit into from
Apr 29, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/main/java/rx/subjects/AsyncSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx.subjects;

import java.lang.reflect.Array;
import java.util.*;

import rx.Observer;
Expand Down Expand Up @@ -141,6 +142,7 @@ public boolean hasObservers() {
* @return true if and only if the subject has some value but not an error
*/
@Experimental
@Override
public boolean hasValue() {
Object v = lastValue;
Object o = state.get();
Expand All @@ -151,6 +153,7 @@ public boolean hasValue() {
* @return true if the subject has received a throwable through {@code onError}.
*/
@Experimental
@Override
public boolean hasThrowable() {
Object o = state.get();
return nl.isError(o);
Expand All @@ -160,6 +163,7 @@ public boolean hasThrowable() {
* @return true if the subject completed normally via {@code onCompleted()}
*/
@Experimental
@Override
public boolean hasCompleted() {
Object o = state.get();
return o != null && !nl.isError(o);
Expand All @@ -174,6 +178,7 @@ public boolean hasCompleted() {
* has terminated with an exception or has an actual {@code null} as a value.
*/
@Experimental
@Override
public T getValue() {
Object v = lastValue;
Object o = state.get();
Expand All @@ -188,11 +193,33 @@ public T getValue() {
* subject hasn't terminated yet or it terminated normally.
*/
@Experimental
@Override
public Throwable getThrowable() {
Object o = state.get();
if (nl.isError(o)) {
return nl.getError(o);
}
return null;
}
@Override
@Experimental
@SuppressWarnings("unchecked")
public T[] getValues(T[] a) {
Object v = lastValue;
Object o = state.get();
if (!nl.isError(o) && nl.isNext(v)) {
T val = nl.getValue(v);
if (a.length == 0) {
a = (T[])Array.newInstance(a.getClass().getComponentType(), 1);
}
a[0] = val;
if (a.length > 1) {
a[1] = null;
}
} else
if (a.length > 0) {
a[0] = null;
}
return a;
}
}
25 changes: 25 additions & 0 deletions src/main/java/rx/subjects/BehaviorSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package rx.subjects;


import java.lang.reflect.Array;
import java.util.*;

import rx.Observer;
Expand Down Expand Up @@ -177,6 +178,7 @@ public boolean hasObservers() {
* @return true if and only if the subject has some value and hasn't terminated yet.
*/
@Experimental
@Override
public boolean hasValue() {
Object o = state.get();
return nl.isNext(o);
Expand All @@ -186,6 +188,7 @@ public boolean hasValue() {
* @return true if the subject has received a throwable through {@code onError}.
*/
@Experimental
@Override
public boolean hasThrowable() {
Object o = state.get();
return nl.isError(o);
Expand All @@ -195,6 +198,7 @@ public boolean hasThrowable() {
* @return true if the subject completed normally via {@code onCompleted()}
*/
@Experimental
@Override
public boolean hasCompleted() {
Object o = state.get();
return nl.isCompleted(o);
Expand All @@ -209,6 +213,7 @@ public boolean hasCompleted() {
* has terminated or has an actual {@code null} as a valid value.
*/
@Experimental
@Override
public T getValue() {
Object o = state.get();
if (nl.isNext(o)) {
Expand All @@ -222,11 +227,31 @@ public T getValue() {
* subject hasn't terminated yet or it terminated normally.
*/
@Experimental
@Override
public Throwable getThrowable() {
Object o = state.get();
if (nl.isError(o)) {
return nl.getError(o);
}
return null;
}
@Override
@Experimental
@SuppressWarnings("unchecked")
public T[] getValues(T[] a) {
Object o = state.get();
if (nl.isNext(o)) {
if (a.length == 0) {
a = (T[])Array.newInstance(a.getClass().getComponentType(), 1);
}
a[0] = nl.getValue(o);
if (a.length > 1) {
a[1] = null;
}
} else
if (a.length > 0) {
a[0] = null;
}
return a;
}
}
27 changes: 27 additions & 0 deletions src/main/java/rx/subjects/PublishSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public boolean hasObservers() {
* @return true if the subject has received a throwable through {@code onError}.
*/
@Experimental
@Override
public boolean hasThrowable() {
Object o = state.get();
return nl.isError(o);
Expand All @@ -134,6 +135,7 @@ public boolean hasThrowable() {
* @return true if the subject completed normally via {@code onCompleted}
*/
@Experimental
@Override
public boolean hasCompleted() {
Object o = state.get();
return o != null && !nl.isError(o);
Expand All @@ -144,11 +146,36 @@ public boolean hasCompleted() {
* subject hasn't terminated yet or it terminated normally.
*/
@Experimental
@Override
public Throwable getThrowable() {
Object o = state.get();
if (nl.isError(o)) {
return nl.getError(o);
}
return null;
}

@Override
@Experimental
public boolean hasValue() {
return false;
}
@Override
@Experimental
public T getValue() {
return null;
}
@Override
@Experimental
public Object[] getValues() {
return new Object[0];
}
@Override
@Experimental
public T[] getValues(T[] a) {
if (a.length > 0) {
a[0] = null;
}
return a;
}
}
66 changes: 57 additions & 9 deletions src/main/java/rx/subjects/ReplaySubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -550,12 +550,30 @@ public T[] toArray(T[] a) {
for (int i = 0; i < s; i++) {
a[i] = (T)list.get(i);
}
if (s < a.length - 1) {
if (a.length > s) {
a[s] = null;
}
} else
if (a.length > 0) {
a[0] = null;
}
return a;
}
@Override
public T latest() {
int idx = index;
if (idx > 0) {
Object o = list.get(idx - 1);
if (nl.isCompleted(o) || nl.isError(o)) {
if (idx > 1) {
return nl.getValue(list.get(idx - 2));
}
return null;
}
return nl.getValue(o);
}
return null;
}
}


Expand Down Expand Up @@ -715,6 +733,27 @@ public T[] toArray(T[] a) {
}
return list.toArray(a);
}
@Override
public T latest() {
Node<Object> h = head().next;
if (h == null) {
return null;
}
Node<Object> p = null;
while (h != tail()) {
p = h;
h = h.next;
}
Object value = leaveTransform.call(h.value);
if (nl.isError(value) || nl.isCompleted(value)) {
if (p != null) {
value = leaveTransform.call(p.value);
return nl.getValue(value);
}
return null;
}
return nl.getValue(value);
}
}

// **************
Expand Down Expand Up @@ -781,6 +820,12 @@ I replayObserverFromIndexTest(
* @return the array or a new array containing the current values
*/
T[] toArray(T[] a);
/**
* Returns the latest value that has been buffered or null if no such value
* present.
* @return the latest value buffered or null if none
*/
T latest();
}

/** Interface to manage eviction checking. */
Expand Down Expand Up @@ -1054,6 +1099,7 @@ public void evictFinal(NodeList<Object> list) {
* @return true if the subject has received a throwable through {@code onError}.
*/
@Experimental
@Override
public boolean hasThrowable() {
NotificationLite<T> nl = ssm.nl;
Object o = ssm.get();
Expand All @@ -1064,6 +1110,7 @@ public boolean hasThrowable() {
* @return true if the subject completed normally via {@code onCompleted}
*/
@Experimental
@Override
public boolean hasCompleted() {
NotificationLite<T> nl = ssm.nl;
Object o = ssm.get();
Expand All @@ -1075,6 +1122,7 @@ public boolean hasCompleted() {
* subject hasn't terminated yet or it terminated normally.
*/
@Experimental
@Override
public Throwable getThrowable() {
NotificationLite<T> nl = ssm.nl;
Object o = ssm.get();
Expand All @@ -1098,15 +1146,10 @@ public int size() {
public boolean hasAnyValue() {
return !state.isEmpty();
}
/** An empty array to trigger getValues() to return a new array. */
private static final Object[] EMPTY_ARRAY = new Object[0];
/**
* @return returns a snapshot of the currently buffered non-terminal events.
*/
@SuppressWarnings("unchecked")
@Experimental
public Object[] getValues() {
return state.toArray((T[])EMPTY_ARRAY);
@Override
public boolean hasValue() {
return hasAnyValue();
}
/**
* Returns a snapshot of the currently buffered non-terminal events into
Expand All @@ -1115,7 +1158,12 @@ public Object[] getValues() {
* @return the array {@code a} if it had enough capacity or a new array containing the available values
*/
@Experimental
@Override
public T[] getValues(T[] a) {
return state.toArray(a);
}
@Override
public T getValue() {
return state.latest();
}
}
36 changes: 36 additions & 0 deletions src/main/java/rx/subjects/SerializedSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package rx.subjects;

import rx.Subscriber;
import rx.annotations.Experimental;
import rx.observers.SerializedObserver;

/**
Expand Down Expand Up @@ -68,4 +69,39 @@ public void onNext(T t) {
public boolean hasObservers() {
return actual.hasObservers();
}
@Override
@Experimental
public boolean hasCompleted() {
return actual.hasCompleted();
}
@Override
@Experimental
public boolean hasThrowable() {
return actual.hasThrowable();
}
@Override
@Experimental
public boolean hasValue() {
return actual.hasValue();
}
@Override
@Experimental
public Throwable getThrowable() {
return actual.getThrowable();
}
@Override
@Experimental
public T getValue() {
return actual.getValue();
}
@Override
@Experimental
public Object[] getValues() {
return actual.getValues();
}
@Override
@Experimental
public T[] getValues(T[] a) {
return actual.getValues(a);
}
}
Loading