Skip to content

Commit 9c767a3

Browse files
HADOOP-19027. S3A: S3AInputStream doesn't recover from channel exceptions (#6425) (#8003)
This is a fraction of #6425 backported to the V1 SDK, which does a lot of V2-specific translation/unwinding. 416 responses are mapped to RangeNotSatisfiableEOFException, whose retry policy is: fail. Classic EOFException is now retried as connection failure. calls to read() and lazyseek all retry on this with full retry policy, including handling of socket errors. Contributed by Steve Loughran
1 parent 678aea9 commit 9c767a3

File tree

7 files changed

+353
-62
lines changed

7 files changed

+353
-62
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a;
20+
21+
import java.io.EOFException;
22+
23+
import org.apache.hadoop.classification.InterfaceAudience;
24+
25+
/**
26+
* Status code 416, range not satisfiable.
27+
* Subclass of {@link EOFException} so that any code which expects that to
28+
* be the outcome of a 416 failure will continue to work.
29+
*/
30+
@InterfaceAudience.Private
31+
public class RangeNotSatisfiableEOFException extends EOFException {
32+
33+
public RangeNotSatisfiableEOFException(
34+
String operation,
35+
Exception cause) {
36+
super(operation);
37+
initCause(cause);
38+
}
39+
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,14 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
9999
public static final String OPERATION_OPEN = "open";
100100
public static final String OPERATION_REOPEN = "re-open";
101101

102+
/**
103+
* Switch for behavior on when wrappedStream.read()
104+
* returns -1 or raises an EOF; the original semantics
105+
* are that the stream is kept open.
106+
* Value {@value}.
107+
*/
108+
private static final boolean CLOSE_WRAPPED_STREAM_ON_NEGATIVE_READ = true;
109+
102110
/**
103111
* This is the maximum temporary buffer size we use while
104112
* populating the data in direct byte buffers during a vectored IO
@@ -435,16 +443,23 @@ public boolean seekToNewSource(long targetPos) throws IOException {
435443

436444
/**
437445
* Perform lazy seek and adjust stream to correct position for reading.
438-
*
446+
* If an EOF Exception is raised there are two possibilities
447+
* <ol>
448+
* <li>the stream is at the end of the file</li>
449+
* <li>something went wrong with the network connection</li>
450+
* </ol>
451+
* This method does not attempt to distinguish; it assumes that an EOF
452+
* exception is always "end of file".
439453
* @param targetPos position from where data should be read
440454
* @param len length of the content that needs to be read
455+
* @throws RangeNotSatisfiableEOFException GET is out of range
456+
* @throws IOException anything else.
441457
*/
442458
@Retries.RetryTranslated
443459
private void lazySeek(long targetPos, long len) throws IOException {
444460

445461
Invoker invoker = context.getReadInvoker();
446-
invoker.maybeRetry(streamStatistics.getOpenOperations() == 0,
447-
"lazySeek", pathStr, true,
462+
invoker.retry("lazySeek to " + targetPos, pathStr, true,
448463
() -> {
449464
//For lazy seek
450465
seekInStream(targetPos, len);
@@ -478,7 +493,9 @@ public synchronized int read() throws IOException {
478493

479494
try {
480495
lazySeek(nextReadPos, 1);
481-
} catch (EOFException e) {
496+
} catch (RangeNotSatisfiableEOFException e) {
497+
// attempt to GET beyond the end of the object
498+
LOG.debug("Downgrading 416 response attempt to read at {} to -1 response", nextReadPos);
482499
return -1;
483500
}
484501

@@ -494,8 +511,6 @@ public synchronized int read() throws IOException {
494511
}
495512
try {
496513
b = wrappedStream.read();
497-
} catch (EOFException e) {
498-
return -1;
499514
} catch (SocketTimeoutException e) {
500515
onReadFailure(e, true);
501516
throw e;
@@ -509,10 +524,9 @@ public synchronized int read() throws IOException {
509524
if (byteRead >= 0) {
510525
pos++;
511526
nextReadPos++;
512-
}
513-
514-
if (byteRead >= 0) {
515527
incrementBytesRead(1);
528+
} else {
529+
streamReadResultNegative();
516530
}
517531
return byteRead;
518532
}
@@ -537,6 +551,18 @@ private void onReadFailure(IOException ioe, boolean forceAbort) {
537551
closeStream("failure recovery", forceAbort, false);
538552
}
539553

554+
/**
555+
* the read() call returned -1.
556+
* this means "the connection has gone past the end of the object" or
557+
* the stream has broken for some reason.
558+
* so close stream (without an abort).
559+
*/
560+
private void streamReadResultNegative() {
561+
if (CLOSE_WRAPPED_STREAM_ON_NEGATIVE_READ) {
562+
closeStream("wrappedStream.read() returned -1", false, false);
563+
}
564+
}
565+
540566
/**
541567
* {@inheritDoc}
542568
*
@@ -562,8 +588,8 @@ public synchronized int read(byte[] buf, int off, int len)
562588

563589
try {
564590
lazySeek(nextReadPos, len);
565-
} catch (EOFException e) {
566-
// the end of the file has moved
591+
} catch (RangeNotSatisfiableEOFException e) {
592+
// attempt to GET beyond the end of the object
567593
return -1;
568594
}
569595

@@ -581,12 +607,12 @@ public synchronized int read(byte[] buf, int off, int len)
581607
}
582608
try {
583609
bytes = wrappedStream.read(buf, off, len);
584-
} catch (EOFException e) {
585-
// the base implementation swallows EOFs.
586-
return -1;
587610
} catch (SocketTimeoutException e) {
588611
onReadFailure(e, true);
589612
throw e;
613+
} catch (EOFException e) {
614+
// the base implementation swallows EOFs.
615+
return -1;
590616
} catch (IOException e) {
591617
onReadFailure(e, false);
592618
throw e;
@@ -597,8 +623,10 @@ public synchronized int read(byte[] buf, int off, int len)
597623
if (bytesRead > 0) {
598624
pos += bytesRead;
599625
nextReadPos += bytesRead;
626+
incrementBytesRead(bytesRead);
627+
} else {
628+
streamReadResultNegative();
600629
}
601-
incrementBytesRead(bytesRead);
602630
streamStatistics.readOperationCompleted(len, bytesRead);
603631
return bytesRead;
604632
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@
2222
import java.io.FileNotFoundException;
2323
import java.io.IOException;
2424
import java.io.InterruptedIOException;
25+
import java.net.BindException;
26+
import java.net.ConnectException;
2527
import java.net.NoRouteToHostException;
28+
import java.net.SocketException;
2629
import java.net.SocketTimeoutException;
2730
import java.net.UnknownHostException;
2831
import java.nio.file.AccessDeniedException;
@@ -197,21 +200,27 @@ protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
197200
// implementation
198201
policyMap.put(NoVersionAttributeException.class, fail);
199202

203+
// range header is out of scope of object; retrying won't help
204+
policyMap.put(RangeNotSatisfiableEOFException.class, fail);
205+
200206
// should really be handled by resubmitting to new location;
201207
// that's beyond the scope of this retry policy
202208
policyMap.put(AWSRedirectException.class, fail);
203209

204210
// throttled requests can be retried, always
205211
policyMap.put(AWSServiceThrottledException.class, throttlePolicy);
206212

213+
// socket exception subclass we consider unrecoverable
214+
// though this is normally only found when opening a port for listening.
215+
// which is never done in S3A.
216+
policyMap.put(BindException.class, fail);
217+
207218
// connectivity problems are retried without worrying about idempotency
208219
policyMap.put(ConnectTimeoutException.class, connectivityFailure);
220+
policyMap.put(ConnectException.class, connectivityFailure);
209221

210222
// this can be a sign of an HTTP connection breaking early.
211-
// which can be reacted to by another attempt if the request was idempotent.
212-
// But: could also be a sign of trying to read past the EOF on a GET,
213-
// which isn't going to be recovered from
214-
policyMap.put(EOFException.class, retryIdempotentCalls);
223+
policyMap.put(EOFException.class, connectivityFailure);
215224

216225
// policy on a 400/bad request still ambiguous.
217226
// Treated as an immediate failure
@@ -227,7 +236,9 @@ protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
227236
policyMap.put(AWSClientIOException.class, retryIdempotentCalls);
228237
policyMap.put(AWSServiceIOException.class, retryIdempotentCalls);
229238
policyMap.put(AWSS3IOException.class, retryIdempotentCalls);
230-
policyMap.put(SocketTimeoutException.class, retryIdempotentCalls);
239+
// general socket exceptions
240+
policyMap.put(SocketException.class, connectivityFailure);
241+
policyMap.put(SocketTimeoutException.class, connectivityFailure);
231242

232243
// Unsupported requests do not work, however many times you try
233244
policyMap.put(UnsupportedRequestException.class, fail);

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@
8989
import static org.apache.commons.lang3.StringUtils.isEmpty;
9090
import static org.apache.hadoop.fs.s3a.Constants.*;
9191
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
92-
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
92+
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.*;
9393
import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.translateDeleteException;
9494
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
9595
import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
@@ -280,10 +280,13 @@ public static IOException translateException(@Nullable String operation,
280280
break;
281281

282282
// out of range. This may happen if an object is overwritten with
283-
// a shorter one while it is being read.
284-
case 416:
285-
ioe = new EOFException(message);
286-
ioe.initCause(ase);
283+
// a shorter one while it is being read or openFile() was invoked
284+
// passing a FileStatus or file length less than that of the object.
285+
// although the HTTP specification says that the response should
286+
// include a range header specifying the actual range available,
287+
// this isn't picked up here.
288+
case SC_416_RANGE_NOT_SATISFIABLE:
289+
ioe = new RangeNotSatisfiableEOFException(message, ase);
287290
break;
288291

289292
// this has surfaced as a "no response from server" message.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,71 @@ private InternalConstants() {
108108
S3A_OPENFILE_KEYS = Collections.unmodifiableSet(keys);
109109
}
110110

111+
/** 200 status code: OK. */
112+
public static final int SC_200_OK = 200;
113+
114+
/** 301 status code: Moved Permanently. */
115+
public static final int SC_301_MOVED_PERMANENTLY = 301;
116+
117+
/** 307 status code: Temporary Redirect. */
118+
public static final int SC_307_TEMPORARY_REDIRECT = 307;
119+
120+
/** 400 status code: Bad Request. */
121+
public static final int SC_400_BAD_REQUEST = 400;
122+
123+
/** 401 status code: Unauthorized. */
124+
public static final int SC_401_UNAUTHORIZED = 401;
125+
126+
/** 403 status code: Forbidden. */
127+
public static final int SC_403_FORBIDDEN = 403;
128+
111129
/** 403 error code. */
112-
public static final int SC_403 = 403;
130+
public static final int SC_403 = SC_403_FORBIDDEN;
131+
132+
/** 404 status code: Not Found. */
133+
public static final int SC_404_NOT_FOUND = 404;
113134

114135
/** 404 error code. */
115-
public static final int SC_404 = 404;
136+
public static final int SC_404 = SC_404_NOT_FOUND;
137+
138+
/** 405 status code: Method Not Allowed. */
139+
public static final int SC_405_METHOD_NOT_ALLOWED = 405;
140+
141+
/** 409 status code: Conflict. Example: creating a bucket twice. */
142+
public static final int SC_409_CONFLICT = 409;
143+
144+
/** 410 status code: Gone. */
145+
public static final int SC_410_GONE = 410;
146+
147+
/** 412 status code: Precondition Failed. */
148+
public static final int SC_412_PRECONDITION_FAILED = 412;
149+
150+
/** 415 status code: Content type unsupported by this store. */
151+
public static final int SC_415_UNSUPPORTED_MEDIA_TYPE = 415;
152+
153+
/** 416 status code: Range Not Satisfiable. */
154+
public static final int SC_416_RANGE_NOT_SATISFIABLE = 416;
155+
156+
/** 429 status code: This is the google GCS throttle message. */
157+
public static final int SC_429_TOO_MANY_REQUESTS_GCS = 429;
158+
159+
/** 443 status code: No Response (unofficial). */
160+
public static final int SC_443_NO_RESPONSE = 443;
161+
162+
/** 444 status code: No Response (unofficial). */
163+
public static final int SC_444_NO_RESPONSE = 444;
164+
165+
/** 500 status code: Internal Server Error. */
166+
public static final int SC_500_INTERNAL_SERVER_ERROR = 500;
167+
168+
/** 501 status code: method not implemented. */
169+
public static final int SC_501_NOT_IMPLEMENTED = 501;
170+
171+
/** 503 status code: Service Unavailable. on AWS S3: throttle response. */
172+
public static final int SC_503_SERVICE_UNAVAILABLE = 503;
173+
174+
/** 504 Gateway Timeout. AWS SDK considers retryable. */
175+
public static final int SC_504_GATEWAY_TIMEOUT = 504;
116176

117177
/** Name of the log for throttling events. Value: {@value}. */
118178
public static final String THROTTLE_LOG_NAME =

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
2323
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
2424
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
25+
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_416_RANGE_NOT_SATISFIABLE;
2526
import static org.junit.Assert.*;
2627

2728
import java.io.EOFException;
@@ -38,6 +39,7 @@
3839
import com.amazonaws.AmazonServiceException;
3940
import com.amazonaws.services.s3.model.AmazonS3Exception;
4041

42+
import org.assertj.core.api.Assertions;
4143
import org.junit.Test;
4244

4345
import org.apache.hadoop.fs.s3a.impl.ErrorTranslation;
@@ -80,10 +82,10 @@ protected void assertContained(String text, String contained) {
8082
text != null && text.contains(contained));
8183
}
8284

83-
protected <E extends Throwable> void verifyTranslated(
85+
protected <E extends Throwable> E verifyTranslated(
8486
int status,
8587
Class<E> expected) throws Exception {
86-
verifyTranslated(expected, createS3Exception(status));
88+
return verifyTranslated(expected, createS3Exception(status));
8789
}
8890

8991
@Test
@@ -128,7 +130,12 @@ public void test410isNotFound() throws Exception {
128130

129131
@Test
130132
public void test416isEOF() throws Exception {
131-
verifyTranslated(416, EOFException.class);
133+
134+
// 416 maps to the subclass of EOFException
135+
final IOException ex = verifyTranslated(SC_416_RANGE_NOT_SATISFIABLE,
136+
RangeNotSatisfiableEOFException.class);
137+
Assertions.assertThat(ex)
138+
.isInstanceOf(EOFException.class);
132139
}
133140

134141
@Test

0 commit comments

Comments
 (0)