From 5e47f78712b7a3b0f018554a4661fc6535c907c7 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 16 Apr 2015 20:42:46 +0200 Subject: [PATCH] Proposal: standardized Subject state-peeking methods. --- src/main/java/rx/subjects/AsyncSubject.java | 27 ++ .../java/rx/subjects/BehaviorSubject.java | 25 ++ src/main/java/rx/subjects/PublishSubject.java | 27 ++ src/main/java/rx/subjects/ReplaySubject.java | 66 ++- .../java/rx/subjects/SerializedSubject.java | 36 ++ src/main/java/rx/subjects/Subject.java | 89 ++++ .../rx/subjects/SerializedSubjectTest.java | 384 ++++++++++++++++++ 7 files changed, 645 insertions(+), 9 deletions(-) diff --git a/src/main/java/rx/subjects/AsyncSubject.java b/src/main/java/rx/subjects/AsyncSubject.java index 5d668f1be6..c186b1f78c 100644 --- a/src/main/java/rx/subjects/AsyncSubject.java +++ b/src/main/java/rx/subjects/AsyncSubject.java @@ -15,6 +15,7 @@ */ package rx.subjects; +import java.lang.reflect.Array; import java.util.*; import rx.Observer; @@ -141,6 +142,7 @@ public boolean hasObservers() { * @return true if and only if the subject has some value but not an error */ @Experimental + @Override public boolean hasValue() { Object v = lastValue; Object o = state.get(); @@ -151,6 +153,7 @@ public boolean hasValue() { * @return true if the subject has received a throwable through {@code onError}. */ @Experimental + @Override public boolean hasThrowable() { Object o = state.get(); return nl.isError(o); @@ -160,6 +163,7 @@ public boolean hasThrowable() { * @return true if the subject completed normally via {@code onCompleted()} */ @Experimental + @Override public boolean hasCompleted() { Object o = state.get(); return o != null && !nl.isError(o); @@ -174,6 +178,7 @@ public boolean hasCompleted() { * has terminated with an exception or has an actual {@code null} as a value. */ @Experimental + @Override public T getValue() { Object v = lastValue; Object o = state.get(); @@ -188,6 +193,7 @@ public T getValue() { * subject hasn't terminated yet or it terminated normally. */ @Experimental + @Override public Throwable getThrowable() { Object o = state.get(); if (nl.isError(o)) { @@ -195,4 +201,25 @@ public Throwable getThrowable() { } return null; } + @Override + @Experimental + @SuppressWarnings("unchecked") + public T[] getValues(T[] a) { + Object v = lastValue; + Object o = state.get(); + if (!nl.isError(o) && nl.isNext(v)) { + T val = nl.getValue(v); + if (a.length == 0) { + a = (T[])Array.newInstance(a.getClass().getComponentType(), 1); + } + a[0] = val; + if (a.length > 1) { + a[1] = null; + } + } else + if (a.length > 0) { + a[0] = null; + } + return a; + } } diff --git a/src/main/java/rx/subjects/BehaviorSubject.java b/src/main/java/rx/subjects/BehaviorSubject.java index f2a47a8d17..218eef5eba 100644 --- a/src/main/java/rx/subjects/BehaviorSubject.java +++ b/src/main/java/rx/subjects/BehaviorSubject.java @@ -16,6 +16,7 @@ package rx.subjects; +import java.lang.reflect.Array; import java.util.*; import rx.Observer; @@ -177,6 +178,7 @@ public boolean hasObservers() { * @return true if and only if the subject has some value and hasn't terminated yet. */ @Experimental + @Override public boolean hasValue() { Object o = state.get(); return nl.isNext(o); @@ -186,6 +188,7 @@ public boolean hasValue() { * @return true if the subject has received a throwable through {@code onError}. */ @Experimental + @Override public boolean hasThrowable() { Object o = state.get(); return nl.isError(o); @@ -195,6 +198,7 @@ public boolean hasThrowable() { * @return true if the subject completed normally via {@code onCompleted()} */ @Experimental + @Override public boolean hasCompleted() { Object o = state.get(); return nl.isCompleted(o); @@ -209,6 +213,7 @@ public boolean hasCompleted() { * has terminated or has an actual {@code null} as a valid value. */ @Experimental + @Override public T getValue() { Object o = state.get(); if (nl.isNext(o)) { @@ -222,6 +227,7 @@ public T getValue() { * subject hasn't terminated yet or it terminated normally. */ @Experimental + @Override public Throwable getThrowable() { Object o = state.get(); if (nl.isError(o)) { @@ -229,4 +235,23 @@ public Throwable getThrowable() { } return null; } + @Override + @Experimental + @SuppressWarnings("unchecked") + public T[] getValues(T[] a) { + Object o = state.get(); + if (nl.isNext(o)) { + if (a.length == 0) { + a = (T[])Array.newInstance(a.getClass().getComponentType(), 1); + } + a[0] = nl.getValue(o); + if (a.length > 1) { + a[1] = null; + } + } else + if (a.length > 0) { + a[0] = null; + } + return a; + } } diff --git a/src/main/java/rx/subjects/PublishSubject.java b/src/main/java/rx/subjects/PublishSubject.java index 0a3292ae50..1197048c3f 100644 --- a/src/main/java/rx/subjects/PublishSubject.java +++ b/src/main/java/rx/subjects/PublishSubject.java @@ -125,6 +125,7 @@ public boolean hasObservers() { * @return true if the subject has received a throwable through {@code onError}. */ @Experimental + @Override public boolean hasThrowable() { Object o = state.get(); return nl.isError(o); @@ -134,6 +135,7 @@ public boolean hasThrowable() { * @return true if the subject completed normally via {@code onCompleted} */ @Experimental + @Override public boolean hasCompleted() { Object o = state.get(); return o != null && !nl.isError(o); @@ -144,6 +146,7 @@ public boolean hasCompleted() { * subject hasn't terminated yet or it terminated normally. */ @Experimental + @Override public Throwable getThrowable() { Object o = state.get(); if (nl.isError(o)) { @@ -151,4 +154,28 @@ public Throwable getThrowable() { } return null; } + + @Override + @Experimental + public boolean hasValue() { + return false; + } + @Override + @Experimental + public T getValue() { + return null; + } + @Override + @Experimental + public Object[] getValues() { + return new Object[0]; + } + @Override + @Experimental + public T[] getValues(T[] a) { + if (a.length > 0) { + a[0] = null; + } + return a; + } } diff --git a/src/main/java/rx/subjects/ReplaySubject.java b/src/main/java/rx/subjects/ReplaySubject.java index 5d6292be19..418a7d7f4e 100644 --- a/src/main/java/rx/subjects/ReplaySubject.java +++ b/src/main/java/rx/subjects/ReplaySubject.java @@ -550,12 +550,30 @@ public T[] toArray(T[] a) { for (int i = 0; i < s; i++) { a[i] = (T)list.get(i); } - if (s < a.length - 1) { + if (a.length > s) { a[s] = null; } + } else + if (a.length > 0) { + a[0] = null; } return a; } + @Override + public T latest() { + int idx = index; + if (idx > 0) { + Object o = list.get(idx - 1); + if (nl.isCompleted(o) || nl.isError(o)) { + if (idx > 1) { + return nl.getValue(list.get(idx - 2)); + } + return null; + } + return nl.getValue(o); + } + return null; + } } @@ -715,6 +733,27 @@ public T[] toArray(T[] a) { } return list.toArray(a); } + @Override + public T latest() { + Node h = head().next; + if (h == null) { + return null; + } + Node p = null; + while (h != tail()) { + p = h; + h = h.next; + } + Object value = leaveTransform.call(h.value); + if (nl.isError(value) || nl.isCompleted(value)) { + if (p != null) { + value = leaveTransform.call(p.value); + return nl.getValue(value); + } + return null; + } + return nl.getValue(value); + } } // ************** @@ -781,6 +820,12 @@ I replayObserverFromIndexTest( * @return the array or a new array containing the current values */ T[] toArray(T[] a); + /** + * Returns the latest value that has been buffered or null if no such value + * present. + * @return the latest value buffered or null if none + */ + T latest(); } /** Interface to manage eviction checking. */ @@ -1054,6 +1099,7 @@ public void evictFinal(NodeList list) { * @return true if the subject has received a throwable through {@code onError}. */ @Experimental + @Override public boolean hasThrowable() { NotificationLite nl = ssm.nl; Object o = ssm.get(); @@ -1064,6 +1110,7 @@ public boolean hasThrowable() { * @return true if the subject completed normally via {@code onCompleted} */ @Experimental + @Override public boolean hasCompleted() { NotificationLite nl = ssm.nl; Object o = ssm.get(); @@ -1075,6 +1122,7 @@ public boolean hasCompleted() { * subject hasn't terminated yet or it terminated normally. */ @Experimental + @Override public Throwable getThrowable() { NotificationLite nl = ssm.nl; Object o = ssm.get(); @@ -1098,15 +1146,10 @@ public int size() { public boolean hasAnyValue() { return !state.isEmpty(); } - /** An empty array to trigger getValues() to return a new array. */ - private static final Object[] EMPTY_ARRAY = new Object[0]; - /** - * @return returns a snapshot of the currently buffered non-terminal events. - */ - @SuppressWarnings("unchecked") @Experimental - public Object[] getValues() { - return state.toArray((T[])EMPTY_ARRAY); + @Override + public boolean hasValue() { + return hasAnyValue(); } /** * Returns a snapshot of the currently buffered non-terminal events into @@ -1115,7 +1158,12 @@ public Object[] getValues() { * @return the array {@code a} if it had enough capacity or a new array containing the available values */ @Experimental + @Override public T[] getValues(T[] a) { return state.toArray(a); } + @Override + public T getValue() { + return state.latest(); + } } diff --git a/src/main/java/rx/subjects/SerializedSubject.java b/src/main/java/rx/subjects/SerializedSubject.java index 5a86a35137..baaf50b8d4 100644 --- a/src/main/java/rx/subjects/SerializedSubject.java +++ b/src/main/java/rx/subjects/SerializedSubject.java @@ -16,6 +16,7 @@ package rx.subjects; import rx.Subscriber; +import rx.annotations.Experimental; import rx.observers.SerializedObserver; /** @@ -68,4 +69,39 @@ public void onNext(T t) { public boolean hasObservers() { return actual.hasObservers(); } + @Override + @Experimental + public boolean hasCompleted() { + return actual.hasCompleted(); + } + @Override + @Experimental + public boolean hasThrowable() { + return actual.hasThrowable(); + } + @Override + @Experimental + public boolean hasValue() { + return actual.hasValue(); + } + @Override + @Experimental + public Throwable getThrowable() { + return actual.getThrowable(); + } + @Override + @Experimental + public T getValue() { + return actual.getValue(); + } + @Override + @Experimental + public Object[] getValues() { + return actual.getValues(); + } + @Override + @Experimental + public T[] getValues(T[] a) { + return actual.getValues(a); + } } diff --git a/src/main/java/rx/subjects/Subject.java b/src/main/java/rx/subjects/Subject.java index 525893a82f..5aa8ba4b85 100644 --- a/src/main/java/rx/subjects/Subject.java +++ b/src/main/java/rx/subjects/Subject.java @@ -18,6 +18,7 @@ import rx.Observable; import rx.Observer; import rx.Subscriber; +import rx.annotations.Experimental; /** * Represents an object that is both an Observable and an Observer. @@ -49,6 +50,94 @@ protected Subject(OnSubscribe onSubscribe) { * @return SerializedSubject wrapping the current Subject */ public final SerializedSubject toSerialized() { + if (getClass() == SerializedSubject.class) { + return (SerializedSubject)this; + } return new SerializedSubject(this); } + /** + * Check if the Subject has terminated with an exception. + *

The operation is threadsafe. + * @return true if the subject has received a throwable through {@code onError}. + */ + @Experimental + public boolean hasThrowable() { + throw new UnsupportedOperationException(); + } + /** + * Check if the Subject has terminated normally. + *

The operation is threadsafe. + * @return true if the subject completed normally via {@code onCompleted} + */ + @Experimental + public boolean hasCompleted() { + throw new UnsupportedOperationException(); + } + /** + * Returns the Throwable that terminated the Subject. + *

The operation is threadsafe. + * @return the Throwable that terminated the Subject or {@code null} if the + * subject hasn't terminated yet or it terminated normally. + */ + @Experimental + public Throwable getThrowable() { + throw new UnsupportedOperationException(); + } + /** + * Check if the Subject has any value. + *

Use the {@link #getValue()} method to retrieve such a value. + *

Note that unless {@link #hasCompleted()} or {@link #hasThrowable()} returns true, the value + * retrieved by {@code getValue()} may get outdated. + *

The operation is threadsafe. + * @return true if and only if the subject has some value but not an error + */ + @Experimental + public boolean hasValue() { + throw new UnsupportedOperationException(); + } + /** + * Returns the current or latest value of the Subject if there is such a value and + * the subject hasn't terminated with an exception. + *

The method can return {@code null} for various reasons. Use {@link #hasValue()}, {@link #hasThrowable()} + * and {@link #hasCompleted()} to determine if such {@code null} is a valid value, there was an + * exception or the Subject terminated without receiving any value. + *

The operation is threadsafe. + * @return the current value or {@code null} if the Subject doesn't have a value, + * has terminated with an exception or has an actual {@code null} as a value. + */ + @Experimental + public T getValue() { + throw new UnsupportedOperationException(); + } + /** An empty array to trigger getValues() to return a new array. */ + private static final Object[] EMPTY_ARRAY = new Object[0]; + /** + * Returns a snapshot of the currently buffered non-terminal events. + *

The operation is threadsafe. + * @return a snapshot of the currently buffered non-terminal events. + */ + @SuppressWarnings("unchecked") + @Experimental + public Object[] getValues() { + T[] r = getValues((T[])EMPTY_ARRAY); + if (r == EMPTY_ARRAY) { + return new Object[0]; // don't leak the default empty array. + } + return r; + } + /** + * Returns a snapshot of the currently buffered non-terminal events into + * the provided {@code a} array or creates a new array if it has not enough capacity. + *

If the subject's values fit in the specified array with room to spare + * (i.e., the array has more elements than the list), the element in + * the array immediately following the end of the subject's values is set to + * null. + *

The operation is threadsafe. + * @param a the array to fill in + * @return the array {@code a} if it had enough capacity or a new array containing the available values + */ + @Experimental + public T[] getValues(T[] a) { + throw new UnsupportedOperationException(); + } } diff --git a/src/test/java/rx/subjects/SerializedSubjectTest.java b/src/test/java/rx/subjects/SerializedSubjectTest.java index bba2ce1e5d..097fcd311e 100644 --- a/src/test/java/rx/subjects/SerializedSubjectTest.java +++ b/src/test/java/rx/subjects/SerializedSubjectTest.java @@ -15,10 +15,13 @@ */ package rx.subjects; +import static org.junit.Assert.*; + import java.util.Arrays; import org.junit.Test; +import rx.exceptions.TestException; import rx.observers.TestSubscriber; public class SerializedSubjectTest { @@ -33,4 +36,385 @@ public void testBasic() { ts.awaitTerminalEvent(); ts.assertReceivedOnNext(Arrays.asList("hello")); } + + @Test + public void testAsyncSubjectValueRelay() { + AsyncSubject async = AsyncSubject.create(); + async.onNext(1); + async.onCompleted(); + Subject serial = async.toSerialized(); + + assertFalse(serial.hasObservers()); + assertTrue(serial.hasCompleted()); + assertFalse(serial.hasThrowable()); + assertNull(serial.getThrowable()); + assertEquals((Integer)1, serial.getValue()); + assertTrue(serial.hasValue()); + assertArrayEquals(new Object[] { 1 }, serial.getValues()); + assertArrayEquals(new Integer[] { 1 }, serial.getValues(new Integer[0])); + assertArrayEquals(new Integer[] { 1 }, serial.getValues(new Integer[] { 0 })); + assertArrayEquals(new Integer[] { 1, null }, serial.getValues(new Integer[] { 0, 0 })); + } + @Test + public void testAsyncSubjectValueEmpty() { + AsyncSubject async = AsyncSubject.create(); + async.onCompleted(); + Subject serial = async.toSerialized(); + + assertFalse(serial.hasObservers()); + assertTrue(serial.hasCompleted()); + assertFalse(serial.hasThrowable()); + assertNull(serial.getThrowable()); + assertNull(serial.getValue()); + assertFalse(serial.hasValue()); + assertArrayEquals(new Object[] { }, serial.getValues()); + assertArrayEquals(new Integer[] { }, serial.getValues(new Integer[0])); + assertArrayEquals(new Integer[] { null }, serial.getValues(new Integer[] { 0 })); + assertArrayEquals(new Integer[] { null, 0 }, serial.getValues(new Integer[] { 0, 0 })); + } + @Test + public void testAsyncSubjectValueError() { + AsyncSubject async = AsyncSubject.create(); + TestException te = new TestException(); + async.onError(te); + Subject serial = async.toSerialized(); + + assertFalse(serial.hasObservers()); + assertFalse(serial.hasCompleted()); + assertTrue(serial.hasThrowable()); + assertSame(te, serial.getThrowable()); + assertNull(serial.getValue()); + assertFalse(serial.hasValue()); + assertArrayEquals(new Object[] { }, serial.getValues()); + assertArrayEquals(new Integer[] { }, serial.getValues(new Integer[0])); + assertArrayEquals(new Integer[] { null }, serial.getValues(new Integer[] { 0 })); + assertArrayEquals(new Integer[] { null, 0 }, serial.getValues(new Integer[] { 0, 0 })); + } + @Test + public void testPublishSubjectValueRelay() { + PublishSubject async = PublishSubject.create(); + async.onNext(1); + async.onCompleted(); + Subject serial = async.toSerialized(); + + assertFalse(serial.hasObservers()); + assertTrue(serial.hasCompleted()); + assertFalse(serial.hasThrowable()); + assertNull(serial.getThrowable()); + assertNull(serial.getValue()); + assertFalse(serial.hasValue()); + + assertArrayEquals(new Object[0], serial.getValues()); + assertArrayEquals(new Integer[0], serial.getValues(new Integer[0])); + assertArrayEquals(new Integer[] { null }, serial.getValues(new Integer[] { 0 })); + assertArrayEquals(new Integer[] { null, 0 }, serial.getValues(new Integer[] { 0, 0 })); + } + + @Test + public void testPublishSubjectValueEmpty() { + PublishSubject async = PublishSubject.create(); + async.onCompleted(); + Subject serial = async.toSerialized(); + + assertFalse(serial.hasObservers()); + assertTrue(serial.hasCompleted()); + assertFalse(serial.hasThrowable()); + assertNull(serial.getThrowable()); + assertNull(serial.getValue()); + assertFalse(serial.hasValue()); + assertArrayEquals(new Object[] { }, serial.getValues()); + assertArrayEquals(new Integer[] { }, serial.getValues(new Integer[0])); + assertArrayEquals(new Integer[] { null }, serial.getValues(new Integer[] { 0 })); + assertArrayEquals(new Integer[] { null, 0 }, serial.getValues(new Integer[] { 0, 0 })); + } + @Test + public void testPublishSubjectValueError() { + PublishSubject async = PublishSubject.create(); + TestException te = new TestException(); + async.onError(te); + Subject serial = async.toSerialized(); + + assertFalse(serial.hasObservers()); + assertFalse(serial.hasCompleted()); + assertTrue(serial.hasThrowable()); + assertSame(te, serial.getThrowable()); + assertNull(serial.getValue()); + assertFalse(serial.hasValue()); + assertArrayEquals(new Object[] { }, serial.getValues()); + assertArrayEquals(new Integer[] { }, serial.getValues(new Integer[0])); + assertArrayEquals(new Integer[] { null }, serial.getValues(new Integer[] { 0 })); + assertArrayEquals(new Integer[] { null, 0 }, serial.getValues(new Integer[] { 0, 0 })); + } + + @Test + public void testBehaviorSubjectValueRelay() { + BehaviorSubject async = BehaviorSubject.create(); + async.onNext(1); + async.onCompleted(); + Subject serial = async.toSerialized(); + + assertFalse(serial.hasObservers()); + assertTrue(serial.hasCompleted()); + assertFalse(serial.hasThrowable()); + assertNull(serial.getThrowable()); + assertNull(serial.getValue()); + assertFalse(serial.hasValue()); + assertArrayEquals(new Object[] { }, serial.getValues()); + assertArrayEquals(new Integer[] { }, serial.getValues(new Integer[0])); + assertArrayEquals(new Integer[] { null }, serial.getValues(new Integer[] { 0 })); + assertArrayEquals(new Integer[] { null, 0 }, serial.getValues(new Integer[] { 0, 0 })); + } + @Test + public void testBehaviorSubjectValueRelayIncomplete() { + BehaviorSubject async = BehaviorSubject.create(); + async.onNext(1); + Subject serial = async.toSerialized(); + + assertFalse(serial.hasObservers()); + assertFalse(serial.hasCompleted()); + assertFalse(serial.hasThrowable()); + assertNull(serial.getThrowable()); + assertEquals((Integer)1, serial.getValue()); + assertTrue(serial.hasValue()); + assertArrayEquals(new Object[] { 1 }, serial.getValues()); + assertArrayEquals(new Integer[] { 1 }, serial.getValues(new Integer[0])); + assertArrayEquals(new Integer[] { 1 }, serial.getValues(new Integer[] { 0 })); + assertArrayEquals(new Integer[] { 1, null }, serial.getValues(new Integer[] { 0, 0 })); + } + @Test + public void testBehaviorSubjectIncompleteEmpty() { + BehaviorSubject async = BehaviorSubject.create(); + Subject serial = async.toSerialized(); + + assertFalse(serial.hasObservers()); + assertFalse(serial.hasCompleted()); + assertFalse(serial.hasThrowable()); + assertNull(serial.getThrowable()); + assertNull(serial.getValue()); + assertFalse(serial.hasValue()); + assertArrayEquals(new Object[] { }, serial.getValues()); + assertArrayEquals(new Integer[] { }, serial.getValues(new Integer[0])); + assertArrayEquals(new Integer[] { null }, serial.getValues(new Integer[] { 0 })); + assertArrayEquals(new Integer[] { null, 0 }, serial.getValues(new Integer[] { 0, 0 })); + } + @Test + public void testBehaviorSubjectEmpty() { + BehaviorSubject async = BehaviorSubject.create(); + async.onCompleted(); + Subject serial = async.toSerialized(); + + assertFalse(serial.hasObservers()); + assertTrue(serial.hasCompleted()); + assertFalse(serial.hasThrowable()); + assertNull(serial.getThrowable()); + assertNull(serial.getValue()); + assertFalse(serial.hasValue()); + assertArrayEquals(new Object[] { }, serial.getValues()); + assertArrayEquals(new Integer[] { }, serial.getValues(new Integer[0])); + assertArrayEquals(new Integer[] { null }, serial.getValues(new Integer[] { 0 })); + assertArrayEquals(new Integer[] { null, 0 }, serial.getValues(new Integer[] { 0, 0 })); + } + @Test + public void testBehaviorSubjectError() { + BehaviorSubject async = BehaviorSubject.create(); + TestException te = new TestException(); + async.onError(te); + Subject serial = async.toSerialized(); + + assertFalse(serial.hasObservers()); + assertFalse(serial.hasCompleted()); + assertTrue(serial.hasThrowable()); + assertSame(te, serial.getThrowable()); + assertNull(serial.getValue()); + assertFalse(serial.hasValue()); + assertArrayEquals(new Object[] { }, serial.getValues()); + assertArrayEquals(new Integer[] { }, serial.getValues(new Integer[0])); + assertArrayEquals(new Integer[] { null }, serial.getValues(new Integer[] { 0 })); + assertArrayEquals(new Integer[] { null, 0 }, serial.getValues(new Integer[] { 0, 0 })); + } + + @Test + public void testReplaySubjectValueRelay() { + ReplaySubject async = ReplaySubject.create(); + async.onNext(1); + async.onCompleted(); + Subject serial = async.toSerialized(); + + assertFalse(serial.hasObservers()); + assertTrue(serial.hasCompleted()); + assertFalse(serial.hasThrowable()); + assertNull(serial.getThrowable()); + assertEquals((Integer)1, serial.getValue()); + assertTrue(serial.hasValue()); + assertArrayEquals(new Object[] { 1 }, serial.getValues()); + assertArrayEquals(new Integer[] { 1 }, serial.getValues(new Integer[0])); + assertArrayEquals(new Integer[] { 1 }, serial.getValues(new Integer[] { 0 })); + assertArrayEquals(new Integer[] { 1, null }, serial.getValues(new Integer[] { 0, 0 })); + } + @Test + public void testReplaySubjectValueRelayIncomplete() { + ReplaySubject async = ReplaySubject.create(); + async.onNext(1); + Subject serial = async.toSerialized(); + + assertFalse(serial.hasObservers()); + assertFalse(serial.hasCompleted()); + assertFalse(serial.hasThrowable()); + assertNull(serial.getThrowable()); + assertEquals((Integer)1, serial.getValue()); + assertTrue(serial.hasValue()); + assertArrayEquals(new Object[] { 1 }, serial.getValues()); + assertArrayEquals(new Integer[] { 1 }, serial.getValues(new Integer[0])); + assertArrayEquals(new Integer[] { 1 }, serial.getValues(new Integer[] { 0 })); + assertArrayEquals(new Integer[] { 1, null }, serial.getValues(new Integer[] { 0, 0 })); + } + @Test + public void testReplaySubjectValueRelayBounded() { + ReplaySubject async = ReplaySubject.createWithSize(1); + async.onNext(0); + async.onNext(1); + async.onCompleted(); + Subject serial = async.toSerialized(); + + assertFalse(serial.hasObservers()); + assertTrue(serial.hasCompleted()); + assertFalse(serial.hasThrowable()); + assertNull(serial.getThrowable()); + assertEquals((Integer)1, serial.getValue()); + assertTrue(serial.hasValue()); + assertArrayEquals(new Object[] { 1 }, serial.getValues()); + assertArrayEquals(new Integer[] { 1 }, serial.getValues(new Integer[0])); + assertArrayEquals(new Integer[] { 1 }, serial.getValues(new Integer[] { 0 })); + assertArrayEquals(new Integer[] { 1, null }, serial.getValues(new Integer[] { 0, 0 })); + } + @Test + public void testReplaySubjectValueRelayBoundedIncomplete() { + ReplaySubject async = ReplaySubject.createWithSize(1); + async.onNext(0); + async.onNext(1); + Subject serial = async.toSerialized(); + + assertFalse(serial.hasObservers()); + assertFalse(serial.hasCompleted()); + assertFalse(serial.hasThrowable()); + assertNull(serial.getThrowable()); + assertEquals((Integer)1, serial.getValue()); + assertTrue(serial.hasValue()); + assertArrayEquals(new Object[] { 1 }, serial.getValues()); + assertArrayEquals(new Integer[] { 1 }, serial.getValues(new Integer[0])); + assertArrayEquals(new Integer[] { 1 }, serial.getValues(new Integer[] { 0 })); + assertArrayEquals(new Integer[] { 1, null }, serial.getValues(new Integer[] { 0, 0 })); + } + @Test + public void testReplaySubjectValueRelayBoundedEmptyIncomplete() { + ReplaySubject async = ReplaySubject.createWithSize(1); + Subject serial = async.toSerialized(); + + assertFalse(serial.hasObservers()); + assertFalse(serial.hasCompleted()); + assertFalse(serial.hasThrowable()); + assertNull(serial.getThrowable()); + assertNull(serial.getValue()); + assertFalse(serial.hasValue()); + assertArrayEquals(new Object[] { }, serial.getValues()); + assertArrayEquals(new Integer[] { }, serial.getValues(new Integer[0])); + assertArrayEquals(new Integer[] { null }, serial.getValues(new Integer[] { 0 })); + assertArrayEquals(new Integer[] { null, 0 }, serial.getValues(new Integer[] { 0, 0 })); + } + @Test + public void testReplaySubjectValueRelayEmptyIncomplete() { + ReplaySubject async = ReplaySubject.create(); + Subject serial = async.toSerialized(); + + assertFalse(serial.hasObservers()); + assertFalse(serial.hasCompleted()); + assertFalse(serial.hasThrowable()); + assertNull(serial.getThrowable()); + assertNull(serial.getValue()); + assertFalse(serial.hasValue()); + assertArrayEquals(new Object[] { }, serial.getValues()); + assertArrayEquals(new Integer[] { }, serial.getValues(new Integer[0])); + assertArrayEquals(new Integer[] { null }, serial.getValues(new Integer[] { 0 })); + assertArrayEquals(new Integer[] { null, 0 }, serial.getValues(new Integer[] { 0, 0 })); + } + + @Test + public void testReplaySubjectEmpty() { + ReplaySubject async = ReplaySubject.create(); + async.onCompleted(); + Subject serial = async.toSerialized(); + + assertFalse(serial.hasObservers()); + assertTrue(serial.hasCompleted()); + assertFalse(serial.hasThrowable()); + assertNull(serial.getThrowable()); + assertNull(serial.getValue()); + assertFalse(serial.hasValue()); + assertArrayEquals(new Object[] { }, serial.getValues()); + assertArrayEquals(new Integer[] { }, serial.getValues(new Integer[0])); + assertArrayEquals(new Integer[] { null }, serial.getValues(new Integer[] { 0 })); + assertArrayEquals(new Integer[] { null, 0 }, serial.getValues(new Integer[] { 0, 0 })); + } + @Test + public void testReplaySubjectError() { + ReplaySubject async = ReplaySubject.create(); + TestException te = new TestException(); + async.onError(te); + Subject serial = async.toSerialized(); + + assertFalse(serial.hasObservers()); + assertFalse(serial.hasCompleted()); + assertTrue(serial.hasThrowable()); + assertSame(te, serial.getThrowable()); + assertNull(serial.getValue()); + assertFalse(serial.hasValue()); + assertArrayEquals(new Object[] { }, serial.getValues()); + assertArrayEquals(new Integer[] { }, serial.getValues(new Integer[0])); + assertArrayEquals(new Integer[] { null }, serial.getValues(new Integer[] { 0 })); + assertArrayEquals(new Integer[] { null, 0 }, serial.getValues(new Integer[] { 0, 0 })); + } + + @Test + public void testReplaySubjectBoundedEmpty() { + ReplaySubject async = ReplaySubject.createWithSize(1); + async.onCompleted(); + Subject serial = async.toSerialized(); + + assertFalse(serial.hasObservers()); + assertTrue(serial.hasCompleted()); + assertFalse(serial.hasThrowable()); + assertNull(serial.getThrowable()); + assertNull(serial.getValue()); + assertFalse(serial.hasValue()); + assertArrayEquals(new Object[] { }, serial.getValues()); + assertArrayEquals(new Integer[] { }, serial.getValues(new Integer[0])); + assertArrayEquals(new Integer[] { null }, serial.getValues(new Integer[] { 0 })); + assertArrayEquals(new Integer[] { null, 0 }, serial.getValues(new Integer[] { 0, 0 })); + } + @Test + public void testReplaySubjectBoundedError() { + ReplaySubject async = ReplaySubject.createWithSize(1); + TestException te = new TestException(); + async.onError(te); + Subject serial = async.toSerialized(); + + assertFalse(serial.hasObservers()); + assertFalse(serial.hasCompleted()); + assertTrue(serial.hasThrowable()); + assertSame(te, serial.getThrowable()); + assertNull(serial.getValue()); + assertFalse(serial.hasValue()); + assertArrayEquals(new Object[] { }, serial.getValues()); + assertArrayEquals(new Integer[] { }, serial.getValues(new Integer[0])); + assertArrayEquals(new Integer[] { null }, serial.getValues(new Integer[] { 0 })); + assertArrayEquals(new Integer[] { null, 0 }, serial.getValues(new Integer[] { 0, 0 })); + } + + @Test + public void testDontWrapSerializedSubjectAgain() { + PublishSubject s = PublishSubject.create(); + Subject s1 = s.toSerialized(); + Subject s2 = s1.toSerialized(); + assertSame(s1, s2); + } }