diff --git a/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java b/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java index 63b17d91e70891..5731d42292520f 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java +++ b/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java @@ -21,7 +21,6 @@ import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable; import static com.google.devtools.build.lib.remote.util.RxFutures.toListenableFuture; import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer; -import static com.google.devtools.build.lib.remote.util.RxUtils.toTransferResult; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; @@ -45,6 +44,7 @@ import com.google.devtools.build.lib.events.Reporter; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; import com.google.devtools.build.lib.remote.util.AsyncTaskCache; +import com.google.devtools.build.lib.remote.util.RxUtils; import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult; import com.google.devtools.build.lib.remote.util.TempPathGenerator; import com.google.devtools.build.lib.vfs.FileSystemUtils; @@ -53,6 +53,7 @@ import com.google.devtools.build.lib.vfs.PathFragment; import io.reactivex.rxjava3.core.Completable; import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Single; import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; @@ -311,8 +312,7 @@ public ListenableFuture prefetchFiles( Flowable transfers = Flowable.fromIterable(files) - .flatMapSingle( - input -> toTransferResult(prefetchFile(dirCtx, metadataSupplier, input, priority))); + .flatMapSingle(input -> prefetchFile(dirCtx, metadataSupplier, input, priority)); Completable prefetch = Completable.using( @@ -321,48 +321,53 @@ public ListenableFuture prefetchFiles( return toListenableFuture(prefetch); } - private Completable prefetchFile( + private Single prefetchFile( DirectoryContext dirCtx, MetadataSupplier metadataSupplier, ActionInput input, - Priority priority) - throws IOException, InterruptedException { - if (input instanceof VirtualActionInput) { - prefetchVirtualActionInput((VirtualActionInput) input); - return Completable.complete(); - } + Priority priority) { + try { + if (input instanceof VirtualActionInput) { + prefetchVirtualActionInput((VirtualActionInput) input); + return Single.just(TransferResult.ok()); + } - PathFragment execPath = input.getExecPath(); + PathFragment execPath = input.getExecPath(); - FileArtifactValue metadata = metadataSupplier.getMetadata(input); - if (metadata == null || !canDownloadFile(execRoot.getRelative(execPath), metadata)) { - return Completable.complete(); - } + FileArtifactValue metadata = metadataSupplier.getMetadata(input); + if (metadata == null || !canDownloadFile(execRoot.getRelative(execPath), metadata)) { + return Single.just(TransferResult.ok()); + } - @Nullable Symlink symlink = maybeGetSymlink(input, metadata, metadataSupplier); + @Nullable Symlink symlink = maybeGetSymlink(input, metadata, metadataSupplier); - if (symlink != null) { - checkState(execPath.startsWith(symlink.getLinkExecPath())); - execPath = - symlink.getTargetExecPath().getRelative(execPath.relativeTo(symlink.getLinkExecPath())); - } + if (symlink != null) { + checkState(execPath.startsWith(symlink.getLinkExecPath())); + execPath = + symlink.getTargetExecPath().getRelative(execPath.relativeTo(symlink.getLinkExecPath())); + } - @Nullable PathFragment treeRootExecPath = maybeGetTreeRoot(input, metadataSupplier); + @Nullable PathFragment treeRootExecPath = maybeGetTreeRoot(input, metadataSupplier); - Completable result = - downloadFileNoCheckRx( - dirCtx, - execRoot.getRelative(execPath), - treeRootExecPath != null ? execRoot.getRelative(treeRootExecPath) : null, - input, - metadata, - priority); + Completable result = + downloadFileNoCheckRx( + dirCtx, + execRoot.getRelative(execPath), + treeRootExecPath != null ? execRoot.getRelative(treeRootExecPath) : null, + input, + metadata, + priority); - if (symlink != null) { - result = result.andThen(plantSymlink(symlink)); - } + if (symlink != null) { + result = result.andThen(plantSymlink(symlink)); + } - return result; + return RxUtils.toTransferResult(result); + } catch (IOException e) { + return Single.just(TransferResult.error(e)); + } catch (InterruptedException e) { + return Single.just(TransferResult.interrupted()); + } } /** diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/RxUtils.java b/src/main/java/com/google/devtools/build/lib/remote/util/RxUtils.java index a12a975f7827fe..c5aa657a61e66b 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/util/RxUtils.java +++ b/src/main/java/com/google/devtools/build/lib/remote/util/RxUtils.java @@ -55,7 +55,7 @@ public static TransferResult error(IOException error) { /** Returns {@code true} if the operation succeed. */ public boolean isOk() { - return error == null; + return error == null && !interrupted; } /** Returns {@code true} if the operation failed. */ @@ -116,15 +116,15 @@ void onResult(TransferResult result) { } Completable toCompletable() { - if (bulkTransferException == null) { - return Completable.complete(); - } - if (interrupted) { return Completable.error(new InterruptedException()); } - return Completable.error(bulkTransferException); + if (bulkTransferException != null) { + return Completable.error(bulkTransferException); + } + + return Completable.complete(); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java index c1caf792a59793..1be03d1ef8d07f 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java +++ b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java @@ -74,6 +74,7 @@ import java.util.Locale; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.function.BiFunction; import javax.annotation.Nullable; @@ -104,6 +105,8 @@ public static T getFromFuture(ListenableFuture f, boolean cancelOnInterru throws IOException, InterruptedException { try { return f.get(); + } catch (CancellationException e) { + throw new InterruptedException(); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof InterruptedException) { diff --git a/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java b/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java index c41e6da1582747..9f636e4af0bc68 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java @@ -36,6 +36,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.devtools.build.lib.actions.ActionInput; +import com.google.devtools.build.lib.actions.ActionInputPrefetcher.MetadataSupplier; import com.google.devtools.build.lib.actions.ActionInputPrefetcher.Priority; import com.google.devtools.build.lib.actions.Artifact; import com.google.devtools.build.lib.actions.Artifact.SpecialArtifact; @@ -59,6 +60,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; @@ -555,6 +557,25 @@ public void prefetchFiles_multipleThreads_downloadIsNotCancelledByOtherThreads() assertThat(FileSystemUtils.readContent(artifact.getPath(), UTF_8)).isEqualTo("hello world"); } + @Test + public void prefetchFile_interruptingMetadataSupplier_interruptsDownload() throws Exception { + Map metadata = new HashMap<>(); + Map cas = new HashMap<>(); + Artifact a1 = createRemoteArtifact("file1", "hello world", metadata, cas); + AbstractActionInputPrefetcher prefetcher = createPrefetcher(cas); + + MetadataSupplier interruptedMetadataSupplier = + unused -> { + throw new InterruptedException(); + }; + + ListenableFuture future = + prefetcher.prefetchFiles( + ImmutableList.of(a1), interruptedMetadataSupplier, Priority.MEDIUM); + + assertThrows(CancellationException.class, future::get); + } + @Test public void prefetchFiles_onInterrupt_deletePartialDownloadedFile() throws Exception { Semaphore startSemaphore = new Semaphore(0);