fix: update implementation of readAllBytes and downloadTo to be more robust to retryable errors #2305

Merged · 1 commit · Nov 17, 2023
Changes from all commits

@@ -101,12 +101,10 @@ public final synchronized int write(ByteBuffer src) throws IOException {
}
int write = tmp.write(src);
return write;
} catch (StorageException e) {
throw new IOException(e);
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new IOException(StorageException.coalesce(e));
throw StorageException.coalesce(e);
}
}
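
Note on the hunk above: a StorageException (an unchecked BaseServiceException) now escapes write(ByteBuffer) as-is instead of being wrapped in an IOException, so its status code and retryability stay visible to callers and to the retry layer. The following is a caller-side sketch of what that enables, not code from this PR; the channel and the retry handling are placeholders.

```java
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.StorageException;
import java.io.IOException;
import java.nio.ByteBuffer;

class WriteErrorSketch {
  // Writes one buffer and lets service errors be inspected directly.
  static void writeChunk(WriteChannel channel, ByteBuffer buf) throws IOException {
    try {
      channel.write(buf);
    } catch (StorageException e) {
      // With this change the failure arrives as a StorageException rather than
      // an IOException wrapper, so e.getCode() / e.isRetryable() work directly.
      if (e.isRetryable()) {
        // placeholder: a caller could back off and retry the write here
      }
      throw e;
    }
  }
}
```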

@@ -250,11 +250,34 @@ public Blob create(

@Override
public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options) {
try {
return createFrom(blobInfo, content, options);
} catch (IOException e) {
requireNonNull(blobInfo, "blobInfo must be non null");

Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts);
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);

UnbufferedWritableByteChannelSession<WriteObjectResponse> session =
ResumableMedia.gapic()
.write()
.byteChannel(
storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext))
.setHasher(Hasher.enabled())
.setByteStringStrategy(ByteStringStrategy.noCopy())
.direct()
.unbuffered()
.setRequest(req)
.build();

// Specifically not in the try-with, so we don't close the provided stream
ReadableByteChannel src =
Channels.newChannel(firstNonNull(content, new ByteArrayInputStream(ZERO_BYTES)));
try (UnbufferedWritableByteChannel dst = session.open()) {
ByteStreams.copy(src, dst);
} catch (Exception e) {
throw StorageException.coalesce(e);
}
return getBlob(session.getResult());
}

@Override
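
The rewritten create(BlobInfo, InputStream, ...) issues a direct (non-resumable) gRPC upload, so once bytes have been consumed from the caller's InputStream there is nothing to replay on a transient failure and the error surfaces as a StorageException. Below is a hedged usage sketch (bucket, object, and file path are placeholders) contrasting it with the resumable createFrom path.

```java
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CreateFromStreamExample {
  public static void main(String[] args) throws IOException {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    BlobInfo info = BlobInfo.newBuilder("my-bucket", "my-object").build();

    try (InputStream in = Files.newInputStream(Paths.get("/tmp/data.bin"))) {
      // Direct upload: a failure mid-stream cannot be retried by the library,
      // because the already-consumed InputStream bytes are gone.
      Blob blob = storage.create(info, in);
    }

    try (InputStream in = Files.newInputStream(Paths.get("/tmp/data.bin"))) {
      // createFrom buffers and resumes, so it is the retry-friendly alternative.
      storage.createFrom(info, in);
    }
  }
}
```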
@@ -309,7 +332,7 @@ public Blob internalCreateFrom(Path path, BlobInfo info, Opts<ObjectTargetOpt> o
}
return codecs.blobInfo().decode(object).asBlob(this);
} catch (InterruptedException | ExecutionException e) {
throw StorageException.coalesce(e);
throw StorageException.coalesce(e.getCause());
}
}
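
On the internalCreateFrom change: an ExecutionException is only a wrapper, so coalescing it directly would classify the wrapper instead of the underlying exception that actually carries the status code and retryability; unwrapping with getCause() fixes that. A minimal sketch of the pattern follows, assuming the same package-level access to StorageException.coalesce that StorageImpl has; the helper class itself is hypothetical.

```java
// Hypothetical helper, placed in com.google.cloud.storage only so the
// StorageException.coalesce used by this PR is in reach.
package com.google.cloud.storage;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class UnwrapSketch {
  static <T> T awaitUnwrapped(Future<T> future) {
    try {
      return future.get();
    } catch (ExecutionException e) {
      // The interesting exception is the cause; coalescing the wrapper would
      // lose its error code and retryability classification.
      throw StorageException.coalesce(e.getCause());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw StorageException.coalesce(e);
    }
  }
}
```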

@@ -359,7 +382,14 @@ public Blob createFrom(
@Override
public Bucket get(String bucket, BucketGetOption... options) {
Opts<BucketSourceOpt> unwrap = Opts.unwrap(options);
return internalBucketGet(bucket, unwrap);
try {
return internalBucketGet(bucket, unwrap);
} catch (StorageException e) {
if (e.getCode() == 404) {
return null;
}
throw e;
}
}

@Override
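
The get(String bucket, ...) hunk restores the contract that a missing bucket yields null rather than a thrown StorageException with code 404. Caller-side sketch; the bucket name is a placeholder.

```java
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class GetBucketExample {
  public static void main(String[] args) {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    Bucket bucket = storage.get("some-bucket-that-may-not-exist");
    if (bucket == null) {
      // A 404 from the service is mapped back to null instead of escaping
      // as a StorageException.
      System.out.println("bucket not found");
    }
  }
}
```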
@@ -21,7 +21,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.Executors.callable;

import com.google.api.core.ApiFuture;
import com.google.api.gax.paging.Page;
@@ -37,12 +36,14 @@
import com.google.cloud.Policy;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Acl.Entity;
import com.google.cloud.storage.ApiaryUnbufferedReadableByteChannel.ApiaryReadRequest;
import com.google.cloud.storage.BlobReadChannelV2.BlobReadChannelContext;
import com.google.cloud.storage.HmacKey.HmacKeyMetadata;
import com.google.cloud.storage.PostPolicyV4.ConditionV4Type;
import com.google.cloud.storage.PostPolicyV4.PostConditionsV4;
import com.google.cloud.storage.PostPolicyV4.PostFieldsV4;
import com.google.cloud.storage.PostPolicyV4.PostPolicyV4Document;
import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
import com.google.cloud.storage.UnifiedOpts.ObjectSourceOpt;
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
import com.google.cloud.storage.UnifiedOpts.Opts;
@@ -59,9 +60,10 @@
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.common.io.CountingOutputStream;
import com.google.common.io.ByteStreams;
import com.google.common.primitives.Ints;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -73,6 +75,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.SimpleDateFormat;
@@ -604,9 +607,25 @@ public byte[] readAllBytes(BlobId blob, BlobSourceOption... options) {
Opts<ObjectSourceOpt> unwrap = Opts.unwrap(options);
Opts<ObjectSourceOpt> resolve = unwrap.resolveFrom(blob);
ImmutableMap<StorageRpc.Option, ?> optionsMap = resolve.getRpcOptions();
ResultRetryAlgorithm<?> algorithm =
retryAlgorithmManager.getForObjectsGet(storageObject, optionsMap);
return run(algorithm, () -> storageRpc.load(storageObject, optionsMap), Function.identity());
boolean autoGzipDecompression =
Utils.isAutoGzipDecompression(resolve, /*defaultWhenUndefined=*/ true);
UnbufferedReadableByteChannelSession<StorageObject> session =
ResumableMedia.http()
.read()
.byteChannel(BlobReadChannelContext.from(this))
.setAutoGzipDecompression(autoGzipDecompression)
.unbuffered()
.setApiaryReadRequest(
new ApiaryReadRequest(storageObject, optionsMap, ByteRangeSpec.nullRange()))
.build();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (UnbufferedReadableByteChannel r = session.open();
WritableByteChannel w = Channels.newChannel(baos)) {
ByteStreams.copy(r, w);
} catch (IOException e) {
throw StorageException.translate(e);
}
return baos.toByteArray();
}

@Override
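
readAllBytes keeps its signature; the difference is that it now streams through the same unbuffered read-channel session used elsewhere, which can recover from retryable errors mid-read instead of relying on a single storageRpc.load call. Minimal usage sketch; bucket and object names are placeholders.

```java
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class ReadAllBytesExample {
  public static void main(String[] args) {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    // The full object is buffered in memory, so prefer reader() or downloadTo
    // for very large objects.
    byte[] bytes = storage.readAllBytes(BlobId.of("my-bucket", "my-object"));
    System.out.println("read " + bytes.length + " bytes");
  }
}
```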
@@ -638,19 +657,26 @@ public void downloadTo(BlobId blob, Path path, BlobSourceOption... options) {

@Override
public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption... options) {
final CountingOutputStream countingOutputStream = new CountingOutputStream(outputStream);
final StorageObject pb = codecs.blobId().encode(blob);
ImmutableMap<StorageRpc.Option, ?> optionsMap =
Opts.unwrap(options).resolveFrom(blob).getRpcOptions();
ResultRetryAlgorithm<?> algorithm = retryAlgorithmManager.getForObjectsGet(pb, optionsMap);
run(
algorithm,
callable(
() -> {
storageRpc.read(
pb, optionsMap, countingOutputStream.getCount(), countingOutputStream);
}),
Function.identity());
Opts<ObjectSourceOpt> resolve = Opts.unwrap(options).resolveFrom(blob);
ImmutableMap<StorageRpc.Option, ?> optionsMap = resolve.getRpcOptions();
boolean autoGzipDecompression =
Utils.isAutoGzipDecompression(resolve, /*defaultWhenUndefined=*/ true);
UnbufferedReadableByteChannelSession<StorageObject> session =
ResumableMedia.http()
.read()
.byteChannel(BlobReadChannelContext.from(this))
.setAutoGzipDecompression(autoGzipDecompression)
.unbuffered()
.setApiaryReadRequest(new ApiaryReadRequest(pb, optionsMap, ByteRangeSpec.nullRange()))
.build();
// don't close the provided stream
WritableByteChannel w = Channels.newChannel(outputStream);
try (UnbufferedReadableByteChannel r = session.open()) {
ByteStreams.copy(r, w);
} catch (IOException e) {
throw StorageException.translate(e);
}
}

@Override
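
downloadTo(BlobId, OutputStream, ...) likewise moves to the channel session and, as the comment in the diff notes, still does not close the caller's stream. A hedged usage sketch with a placeholder destination path:

```java
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class DownloadToExample {
  public static void main(String[] args) throws IOException {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    // The caller owns the stream: downloadTo writes to it but does not close it.
    try (OutputStream out = new FileOutputStream("/tmp/my-object.bin")) {
      storage.downloadTo(BlobId.of("my-bucket", "my-object"), out);
    }
  }
}
```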
@@ -18,6 +18,7 @@

import static com.google.cloud.storage.ByteSizeConstants._2MiB;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;

import com.google.api.core.ApiFuture;
import com.google.api.gax.grpc.GrpcCallContext;
@@ -81,16 +82,19 @@ public void create_bytes() throws Exception {

@Test
public void create_inputStream() throws Exception {
Resumable.FakeService service = Resumable.FakeService.create();
Direct.FakeService service = Direct.FakeService.create();
try (TmpFile tmpFile = DataGenerator.base64Characters().tempFile(baseDir, objectContentSize);
FakeServer server = FakeServer.of(service);
Storage s = server.getGrpcStorageOptions().getService();
InputStream in = Channels.newInputStream(tmpFile.reader())) {
BlobInfo info = BlobInfo.newBuilder("buck", "obj").build();
s.create(info, in, BlobWriteOption.doesNotExist());
// create uses a direct upload, once the stream is consumed there is no means for us to retry
// if an error happens it should be surfaced
StorageException se =
assertThrows(
StorageException.class, () -> s.create(info, in, BlobWriteOption.doesNotExist()));
assertThat(se.getCode()).isEqualTo(500);
}

assertThat(service.returnError.get()).isFalse();
}

@Test