From a5f2813acf3e31aeb3037d80a6f9d7fddf76a1c8 Mon Sep 17 00:00:00 2001 From: Brentley Jones Date: Wed, 9 Feb 2022 07:26:05 -0600 Subject: [PATCH] Remote: Only waits for background tasks from remote execution. (#14752) We added the block waiting behaviour after each command in remote module to wait for background uploads when introducing async upload. However, not all background uploads should be waited, e.g. uploads from BES module but with flag `--bes_upload_mode=fully_async`. This PR updates remote module so that only uploads initiated by remote module are waited after the command. This also enable us to implement something like `--remote_upload_mode=fully_async` in the future. Fixes #14620. Closes #14634. PiperOrigin-RevId: 424296966 (cherry picked from commit 3836ad029f202ca13c64c9f07e4568ea8ab2d9a6) Co-authored-by: Chi Wang --- .../build/lib/remote/RemoteExecutionService.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java index b16be3bbd4c0cf..c09934743d9036 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java @@ -142,6 +142,7 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; +import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; @@ -164,6 +165,7 @@ public class RemoteExecutionService { @Nullable private final Path captureCorruptedOutputsDir; private final Cache merkleTreeCache; private final Set reportedErrors = new HashSet<>(); + private final Phaser backgroundTaskPhaser = new Phaser(1); private final Scheduler scheduler; @@ -1160,13 +1162,18 @@ public void uploadOutputs(RemoteAction action, SpawnResult spawnResult) .subscribe( new SingleObserver() { @Override - public void onSubscribe(@NonNull Disposable d) {} + public void onSubscribe(@NonNull Disposable d) { + backgroundTaskPhaser.register(); + } @Override - public void onSuccess(@NonNull ActionResult actionResult) {} + public void onSuccess(@NonNull ActionResult actionResult) { + backgroundTaskPhaser.arriveAndDeregister(); + } @Override public void onError(@NonNull Throwable e) { + backgroundTaskPhaser.arriveAndDeregister(); reportUploadError(e); } }); @@ -1300,7 +1307,7 @@ public void shutdown() { remoteCache.release(); try { - remoteCache.awaitTermination(); + backgroundTaskPhaser.awaitAdvanceInterruptibly(backgroundTaskPhaser.arrive()); } catch (InterruptedException e) { buildInterrupted.set(true); remoteCache.shutdownNow();