Skip to content

Commit

Permalink
Makes singleplex requests be handled in separate threads in WorkReque…
Browse files Browse the repository at this point in the history
…stHandler.

Preparation for cancellation.

RELNOTES: None.
PiperOrigin-RevId: 372134373
  • Loading branch information
larsrc-google authored and copybara-github committed May 5, 2021
1 parent ebe3a40 commit 1a519bb
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@
import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

Expand All @@ -34,22 +39,47 @@
* (https://docs.bazel.build/versions/master/multiplex-worker.html).
*/
public class WorkRequestHandler implements AutoCloseable {

/** Contains the logic for reading {@link WorkRequest}s and writing {@link WorkResponse}s. */
public interface WorkerMessageProcessor {
/** Reads the next incoming request from this worker's stdin. */
public WorkRequest readWorkRequest() throws IOException;
WorkRequest readWorkRequest() throws IOException;

/**
* Writes the provided {@link WorkResponse} to this worker's stdout. This function is also
* responsible for flushing the stdout.
*/
public void writeWorkResponse(WorkResponse workResponse) throws IOException;
void writeWorkResponse(WorkResponse workResponse) throws IOException;

/** Clean up. */
public void close() throws IOException;
void close() throws IOException;
}

/** Holds information necessary to properly handle a request, especially for cancellation. */
static class RequestInfo {
/**
* The builder for the response to this request. Since only one response must be sent per
* request, this builder must be accessed through takeBuilder(), which zeroes this field and
* returns the builder.
*/
private WorkResponse.Builder responseBuilder = WorkResponse.newBuilder();

/**
* Returns the response builder. If called more than once on the same instance, subsequent calls
* will return {@code null}.
*/
synchronized Optional<WorkResponse.Builder> takeBuilder() {
WorkResponse.Builder b = responseBuilder;
responseBuilder = null;
return Optional.ofNullable(b);
}
}

/** Requests that are currently being processed. Visible for testing. */
final Map<Integer, RequestInfo> activeRequests = new ConcurrentHashMap<>();

/** WorkRequests that have been received but could not be processed yet. */
private final Queue<WorkRequest> availableRequests = new ArrayDeque<>();

/** The function to be called after each {@link WorkRequest} is read. */
private final BiFunction<List<String>, PrintWriter, Integer> callback;

Expand All @@ -58,6 +88,7 @@ public interface WorkerMessageProcessor {

final WorkerMessageProcessor messageProcessor;


private final CpuTimeBasedGcScheduler gcScheduler;

/**
Expand Down Expand Up @@ -160,34 +191,61 @@ public void processRequests() throws IOException {
if (request == null) {
break;
}
if (request.getRequestId() != 0) {
Thread t = createResponseThread(request);
t.start();
} else {
respondToRequest(request);
availableRequests.add(request);
startRequestThreads();
}
}

/**
* Starts threads for as many outstanding requests as possible. This is the only method that adds
* to {@code activeRequests}.
*/
private synchronized void startRequestThreads() {
while (!availableRequests.isEmpty()) {
// If there's a singleplex request in process, don't start more processes.
if (activeRequests.containsKey(0)) {
return;
}
WorkRequest request = availableRequests.peek();
// Don't start new singleplex requests if there are other requests running.
if (request.getRequestId() == 0 && !activeRequests.isEmpty()) {
return;
}
availableRequests.remove();
Thread t = createResponseThread(request);
activeRequests.put(request.getRequestId(), new RequestInfo());
t.start();
}
}

/** Creates a new {@link Thread} to process a multiplex request. */
public Thread createResponseThread(WorkRequest request) {
Thread createResponseThread(WorkRequest request) {
Thread currentThread = Thread.currentThread();
String threadName =
request.getRequestId() > 0
? "multiplex-request-" + request.getRequestId()
: "singleplex-request";
return new Thread(
() -> {
RequestInfo requestInfo = activeRequests.get(request.getRequestId());
try {
respondToRequest(request);
respondToRequest(request, requestInfo);
} catch (IOException e) {
e.printStackTrace(stderr);
// In case of error, shut down the entire worker.
currentThread.interrupt();
} finally {
activeRequests.remove(request.getRequestId());
// A good time to start more requests, especially if we finished a singleplex request
startRequestThreads();
}
},
"multiplex-request-" + request.getRequestId());
threadName);
}

/** Handles and responds to the given {@link WorkRequest}. */
@VisibleForTesting
void respondToRequest(WorkRequest request) throws IOException {
void respondToRequest(WorkRequest request, RequestInfo requestInfo) throws IOException {
try (StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw)) {
int exitCode;
Expand All @@ -198,14 +256,15 @@ void respondToRequest(WorkRequest request) throws IOException {
exitCode = 1;
}
pw.flush();
WorkResponse workResponse =
WorkResponse.newBuilder()
.setOutput(sw.toString())
.setExitCode(exitCode)
.setRequestId(request.getRequestId())
.build();
synchronized (this) {
messageProcessor.writeWorkResponse(workResponse);
Optional<WorkResponse.Builder> optBuilder = requestInfo.takeBuilder();
if (optBuilder.isPresent()) {
WorkResponse.Builder builder = optBuilder.get();
builder.setRequestId(request.getRequestId());
builder.setOutput(builder.getOutput() + sw.toString()).setExitCode(exitCode);
WorkResponse response = builder.build();
synchronized (this) {
messageProcessor.writeWorkResponse(response);
}
}
gcScheduler.maybePerformGc();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void processRequests() throws IOException {
Thread t = createResponseThread(request);
t.start();
} else {
respondToRequest(request);
respondToRequest(request, new RequestInfo());
}
if (workerOptions.exitAfter > 0 && workUnitCounter > workerOptions.exitAfter) {
System.exit(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static com.google.common.truth.Truth.assertThat;

import com.google.devtools.build.lib.worker.WorkRequestHandler.RequestInfo;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -50,7 +51,7 @@ public void testNormalWorkRequest() throws IOException {

List<String> args = Arrays.asList("--sources", "A.java");
WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build();
handler.respondToRequest(request);
handler.respondToRequest(request, new RequestInfo());

WorkResponse response =
WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
Expand All @@ -70,7 +71,7 @@ public void testMultiplexWorkRequest() throws IOException {

List<String> args = Arrays.asList("--sources", "A.java");
WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).setRequestId(42).build();
handler.respondToRequest(request);
handler.respondToRequest(request, new RequestInfo());

WorkResponse response =
WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
Expand All @@ -93,7 +94,7 @@ public void testOutput() throws IOException {

List<String> args = Arrays.asList("--sources", "A.java");
WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build();
handler.respondToRequest(request);
handler.respondToRequest(request, new RequestInfo());

WorkResponse response =
WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
Expand All @@ -115,7 +116,7 @@ public void testException() throws IOException {

List<String> args = Arrays.asList("--sources", "A.java");
WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build();
handler.respondToRequest(request);
handler.respondToRequest(request, new RequestInfo());

WorkResponse response =
WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
Expand Down

0 comments on commit 1a519bb

Please sign in to comment.