Skip to content

Commit

Permalink
1.x: fix Spsc queues reporting not empty but then poll() returns null (
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Jun 15, 2016
1 parent 67b7be0 commit 3261a49
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ else if (null != lvElement(buffer, offset)){
return false;
}
}
soProducerIndex(index + 1); // ordered store -> atomic and ordered for size()
soElement(buffer, offset, e); // StoreStore
soProducerIndex(index + 1); // ordered store -> atomic and ordered for size()
return true;
}

Expand All @@ -79,8 +79,8 @@ public E poll() {
if (null == e) {
return null;
}
soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size()
soElement(lElementBuffer, offset, null);// StoreStore
soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size()
return e;
}

Expand Down
14 changes: 7 additions & 7 deletions src/main/java/rx/internal/util/atomic/SpscLinkedArrayQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ public final boolean offer(final T e) {
}

private boolean writeToQueue(final AtomicReferenceArray<Object> buffer, final T e, final long index, final int offset) {
soProducerIndex(index + 1);// this ensures atomic write of long on 32bit platforms
soElement(buffer, offset, e);// StoreStore
soProducerIndex(index + 1);// this ensures atomic write of long on 32bit platforms
return true;
}

Expand All @@ -101,11 +101,11 @@ private void resize(final AtomicReferenceArray<Object> oldBuffer, final long cur
final AtomicReferenceArray<Object> newBuffer = new AtomicReferenceArray<Object>(capacity);
producerBuffer = newBuffer;
producerLookAhead = currIndex + mask - 1;
soProducerIndex(currIndex + 1);// this ensures correctness on 32bit platforms
soElement(newBuffer, offset, e);// StoreStore
soNext(oldBuffer, newBuffer);
soElement(oldBuffer, offset, HAS_NEXT); // new buffer is visible after element is
// inserted
soProducerIndex(currIndex + 1);// this ensures correctness on 32bit platforms
}

private void soNext(AtomicReferenceArray<Object> curr, AtomicReferenceArray<Object> next) {
Expand All @@ -131,8 +131,8 @@ public final T poll() {
final Object e = lvElement(buffer, offset);// LoadLoad
boolean isNextBuffer = e == HAS_NEXT;
if (null != e && !isNextBuffer) {
soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
soElement(buffer, offset, null);// StoreStore
soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
return (T) e;
} else if (isNextBuffer) {
return newBufferPoll(lvNext(buffer), index, mask);
Expand All @@ -149,8 +149,8 @@ private T newBufferPoll(AtomicReferenceArray<Object> nextBuffer, final long inde
if (null == n) {
return null;
} else {
soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
soElement(nextBuffer, offsetInNew, null);// StoreStore
soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
return n;
}
}
Expand Down Expand Up @@ -330,8 +330,8 @@ public boolean offer(T first, T second) {
if (null == lvElement(buffer, pi)) {
pi = calcWrappedOffset(p, m);
soElement(buffer, pi + 1, second);
soProducerIndex(p + 2);
soElement(buffer, pi, first);
soProducerIndex(p + 2);
} else {
final int capacity = buffer.length();
final AtomicReferenceArray<Object> newBuffer = new AtomicReferenceArray<Object>(capacity);
Expand All @@ -342,9 +342,9 @@ public boolean offer(T first, T second) {
soElement(newBuffer, pi, first);
soNext(buffer, newBuffer);

soProducerIndex(p + 2);// this ensures correctness on 32bit platforms

soElement(buffer, pi, HAS_NEXT); // new buffer is visible after element is

soProducerIndex(p + 2);// this ensures correctness on 32bit platforms
}

return true;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ public boolean offer(final E e) {
if (null != lvElement(lElementBuffer, offset)){
return false;
}
soProducerIndex(index + 1); // ordered store -> atomic and ordered for size()
soElement(lElementBuffer, offset, e); // StoreStore
soProducerIndex(index + 1); // ordered store -> atomic and ordered for size()
return true;
}

Expand All @@ -130,8 +130,8 @@ public E poll() {
if (null == e) {
return null;
}
soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size()
soElement(lElementBuffer, offset, null);// StoreStore
soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size()
return e;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ public final boolean offer(final E e) {
}

private boolean writeToQueue(final E[] buffer, final E e, final long index, final long offset) {
soProducerIndex(index + 1);// this ensures atomic write of long on 32bit platforms
soElement(buffer, offset, e);// StoreStore
soProducerIndex(index + 1);// this ensures atomic write of long on 32bit platforms
return true;
}

Expand All @@ -144,11 +144,11 @@ private void resize(final E[] oldBuffer, final long currIndex, final long offset
final E[] newBuffer = (E[]) new Object[capacity];
producerBuffer = newBuffer;
producerLookAhead = currIndex + mask - 1;
soProducerIndex(currIndex + 1);// this ensures correctness on 32bit platforms
soElement(newBuffer, offset, e);// StoreStore
soNext(oldBuffer, newBuffer);
soElement(oldBuffer, offset, HAS_NEXT); // new buffer is visible after element is
// inserted
soProducerIndex(currIndex + 1);// this ensures correctness on 32bit platforms
}

private void soNext(E[] curr, E[] next) {
Expand All @@ -174,8 +174,8 @@ public final E poll() {
final Object e = lvElement(buffer, offset);// LoadLoad
boolean isNextBuffer = e == HAS_NEXT;
if (null != e && !isNextBuffer) {
soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
soElement(buffer, offset, null);// StoreStore
soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
return (E) e;
} else if (isNextBuffer) {
return newBufferPoll(lvNext(buffer), index, mask);
Expand All @@ -192,8 +192,8 @@ private E newBufferPoll(E[] nextBuffer, final long index, final long mask) {
if (null == n) {
return null;
} else {
soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
soElement(nextBuffer, offsetInNew, null);// StoreStore
soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
return n;
}
}
Expand Down
29 changes: 28 additions & 1 deletion src/test/java/rx/internal/operators/OperatorSwitchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.lang.ref.WeakReference;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.*;

import org.junit.*;
import org.mockito.InOrder;
Expand All @@ -32,6 +32,7 @@
import rx.Observer;
import rx.exceptions.*;
import rx.functions.*;
import rx.internal.util.UtilityFunctions;
import rx.observers.TestSubscriber;
import rx.schedulers.*;
import rx.subjects.PublishSubject;
Expand Down Expand Up @@ -880,4 +881,30 @@ public void call(Throwable e) {
}
}
}

@Test
public void asyncInner() throws Throwable {
for (int i = 0; i < 100; i++) {

final AtomicReference<Throwable> error = new AtomicReference<Throwable>();

Observable.just(Observable.range(1, 1000 * 1000).subscribeOn(Schedulers.computation()))
.switchMap(UtilityFunctions.<Observable<Integer>>identity())
.observeOn(Schedulers.computation())
.ignoreElements()
.timeout(5, TimeUnit.SECONDS)
.toBlocking()
.subscribe(Actions.empty(), new Action1<Throwable>() {
@Override
public void call(Throwable e) {
error.set(e);
}
});

Throwable ex = error.get();
if (ex != null) {
throw ex;
}
}
}
}

0 comments on commit 3261a49

Please sign in to comment.