Skip to content

Commit

Permalink
finish lock usage
Browse files Browse the repository at this point in the history
  • Loading branch information
pull-vert committed Feb 6, 2025
1 parent a1f435e commit 6fa00c1
Show file tree
Hide file tree
Showing 17 changed files with 126 additions and 106 deletions.
4 changes: 2 additions & 2 deletions benchmarks/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ jmh {
jmhVersion = catalogVersion("jmh")

// includes.set(listOf("""jayo\.benchmarks\.BufferLatin1Benchmark.*"""))
// includes.set(listOf("""jayo\.benchmarks\.BufferUtf8Benchmark.*"""))
includes.set(listOf("""jayo\.benchmarks\.BufferUtf8Benchmark.*"""))
// includes.set(listOf("""jayo\.benchmarks\.JsonSerializationBenchmark.*"""))
// includes.set(listOf("""jayo\.benchmarks\.SlowReaderBenchmark.*"""))
// includes.set(listOf("""jayo\.benchmarks\.SlowWriterBenchmark.*"""))
// includes.set(listOf("""jayo\.benchmarks\.SocketReaderBenchmark.*"""))
includes.set(listOf("""jayo\.benchmarks\.TcpAndJsonSerializationBenchmark.*"""))
// includes.set(listOf("""jayo\.benchmarks\.TcpAndJsonSerializationBenchmark.*"""))
}

tasks {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/jayo/internal/DeflaterRawWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void write(final @NonNull Buffer reader, final long byteCount) {
}

var remaining = byteCount;
var head = _reader.segmentQueue.head;
var head = _reader.segmentQueue.head();
assert head != null;
while (remaining > 0L) {
var headLimit = head.limit;
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/jayo/internal/GzipRawReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ private void updateCrc(final @NonNull SegmentQueue segmentQueue,
var _offset = offset;
var _byteCount = byteCount;
// Skip segments that we aren't checksumming.
var segment = segmentQueue.head;
var segment = segmentQueue.head();
assert segment != null;
while (_offset >= segment.limit - segment.pos) {
_offset -= (segment.limit - segment.pos);
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/jayo/internal/HashingUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private HashingUtils() {
}
try (rawReader; final var segmentQueue = new ReaderSegmentQueue(rawReader)) {
var remaining = segmentQueue.expectSize(Long.MAX_VALUE);
var head = segmentQueue.head;
var head = segmentQueue.head();
while (remaining > 0L) {
assert head != null;
final var currentLimit = head.limit;
Expand Down Expand Up @@ -75,7 +75,7 @@ private HashingUtils() {
}
try (rawReader; final var segmentQueue = new ReaderSegmentQueue(rawReader)) {
var remaining = segmentQueue.expectSize(Long.MAX_VALUE);
var head = segmentQueue.head;
var head = segmentQueue.head();
while (remaining > 0L) {
assert head != null;
final var currentLimit = head.limit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void write(final @NonNull Buffer reader, final long byteCount) {
}

var remaining = byteCount;
var head = _reader.segmentQueue.head;
var head = _reader.segmentQueue.head();
assert head != null;
var headLimit = head.limit;
while (remaining > 0L) {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/jayo/internal/PeekRawReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public PeekRawReader(final @NonNull Reader upstream) {
this.upstream = Objects.requireNonNull(upstream);
buffer = (RealBuffer) getBufferFromReader(upstream);
buffer.segmentQueue.expectSize(1L);
final var bufferHead = buffer.segmentQueue.head;
final var bufferHead = buffer.segmentQueue.head();
if (bufferHead != null) {
this.expectedSegment = bufferHead;
this.expectedPos = bufferHead.pos;
Expand All @@ -72,7 +72,7 @@ public long readAtMostTo(final @NonNull Buffer writer, final long byteCount) {
throw new IllegalStateException("this peek reader is closed");
}

final var bufferHead = buffer.segmentQueue.head;
final var bufferHead = buffer.segmentQueue.head();
// Reader becomes invalid if there is an expected Segment and it and the expected position does not match the
// current head and head position of the upstream buffer
if (expectedSegment != null &&
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/jayo/internal/ReaderSegmentQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ final static class Async extends ReaderSegmentQueue {

// non-volatile because always used inside the lock
private long expectedSize = 0;
private final Lock lock = new ReentrantLock();
private final Condition expectingSize = lock.newCondition();
private final @NonNull Lock lock = new ReentrantLock();
private final @NonNull Condition expectingSize = lock.newCondition();

private volatile @Nullable RuntimeException exception = null;
private boolean readerConsumerRunning = false;
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/jayo/internal/RealAsyncTimeout.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ private void writeCancellable(RealBuffer reader, long byteCount, RealCancelToken
while (remaining > 0L) {
// Count how many bytes to write. This loop guarantees we split on a segment boundary.
var _toWrite = 0L;
var segment = reader.segmentQueue.head;
var segment = reader.segmentQueue.head();
while (_toWrite < TIMEOUT_WRITE_SIZE) {
assert segment != null;
final var segmentSize = segment.limit - segment.pos;
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/java/jayo/internal/RealBasicLock.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ public final class RealBasicLock implements BasicLock {

@Override
public void lock() {
final var lockingThread = (Thread) LOCKING_THREAD.getAndSetRelease(this, Thread.currentThread());
if (lockingThread != null) {
final var currentThread = Thread.currentThread();
final var lockingThread = (Thread) LOCKING_THREAD.getAndSetRelease(this, currentThread);
if (lockingThread != null && lockingThread != currentThread) {
LockSupport.park();
}
}
Expand Down
49 changes: 25 additions & 24 deletions core/src/main/java/jayo/internal/RealBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import jayo.*;
import jayo.crypto.Digest;
import jayo.crypto.Hmac;
import jayo.tools.BasicLock;
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;

Expand Down Expand Up @@ -60,7 +61,7 @@ public final class RealBuffer implements Buffer {
final @NonNull SegmentQueue segmentQueue;

public RealBuffer() {
this(new SegmentQueue());
this(new SegmentQueue(BasicLock.create()));
}

RealBuffer(final @NonNull SegmentQueue segmentQueue) {
Expand Down Expand Up @@ -111,7 +112,7 @@ public long bytesAvailable() {
var _byteCount = byteCount;

// Skip segments that we aren't copying from.
var segment = segmentQueue.head;
var segment = segmentQueue.head();
assert segment != null;
var segmentSize = segment.limit - segment.pos;
while (_offset >= segmentSize) {
Expand Down Expand Up @@ -165,7 +166,7 @@ public long bytesAvailable() {
var _byteCount = byteCount;

// Skip segment nodes that we aren't copying from.
var segment = segmentQueue.head;
var segment = segmentQueue.head();
assert segment != null;
var segmentSize = segment.limit - segment.pos;
while (_offset >= segmentSize) {
Expand Down Expand Up @@ -208,7 +209,7 @@ public long bytesAvailable() {
}

var remaining = byteCount;
var head = segmentQueue.head;
var head = segmentQueue.head();
assert head != null;
var headLimit = head.limit;
while (remaining > 0L) {
Expand Down Expand Up @@ -334,7 +335,7 @@ public byte readByte() {
throw new JayoEOFException();
}

final var head = segmentQueue.head;
final var head = segmentQueue.head();
assert head != null;
return readByte(head);
}
Expand All @@ -355,7 +356,7 @@ public short readShort() {
throw new JayoEOFException();
}

var head = segmentQueue.head;
var head = segmentQueue.head();
assert head != null;
final var currentLimit = head.limit;
// If the short is split across multiple segments, delegate to readByte().
Expand All @@ -378,7 +379,7 @@ public int readInt() {
throw new JayoEOFException();
}

final var head = segmentQueue.head;
final var head = segmentQueue.head();
assert head != null;
return readInt(head);
}
Expand Down Expand Up @@ -411,7 +412,7 @@ public long readLong() {
throw new JayoEOFException();
}

final var head = segmentQueue.head;
final var head = segmentQueue.head();
assert head != null;
final var currentLimit = head.limit;
// If the long is split across multiple segments, delegate to readInt().
Expand Down Expand Up @@ -453,7 +454,7 @@ public long readDecimalLong() {
if (seen >= currentSize && segmentQueue.expectSize(20L) == 0L) {
break;
}
final var head = segmentQueue.head;
final var head = segmentQueue.head();
assert head != null;
final var data = head.data;
var pos = head.pos;
Expand Down Expand Up @@ -526,7 +527,7 @@ public long readHexadecimalUnsignedLong() {
if (seen >= currentSize && segmentQueue.expectSize(17L) == 0L) {
break;
}
final var head = segmentQueue.head;
final var head = segmentQueue.head();
assert head != null;
final var data = head.data;
var pos = head.pos;
Expand Down Expand Up @@ -618,7 +619,7 @@ public long readHexadecimalUnsignedLong() {

private @NonNull ByteStringBuilder prepareByteString(final long byteCount) {
final var byteStringBuilder = new ByteStringBuilder();
var head = segmentQueue.head;
var head = segmentQueue.head();
var offset = 0;
var finished = false;
while (!finished) {
Expand Down Expand Up @@ -795,7 +796,7 @@ public long transferTo(final @NonNull RawWriter writer) {
return "";
}

final var head = segmentQueue.head;
final var head = segmentQueue.head();
assert head != null;
final var currentLimit = head.limit;
if (head.pos + byteCount > currentLimit) {
Expand Down Expand Up @@ -942,7 +943,7 @@ public int readUtf8CodePoint() {
return new byte[0];
}

final var head = segmentQueue.head;
final var head = segmentQueue.head();
assert head != null;
return readByteArray(head, (int) byteCount);
}
Expand All @@ -965,7 +966,7 @@ public void readTo(final byte @NonNull [] writer,
Objects.requireNonNull(writer);
checkOffsetAndCount(writer.length, offset, byteCount);

final var head = segmentQueue.head;
final var head = segmentQueue.head();
assert head != null;
readTo(head, writer, offset, byteCount);
}
Expand Down Expand Up @@ -1011,7 +1012,7 @@ public int readAtMostTo(final byte @NonNull [] writer,
return -1;
}

final var head = segmentQueue.head;
final var head = segmentQueue.head();
assert head != null;
final var currentLimit = head.limit;
final var toCopy = Math.min(byteCount, currentLimit - head.pos);
Expand Down Expand Up @@ -1044,7 +1045,7 @@ public int readAtMostTo(final @NonNull ByteBuffer writer) {
return -1;
}

final var head = segmentQueue.head;
final var head = segmentQueue.head();
assert head != null;
final var currentLimit = head.limit;
final var toCopy = Math.min(writer.remaining(), currentLimit - head.pos);
Expand Down Expand Up @@ -1097,7 +1098,7 @@ void skipInternal(final long byteCount) {
segmentQueue.hashCode(), byteCount, segmentQueue, System.lineSeparator());
}
var remaining = byteCount;
var head = segmentQueue.head;
var head = segmentQueue.head();
assert head != null;
var headLimit = head.limit;
while (remaining > 0L) {
Expand Down Expand Up @@ -1728,7 +1729,7 @@ public void write(final @NonNull Buffer reader, final long byteCount) {
try {
var tail = segmentQueue.nonRemovedTailOrNull();
try {
var readerHead = _reader.segmentQueue.head;
var readerHead = _reader.segmentQueue.head();
Segment nextReaderHead = null;
var remaining = byteCount;
while (remaining > 0) {
Expand Down Expand Up @@ -2172,7 +2173,7 @@ public void close() {

final var builder = new StringBuilder(len * 2 + ((currentSize > maxPrintableBytes) ? 1 : 0));

var segment = segmentQueue.head;
var segment = segmentQueue.head();
assert segment != null;
var written = 0;
var pos = segment.pos;
Expand Down Expand Up @@ -2249,7 +2250,7 @@ private int checkAndCountSegments(final int byteCount) {

var offset = 0;
var segmentCount = 0;
var segment = segmentQueue.head;
var segment = segmentQueue.head();
while (offset < byteCount) {
assert segment != null;
final var currentPos = segment.pos;
Expand All @@ -2268,7 +2269,7 @@ private int checkAndCountSegments(final int byteCount) {
private void fillSegmentsAndDirectory(Segment[] segments, int[] directory, int byteCount) {
var offset = 0;
var segmentCount = 0;
var segment = segmentQueue.head;
var segment = segmentQueue.head();
while (offset < byteCount) {
assert segment != null;
final var copy = segment.sharedCopy();
Expand Down Expand Up @@ -2320,7 +2321,7 @@ private void fillSegmentsAndDirectory(Segment[] segments, int[] directory, int b
* depending on what's closer to `startIndex`.
*/
private <T> T seek(final long startIndex, BiFunction<Segment, Long, T> lambda) {
var segment = segmentQueue.head;
var segment = segmentQueue.head();
if (segment == null) {
return lambda.apply(null, -1L);
}
Expand Down Expand Up @@ -2532,7 +2533,7 @@ public int seek(final long offset) {

// Navigate to the segment that contains `offset`. Start from our current segment if possible.
var nextOffset = 0L;
final var currentHead = _buffer.segmentQueue.head;
final var currentHead = _buffer.segmentQueue.head();
assert currentHead != null;
Segment next = currentHead;
if (this.segment != null) {
Expand Down Expand Up @@ -2616,7 +2617,7 @@ public long resizeBuffer(final long newSize) {
throw new IllegalArgumentException("newSize < 0: " + newSize);
}
// Shrink the buffer by either shrinking segments or removing them.
var s = _buffer.segmentQueue.head;
var s = _buffer.segmentQueue.head();
var remaining = newSize;
var removeAll = false;
while (s != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public boolean refill() {
}

// Assign buffer bytes to the inflater.
currentHead = segmentQueue.head;
currentHead = segmentQueue.head();
assert currentHead != null;
bytesHeldByInflater = currentHead.limit - currentHead.pos;
inflater.setInput(currentHead.data, currentHead.pos, bytesHeldByInflater);
Expand Down
Loading

0 comments on commit 6fa00c1

Please sign in to comment.