Skip to content

Commit

Permalink
fix: update implementation of readAllBytes and downloadTo to be more …
Browse files Browse the repository at this point in the history
…robust to retryable errors

Additional small changes to bring the http and grpc implementations into conformance with each other.

Much of this also serves as pre-work for enabling the grpc retry conformance tests after the next release of the testbench.
  • Loading branch information
BenWhitehead committed Nov 15, 2023
1 parent 68b96a9 commit 4db2e2c
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 215 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,10 @@ public final synchronized int write(ByteBuffer src) throws IOException {
}
int write = tmp.write(src);
return write;
} catch (StorageException e) {
throw new IOException(e);
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new IOException(StorageException.coalesce(e));
throw StorageException.coalesce(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,34 @@ public Blob create(

@Override
public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options) {
try {
return createFrom(blobInfo, content, options);
} catch (IOException e) {
requireNonNull(blobInfo, "blobInfo must be non null");

Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts);
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);

UnbufferedWritableByteChannelSession<WriteObjectResponse> session =
ResumableMedia.gapic()
.write()
.byteChannel(
storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext))
.setHasher(Hasher.noop())
.setByteStringStrategy(ByteStringStrategy.noCopy())
.direct()
.unbuffered()
.setRequest(req)
.build();

// Specifically not in the try-with, so we don't close the provided stream
ReadableByteChannel src =
Channels.newChannel(firstNonNull(content, new ByteArrayInputStream(ZERO_BYTES)));
try (UnbufferedWritableByteChannel dst = session.open()) {
ByteStreams.copy(src, dst);
} catch (Exception e) {
throw StorageException.coalesce(e);
}
return getBlob(session.getResult());
}

@Override
Expand Down Expand Up @@ -309,7 +332,7 @@ public Blob internalCreateFrom(Path path, BlobInfo info, Opts<ObjectTargetOpt> o
}
return codecs.blobInfo().decode(object).asBlob(this);
} catch (InterruptedException | ExecutionException e) {
throw StorageException.coalesce(e);
throw StorageException.coalesce(e.getCause());
}
}

Expand Down Expand Up @@ -359,7 +382,14 @@ public Blob createFrom(
@Override
public Bucket get(String bucket, BucketGetOption... options) {
Opts<BucketSourceOpt> unwrap = Opts.unwrap(options);
return internalBucketGet(bucket, unwrap);
try {
return internalBucketGet(bucket, unwrap);
} catch (StorageException e) {
if (e.getCode() == 404) {
return null;
}
throw e;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.Executors.callable;

import com.google.api.core.ApiFuture;
import com.google.api.gax.paging.Page;
Expand All @@ -37,12 +36,14 @@
import com.google.cloud.Policy;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Acl.Entity;
import com.google.cloud.storage.ApiaryUnbufferedReadableByteChannel.ApiaryReadRequest;
import com.google.cloud.storage.BlobReadChannelV2.BlobReadChannelContext;
import com.google.cloud.storage.HmacKey.HmacKeyMetadata;
import com.google.cloud.storage.PostPolicyV4.ConditionV4Type;
import com.google.cloud.storage.PostPolicyV4.PostConditionsV4;
import com.google.cloud.storage.PostPolicyV4.PostFieldsV4;
import com.google.cloud.storage.PostPolicyV4.PostPolicyV4Document;
import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
import com.google.cloud.storage.UnifiedOpts.ObjectSourceOpt;
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
import com.google.cloud.storage.UnifiedOpts.Opts;
Expand All @@ -59,9 +60,10 @@
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.common.io.CountingOutputStream;
import com.google.common.io.ByteStreams;
import com.google.common.primitives.Ints;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -73,6 +75,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.SimpleDateFormat;
Expand Down Expand Up @@ -604,9 +607,25 @@ public byte[] readAllBytes(BlobId blob, BlobSourceOption... options) {
Opts<ObjectSourceOpt> unwrap = Opts.unwrap(options);
Opts<ObjectSourceOpt> resolve = unwrap.resolveFrom(blob);
ImmutableMap<StorageRpc.Option, ?> optionsMap = resolve.getRpcOptions();
ResultRetryAlgorithm<?> algorithm =
retryAlgorithmManager.getForObjectsGet(storageObject, optionsMap);
return run(algorithm, () -> storageRpc.load(storageObject, optionsMap), Function.identity());
boolean autoGzipDecompression =
Utils.isAutoGzipDecompression(resolve, /*defaultWhenUndefined=*/ true);
UnbufferedReadableByteChannelSession<StorageObject> session =
ResumableMedia.http()
.read()
.byteChannel(BlobReadChannelContext.from(this))
.setAutoGzipDecompression(autoGzipDecompression)
.unbuffered()
.setApiaryReadRequest(
new ApiaryReadRequest(storageObject, optionsMap, ByteRangeSpec.nullRange()))
.build();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (UnbufferedReadableByteChannel r = session.open();
WritableByteChannel w = Channels.newChannel(baos)) {
ByteStreams.copy(r, w);
} catch (IOException e) {
throw StorageException.translate(e);
}
return baos.toByteArray();
}

@Override
Expand Down Expand Up @@ -638,19 +657,26 @@ public void downloadTo(BlobId blob, Path path, BlobSourceOption... options) {

@Override
public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption... options) {
final CountingOutputStream countingOutputStream = new CountingOutputStream(outputStream);
final StorageObject pb = codecs.blobId().encode(blob);
ImmutableMap<StorageRpc.Option, ?> optionsMap =
Opts.unwrap(options).resolveFrom(blob).getRpcOptions();
ResultRetryAlgorithm<?> algorithm = retryAlgorithmManager.getForObjectsGet(pb, optionsMap);
run(
algorithm,
callable(
() -> {
storageRpc.read(
pb, optionsMap, countingOutputStream.getCount(), countingOutputStream);
}),
Function.identity());
Opts<ObjectSourceOpt> resolve = Opts.unwrap(options).resolveFrom(blob);
ImmutableMap<StorageRpc.Option, ?> optionsMap = resolve.getRpcOptions();
boolean autoGzipDecompression =
Utils.isAutoGzipDecompression(resolve, /*defaultWhenUndefined=*/ true);
UnbufferedReadableByteChannelSession<StorageObject> session =
ResumableMedia.http()
.read()
.byteChannel(BlobReadChannelContext.from(this))
.setAutoGzipDecompression(autoGzipDecompression)
.unbuffered()
.setApiaryReadRequest(new ApiaryReadRequest(pb, optionsMap, ByteRangeSpec.nullRange()))
.build();
// don't close the provided stream
WritableByteChannel w = Channels.newChannel(outputStream);
try (UnbufferedReadableByteChannel r = session.open()) {
ByteStreams.copy(r, w);
} catch (IOException e) {
throw StorageException.translate(e);
}
}

@Override
Expand Down
Loading

0 comments on commit 4db2e2c

Please sign in to comment.