diff --git a/rxjava-core/src/main/java/rx/internal/util/MpscPaddedQueue.java b/rxjava-core/src/main/java/rx/internal/util/MpscPaddedQueue.java new file mode 100644 index 0000000000..a711bf9e76 --- /dev/null +++ b/rxjava-core/src/main/java/rx/internal/util/MpscPaddedQueue.java @@ -0,0 +1,129 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.internal.util; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +/** + * A multiple-producer single consumer queue implementation with padded reference + * to tail to avoid cache-line thrashing. + * Based on Netty's MpscQueue implementation but using AtomicReferenceFieldUpdater + * instead of Unsafe. + * @param the element type + */ +public final class MpscPaddedQueue extends AtomicReference> { + @SuppressWarnings(value = "rawtypes") + static final AtomicReferenceFieldUpdater TAIL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(PaddedNode.class, Node.class, "tail"); + /** */ + private static final long serialVersionUID = 1L; + /** The padded tail reference. */ + final PaddedNode tail; + /** + * Initializes the empty queue. + */ + public MpscPaddedQueue() { + Node first = new Node(null); + tail = new PaddedNode(); + tail.tail = first; + set(first); + } + /** + * Offer a new value. + * @param v the value to offer + */ + public void offer(E v) { + Node n = new Node(v); + getAndSet(n).set(n); + } + + /** + * @return Poll a value from the head of the queue or return null if the queue is empty. + */ + public E poll() { + Node n = peekNode(); + if (n == null) { + return null; + } + E v = n.value; + n.value = null; // do not retain this value as the node still stays in the queue + TAIL_UPDATER.lazySet(tail, n); + return v; + } + /** + * Check if there is a node available without changing anything. + */ + private Node peekNode() { + for (;;) { + @SuppressWarnings(value = "unchecked") + Node t = TAIL_UPDATER.get(tail); + Node n = t.get(); + if (n != null || get() == t) { + return n; + } + } + } + /** + * Clears the queue. + */ + public void clear() { + for (;;) { + if (poll() == null) { + break; + } + } + } + /** Class that contains a Node reference padded around to fit a typical cache line. */ + static final class PaddedNode { + /** Padding, public to prevent optimizing it away. */ + public int p1; + volatile Node tail; + /** Padding, public to prevent optimizing it away. */ + public long p2; + /** Padding, public to prevent optimizing it away. */ + public long p3; + /** Padding, public to prevent optimizing it away. */ + public long p4; + /** Padding, public to prevent optimizing it away. */ + public long p5; + /** Padding, public to prevent optimizing it away. */ + public long p6; + } + /** + * Regular node with value and reference to the next node. + */ + static final class Node { + + E value; + @SuppressWarnings(value = "rawtypes") + static final AtomicReferenceFieldUpdater TAIL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "tail"); + volatile Node tail; + + public Node(E value) { + this.value = value; + } + + public void set(Node newTail) { + TAIL_UPDATER.lazySet(this, newTail); + } + + @SuppressWarnings(value = "unchecked") + public Node get() { + return TAIL_UPDATER.get(this); + } + } + +} diff --git a/rxjava-core/src/main/java/rx/internal/util/PaddedAtomicInteger.java b/rxjava-core/src/main/java/rx/internal/util/PaddedAtomicInteger.java new file mode 100644 index 0000000000..c15869ce9f --- /dev/null +++ b/rxjava-core/src/main/java/rx/internal/util/PaddedAtomicInteger.java @@ -0,0 +1,56 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.internal.util; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * An AtomicInteger with extra fields to pad it out to fit a typical cache line. + */ +public final class PaddedAtomicInteger extends AtomicInteger { + private static final long serialVersionUID = 1L; + /** Padding, public to prevent optimizing it away. */ + public int p1; + /** Padding, public to prevent optimizing it away. */ + public int p2; + /** Padding, public to prevent optimizing it away. */ + public int p3; + /** Padding, public to prevent optimizing it away. */ + public int p4; + /** Padding, public to prevent optimizing it away. */ + public int p5; + /** Padding, public to prevent optimizing it away. */ + public int p6; + /** Padding, public to prevent optimizing it away. */ + public int p7; + /** Padding, public to prevent optimizing it away. */ + public int p8; + /** Padding, public to prevent optimizing it away. */ + public int p9; + /** Padding, public to prevent optimizing it away. */ + public int p10; + /** Padding, public to prevent optimizing it away. */ + public int p11; + /** Padding, public to prevent optimizing it away. */ + public int p12; + /** Padding, public to prevent optimizing it away. */ + public int p13; + /** @return prevents optimizing away the fields, most likely. */ + public int noopt() { + return p1 + p2 + p3 + p4 + p5 + p6 + p7 + p8 + p9 + p10 + p11 + p12 + p13; + } + +} diff --git a/rxjava-core/src/main/java/rx/internal/util/README.md b/rxjava-core/src/main/java/rx/internal/util/README.md new file mode 100644 index 0000000000..c86e57b4ea --- /dev/null +++ b/rxjava-core/src/main/java/rx/internal/util/README.md @@ -0,0 +1,3 @@ +This `rx.internal.*` package is for internal use only. Any code here can change at any time and is not considered part of the public API, even if the classes are `public` so as to be used from other packages within `rx.*`. + +If you depend on these classes, your code may break in any future RxJava release, even if it's just a patch release (major.minor.patch).