Skip to content

Commit

Permalink
simplify segment properties, no more volatile properties
Browse files Browse the repository at this point in the history
  • Loading branch information
pull-vert committed Feb 4, 2025
1 parent 476ce6c commit 0d6417b
Show file tree
Hide file tree
Showing 24 changed files with 760 additions and 845 deletions.
224 changes: 112 additions & 112 deletions benchmarks/README.md

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions benchmarks/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ jmh {
jmhVersion = catalogVersion("jmh")

// includes.set(listOf("""jayo\.benchmarks\.BufferLatin1Benchmark.*"""))
includes.set(listOf("""jayo\.benchmarks\.BufferUtf8Benchmark.*"""))
// includes.set(listOf("""jayo\.benchmarks\.BufferUtf8Benchmark.*"""))
// includes.set(listOf("""jayo\.benchmarks\.JsonSerializationBenchmark.*"""))
// includes.set(listOf("""jayo\.benchmarks\.SlowReaderBenchmark.*"""))
// includes.set(listOf("""jayo\.benchmarks\.SlowWriterBenchmark.*"""))
// includes.set(listOf("""jayo\.benchmarks\.SocketReaderBenchmark.*"""))
// includes.set(listOf("""jayo\.benchmarks\.TcpAndJsonSerializationBenchmark.*"""))
includes.set(listOf("""jayo\.benchmarks\.TcpAndJsonSerializationBenchmark.*"""))
}

tasks {
Expand Down
22 changes: 8 additions & 14 deletions core/src/main/java/jayo/internal/DeflaterRawWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,14 @@ public void write(final @NonNull Buffer reader, final long byteCount) {
}

var remaining = byteCount;
var head = _reader.segmentQueue.head();
var head = _reader.segmentQueue.head;
assert head != null;
while (remaining > 0L) {
var headLimit = head.limitVolatile();
var headLimit = head.limit;
if (head.pos == headLimit) {
if (!head.tryRemove()) {
throw new IllegalStateException("Non tail segment must be removable");
}
final var oldHead = head;
head = _reader.segmentQueue.removeHead(head);
head = _reader.segmentQueue.removeHead(head, true);
assert head != null;
headLimit = head.limitVolatile();
SegmentPool.recycle(oldHead);
headLimit = head.limit;
}

// Share bytes from the head segment of 'reader' with the deflater.
Expand All @@ -81,9 +76,8 @@ public void write(final @NonNull Buffer reader, final long byteCount) {
_reader.segmentQueue.decrementSize(toDeflate);
remaining -= toDeflate;
}
if (head.pos == head.limitVolatile() && head.tryRemove() && head.validateRemove()) {
_reader.segmentQueue.removeHead(head);
SegmentPool.recycle(head);
if (head.pos == head.limit) {
_reader.segmentQueue.removeHead(head, false);
}
}

Expand Down Expand Up @@ -152,14 +146,14 @@ private void deflate(final boolean syncFlush) {
continueLoop = segmentQueue.withWritableTail(1, tail -> {
final int deflated;
try {
deflated = deflater.deflate(tail.data, tail.limit(), Segment.SIZE - tail.limit(),
deflated = deflater.deflate(tail.data, tail.limit, Segment.SIZE - tail.limit,
syncFlush ? Deflater.SYNC_FLUSH : Deflater.NO_FLUSH);
} catch (NullPointerException npe) {
throw new JayoException("Deflater already closed", new IOException(npe));
}

if (deflated > 0) {
tail.incrementLimitVolatile(deflated);
tail.limit += deflated;
this.segmentQueue.emitCompleteSegments();
return true;
} else {
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/java/jayo/internal/GzipRawReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,23 +210,23 @@ private void updateCrc(final @NonNull SegmentQueue segmentQueue,
var _offset = offset;
var _byteCount = byteCount;
// Skip segments that we aren't checksumming.
var segment = segmentQueue.head();
var segment = segmentQueue.head;
assert segment != null;
while (_offset >= segment.limit() - segment.pos) {
_offset -= (segment.limit() - segment.pos);
segment = segment.nextVolatile();
while (_offset >= segment.limit - segment.pos) {
_offset -= (segment.limit - segment.pos);
segment = segment.next;
assert segment != null;
}

// Checksum one segment at a time.
while (_byteCount > 0) {
assert segment != null;
final var pos = (int) (segment.pos + _offset);
final var toUpdate = (int) Math.min(segment.limit() - pos, _byteCount);
final var toUpdate = (int) Math.min(segment.limit - pos, _byteCount);
crc.update(segment.data, pos, toUpdate);
_byteCount -= toUpdate;
_offset = 0;
segment = segment.nextVolatile();
segment = segment.next;
}
}

Expand Down
22 changes: 6 additions & 16 deletions core/src/main/java/jayo/internal/HashingUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,17 @@ private HashingUtils() {
}
try (rawReader; final var segmentQueue = new ReaderSegmentQueue(rawReader)) {
var remaining = segmentQueue.expectSize(Long.MAX_VALUE);
var head = segmentQueue.head();
var head = segmentQueue.head;
while (remaining > 0L) {
assert head != null;
final var currentLimit = head.limitVolatile();
final var currentLimit = head.limit;
final var toRead = (int) Math.min(remaining, currentLimit - head.pos);
messageDigest.update(head.data, head.pos, toRead);
head.pos += toRead;
segmentQueue.decrementSize(toRead);
remaining -= toRead;
if (head.pos == currentLimit) {
if (!head.tryRemove()) {
throw new IllegalStateException("Segment must be removable");
}
final var oldHead = head;
head = segmentQueue.removeHead(head);
SegmentPool.recycle(oldHead);
head = segmentQueue.removeHead(head, true);
}
}
}
Expand Down Expand Up @@ -80,22 +75,17 @@ private HashingUtils() {
}
try (rawReader; final var segmentQueue = new ReaderSegmentQueue(rawReader)) {
var remaining = segmentQueue.expectSize(Long.MAX_VALUE);
var head = segmentQueue.head();
var head = segmentQueue.head;
while (remaining > 0L) {
assert head != null;
final var currentLimit = head.limitVolatile();
final var currentLimit = head.limit;
final var toRead = (int) Math.min(remaining, currentLimit - head.pos);
javaMac.update(head.data, head.pos, toRead);
head.pos += toRead;
segmentQueue.decrementSize(toRead);
remaining -= toRead;
if (head.pos == currentLimit) {
if (!head.tryRemove()) {
throw new IllegalStateException("Segment must be removable");
}
final var oldHead = head;
head = segmentQueue.removeHead(head);
SegmentPool.recycle(oldHead);
head = segmentQueue.removeHead(head, true);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/jayo/internal/InputStreamRawReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,15 @@ public long readAtMostTo(final @NonNull Buffer writer, final long byteCount) {
}

final var bytesRead = _writer.segmentQueue.withWritableTail(1, tail -> {
final var toRead = (int) Math.min(byteCount, Segment.SIZE - tail.limit());
final var toRead = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
final int read;
try {
read = in.read(tail.data, tail.limit(), toRead);
read = in.read(tail.data, tail.limit, toRead);
} catch (IOException e) {
throw JayoException.buildJayoException(e);
}
if (read > 0) {
tail.incrementLimitVolatile(read);
tail.limit += read;
}
return read;
});
Expand Down
18 changes: 6 additions & 12 deletions core/src/main/java/jayo/internal/OutputStreamRawWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,14 @@ public void write(final @NonNull Buffer reader, final long byteCount) {
}

var remaining = byteCount;
var head = _reader.segmentQueue.head();
var head = _reader.segmentQueue.head;
assert head != null;
var headLimit = head.limit;
while (remaining > 0L) {
var headLimit = head.limitVolatile();
if (head.pos == headLimit) {
final var oldHead = head;
if (!head.tryRemove()) {
throw new IllegalStateException("Non tail segment must be removable");
}
head = _reader.segmentQueue.removeHead(head);
head = _reader.segmentQueue.removeHead(head, true);
assert head != null;
headLimit = head.limitVolatile();
SegmentPool.recycle(oldHead);
headLimit = head.limit;
}

CancelToken.throwIfReached(cancelToken);
Expand All @@ -94,9 +89,8 @@ public void write(final @NonNull Buffer reader, final long byteCount) {
_reader.segmentQueue.decrementSize(toWrite);
remaining -= toWrite;
}
if (head.pos == head.limitVolatile() && head.tryRemove() && head.validateRemove()) {
_reader.segmentQueue.removeHead(head);
SegmentPool.recycle(head);
if (head.pos == head.limit) {
_reader.segmentQueue.removeHead(head, false);
}

if (LOGGER.isLoggable(TRACE)) {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/jayo/internal/PeekRawReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public PeekRawReader(final @NonNull Reader upstream) {
this.upstream = Objects.requireNonNull(upstream);
buffer = (RealBuffer) getBufferFromReader(upstream);
buffer.segmentQueue.expectSize(1L);
final var bufferHead = buffer.segmentQueue.head();
final var bufferHead = buffer.segmentQueue.head;
if (bufferHead != null) {
this.expectedSegment = bufferHead;
this.expectedPos = bufferHead.pos;
Expand All @@ -72,7 +72,7 @@ public long readAtMostTo(final @NonNull Buffer writer, final long byteCount) {
throw new IllegalStateException("this peek reader is closed");
}

final var bufferHead = buffer.segmentQueue.head();
final var bufferHead = buffer.segmentQueue.head;
// Reader becomes invalid if there is an expected Segment and it and the expected position does not match the
// current head and head position of the upstream buffer
if (expectedSegment != null &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ public long readAtMostTo(final @NonNull Buffer writer, final long byteCount) {
}

final var bytesRead = _writer.segmentQueue.withWritableTail(1, tail -> {
final var toRead = (int) Math.min(byteCount, Segment.SIZE - tail.limit());
final var toRead = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
final int read;
try {
read = in.read(tail.asWriteByteBuffer(toRead));
} catch (IOException e) {
throw JayoException.buildJayoException(e);
}
if (read > 0) {
tail.incrementLimitVolatile(read);
tail.limit += read;
}
return read;
});
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/jayo/internal/RealAsyncTimeout.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,16 +253,16 @@ private void writeCancellable(RealBuffer reader, long byteCount, RealCancelToken
while (remaining > 0L) {
// Count how many bytes to write. This loop guarantees we split on a segment boundary.
var _toWrite = 0L;
var segment = reader.segmentQueue.head();
var segment = reader.segmentQueue.head;
while (_toWrite < TIMEOUT_WRITE_SIZE) {
assert segment != null;
final var segmentSize = segment.limitVolatile() - segment.pos;
final var segmentSize = segment.limit - segment.pos;
_toWrite += segmentSize;
if (_toWrite >= remaining) {
_toWrite = remaining;
break;
}
segment = segment.nextVolatile();
segment = segment.next;
}

final var toWrite = _toWrite;
Expand Down
Loading

0 comments on commit 0d6417b

Please sign in to comment.