diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java index 09c19ad071a98..42952e2a42b30 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java @@ -489,6 +489,16 @@ public final class StreamStatisticNames { public static final String STREAM_FILE_CACHE_EVICTION = "stream_file_cache_eviction"; + /** + * Bytes that were prefetched by the stream. + */ + public static final String STREAM_READ_PREFETCHED_BYTES = "stream_read_prefetched_bytes"; + + /** + * Tracks failures in footer parsing. + */ + public static final String STREAM_READ_PARQUET_FOOTER_PARSING_FAILED = "stream_read_parquet_footer_parsing_failed"; + private StreamStatisticNames() { } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index b3c907428ac4e..1c93795c9a288 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -81,6 +81,8 @@ import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_FAILURES; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_UNBUFFERED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore; import static org.apache.hadoop.fs.s3a.Statistic.*; @@ -891,7 +893,11 @@ private InputStreamStatistics( StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED, StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES, StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE, - StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED) + StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST, + StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED, + STREAM_READ_PREFETCHED_BYTES, + STREAM_READ_PARQUET_FOOTER_PARSING_FAILED + ) .withGauges(STREAM_READ_GAUGE_INPUT_POLICY, STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(), STREAM_READ_ACTIVE_PREFETCH_OPERATIONS.getSymbol(), @@ -1128,6 +1134,27 @@ public void readVectoredBytesDiscarded(int discarded) { bytesDiscardedInVectoredIO.addAndGet(discarded); } + @Override + public void getRequestInitiated() { + increment(ACTION_HTTP_GET_REQUEST); + } + + @Override + public void headRequestInitiated() { + increment(StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST); + } + + @Override + public void bytesPrefetched(long size) { + increment(STREAM_READ_PREFETCHED_BYTES, size); + } + + @Override + public void footerParsingFailed() { + increment(STREAM_READ_PARQUET_FOOTER_PARSING_FAILED); + } + + @Override public void executorAcquired(Duration timeInQueue) { // update the duration fields in the IOStatistics. 
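Both new names are plain IOStatistics counters, so once a stream is open they can be read back through the standard IOStatisticsSource path; nothing beyond the constants above is needed. A minimal sketch, assuming an open S3A FSDataInputStream, here called "in" (the variable name is illustrative, not part of this patch):

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.statistics.IOStatistics;
    import org.apache.hadoop.fs.statistics.StreamStatisticNames;

    import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;

    // Read the new counters back from an open stream.
    IOStatistics stats = retrieveIOStatistics(in);
    Long prefetched = stats.counters()
        .get(StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES);
    Long footerFailures = stats.counters()
        .get(StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED);

The counters only appear in the map because the S3AInstrumentation change above registers them when the stream statistics store is built.
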
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java index 6389742167def..e992a1e1c5ae4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -459,8 +459,8 @@ public enum Statistic { "Gauge of active memory in use", TYPE_GAUGE), - /* Stream Write statistics */ + /* Stream Write statistics */ STREAM_WRITE_EXCEPTIONS( StreamStatisticNames.STREAM_WRITE_EXCEPTIONS, "Count of stream write failures reported", diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java new file mode 100644 index 0000000000000..7c5ef99b6a41e --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl.streams; + +import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import software.amazon.s3.analyticsaccelerator.util.RequestCallback; + +/** + * Implementation of AAL's RequestCallback interface that tracks analytics operations. + */ +public class AnalyticsRequestCallback implements RequestCallback { + private final S3AInputStreamStatistics statistics; + + /** + * Create a new callback instance. 
+ * @param statistics the statistics to update + */ + public AnalyticsRequestCallback(S3AInputStreamStatistics statistics) { + this.statistics = statistics; + } + + @Override + public void onGetRequest() { + statistics.getRequestInitiated(); + } + + @Override + public void onHeadRequest() { + statistics.headRequestInitiated(); + } + + @Override + public void onBlockPrefetch(long start, long end) { + statistics.bytesPrefetched(end - start); + } + + @Override + public void footerParsingFailed() { + statistics.footerParsingFailed(); + } + + +} + diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java index 8920b5b2dfc7c..f8b54eb2ad01f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java @@ -40,6 +40,7 @@ import software.amazon.s3.analyticsaccelerator.util.InputPolicy; import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; +import software.amazon.s3.analyticsaccelerator.util.RequestCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +73,7 @@ public AnalyticsStream(final ObjectReadParameters parameters, final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException { super(InputStreamType.Analytics, parameters); S3ObjectAttributes s3Attributes = parameters.getObjectAttributes(); + this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()), buildOpenStreamInformation(parameters)); getS3AStreamStatistics().streamOpened(InputStreamType.Analytics); @@ -80,6 +82,9 @@ public AnalyticsStream(final ObjectReadParameters parameters, @Override public int read() throws IOException { throwIfClosed(); + + getS3AStreamStatistics().readOperationStarted(getPos(), 1); + int bytesRead; try { bytesRead = inputStream.read(); @@ -87,6 +92,11 @@ public int read() throws IOException { onReadFailure(ioe); throw ioe; } + + if (bytesRead != -1) { + incrementBytesRead(1); + } + return bytesRead; } @@ -122,6 +132,8 @@ public synchronized long getPos() { */ public int readTail(byte[] buf, int off, int len) throws IOException { throwIfClosed(); + getS3AStreamStatistics().readOperationStarted(getPos(), len); + int bytesRead; try { bytesRead = inputStream.readTail(buf, off, len); @@ -129,12 +141,20 @@ public int readTail(byte[] buf, int off, int len) throws IOException { onReadFailure(ioe); throw ioe; } + + if (bytesRead > 0) { + incrementBytesRead(bytesRead); + } + return bytesRead; } @Override public int read(byte[] buf, int off, int len) throws IOException { throwIfClosed(); + + getS3AStreamStatistics().readOperationStarted(getPos(), len); + int bytesRead; try { bytesRead = inputStream.read(buf, off, len); @@ -142,6 +162,11 @@ public int read(byte[] buf, int off, int len) throws IOException { onReadFailure(ioe); throw ioe; } + + if (bytesRead > 0) { + incrementBytesRead(bytesRead); + } + return bytesRead; } @@ -247,10 +272,13 @@ private void onReadFailure(IOException ioe) throws IOException { } private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters parameters) { + + final RequestCallback requestCallback = new AnalyticsRequestCallback(getS3AStreamStatistics()); + OpenStreamInformation.OpenStreamInformationBuilder 
openStreamInformationBuilder = OpenStreamInformation.builder()
             .inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
-                .getInputPolicy()));
+                .getInputPolicy())).requestCallback(requestCallback);
 
     if (parameters.getObjectAttributes().getETag() != null) {
       openStreamInformationBuilder.objectMetadata(ObjectMetadata.builder()
@@ -300,4 +328,16 @@ protected void throwIfClosed() throws IOException {
       throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
     }
   }
+
+  /**
+   * Increment the bytes read counters: the stream statistics are always
+   * updated, the filesystem statistics only when a stats instance is
+   * present and the number of bytes read is more than zero.
+   * @param bytesRead number of bytes read
+   */
+  private void incrementBytesRead(long bytesRead) {
+    getS3AStreamStatistics().bytesRead(bytesRead);
+    if (getContext().getStats() != null && bytesRead > 0) {
+      getContext().getStats().incrementBytesRead(bytesRead);
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
index 50333c68e0cd2..cff743242f135 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
@@ -48,7 +48,6 @@ public class AnalyticsStreamFactory extends AbstractObjectInputStreamFactory {
 
   private S3SeekableInputStreamConfiguration seekableInputStreamConfiguration;
   private LazyAutoCloseableReference<S3SeekableInputStreamFactory> s3SeekableInputStreamFactory;
-  private boolean requireCrt;
 
   public AnalyticsStreamFactory() {
     super("AnalyticsStreamFactory");
@@ -61,7 +60,6 @@ protected void serviceInit(final Configuration conf) throws Exception {
         ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
     this.seekableInputStreamConfiguration =
         S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
-    this.requireCrt = false;
   }
 
   @Override
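AnalyticsRequestCallback translates AAL's request events directly into the S3AInputStreamStatistics calls declared in the next file. A unit test could substitute a counting stub of the same AAL interface; a sketch, where the class and field names are illustrative and not part of this patch:

    import java.util.concurrent.atomic.AtomicLong;

    import software.amazon.s3.analyticsaccelerator.util.RequestCallback;

    /** Counting stub mirroring the mapping done by AnalyticsRequestCallback. */
    class CountingRequestCallback implements RequestCallback {
      final AtomicLong gets = new AtomicLong();
      final AtomicLong heads = new AtomicLong();
      final AtomicLong prefetchedBytes = new AtomicLong();
      final AtomicLong footerFailures = new AtomicLong();

      @Override
      public void onGetRequest() {
        gets.incrementAndGet();
      }

      @Override
      public void onHeadRequest() {
        heads.incrementAndGet();
      }

      @Override
      public void onBlockPrefetch(long start, long end) {
        // Same end - start accounting as AnalyticsRequestCallback above.
        prefetchedBytes.addAndGet(end - start);
      }

      @Override
      public void footerParsingFailed() {
        footerFailures.incrementAndGet();
      }
    }
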
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
index 7ad7cf75367e2..f6e06cd04d8f8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
@@ -119,6 +119,27 @@ void readVectoredOperationStarted(int numIncomingRanges,
    */
  void readVectoredBytesDiscarded(int discarded);

+  /**
+   * A GET request was initiated by the stream.
+   */
+  void getRequestInitiated();
+
+  /**
+   * A HEAD request was initiated by the stream.
+   */
+  void headRequestInitiated();
+
+  /**
+   * Bytes were prefetched by the stream.
+   * @param size number of bytes prefetched.
+   */
+  void bytesPrefetched(long size);
+
+  /**
+   * Parsing of a parquet footer failed.
+   */
+  void footerParsingFailed();
+
  @Override
  void close();

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
index 26b9f2b1568ca..1d85bf81b8047 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
@@ -212,6 +212,26 @@ public void readVectoredBytesDiscarded(int discarded) {

    }

+    @Override
+    public void getRequestInitiated() {
+
+    }
+
+    @Override
+    public void headRequestInitiated() {
+
+    }
+
+    @Override
+    public void bytesPrefetched(long size) {
+
+    }
+
+    @Override
+    public void footerParsingFailed() {
+
+    }
+
    @Override
    public void close() {

diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
index 8f8f90f9b1e65..21cfb4e70b0c6 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
@@ -43,14 +43,21 @@
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
 import static org.apache.hadoop.fs.audit.AuditStatisticNames.AUDIT_REQUEST_EXECUTION;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
-import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
-import static org.apache.hadoop.fs.statistics.StreamStatisticNames.ANALYTICS_STREAM_FACTORY_CLOSED;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.*;
+import static org.apache.hadoop.io.Sizes.S_1K;
+import static org.apache.hadoop.io.Sizes.S_1M;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static software.amazon.s3.analyticsaccelerator.util.Constants.ONE_KB;
+import static software.amazon.s3.analyticsaccelerator.util.Constants.ONE_MB;

 /**
  * Tests integration of the
@@ -105,9 +112,21 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
      Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
      Assertions.assertThat(objectInputStream.getInputPolicy())
          .isEqualTo(S3AInputPolicy.Sequential);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, 500);
+      verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
+
+      long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead();
+      Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read")
+          .isEqualTo(500);
    }

    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+
+    // The total file size is 21511173 bytes and the read starts at pos 5. Since the policy is WHOLE_FILE,
+    // the whole file starts getting prefetched as soon as the stream to it is opened, so the prefetched
+    // byte count is 21511173 - 5 = 21511168.
+    verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 21511168);
+
    fs.close();
    verifyStatisticCounterValue(fs.getIOStatistics(), ANALYTICS_STREAM_FACTORY_CLOSED, 1);
@@ -118,6 +137,45 @@
    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
  }

+  @Test
+  public void testSequentialPrefetching() throws IOException {
+    S3AFileSystem fs =
+        (S3AFileSystem) FileSystem.get(externalTestFile.toUri(), getConfiguration());
+    byte[] buffer = new byte[10 * ONE_MB];
+    IOStatistics ioStats;
+
+    // Here we read through the 21MB external test file, but do not pass in the WHOLE_FILE policy.
+    // Instead, we rely on AAL detecting a sequential read pattern and then prefetching bytes in a
+    // geometric progression. AAL's sequential prefetching fetches in increments of 4MB, 8MB, 16MB etc.,
+    // depending on how many sequential reads happen.
+    try (FSDataInputStream inputStream = fs.open(externalTestFile)) {
+      ioStats = inputStream.getIOStatistics();
+
+      inputStream.readFully(buffer, 0, ONE_MB);
+      // The first sequential read, so prefetch the next 4MB.
+      inputStream.readFully(buffer, 0, ONE_MB);
+
+      // Since ONE_MB was requested by the reader, the prefetched bytes are 3MB.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 3 * ONE_MB);
+
+      // These next two reads are within the last prefetched bytes, so no further bytes are prefetched.
+      inputStream.readFully(buffer, 0, 2 * ONE_MB);
+      inputStream.readFully(buffer, 0, ONE_MB);
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 3 * ONE_MB);
+
+      // Another sequential read, so the GP will now prefetch the next 8MB of data.
+      inputStream.readFully(buffer, 0, ONE_MB);
+      // A total of 10MB is prefetched: 3MB and then 7MB.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 10 * ONE_MB);
+
+      inputStream.readFully(buffer, 0, 10 * ONE_MB);
+
+      // Though the next GP step should prefetch 16MB, since the file is ~23MB, only the bytes up to
+      // EOF are prefetched: the 6291456 remaining bytes.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 10 * ONE_MB + 6291456);
+    }
+  }
+
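The expected counter values in this test follow from the block sizes named in the comments; a worked tally of STREAM_READ_PREFETCHED_BYTES, assuming the 4MB/8MB/16MB progression holds (a sketch of the arithmetic, not part of the test):

    // 4MB block fetched, 1MB of it requested by the reader.
    long firstStep = 4L * ONE_MB - ONE_MB;   // 3MB prefetched
    // 8MB block fetched, again 1MB requested.
    long secondStep = 8L * ONE_MB - ONE_MB;  // 7MB more, 10MB total
    // The 16MB step is truncated at end of file.
    long finalStep = 6291456L;
    // firstStep + secondStep == 10 * ONE_MB, so the running totals match
    // the 3 * ONE_MB, 10 * ONE_MB and 10 * ONE_MB + 6291456 assertions above.
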

  @Test
  public void testMalformedParquetFooter() throws IOException {
    describe("Reading a malformed parquet file should not throw an exception");
@@ -136,7 +194,32 @@
    byte[] buffer = new byte[500];
    IOStatistics ioStats;
+    int bytesRead;
+
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      ioStats = inputStream.getIOStatistics();
+      inputStream.seek(5);
+      bytesRead = inputStream.read(buffer, 0, 500);
+
+      ObjectInputStream objectInputStream = (ObjectInputStream) inputStream.getWrappedStream();
+      long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead();
+      Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read")
+          .isEqualTo(bytesRead);
+
+    }
+
+    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_HEAD_REQUEST, 0);
+    // This file has a content length of 450. Since it is a parquet file, AAL will prefetch the footer
+    // bytes (the last 32KB) as soon as the file is opened; because the file is < 32KB, the whole file
+    // is prefetched.
+    verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 450);
+    // The footer is only prefetched once, but parsing is attempted on each stream open.
+    verifyStatisticCounterValue(ioStats, STREAM_READ_PARQUET_FOOTER_PARSING_FAILED, 1);
+
+    // Open a stream to the object twice, verifying that data is cached and that streams to the same
+    // object do not prefetch the same data twice.
    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
      ioStats = inputStream.getIOStatistics();
      inputStream.seek(5);
@@ -144,6 +227,13 @@
    }

    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 0);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_HEAD_REQUEST, 0);
+    // No data is prefetched, as it already exists in the cache from the previous factory.
+    verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 0);
+
+    // The footer is only prefetched once, but parsing is attempted on each stream open.
+    verifyStatisticCounterValue(ioStats, STREAM_READ_PARQUET_FOOTER_PARSING_FAILED, 1);
  }

  /**
@@ -164,6 +254,9 @@ public void testMultiRowGroupParquet() throws Throwable {

    FileStatus fileStatus = getFileSystem().getFileStatus(dest);

+    // Audit operations before any reading happens; these are required for uploading the parquet file.
+    verifyStatisticCounterValue(getFileSystem().getIOStatistics(), AUDIT_REQUEST_EXECUTION, 8);
+
    byte[] buffer = new byte[3000];
    IOStatistics ioStats;

@@ -173,17 +266,31 @@
    }

    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
+
+    // S3A makes a HEAD request on the stream open(), and then AAL makes a GET request to fetch the
+    // object; total audit operations = 10.
+    verifyStatisticCounterValue(getFileSystem().getIOStatistics(), AUDIT_REQUEST_EXECUTION, 8 + 2);

    try (FSDataInputStream inputStream = getFileSystem().openFile(dest)
        .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_PARQUET)
        .build().get()) {
      ioStats = inputStream.getIOStatistics();
      inputStream.readFully(buffer, 0, (int) fileStatus.getLen());
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, (int) fileStatus.getLen());
+      verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
    }

    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
-    verifyStatisticCounterValue(getFileSystem().getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
+    // S3A passes in the metadata (content length) on file open,
+    // so we expect AAL to make no HEAD requests.
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_HEAD_REQUEST, 0);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 0);
+
+    // S3A makes a HEAD on the stream open; no GET request is made since the data is already cached.
+    verifyStatisticCounterValue(getFileSystem().getIOStatistics(), AUDIT_REQUEST_EXECUTION, 10 + 1);
  }

  @Test
@@ -203,4 +310,59 @@
        () -> S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
  }
+
+  @Test
+  public void testRandomSeekPatternGets() throws Throwable {
+    describe("Random seek pattern should optimize GET requests");
+
+    Path dest = path("seek-test.txt");
+    byte[] data = dataset(5 * S_1M, 256, 255);
+    writeDataset(getFileSystem(), dest, data, 5 * S_1M, 1024, true);
+
+    byte[] buffer = new byte[S_1M];
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+
+      inputStream.read(buffer);
+      inputStream.seek(2 * S_1M);
+      inputStream.read(new byte[512 * S_1K]);
+      inputStream.seek(3 * S_1M);
+      inputStream.read(new byte[512 * S_1K]);
+
+      verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
+      verifyStatisticCounterValue(ioStats, ACTION_HTTP_HEAD_REQUEST, 0);
+    }
+  }
+
+  @Test
+  public void testSequentialStreamsNoDuplicateGets() throws Throwable {
+    describe("Sequential streams reading same object should not duplicate GETs");
+
+    Path dest = path("sequential-test.txt");
+    byte[] data = dataset(S_1M, 256, 255);
+    writeDataset(getFileSystem(), dest, data, S_1M, 1024, true);
+
+    byte[] buffer = new byte[ONE_MB];
+    try (FSDataInputStream stream1 = getFileSystem().open(dest);
+        FSDataInputStream stream2 = getFileSystem().open(dest)) {
+
+      stream1.read(buffer, 0, 2 * ONE_KB);
+      stream2.read(buffer);
+      stream1.read(buffer, 0, 10 * ONE_KB);
+
+      IOStatistics stats1 = stream1.getIOStatistics();
+      IOStatistics stats2 = stream2.getIOStatistics();
+
+      verifyStatisticCounterValue(stats1, ACTION_HTTP_GET_REQUEST, 1);
+      verifyStatisticCounterValue(stats2, ACTION_HTTP_HEAD_REQUEST, 0);
+
+      // Since it's a small file (AAL will prefetch the whole file for sizes < 8MB), the whole file is
+      // prefetched on the first read.
+      verifyStatisticCounterValue(stats1, STREAM_READ_PREFETCHED_BYTES, 1048575);
+
+      // The second stream will not prefetch any bytes, as they have already been prefetched by stream 1.
+ verifyStatisticCounterValue(stats2, STREAM_READ_PREFETCHED_BYTES, 0); + } + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java index 12a1cd7d8f63e..2bc342717a16c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java @@ -45,7 +45,6 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; import static org.apache.hadoop.fs.s3a.S3ATestUtils.disablePrefetching; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES; @@ -81,10 +80,7 @@ protected Configuration createConfiguration() { public void setup() throws Exception { super.setup(); executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS); - // Analytics accelerator currently does not support IOStatisticsContext, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support IOStatisticsContext"); + } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java index 2ae28c74fe5b7..548b30a3b2dcb 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java @@ -28,7 +28,6 @@ import java.io.IOException; import java.io.InputStream; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString; /** @@ -52,10 +51,6 @@ public void testMetricsRegister() @Test public void testStreamStatistics() throws IOException { - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); Path file = path("testStreamStatistics"); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java index 9ad2c0625a094..7a62f76dd0b7d 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java @@ -42,7 +42,6 @@ import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; import org.apache.hadoop.fs.statistics.IOStatisticsLogging; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.s3a.Statistic.ACTION_HTTP_GET_REQUEST; import static 
org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_FILES_CREATED; import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT; @@ -174,10 +173,7 @@ private void abortActiveStream() throws IOException { public void testCostOfCreatingMagicFile() throws Throwable { describe("Files created under magic paths skip existence checks and marker deletes"); - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support stream statistics"); + S3AFileSystem fs = getFileSystem(); Path destFile = methodSubPath("file.txt"); fs.delete(destFile.getParent(), true); @@ -255,10 +251,6 @@ public void testCostOfCreatingMagicFile() throws Throwable { public void testCostOfSavingLoadingPendingFile() throws Throwable { describe("Verify costs of saving .pending file under a magic path"); - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); Path partDir = methodSubPath("file.pending"); Path destFile = new Path(partDir, "file.pending"); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java index 484d1cc2c3121..e958c98942095 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java @@ -30,7 +30,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.junit.jupiter.api.Assertions.assertEquals; /** @@ -48,10 +47,6 @@ public class ITestS3AFileContextStatistics extends FCStatisticsBaseTest { @BeforeEach public void setUp() throws Exception { conf = new Configuration(); - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(conf, - "Analytics Accelerator currently does not support stream statistics"); fc = S3ATestUtils.createTestFileContext(conf); testRootPath = fileContextTestHelper.getTestRootPath(fc, "test"); fc.mkdir(testRootPath, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java index c1c03ca6e7212..8fe7c914bd64a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java @@ -58,7 +58,6 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.streamType; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED; @@ -71,6 +70,7 @@ import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture; @@ -95,6 +95,16 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest { */ private boolean prefetching; + /** + * Is the analytics stream enabled? + */ + private boolean analyticsStream; + + /** + * Is the classic input stream enabled? + */ + private boolean classicInputStream; + @Override public Configuration createConfiguration() { Configuration conf = super.createConfiguration(); @@ -113,17 +123,14 @@ public Configuration createConfiguration() { @Override public void setup() throws Exception { super.setup(); - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); testFile = methodPath(); - writeTextFile(fs, testFile, TEXT, true); testFileStatus = fs.getFileStatus(testFile); fileLength = (int)testFileStatus.getLen(); prefetching = prefetching(); + analyticsStream = isAnalyticsStream(); + classicInputStream = isClassicInputStream(); } /** @@ -177,6 +184,10 @@ public void testStreamIsNotChecksummed() throws Throwable { // if prefetching is enabled, skip this test assumeNoPrefetching(); + // If AAL is enabled, skip this test. AAL uses S3A's default S3 client, and if checksumming is disabled on the + // client, then AAL will also not enforce it. + assumeNotAnalytics(); + S3AFileSystem fs = getFileSystem(); // open the file @@ -193,13 +204,19 @@ public void testStreamIsNotChecksummed() throws Throwable { // open the stream. in.read(); + // now examine the innermost stream and make sure it doesn't have a checksum - assertStreamIsNotChecksummed(getS3AInputStream(in)); + assertStreamIsNotChecksummed(getS3AInputStream(in)); } } @Test public void testOpenFileShorterLength() throws Throwable { + + // For AAL, since it makes the HEAD to get the file length if the eTag is not supplied, + // it is not able to use the file length supplied in the open() call, and the test fails. + assumeNotAnalytics(); + // do a second read with the length declared as short. // we now expect the bytes read to be shorter. S3AFileSystem fs = getFileSystem(); @@ -244,7 +261,6 @@ public void testOpenFileLongerLengthReadFully() throws Throwable { final int extra = 10; long longLen = fileLength + extra; - // assert behaviors of seeking/reading past the file length. // there is no attempt at recovery. 
verifyMetrics(() -> { @@ -261,10 +277,12 @@ public void testOpenFileLongerLengthReadFully() throws Throwable { } }, always(), - // two GET calls were made, one for readFully, - // the second on the read() past the EOF + // two GET calls were made, one for readFully, + // the second on the read() past the EOF // the operation has got as far as S3 - probe(!prefetching(), STREAM_READ_OPENED, 1 + 1)); + probe(classicInputStream, STREAM_READ_OPENED, 1 + 1), + // For AAL, the seek past content length fails, before the GET is made. + probe(analyticsStream, STREAM_READ_OPENED, 1)); // now on a new stream, try a full read from after the EOF verifyMetrics(() -> { @@ -275,10 +293,6 @@ public void testOpenFileLongerLengthReadFully() throws Throwable { return in.toString(); } }, - // two GET calls were made, one for readFully, - // the second on the read() past the EOF - // the operation has got as far as S3 - with(STREAM_READ_OPENED, 1)); } @@ -348,7 +362,9 @@ public void testReadPastEOF() throws Throwable { } }, always(), - probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, extra)); + probe(classicInputStream, Statistic.ACTION_HTTP_GET_REQUEST, extra), + // AAL won't make the GET call if trying to read beyond EOF + probe(analyticsStream, Statistic.ACTION_HTTP_GET_REQUEST, 0)); } /** @@ -439,18 +455,27 @@ public void testVectorReadPastEOF() throws Throwable { byte[] buf = new byte[longLen]; ByteBuffer bb = ByteBuffer.wrap(buf); final FileRange range = FileRange.createFileRange(0, longLen); - in.readVectored(Arrays.asList(range), (i) -> bb); - interceptFuture(EOFException.class, - "", - ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, - TimeUnit.SECONDS, - range.getData()); - assertS3StreamClosed(in); - return "vector read past EOF with " + in; + + // For AAL, if there is no eTag, the provided length will not be passed in, and a HEAD request will be made. + // AAL requires the etag to detect changes in the object and then do cache eviction if required. + if (isAnalyticsStream()) { + intercept(EOFException.class, () -> in.readVectored(Arrays.asList(range), (i) -> bb)); + verifyStatisticCounterValue(in.getIOStatistics(), ACTION_HTTP_HEAD_REQUEST, 1); + return "vector read past EOF with " + in; + } else { + in.readVectored(Arrays.asList(range), (i) -> bb); + interceptFuture(EOFException.class, + "", + ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, + TimeUnit.SECONDS, + range.getData()); + assertS3StreamClosed(in); + return "vector read past EOF with " + in; + } } }, always(), - probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1)); + probe(classicInputStream, Statistic.ACTION_HTTP_GET_REQUEST, 1)); } /** @@ -461,6 +486,22 @@ private boolean prefetching() { return InputStreamType.Prefetch == streamType(getFileSystem()); } + /** + * Is the current stream type Analytics? + * @return true if Analytics stream is enabled. + */ + private boolean isAnalyticsStream() { + return InputStreamType.Analytics == streamType(getFileSystem()); + } + + /** + * Is the current input stream type S3AInputStream? + * @return true if the S3AInputStream is being used. + */ + private boolean isClassicInputStream() { + return InputStreamType.Classic == streamType(getFileSystem()); + } + /** * Skip the test if prefetching is enabled. */ @@ -470,6 +511,12 @@ private void assumeNoPrefetching(){ } } + private void assumeNotAnalytics() { + if (analyticsStream) { + skip("Analytics stream is enabled"); + } + } + /** * Assert that the inner S3 Stream is closed. 
* @param in input stream diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java index 4165f7a6c9cb9..7f363d190c53a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java @@ -30,7 +30,6 @@ import org.apache.hadoop.fs.statistics.StreamStatisticNames; import org.apache.hadoop.test.tags.IntegrationTest; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.*; /** @@ -86,10 +85,6 @@ public List outputStreamStatisticKeys() { @Test @Override public void testInputStreamStatisticRead() throws Throwable { - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getContract().getConf(), - "Analytics Accelerator currently does not support stream statistics"); super.testInputStreamStatisticRead(); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java index 2f54cab00b13d..81a719fbea2b2 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java @@ -31,7 +31,6 @@ import org.apache.hadoop.fs.statistics.IOStatisticAssertions; import org.apache.hadoop.fs.statistics.StreamStatisticNames; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase { @@ -44,10 +43,6 @@ public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase { */ @Test public void testBytesReadWithStream() throws IOException { - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); Path filePath = path(getMethodName()); byte[] oneKbBuf = new byte[ONE_KB];
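
With these skips removed, testBytesReadWithStream and the other statistics tests now also run against the Analytics stream, exercising the incrementBytesRead() path added to AnalyticsStream. A sketch of the kind of assertion this enables, reusing the fs, filePath and oneKbBuf names from the test above; whether a positioned readFully is counted identically across stream types is an assumption here, so treat this as illustrative:

    // Stream statistics are now populated for the Analytics stream as well.
    try (FSDataInputStream in = fs.open(filePath)) {
      in.readFully(0, oneKbBuf);
      IOStatisticAssertions.assertThatStatisticCounter(in.getIOStatistics(),
          StreamStatisticNames.STREAM_READ_BYTES)
          .isEqualTo((long) ONE_KB);
    }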