Skip to content

Commit

Permalink
fix: async reader race conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
pull-vert committed Oct 11, 2024
1 parent 581a549 commit 6059dbb
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 102 deletions.
4 changes: 2 additions & 2 deletions benchmarks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ JsonSerializationBenchmark.kotlinxToStream thrpt 5 914737.943 ± 86
## SlowReaderBenchmark

Benchmark (type) Mode Cnt Score Error Units
SlowReaderBenchmark.readerJayo jayo thrpt 5 1.386 ± 0.071 ops/s
SlowReaderBenchmark.sourceOkio okio thrpt 5 0.732 ± 0.035 ops/s
SlowReaderBenchmark.readerJayo jayo thrpt 5 1.401 ± 0.092 ops/s
SlowReaderBenchmark.sourceOkio okio thrpt 5 0.717 ± 0.080 ops/s

## SlowWriterBenchmark

Expand Down
4 changes: 2 additions & 2 deletions benchmarks/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ jmh {
// includes.set(listOf("""jayo\.benchmarks\.BufferLatin1Benchmark.*"""))
// includes.set(listOf("""jayo\.benchmarks\.BufferUtf8Benchmark.*"""))
// includes.set(listOf("""jayo\.benchmarks\.JsonSerializationBenchmark.*"""))
// includes.set(listOf("""jayo\.benchmarks\.SlowReaderBenchmark.*"""))
includes.set(listOf("""jayo\.benchmarks\.SlowReaderBenchmark.*"""))
// includes.set(listOf("""jayo\.benchmarks\.SlowWriterBenchmark.*"""))
// includes.set(listOf("""jayo\.benchmarks\.SocketReaderBenchmark.*"""))
includes.set(listOf("""jayo\.benchmarks\.TcpAndJsonSerializationBenchmark.*"""))
// includes.set(listOf("""jayo\.benchmarks\.TcpAndJsonSerializationBenchmark.*"""))
}

dependencies {
Expand Down
46 changes: 24 additions & 22 deletions core/src/main/java/jayo/internal/ReaderSegmentQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ final static class Async extends ReaderSegmentQueue {

private volatile @Nullable RuntimeException exception = null;

// non-volatile because always used inside the lock
private @NonNegative long expectedSize = 0;
private final ReentrantLock lock = new ReentrantLock();
private final Condition expectingSize = lock.newCondition();
Expand Down Expand Up @@ -157,7 +158,17 @@ long expectSize(final long expectedSize) {
// resume reader consumer thread if needed, then await on expected size
readerConsumerPaused.signal();
expectingSize.await();
currentSize = size();
// consumer thread may have not read all data, continue here in current thread
final var stillExpectingSize = this.expectedSize;
if (stillExpectingSize > 0) {
this.expectedSize = 0L;
currentSize = super.expectSize(stillExpectingSize);
// resume reader consumer thread to continue reading asynchronously
readerConsumerPaused.signal();
} else {
currentSize = size();
}

if (LOGGER.isLoggable(DEBUG)) {
LOGGER.log(DEBUG, "AsyncReaderSegmentQueue#{0}: expectSize({1}) resumed expecting more " +
"bytes, current size = {2}{3}",
Expand Down Expand Up @@ -212,20 +223,11 @@ public void run() {
var currentExpectedSize = 0L;
while (!Thread.interrupted()) {
try {
var readSuccess = true;
final boolean readSuccess;
if (currentExpectedSize != 0L) {
final var currentSize = size();
if (currentSize < currentExpectedSize) {
var remaining = currentExpectedSize - currentSize;
while (remaining > 0L) {
final var toRead = Math.max(remaining, Segment.SIZE);
final var read = reader.readAtMostTo(buffer, toRead);
if (read == -1) {
break;
}
remaining -= read;
}
}
// if size is already reached -> success, else must continue to call the reader in the
// calling thread to ensure no race occurs
readSuccess = size() >= currentExpectedSize;
} else {
readSuccess = reader.readAtMostTo(buffer, Segment.SIZE) > 0L;
}
Expand All @@ -238,9 +240,10 @@ public void run() {
if (!readSuccess) {
LOGGER.log(DEBUG,
"AsyncReaderSegmentQueue#{0}:ReaderConsumer Runnable task:" +
" reader is exhausted, expected size = {1}, pausing" +
" consumer thread{2}",
hashCode(), currentExpectedSize, System.lineSeparator());
" last read did not return any result, expected size = " +
"{1}, current size = {2} pausing consumer thread{3}",
hashCode(), currentExpectedSize, currentSize,
System.lineSeparator());
} else {
LOGGER.log(DEBUG,
"AsyncReaderSegmentQueue#{0}:ReaderConsumer Runnable task:" +
Expand All @@ -249,18 +252,18 @@ public void run() {
hashCode(), currentSize, MAX_BYTE_SIZE, System.lineSeparator());
}
}
// if read did not return any result or buffer reached max capacity,
// resume expecting size, then pause consumer thread
currentExpectedSize = 0L;
// if read did not return any result or buffer reached max capacity, resume
// expecting size, then pause consumer thread
resumeExpectingSize();
readerConsumerPaused.await();
currentExpectedSize = 0L;
}

if (currentExpectedSize == 0L) {
currentExpectedSize = expectedSize;
expectedSize = 0L;
} else {
currentExpectedSize = 0L;
expectedSize = 0L;
resumeExpectingSize();
}
} finally {
Expand Down Expand Up @@ -294,7 +297,6 @@ public void run() {

private void resumeExpectingSize() {
assert lock.isHeldByCurrentThread();
expectedSize = 0L;
if (LOGGER.isLoggable(DEBUG)) {
LOGGER.log(DEBUG, "AsyncReaderSegmentQueue#{0}: resumeExpectingSize()", hashCode());
}
Expand Down
94 changes: 53 additions & 41 deletions core/src/main/java/jayo/internal/RealBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import java.util.function.BiFunction;

import static java.lang.System.Logger.Level.TRACE;
import static java.lang.System.Logger.Level.WARNING;
import static jayo.external.JayoUtils.checkOffsetAndCount;
import static jayo.internal.Segment.TRANSFERRING;
import static jayo.internal.Segment.WRITING;
Expand Down Expand Up @@ -464,13 +463,11 @@ public long readDecimalLong() {

var overflowDigit = OVERFLOW_DIGIT_START;
while (!done) {
if (seen == currentSize) {
segmentQueue.expectSize(20L);
}
final var head = segmentQueue.head();
if (head == null) {
if (seen >= currentSize && segmentQueue.expectSize(20L) == 0L) {
break;
}
final var head = segmentQueue.head();
assert head != null;
final var data = head.data;
var pos = head.pos;
final var currentLimit = head.limitVolatile();
Expand Down Expand Up @@ -540,13 +537,11 @@ public long readHexadecimalUnsignedLong() {
var done = false;

while (!done) {
if (seen == currentSize) {
segmentQueue.expectSize(17L);
}
final var head = segmentQueue.head();
if (head == null) {
if (seen >= currentSize && segmentQueue.expectSize(17L) == 0L) {
break;
}
final var head = segmentQueue.head();
assert head != null;
final var data = head.data;
var pos = head.pos;
final var currentLimit = head.limitVolatile();
Expand Down Expand Up @@ -1683,47 +1678,41 @@ public void write(final @NonNull Buffer reader, final @NonNegative long byteCoun
var tail = segmentQueue.nonRemovedTailOrNull();
var readerHead = _reader.segmentQueue.head();
Segment nextReaderHead = null;
var readerHeadIsWriting = false;
try {
while (remaining > 0) {
if (nextReaderHead != null) {
readerHead = nextReaderHead;
}
if (readerHead == null) {
LOGGER.log(WARNING, "readerHead == null, should not !\n" + _reader.segmentQueue);
throw new IllegalStateException("readerHead == null, should not !");
}
assert readerHead != null;

// Is a prefix of the reader's head segment all that we need to move?
assert readerHead != null;
var currentLimit = readerHead.limitVolatile();
var bytesInReader = currentLimit - readerHead.pos;
var split = readerHeadIsWriting;
var split = false;
if (remaining < bytesInReader) {
if (tail != null && tail.owner &&
remaining + tail.limit() - ((tail.isShared()) ? 0 : tail.pos) <= Segment.SIZE
) {
try {
// Our existing segments are sufficient. Transfer bytes from reader's head to our tail.
readerHead.writeTo(tail, (int) remaining);
if (LOGGER.isLoggable(TRACE)) {
LOGGER.log(TRACE, "Buffer(SegmentQueue#{0}) : transferred {1} bytes from reader " +
"Segment#{2} to target Segment#{3}{4}",
segmentQueue.hashCode(), remaining, readerHead.hashCode(), tail.hashCode(),
System.lineSeparator());
}
_reader.segmentQueue.decrementSize(remaining);
segmentQueue.incrementSize(remaining);
return;
} finally {
readerHead.finishTransfer(readerHeadIsWriting);
}
if (tryTransferBytes(tail, readerHead, _reader, remaining)) {
return;
}
split = true;
}

if (!split) {
split = readerHead.startTransfer();
if (!split) {
// check after switching to Transferring state
final var newLimit = readerHead.limitVolatile();
if (newLimit != currentLimit) {
bytesInReader = newLimit - readerHead.pos;
if (remaining < bytesInReader) {
if (tryTransferBytes(tail, readerHead, _reader, remaining)) {
return;
}
split = true;
} else {
currentLimit = newLimit;
}
}
}
if (LOGGER.isLoggable(TRACE)) {
LOGGER.log(TRACE,
"reader SegmentQueue#{0} : head Segment#{1} is writing = {2}{3}",
Expand All @@ -1733,12 +1722,12 @@ public void write(final @NonNull Buffer reader, final @NonNegative long byteCoun
}

if (split) {
// We're going to need another segment. Split the reader's head segment in two, then we will
// move the first of those two to this buffer.
// We're going to need another segment. Split the reader's head segment in two, then we will move
// the first of those two to this buffer.
nextReaderHead = readerHead;

bytesInReader = (int) Math.min(bytesInReader, remaining);
readerHead = readerHead.splitHead(bytesInReader, readerHeadIsWriting);
readerHead = readerHead.splitHead(bytesInReader);
if (LOGGER.isLoggable(TRACE)) {
LOGGER.log(TRACE,
"reader SegmentQueue#{0} : splitHead. prefix Segment#{1}, suffix Segment#{2}{3}",
Expand Down Expand Up @@ -1779,15 +1768,16 @@ public void write(final @NonNull Buffer reader, final @NonNegative long byteCoun
segmentQueue.hashCode(), newTail.hashCode(), movedByteCount, _reader.segmentQueue.hashCode(), System.lineSeparator());
}

segmentQueue.incrementSize(movedByteCount);
if (tail != null) {
tail.finishWrite();
}
tail = newTail;
} else {
segmentQueue.incrementSize(movedByteCount);
readerHead = null;
}

segmentQueue.incrementSize(movedByteCount);
if (LOGGER.isLoggable(TRACE)) {
LOGGER.log(TRACE,
"Buffer(SegmentQueue#{0}) : incremented {1} bytes of this segment queue{2}{3}",
Expand All @@ -1811,6 +1801,28 @@ public void write(final @NonNull Buffer reader, final @NonNegative long byteCoun
}
}

private boolean tryTransferBytes(Segment tail, Segment readerHead, RealBuffer reader, long byteCount) {
if (tail != null && tail.owner &&
byteCount + tail.limit() - ((tail.isShared()) ? 0 : tail.pos) <= Segment.SIZE) {
try {
// Our existing segments are sufficient. Transfer bytes from reader's head to our tail.
readerHead.writeTo(tail, (int) byteCount);
if (LOGGER.isLoggable(TRACE)) {
LOGGER.log(TRACE, "Buffer(SegmentQueue#{0}) : transferred {1} bytes from reader " +
"Segment#{2} to target Segment#{3}{4}",
segmentQueue.hashCode(), byteCount, readerHead.hashCode(), tail.hashCode(),
System.lineSeparator());
}
reader.segmentQueue.decrementSize(byteCount);
segmentQueue.incrementSize(byteCount);
return true;
} finally {
readerHead.finishTransfer();
}
}
return false;
}

/**
* Call this when the tail and its predecessor may both be less than half full. In this case, we will copy data so
* that a segment can be recycled.
Expand Down Expand Up @@ -2304,7 +2316,7 @@ private <T> T seek(final long startIndex, BiFunction<Segment, Long, T> lambda) {
// node = segmentQueue.tail();
// while (true) {
// assert node != null;
// offset -= (segment.limit - segment.pos);
// offset -= (segment.limitVolatile() - segment.pos);
// if (offset <= startIndex || node.prev() == null) {
// break;
// }
Expand Down
17 changes: 10 additions & 7 deletions core/src/main/java/jayo/internal/Segment.java
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,16 @@ public boolean startTransfer() {
return switch (previousState) {
case AVAILABLE -> false;
case WRITING -> true;
default -> throw new IllegalStateException("Unexpected state " + previousState + ". The head queue " +
"node should be in 'AVAILABLE' or 'WRITING' state before transferring.");
default -> throw new IllegalStateException("Unexpected state " + previousState + ". The head queue node " +
"should be in 'AVAILABLE' or 'WRITING' state before transferring.");
};
}

void finishTransfer(final boolean wasWriting) {
if (!wasWriting) {
STATUS.compareAndSet(this, TRANSFERRING, AVAILABLE);
void finishTransfer() {
final var previousState = (byte) STATUS.compareAndExchange(this, TRANSFERRING, AVAILABLE);
if (previousState != TRANSFERRING && previousState != AVAILABLE && previousState != WRITING) {
throw new IllegalStateException("Unexpected state " + previousState + ". The head queue node should be in" +
" 'AVAILABLE', 'TRANSFERRING' or 'WRITING' state before ending the transfer.");
}
}

Expand Down Expand Up @@ -299,7 +301,7 @@ Segment unsharedCopy() {
* @return the new head of the queue.
*/
@NonNull
Segment splitHead(final @NonNegative int byteCount, final boolean wasWriting) {
Segment splitHead(final @NonNegative int byteCount) {
final Segment prefix;

// We have two competing performance goals:
Expand All @@ -321,8 +323,9 @@ Segment splitHead(final @NonNegative int byteCount, final boolean wasWriting) {
if (!NEXT.compareAndSet(prefix, null, this)) {
throw new IllegalStateException("Could set next segment of prefix");
}

// stop transferring the current segment = the suffix
finishTransfer(wasWriting);
finishTransfer();

return prefix;
}
Expand Down
15 changes: 6 additions & 9 deletions core/src/main/java/jayo/internal/SegmentQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ final <T> T withWritableTail(final int minimumCapacity,
written = currentTail.limit() - previousLimit;
return result;
} finally {
currentTail.finishWrite();
incrementSize(written);
currentTail.finishWrite();
}
}

Expand All @@ -270,21 +270,18 @@ private <T> T withNewWritableSegment(final @NonNull Function<@NonNull Segment, T
if (written > 0) {
try {
if (currentTail != null) {
try {
if (!Segment.NEXT.compareAndSet(currentTail, null, newTail)) {
throw new IllegalStateException("Could not add new Segment after current tail, " +
"next node should be null");
}
} finally {
currentTail.finishWrite();
if (!Segment.NEXT.compareAndSet(currentTail, null, newTail)) {
throw new IllegalStateException("Could not add new Segment after current tail, " +
"next node should be null");
}
currentTail.finishWrite();
} else {
HEAD.setVolatile(this, newTail);
}
TAIL.setVolatile(this, newTail);
} finally {
newTail.finishWrite();
incrementSize(written);
newTail.finishWrite();
}
} else {
if (currentTail != null) {
Expand Down
Loading

0 comments on commit 6059dbb

Please sign in to comment.