diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java b/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java index 9f6e3c62516c06..faecc1439f522d 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java @@ -24,7 +24,12 @@ import java.io.StringWriter; import java.lang.management.ManagementFactory; import java.time.Duration; +import java.util.ArrayDeque; import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; @@ -34,22 +39,47 @@ * (https://docs.bazel.build/versions/master/multiplex-worker.html). */ public class WorkRequestHandler implements AutoCloseable { - /** Contains the logic for reading {@link WorkRequest}s and writing {@link WorkResponse}s. */ public interface WorkerMessageProcessor { /** Reads the next incoming request from this worker's stdin. */ - public WorkRequest readWorkRequest() throws IOException; + WorkRequest readWorkRequest() throws IOException; /** * Writes the provided {@link WorkResponse} to this worker's stdout. This function is also * responsible for flushing the stdout. */ - public void writeWorkResponse(WorkResponse workResponse) throws IOException; + void writeWorkResponse(WorkResponse workResponse) throws IOException; /** Clean up. */ - public void close() throws IOException; + void close() throws IOException; + } + + /** Holds information necessary to properly handle a request, especially for cancellation. */ + static class RequestInfo { + /** + * The builder for the response to this request. Since only one response must be sent per + * request, this builder must be accessed through takeBuilder(), which zeroes this field and + * returns the builder. + */ + private WorkResponse.Builder responseBuilder = WorkResponse.newBuilder(); + + /** + * Returns the response builder. If called more than once on the same instance, subsequent calls + * will return {@code null}. + */ + synchronized Optional takeBuilder() { + WorkResponse.Builder b = responseBuilder; + responseBuilder = null; + return Optional.ofNullable(b); + } } + /** Requests that are currently being processed. Visible for testing. */ + final Map activeRequests = new ConcurrentHashMap<>(); + + /** WorkRequests that have been received but could not be processed yet. */ + private final Queue availableRequests = new ArrayDeque<>(); + /** The function to be called after each {@link WorkRequest} is read. */ private final BiFunction, PrintWriter, Integer> callback; @@ -58,6 +88,7 @@ public interface WorkerMessageProcessor { final WorkerMessageProcessor messageProcessor; + private final CpuTimeBasedGcScheduler gcScheduler; /** @@ -160,34 +191,61 @@ public void processRequests() throws IOException { if (request == null) { break; } - if (request.getRequestId() != 0) { - Thread t = createResponseThread(request); - t.start(); - } else { - respondToRequest(request); + availableRequests.add(request); + startRequestThreads(); + } + } + + /** + * Starts threads for as many outstanding requests as possible. This is the only method that adds + * to {@code activeRequests}. + */ + private synchronized void startRequestThreads() { + while (!availableRequests.isEmpty()) { + // If there's a singleplex request in process, don't start more processes. + if (activeRequests.containsKey(0)) { + return; } + WorkRequest request = availableRequests.peek(); + // Don't start new singleplex requests if there are other requests running. + if (request.getRequestId() == 0 && !activeRequests.isEmpty()) { + return; + } + availableRequests.remove(); + Thread t = createResponseThread(request); + activeRequests.put(request.getRequestId(), new RequestInfo()); + t.start(); } } /** Creates a new {@link Thread} to process a multiplex request. */ - public Thread createResponseThread(WorkRequest request) { + Thread createResponseThread(WorkRequest request) { Thread currentThread = Thread.currentThread(); + String threadName = + request.getRequestId() > 0 + ? "multiplex-request-" + request.getRequestId() + : "singleplex-request"; return new Thread( () -> { + RequestInfo requestInfo = activeRequests.get(request.getRequestId()); try { - respondToRequest(request); + respondToRequest(request, requestInfo); } catch (IOException e) { e.printStackTrace(stderr); // In case of error, shut down the entire worker. currentThread.interrupt(); + } finally { + activeRequests.remove(request.getRequestId()); + // A good time to start more requests, especially if we finished a singleplex request + startRequestThreads(); } }, - "multiplex-request-" + request.getRequestId()); + threadName); } /** Handles and responds to the given {@link WorkRequest}. */ @VisibleForTesting - void respondToRequest(WorkRequest request) throws IOException { + void respondToRequest(WorkRequest request, RequestInfo requestInfo) throws IOException { try (StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw)) { int exitCode; @@ -198,14 +256,15 @@ void respondToRequest(WorkRequest request) throws IOException { exitCode = 1; } pw.flush(); - WorkResponse workResponse = - WorkResponse.newBuilder() - .setOutput(sw.toString()) - .setExitCode(exitCode) - .setRequestId(request.getRequestId()) - .build(); - synchronized (this) { - messageProcessor.writeWorkResponse(workResponse); + Optional optBuilder = requestInfo.takeBuilder(); + if (optBuilder.isPresent()) { + WorkResponse.Builder builder = optBuilder.get(); + builder.setRequestId(request.getRequestId()); + builder.setOutput(builder.getOutput() + sw.toString()).setExitCode(exitCode); + WorkResponse response = builder.build(); + synchronized (this) { + messageProcessor.writeWorkResponse(response); + } } gcScheduler.maybePerformGc(); } diff --git a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java index ad70a18d262d89..6190bbc2e7bee5 100644 --- a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java +++ b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java @@ -98,7 +98,7 @@ public void processRequests() throws IOException { Thread t = createResponseThread(request); t.start(); } else { - respondToRequest(request); + respondToRequest(request, new RequestInfo()); } if (workerOptions.exitAfter > 0 && workUnitCounter > workerOptions.exitAfter) { System.exit(0); diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkRequestHandlerTest.java b/src/test/java/com/google/devtools/build/lib/worker/WorkRequestHandlerTest.java index 1975b03a6c357c..cd3da8d4a55514 100644 --- a/src/test/java/com/google/devtools/build/lib/worker/WorkRequestHandlerTest.java +++ b/src/test/java/com/google/devtools/build/lib/worker/WorkRequestHandlerTest.java @@ -16,6 +16,7 @@ import static com.google.common.truth.Truth.assertThat; +import com.google.devtools.build.lib.worker.WorkRequestHandler.RequestInfo; import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest; import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse; import java.io.ByteArrayInputStream; @@ -50,7 +51,7 @@ public void testNormalWorkRequest() throws IOException { List args = Arrays.asList("--sources", "A.java"); WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build(); - handler.respondToRequest(request); + handler.respondToRequest(request, new RequestInfo()); WorkResponse response = WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray())); @@ -70,7 +71,7 @@ public void testMultiplexWorkRequest() throws IOException { List args = Arrays.asList("--sources", "A.java"); WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).setRequestId(42).build(); - handler.respondToRequest(request); + handler.respondToRequest(request, new RequestInfo()); WorkResponse response = WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray())); @@ -93,7 +94,7 @@ public void testOutput() throws IOException { List args = Arrays.asList("--sources", "A.java"); WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build(); - handler.respondToRequest(request); + handler.respondToRequest(request, new RequestInfo()); WorkResponse response = WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray())); @@ -115,7 +116,7 @@ public void testException() throws IOException { List args = Arrays.asList("--sources", "A.java"); WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build(); - handler.respondToRequest(request); + handler.respondToRequest(request, new RequestInfo()); WorkResponse response = WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));