Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix DynamicCapacity.release() #12083

Merged
merged 5 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
private final Collection<HTTP2Session.Entry> processedEntries = new ArrayList<>();
private final HTTP2Session session;
private final RetainableByteBuffer.Mutable accumulator;
private boolean released;
private InvocationType invocationType = InvocationType.NON_BLOCKING;
private Throwable terminated;
private HTTP2Session.Entry stalledEntry;
Expand Down Expand Up @@ -308,7 +307,7 @@ protected void onSuccess()

private void finish()
{
release();
accumulator.clear();
processedEntries.forEach(HTTP2Session.Entry::succeeded);
processedEntries.clear();
invocationType = InvocationType.NON_BLOCKING;
Expand All @@ -328,15 +327,6 @@ private void finish()
}
}

private void release()
{
if (!released)
{
released = true;
accumulator.release();
}
}

@Override
protected void onCompleteSuccess()
{
Expand Down Expand Up @@ -377,7 +367,7 @@ protected void onFailure(Throwable x)
@Override
protected void onCompleteFailure(Throwable x)
{
release();
accumulator.release();
}

public void terminate(Throwable cause)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public ByteBuffer badMessageError(int status, String reason, HttpFields.Mutable
Thread.sleep(1000);

// Send a second request and verify that it hits the Handler.
accumulator.release();
accumulator.clear();
MetaData.Request metaData2 = new MetaData.Request(
HttpMethod.GET.asString(),
HttpScheme.HTTP.asString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.eclipse.jetty.http2.tests;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
Expand Down Expand Up @@ -574,7 +575,7 @@ public void onPreface()
}
catch (HpackException x)
{
x.printStackTrace();
throw new RuntimeException(x);
}
}

Expand All @@ -591,7 +592,7 @@ public void onHeaders(HeadersFrame request)
}
catch (HpackException x)
{
x.printStackTrace();
throw new RuntimeException(x);
}
}

Expand All @@ -601,11 +602,11 @@ private void writeFrames()
{
// Write the frames.
accumulator.writeTo(Content.Sink.from(output), false);
accumulator.release();
accumulator.clear();
}
catch (Throwable x)
catch (IOException x)
{
x.printStackTrace();
throw new RuntimeException(x);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1470,7 +1470,6 @@ public DynamicCapacity(ByteBufferPool pool, boolean direct, long maxSize, int ag

private DynamicCapacity(List<RetainableByteBuffer> buffers, ByteBufferPool.Sized pool, long maxSize, int minRetainSize)
{
super();
_pool = pool == null ? ByteBufferPool.SIZED_NON_POOLING : pool;
_maxSize = maxSize < 0 ? Long.MAX_VALUE : maxSize;
_buffers = buffers == null ? new ArrayList<>() : buffers;
Expand All @@ -1481,6 +1480,12 @@ private DynamicCapacity(List<RetainableByteBuffer> buffers, ByteBufferPool.Sized
throw new IllegalArgumentException("must always retain if cannot aggregate");
}

private void checkNotReleased()
lorban marked this conversation as resolved.
Show resolved Hide resolved
{
if (getRetained() <= 0)
throw new IllegalStateException("Already released");
lorban marked this conversation as resolved.
Show resolved Hide resolved
}

public long getMaxSize()
{
return _maxSize;
Expand Down Expand Up @@ -1515,6 +1520,7 @@ public ByteBuffer getByteBuffer() throws BufferOverflowException
{
if (LOG.isDebugEnabled())
LOG.debug("getByteBuffer {}", this);
checkNotReleased();
return switch (_buffers.size())
{
case 0 -> BufferUtil.EMPTY_BUFFER;
Expand Down Expand Up @@ -1548,6 +1554,7 @@ public RetainableByteBuffer take(long length)
{
if (LOG.isDebugEnabled())
LOG.debug("take {} {}", this, length);
checkNotReleased();

if (_buffers.isEmpty() || length == 0)
return RetainableByteBuffer.EMPTY;
Expand Down Expand Up @@ -1613,6 +1620,7 @@ public RetainableByteBuffer takeFrom(long skip)
{
if (LOG.isDebugEnabled())
LOG.debug("take {} {}", this, skip);
checkNotReleased();

if (_buffers.isEmpty() || skip > size())
return RetainableByteBuffer.EMPTY;
Expand Down Expand Up @@ -1678,6 +1686,7 @@ public byte[] takeByteArray()
{
if (LOG.isDebugEnabled())
LOG.debug("takeByteArray {}", this);
checkNotReleased();
return switch (_buffers.size())
{
case 0 -> BufferUtil.EMPTY_BUFFER.array();
Expand Down Expand Up @@ -1723,6 +1732,7 @@ public byte get() throws BufferUnderflowException
{
if (LOG.isDebugEnabled())
LOG.debug("get {}", this);
checkNotReleased();
for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); i.hasNext();)
{
RetainableByteBuffer buffer = i.next();
Expand All @@ -1749,6 +1759,7 @@ public byte get(long index) throws IndexOutOfBoundsException
{
if (LOG.isDebugEnabled())
LOG.debug("get {} {}", this, index);
checkNotReleased();
for (RetainableByteBuffer buffer : _buffers)
{
long size = buffer.size();
Expand All @@ -1764,6 +1775,7 @@ public int get(byte[] bytes, int offset, int length)
{
if (LOG.isDebugEnabled())
LOG.debug("get array {} {}", this, length);
checkNotReleased();
int got = 0;
for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); length > 0 && i.hasNext();)
{
Expand Down Expand Up @@ -1791,6 +1803,7 @@ public boolean isDirect()
@Override
public boolean hasRemaining()
{
checkNotReleased();
for (RetainableByteBuffer rbb : _buffers)
if (!rbb.isEmpty())
return true;
Expand All @@ -1802,6 +1815,7 @@ public long skip(long length)
{
if (LOG.isDebugEnabled())
LOG.debug("skip {} {}", this, length);
checkNotReleased();
long skipped = 0;
for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); length > 0 && i.hasNext();)
{
Expand All @@ -1824,6 +1838,7 @@ public void limit(long limit)
{
if (LOG.isDebugEnabled())
LOG.debug("limit {} {}", this, limit);
checkNotReleased();
for (Iterator<RetainableByteBuffer> i = _buffers.iterator(); i.hasNext();)
{
RetainableByteBuffer buffer = i.next();
Expand Down Expand Up @@ -1851,6 +1866,7 @@ public Mutable slice()
{
if (LOG.isDebugEnabled())
LOG.debug("slice {}", this);
checkNotReleased();
List<RetainableByteBuffer> buffers = new ArrayList<>(_buffers.size());
for (RetainableByteBuffer rbb : _buffers)
buffers.add(rbb.slice());
Expand All @@ -1862,6 +1878,7 @@ public Mutable slice(long length)
{
if (LOG.isDebugEnabled())
LOG.debug("slice {} {}", this, length);
checkNotReleased();
List<RetainableByteBuffer> buffers = new ArrayList<>(_buffers.size());
for (Iterator<RetainableByteBuffer> i = _buffers.iterator(); i.hasNext();)
{
Expand Down Expand Up @@ -1904,6 +1921,7 @@ public RetainableByteBuffer copy()
{
if (LOG.isDebugEnabled())
LOG.debug("copy {}", this);
checkNotReleased();
List<RetainableByteBuffer> buffers = new ArrayList<>(_buffers.size());
for (RetainableByteBuffer rbb : _buffers)
buffers.add(rbb.copy());
Expand All @@ -1925,6 +1943,7 @@ public int remaining()
@Override
public long size()
{
checkNotReleased();
long length = 0;
for (RetainableByteBuffer buffer : _buffers)
length += buffer.remaining();
Expand Down Expand Up @@ -1953,6 +1972,7 @@ public boolean release()
{
if (LOG.isDebugEnabled())
LOG.debug("release {}", this);
checkNotReleased();
if (super.release())
{
for (RetainableByteBuffer buffer : _buffers)
Expand Down Expand Up @@ -1985,6 +2005,7 @@ public void clear()
{
if (LOG.isDebugEnabled())
LOG.debug("clear {}", this);
checkNotReleased();
if (_buffers.isEmpty())
return;
_aggregate = null;
Expand All @@ -1998,8 +2019,10 @@ public boolean append(ByteBuffer bytes)
{
if (LOG.isDebugEnabled())
LOG.debug("append BB {} <- {}", this, BufferUtil.toDetailString(bytes));
checkNotReleased();
// Cannot mutate contents if retained
assert !isRetained();
if (isRetained())
throw new IllegalStateException("Cannot append to a retained instance");

// handle empty appends
if (bytes == null)
Expand Down Expand Up @@ -2097,9 +2120,10 @@ public boolean append(RetainableByteBuffer retainableBytes)
{
if (LOG.isDebugEnabled())
LOG.debug("append RBB {} {}", this, retainableBytes);

checkNotReleased();
// Cannot mutate contents if retained
assert !isRetained();
if (isRetained())
throw new IllegalStateException("Cannot append to a retained instance");

// Optimize appending dynamics
if (retainableBytes instanceof DynamicCapacity dynamicCapacity)
Expand Down Expand Up @@ -2159,6 +2183,7 @@ public Mutable add(ByteBuffer bytes) throws ReadOnlyBufferException, BufferOverf
{
if (LOG.isDebugEnabled())
LOG.debug("add BB {} <- {}", this, BufferUtil.toDetailString(bytes));
checkNotReleased();
add(RetainableByteBuffer.wrap(bytes));
return this;
}
Expand All @@ -2168,6 +2193,7 @@ public Mutable add(RetainableByteBuffer bytes) throws ReadOnlyBufferException, B
{
if (LOG.isDebugEnabled())
LOG.debug("add RBB {} <- {}", this, bytes);
checkNotReleased();
long size = size();
long space = _maxSize - size;
long length = bytes.size();
Expand All @@ -2188,13 +2214,15 @@ public Mutable add(RetainableByteBuffer bytes) throws ReadOnlyBufferException, B
@Override
public Mutable put(byte b)
{
checkNotReleased();
ensure(1).put(b);
return this;
}

@Override
public Mutable put(long index, byte b)
{
checkNotReleased();
for (RetainableByteBuffer buffer : _buffers)
{
long size = buffer.size();
Expand All @@ -2211,27 +2239,31 @@ public Mutable put(long index, byte b)
@Override
public Mutable putShort(short s)
{
checkNotReleased();
ensure(2).putShort(s);
return this;
}

@Override
public Mutable putInt(int i)
{
checkNotReleased();
ensure(4).putInt(i);
return this;
}

@Override
public Mutable putLong(long l)
{
checkNotReleased();
ensure(8).putLong(l);
return this;
}

@Override
public Mutable put(byte[] bytes, int offset, int length)
{
checkNotReleased();
// Use existing aggregate if the length is large and there is space for at least half
if (length >= 16 && _aggregate != null)
{
Expand Down Expand Up @@ -2293,6 +2325,7 @@ public boolean appendTo(ByteBuffer to)
{
if (LOG.isDebugEnabled())
LOG.debug("appendTo BB {} -> {}", this, BufferUtil.toDetailString(to));
checkNotReleased();
_aggregate = null;
for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); i.hasNext();)
{
Expand All @@ -2310,6 +2343,7 @@ public boolean appendTo(RetainableByteBuffer to)
{
if (LOG.isDebugEnabled())
LOG.debug("appendTo RBB {} -> {}", this, to);
checkNotReleased();
_aggregate = null;
for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); i.hasNext();)
{
Expand All @@ -2327,6 +2361,7 @@ public void putTo(ByteBuffer toInfillMode)
{
if (LOG.isDebugEnabled())
LOG.debug("putTo BB {} -> {}", this, toInfillMode);
checkNotReleased();
_aggregate = null;
for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); i.hasNext();)
{
Expand All @@ -2342,6 +2377,7 @@ public void writeTo(Content.Sink sink, boolean last, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("writeTo {} -> {} {} {}", this, sink, last, callback);
checkNotReleased();
_aggregate = null;
switch (_buffers.size())
{
Expand Down
Loading