From 1c00aa6cc578bfc5e792aa5409ff1f336529ac6d Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 30 May 2014 11:05:14 +0200 Subject: [PATCH 1/2] Removal of ConcurrentHashMap from ReplaySubject and some NotificationLite cleanup. --- .../main/java/rx/subjects/AsyncSubject.java | 4 +- .../java/rx/subjects/BehaviorSubject.java | 8 +-- .../main/java/rx/subjects/PublishSubject.java | 10 +-- .../main/java/rx/subjects/ReplaySubject.java | 69 ++++--------------- .../subjects/SubjectSubscriptionManager.java | 45 ++++++++---- .../main/java/rx/subjects/TestSubject.java | 2 +- 6 files changed, 58 insertions(+), 80 deletions(-) diff --git a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java index 7ec8075675..2810fdbe83 100644 --- a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java @@ -54,7 +54,7 @@ public final class AsyncSubject extends Subject { /** * Creates and returns a new {@code AsyncSubject}. - * + * @param the result value type * @return the new {@code AsyncSubject} */ public static AsyncSubject create() { @@ -63,7 +63,7 @@ public static AsyncSubject create() { @Override public void call(SubjectObserver o) { Object v = state.get(); - o.accept(v); + o.accept(v, state.nl); NotificationLite nl = NotificationLite.instance(); if (v == null || (!nl.isCompleted(v) && !nl.isError(v))) { o.onCompleted(); diff --git a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java index 726c930680..70d9430358 100644 --- a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java @@ -99,7 +99,7 @@ private static BehaviorSubject create(T defaultValue, boolean hasDefault) @Override public void call(SubjectObserver o) { - o.emitFirst(state.get()); + o.emitFirst(state.get(), state.nl); } }; @@ -121,7 +121,7 @@ public void onCompleted() { if (last == null || state.active) { Object n = nl.completed(); for (SubjectObserver bo : state.terminate(n)) { - bo.emitNext(n); + bo.emitNext(n, state.nl); } } } @@ -132,7 +132,7 @@ public void onError(Throwable e) { if (last == null || state.active) { Object n = nl.error(e); for (SubjectObserver bo : state.terminate(n)) { - bo.emitNext(n); + bo.emitNext(n, state.nl); } } } @@ -143,7 +143,7 @@ public void onNext(T v) { if (last == null || state.active) { Object n = nl.next(v); for (SubjectObserver bo : state.next(n)) { - bo.emitNext(n); + bo.emitNext(n, state.nl); } } } diff --git a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java index b547a0da27..93763e7eed 100644 --- a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java @@ -50,19 +50,19 @@ public final class PublishSubject extends Subject { /** * Creates and returns a new {@code PublishSubject}. * + * @param the value type * @return the new {@code PublishSubject} */ public static PublishSubject create() { final SubjectSubscriptionManager state = new SubjectSubscriptionManager(); - state.onAdded = new Action1>() { + state.onTerminated = new Action1>() { @Override public void call(SubjectObserver o) { - o.emitFirst(state.get()); + o.emitFirst(state.get(), state.nl); } }; - state.onTerminated = state.onAdded; return new PublishSubject(state, state); } @@ -79,7 +79,7 @@ public void onCompleted() { if (state.active) { Object n = nl.completed(); for (SubjectObserver bo : state.terminate(n)) { - bo.emitNext(n); + bo.emitNext(n, state.nl); } } @@ -90,7 +90,7 @@ public void onError(final Throwable e) { if (state.active) { Object n = nl.error(e); for (SubjectObserver bo : state.terminate(n)) { - bo.emitNext(n); + bo.emitNext(n, state.nl); } } } diff --git a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java index 991634b8ea..c1647f9bef 100644 --- a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java +++ b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java @@ -16,10 +16,8 @@ package rx.subjects; import java.util.ArrayList; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import rx.Observer; import rx.Scheduler; @@ -98,13 +96,13 @@ public void call(SubjectObserver o) { int lastIndex = state.replayObserverFromIndex(0, o); // now that it is caught up add to observers - state.replayState.put(o, lastIndex); + o.index(lastIndex); } }; ssm.onTerminated = new Action1>() { @Override public void call(SubjectObserver o) { - Integer idx = state.replayState.remove(o); + Integer idx = o.index(); if (idx == null) { idx = 0; } @@ -112,12 +110,6 @@ public void call(SubjectObserver o) { state.replayObserverFromIndex(idx, o); } }; - ssm.onUnsubscribed = new Action1>() { - @Override - public void call(SubjectObserver o) { - state.replayState.remove(o); - } - }; return new ReplaySubject(ssm, ssm, state); } @@ -273,20 +265,13 @@ static final ReplaySubject createWithState(final BoundedState state, @Override public void call(SubjectObserver t1) { - NodeList.Node l = state.removeState(t1); + NodeList.Node l = t1.index(); if (l == null) { l = state.head(); } state.replayObserverFromIndex(l, t1); } - }; - ssm.onUnsubscribed = new Action1>() { - @Override - public void call(SubjectObserver t1) { - state.removeState(t1); - } - }; return new ReplaySubject(ssm, ssm, state); @@ -341,7 +326,7 @@ public void onCompleted() { * @return Returns the number of subscribers. */ /* Support test. */int subscriberCount() { - return state.replayStateSize(); + return ssm.state.observers.length; } private boolean caughtUp(SubjectObserver o) { @@ -364,8 +349,6 @@ private boolean caughtUp(SubjectObserver o) { * @param the input and output type */ static final class UnboundedReplayState implements ReplayState { - /** Each Observer is tracked here for what events they have received. */ - final ConcurrentHashMap, Integer> replayState; private final NotificationLite nl = NotificationLite.instance(); /** The buffer. */ private final ArrayList list; @@ -378,7 +361,6 @@ static final class UnboundedReplayState implements ReplayState { = AtomicIntegerFieldUpdater.newUpdater(UnboundedReplayState.class, "index"); public UnboundedReplayState(int initialCapacity) { list = new ArrayList(initialCapacity); - replayState = new ConcurrentHashMap, Integer>(); } @Override @@ -417,10 +399,10 @@ public boolean terminated() { @Override public void replayObserver(SubjectObserver observer) { - Integer lastEmittedLink = replayState.get(observer); + Integer lastEmittedLink = observer.index(); if (lastEmittedLink != null) { int l = replayObserverFromIndex(lastEmittedLink, observer); - replayState.put(observer, l); + observer.index(l); } else { throw new IllegalStateException("failed to find lastEmittedLink for: " + observer); } @@ -441,12 +423,6 @@ public Integer replayObserverFromIndex(Integer idx, SubjectObserver o public Integer replayObserverFromIndexTest(Integer idx, SubjectObserver observer, long now) { return replayObserverFromIndex(idx, observer); } - - @Override - public int replayStateSize() { - return replayState.size(); - } - } @@ -456,7 +432,6 @@ public int replayStateSize() { */ static final class BoundedState implements ReplayState> { final NodeList list; - final ConcurrentHashMap, NodeList.Node> replayState; final EvictionPolicy evictionPolicy; final Func1 enterTransform; final Func1 leaveTransform; @@ -468,7 +443,6 @@ public BoundedState(EvictionPolicy evictionPolicy, Func1 enterTr Func1 leaveTransform) { this.list = new NodeList(); this.tail = list.tail; - this.replayState = new ConcurrentHashMap, NodeList.Node>(); this.evictionPolicy = evictionPolicy; this.enterTransform = enterTransform; this.leaveTransform = leaveTransform; @@ -525,21 +499,11 @@ public Node head() { public Node tail() { return tail; } - public Node removeState(SubjectObserver o) { - return replayState.remove(o); - } - public void addState(SubjectObserver o, Node state) { - if (state == null) { - throw new IllegalStateException("Null state!"); - } else { - replayState.put(o, state); - } - } @Override public void replayObserver(SubjectObserver observer) { - NodeList.Node lastEmittedLink = replayState.get(observer); + NodeList.Node lastEmittedLink = observer.index(); NodeList.Node l = replayObserverFromIndex(lastEmittedLink, observer); - addState(observer, l); + observer.index(l); } @Override @@ -565,11 +529,6 @@ public NodeList.Node replayObserverFromIndexTest( public boolean terminated() { return terminated; } - - @Override - public int replayStateSize() { - return replayState.size(); - } } // ************** @@ -584,6 +543,10 @@ public int replayStateSize() { interface ReplayState { /** @return true if the subject has reached a terminal state. */ boolean terminated(); + /** + * Replay contents to the given observer. + * @param observer the receiver of events + */ void replayObserver(SubjectObserver observer); /** * Replay the buffered values from an index position and return a new index @@ -601,10 +564,6 @@ I replayObserverFromIndex( */ I replayObserverFromIndexTest( I idx, SubjectObserver observer, long now); - /** - * @return the size of the replay state map for testing purposes. - */ - int replayStateSize(); /** * Add an OnNext value to the buffer * @param value the value to add @@ -756,7 +715,7 @@ public DefaultOnAdd(BoundedState state) { @Override public void call(SubjectObserver t1) { NodeList.Node l = state.replayObserverFromIndex(state.head(), t1); - state.addState(t1, l); + t1.index(l); } } @@ -783,7 +742,7 @@ public void call(SubjectObserver t1) { // accept all if terminated l = state.replayObserverFromIndex(state.head(), t1); } - state.addState(t1, l); + t1.index(l); } } diff --git a/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java b/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java index 6c6d598e85..77a6ae4e43 100644 --- a/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java +++ b/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java @@ -50,8 +50,8 @@ Action1> onAdded = Actions.empty(); /** Action called when the subscriber wants to subscribe to a terminal state. */ Action1> onTerminated = Actions.empty(); - /** Called when the subscruber explicitly unsubscribes. */ - Action1> onUnsubscribed = Actions.empty(); + /** The notification lite. */ + public final NotificationLite nl = NotificationLite.instance(); @Override public void call(final Subscriber child) { SubjectObserver bo = new SubjectObserver(child); @@ -59,7 +59,6 @@ public void call(final Subscriber child) { onStart.call(bo); if (add(bo) && child.isUnsubscribed()) { remove(bo); - onUnsubscribed.call(bo); } } /** Registers the unsubscribe action for the given subscriber. */ @@ -68,7 +67,6 @@ void addUnsubscriber(Subscriber child, final SubjectObserver bo) { @Override public void call() { remove(bo); - onUnsubscribed.call(bo); } })); } @@ -205,8 +203,6 @@ public State remove(SubjectObserver o) { protected static final class SubjectObserver implements Observer { /** The actual Observer. */ final Observer actual; - /** The NotificationLite to avoid allocating objects for each OnNext value. */ - final NotificationLite nl = NotificationLite.instance(); /** Was the emitFirst run? Guarded by this. */ boolean first = true; /** Guarded by this. */ @@ -214,7 +210,10 @@ protected static final class SubjectObserver implements Observer { /** Guarded by this. */ List queue; /* volatile */boolean fastPath; + /** Indicate that the observer has caught up. */ protected volatile boolean caughtUp; + /** Indicate where the observer is at replaying. */ + private volatile Object index; public SubjectObserver(Observer actual) { this.actual = actual; } @@ -234,8 +233,9 @@ public void onCompleted() { * Emits the given NotificationLite value and * prevents the emitFirst to run if not already run. * @param n the NotificationLite value + * @param nl the type-appropriate notification lite object */ - protected void emitNext(Object n) { + protected void emitNext(Object n, final NotificationLite nl) { if (!fastPath) { synchronized (this) { first = false; @@ -255,8 +255,9 @@ protected void emitNext(Object n) { * Tries to emit a NotificationLite value as the first * value and drains the queue as long as possible. * @param n the NotificationLite value + * @param nl the type-appropriate notification lite object */ - protected void emitFirst(Object n) { + protected void emitFirst(Object n, final NotificationLite nl) { synchronized (this) { if (!first || emitting) { return; @@ -265,27 +266,28 @@ protected void emitFirst(Object n) { emitting = n != null; } if (n != null) { - emitLoop(null, n); + emitLoop(null, n, nl); } } /** * Emits the contents of the queue as long as there are values. * @param localQueue the initial queue contents * @param current the current content to emit + * @param nl the type-appropriate notification lite object */ - protected void emitLoop(List localQueue, Object current) { + protected void emitLoop(List localQueue, Object current, final NotificationLite nl) { boolean once = true; boolean skipFinal = false; try { do { if (localQueue != null) { for (Object n : localQueue) { - accept(n); + accept(n, nl); } } if (once) { once = false; - accept(current); + accept(current, nl); } synchronized (this) { localQueue = queue; @@ -308,8 +310,9 @@ protected void emitLoop(List localQueue, Object current) { /** * Dispatches a NotificationLite value to the actual Observer. * @param n the value to dispatch + * @param nl the type-appropriate notification lite object */ - protected void accept(Object n) { + protected void accept(Object n, final NotificationLite nl) { if (n != null) { nl.accept(actual, n); } @@ -319,5 +322,21 @@ protected void accept(Object n) { protected Observer getActual() { return actual; } + /** + * Returns the stored index. + * @param the index type + * @return the index value + */ + @SuppressWarnings("unchecked") + public I index() { + return (I)index; + } + /** + * Sets a new index value. + * @param newIndex the new index value + */ + public void index(Object newIndex) { + this.index = newIndex; + } } } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/subjects/TestSubject.java b/rxjava-core/src/main/java/rx/subjects/TestSubject.java index ec0ed8c28d..334e6c9963 100644 --- a/rxjava-core/src/main/java/rx/subjects/TestSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/TestSubject.java @@ -63,7 +63,7 @@ public static TestSubject create(TestScheduler scheduler) { @Override public void call(SubjectObserver o) { - o.emitFirst(state.get()); + o.emitFirst(state.get(), state.nl); } }; From b195ff8aba9239c716f89aed3f7b8f840db3385f Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 30 May 2014 11:08:05 +0200 Subject: [PATCH 2/2] Remove duplicate NotificationLite. --- rxjava-core/src/main/java/rx/subjects/AsyncSubject.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java index 2810fdbe83..e146423d00 100644 --- a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java @@ -63,8 +63,8 @@ public static AsyncSubject create() { @Override public void call(SubjectObserver o) { Object v = state.get(); - o.accept(v, state.nl); - NotificationLite nl = NotificationLite.instance(); + NotificationLite nl = state.nl; + o.accept(v, nl); if (v == null || (!nl.isCompleted(v) && !nl.isError(v))) { o.onCompleted(); }