Skip to content

Commit

Permalink
#12214 add read starvation test to ee* envs
Browse files Browse the repository at this point in the history
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
  • Loading branch information
lorban committed Oct 16, 2024
1 parent 21fbc35 commit 4b557e4
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,24 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.ee10.servlet.DefaultServlet;
import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
import org.eclipse.jetty.ee10.servlet.ServletHolder;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SocketChannelEndPoint;
Expand All @@ -49,10 +57,13 @@
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand All @@ -67,6 +78,85 @@ public void dispose() throws Exception
_server.stop();
}

@Test
public void testReadStarvation() throws Exception
{
int maxThreads = 5;
int clients = maxThreads + 2;
QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads);
threadPool.setDetailedDump(true);
_server = new Server(threadPool);

ServerConnector connector = new ServerConnector(_server, 1, 1);
_server.addConnector(connector);

ServletContextHandler context = new ServletContextHandler("/");
context.addServlet(new ServletHolder(new HttpServlet() {
@Override
protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws IOException
{
IO.copy(req.getInputStream(), resp.getOutputStream());
}
}), "/*");
_server.setHandler(context);

_server.start();

ExecutorService clientExecutors = Executors.newFixedThreadPool(clients);

List<Callable<String>> clientTasks = new ArrayList<>();

for (int i = 0; i < clients; i++)
{
clientTasks.add(() ->
{
try (Socket client = new Socket("localhost", connector.getLocalPort());
OutputStream out = client.getOutputStream();
InputStream in = client.getInputStream())
{
client.setSoTimeout(10000);

String request =
"PUT / HTTP/1.0\r\n" +
"host: localhost\r\n" +
"content-length: 10\r\n" +
"\r\n" +
"1";

// Write partial request
out.write(request.getBytes(StandardCharsets.UTF_8));
out.flush();

// Finish Request
Thread.sleep(1500);
out.write(("234567890\r\n").getBytes(StandardCharsets.UTF_8));
out.flush();

// Read Response
String response = IO.toString(in);
assertEquals(-1, in.read());
return response;
}
});
}

try
{
List<Future<String>> responses = clientExecutors.invokeAll(clientTasks, 60, TimeUnit.SECONDS);

for (Future<String> responseFut : responses)
{
String response = responseFut.get();
assertThat(response, containsString("200 OK"));
assertThat(response, containsString("Read Input 10"));
}
}
finally
{
clientExecutors.shutdownNow();
}
}

@Test
public void testDefaultServletSuccess() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,24 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.ee9.servlet.DefaultServlet;
import org.eclipse.jetty.ee9.servlet.ServletContextHandler;
import org.eclipse.jetty.ee9.servlet.ServletHolder;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SocketChannelEndPoint;
Expand All @@ -49,10 +57,13 @@
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand All @@ -67,6 +78,85 @@ public void dispose() throws Exception
_server.stop();
}

@Test
public void testReadStarvation() throws Exception
{
int maxThreads = 5;
int clients = maxThreads + 2;
QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads);
threadPool.setDetailedDump(true);
_server = new Server(threadPool);

ServerConnector connector = new ServerConnector(_server, 1, 1);
_server.addConnector(connector);

ServletContextHandler context = new ServletContextHandler(_server, "/");
context.addServlet(new ServletHolder(new HttpServlet() {
@Override
protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws IOException
{
IO.copy(req.getInputStream(), resp.getOutputStream());
}
}), "/*");
_server.setHandler(context);

_server.start();

ExecutorService clientExecutors = Executors.newFixedThreadPool(clients);

List<Callable<String>> clientTasks = new ArrayList<>();

for (int i = 0; i < clients; i++)
{
clientTasks.add(() ->
{
try (Socket client = new Socket("localhost", connector.getLocalPort());
OutputStream out = client.getOutputStream();
InputStream in = client.getInputStream())
{
client.setSoTimeout(10000);

String request =
"PUT / HTTP/1.0\r\n" +
"host: localhost\r\n" +
"content-length: 10\r\n" +
"\r\n" +
"1";

// Write partial request
out.write(request.getBytes(StandardCharsets.UTF_8));
out.flush();

// Finish Request
Thread.sleep(1500);
out.write(("234567890\r\n").getBytes(StandardCharsets.UTF_8));
out.flush();

// Read Response
String response = IO.toString(in);
assertEquals(-1, in.read());
return response;
}
});
}

try
{
List<Future<String>> responses = clientExecutors.invokeAll(clientTasks, 60, TimeUnit.SECONDS);

for (Future<String> responseFut : responses)
{
String response = responseFut.get();
assertThat(response, containsString("200 OK"));
assertThat(response, containsString("Read Input 10"));
}
}
finally
{
clientExecutors.shutdownNow();
}
}

@Test
public void testDefaultServletSuccess() throws Exception
{
Expand Down

0 comments on commit 4b557e4

Please sign in to comment.