From c1d4ca7ecbd2da2c093aeb512668d6653a7a7609 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 13 Feb 2015 21:46:39 +0100 Subject: [PATCH 1/2] SwitchOnNext: fixed wrong producer --- .../rx/internal/operators/OperatorMerge.java | 39 +++++++------ .../rx/internal/operators/OperatorSwitch.java | 5 +- .../operators/OperatorSwitchTest.java | 56 +++++++++++++++++-- 3 files changed, 76 insertions(+), 24 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorMerge.java b/src/main/java/rx/internal/operators/OperatorMerge.java index 2da1844ca9..dfec24d061 100644 --- a/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/src/main/java/rx/internal/operators/OperatorMerge.java @@ -194,7 +194,7 @@ private void handleNewSource(Observable t) { } MergeProducer producerIfNeeded = null; // if we have received a request then we need to respect it, otherwise we fast-path - if (mergeProducer.requested != Long.MAX_VALUE) { + if (mergeProducer.requested() != Long.MAX_VALUE) { /** *
 {@code
                  * With this optimization:
@@ -237,7 +237,7 @@ private void handleScalarSynchronousObservable(ScalarSynchronousObservable
              * 
              */
-            if (mergeProducer.requested == Long.MAX_VALUE) {
+            if (mergeProducer.requested() == Long.MAX_VALUE) {
                 handleScalarSynchronousObservableWithoutRequestLimits(t);
             } else {
                 handleScalarSynchronousObservableWithRequestLimits(t);
@@ -274,11 +274,11 @@ private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronou
                 boolean moreToDrain;
                 boolean isReturn = false;
                 try {
-                    long r = mergeProducer.requested;
+                    long r = mergeProducer.requested();
                     if (r > 0) {
                         emitted = true;
                         actual.onNext(t.get());
-                        MergeProducer.REQUESTED.decrementAndGet(mergeProducer);
+                        mergeProducer.getAndAdd(-1);
                         // we handle this Observable without ever incrementing the wip or touching other machinery so just return here
                         isReturn = true;
                     }
@@ -376,7 +376,7 @@ private void drainChildrenQueues() {
         private int drainScalarValueQueue() {
             RxRingBuffer svq = scalarValueQueue;
             if (svq != null) {
-                long r = mergeProducer.requested;
+                long r = mergeProducer.requested();
                 int emittedWhileDraining = 0;
                 if (r < 0) {
                     // drain it all
@@ -398,7 +398,7 @@ private int drainScalarValueQueue() {
                         }
                     }
                     // decrement the number we emitted from outstanding requests
-                    MergeProducer.REQUESTED.getAndAdd(mergeProducer, -emittedWhileDraining);
+                    mergeProducer.getAndAdd(-emittedWhileDraining);
                 }
                 return emittedWhileDraining;
             }
@@ -410,7 +410,7 @@ private int drainScalarValueQueue() {
             @Override
             public Boolean call(InnerSubscriber s) {
                 if (s.q != null) {
-                    long r = mergeProducer.requested;
+                    long r = mergeProducer.requested();
                     int emitted = s.drainQueue();
                     if (emitted > 0) {
                         s.requestMore(emitted);
@@ -533,19 +533,26 @@ public MergeProducer(MergeSubscriber ms) {
             this.ms = ms;
         }
 
-        private volatile long requested = 0;
+        private volatile long rq = 0;
         @SuppressWarnings("rawtypes")
-        static final AtomicLongFieldUpdater REQUESTED = AtomicLongFieldUpdater.newUpdater(MergeProducer.class, "requested");
+        static final AtomicLongFieldUpdater RQ = AtomicLongFieldUpdater.newUpdater(MergeProducer.class, "rq");
 
+        public long requested() {
+            return rq;
+        }
+        public long getAndAdd(long n) {
+            return RQ.getAndAdd(this, n);
+        }
+        
         @Override
         public void request(long n) {
-            if (requested == Long.MAX_VALUE) {
+            if (rq == Long.MAX_VALUE) {
                 return;
             }
             if (n == Long.MAX_VALUE) {
-                requested = Long.MAX_VALUE;
+                rq = Long.MAX_VALUE;
             } else {
-                BackpressureUtils.getAndAddRequest(REQUESTED, this, n);
+                BackpressureUtils.getAndAddRequest(RQ, this, n);
                 if (ms.drainQueuesIfNeeded()) {
                     boolean sendComplete = false;
                     synchronized (ms) {
@@ -668,7 +675,7 @@ private void emit(T t, boolean complete) {
                     } else {
                         // this needs to check q.count() as draining above may not have drained the full queue
                         // perf tests show this to be okay, though different queue implementations could perform poorly with this
-                        if (producer.requested > 0 && q.count() == 0) {
+                        if (producer.requested() > 0 && q.count() == 0) {
                             if (complete) {
                                 parentSubscriber.completeInner(this);
                             } else {
@@ -679,7 +686,7 @@ private void emit(T t, boolean complete) {
                                     onError(OnErrorThrowable.addValueAsLastCause(e, t));
                                 }
                                 emitted++;
-                                MergeProducer.REQUESTED.decrementAndGet(producer);
+                                producer.getAndAdd(-1);
                             }
                         } else {
                             // no requests available, so enqueue it
@@ -728,7 +735,7 @@ private void enqueue(T t, boolean complete) {
         private int drainRequested() {
             int emitted = 0;
             // drain what was requested
-            long toEmit = producer.requested;
+            long toEmit = producer.requested();
             Object o;
             for (int i = 0; i < toEmit; i++) {
                 o = q.poll();
@@ -750,7 +757,7 @@ private int drainRequested() {
             }
 
             // decrement the number we emitted from outstanding requests
-            MergeProducer.REQUESTED.getAndAdd(producer, -emitted);
+            producer.getAndAdd(-emitted);
             return emitted;
         }
 
diff --git a/src/main/java/rx/internal/operators/OperatorSwitch.java b/src/main/java/rx/internal/operators/OperatorSwitch.java
index 7ee71084aa..eae4d3aa67 100644
--- a/src/main/java/rx/internal/operators/OperatorSwitch.java
+++ b/src/main/java/rx/internal/operators/OperatorSwitch.java
@@ -49,7 +49,9 @@ public static  OperatorSwitch instance() {
     private OperatorSwitch() { }
     @Override
     public Subscriber> call(final Subscriber child) {
-        return new SwitchSubscriber(child);
+        SwitchSubscriber sws = new SwitchSubscriber(child);
+        child.add(sws);
+        return sws;
     }
 
     private static final class SwitchSubscriber extends Subscriber> {
@@ -75,7 +77,6 @@ private static final class SwitchSubscriber extends Subscriber child) {
-            super(child);
             s = new SerializedSubscriber(child);
             ssub = new SerialSubscription();
             child.add(ssub);
diff --git a/src/test/java/rx/internal/operators/OperatorSwitchTest.java b/src/test/java/rx/internal/operators/OperatorSwitchTest.java
index a3a05f3b37..0efc388db1 100644
--- a/src/test/java/rx/internal/operators/OperatorSwitchTest.java
+++ b/src/test/java/rx/internal/operators/OperatorSwitchTest.java
@@ -18,23 +18,25 @@
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.*;
 
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.InOrder;
 
-import rx.*;
+import rx.Observable;
+import rx.Observer;
+import rx.Producer;
+import rx.Scheduler;
+import rx.Subscriber;
 import rx.exceptions.TestException;
 import rx.functions.Action0;
+import rx.functions.Func1;
 import rx.observers.TestSubscriber;
 import rx.schedulers.TestScheduler;
 
@@ -530,4 +532,46 @@ public void call(final Subscriber> subscriber) {
         ).take(1).subscribe();
         assertTrue("Switch doesn't propagate 'unsubscribe'", isUnsubscribed.get());
     }
+    /** The upstream producer hijacked the switch producer stopping the requests aimed at the inner observables. */
+    @Test
+    public void testIssue2654() {
+        Observable oneItem = Observable.just("Hello").mergeWith(Observable.never());
+        
+        Observable src = oneItem.switchMap(new Func1>() {
+            @Override
+            public Observable call(final String s) {
+                return Observable.just(s)
+                        .mergeWith(Observable.interval(10, TimeUnit.MILLISECONDS)
+                        .map(new Func1() {
+                            @Override
+                            public String call(Long i) {
+                                return s + " " + i;
+                            }
+                        })).take(250);
+            }
+        })
+        .share()
+        ;
+        
+        TestSubscriber ts = new TestSubscriber() {
+            @Override
+            public void onNext(String t) {
+                super.onNext(t);
+                if (getOnNextEvents().size() == 250) {
+                    onCompleted();
+                    unsubscribe();
+                }
+            }
+        };
+        src.subscribe(ts);
+        
+        ts.awaitTerminalEvent(10, TimeUnit.SECONDS);
+        
+        System.out.println("> testIssue2654: " + ts.getOnNextEvents().size());
+        
+        ts.assertTerminalEvent();
+        ts.assertNoErrors();
+        
+        Assert.assertEquals(250, ts.getOnNextEvents().size());
+    }
 }

From 2c2051f53d571320b58787f9110b0bed687ec2b5 Mon Sep 17 00:00:00 2001
From: akarnokd 
Date: Fri, 13 Feb 2015 21:50:29 +0100
Subject: [PATCH 2/2] Restore merge changes: not related to the bug

---
 .../rx/internal/operators/OperatorMerge.java  | 39 ++++++++-----------
 1 file changed, 16 insertions(+), 23 deletions(-)

diff --git a/src/main/java/rx/internal/operators/OperatorMerge.java b/src/main/java/rx/internal/operators/OperatorMerge.java
index dfec24d061..2da1844ca9 100644
--- a/src/main/java/rx/internal/operators/OperatorMerge.java
+++ b/src/main/java/rx/internal/operators/OperatorMerge.java
@@ -194,7 +194,7 @@ private void handleNewSource(Observable t) {
             }
             MergeProducer producerIfNeeded = null;
             // if we have received a request then we need to respect it, otherwise we fast-path
-            if (mergeProducer.requested() != Long.MAX_VALUE) {
+            if (mergeProducer.requested != Long.MAX_VALUE) {
                 /**
                  * 
 {@code
                  * With this optimization:
@@ -237,7 +237,7 @@ private void handleScalarSynchronousObservable(ScalarSynchronousObservable
              * 
              */
-            if (mergeProducer.requested() == Long.MAX_VALUE) {
+            if (mergeProducer.requested == Long.MAX_VALUE) {
                 handleScalarSynchronousObservableWithoutRequestLimits(t);
             } else {
                 handleScalarSynchronousObservableWithRequestLimits(t);
@@ -274,11 +274,11 @@ private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronou
                 boolean moreToDrain;
                 boolean isReturn = false;
                 try {
-                    long r = mergeProducer.requested();
+                    long r = mergeProducer.requested;
                     if (r > 0) {
                         emitted = true;
                         actual.onNext(t.get());
-                        mergeProducer.getAndAdd(-1);
+                        MergeProducer.REQUESTED.decrementAndGet(mergeProducer);
                         // we handle this Observable without ever incrementing the wip or touching other machinery so just return here
                         isReturn = true;
                     }
@@ -376,7 +376,7 @@ private void drainChildrenQueues() {
         private int drainScalarValueQueue() {
             RxRingBuffer svq = scalarValueQueue;
             if (svq != null) {
-                long r = mergeProducer.requested();
+                long r = mergeProducer.requested;
                 int emittedWhileDraining = 0;
                 if (r < 0) {
                     // drain it all
@@ -398,7 +398,7 @@ private int drainScalarValueQueue() {
                         }
                     }
                     // decrement the number we emitted from outstanding requests
-                    mergeProducer.getAndAdd(-emittedWhileDraining);
+                    MergeProducer.REQUESTED.getAndAdd(mergeProducer, -emittedWhileDraining);
                 }
                 return emittedWhileDraining;
             }
@@ -410,7 +410,7 @@ private int drainScalarValueQueue() {
             @Override
             public Boolean call(InnerSubscriber s) {
                 if (s.q != null) {
-                    long r = mergeProducer.requested();
+                    long r = mergeProducer.requested;
                     int emitted = s.drainQueue();
                     if (emitted > 0) {
                         s.requestMore(emitted);
@@ -533,26 +533,19 @@ public MergeProducer(MergeSubscriber ms) {
             this.ms = ms;
         }
 
-        private volatile long rq = 0;
+        private volatile long requested = 0;
         @SuppressWarnings("rawtypes")
-        static final AtomicLongFieldUpdater RQ = AtomicLongFieldUpdater.newUpdater(MergeProducer.class, "rq");
+        static final AtomicLongFieldUpdater REQUESTED = AtomicLongFieldUpdater.newUpdater(MergeProducer.class, "requested");
 
-        public long requested() {
-            return rq;
-        }
-        public long getAndAdd(long n) {
-            return RQ.getAndAdd(this, n);
-        }
-        
         @Override
         public void request(long n) {
-            if (rq == Long.MAX_VALUE) {
+            if (requested == Long.MAX_VALUE) {
                 return;
             }
             if (n == Long.MAX_VALUE) {
-                rq = Long.MAX_VALUE;
+                requested = Long.MAX_VALUE;
             } else {
-                BackpressureUtils.getAndAddRequest(RQ, this, n);
+                BackpressureUtils.getAndAddRequest(REQUESTED, this, n);
                 if (ms.drainQueuesIfNeeded()) {
                     boolean sendComplete = false;
                     synchronized (ms) {
@@ -675,7 +668,7 @@ private void emit(T t, boolean complete) {
                     } else {
                         // this needs to check q.count() as draining above may not have drained the full queue
                         // perf tests show this to be okay, though different queue implementations could perform poorly with this
-                        if (producer.requested() > 0 && q.count() == 0) {
+                        if (producer.requested > 0 && q.count() == 0) {
                             if (complete) {
                                 parentSubscriber.completeInner(this);
                             } else {
@@ -686,7 +679,7 @@ private void emit(T t, boolean complete) {
                                     onError(OnErrorThrowable.addValueAsLastCause(e, t));
                                 }
                                 emitted++;
-                                producer.getAndAdd(-1);
+                                MergeProducer.REQUESTED.decrementAndGet(producer);
                             }
                         } else {
                             // no requests available, so enqueue it
@@ -735,7 +728,7 @@ private void enqueue(T t, boolean complete) {
         private int drainRequested() {
             int emitted = 0;
             // drain what was requested
-            long toEmit = producer.requested();
+            long toEmit = producer.requested;
             Object o;
             for (int i = 0; i < toEmit; i++) {
                 o = q.poll();
@@ -757,7 +750,7 @@ private int drainRequested() {
             }
 
             // decrement the number we emitted from outstanding requests
-            producer.getAndAdd(-emitted);
+            MergeProducer.REQUESTED.getAndAdd(producer, -emitted);
             return emitted;
         }