diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java index 3a03343e3e8bc4..89fa1f91e89d53 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java @@ -19,7 +19,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import com.google.common.hash.HashCode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -194,11 +193,7 @@ private ListenableFuture> uploadLocalFiles( final ListenableFuture upload; Context prevCtx = ctx.attach(); try { - upload = - uploader.uploadBlobAsync( - HashCode.fromString(path.getDigest().getHash()), - chunker, - /* forceUpload=*/ false); + upload = uploader.uploadBlobAsync(path.getDigest(), chunker, /* forceUpload=*/ false); } finally { ctx.detach(prevCtx); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java index a69a4df5486dab..c1133796129b4b 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java @@ -19,6 +19,7 @@ import static java.util.Collections.singletonMap; import static java.util.concurrent.TimeUnit.SECONDS; +import build.bazel.remote.execution.v2.Digest; import com.google.bytestream.ByteStreamGrpc; import com.google.bytestream.ByteStreamGrpc.ByteStreamFutureStub; import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest; @@ -64,10 +65,10 @@ /** * A client implementing the {@code Write} method 
of the {@code ByteStream} gRPC service. * - *

The uploader supports reference counting to easily be shared between components with - * different lifecyles. After instantiation the reference count is {@code 1}. + *

The uploader supports reference counting to easily be shared between components with different + * lifecycles. After instantiation the reference count is {@code 1}. * - * See {@link ReferenceCounted} for more information on reference counting. + *

See {@link ReferenceCounted} for more information on reference counting. */ class ByteStreamUploader extends AbstractReferenceCounted { @@ -81,12 +82,12 @@ class ByteStreamUploader extends AbstractReferenceCounted { private final Object lock = new Object(); - /** Contains the hash codes of already uploaded blobs. **/ + /** Contains the hash codes of already uploaded blobs. * */ @GuardedBy("lock") private final Set uploadedBlobs = new HashSet<>(); @GuardedBy("lock") - private final Map> uploadsInProgress = new HashMap<>(); + private final Map> uploadsInProgress = new HashMap<>(); @GuardedBy("lock") private boolean isShutdown; @@ -179,8 +180,8 @@ public void uploadBlobs(Map chunkers, boolean forceUpload) * Cancels all running uploads. The method returns immediately and does NOT wait for the uploads * to be cancelled. * - *

This method should not be called directly, but will be called implicitly when the - * reference count reaches {@code 0}. + *

This method should not be called directly, but will be called implicitly when the reference + * count reaches {@code 0}. */ @VisibleForTesting void shutdown() { @@ -199,6 +200,16 @@ void shutdown() { } } + /** @deprecated Use {@link #uploadBlobAsync(Digest, Chunker, boolean)} instead. */ + @Deprecated + @VisibleForTesting + public ListenableFuture uploadBlobAsync( + HashCode hash, Chunker chunker, boolean forceUpload) { + Digest digest = + Digest.newBuilder().setHash(hash.toString()).setSizeBytes(chunker.getSize()).build(); + return uploadBlobAsync(digest, chunker, forceUpload); + } + /** * Uploads a BLOB asynchronously to the remote {@code ByteStream} service. The call returns * immediately and one can listen to the returned future for the success/failure of the upload. @@ -209,32 +220,32 @@ void shutdown() { *

Trying to upload the same BLOB multiple times concurrently, results in only one upload being * performed. This is transparent to the user of this API. * - * @param hash the hash of the data to upload. + * @param digest the {@link Digest} of the data to upload. * @param chunker the data to upload. * @param forceUpload if {@code false} the blob is not uploaded if it has previously been * uploaded, if {@code true} the blob is uploaded. * @throws IOException when reading of the {@link Chunker}s input source fails */ public ListenableFuture uploadBlobAsync( - HashCode hash, Chunker chunker, boolean forceUpload) { + Digest digest, Chunker chunker, boolean forceUpload) { synchronized (lock) { checkState(!isShutdown, "Must not call uploadBlobs after shutdown."); - if (!forceUpload && uploadedBlobs.contains(hash)) { + if (!forceUpload && uploadedBlobs.contains(HashCode.fromString(digest.getHash()))) { return Futures.immediateFuture(null); } - ListenableFuture inProgress = uploadsInProgress.get(hash); + ListenableFuture inProgress = uploadsInProgress.get(digest); if (inProgress != null) { return inProgress; } ListenableFuture uploadResult = Futures.transform( - startAsyncUpload(hash, chunker), + startAsyncUpload(digest, chunker), (v) -> { synchronized (lock) { - uploadedBlobs.add(hash); + uploadedBlobs.add(HashCode.fromString(digest.getHash())); } return null; }, @@ -244,14 +255,20 @@ public ListenableFuture uploadBlobAsync( Futures.catchingAsync( uploadResult, StatusRuntimeException.class, - (sre) -> Futures.immediateFailedFuture(new IOException(sre)), + (sre) -> + Futures.immediateFailedFuture( + new IOException( + String.format( + "Error while uploading artifact with digest '%s/%s'", + digest.getHash(), digest.getSizeBytes()), + sre)), MoreExecutors.directExecutor()); - uploadsInProgress.put(hash, uploadResult); + uploadsInProgress.put(digest, uploadResult); uploadResult.addListener( () -> { synchronized (lock) { - uploadsInProgress.remove(hash); + 
uploadsInProgress.remove(digest); } }, MoreExecutors.directExecutor()); @@ -267,9 +284,9 @@ boolean uploadsInProgress() { } } - private static String buildUploadResourceName( - String instanceName, UUID uuid, HashCode hash, long size) { - String resourceName = format("uploads/%s/blobs/%s/%d", uuid, hash, size); + private static String buildUploadResourceName(String instanceName, UUID uuid, Digest digest) { + String resourceName = + format("uploads/%s/blobs/%s/%d", uuid, digest.getHash(), digest.getSizeBytes()); if (!Strings.isNullOrEmpty(instanceName)) { resourceName = instanceName + "/" + resourceName; } @@ -277,15 +294,23 @@ private static String buildUploadResourceName( } /** Starts a file upload and returns a future representing the upload. */ - private ListenableFuture startAsyncUpload(HashCode hash, Chunker chunker) { + private ListenableFuture startAsyncUpload(Digest digest, Chunker chunker) { try { chunker.reset(); } catch (IOException e) { return Futures.immediateFailedFuture(e); } + if (chunker.getSize() != digest.getSizeBytes()) { + return Futures.immediateFailedFuture( + new IllegalStateException( + String.format( + "Expected chunker size of %d, got %d", + digest.getSizeBytes(), chunker.getSize()))); + } + UUID uploadId = UUID.randomUUID(); - String resourceName = buildUploadResourceName(instanceName, uploadId, hash, chunker.getSize()); + String resourceName = buildUploadResourceName(instanceName, uploadId, digest); AsyncUpload newUpload = new AsyncUpload( channel, callCredentialsProvider, callTimeoutSecs, retrier, resourceName, chunker); diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java index 2e783cae0c70af..93a5c1c7309488 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java @@ -38,7 +38,6 @@ import 
com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.flogger.GoogleLogger; -import com.google.common.hash.HashCode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -387,7 +386,7 @@ public void onCompleted() { @Override public ListenableFuture uploadFile(Digest digest, Path path) { return uploader.uploadBlobAsync( - HashCode.fromString(digest.getHash()), + digest, Chunker.builder().setInput(digest.getSizeBytes(), path).build(), /* forceUpload= */ true); } @@ -395,8 +394,6 @@ public ListenableFuture uploadFile(Digest digest, Path path) { @Override public ListenableFuture uploadBlob(Digest digest, ByteString data) { return uploader.uploadBlobAsync( - HashCode.fromString(digest.getHash()), - Chunker.builder().setInput(data.toByteArray()).build(), - /* forceUpload= */ true); + digest, Chunker.builder().setInput(data.toByteArray()).build(), /* forceUpload= */ true); } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java index ce3072e5cc3895..5c44697da14a97 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java @@ -333,7 +333,7 @@ public void remoteFileShouldNotBeUploaded_findMissingDigests() throws Exception StaticMissingDigestsFinder digestQuerier = Mockito.spy(new StaticMissingDigestsFinder(ImmutableSet.of(remoteDigest))); ByteStreamUploader uploader = Mockito.mock(ByteStreamUploader.class); - when(uploader.uploadBlobAsync(any(), any(), anyBoolean())) + when(uploader.uploadBlobAsync(any(Digest.class), any(), anyBoolean())) .thenReturn(Futures.immediateFuture(null)); 
ByteStreamBuildEventArtifactUploader artifactUploader = newArtifactUploader(uploader, digestQuerier); @@ -349,8 +349,7 @@ public void remoteFileShouldNotBeUploaded_findMissingDigests() throws Exception // assert verify(digestQuerier).findMissingDigests(any()); - verify(uploader) - .uploadBlobAsync(eq(HashCode.fromString(localDigest.getHash())), any(), anyBoolean()); + verify(uploader).uploadBlobAsync(eq(localDigest), any(), anyBoolean()); assertThat(pathConverter.apply(remoteFile)).contains(remoteDigest.getHash()); assertThat(pathConverter.apply(localFile)).contains(localDigest.getHash()); }