diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java index cc31b5bf070705..d4aa19061c3c34 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java @@ -248,7 +248,7 @@ public ListenableFuture uploadBlobAsync( checkState(!isShutdown, "Must not call uploadBlobs after shutdown."); if (!forceUpload && uploadedBlobs.contains(HashCode.fromString(digest.getHash()))) { - return Futures.immediateFuture(null); + return immediateVoidFuture(); } ListenableFuture inProgress = uploadsInProgress.get(digest); @@ -410,7 +410,7 @@ ListenableFuture start() { () -> retrier.executeAsync( () -> { - if (chunker.getSize() == 0) { + if (chunker.getSize() == committedOffset.get()) { return immediateVoidFuture(); } try { @@ -426,7 +426,7 @@ ListenableFuture start() { if (chunker.hasNext()) { return callAndQueryOnFailure(committedOffset, progressiveBackoff); } - return Futures.immediateFuture(null); + return immediateVoidFuture(); }, progressiveBackoff), callCredentialsProvider); @@ -448,7 +448,7 @@ ListenableFuture start() { return Futures.immediateFailedFuture(new IOException(message)); } } - return Futures.immediateFuture(null); + return immediateVoidFuture(); }, MoreExecutors.directExecutor()); } @@ -536,7 +536,7 @@ private ListenableFuture query( progressiveBackoff.reset(); } committedOffset.set(committedSize); - return Futures.immediateFuture(null); + return immediateVoidFuture(); }, MoreExecutors.directExecutor()); } diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index 15cc335a0ace40..c4165c2de1cf97 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -1517,6 +1517,68 @@ public void onCompleted() { blockUntilInternalStateConsistent(uploader); } + @Test + public void failureAfterUploadCompletes() throws Exception { + AtomicInteger numUploads = new AtomicInteger(); + RemoteRetrier retrier = + TestUtils.newRemoteRetrier( + () -> mockBackoff, e -> e instanceof StatusRuntimeException, retryService); + ByteStreamUploader uploader = + new ByteStreamUploader( + INSTANCE_NAME, + new ReferenceCountedChannel(channelConnectionFactory), + CallCredentialsProvider.NO_CREDENTIALS, + /* callTimeoutSecs= */ 60, + retrier); + + byte[] blob = new byte[CHUNK_SIZE - 1]; + new Random().nextBytes(blob); + + serviceRegistry.addService( + new ByteStreamImplBase() { + @Override + public StreamObserver write(StreamObserver streamObserver) { + numUploads.incrementAndGet(); + return new StreamObserver() { + @Override + public void onNext(WriteRequest writeRequest) {} + + @Override + public void onError(Throwable throwable) { + fail("onError should never be called."); + } + + @Override + public void onCompleted() { + streamObserver.onNext( + WriteResponse.newBuilder().setCommittedSize(blob.length).build()); + streamObserver.onError(Status.UNAVAILABLE.asException()); + } + }; + } + + @Override + public void queryWriteStatus( + QueryWriteStatusRequest request, StreamObserver response) { + response.onNext( + QueryWriteStatusResponse.newBuilder() + .setCommittedSize(blob.length) + .setComplete(true) + .build()); + response.onCompleted(); + } + }); + + Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); + HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash()); + + uploader.uploadBlob(context, hash, chunker, true); + + blockUntilInternalStateConsistent(uploader); + + assertThat(numUploads.get()).isEqualTo(1); + } + @Test public void testCompressedUploads() throws Exception { RemoteRetrier retrier =