diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageWriteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageWriteChannel.java index f5f076327c..06d26a13d1 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageWriteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageWriteChannel.java @@ -101,12 +101,10 @@ public final synchronized int write(ByteBuffer src) throws IOException { } int write = tmp.write(src); return write; - } catch (StorageException e) { - throw new IOException(e); } catch (IOException e) { throw e; } catch (Exception e) { - throw new IOException(StorageException.coalesce(e)); + throw StorageException.coalesce(e); } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index 2a21ddcf52..2ae75282a8 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -250,11 +250,34 @@ public Blob create( @Override public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options) { - try { - return createFrom(blobInfo, content, options); - } catch (IOException e) { + requireNonNull(blobInfo, "blobInfo must be non null"); + + Opts opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts); + GrpcCallContext grpcCallContext = + opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); + WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts); + + UnbufferedWritableByteChannelSession session = + ResumableMedia.gapic() + .write() + .byteChannel( + storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext)) + .setHasher(Hasher.noop()) + .setByteStringStrategy(ByteStringStrategy.noCopy()) + .direct() + .unbuffered() + .setRequest(req) + .build(); + + // Specifically not in the try-with, so we don't close the provided stream + ReadableByteChannel src = + Channels.newChannel(firstNonNull(content, new ByteArrayInputStream(ZERO_BYTES))); + try (UnbufferedWritableByteChannel dst = session.open()) { + ByteStreams.copy(src, dst); + } catch (Exception e) { throw StorageException.coalesce(e); } + return getBlob(session.getResult()); } @Override @@ -309,7 +332,7 @@ public Blob internalCreateFrom(Path path, BlobInfo info, Opts o } return codecs.blobInfo().decode(object).asBlob(this); } catch (InterruptedException | ExecutionException e) { - throw StorageException.coalesce(e); + throw StorageException.coalesce(e.getCause()); } } @@ -359,7 +382,14 @@ public Blob createFrom( @Override public Bucket get(String bucket, BucketGetOption... options) { Opts unwrap = Opts.unwrap(options); - return internalBucketGet(bucket, unwrap); + try { + return internalBucketGet(bucket, unwrap); + } catch (StorageException e) { + if (e.getCode() == 404) { + return null; + } + throw e; + } } @Override diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java index c688570ccc..48167b5db1 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java @@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.concurrent.Executors.callable; import com.google.api.core.ApiFuture; import com.google.api.gax.paging.Page; @@ -37,12 +36,14 @@ import com.google.cloud.Policy; import com.google.cloud.WriteChannel; import com.google.cloud.storage.Acl.Entity; +import com.google.cloud.storage.ApiaryUnbufferedReadableByteChannel.ApiaryReadRequest; import com.google.cloud.storage.BlobReadChannelV2.BlobReadChannelContext; import com.google.cloud.storage.HmacKey.HmacKeyMetadata; import com.google.cloud.storage.PostPolicyV4.ConditionV4Type; import com.google.cloud.storage.PostPolicyV4.PostConditionsV4; import com.google.cloud.storage.PostPolicyV4.PostFieldsV4; import com.google.cloud.storage.PostPolicyV4.PostPolicyV4Document; +import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel; import com.google.cloud.storage.UnifiedOpts.ObjectSourceOpt; import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; import com.google.cloud.storage.UnifiedOpts.Opts; @@ -59,9 +60,10 @@ import com.google.common.collect.Maps; import com.google.common.hash.Hashing; import com.google.common.io.BaseEncoding; -import com.google.common.io.CountingOutputStream; +import com.google.common.io.ByteStreams; import com.google.common.primitives.Ints; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -73,6 +75,7 @@ import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; import java.nio.file.Files; import java.nio.file.Path; import java.text.SimpleDateFormat; @@ -604,9 +607,25 @@ public byte[] readAllBytes(BlobId blob, BlobSourceOption... options) { Opts unwrap = Opts.unwrap(options); Opts resolve = unwrap.resolveFrom(blob); ImmutableMap optionsMap = resolve.getRpcOptions(); - ResultRetryAlgorithm algorithm = - retryAlgorithmManager.getForObjectsGet(storageObject, optionsMap); - return run(algorithm, () -> storageRpc.load(storageObject, optionsMap), Function.identity()); + boolean autoGzipDecompression = + Utils.isAutoGzipDecompression(resolve, /*defaultWhenUndefined=*/ true); + UnbufferedReadableByteChannelSession session = + ResumableMedia.http() + .read() + .byteChannel(BlobReadChannelContext.from(this)) + .setAutoGzipDecompression(autoGzipDecompression) + .unbuffered() + .setApiaryReadRequest( + new ApiaryReadRequest(storageObject, optionsMap, ByteRangeSpec.nullRange())) + .build(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (UnbufferedReadableByteChannel r = session.open(); + WritableByteChannel w = Channels.newChannel(baos)) { + ByteStreams.copy(r, w); + } catch (IOException e) { + throw StorageException.translate(e); + } + return baos.toByteArray(); } @Override @@ -638,19 +657,26 @@ public void downloadTo(BlobId blob, Path path, BlobSourceOption... options) { @Override public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption... options) { - final CountingOutputStream countingOutputStream = new CountingOutputStream(outputStream); final StorageObject pb = codecs.blobId().encode(blob); - ImmutableMap optionsMap = - Opts.unwrap(options).resolveFrom(blob).getRpcOptions(); - ResultRetryAlgorithm algorithm = retryAlgorithmManager.getForObjectsGet(pb, optionsMap); - run( - algorithm, - callable( - () -> { - storageRpc.read( - pb, optionsMap, countingOutputStream.getCount(), countingOutputStream); - }), - Function.identity()); + Opts resolve = Opts.unwrap(options).resolveFrom(blob); + ImmutableMap optionsMap = resolve.getRpcOptions(); + boolean autoGzipDecompression = + Utils.isAutoGzipDecompression(resolve, /*defaultWhenUndefined=*/ true); + UnbufferedReadableByteChannelSession session = + ResumableMedia.http() + .read() + .byteChannel(BlobReadChannelContext.from(this)) + .setAutoGzipDecompression(autoGzipDecompression) + .unbuffered() + .setApiaryReadRequest(new ApiaryReadRequest(pb, optionsMap, ByteRangeSpec.nullRange())) + .build(); + // don't close the provided stream + WritableByteChannel w = Channels.newChannel(outputStream); + try (UnbufferedReadableByteChannel r = session.open()) { + ByteStreams.copy(r, w); + } catch (IOException e) { + throw StorageException.translate(e); + } } @Override diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageImplTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageImplTest.java index bda9d6a8bd..2fd5501d50 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageImplTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageImplTest.java @@ -19,12 +19,6 @@ import static com.google.cloud.storage.SignedUrlEncodingHelper.Rfc3986UriEncode; import static com.google.cloud.storage.testing.ApiPolicyMatcher.eqApiPolicy; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.getCurrentArguments; -import static org.easymock.EasyMock.replay; -import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -32,12 +26,10 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import com.google.api.client.googleapis.json.GoogleJsonError; import com.google.api.client.util.DateTime; import com.google.api.core.ApiClock; -import com.google.api.gax.retrying.RetrySettings; import com.google.api.services.storage.model.Policy.Bindings; import com.google.api.services.storage.model.StorageObject; import com.google.api.services.storage.model.TestIamPermissionsResponse; @@ -59,12 +51,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.io.BaseEncoding; -import java.io.File; -import java.io.OutputStream; import java.io.UnsupportedEncodingException; import java.net.URL; import java.net.URLDecoder; -import java.nio.file.Files; import java.security.InvalidKeyException; import java.security.Key; import java.security.KeyFactory; @@ -86,7 +75,6 @@ import javax.crypto.spec.SecretKeySpec; import org.easymock.Capture; import org.easymock.EasyMock; -import org.easymock.IAnswer; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -692,85 +680,6 @@ public void testCopyMultipleRequests() { assertEquals(42L, writer.getBlobSize()); } - @Test - public void testReadAllBytes() { - EasyMock.expect( - storageRpcMock.load( - Conversions.json().blobId().encode(BlobId.of(BUCKET_NAME1, BLOB_NAME1)), - EMPTY_RPC_OPTIONS)) - .andReturn(BLOB_CONTENT); - EasyMock.replay(storageRpcMock); - initializeService(); - byte[] readBytes = storage.readAllBytes(BUCKET_NAME1, BLOB_NAME1); - assertArrayEquals(BLOB_CONTENT, readBytes); - } - - @Test - public void testReadAllBytesWithOptions() { - EasyMock.expect( - storageRpcMock.load( - Conversions.json().blobId().encode(BlobId.of(BUCKET_NAME1, BLOB_NAME1)), - BLOB_SOURCE_OPTIONS)) - .andReturn(BLOB_CONTENT); - EasyMock.replay(storageRpcMock); - initializeService(); - byte[] readBytes = - storage.readAllBytes( - BUCKET_NAME1, BLOB_NAME1, BLOB_SOURCE_GENERATION, BLOB_SOURCE_METAGENERATION); - assertArrayEquals(BLOB_CONTENT, readBytes); - } - - @Test - public void testReadAllBytesWithDecriptionKey() { - EasyMock.expect( - storageRpcMock.load( - Conversions.json().blobId().encode(BlobId.of(BUCKET_NAME1, BLOB_NAME1)), - ENCRYPTION_KEY_OPTIONS)) - .andReturn(BLOB_CONTENT) - .times(2); - EasyMock.replay(storageRpcMock); - initializeService(); - byte[] readBytes = - storage.readAllBytes(BUCKET_NAME1, BLOB_NAME1, BlobSourceOption.decryptionKey(KEY)); - assertArrayEquals(BLOB_CONTENT, readBytes); - readBytes = - storage.readAllBytes(BUCKET_NAME1, BLOB_NAME1, BlobSourceOption.decryptionKey(BASE64_KEY)); - assertArrayEquals(BLOB_CONTENT, readBytes); - } - - @Test - public void testReadAllBytesFromBlobIdWithOptions() { - EasyMock.expect( - storageRpcMock.load( - Conversions.json().blobId().encode(BLOB_INFO1.getBlobId()), BLOB_SOURCE_OPTIONS)) - .andReturn(BLOB_CONTENT); - EasyMock.replay(storageRpcMock); - initializeService(); - byte[] readBytes = - storage.readAllBytes( - BLOB_INFO1.getBlobId(), - BLOB_SOURCE_GENERATION_FROM_BLOB_ID, - BLOB_SOURCE_METAGENERATION); - assertArrayEquals(BLOB_CONTENT, readBytes); - } - - @Test - public void testReadAllBytesFromBlobIdWithDecriptionKey() { - EasyMock.expect( - storageRpcMock.load( - Conversions.json().blobId().encode(BLOB_INFO1.getBlobId()), ENCRYPTION_KEY_OPTIONS)) - .andReturn(BLOB_CONTENT) - .times(2); - EasyMock.replay(storageRpcMock); - initializeService(); - byte[] readBytes = - storage.readAllBytes(BLOB_INFO1.getBlobId(), BlobSourceOption.decryptionKey(KEY)); - assertArrayEquals(BLOB_CONTENT, readBytes); - readBytes = - storage.readAllBytes(BLOB_INFO1.getBlobId(), BlobSourceOption.decryptionKey(BASE64_KEY)); - assertArrayEquals(BLOB_CONTENT, readBytes); - } - @Test public void testBatch() { RpcBatch batchMock = EasyMock.mock(RpcBatch.class); @@ -2248,97 +2157,4 @@ public void testBucketLifecycleRules() { assertEquals(30, lifecycleRule.getCondition().getDaysSinceCustomTime().intValue()); assertNotNull(lifecycleRule.getCondition().getCustomTimeBefore()); } - - @Test - public void testDownloadTo() throws Exception { - BlobId blob = BlobId.of(BUCKET_NAME1, BLOB_NAME1); - storage = options.toBuilder().build().getService(); - final byte[] expected = {1, 2}; - EasyMock.expect( - storageRpcMock.read( - anyObject(StorageObject.class), - anyObject(Map.class), - eq(0l), - anyObject(OutputStream.class))) - .andAnswer( - new IAnswer() { - @Override - public Long answer() throws Throwable { - ((OutputStream) getCurrentArguments()[3]).write(expected); - return 2l; - } - }); - EasyMock.replay(storageRpcMock); - File file = File.createTempFile("blob", ".tmp"); - storage.downloadTo(blob, file.toPath()); - byte actual[] = Files.readAllBytes(file.toPath()); - assertArrayEquals(expected, actual); - } - - @Test - public void testDownloadToWithRetries() throws Exception { - BlobId blob = BlobId.of(BUCKET_NAME1, BLOB_NAME1); - storage = - options - .toBuilder() - .setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(2).build()) - .build() - .getService(); - final byte[] expected = {1, 2}; - expect( - storageRpcMock.read( - anyObject(StorageObject.class), - anyObject(Map.class), - eq(0l), - anyObject(OutputStream.class))) - .andAnswer( - new IAnswer() { - @Override - public Long answer() throws Throwable { - ((OutputStream) getCurrentArguments()[3]).write(expected[0]); - throw new StorageException(504, "error"); - } - }); - expect( - storageRpcMock.read( - anyObject(StorageObject.class), - anyObject(Map.class), - eq(1l), - anyObject(OutputStream.class))) - .andAnswer( - new IAnswer() { - @Override - public Long answer() throws Throwable { - ((OutputStream) getCurrentArguments()[3]).write(expected[1]); - return 1l; - } - }); - replay(storageRpcMock); - File file = File.createTempFile("blob", ".tmp"); - storage.downloadTo(blob, file.toPath()); - byte actual[] = Files.readAllBytes(file.toPath()); - assertArrayEquals(expected, actual); - } - - @Test - public void testDownloadToWithException() throws Exception { - BlobId blob = BlobId.of(BUCKET_NAME1, BLOB_NAME1); - storage = options.toBuilder().build().getService(); - Exception exception = new IllegalStateException("test"); - expect( - storageRpcMock.read( - anyObject(StorageObject.class), - anyObject(Map.class), - eq(0l), - anyObject(OutputStream.class))) - .andThrow(exception); - replay(storageRpcMock); - File file = File.createTempFile("blob", ".tmp"); - try { - storage.downloadTo(blob, file.toPath()); - fail(); - } catch (StorageException e) { - assertSame(exception, e.getCause()); - } - } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/ITRetryConformanceTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/ITRetryConformanceTest.java index 8957473484..6c93e26882 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/ITRetryConformanceTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/ITRetryConformanceTest.java @@ -21,6 +21,7 @@ import static com.google.cloud.storage.conformance.retry.Ctx.ctx; import static com.google.cloud.storage.conformance.retry.State.empty; import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth.assertWithMessage; import static java.util.Objects.requireNonNull; import static org.junit.Assert.assertNotNull; @@ -80,7 +81,6 @@ * RpcMethodMappings}. */ @RunWith(StorageITRunner.class) -// @CrossRun(transports = Transport.HTTP, backends = Backend.TEST_BENCH) @SingleBackend(Backend.TEST_BENCH) @Parameterized(RetryConformanceParameterProvider.class) @ParallelFriendly @@ -117,9 +117,10 @@ public void setUp() throws Throwable { public void tearDown() throws Throwable { LOGGER.fine("Running teardown..."); if (ctx != null) { + Ctx tearDownCtx = ctx.leftMap(s -> nonTestStorage); getReplaceStorageInObjectsFromCtx() .andThen(mapping.getTearDown()) - .apply(ctx, testRetryConformance); + .apply(tearDownCtx, testRetryConformance); } retryTestFixture.finished(null); LOGGER.fine("Running teardown complete"); @@ -177,7 +178,9 @@ public ImmutableList parameters() { } catch (IOException e) { throw new RuntimeException(e); } - assertThat(retryTestCases).isNotEmpty(); + assertWithMessage("Filter too strict. Resolved 0 retry test cases") + .that(retryTestCases) + .isNotEmpty(); return retryTestCases.stream() .map( rtc -> diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMapping.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMapping.java index 1c3947adbf..4b2c8ccf8b 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMapping.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMapping.java @@ -21,6 +21,7 @@ import static org.junit.Assert.fail; import com.google.cloud.storage.StorageException; +import com.google.cloud.storage.TransportCompatibility.Transport; import com.google.cloud.storage.conformance.retry.CtxFunctions.ResourceSetup; import com.google.cloud.storage.conformance.retry.CtxFunctions.ResourceTeardown; import com.google.cloud.storage.conformance.retry.Functions.CtxFunction; @@ -104,8 +105,15 @@ public CtxFunction getTest() { if (instructions.contains("return-401") && code == 401) { matchExpectedCode = true; } - if (instructions.contains("return-reset-connection") && code == 0) { - matchExpectedCode = true; + if (instructions.contains("return-reset-connection")) { + if (c.getTransport() == Transport.HTTP && code == 0) { + matchExpectedCode = true; + } + // in grpc a broken connection is converted to an UNAVAILABLE + // our mapping from UNAVAILABLE to status code is 503 + if (c.getTransport() == Transport.GRPC && code == 503) { + matchExpectedCode = true; + } } if (matchExpectedCode) { diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java index 9dc246373c..a76173a129 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java @@ -53,6 +53,7 @@ import com.google.cloud.storage.Storage.SignUrlOption; import com.google.cloud.storage.Storage.UriScheme; import com.google.cloud.storage.StorageRoles; +import com.google.cloud.storage.TransportCompatibility.Transport; import com.google.cloud.storage.conformance.retry.CtxFunctions.Local; import com.google.cloud.storage.conformance.retry.CtxFunctions.ResourceSetup; import com.google.cloud.storage.conformance.retry.CtxFunctions.Rpc; @@ -1607,7 +1608,9 @@ private static void insert(ArrayList a) { .build()); a.add( RpcMethodMapping.newBuilder(54, objects.insert) - .withApplicable(not(TestRetryConformance::isPreconditionsProvided)) + .withApplicable( + not(TestRetryConformance::isPreconditionsProvided) + .and(TestRetryConformance.transportIs(Transport.HTTP))) .withSetup(defaultSetup.andThen(Local.blobInfoWithoutGeneration)) .withTest( (ctx, c) -> diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/TestRetryConformance.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/TestRetryConformance.java index 65c0734807..7644052fa3 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/TestRetryConformance.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/TestRetryConformance.java @@ -42,6 +42,7 @@ import java.time.ZoneId; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -239,6 +240,10 @@ public String toString() { return getTestName(); } + public static Predicate transportIs(Transport t) { + return trc -> trc.transport == t; + } + private static Supplier resolvePathForResource(String objectName, Method method) { return () -> { try {