From b8887fb3fc82d9a86b360e3382c460ff10a65f23 Mon Sep 17 00:00:00 2001 From: Ludovic Orban Date: Tue, 8 Oct 2024 16:22:35 +0200 Subject: [PATCH] #12214 restore ee9 and ee10 thread starvation tests Signed-off-by: Ludovic Orban --- .../ee10/servlets/ThreadStarvationTest.java | 306 +++++++-------- .../ee9/servlets/ThreadStarvationTest.java | 350 +++++++++++------- 2 files changed, 376 insertions(+), 280 deletions(-) diff --git a/jetty-ee10/jetty-ee10-servlets/src/test/java/org/eclipse/jetty/ee10/servlets/ThreadStarvationTest.java b/jetty-ee10/jetty-ee10-servlets/src/test/java/org/eclipse/jetty/ee10/servlets/ThreadStarvationTest.java index 4715a470fe20..0971d5a274a0 100644 --- a/jetty-ee10/jetty-ee10-servlets/src/test/java/org/eclipse/jetty/ee10/servlets/ThreadStarvationTest.java +++ b/jetty-ee10/jetty-ee10-servlets/src/test/java/org/eclipse/jetty/ee10/servlets/ThreadStarvationTest.java @@ -13,35 +13,44 @@ package org.eclipse.jetty.ee10.servlets; -import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; +import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Exchanger; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import jakarta.servlet.ServletException; import org.eclipse.jetty.ee10.servlet.DefaultServlet; import org.eclipse.jetty.ee10.servlet.ServletContextHandler; +import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.SocketChannelEndPoint; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Response; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.toolchain.test.MavenTestingUtils; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -59,7 +68,6 @@ public void dispose() throws Exception } @Test - @Tag("flaky") public void testDefaultServletSuccess() throws Exception { int maxThreads = 6; @@ -68,10 +76,10 @@ public void testDefaultServletSuccess() throws Exception _server = new Server(threadPool); // Prepare a big file to download. - File directory = MavenTestingUtils.getTargetTestingDir(); - Files.createDirectories(directory.toPath()); + Path directory = MavenTestingUtils.getTargetTestingPath(); + Files.createDirectories(directory); String resourceName = "resource.bin"; - Path resourcePath = Paths.get(directory.getPath(), resourceName); + Path resourcePath = directory.resolve(resourceName); try (OutputStream output = Files.newOutputStream(resourcePath, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { byte[] chunk = new byte[256 * 1024]; @@ -105,8 +113,8 @@ protected void onIncompleteFlush() _server.addConnector(connector); ServletContextHandler context = new ServletContextHandler("/"); - context.setBaseResourceAsPath(directory.toPath()); - + context.setBaseResourceAsPath(directory); + //TODO: Uses DefaultServlet, currently all commented out context.addServlet(DefaultServlet.class, "/*").setAsyncSupported(false); _server.setHandler(context); @@ -223,172 +231,168 @@ public void run() } } - //TODO needs visibility of server.internal.HttpChannelState - /* @Test + @Test public void testFailureStarvation() throws Exception { - try (StacklessLogging stackless = new StacklessLogging(HttpChannelState.class)) + int acceptors = 0; + int selectors = 1; + int maxThreads = 10; + final int barried = maxThreads - acceptors - selectors * 2; + final CyclicBarrier barrier = new CyclicBarrier(barried); + + QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads); + threadPool.setDetailedDump(true); + _server = new Server(threadPool); + + ServerConnector connector = new ServerConnector(_server, acceptors, selectors) { - int acceptors = 0; - int selectors = 1; - int maxThreads = 10; - final int barried = maxThreads - acceptors - selectors * 2; - final CyclicBarrier barrier = new CyclicBarrier(barried); - - QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads); - threadPool.setDetailedDump(true); - _server = new Server(threadPool); - - ServerConnector connector = new ServerConnector(_server, acceptors, selectors) + @Override + protected SocketChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) { - @Override - protected SocketChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) + return new SocketChannelEndPoint(channel, selectSet, key, getScheduler()) { - return new SocketChannelEndPoint(channel, selectSet, key, getScheduler()) + @Override + public boolean flush(ByteBuffer... buffers) throws IOException { - @Override - public boolean flush(ByteBuffer... buffers) throws IOException - { - super.flush(buffers[0]); - throw new IOException("TEST FAILURE"); - } - }; - } - }; - connector.setIdleTimeout(Long.MAX_VALUE); - _server.addConnector(connector); - - final AtomicInteger count = new AtomicInteger(0); - class TheHandler extends Handler.Abstract + super.flush(buffers[0]); + throw new IOException("TEST FAILURE"); + } + }; + } + }; + connector.setIdleTimeout(Long.MAX_VALUE); + _server.addConnector(connector); + + final AtomicInteger count = new AtomicInteger(0); + class TheHandler extends Handler.Abstract + { + @Override + public boolean handle(Request request, Response response, Callback callback) throws Exception { - @Override - public boolean handle(request request, Response response, Callback callback) throws Exception + int c = count.getAndIncrement(); + try { - int c = count.getAndIncrement(); - try - { - if (c < barried) - { - barrier.await(10, TimeUnit.SECONDS); - } - } - catch (InterruptedException | BrokenBarrierException | TimeoutException e) + if (c < barried) { - throw new ServletException(e); + barrier.await(10, TimeUnit.SECONDS); } - - response.setStatus(200); - response.setContentLength(13); - response.write(true, callback, "Hello World!\n"); - return true; } + catch (InterruptedException | BrokenBarrierException | TimeoutException e) + { + throw new ServletException(e); + } + + response.setStatus(200); + response.getHeaders().put(HttpHeader.CONTENT_LENGTH, 13L); + response.write(true, BufferUtil.toBuffer("Hello World!\n"), callback); + return true; } - - _server.setHandler(new TheHandler()); - - _server.start(); - - List sockets = new ArrayList<>(); - for (int i = 0; i < maxThreads * 2; ++i) - { - Socket socket = new Socket("localhost", connector.getLocalPort()); - sockets.add(socket); - OutputStream output = socket.getOutputStream(); - String request = - "GET / HTTP/1.1\r\n" + - "Host: localhost\r\n" + - // "Connection: close\r\n" + - "\r\n"; - output.write(request.getBytes(StandardCharsets.UTF_8)); - output.flush(); - } - - byte[] buffer = new byte[48 * 1024]; - List> totals = new ArrayList<>(); - for (Socket socket : sockets) + } + + _server.setHandler(new TheHandler()); + + _server.start(); + + List sockets = new ArrayList<>(); + for (int i = 0; i < maxThreads * 2; ++i) + { + Socket socket = new Socket("localhost", connector.getLocalPort()); + sockets.add(socket); + OutputStream output = socket.getOutputStream(); + String request = + "GET / HTTP/1.1\r\n" + + "Host: localhost\r\n" + + // "Connection: close\r\n" + + "\r\n"; + output.write(request.getBytes(StandardCharsets.UTF_8)); + output.flush(); + } + + byte[] buffer = new byte[48 * 1024]; + List> totals = new ArrayList<>(); + for (Socket socket : sockets) + { + final Exchanger x = new Exchanger<>(); + totals.add(x); + final InputStream input = socket.getInputStream(); + + new Thread() { - final Exchanger x = new Exchanger<>(); - totals.add(x); - final InputStream input = socket.getInputStream(); - - new Thread() + @Override + public void run() { - @Override - public void run() + int read = 0; + try { - int read = 0; - try + // look for CRLFCRLF + StringBuilder header = new StringBuilder(); + int state = 0; + while (state < 4 && header.length() < 2048) { - // look for CRLFCRLF - StringBuilder header = new StringBuilder(); - int state = 0; - while (state < 4 && header.length() < 2048) + int ch = input.read(); + if (ch < 0) + break; + header.append((char)ch); + switch (state) { - int ch = input.read(); - if (ch < 0) + case 0: + if (ch == '\r') + state = 1; + break; + case 1: + if (ch == '\n') + state = 2; + else + state = 0; + break; + case 2: + if (ch == '\r') + state = 3; + else + state = 0; + break; + case 3: + if (ch == '\n') + state = 4; + else + state = 0; break; - header.append((char)ch); - switch (state) - { - case 0: - if (ch == '\r') - state = 1; - break; - case 1: - if (ch == '\n') - state = 2; - else - state = 0; - break; - case 2: - if (ch == '\r') - state = 3; - else - state = 0; - break; - case 3: - if (ch == '\n') - state = 4; - else - state = 0; - break; - } } - - read = input.read(buffer); } - catch (IOException e) + + read = input.read(buffer); + } + catch (IOException e) + { + e.printStackTrace(); + } + finally + { + try { - // e.printStackTrace(); + x.exchange(read); } - finally + catch (InterruptedException e) { - try - { - x.exchange(read); - } - catch (InterruptedException e) - { - e.printStackTrace(); - } + e.printStackTrace(); } } - }.start(); - } - - for (Exchanger x : totals) - { - Integer read = x.exchange(-1, 10, TimeUnit.SECONDS); - assertEquals(-1, read.intValue()); - } - - // We could read everything, good. - for (Socket socket : sockets) - { - socket.close(); - } - - _server.stop(); + } + }.start(); + } + + for (Exchanger x : totals) + { + Integer read = x.exchange(-1, 10, TimeUnit.SECONDS); + assertEquals(-1, read.intValue()); } - }*/ + + // We could read everything, good. + for (Socket socket : sockets) + { + socket.close(); + } + + _server.stop(); + } } diff --git a/jetty-ee9/jetty-ee9-servlets/src/test/java/org/eclipse/jetty/ee9/servlets/ThreadStarvationTest.java b/jetty-ee9/jetty-ee9-servlets/src/test/java/org/eclipse/jetty/ee9/servlets/ThreadStarvationTest.java index dd5a3524a8c9..2b64a9ce7198 100644 --- a/jetty-ee9/jetty-ee9-servlets/src/test/java/org/eclipse/jetty/ee9/servlets/ThreadStarvationTest.java +++ b/jetty-ee9/jetty-ee9-servlets/src/test/java/org/eclipse/jetty/ee9/servlets/ThreadStarvationTest.java @@ -13,7 +13,6 @@ package org.eclipse.jetty.ee9.servlets; -import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -24,43 +23,37 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.Exchanger; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import jakarta.servlet.ServletException; import org.eclipse.jetty.ee9.servlet.DefaultServlet; import org.eclipse.jetty.ee9.servlet.ServletContextHandler; import org.eclipse.jetty.http.HttpHeader; -import org.eclipse.jetty.http.HttpTester; -import org.eclipse.jetty.io.Content; import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.SocketChannelEndPoint; -import org.eclipse.jetty.logging.StacklessLogging; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Response; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.toolchain.test.MavenTestingUtils; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; public class ThreadStarvationTest @@ -75,7 +68,6 @@ public void dispose() throws Exception } @Test - @Tag("flaky") public void testDefaultServletSuccess() throws Exception { int maxThreads = 6; @@ -84,10 +76,10 @@ public void testDefaultServletSuccess() throws Exception _server = new Server(threadPool); // Prepare a big file to download. - File directory = MavenTestingUtils.getTargetTestingDir(); - Files.createDirectories(directory.toPath()); + Path directory = MavenTestingUtils.getTargetTestingPath(); + Files.createDirectories(directory); String resourceName = "resource.bin"; - Path resourcePath = Paths.get(directory.getPath(), resourceName); + Path resourcePath = directory.resolve(resourceName); try (OutputStream output = Files.newOutputStream(resourcePath, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { byte[] chunk = new byte[256 * 1024]; @@ -100,7 +92,7 @@ public void testDefaultServletSuccess() throws Exception } } - CountDownLatch writePending = new CountDownLatch(1); + final CountDownLatch writePending = new CountDownLatch(1); ServerConnector connector = new ServerConnector(_server, 0, 1) { @Override @@ -121,7 +113,7 @@ protected void onIncompleteFlush() _server.addConnector(connector); ServletContextHandler context = new ServletContextHandler(_server, "/"); - context.setResourceBase(directory.toURI().toString()); + context.setResourceBase(directory.toUri().toString()); context.addServlet(DefaultServlet.class, "/*").setAsyncSupported(false); _server.setHandler(context); @@ -144,37 +136,90 @@ protected void onIncompleteFlush() // Wait for a thread on the servlet to block. assertTrue(writePending.await(5, TimeUnit.SECONDS)); - ExecutorService executor = Executors.newCachedThreadPool(); - long expected = Files.size(resourcePath); - List> totals = new ArrayList<>(); + byte[] buffer = new byte[48 * 1024]; + List> totals = new ArrayList<>(); for (Socket socket : sockets) { - InputStream input = socket.getInputStream(); - totals.add(CompletableFuture.supplyAsync(() -> + final Exchanger x = new Exchanger<>(); + totals.add(x); + final InputStream input = socket.getInputStream(); + + new Thread() { - try - { - HttpTester.Response response = HttpTester.parseResponse(HttpTester.from(input)); - if (response != null) - return response.getContentBytes().length; - return 0; - } - catch (IOException x) + @Override + public void run() { - x.printStackTrace(); - return -1; + long total = 0; + try + { + // look for CRLFCRLF + StringBuilder header = new StringBuilder(); + int state = 0; + while (state < 4 && header.length() < 2048) + { + int ch = input.read(); + if (ch < 0) + break; + header.append((char)ch); + switch (state) + { + case 0: + if (ch == '\r') + state = 1; + break; + case 1: + if (ch == '\n') + state = 2; + else + state = 0; + break; + case 2: + if (ch == '\r') + state = 3; + else + state = 0; + break; + case 3: + if (ch == '\n') + state = 4; + else + state = 0; + break; + } + } + + while (total < expected) + { + int read = input.read(buffer); + if (read < 0) + break; + total += read; + } + } + catch (IOException e) + { + e.printStackTrace(); + } + finally + { + try + { + x.exchange(total); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + } } - }, executor)); + }.start(); } - // Wait for all responses to arrive. - CompletableFuture.allOf(totals.toArray(new CompletableFuture[0])).get(20, TimeUnit.SECONDS); - - for (CompletableFuture total : totals) + for (Exchanger x : totals) { - assertFalse(total.isCompletedExceptionally()); - assertEquals(expected, total.get().intValue()); + Long total = x.exchange(-1L, 10000, TimeUnit.SECONDS); + assertEquals(expected, total.longValue()); } // We could read everything, good. @@ -182,123 +227,170 @@ protected void onIncompleteFlush() { socket.close(); } - - executor.shutdown(); - - _server.stop(); } @Test - @Tag("flaky") public void testFailureStarvation() throws Exception { - Logger serverInternalLogger = LoggerFactory.getLogger("org.eclipse.jetty.server.internal"); - try (StacklessLogging ignored = new StacklessLogging(serverInternalLogger)) - { - int acceptors = 0; - int selectors = 1; - int maxThreads = 10; - int parties = maxThreads - acceptors - selectors * 2; - CyclicBarrier barrier = new CyclicBarrier(parties); + int acceptors = 0; + int selectors = 1; + int maxThreads = 10; + final int barried = maxThreads - acceptors - selectors * 2; + final CyclicBarrier barrier = new CyclicBarrier(barried); - QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads); - threadPool.setDetailedDump(true); - _server = new Server(threadPool); + QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads); + threadPool.setDetailedDump(true); + _server = new Server(threadPool); - ServerConnector connector = new ServerConnector(_server, acceptors, selectors) + ServerConnector connector = new ServerConnector(_server, acceptors, selectors) + { + @Override + protected SocketChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) { - @Override - protected SocketChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) + return new SocketChannelEndPoint(channel, selectSet, key, getScheduler()) { - return new SocketChannelEndPoint(channel, selectSet, key, getScheduler()) + @Override + public boolean flush(ByteBuffer... buffers) throws IOException { - @Override - public boolean flush(ByteBuffer... buffers) throws IOException - { - // Write only the headers, then throw. - super.flush(buffers[0]); - throw new IOException("thrown by test"); - } - }; - } - }; - connector.setIdleTimeout(Long.MAX_VALUE); - _server.addConnector(connector); + super.flush(buffers[0]); + throw new IOException("TEST FAILURE"); + } + }; + } + }; + connector.setIdleTimeout(Long.MAX_VALUE); + _server.addConnector(connector); - AtomicInteger count = new AtomicInteger(0); - _server.setHandler(new Handler.Abstract() + final AtomicInteger count = new AtomicInteger(0); + class TheHandler extends Handler.Abstract + { + @Override + public boolean handle(Request request, Response response, Callback callback) throws Exception { - @Override - public boolean handle(Request request, Response response, Callback callback) throws Exception + int c = count.getAndIncrement(); + try { - int c = count.getAndIncrement(); - if (c < parties) + if (c < barried) + { barrier.await(10, TimeUnit.SECONDS); - response.setStatus(200); - response.getHeaders().put(HttpHeader.CONTENT_LENGTH, 13); - Content.Sink.write(response, true, "Hello World!\n", callback); - return true; + } + } + catch (InterruptedException | BrokenBarrierException | TimeoutException e) + { + throw new ServletException(e); } - }); - - _server.start(); - List sockets = new ArrayList<>(); - for (int i = 0; i < maxThreads * 2; ++i) - { - Socket socket = new Socket("localhost", connector.getLocalPort()); - sockets.add(socket); - OutputStream output = socket.getOutputStream(); - String request = """ - GET / HTTP/1.1\r - Host: localhost\r - \r - """; - output.write(request.getBytes(StandardCharsets.UTF_8)); - output.flush(); + response.setStatus(200); + response.getHeaders().put(HttpHeader.CONTENT_LENGTH, 13L); + response.write(true, BufferUtil.toBuffer("Hello World!\n"), callback); + return true; } + } + + _server.setHandler(new TheHandler()); + + _server.start(); + + List sockets = new ArrayList<>(); + for (int i = 0; i < maxThreads * 2; ++i) + { + Socket socket = new Socket("localhost", connector.getLocalPort()); + sockets.add(socket); + OutputStream output = socket.getOutputStream(); + String request = + "GET / HTTP/1.1\r\n" + + "Host: localhost\r\n" + + // "Connection: close\r\n" + + "\r\n"; + output.write(request.getBytes(StandardCharsets.UTF_8)); + output.flush(); + } - ExecutorService executor = Executors.newCachedThreadPool(); + byte[] buffer = new byte[48 * 1024]; + List> totals = new ArrayList<>(); + for (Socket socket : sockets) + { + final Exchanger x = new Exchanger<>(); + totals.add(x); + final InputStream input = socket.getInputStream(); - List> totals = new ArrayList<>(); - for (Socket socket : sockets) + new Thread() { - InputStream input = socket.getInputStream(); - totals.add(CompletableFuture.supplyAsync(() -> + @Override + public void run() { + int read = 0; try { - HttpTester.Response response = HttpTester.parseResponse(HttpTester.from(input)); - if (response != null) - return response.getContentBytes().length; - return input.read(); + // look for CRLFCRLF + StringBuilder header = new StringBuilder(); + int state = 0; + while (state < 4 && header.length() < 2048) + { + int ch = input.read(); + if (ch < 0) + break; + header.append((char)ch); + switch (state) + { + case 0: + if (ch == '\r') + state = 1; + break; + case 1: + if (ch == '\n') + state = 2; + else + state = 0; + break; + case 2: + if (ch == '\r') + state = 3; + else + state = 0; + break; + case 3: + if (ch == '\n') + state = 4; + else + state = 0; + break; + } + } + + read = input.read(buffer); } - catch (Exception x) + catch (IOException e) { - x.printStackTrace(); - return -1; + e.printStackTrace(); } - }, executor)); - } - - // Wait for all responses to arrive. - CompletableFuture.allOf(totals.toArray(new CompletableFuture[0])).get(20, TimeUnit.SECONDS); - - for (CompletableFuture total : totals) - { - assertFalse(total.isCompletedExceptionally()); - assertEquals(-1, total.get().intValue()); - } - - // We could read everything, good. - for (Socket socket : sockets) - { - socket.close(); - } + finally + { + try + { + x.exchange(read); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + } + } + }.start(); + } - executor.shutdown(); + for (Exchanger x : totals) + { + Integer read = x.exchange(-1, 10, TimeUnit.SECONDS); + assertEquals(-1, read.intValue()); + } - _server.stop(); + // We could read everything, good. + for (Socket socket : sockets) + { + socket.close(); } + + _server.stop(); } }