Skip to content

Commit

Permalink
#12214 restore ee9 and ee10 thread starvation tests
Browse files Browse the repository at this point in the history
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
  • Loading branch information
lorban committed Oct 16, 2024
1 parent a8a52be commit b8887fb
Show file tree
Hide file tree
Showing 2 changed files with 376 additions and 280 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,44 @@

package org.eclipse.jetty.ee10.servlets;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import jakarta.servlet.ServletException;
import org.eclipse.jetty.ee10.servlet.DefaultServlet;
import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -59,7 +68,6 @@ public void dispose() throws Exception
}

@Test
@Tag("flaky")
public void testDefaultServletSuccess() throws Exception
{
int maxThreads = 6;
Expand All @@ -68,10 +76,10 @@ public void testDefaultServletSuccess() throws Exception
_server = new Server(threadPool);

// Prepare a big file to download.
File directory = MavenTestingUtils.getTargetTestingDir();
Files.createDirectories(directory.toPath());
Path directory = MavenTestingUtils.getTargetTestingPath();
Files.createDirectories(directory);
String resourceName = "resource.bin";
Path resourcePath = Paths.get(directory.getPath(), resourceName);
Path resourcePath = directory.resolve(resourceName);
try (OutputStream output = Files.newOutputStream(resourcePath, StandardOpenOption.CREATE, StandardOpenOption.WRITE))
{
byte[] chunk = new byte[256 * 1024];
Expand Down Expand Up @@ -105,8 +113,8 @@ protected void onIncompleteFlush()
_server.addConnector(connector);

ServletContextHandler context = new ServletContextHandler("/");
context.setBaseResourceAsPath(directory.toPath());
context.setBaseResourceAsPath(directory);

//TODO: Uses DefaultServlet, currently all commented out
context.addServlet(DefaultServlet.class, "/*").setAsyncSupported(false);
_server.setHandler(context);
Expand Down Expand Up @@ -223,172 +231,168 @@ public void run()
}
}

//TODO needs visibility of server.internal.HttpChannelState
/* @Test
@Test
public void testFailureStarvation() throws Exception
{
try (StacklessLogging stackless = new StacklessLogging(HttpChannelState.class))
int acceptors = 0;
int selectors = 1;
int maxThreads = 10;
final int barried = maxThreads - acceptors - selectors * 2;
final CyclicBarrier barrier = new CyclicBarrier(barried);

QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads);
threadPool.setDetailedDump(true);
_server = new Server(threadPool);

ServerConnector connector = new ServerConnector(_server, acceptors, selectors)
{
int acceptors = 0;
int selectors = 1;
int maxThreads = 10;
final int barried = maxThreads - acceptors - selectors * 2;
final CyclicBarrier barrier = new CyclicBarrier(barried);
QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads);
threadPool.setDetailedDump(true);
_server = new Server(threadPool);
ServerConnector connector = new ServerConnector(_server, acceptors, selectors)
@Override
protected SocketChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key)
{
@Override
protected SocketChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key)
return new SocketChannelEndPoint(channel, selectSet, key, getScheduler())
{
return new SocketChannelEndPoint(channel, selectSet, key, getScheduler())
@Override
public boolean flush(ByteBuffer... buffers) throws IOException
{
@Override
public boolean flush(ByteBuffer... buffers) throws IOException
{
super.flush(buffers[0]);
throw new IOException("TEST FAILURE");
}
};
}
};
connector.setIdleTimeout(Long.MAX_VALUE);
_server.addConnector(connector);
final AtomicInteger count = new AtomicInteger(0);
class TheHandler extends Handler.Abstract
super.flush(buffers[0]);
throw new IOException("TEST FAILURE");
}
};
}
};
connector.setIdleTimeout(Long.MAX_VALUE);
_server.addConnector(connector);

final AtomicInteger count = new AtomicInteger(0);
class TheHandler extends Handler.Abstract
{
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
@Override
public boolean handle(request request, Response response, Callback callback) throws Exception
int c = count.getAndIncrement();
try
{
int c = count.getAndIncrement();
try
{
if (c < barried)
{
barrier.await(10, TimeUnit.SECONDS);
}
}
catch (InterruptedException | BrokenBarrierException | TimeoutException e)
if (c < barried)
{
throw new ServletException(e);
barrier.await(10, TimeUnit.SECONDS);
}
response.setStatus(200);
response.setContentLength(13);
response.write(true, callback, "Hello World!\n");
return true;
}
catch (InterruptedException | BrokenBarrierException | TimeoutException e)
{
throw new ServletException(e);
}

response.setStatus(200);
response.getHeaders().put(HttpHeader.CONTENT_LENGTH, 13L);
response.write(true, BufferUtil.toBuffer("Hello World!\n"), callback);
return true;
}
_server.setHandler(new TheHandler());
_server.start();
List<Socket> sockets = new ArrayList<>();
for (int i = 0; i < maxThreads * 2; ++i)
{
Socket socket = new Socket("localhost", connector.getLocalPort());
sockets.add(socket);
OutputStream output = socket.getOutputStream();
String request =
"GET / HTTP/1.1\r\n" +
"Host: localhost\r\n" +
// "Connection: close\r\n" +
"\r\n";
output.write(request.getBytes(StandardCharsets.UTF_8));
output.flush();
}
byte[] buffer = new byte[48 * 1024];
List<Exchanger<Integer>> totals = new ArrayList<>();
for (Socket socket : sockets)
}

_server.setHandler(new TheHandler());

_server.start();

List<Socket> sockets = new ArrayList<>();
for (int i = 0; i < maxThreads * 2; ++i)
{
Socket socket = new Socket("localhost", connector.getLocalPort());
sockets.add(socket);
OutputStream output = socket.getOutputStream();
String request =
"GET / HTTP/1.1\r\n" +
"Host: localhost\r\n" +
// "Connection: close\r\n" +
"\r\n";
output.write(request.getBytes(StandardCharsets.UTF_8));
output.flush();
}

byte[] buffer = new byte[48 * 1024];
List<Exchanger<Integer>> totals = new ArrayList<>();
for (Socket socket : sockets)
{
final Exchanger<Integer> x = new Exchanger<>();
totals.add(x);
final InputStream input = socket.getInputStream();

new Thread()
{
final Exchanger<Integer> x = new Exchanger<>();
totals.add(x);
final InputStream input = socket.getInputStream();
new Thread()
@Override
public void run()
{
@Override
public void run()
int read = 0;
try
{
int read = 0;
try
// look for CRLFCRLF
StringBuilder header = new StringBuilder();
int state = 0;
while (state < 4 && header.length() < 2048)
{
// look for CRLFCRLF
StringBuilder header = new StringBuilder();
int state = 0;
while (state < 4 && header.length() < 2048)
int ch = input.read();
if (ch < 0)
break;
header.append((char)ch);
switch (state)
{
int ch = input.read();
if (ch < 0)
case 0:
if (ch == '\r')
state = 1;
break;
case 1:
if (ch == '\n')
state = 2;
else
state = 0;
break;
case 2:
if (ch == '\r')
state = 3;
else
state = 0;
break;
case 3:
if (ch == '\n')
state = 4;
else
state = 0;
break;
header.append((char)ch);
switch (state)
{
case 0:
if (ch == '\r')
state = 1;
break;
case 1:
if (ch == '\n')
state = 2;
else
state = 0;
break;
case 2:
if (ch == '\r')
state = 3;
else
state = 0;
break;
case 3:
if (ch == '\n')
state = 4;
else
state = 0;
break;
}
}
read = input.read(buffer);
}
catch (IOException e)

read = input.read(buffer);
}
catch (IOException e)
{
e.printStackTrace();
}
finally
{
try
{
// e.printStackTrace();
x.exchange(read);
}
finally
catch (InterruptedException e)
{
try
{
x.exchange(read);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
e.printStackTrace();
}
}
}.start();
}
for (Exchanger<Integer> x : totals)
{
Integer read = x.exchange(-1, 10, TimeUnit.SECONDS);
assertEquals(-1, read.intValue());
}
// We could read everything, good.
for (Socket socket : sockets)
{
socket.close();
}
_server.stop();
}
}.start();
}

for (Exchanger<Integer> x : totals)
{
Integer read = x.exchange(-1, 10, TimeUnit.SECONDS);
assertEquals(-1, read.intValue());
}
}*/

// We could read everything, good.
for (Socket socket : sockets)
{
socket.close();
}

_server.stop();
}
}
Loading

0 comments on commit b8887fb

Please sign in to comment.