Skip to content

Commit

Permalink
Issue #6470 - remove ByteBufferPool usage from MessageInputStream
Browse files Browse the repository at this point in the history
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts committed Jun 30, 2021
1 parent f045b5a commit 5c20c42
Showing 1 changed file with 4 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,11 @@
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.common.WebSocketSession;

/**
* Support class for reading a (single) WebSocket BINARY message via a InputStream.
Expand All @@ -45,7 +42,6 @@ public class MessageInputStream extends InputStream implements MessageAppender
private static final ByteBuffer EOF = ByteBuffer.allocate(0).asReadOnlyBuffer();

private final Session session;
private final ByteBufferPool bufferPool;
private final BlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>();
private final long timeoutMs;
private ByteBuffer activeBuffer = null;
Expand Down Expand Up @@ -84,7 +80,6 @@ public MessageInputStream(Session session, int timeoutMs)
{
this.timeoutMs = timeoutMs;
this.session = session;
this.bufferPool = (session instanceof WebSocketSession) ? ((WebSocketSession)session).getBufferPool() : new NullByteBufferPool();
}

@Override
Expand Down Expand Up @@ -156,16 +151,6 @@ public void close()
if (remainingContent)
LOG.warn("MessageInputStream closed without fully consuming content {}", session);


// Release any buffers taken from the pool.
if (activeBuffer != null && activeBuffer != EOF)
bufferPool.release(activeBuffer);

for (ByteBuffer buffer : buffers)
{
bufferPool.release(buffer);
}

activeBuffer = null;
buffers.clear();
state = State.CLOSED;
Expand Down Expand Up @@ -258,8 +243,6 @@ public int read(byte[] b, int off, int len) throws IOException
SuspendToken resume = null;
synchronized (this)
{
// Release buffer back to pool.
bufferPool.release(activeBuffer);
activeBuffer = null;

switch (state)
Expand Down Expand Up @@ -325,6 +308,9 @@ public boolean markSupported()

private ByteBuffer acquire(int capacity, boolean direct)
{
return bufferPool.acquire(capacity, direct);
if (direct)
return BufferUtil.allocateDirect(capacity);
else
return BufferUtil.allocate(capacity);
}
}

0 comments on commit 5c20c42

Please sign in to comment.