Skip to content

Commit

Permalink
Fixes #10217 - Review ProxyConnectionFactory buffer management. (#10225)
Browse files Browse the repository at this point in the history
Fixed buffer leak in ProxyConnection classes.
Introduced ArrayByteBufferPool.Tracking to test buffer leaks.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet authored Aug 16, 2023
1 parent 8f6a38a commit 17c3649
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

import org.eclipse.jetty.client.transport.HttpDestination;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MimeTypes;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Handler;
Expand All @@ -33,6 +35,7 @@
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
Expand All @@ -46,15 +49,18 @@

public class HttpClientProxyProtocolTest
{
private ArrayByteBufferPool.Tracking serverBufferPool;
private Server server;
private ServerConnector connector;
private ArrayByteBufferPool.Tracking clientBufferPool;
private HttpClient client;

private void startServer(Handler handler) throws Exception
{
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
serverBufferPool = new ArrayByteBufferPool.Tracking();
server = new Server(serverThreads, null, serverBufferPool);
HttpConnectionFactory http = new HttpConnectionFactory();
ProxyConnectionFactory proxy = new ProxyConnectionFactory(http.getProtocol());
connector = new ServerConnector(server, 1, 1, proxy, http);
Expand All @@ -67,18 +73,22 @@ private void startClient() throws Exception
{
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
clientBufferPool = new ArrayByteBufferPool.Tracking();
client = new HttpClient();
client.setExecutor(clientThreads);
client.setByteBufferPool(clientBufferPool);
client.start();
}

@AfterEach
public void dispose() throws Exception
{
if (server != null)
server.stop();
if (client != null)
client.stop();
LifeCycle.stop(client);
LifeCycle.stop(server);
Set<ArrayByteBufferPool.Tracking.Buffer> serverLeaks = serverBufferPool.getLeaks();
assertEquals(0, serverLeaks.size(), serverBufferPool.dumpLeaks());
Set<ArrayByteBufferPool.Tracking.Buffer> clientLeaks = clientBufferPool.getLeaks();
assertEquals(0, clientLeaks.size(), clientBufferPool.dumpLeaks());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.client.LEVEL=DEBUG
#org.eclipse.jetty.io.ArrayByteBufferPool$Tracking.LEVEL=DEBUG
#org.eclipse.jetty.io.SocketChannelEndPoint.LEVEL=DEBUG
#org.eclipse.jetty.io.ssl.LEVEL=DEBUG
#org.eclipse.jetty.http.LEVEL=DEBUG
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@
package org.eclipse.jetty.io;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.IntUnaryOperator;
import java.util.stream.Collectors;

import org.eclipse.jetty.io.internal.CompoundPool;
import org.eclipse.jetty.io.internal.QueuedPool;
Expand Down Expand Up @@ -564,4 +570,112 @@ public Quadratic(int minCapacity, int maxCapacity, int maxBucketSize, long maxHe
);
}
}

/**
* <p>A variant of {@link ArrayByteBufferPool} that tracks buffer
* acquires/releases, useful to identify buffer leaks.</p>
* <p>Use {@link #getLeaks()} when the system is idle to get
* the {@link Buffer}s that have been leaked, which contain
* the stack trace information of where the buffer was acquired.</p>
*/
public static class Tracking extends ArrayByteBufferPool
{
private static final Logger LOG = LoggerFactory.getLogger(Tracking.class);

private final Set<Buffer> buffers = ConcurrentHashMap.newKeySet();

public Tracking()
{
this(0, -1, Integer.MAX_VALUE);
}

public Tracking(int minCapacity, int maxCapacity, int maxBucketSize)
{
this(minCapacity, maxCapacity, maxBucketSize, -1L, -1L);
}

public Tracking(int minCapacity, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
super(minCapacity, -1, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory);
}

@Override
public RetainableByteBuffer acquire(int size, boolean direct)
{
RetainableByteBuffer buffer = super.acquire(size, direct);
Buffer wrapper = new Buffer(buffer, size);
if (LOG.isDebugEnabled())
LOG.debug("acquired {}", wrapper);
buffers.add(wrapper);
return wrapper;
}

public Set<Buffer> getLeaks()
{
return buffers;
}

public String dumpLeaks()
{
return getLeaks().stream()
.map(Buffer::dump)
.collect(Collectors.joining(System.lineSeparator()));
}

public class Buffer extends RetainableByteBuffer.Wrapper
{
private final int size;
private final Instant acquireInstant;
private final Throwable acquireStack;

private Buffer(RetainableByteBuffer wrapped, int size)
{
super(wrapped);
this.size = size;
this.acquireInstant = Instant.now();
this.acquireStack = new Throwable();
}

public int getSize()
{
return size;
}

public Instant getAcquireInstant()
{
return acquireInstant;
}

public Throwable getAcquireStack()
{
return acquireStack;
}

@Override
public boolean release()
{
boolean released = super.release();
if (released)
{
buffers.remove(this);
if (LOG.isDebugEnabled())
LOG.debug("released {}", this);
}
return released;
}

public String dump()
{
StringWriter w = new StringWriter();
getAcquireStack().printStackTrace(new PrintWriter(w));
return "%s of %d bytes on %s at %s".formatted(getClass().getSimpleName(), getSize(), getAcquireInstant(), w);
}

@Override
public String toString()
{
return "%s@%x[%s]".formatted(getClass().getSimpleName(), hashCode(), super.toString());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
* <p>This factory can be placed in front of any other connection factory
* to process the proxy v1 or v2 line before the normal protocol handling</p>
*
* @see <a href="http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt">http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt</a>
* @see <a href="https://www.haproxy.org/download/2.8/doc/proxy-protocol.txt">PROXY protocol</a>
*/
public class ProxyConnectionFactory extends DetectorConnectionFactory
{
Expand Down Expand Up @@ -245,6 +245,7 @@ public ByteBuffer onUpgradeFrom()
_buffer.release();
return unconsumed;
}
_buffer.release();
return null;
}

Expand Down Expand Up @@ -564,6 +565,7 @@ public ByteBuffer onUpgradeFrom()
_buffer.release();
return unconsumed;
}
_buffer.release();
return null;
}

Expand Down Expand Up @@ -591,7 +593,7 @@ private void parseBodyAndUpgrade() throws IOException
SocketAddress remote;
switch (_family)
{
case INET:
case INET ->
{
byte[] addr = new byte[4];
byteBuffer.get(addr);
Expand All @@ -602,9 +604,8 @@ private void parseBodyAndUpgrade() throws IOException
int dstPort = byteBuffer.getChar();
local = new InetSocketAddress(dstAddr, dstPort);
remote = new InetSocketAddress(srcAddr, srcPort);
break;
}
case INET6:
case INET6 ->
{
byte[] addr = new byte[16];
byteBuffer.get(addr);
Expand All @@ -615,9 +616,8 @@ private void parseBodyAndUpgrade() throws IOException
int dstPort = byteBuffer.getChar();
local = new InetSocketAddress(dstAddr, dstPort);
remote = new InetSocketAddress(srcAddr, srcPort);
break;
}
case UNIX:
case UNIX ->
{
byte[] addr = new byte[108];
byteBuffer.get(addr);
Expand All @@ -626,12 +626,8 @@ private void parseBodyAndUpgrade() throws IOException
String dst = UnixDomain.toPath(addr);
local = UnixDomain.newSocketAddress(dst);
remote = UnixDomain.newSocketAddress(src);
break;
}
default:
{
throw new IllegalStateException("Unsupported family " + _family);
}
default -> throw new IllegalStateException("Unsupported family " + _family);
}
proxyEndPoint = new ProxyEndPoint(endPoint, local, remote);

Expand Down Expand Up @@ -714,37 +710,20 @@ private void parseHeader() throws IOException
int transportAndFamily = 0xFF & byteBuffer.get();
switch (transportAndFamily >> 4)
{
case 0:
_family = Family.UNSPEC;
break;
case 1:
_family = Family.INET;
break;
case 2:
_family = Family.INET6;
break;
case 3:
_family = Family.UNIX;
break;
default:
throw new IOException("Proxy v2 bad PROXY family");
case 0 -> _family = Family.UNSPEC;
case 1 -> _family = Family.INET;
case 2 -> _family = Family.INET6;
case 3 -> _family = Family.UNIX;
default -> throw new IOException("Proxy v2 bad PROXY family");
}

Transport transport;
switch (transportAndFamily & 0xF)
Transport transport = switch (transportAndFamily & 0xF)
{
case 0:
transport = Transport.UNSPEC;
break;
case 1:
transport = Transport.STREAM;
break;
case 2:
transport = Transport.DGRAM;
break;
default:
throw new IOException("Proxy v2 bad PROXY family");
}
case 0 -> Transport.UNSPEC;
case 1 -> Transport.STREAM;
case 2 -> Transport.DGRAM;
default -> throw new IOException("Proxy v2 bad PROXY family");
};

_length = byteBuffer.getChar();

Expand All @@ -761,6 +740,8 @@ private void parseHeader() throws IOException

private void releaseAndClose()
{
if (LOG.isDebugEnabled())
LOG.debug("Proxy v2 releasing buffer and closing");
_buffer.release();
close();
}
Expand Down
Loading

0 comments on commit 17c3649

Please sign in to comment.