From a4251eab6988d6cf4f5e35681fbe2c1b0abe48ef Mon Sep 17 00:00:00 2001 From: steinman Date: Fri, 6 Nov 2020 10:50:59 -0800 Subject: [PATCH] Refactor WorkRequestHandler to be an interface, of which Proto is one implementation. This lays the foundation for WorkRequestHandler to be used for JSON workers as well. RELNOTES: None. PiperOrigin-RevId: 341078345 --- .../build/buildjar/BazelJavaBuilder.java | 14 +++- .../google/devtools/build/lib/worker/BUILD | 1 + .../worker/ProtoWorkerMessageProcessor.java | 54 +++++++++++++ .../build/lib/worker/WorkRequestHandler.java | 79 +++++++++++-------- .../lib/worker/WorkRequestHandlerTest.java | 36 ++++++--- 5 files changed, 138 insertions(+), 46 deletions(-) create mode 100644 src/main/java/com/google/devtools/build/lib/worker/ProtoWorkerMessageProcessor.java diff --git a/src/java_tools/buildjar/java/com/google/devtools/build/buildjar/BazelJavaBuilder.java b/src/java_tools/buildjar/java/com/google/devtools/build/buildjar/BazelJavaBuilder.java index 6642dd2dc269f5..ae78c446e35319 100644 --- a/src/java_tools/buildjar/java/com/google/devtools/build/buildjar/BazelJavaBuilder.java +++ b/src/java_tools/buildjar/java/com/google/devtools/build/buildjar/BazelJavaBuilder.java @@ -23,6 +23,7 @@ import com.google.devtools.build.buildjar.javac.plugins.BlazeJavaCompilerPlugin; import com.google.devtools.build.buildjar.javac.plugins.dependency.DependencyModule; import com.google.devtools.build.buildjar.javac.plugins.errorprone.ErrorPronePlugin; +import com.google.devtools.build.lib.worker.ProtoWorkerMessageProcessor; import com.google.devtools.build.lib.worker.WorkRequestHandler; import java.io.IOException; import java.io.OutputStreamWriter; @@ -41,8 +42,17 @@ public class BazelJavaBuilder { public static void main(String[] args) { BazelJavaBuilder builder = new BazelJavaBuilder(); if (args.length == 1 && args[0].equals("--persistent_worker")) { - WorkRequestHandler workerHandler = new WorkRequestHandler(builder::parseAndBuild); - System.exit(workerHandler.processRequests(System.in, System.out, System.err)); + WorkRequestHandler workerHandler = + new WorkRequestHandler( + builder::parseAndBuild, + System.err, + new ProtoWorkerMessageProcessor(System.in, System.out)); + try { + workerHandler.processRequests(); + } catch (IOException e) { + System.err.println(e.getMessage()); + System.exit(1); + } } else { PrintWriter pw = new PrintWriter(new OutputStreamWriter(System.err, Charset.defaultCharset())); diff --git a/src/main/java/com/google/devtools/build/lib/worker/BUILD b/src/main/java/com/google/devtools/build/lib/worker/BUILD index 6188dcfde22349..4262f426c0db30 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/BUILD +++ b/src/main/java/com/google/devtools/build/lib/worker/BUILD @@ -50,6 +50,7 @@ java_library( java_library( name = "work_request_handlers", srcs = [ + "ProtoWorkerMessageProcessor.java", "WorkRequestHandler.java", ], deps = [ diff --git a/src/main/java/com/google/devtools/build/lib/worker/ProtoWorkerMessageProcessor.java b/src/main/java/com/google/devtools/build/lib/worker/ProtoWorkerMessageProcessor.java new file mode 100644 index 00000000000000..182695d2a2b58f --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/worker/ProtoWorkerMessageProcessor.java @@ -0,0 +1,54 @@ +// Copyright 2020 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.worker; + +import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest; +import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** Implementation of the Worker Protocol using Proto to communicate with Bazel. */ +public final class ProtoWorkerMessageProcessor + implements WorkRequestHandler.WorkerMessageProcessor { + + /** This worker's stdin. */ + private final InputStream stdin; + + /** This worker's stdout. Only {@link WorkRequest}s should be written here. */ + private final OutputStream stdout; + + /** Constructs a {@link WorkRequestHandler} that reads and writes Protocol Buffers. */ + public ProtoWorkerMessageProcessor(InputStream stdin, OutputStream stdout) { + this.stdin = stdin; + this.stdout = stdout; + } + + @Override + public WorkRequest readWorkRequest() throws IOException { + return WorkRequest.parseDelimitedFrom(stdin); + } + + @Override + public void writeWorkResponse(WorkResponse workResponse) throws IOException { + try { + workResponse.writeDelimitedTo(stdout); + } finally { + stdout.flush(); + } + } + + @Override + public void close() {} +} diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java b/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java index c34de02d0f3721..2f5deb4a3bbfbc 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java @@ -17,7 +17,6 @@ import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest; import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse; import java.io.IOException; -import java.io.InputStream; import java.io.PrintStream; import java.io.PrintWriter; import java.io.StringWriter; @@ -29,59 +28,76 @@ * (https://docs.bazel.build/versions/master/persistent-workers.html), including multiplex workers * (https://docs.bazel.build/versions/master/multiplex-worker.html). */ -public class WorkRequestHandler { +public class WorkRequestHandler implements AutoCloseable { + /** Contains the logic for reading {@link WorkRequest}s and writing {@link WorkResponse}s. */ + public interface WorkerMessageProcessor { + /** Reads the next incoming request from this worker's stdin. */ + public WorkRequest readWorkRequest() throws IOException; + + /** + * Writes the provided {@link WorkResponse} to this worker's stdout. This function is also + * responsible for flushing the stdout. + */ + public void writeWorkResponse(WorkResponse workResponse) throws IOException; + + /** Clean up. */ + public void close() throws IOException; + } + + /** The function to be called after each {@link WorkRequest} is read. */ private final BiFunction, PrintWriter, Integer> callback; + /** This worker's stderr. */ + private final PrintStream stderr; + + private final WorkerMessageProcessor messageProcessor; + /** * Creates a {@code WorkRequestHandler} that will call {@code callback} for each WorkRequest * received. The first argument to {@code callback} is the set of command-line arguments, the - * second is where all error messages and similar should be written to. + * second is where all error messages and similar should be written to. The callback should return + * an exit code indicating success (0) or failure (nonzero). */ - public WorkRequestHandler(BiFunction, PrintWriter, Integer> callback) { + public WorkRequestHandler( + BiFunction, PrintWriter, Integer> callback, + PrintStream stderr, + WorkerMessageProcessor messageProcessor) { this.callback = callback; + this.stderr = stderr; + this.messageProcessor = messageProcessor; } /** - * Runs an infinite loop of reading {@code WorkRequest} from {@code in}, running the callback, - * then writing the corresponding {@code WorkResponse} to {@code out}. If there is an error + * Runs an infinite loop of reading {@link WorkRequest} from {@code in}, running the callback, + * then writing the corresponding {@link WorkResponse} to {@code out}. If there is an error * reading or writing the requests or responses, it writes an error message on {@code err} and * returns. If {@code in} reaches EOF, it also returns. - * - * @return 0 if we reached EOF, 1 if there was an error. */ - public int processRequests(InputStream in, PrintStream out, PrintStream err) { + public void processRequests() throws IOException { while (true) { - try { - WorkRequest request = WorkRequest.parseDelimitedFrom(in); - + WorkRequest request = messageProcessor.readWorkRequest(); if (request == null) { break; } - if (request.getRequestId() != 0) { - Thread t = createResponseThread(request, out, err); + Thread t = createResponseThread(request); t.start(); } else { - respondToRequest(request, out); - } - } catch (IOException e) { - e.printStackTrace(err); - return 1; + respondToRequest(request); } } - return 0; } - /** Creates a new {@code Thread} to process a multiplex request. */ - public Thread createResponseThread(WorkRequest request, PrintStream out, PrintStream err) { + /** Creates a new {@link Thread} to process a multiplex request. */ + public Thread createResponseThread(WorkRequest request) { Thread currentThread = Thread.currentThread(); return new Thread( () -> { try { - respondToRequest(request, out); + respondToRequest(request); } catch (IOException e) { - e.printStackTrace(err); + e.printStackTrace(stderr); // In case of error, shut down the entire worker. currentThread.interrupt(); } @@ -89,9 +105,9 @@ public Thread createResponseThread(WorkRequest request, PrintStream out, PrintSt "multiplex-request-" + request.getRequestId()); } - /** Responds to {@code request}, writing the {@code WorkResponse} proto to {@code out}. */ + /** Handles and responds to the given {@link WorkRequest}. */ @VisibleForTesting - void respondToRequest(WorkRequest request, PrintStream out) throws IOException { + void respondToRequest(WorkRequest request) throws IOException { try (StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw)) { int exitCode; @@ -109,14 +125,13 @@ void respondToRequest(WorkRequest request, PrintStream out) throws IOException { .setRequestId(request.getRequestId()) .build(); synchronized (this) { - workResponse.writeDelimitedTo(out); + messageProcessor.writeWorkResponse(workResponse); } } - out.flush(); + } - // Hint to the system that now would be a good time to run a gc. After a compile - // completes lots of objects should be available for collection and it should be cheap to - // collect them. - System.gc(); + @Override + public void close() throws IOException { + messageProcessor.close(); } } diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkRequestHandlerTest.java b/src/test/java/com/google/devtools/build/lib/worker/WorkRequestHandlerTest.java index 5aebafe10b6e6f..1975b03a6c357c 100644 --- a/src/test/java/com/google/devtools/build/lib/worker/WorkRequestHandlerTest.java +++ b/src/test/java/com/google/devtools/build/lib/worker/WorkRequestHandlerTest.java @@ -41,12 +41,16 @@ public void init() { @Test public void testNormalWorkRequest() throws IOException { - WorkRequestHandler handler = new WorkRequestHandler((args, err) -> 1); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + WorkRequestHandler handler = + new WorkRequestHandler( + (args, err) -> 1, + new PrintStream(new ByteArrayOutputStream()), + new ProtoWorkerMessageProcessor(new ByteArrayInputStream(new byte[0]), out)); List args = Arrays.asList("--sources", "A.java"); WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build(); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - handler.respondToRequest(request, new PrintStream(out)); + handler.respondToRequest(request); WorkResponse response = WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray())); @@ -57,12 +61,16 @@ public void testNormalWorkRequest() throws IOException { @Test public void testMultiplexWorkRequest() throws IOException { - WorkRequestHandler handler = new WorkRequestHandler((args, err) -> 0); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + WorkRequestHandler handler = + new WorkRequestHandler( + (args, err) -> 0, + new PrintStream(new ByteArrayOutputStream()), + new ProtoWorkerMessageProcessor(new ByteArrayInputStream(new byte[0]), out)); List args = Arrays.asList("--sources", "A.java"); WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).setRequestId(42).build(); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - handler.respondToRequest(request, new PrintStream(out)); + handler.respondToRequest(request); WorkResponse response = WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray())); @@ -73,17 +81,19 @@ public void testMultiplexWorkRequest() throws IOException { @Test public void testOutput() throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); WorkRequestHandler handler = new WorkRequestHandler( (args, err) -> { err.println("Failed!"); return 1; - }); + }, + new PrintStream(new ByteArrayOutputStream()), + new ProtoWorkerMessageProcessor(new ByteArrayInputStream(new byte[0]), out)); List args = Arrays.asList("--sources", "A.java"); WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build(); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - handler.respondToRequest(request, new PrintStream(out)); + handler.respondToRequest(request); WorkResponse response = WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray())); @@ -94,16 +104,18 @@ public void testOutput() throws IOException { @Test public void testException() throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); WorkRequestHandler handler = new WorkRequestHandler( (args, err) -> { throw new RuntimeException("Exploded!"); - }); + }, + new PrintStream(new ByteArrayOutputStream()), + new ProtoWorkerMessageProcessor(new ByteArrayInputStream(new byte[0]), out)); List args = Arrays.asList("--sources", "A.java"); WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build(); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - handler.respondToRequest(request, new PrintStream(out)); + handler.respondToRequest(request); WorkResponse response = WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));