Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Put new retry options in CloudStorageConfiguration #3869

Merged
merged 11 commits into from
Nov 27, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;

import javax.annotation.Nullable;
import javax.net.ssl.SSLException;
import java.io.EOFException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Map;

/**
Expand Down Expand Up @@ -89,6 +94,17 @@ public abstract class CloudStorageConfiguration {
*/
public abstract boolean useUserProjectOnlyForRequesterPaysBuckets();

/**
* Returns the set of HTTP error codes that will be retried, in addition to the normally
* retryable ones.
*/
public abstract ImmutableList<Integer> retryableHttpCodes();

/**
* Returns the set of exceptions for which we'll try a channel reopen if maxChannelReopens
* is positive.
*/
public abstract ImmutableList<Class<? extends Exception>> reopenableExceptions();

/**
* Creates a new builder, initialized with the following settings:
Expand Down Expand Up @@ -118,6 +134,10 @@ public static final class Builder {
private @Nullable String userProject = null;
// This of this as "clear userProject if not RequesterPays"
private boolean useUserProjectOnlyForRequesterPaysBuckets = false;
private ImmutableList<Integer> retryableHttpCodes = ImmutableList.of(500, 502, 503);
private ImmutableList<Class<? extends Exception>> reopenableExceptions =
ImmutableList.<Class<? extends Exception>>of(
SSLException.class, EOFException.class, SocketException.class, SocketTimeoutException.class);

/**
* Changes current working directory for new filesystem. This defaults to the root directory.
Expand Down Expand Up @@ -186,6 +206,16 @@ public Builder autoDetectRequesterPays(boolean value) {
return this;
}

public Builder retryableHttpCodes(ImmutableList<Integer> value) {
retryableHttpCodes = value;
return this;
}

public Builder reopenableExceptions(ImmutableList<Class<? extends Exception>> values) {
reopenableExceptions = values;
return this;
}

/**
* Creates new instance without destroying builder.
*/
Expand All @@ -198,7 +228,9 @@ public CloudStorageConfiguration build() {
blockSize,
maxChannelReopens,
userProject,
useUserProjectOnlyForRequesterPaysBuckets);
useUserProjectOnlyForRequesterPaysBuckets,
retryableHttpCodes,
reopenableExceptions);
}

Builder(CloudStorageConfiguration toModify) {
Expand All @@ -210,6 +242,8 @@ public CloudStorageConfiguration build() {
maxChannelReopens = toModify.maxChannelReopens();
userProject = toModify.userProject();
useUserProjectOnlyForRequesterPaysBuckets = toModify.useUserProjectOnlyForRequesterPaysBuckets();
retryableHttpCodes = toModify.retryableHttpCodes();
reopenableExceptions = toModify.reopenableExceptions();
}

Builder() {}
Expand Down Expand Up @@ -250,6 +284,12 @@ static private CloudStorageConfiguration fromMap(Builder builder, Map<String, ?>
case "useUserProjectOnlyForRequesterPaysBuckets":
builder.autoDetectRequesterPays((Boolean) entry.getValue());
break;
case "retryableHttpCodes":
builder.retryableHttpCodes((ImmutableList<Integer>) entry.getValue());
break;
case "reopenableExceptions":
builder.reopenableExceptions((ImmutableList<Class<? extends Exception>>) entry.getValue());
break;
default:
throw new IllegalArgumentException(entry.getKey());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ private SeekableByteChannel newReadChannel(Path path, Set<? extends OpenOption>
cloudPath.getBlobId(),
0,
maxChannelReopens,
cloudPath.getFileSystem().config(),
userProject,
blobSourceOptions.toArray(new BlobSourceOption[blobSourceOptions.size()]));
}
Expand Down Expand Up @@ -461,7 +462,8 @@ public boolean deleteIfExists(Path path) throws IOException {
throw new CloudStoragePseudoDirectoryException(cloudPath);
}

final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(CloudStorageUtil.getMaxChannelReopensFromPath(path));
final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(
cloudPath.getFileSystem().config());
// Loop will terminate via an exception if all retries are exhausted
while (true) {
try {
Expand Down Expand Up @@ -586,7 +588,8 @@ public void copy(Path source, Path target, CopyOption... options) throws IOExcep
throw new CloudStoragePseudoDirectoryException(toPath);
}

final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(CloudStorageUtil.getMaxChannelReopensFromPath(source));
final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(
fromPath.getFileSystem().config());
// Loop will terminate via an exception if all retries are exhausted
while (true) {
try {
Expand Down Expand Up @@ -672,11 +675,12 @@ public void checkAccess(Path path, AccessMode... modes) throws IOException {
}
}

final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(CloudStorageUtil.getMaxChannelReopensFromPath(path));
final CloudStoragePath cloudPath = CloudStorageUtil.checkPath(path);
final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(
cloudPath.getFileSystem().config());
// Loop will terminate via an exception if all retries are exhausted
while (true) {
try {
CloudStoragePath cloudPath = CloudStorageUtil.checkPath(path);
boolean nullId;
if (isNullOrEmpty(userProject)) {
nullId = storage.get(
Expand Down Expand Up @@ -725,11 +729,12 @@ public <A extends BasicFileAttributes> A readAttributes(
}
initStorage();

final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(CloudStorageUtil.getMaxChannelReopensFromPath(path));
final CloudStoragePath cloudPath = CloudStorageUtil.checkPath(path);
final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(
cloudPath.getFileSystem().config());
// Loop will terminate via an exception if all retries are exhausted
while (true) {
try {
CloudStoragePath cloudPath = CloudStorageUtil.checkPath(path);
BlobInfo blobInfo = null;
try {
BlobId blobId = cloudPath.getBlobId();
Expand Down Expand Up @@ -811,7 +816,8 @@ public DirectoryStream<Path> newDirectoryStream(final Path dir, final Filter<? s
checkNotNull(filter);
initStorage();

final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(CloudStorageUtil.getMaxChannelReopensFromPath(dir));
final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(
cloudPath.getFileSystem().config());
// Loop will terminate via an exception if all retries are exhausted
while (true) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ final class CloudStorageReadChannel implements SeekableByteChannel {
final int maxRetries;
// open options, we keep them around for reopens.
final BlobSourceOption[] blobSourceOptions;
final CloudStorageConfiguration config;
private ReadChannel channel;
private long position;
private long size;
Expand All @@ -67,6 +68,7 @@ final class CloudStorageReadChannel implements SeekableByteChannel {
/**
* @param maxChannelReopens max number of times to try re-opening the channel if it closes on us
* unexpectedly.
* @param config configuration for what to retry on.
* @param blobSourceOptions BlobSourceOption.userProject if you want to pay the charges (required
* for requester-pays buckets). Note:
* Buckets that have Requester Pays disabled still accept requests that include a billing
Expand All @@ -79,17 +81,18 @@ final class CloudStorageReadChannel implements SeekableByteChannel {
*/
@CheckReturnValue
@SuppressWarnings("resource")
static CloudStorageReadChannel create(Storage gcsStorage, BlobId file, long position, int maxChannelReopens, @Nullable String userProject, BlobSourceOption... blobSourceOptions)
static CloudStorageReadChannel create(Storage gcsStorage, BlobId file, long position, int maxChannelReopens, final CloudStorageConfiguration config, @Nullable String userProject, BlobSourceOption... blobSourceOptions)
throws IOException {
return new CloudStorageReadChannel(gcsStorage, file, position, maxChannelReopens, userProject, blobSourceOptions);
return new CloudStorageReadChannel(gcsStorage, file, position, maxChannelReopens, config, userProject, blobSourceOptions);
}

private CloudStorageReadChannel(Storage gcsStorage, BlobId file, long position, int maxChannelReopens, @Nullable String userProject, BlobSourceOption... blobSourceOptions) throws IOException {
private CloudStorageReadChannel(Storage gcsStorage, BlobId file, long position, int maxChannelReopens, final CloudStorageConfiguration config, @Nullable String userProject, BlobSourceOption... blobSourceOptions) throws IOException {
this.gcsStorage = gcsStorage;
this.file = file;
this.position = position;
this.maxChannelReopens = maxChannelReopens;
this.maxRetries = Math.max(3, maxChannelReopens);
this.config = config;
// get the generation, enshrine that in our options
fetchSize(gcsStorage, userProject, file);
List options = Lists.newArrayList(blobSourceOptions);
Expand Down Expand Up @@ -133,7 +136,7 @@ public int read(ByteBuffer dst) throws IOException {
synchronized (this) {
checkOpen();
int amt;
final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(maxRetries, maxChannelReopens);
final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(maxRetries, maxChannelReopens, config);
dst.mark();
while (true) {
try {
Expand Down Expand Up @@ -203,7 +206,7 @@ private void checkOpen() throws ClosedChannelException {
}

private long fetchSize(Storage gcsStorage, @Nullable String userProject, BlobId file) throws IOException {
final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(maxRetries, maxChannelReopens);
final CloudStorageRetryHandler retryHandler = new CloudStorageRetryHandler(maxRetries, maxChannelReopens, config);

while (true) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@
package com.google.cloud.storage.contrib.nio;

import com.google.cloud.storage.StorageException;

import java.io.EOFException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import javax.net.ssl.SSLException;
import com.google.common.annotations.VisibleForTesting;

/**
* Simple counter class to keep track of retry and reopen attempts when StorageExceptions are
Expand All @@ -34,26 +30,62 @@ public class CloudStorageRetryHandler {
private long totalWaitTime; // in milliseconds
private final int maxRetries;
private final int maxReopens;
private final CloudStorageConfiguration config;


/**
* Create a CloudStorageRetryHandler with the maximum retries and reopens set to the same value.
*
* @param maxRetriesAndReopens value for both maxRetries and maxReopens
*
* @deprecated use CloudStorageRetryHandler(CloudStorageConfiguration) instead.
*/
@java.lang.Deprecated

This comment was marked as spam.

public CloudStorageRetryHandler(final int maxRetriesAndReopens) {
this.maxRetries = maxRetriesAndReopens;
this.maxReopens = maxRetriesAndReopens;
// we're just using the retry parameters from the config, so it's OK to have a default.
this.config = CloudStorageConfiguration.DEFAULT;
}

/**
/**

This comment was marked as spam.

* Create a CloudStorageRetryHandler with the maximum retries and reopens set to different values.
*
* @param maxRetries maximum number of retries
* @param maxReopens maximum number of reopens
*
* @deprecated use CloudStorageRetryHandler(CloudStorageConfiguration) instead.
*/
@java.lang.Deprecated

This comment was marked as spam.

public CloudStorageRetryHandler(final int maxRetries, final int maxReopens) {
this.maxRetries = maxRetries;
this.maxReopens = maxReopens;
// we're just using the retry parameters from the config, so it's OK to have a default.
this.config = CloudStorageConfiguration.DEFAULT;
}

/**
* Create a CloudStorageRetryHandler with the maximum retries and reopens set to the same value.
*
* @param config - configuration for reopens and retryable codes.
*/
public CloudStorageRetryHandler(final CloudStorageConfiguration config) {
this.maxRetries = config.maxChannelReopens();
this.maxReopens = config.maxChannelReopens();
this.config = config;
}

/**
* Create a CloudStorageRetryHandler with the maximum retries and reopens set to different values.
*
* @param maxRetries maximum number of retries
* @param maxReopens maximum number of reopens (overrides what's in the config)
* @param config http codes we'll retry on, and exceptions we'll reopen on.
*/
public CloudStorageRetryHandler(final int maxRetries, final int maxReopens, final CloudStorageConfiguration config) {
chingor13 marked this conversation as resolved.
Show resolved Hide resolved
this.maxRetries = maxRetries;
this.maxReopens = maxReopens;
this.config = config;
}

/**
Expand Down Expand Up @@ -143,25 +175,36 @@ void sleepForAttempt(int attempt) {
* @param exs StorageException to test
* @return true if exs is a retryable error, otherwise false
*/
private static boolean isRetryable(final StorageException exs) {
return exs.isRetryable() || exs.getCode() == 500 || exs.getCode() == 502 || exs.getCode() == 503;
@VisibleForTesting
boolean isRetryable(final StorageException exs) {
if (exs.isRetryable()) {
return true;
}
for (int code : config.retryableHttpCodes()) {
if (exs.getCode() == code) {
return true;
}
}
return false;
}

/**
* @param exs StorageException to test
* @return true if exs is an error that can be resolved via a channel reopen, otherwise false
*/
private static boolean isReopenable(final StorageException exs) {
@VisibleForTesting
boolean isReopenable(final StorageException exs) {
Throwable throwable = exs;
// ensures finite iteration
int maxDepth = 20;
while (throwable != null && maxDepth-- > 0) {
if ((throwable.getMessage() != null
&& throwable.getMessage().contains("Connection closed prematurely"))
|| throwable instanceof SSLException
|| throwable instanceof EOFException
|| throwable instanceof SocketException
|| throwable instanceof SocketTimeoutException) {
for (Class<? extends Exception> reopenableException : config.reopenableExceptions()) {
if (reopenableException.isInstance(throwable)) {
return true;
}
}
if (throwable.getMessage() != null
&& throwable.getMessage().contains("Connection closed prematurely")) {
return true;
}
throwable = throwable.getCause();
Expand Down
Loading