diff --git a/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java index ef90223d838f9f..66f29c5120166d 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java @@ -13,8 +13,7 @@ // limitations under the License. package com.google.devtools.build.lib.remote; -import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture; - +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -27,6 +26,7 @@ import com.google.devtools.build.lib.concurrent.ThreadSafety; import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; import com.google.devtools.build.lib.remote.util.DigestUtil; +import com.google.devtools.build.lib.remote.util.Utils; import com.google.devtools.build.lib.util.io.FileOutErr; import com.google.devtools.build.lib.vfs.Dirent; import com.google.devtools.build.lib.vfs.FileStatus; @@ -170,68 +170,84 @@ public void onFailure(Throwable t) { // TODO(olaola): will need to amend to include the TreeNodeRepository for updating. public void download(ActionResult result, Path execRoot, FileOutErr outErr) throws ExecException, IOException, InterruptedException { - try { - Context ctx = Context.current(); - List fileDownloads = - Collections.synchronizedList( - new ArrayList<>(result.getOutputFilesCount() + result.getOutputDirectoriesCount())); - for (OutputFile file : result.getOutputFilesList()) { - Path path = execRoot.getRelative(file.getPath()); - ListenableFuture download = - retrier.executeAsync( - () -> ctx.call(() -> downloadFile(path, file.getDigest(), file.getContent()))); - fileDownloads.add(new FuturePathBooleanTuple(download, path, file.getIsExecutable())); - } + Context ctx = Context.current(); + List fileDownloads = + Collections.synchronizedList( + new ArrayList<>(result.getOutputFilesCount() + result.getOutputDirectoriesCount())); + for (OutputFile file : result.getOutputFilesList()) { + Path path = execRoot.getRelative(file.getPath()); + ListenableFuture download = + retrier.executeAsync( + () -> ctx.call(() -> downloadFile(path, file.getDigest(), file.getContent()))); + fileDownloads.add(new FuturePathBooleanTuple(download, path, file.getIsExecutable())); + } - List> dirDownloads = - new ArrayList<>(result.getOutputDirectoriesCount()); - for (OutputDirectory dir : result.getOutputDirectoriesList()) { - SettableFuture dirDownload = SettableFuture.create(); - ListenableFuture protoDownload = - retrier.executeAsync(() -> ctx.call(() -> downloadBlob(dir.getTreeDigest()))); - Futures.addCallback( - protoDownload, - new FutureCallback() { - @Override - public void onSuccess(byte[] b) { - try { - Tree tree = Tree.parseFrom(b); - Map childrenMap = new HashMap<>(); - for (Directory child : tree.getChildrenList()) { - childrenMap.put(digestUtil.compute(child), child); - } - Path path = execRoot.getRelative(dir.getPath()); - fileDownloads.addAll(downloadDirectory(path, tree.getRoot(), childrenMap, ctx)); - dirDownload.set(null); - } catch (IOException e) { - dirDownload.setException(e); + List> dirDownloads = new ArrayList<>(result.getOutputDirectoriesCount()); + for (OutputDirectory dir : result.getOutputDirectoriesList()) { + SettableFuture dirDownload = SettableFuture.create(); + ListenableFuture protoDownload = + retrier.executeAsync(() -> ctx.call(() -> downloadBlob(dir.getTreeDigest()))); + Futures.addCallback( + protoDownload, + new FutureCallback() { + @Override + public void onSuccess(byte[] b) { + try { + Tree tree = Tree.parseFrom(b); + Map childrenMap = new HashMap<>(); + for (Directory child : tree.getChildrenList()) { + childrenMap.put(digestUtil.compute(child), child); } + Path path = execRoot.getRelative(dir.getPath()); + fileDownloads.addAll(downloadDirectory(path, tree.getRoot(), childrenMap, ctx)); + dirDownload.set(null); + } catch (IOException e) { + dirDownload.setException(e); } + } - @Override - public void onFailure(Throwable t) { - dirDownload.setException(t); - } - }, - MoreExecutors.directExecutor()); - dirDownloads.add(dirDownload); - } + @Override + public void onFailure(Throwable t) { + dirDownload.setException(t); + } + }, + MoreExecutors.directExecutor()); + dirDownloads.add(dirDownload); + } - fileDownloads.addAll(downloadOutErr(result, outErr, ctx)); + // Subsequently we need to wait for *every* download to finish, even if we already know that + // one failed. That's so that when exiting this method we can be sure that all downloads have + // finished and don't race with the cleanup routine. + // TODO(buchgr): Look into cancellation. - for (ListenableFuture dirDownload : dirDownloads) { - // Block on all directory download futures, so that we can be sure that we have discovered - // all file downloads and can subsequently safely iterate over the list of file downloads. + IOException downloadException = null; + try { + fileDownloads.addAll(downloadOutErr(result, outErr, ctx)); + } catch (IOException e) { + downloadException = e; + } + for (ListenableFuture dirDownload : dirDownloads) { + // Block on all directory download futures, so that we can be sure that we have discovered + // all file downloads and can subsequently safely iterate over the list of file downloads. + try { getFromFuture(dirDownload); + } catch (IOException e) { + downloadException = downloadException == null ? e : downloadException; } + } - for (FuturePathBooleanTuple download : fileDownloads) { + for (FuturePathBooleanTuple download : fileDownloads) { + try { getFromFuture(download.getFuture()); if (download.getPath() != null) { download.getPath().setExecutable(download.isExecutable()); } + } catch (IOException e) { + downloadException = downloadException == null ? e : downloadException; } - } catch (IOException downloadException) { + } + + if (downloadException != null) { try { // Delete any (partially) downloaded output files, since any subsequent local execution // of this action may expect none of the output files to exist. @@ -261,6 +277,11 @@ public void onFailure(Throwable t) { } } + @VisibleForTesting + protected T getFromFuture(ListenableFuture f) throws IOException, InterruptedException { + return Utils.getFromFuture(f); + } + /** Tuple of {@code ListenableFuture, Path, boolean}. */ private static class FuturePathBooleanTuple { private final ListenableFuture future; diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java index b1b0d604df366e..8e6269a44d8dfa 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java @@ -14,8 +14,6 @@ package com.google.devtools.build.lib.remote; -import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture; - import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; diff --git a/src/test/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCacheTests.java b/src/test/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCacheTests.java index fe4881c4745300..7afe9b84063c25 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCacheTests.java +++ b/src/test/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCacheTests.java @@ -15,19 +15,46 @@ import static com.google.common.truth.Truth.assertThat; import static com.google.devtools.build.lib.testutil.MoreAsserts.assertThrows; +import static org.junit.Assert.fail; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import com.google.devtools.build.lib.actions.ExecException; import com.google.devtools.build.lib.clock.JavaClock; import com.google.devtools.build.lib.remote.AbstractRemoteActionCache.UploadManifest; +import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; import com.google.devtools.build.lib.remote.util.DigestUtil; +import com.google.devtools.build.lib.remote.util.DigestUtil.ActionKey; +import com.google.devtools.build.lib.remote.util.Utils; +import com.google.devtools.build.lib.util.io.FileOutErr; import com.google.devtools.build.lib.vfs.DigestHashFunction; import com.google.devtools.build.lib.vfs.FileSystem; import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem; import com.google.devtools.remoteexecution.v1test.ActionResult; +import com.google.devtools.remoteexecution.v1test.Command; +import com.google.devtools.remoteexecution.v1test.Digest; +import com.google.devtools.remoteexecution.v1test.OutputFile; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -40,6 +67,13 @@ public class AbstractRemoteActionCacheTests { private Path execRoot; private final DigestUtil digestUtil = new DigestUtil(DigestHashFunction.SHA256); + private static ListeningScheduledExecutorService retryService; + + @BeforeClass + public static void beforeEverything() { + retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); + } + @Before public void setUp() throws Exception { fs = new InMemoryFileSystem(new JavaClock(), DigestHashFunction.SHA256); @@ -47,6 +81,11 @@ public void setUp() throws Exception { execRoot.createDirectory(); } + @AfterClass + public static void afterEverything() { + retryService.shutdownNow(); + } + @Test public void uploadSymlinkAsFile() throws Exception { ActionResult.Builder result = ActionResult.newBuilder(); @@ -92,4 +131,118 @@ public void uploadSymlinkInDirectory() throws Exception { .hasMessageThat() .contains("Only regular files and directories may be uploaded to a remote cache."); } + + @Test + public void onErrorWaitForRemainingDownloadsToComplete() throws Exception { + // If one or more downloads of output files / directories fail then the code should + // wait for all downloads to have been completed before it tries to clean up partially + // downloaded files. + + Path stdout = fs.getPath("/execroot/stdout"); + Path stderr = fs.getPath("/execroot/stderr"); + + Map> downloadResults = new HashMap<>(); + Path file1 = fs.getPath("/execroot/file1"); + Digest digest1 = digestUtil.compute("file1".getBytes("UTF-8")); + downloadResults.put(digest1, Futures.immediateFuture("file1".getBytes("UTF-8"))); + Path file2 = fs.getPath("/execroot/file2"); + Digest digest2 = digestUtil.compute("file2".getBytes("UTF-8")); + downloadResults.put(digest2, Futures.immediateFailedFuture(new IOException("download failed"))); + Path file3 = fs.getPath("/execroot/file3"); + Digest digest3 = digestUtil.compute("file3".getBytes("UTF-8")); + downloadResults.put(digest3, Futures.immediateFuture("file3".getBytes("UTF-8"))); + + RemoteOptions options = new RemoteOptions(); + RemoteRetrier retrier = new RemoteRetrier(options, (e) -> false, retryService, + Retrier.ALLOW_ALL_CALLS); + List> blockingDownloads = new ArrayList<>(); + AtomicInteger numSuccess = new AtomicInteger(); + AtomicInteger numFailures = new AtomicInteger(); + AbstractRemoteActionCache cache = new DefaultRemoteActionCache(options, digestUtil, retrier) { + @Override + public ListenableFuture downloadBlob(Digest digest, OutputStream out) { + SettableFuture result = SettableFuture.create(); + Futures.addCallback(downloadResults.get(digest), new FutureCallback() { + @Override + public void onSuccess(byte[] bytes) { + numSuccess.incrementAndGet(); + try { + out.write(bytes); + out.close(); + result.set(null); + } catch (IOException e) { + result.setException(e); + } + } + + @Override + public void onFailure(Throwable throwable) { + numFailures.incrementAndGet(); + result.setException(throwable); + } + }, MoreExecutors.directExecutor()); + return result; + } + + @Override + protected T getFromFuture(ListenableFuture f) + throws IOException, InterruptedException { + blockingDownloads.add(f); + return Utils.getFromFuture(f); + } + }; + + ActionResult result = ActionResult.newBuilder() + .setExitCode(0) + .addOutputFiles(OutputFile.newBuilder().setPath(file1.getPathString()).setDigest(digest1)) + .addOutputFiles(OutputFile.newBuilder().setPath(file2.getPathString()).setDigest(digest2)) + .addOutputFiles(OutputFile.newBuilder().setPath(file3.getPathString()).setDigest(digest3)) + .build(); + try { + cache.download(result, execRoot, new FileOutErr(stdout, stderr)); + fail("Expected IOException"); + } catch (IOException e) { + assertThat(numSuccess.get()).isEqualTo(2); + assertThat(numFailures.get()).isEqualTo(1); + assertThat(blockingDownloads).hasSize(3); + assertThat(Throwables.getRootCause(e)).hasMessageThat().isEqualTo("download failed"); + } + } + + private static class DefaultRemoteActionCache extends AbstractRemoteActionCache { + + public DefaultRemoteActionCache(RemoteOptions options, + DigestUtil digestUtil, Retrier retrier) { + super(options, digestUtil, retrier); + } + + @Override + public void ensureInputsPresent(TreeNodeRepository repository, Path execRoot, TreeNode root, + Command command) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Nullable + @Override + ActionResult getCachedActionResult(ActionKey actionKey) + throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + void upload(ActionKey actionKey, Path execRoot, Collection files, FileOutErr outErr, + boolean uploadAction) throws ExecException, IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected ListenableFuture downloadBlob(Digest digest, OutputStream out) { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + throw new UnsupportedOperationException(); + } + } }