Skip to content

Commit

Permalink
Allow use of JSON protocol in multiplex workers.
Browse files Browse the repository at this point in the history
RELNOTES: Multiplex persistent workers can now use the JSON protocol.
PiperOrigin-RevId: 352415016
  • Loading branch information
larsrc-google authored and copybara-github committed Jan 18, 2021
1 parent 80c03ef commit 003cfcd
Showing 1 changed file with 18 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public class WorkerMultiplexer {
* {@code WorkerMultiplexer} gets discarded as well and a new one gets created as needed.
*/
private Subprocess process;
/** The implementation of the worker protocol (JSON or Proto). */
private WorkerProtocolImpl workerProtocol;
/** InputStream from the worker process. */
private RecordingInputStream recordingStream;
/** True if this multiplexer was explicitly destroyed. */
Expand Down Expand Up @@ -142,6 +144,18 @@ public synchronized void createProcess(Path workDir) throws IOException {
processBuilder.setStderr(logFile.getPathFile());
processBuilder.setEnv(workerKey.getEnv());
this.process = processBuilder.start();
recordingStream = new RecordingInputStream(process.getInputStream());
recordingStream.startRecording(4096);
if (workerProtocol == null) {
switch (workerKey.getProtocolFormat()) {
case JSON:
workerProtocol = new JsonWorkerProtocol(process.getOutputStream(), recordingStream);
break;
case PROTO:
workerProtocol = new ProtoWorkerProtocol(process.getOutputStream(), recordingStream);
break;
}
}
String id = workerKey.getMnemonic() + "-" + workerKey.hashCode();
// TODO(larsrc): Consider moving sender/receiver threads into separate classes.
this.requestSender =
Expand Down Expand Up @@ -277,8 +291,7 @@ private boolean sendRequest() {
return false;
}
try {
request.writeDelimitedTo(process.getOutputStream());
process.getOutputStream().flush();
workerProtocol.putRequest(request);
} catch (IOException e) {
// We can't know how much of the request was sent, so we have to assume the worker's input
// now contains garbage, and this request is lost.
Expand All @@ -303,11 +316,9 @@ private boolean sendRequest() {
* execution cancellation, but only by a call to {@link #destroyProcess()}.
*/
private boolean readResponse() {
recordingStream = new RecordingInputStream(process.getInputStream());
recordingStream.startRecording(4096);
WorkResponse parsedResponse;
try {
parsedResponse = WorkResponse.parseDelimitedFrom(recordingStream);
parsedResponse = workerProtocol.getResponse();
} catch (IOException e) {
if (!(e instanceof InterruptedIOException)) {
report(
Expand All @@ -320,7 +331,8 @@ private boolean readResponse() {
destroyProcess();
return false;
}
// A null parsedResponse can happen if the input stream is closed, in which case we

// A null parsedResponse can only happen if the input stream is closed, in which case we
// drop everything.
if (parsedResponse == null) {
report(
Expand Down

0 comments on commit 003cfcd

Please sign in to comment.