Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change RxRingBuffer Queue Usage #1496

Merged
merged 2 commits into from
Jul 23, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 26 additions & 42 deletions rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import rx.Subscription;
import rx.exceptions.MissingBackpressureException;
import rx.internal.operators.NotificationLite;
import rx.internal.util.unsafe.SpmcArrayQueue;
import rx.internal.util.unsafe.SpscArrayQueue;
import rx.internal.util.unsafe.UnsafeAccess;

/**
Expand All @@ -33,15 +31,28 @@ public class RxRingBuffer implements Subscription {

public static RxRingBuffer getSpscInstance() {
if (UnsafeAccess.isUnsafeAvailable()) {
return new RxRingBuffer(SPSC_POOL, SIZE);
// using SynchronizedQueue until issues are solved with SpscArrayQueue offer rejection
// RxRingBufferSpmcTest.testConcurrency occasionally fails with a
// BackpressureException when using SpscArrayQueue
// return new RxRingBuffer(SPSC_POOL, SIZE); // this is the one we were trying to use
// return new RxRingBuffer(new SpscArrayQueue<Object>(SIZE), SIZE);
// the performance of this is sufficient (actually faster in some cases)
return new RxRingBuffer(new SynchronizedQueue<Object>(SIZE), SIZE);
} else {
return new RxRingBuffer();
}
}

public static RxRingBuffer getSpmcInstance() {
if (UnsafeAccess.isUnsafeAvailable()) {
return new RxRingBuffer(SPMC_POOL, SIZE);
// using SynchronizedQueue until issues are solved with SpmcArrayQueue offer rejection
// RxRingBufferSpmcTest.testConcurrency occasionally fails with a
// BackpressureException when using SpmcArrayQueue/MpmcArrayQueue
// return new RxRingBuffer(SPMC_POOL, SIZE); // this is the one we were trying to use
// return new RxRingBuffer(new SpmcArrayQueue<Object>(SIZE), SIZE);
// return new RxRingBuffer(new MpmcArrayQueue<Object>(SIZE), SIZE);
// the performance of this is sufficient (actually faster in some cases)
return new RxRingBuffer(new SynchronizedQueue<Object>(SIZE), SIZE);
} else {
return new RxRingBuffer();
}
Expand Down Expand Up @@ -75,7 +86,7 @@ public static RxRingBuffer getSpmcInstance() {
* r.i.RxRingBufferPerf.ringBufferAddRemove1 thrpt 5 23951121.098 1982380.330 ops/s
* r.i.RxRingBufferPerf.ringBufferAddRemove1000 thrpt 5 1142.351 33.592 ops/s
*
* With SynchronizedQueue (synchronized LinkedList)
* With SynchronizedQueue (synchronized LinkedList ... no object pooling)
*
* r.i.RxRingBufferPerf.createUseAndDestroy1 thrpt 5 33231667.136 685757.510 ops/s
* r.i.RxRingBufferPerf.createUseAndDestroy1000 thrpt 5 74623.614 5493.766 ops/s
Expand Down Expand Up @@ -119,11 +130,11 @@ public static RxRingBuffer getSpmcInstance() {
* With SpmcArrayQueue
* - requires access to Unsafe
*
* Benchmark Mode Samples Score Score error Units
* r.i.RxRingBufferPerf.createUseAndDestroy1 thrpt 5 1835494.523 63874.461 ops/s
* r.i.RxRingBufferPerf.createUseAndDestroy1000 thrpt 5 45545.599 1882.146 ops/s
* r.i.RxRingBufferPerf.ringBufferAddRemove1 thrpt 5 38126258.816 474874.236 ops/s
* r.i.RxRingBufferPerf.ringBufferAddRemove1000 thrpt 5 42507.743 240.530 ops/s
* Benchmark Mode Samples Score Score error Units
* r.i.RxRingBufferPerf.spmcCreateUseAndDestroy1 thrpt 5 27630345.474 769219.142 ops/s
* r.i.RxRingBufferPerf.spmcCreateUseAndDestroy1000 thrpt 5 80052.046 4059.541 ops/s
* r.i.RxRingBufferPerf.spmcRingBufferAddRemove1 thrpt 5 44449524.222 563068.793 ops/s
* r.i.RxRingBufferPerf.spmcRingBufferAddRemove1000 thrpt 5 65231.253 1805.732 ops/s
*
* With SpmcArrayQueue and ObjectPool (object pool improves createUseAndDestroy1 by 10x)
*
Expand All @@ -135,17 +146,7 @@ public static RxRingBuffer getSpmcInstance() {
*
* --------------
*
* When UnsafeAccess.isUnsafeAvailable() == true we can use the Spmc/SpscArrayQueue implementations and get these numbers:
*
* Benchmark Mode Samples Score Score error Units
* r.i.RxRingBufferPerf.spmcCreateUseAndDestroy1 thrpt 5 17813072.116 672207.872 ops/s
* r.i.RxRingBufferPerf.spmcCreateUseAndDestroy1000 thrpt 5 46794.691 1146.195 ops/s
* r.i.RxRingBufferPerf.spmcRingBufferAddRemove1 thrpt 5 32117630.315 749011.552 ops/s
* r.i.RxRingBufferPerf.spmcRingBufferAddRemove1000 thrpt 5 47257.476 1081.623 ops/s
* r.i.RxRingBufferPerf.spscCreateUseAndDestroy1 thrpt 5 24729994.601 353101.940 ops/s
* r.i.RxRingBufferPerf.spscCreateUseAndDestroy1000 thrpt 5 73101.460 2406.377 ops/s
* r.i.RxRingBufferPerf.spscRingBufferAddRemove1 thrpt 5 83548821.062 752738.756 ops/s
* r.i.RxRingBufferPerf.spscRingBufferAddRemove1000 thrpt 5 70549.816 1377.227 ops/s
* When UnsafeAccess.isUnsafeAvailable() == true we can use the Spmc/SpscArrayQueue implementations.
*
* } </pre>
*/
Expand All @@ -169,30 +170,13 @@ public static RxRingBuffer getSpmcInstance() {

public static final int SIZE = 1024;

private static ObjectPool<Queue<Object>> SPSC_POOL = new ObjectPool<Queue<Object>>() {

@Override
protected SpscArrayQueue<Object> createObject() {
return new SpscArrayQueue<Object>(SIZE);
}

};

private static ObjectPool<Queue<Object>> SPMC_POOL = new ObjectPool<Queue<Object>>() {

@Override
protected SpmcArrayQueue<Object> createObject() {
return new SpmcArrayQueue<Object>(SIZE);
}

};

private RxRingBuffer(Queue<Object> queue, int size) {
this.queue = queue;
this.pool = null;
this.size = size;
}

@SuppressWarnings("unused")
private RxRingBuffer(ObjectPool<Queue<Object>> pool, int size) {
this.pool = pool;
this.queue = pool.borrowObject();
Expand All @@ -213,7 +197,7 @@ public void unsubscribe() {
release();
}

/* for unit tests */RxRingBuffer() {
/* package accessible for unit tests */RxRingBuffer() {
this(new SynchronizedQueue<Object>(SIZE), SIZE);
}

Expand Down Expand Up @@ -260,7 +244,7 @@ public int count() {
}
return queue.size();
}

public boolean isEmpty() {
if (queue == null) {
return true;
Expand Down Expand Up @@ -294,7 +278,7 @@ public Object poll() {
}
return o;
}

public Object peek() {
if (queue == null) {
// we are unsubscribed and have released the undelrying queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,27 @@
import java.util.AbstractQueue;
import java.util.Iterator;

abstract class ConcurrentCircularArrayQueueL0Pad<E> extends AbstractQueue<E>{
abstract class ConcurrentCircularArrayQueueL0Pad<E> extends AbstractQueue<E> implements MessagePassingQueue<E> {
long p00, p01, p02, p03, p04, p05, p06, p07;
long p30, p31, p32, p33, p34, p35, p36, p37;
}

/**
* A concurrent access enabling class used by circular array based queues this class exposes an offset computation
* method along with differently memory fenced load/store methods into the underlying array. The class is pre-padded and
* the array is padded on either side to help with False sharing prvention. It is expected theat subclasses handle post
* padding.
* <p>
* Offset calculation is separate from access to enable the reuse of a give compute offset.
* <p>
* Load/Store methods using a <i>buffer</i> parameter are provided to allow the prevention of final field reload after a
* LoadLoad barrier.
* <p>
*
* @author nitsanw
*
* @param <E>
*/
public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircularArrayQueueL0Pad<E> {
protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 0);
protected static final int BUFFER_PAD = 32;
Expand All @@ -51,87 +67,108 @@ public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircular

@SuppressWarnings("unchecked")
public ConcurrentCircularArrayQueue(int capacity) {
this.capacity = Pow2.findNextPositivePowerOfTwo(capacity);
this.capacity = Pow2.roundToPowerOfTwo(capacity);
mask = this.capacity - 1;
// pad data on either end with some empty slots.
buffer = (E[]) new Object[(this.capacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
}

public ConcurrentCircularArrayQueue(ConcurrentCircularArrayQueue<E> c) {
this.capacity = c.capacity;
this.mask = c.mask;
// pad data on either end with some empty slots.
this.buffer = c.buffer;
}

protected final long calcOffset(long index) {
/**
* @param index desirable element index
* @return the offset in bytes within the array for a given index.
*/
protected final long calcElementOffset(long index) {
return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT);
}

/**
* A plain store (no ordering/fences) of an element to a given offset
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e a kitty
*/
protected final void spElement(long offset, E e) {
UNSAFE.putObject(buffer, offset, e);
}

protected final void soElement(long offset, E e) {
UNSAFE.putOrderedObject(buffer, offset, e);
}

protected final void svElement(long offset, E e) {
UNSAFE.putObjectVolatile(buffer, offset, e);
}

@SuppressWarnings("unchecked")
protected final E lpElement(long offset) {
return (E) UNSAFE.getObject(buffer, offset);
}

@SuppressWarnings("unchecked")
protected final E lvElement(long offset) {
return (E) UNSAFE.getObjectVolatile(buffer, offset);
spElement(buffer, offset, e);
}

/**
* A plain store (no ordering/fences) of an element to a given offset
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e an orderly kitty
*/
protected final void spElement(E[] buffer, long offset, E e) {
UNSAFE.putObject(buffer, offset, e);
}

/**
* An ordered store(store + StoreStore barrier) of an element to a given offset
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e an orderly kitty
*/
protected final void soElement(long offset, E e) {
soElement(buffer, offset, e);
}

/**
* An ordered store(store + StoreStore barrier) of an element to a given offset
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e an orderly kitty
*/
protected final void soElement(E[] buffer, long offset, E e) {
UNSAFE.putOrderedObject(buffer, offset, e);
}

protected final void svElement(E[] buffer, long offset, E e) {
UNSAFE.putObjectVolatile(buffer, offset, e);
/**
* A plain load (no ordering/fences) of an element from a given offset.
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
protected final E lpElement(long offset) {
return lpElement(buffer, offset);
}

/**
* A plain load (no ordering/fences) of an element from a given offset.
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
@SuppressWarnings("unchecked")
protected final E lpElement(E[] buffer, long offset) {
return (E) UNSAFE.getObject(buffer, offset);
}

/**
* A volatile load (load + LoadLoad barrier) of an element from a given offset.
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
protected final E lvElement(long offset) {
return lvElement(buffer, offset);
}

/**
* A volatile load (load + LoadLoad barrier) of an element from a given offset.
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
@SuppressWarnings("unchecked")
protected final E lvElement(E[] buffer, long offset) {
return (E) UNSAFE.getObjectVolatile(buffer, offset);
}

@Override
public boolean offer(E e) {
throw new UnsupportedOperationException();
}

@Override
public E poll() {
throw new UnsupportedOperationException();
}
@Override
public E peek() {
throw new UnsupportedOperationException();
}
@Override
public Iterator<E> iterator() {
throw new UnsupportedOperationException();
}

@Override
public int size() {
throw new UnsupportedOperationException();
}
}
Loading