From ceba33f8e7f3d3246337c73a009a7e95623ea07a Mon Sep 17 00:00:00 2001 From: Ludovic Orban Date: Wed, 24 Jul 2024 12:08:32 +0200 Subject: [PATCH 1/3] #12082 - fix DynamicCapacity.release() Signed-off-by: Ludovic Orban --- .../jetty/http2/internal/HTTP2Flusher.java | 14 +----- .../eclipse/jetty/http2/tests/BadURITest.java | 2 +- .../HttpClientTransportOverHTTP2Test.java | 11 +++-- .../jetty/io/RetainableByteBuffer.java | 46 +++++++++++++++++-- 4 files changed, 51 insertions(+), 22 deletions(-) diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java index 1d3849f066f5..1d35b20dfd93 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java @@ -51,7 +51,6 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable private final Collection processedEntries = new ArrayList<>(); private final HTTP2Session session; private final RetainableByteBuffer.Mutable accumulator; - private boolean released; private InvocationType invocationType = InvocationType.NON_BLOCKING; private Throwable terminated; private HTTP2Session.Entry stalledEntry; @@ -308,7 +307,7 @@ protected void onSuccess() private void finish() { - release(); + accumulator.clear(); processedEntries.forEach(HTTP2Session.Entry::succeeded); processedEntries.clear(); invocationType = InvocationType.NON_BLOCKING; @@ -328,15 +327,6 @@ private void finish() } } - private void release() - { - if (!released) - { - released = true; - accumulator.release(); - } - } - @Override protected void onCompleteSuccess() { @@ -346,7 +336,7 @@ protected void onCompleteSuccess() @Override protected void onCompleteFailure(Throwable x) { - release(); + accumulator.release(); Throwable closed; Set allEntries; diff --git a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/BadURITest.java b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/BadURITest.java index b92e5398ea94..238925777197 100644 --- a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/BadURITest.java +++ b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/BadURITest.java @@ -122,7 +122,7 @@ public ByteBuffer badMessageError(int status, String reason, HttpFields.Mutable Thread.sleep(1000); // Send a second request and verify that it hits the Handler. - accumulator.release(); + accumulator.clear(); MetaData.Request metaData2 = new MetaData.Request( HttpMethod.GET.asString(), HttpScheme.HTTP.asString(), diff --git a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HttpClientTransportOverHTTP2Test.java b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HttpClientTransportOverHTTP2Test.java index d5d84e55fafd..6715b0229bd5 100644 --- a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HttpClientTransportOverHTTP2Test.java +++ b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HttpClientTransportOverHTTP2Test.java @@ -13,6 +13,7 @@ package org.eclipse.jetty.http2.tests; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; @@ -572,7 +573,7 @@ public void onPreface() } catch (HpackException x) { - x.printStackTrace(); + throw new RuntimeException(x); } } @@ -589,7 +590,7 @@ public void onHeaders(HeadersFrame request) } catch (HpackException x) { - x.printStackTrace(); + throw new RuntimeException(x); } } @@ -599,11 +600,11 @@ private void writeFrames() { // Write the frames. accumulator.writeTo(Content.Sink.from(output), false); - accumulator.release(); + accumulator.clear(); } - catch (Throwable x) + catch (IOException x) { - x.printStackTrace(); + throw new RuntimeException(x); } } }); diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java index 243975879eb1..b9da3e9f0258 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java @@ -1445,7 +1445,7 @@ public DynamicCapacity(ByteBufferPool pool, boolean direct, long maxSize, int ag private DynamicCapacity(List buffers, ByteBufferPool.Sized pool, long maxSize, int minRetainSize) { - super(); + super(new ReferenceCounter()); // Make sure the wrapped retainable is a ReferenceCounter. _pool = pool == null ? ByteBufferPool.SIZED_NON_POOLING : pool; _maxSize = maxSize < 0 ? Long.MAX_VALUE : maxSize; _buffers = buffers == null ? new ArrayList<>() : buffers; @@ -1456,6 +1456,13 @@ private DynamicCapacity(List buffers, ByteBufferPool.Sized throw new IllegalArgumentException("must always retain if cannot aggregate"); } + private void checkNotReleased() + { + ReferenceCounter counter = (ReferenceCounter)getWrapped(); + if (counter.get() == 0) + throw new IllegalStateException("Already released"); + } + public long getMaxSize() { return _maxSize; @@ -1490,6 +1497,7 @@ public ByteBuffer getByteBuffer() throws BufferOverflowException { if (LOG.isDebugEnabled()) LOG.debug("getByteBuffer {}", this); + checkNotReleased(); return switch (_buffers.size()) { case 0 -> BufferUtil.EMPTY_BUFFER; @@ -1523,6 +1531,7 @@ public RetainableByteBuffer take(long length) { if (LOG.isDebugEnabled()) LOG.debug("take {} {}", this, length); + checkNotReleased(); if (_buffers.isEmpty() || length == 0) return RetainableByteBuffer.EMPTY; @@ -1588,6 +1597,7 @@ public RetainableByteBuffer takeFrom(long skip) { if (LOG.isDebugEnabled()) LOG.debug("take {} {}", this, skip); + checkNotReleased(); if (_buffers.isEmpty() || skip > size()) return RetainableByteBuffer.EMPTY; @@ -1653,6 +1663,7 @@ public byte[] takeByteArray() { if (LOG.isDebugEnabled()) LOG.debug("takeByteArray {}", this); + checkNotReleased(); return switch (_buffers.size()) { case 0 -> BufferUtil.EMPTY_BUFFER.array(); @@ -1698,6 +1709,7 @@ public byte get() throws BufferUnderflowException { if (LOG.isDebugEnabled()) LOG.debug("get {}", this); + checkNotReleased(); for (Iterator i = _buffers.listIterator(); i.hasNext();) { RetainableByteBuffer buffer = i.next(); @@ -1724,6 +1736,7 @@ public byte get(long index) throws IndexOutOfBoundsException { if (LOG.isDebugEnabled()) LOG.debug("get {} {}", this, index); + checkNotReleased(); for (RetainableByteBuffer buffer : _buffers) { long size = buffer.size(); @@ -1739,6 +1752,7 @@ public int get(byte[] bytes, int offset, int length) { if (LOG.isDebugEnabled()) LOG.debug("get array {} {}", this, length); + checkNotReleased(); int got = 0; for (Iterator i = _buffers.listIterator(); length > 0 && i.hasNext();) { @@ -1766,6 +1780,7 @@ public boolean isDirect() @Override public boolean hasRemaining() { + checkNotReleased(); for (RetainableByteBuffer rbb : _buffers) if (!rbb.isEmpty()) return true; @@ -1777,6 +1792,7 @@ public long skip(long length) { if (LOG.isDebugEnabled()) LOG.debug("skip {} {}", this, length); + checkNotReleased(); long skipped = 0; for (Iterator i = _buffers.listIterator(); length > 0 && i.hasNext();) { @@ -1799,6 +1815,7 @@ public void limit(long limit) { if (LOG.isDebugEnabled()) LOG.debug("limit {} {}", this, limit); + checkNotReleased(); for (Iterator i = _buffers.iterator(); i.hasNext();) { RetainableByteBuffer buffer = i.next(); @@ -1826,6 +1843,7 @@ public Mutable slice() { if (LOG.isDebugEnabled()) LOG.debug("slice {}", this); + checkNotReleased(); List buffers = new ArrayList<>(_buffers.size()); for (RetainableByteBuffer rbb : _buffers) buffers.add(rbb.slice()); @@ -1837,6 +1855,7 @@ public Mutable slice(long length) { if (LOG.isDebugEnabled()) LOG.debug("slice {} {}", this, length); + checkNotReleased(); List buffers = new ArrayList<>(_buffers.size()); for (Iterator i = _buffers.iterator(); i.hasNext();) { @@ -1879,6 +1898,7 @@ public RetainableByteBuffer copy() { if (LOG.isDebugEnabled()) LOG.debug("copy {}", this); + checkNotReleased(); List buffers = new ArrayList<>(_buffers.size()); for (RetainableByteBuffer rbb : _buffers) buffers.add(rbb.copy()); @@ -1900,6 +1920,7 @@ public int remaining() @Override public long size() { + checkNotReleased(); long length = 0; for (RetainableByteBuffer buffer : _buffers) length += buffer.remaining(); @@ -1928,6 +1949,7 @@ public boolean release() { if (LOG.isDebugEnabled()) LOG.debug("release {}", this); + checkNotReleased(); if (super.release()) { for (RetainableByteBuffer buffer : _buffers) @@ -1944,6 +1966,7 @@ public void clear() { if (LOG.isDebugEnabled()) LOG.debug("clear {}", this); + checkNotReleased(); if (_buffers.isEmpty()) return; _aggregate = null; @@ -1957,8 +1980,10 @@ public boolean append(ByteBuffer bytes) { if (LOG.isDebugEnabled()) LOG.debug("append BB {} <- {}", this, BufferUtil.toDetailString(bytes)); + checkNotReleased(); // Cannot mutate contents if retained - assert !isRetained(); + if (isRetained()) + throw new IllegalStateException("Cannot append to a retained instance"); // handle empty appends if (bytes == null) @@ -2056,9 +2081,10 @@ public boolean append(RetainableByteBuffer retainableBytes) { if (LOG.isDebugEnabled()) LOG.debug("append RBB {} {}", this, retainableBytes); - + checkNotReleased(); // Cannot mutate contents if retained - assert !isRetained(); + if (isRetained()) + throw new IllegalStateException("Cannot append to a retained instance"); // Optimize appending dynamics if (retainableBytes instanceof DynamicCapacity dynamicCapacity) @@ -2118,6 +2144,7 @@ public Mutable add(ByteBuffer bytes) throws ReadOnlyBufferException, BufferOverf { if (LOG.isDebugEnabled()) LOG.debug("add BB {} <- {}", this, BufferUtil.toDetailString(bytes)); + checkNotReleased(); add(RetainableByteBuffer.wrap(bytes)); return this; } @@ -2127,6 +2154,7 @@ public Mutable add(RetainableByteBuffer bytes) throws ReadOnlyBufferException, B { if (LOG.isDebugEnabled()) LOG.debug("add RBB {} <- {}", this, bytes); + checkNotReleased(); long size = size(); long space = _maxSize - size; long length = bytes.size(); @@ -2147,6 +2175,7 @@ public Mutable add(RetainableByteBuffer bytes) throws ReadOnlyBufferException, B @Override public Mutable put(byte b) { + checkNotReleased(); ensure(1).put(b); return this; } @@ -2154,6 +2183,7 @@ public Mutable put(byte b) @Override public Mutable put(long index, byte b) { + checkNotReleased(); for (RetainableByteBuffer buffer : _buffers) { long size = buffer.size(); @@ -2170,6 +2200,7 @@ public Mutable put(long index, byte b) @Override public Mutable putShort(short s) { + checkNotReleased(); ensure(2).putShort(s); return this; } @@ -2177,6 +2208,7 @@ public Mutable putShort(short s) @Override public Mutable putInt(int i) { + checkNotReleased(); ensure(4).putInt(i); return this; } @@ -2184,6 +2216,7 @@ public Mutable putInt(int i) @Override public Mutable putLong(long l) { + checkNotReleased(); ensure(8).putLong(l); return this; } @@ -2191,6 +2224,7 @@ public Mutable putLong(long l) @Override public Mutable put(byte[] bytes, int offset, int length) { + checkNotReleased(); // Use existing aggregate if the length is large and there is space for at least half if (length >= 16 && _aggregate != null) { @@ -2252,6 +2286,7 @@ public boolean appendTo(ByteBuffer to) { if (LOG.isDebugEnabled()) LOG.debug("appendTo BB {} -> {}", this, BufferUtil.toDetailString(to)); + checkNotReleased(); _aggregate = null; for (Iterator i = _buffers.listIterator(); i.hasNext();) { @@ -2269,6 +2304,7 @@ public boolean appendTo(RetainableByteBuffer to) { if (LOG.isDebugEnabled()) LOG.debug("appendTo RBB {} -> {}", this, to); + checkNotReleased(); _aggregate = null; for (Iterator i = _buffers.listIterator(); i.hasNext();) { @@ -2286,6 +2322,7 @@ public void putTo(ByteBuffer toInfillMode) { if (LOG.isDebugEnabled()) LOG.debug("putTo BB {} -> {}", this, toInfillMode); + checkNotReleased(); _aggregate = null; for (Iterator i = _buffers.listIterator(); i.hasNext();) { @@ -2301,6 +2338,7 @@ public void writeTo(Content.Sink sink, boolean last, Callback callback) { if (LOG.isDebugEnabled()) LOG.debug("writeTo {} -> {} {} {}", this, sink, last, callback); + checkNotReleased(); _aggregate = null; switch (_buffers.size()) { From 4b816541a3010c5139251b74bbfc445706f8de42 Mon Sep 17 00:00:00 2001 From: Ludovic Orban Date: Mon, 26 Aug 2024 10:19:38 +0200 Subject: [PATCH 2/3] handle review comments Signed-off-by: Ludovic Orban --- .../main/java/org/eclipse/jetty/io/RetainableByteBuffer.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java index 907f9f5e3726..f109cfe801c9 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java @@ -1470,7 +1470,6 @@ public DynamicCapacity(ByteBufferPool pool, boolean direct, long maxSize, int ag private DynamicCapacity(List buffers, ByteBufferPool.Sized pool, long maxSize, int minRetainSize) { - super(new ReferenceCounter()); // Make sure the wrapped retainable is a ReferenceCounter. _pool = pool == null ? ByteBufferPool.SIZED_NON_POOLING : pool; _maxSize = maxSize < 0 ? Long.MAX_VALUE : maxSize; _buffers = buffers == null ? new ArrayList<>() : buffers; @@ -1483,8 +1482,7 @@ private DynamicCapacity(List buffers, ByteBufferPool.Sized private void checkNotReleased() { - ReferenceCounter counter = (ReferenceCounter)getWrapped(); - if (counter.get() == 0) + if (getRetained() <= 0) throw new IllegalStateException("Already released"); } From 1666ee56d35d5a6dbd5b95047f74c2495aa70a0e Mon Sep 17 00:00:00 2001 From: Ludovic Orban Date: Wed, 28 Aug 2024 15:25:26 +0200 Subject: [PATCH 3/3] fix double release Signed-off-by: Ludovic Orban --- .../java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java index a223a2c5b461..c07bdaed914c 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java @@ -377,7 +377,7 @@ protected void onCompleteFailure(Throwable x) private void onSessionFailure(Throwable x) { - accumulator.release(); + accumulator.clear(); Throwable closed = fail(x); if (closed == null) session.close(ErrorCode.COMPRESSION_ERROR.code, null, NOOP);