diff --git a/jetty-ee10/jetty-ee10-servlets/src/test/java/org/eclipse/jetty/ee10/servlets/ThreadStarvationTest.java b/jetty-ee10/jetty-ee10-servlets/src/test/java/org/eclipse/jetty/ee10/servlets/ThreadStarvationTest.java index 0971d5a274a..3c5ddc6bcfc 100644 --- a/jetty-ee10/jetty-ee10-servlets/src/test/java/org/eclipse/jetty/ee10/servlets/ThreadStarvationTest.java +++ b/jetty-ee10/jetty-ee10-servlets/src/test/java/org/eclipse/jetty/ee10/servlets/ThreadStarvationTest.java @@ -28,16 +28,24 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Exchanger; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import jakarta.servlet.ServletException; +import jakarta.servlet.http.HttpServlet; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; import org.eclipse.jetty.ee10.servlet.DefaultServlet; import org.eclipse.jetty.ee10.servlet.ServletContextHandler; +import org.eclipse.jetty.ee10.servlet.ServletHolder; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.SocketChannelEndPoint; @@ -49,10 +57,13 @@ import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -67,6 +78,85 @@ public void dispose() throws Exception _server.stop(); } + @Test + public void testReadStarvation() throws Exception + { + int maxThreads = 5; + int clients = maxThreads + 2; + QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads); + threadPool.setDetailedDump(true); + _server = new Server(threadPool); + + ServerConnector connector = new ServerConnector(_server, 1, 1); + _server.addConnector(connector); + + ServletContextHandler context = new ServletContextHandler("/"); + context.addServlet(new ServletHolder(new HttpServlet() { + @Override + protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws IOException + { + IO.copy(req.getInputStream(), resp.getOutputStream()); + } + }), "/*"); + _server.setHandler(context); + + _server.start(); + + ExecutorService clientExecutors = Executors.newFixedThreadPool(clients); + + List> clientTasks = new ArrayList<>(); + + for (int i = 0; i < clients; i++) + { + clientTasks.add(() -> + { + try (Socket client = new Socket("localhost", connector.getLocalPort()); + OutputStream out = client.getOutputStream(); + InputStream in = client.getInputStream()) + { + client.setSoTimeout(10000); + + String request = + "PUT / HTTP/1.0\r\n" + + "host: localhost\r\n" + + "content-length: 10\r\n" + + "\r\n" + + "1"; + + // Write partial request + out.write(request.getBytes(StandardCharsets.UTF_8)); + out.flush(); + + // Finish Request + Thread.sleep(1500); + out.write(("234567890\r\n").getBytes(StandardCharsets.UTF_8)); + out.flush(); + + // Read Response + String response = IO.toString(in); + assertEquals(-1, in.read()); + return response; + } + }); + } + + try + { + List> responses = clientExecutors.invokeAll(clientTasks, 60, TimeUnit.SECONDS); + + for (Future responseFut : responses) + { + String response = responseFut.get(); + assertThat(response, containsString("200 OK")); + assertThat(response, containsString("Read Input 10")); + } + } + finally + { + clientExecutors.shutdownNow(); + } + } + @Test public void testDefaultServletSuccess() throws Exception { diff --git a/jetty-ee9/jetty-ee9-servlets/src/test/java/org/eclipse/jetty/ee9/servlets/ThreadStarvationTest.java b/jetty-ee9/jetty-ee9-servlets/src/test/java/org/eclipse/jetty/ee9/servlets/ThreadStarvationTest.java index 2b64a9ce719..da30254d2e9 100644 --- a/jetty-ee9/jetty-ee9-servlets/src/test/java/org/eclipse/jetty/ee9/servlets/ThreadStarvationTest.java +++ b/jetty-ee9/jetty-ee9-servlets/src/test/java/org/eclipse/jetty/ee9/servlets/ThreadStarvationTest.java @@ -28,16 +28,24 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Exchanger; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import jakarta.servlet.ServletException; +import jakarta.servlet.http.HttpServlet; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; import org.eclipse.jetty.ee9.servlet.DefaultServlet; import org.eclipse.jetty.ee9.servlet.ServletContextHandler; +import org.eclipse.jetty.ee9.servlet.ServletHolder; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.SocketChannelEndPoint; @@ -49,10 +57,13 @@ import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -67,6 +78,85 @@ public void dispose() throws Exception _server.stop(); } + @Test + public void testReadStarvation() throws Exception + { + int maxThreads = 5; + int clients = maxThreads + 2; + QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads); + threadPool.setDetailedDump(true); + _server = new Server(threadPool); + + ServerConnector connector = new ServerConnector(_server, 1, 1); + _server.addConnector(connector); + + ServletContextHandler context = new ServletContextHandler(_server, "/"); + context.addServlet(new ServletHolder(new HttpServlet() { + @Override + protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws IOException + { + IO.copy(req.getInputStream(), resp.getOutputStream()); + } + }), "/*"); + _server.setHandler(context); + + _server.start(); + + ExecutorService clientExecutors = Executors.newFixedThreadPool(clients); + + List> clientTasks = new ArrayList<>(); + + for (int i = 0; i < clients; i++) + { + clientTasks.add(() -> + { + try (Socket client = new Socket("localhost", connector.getLocalPort()); + OutputStream out = client.getOutputStream(); + InputStream in = client.getInputStream()) + { + client.setSoTimeout(10000); + + String request = + "PUT / HTTP/1.0\r\n" + + "host: localhost\r\n" + + "content-length: 10\r\n" + + "\r\n" + + "1"; + + // Write partial request + out.write(request.getBytes(StandardCharsets.UTF_8)); + out.flush(); + + // Finish Request + Thread.sleep(1500); + out.write(("234567890\r\n").getBytes(StandardCharsets.UTF_8)); + out.flush(); + + // Read Response + String response = IO.toString(in); + assertEquals(-1, in.read()); + return response; + } + }); + } + + try + { + List> responses = clientExecutors.invokeAll(clientTasks, 60, TimeUnit.SECONDS); + + for (Future responseFut : responses) + { + String response = responseFut.get(); + assertThat(response, containsString("200 OK")); + assertThat(response, containsString("Read Input 10")); + } + } + finally + { + clientExecutors.shutdownNow(); + } + } + @Test public void testDefaultServletSuccess() throws Exception {