From 3836ad029f202ca13c64c9f07e4568ea8ab2d9a6 Mon Sep 17 00:00:00 2001 From: Chi Wang Date: Wed, 26 Jan 2022 02:27:16 -0800 Subject: [PATCH] Remote: Only waits for background tasks from remote execution. We added the block waiting behaviour after each command in remote module to wait for background uploads when introducing async upload. However, not all background uploads should be waited, e.g. uploads from BES module but with flag `--bes_upload_mode=fully_async`. This PR updates remote module so that only uploads initiated by remote module are waited after the command. This also enable us to implement something like `--remote_upload_mode=fully_async` in the future. Fixes #14620. Closes #14634. PiperOrigin-RevId: 424296966 --- .../build/lib/remote/RemoteExecutionService.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java index 0dfcedb90e743d..cf1f8938b9e1dc 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java @@ -142,6 +142,7 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; +import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; @@ -164,6 +165,7 @@ public class RemoteExecutionService { @Nullable private final Path captureCorruptedOutputsDir; private final Cache merkleTreeCache; private final Set reportedErrors = new HashSet<>(); + private final Phaser backgroundTaskPhaser = new Phaser(1); private final Scheduler scheduler; @@ -1162,13 +1164,18 @@ public void uploadOutputs(RemoteAction action, SpawnResult spawnResult) .subscribe( new SingleObserver() { @Override - public void onSubscribe(@NonNull Disposable d) {} + public void onSubscribe(@NonNull Disposable d) { + backgroundTaskPhaser.register(); + } @Override - public void onSuccess(@NonNull ActionResult actionResult) {} + public void onSuccess(@NonNull ActionResult actionResult) { + backgroundTaskPhaser.arriveAndDeregister(); + } @Override public void onError(@NonNull Throwable e) { + backgroundTaskPhaser.arriveAndDeregister(); reportUploadError(e); } }); @@ -1302,7 +1309,7 @@ public void shutdown() { remoteCache.release(); try { - remoteCache.awaitTermination(); + backgroundTaskPhaser.awaitAdvanceInterruptibly(backgroundTaskPhaser.arrive()); } catch (InterruptedException e) { buildInterrupted.set(true); remoteCache.shutdownNow();