From 9e7bc0b768dd103dfb32c5610e52758d95535b04 Mon Sep 17 00:00:00 2001 From: Ahmar Suhail Date: Thu, 2 Oct 2025 11:50:00 +0100 Subject: [PATCH 1/4] IoStats changes --- .../org/apache/hadoop/fs/s3a/Statistic.java | 2 +- .../streams/AnalyticsRequestCallback.java | 49 +++++++ .../fs/s3a/impl/streams/AnalyticsStream.java | 43 +++++- .../impl/EmptyS3AStatisticsContext.java | 7 +- ...tS3AAnalyticsAcceleratorStreamReading.java | 128 +++++++++++++++++- .../fs/s3a/ITestS3AIOStatisticsContext.java | 6 +- .../apache/hadoop/fs/s3a/ITestS3AMetrics.java | 5 - .../s3a/commit/ITestCommitOperationCost.java | 10 +- .../ITestS3AFileContextStatistics.java | 5 - .../fs/s3a/performance/ITestS3AOpenCost.java | 34 +++-- .../ITestS3AContractStreamIOStatistics.java | 6 +- .../ITestS3AFileSystemStatistic.java | 5 - 12 files changed, 250 insertions(+), 50 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java index 6389742167def..e992a1e1c5ae4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -459,8 +459,8 @@ public enum Statistic { "Gauge of active memory in use", TYPE_GAUGE), - /* Stream Write statistics */ + /* Stream Write statistics */ STREAM_WRITE_EXCEPTIONS( StreamStatisticNames.STREAM_WRITE_EXCEPTIONS, "Count of stream write failures reported", diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java new file mode 100644 index 0000000000000..b4d99d8328426 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl.streams; + +import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import org.apache.hadoop.fs.statistics.DurationTracker; +import software.amazon.s3.analyticsaccelerator.util.RequestCallback; + +/** + * Implementation of AAL's RequestCallback interface that tracks analytics operations. + */ +public class AnalyticsRequestCallback implements RequestCallback { + private final S3AInputStreamStatistics statistics; + + /** + * Create a new callback instance. 
+ * @param statistics the statistics to update + */ + public AnalyticsRequestCallback(S3AInputStreamStatistics statistics) { + this.statistics = statistics; + } + + @Override + public void onGetRequest() { + statistics.initiateGetRequest(); + } + + @Override + public void onHeadRequest() { + statistics.incrementAnalyticsHeadRequests(); + } +} + diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java index 8920b5b2dfc7c..bca8f122dab09 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java @@ -40,6 +40,7 @@ import software.amazon.s3.analyticsaccelerator.util.InputPolicy; import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; +import software.amazon.s3.analyticsaccelerator.util.RequestCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +73,7 @@ public AnalyticsStream(final ObjectReadParameters parameters, final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException { super(InputStreamType.Analytics, parameters); S3ObjectAttributes s3Attributes = parameters.getObjectAttributes(); + this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()), buildOpenStreamInformation(parameters)); getS3AStreamStatistics().streamOpened(InputStreamType.Analytics); @@ -80,6 +82,9 @@ public AnalyticsStream(final ObjectReadParameters parameters, @Override public int read() throws IOException { throwIfClosed(); + + getS3AStreamStatistics().readOperationStarted(getPos(), 1); + int bytesRead; try { bytesRead = inputStream.read(); @@ -87,6 +92,11 @@ public int read() throws IOException { onReadFailure(ioe); throw ioe; } + + if (bytesRead != -1) { + incrementBytesRead(1); + } + return bytesRead; } @@ -122,6 +132,8 @@ public synchronized long getPos() { */ public int readTail(byte[] buf, int off, int len) throws IOException { throwIfClosed(); + getS3AStreamStatistics().readOperationStarted(getPos(), len); + int bytesRead; try { bytesRead = inputStream.readTail(buf, off, len); @@ -129,12 +141,21 @@ public int readTail(byte[] buf, int off, int len) throws IOException { onReadFailure(ioe); throw ioe; } + + if (bytesRead > 0) { + incrementBytesRead(bytesRead); + } + return bytesRead; } @Override public int read(byte[] buf, int off, int len) throws IOException { throwIfClosed(); + long pos = getPos(); + + getS3AStreamStatistics().readOperationStarted(pos, len); + int bytesRead; try { bytesRead = inputStream.read(buf, off, len); @@ -142,6 +163,11 @@ public int read(byte[] buf, int off, int len) throws IOException { onReadFailure(ioe); throw ioe; } + + if (bytesRead > 0) { + incrementBytesRead(bytesRead); + } + return bytesRead; } @@ -247,10 +273,13 @@ private void onReadFailure(IOException ioe) throws IOException { } private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters parameters) { + + final RequestCallback requestCallback = new AnalyticsRequestCallback(getS3AStreamStatistics()); + OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder = OpenStreamInformation.builder() .inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext() - .getInputPolicy())); + 
.getInputPolicy())).requestCallback(requestCallback);
 
     if (parameters.getObjectAttributes().getETag() != null) {
       openStreamInformationBuilder.objectMetadata(ObjectMetadata.builder()
@@ -300,4 +329,16 @@ protected void throwIfClosed() throws IOException {
       throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
     }
   }
+
+  /**
+   * Increment the bytes read counter if there is a stats instance
+   * and the number of bytes read is more than zero.
+   * @param bytesRead number of bytes read
+   */
+  private void incrementBytesRead(long bytesRead) {
+    getS3AStreamStatistics().bytesRead(bytesRead);
+    if (getContext().getStats() != null && bytesRead > 0) {
+      getContext().getStats().incrementBytesRead(bytesRead);
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
index 26b9f2b1568ca..48739181efee6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
@@ -159,7 +159,12 @@ public void seekForwards(final long skipped, final long bytesReadInSeek) {
 
   }
 
-
+  @Override
+  public void incrementAnalyticsGetRequests() {
+  }
+  @Override
+  public void incrementAnalyticsHeadRequests() {
+  }
 
   @Override
   public long streamOpened() {
     return 0;
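[Reviewer sketch - not part of the patch] The counters wired up via AnalyticsRequestCallback
surface through the stream's standard IOStatistics interface, so callers need no
Analytics-specific plumbing. A minimal sketch, assuming an SLF4J LOG and placeholder
bucket/key; getIOStatistics() and ioStatisticsSourceToString() are existing Hadoop APIs
already used by the tests in this series:

    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
         FSDataInputStream in = fs.open(new Path("s3a://example-bucket/data.txt"))) {
      // a read drives readOperationStarted() and, with AAL, GET/HEAD accounting
      in.read(new byte[1024]);
      // pretty-print all stream IOStatistics, including the new analytics counters
      LOG.info("stream statistics {}", IOStatisticsLogging.ioStatisticsSourceToString(in));
    }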
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
index 8f8f90f9b1e65..c921fda32a680 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
@@ -43,13 +43,19 @@
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
 import static org.apache.hadoop.fs.audit.AuditStatisticNames.AUDIT_REQUEST_EXECUTION;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPERATIONS;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.ANALYTICS_STREAM_FACTORY_CLOSED;
+import static org.apache.hadoop.io.Sizes.S_1K;
+import static org.apache.hadoop.io.Sizes.S_1M;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
  * Tests integration of the
@@ -105,6 +111,13 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
     Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
     Assertions.assertThat(objectInputStream.getInputPolicy())
         .isEqualTo(S3AInputPolicy.Sequential);
+
+    verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, 500);
+    verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
+
+    long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead();
+    Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read")
+        .isEqualTo(500);
   }
 
   verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
@@ -136,14 +149,24 @@ public void testMalformedParquetFooter() throws IOException {
     byte[] buffer = new byte[500];
     IOStatistics ioStats;
+    int bytesRead;
 
     try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
       ioStats = inputStream.getIOStatistics();
       inputStream.seek(5);
-      inputStream.read(buffer, 0, 500);
+      bytesRead = inputStream.read(buffer, 0, 500);
+
+      ObjectInputStream objectInputStream = (ObjectInputStream) inputStream.getWrappedStream();
+      long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead();
+      Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read")
+          .isEqualTo(bytesRead);
+
     }
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
+    // S3A passes in the metadata on file open, we expect AAL to make no HEAD requests
+    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
   }
 
   /**
@@ -173,17 +196,23 @@ public void testMultiRowGroupParquet() throws Throwable {
     }
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
-
+    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
     try (FSDataInputStream inputStream = getFileSystem().openFile(dest)
         .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_PARQUET)
         .build().get()) {
       ioStats = inputStream.getIOStatistics();
       inputStream.readFully(buffer, 0, (int) fileStatus.getLen());
-    }
+      verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, (int) fileStatus.getLen());
+      verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
+    }
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
     verifyStatisticCounterValue(getFileSystem().getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
+
+    // S3A passes in the metadata (content length) on file open,
+    // we expect AAL to make no HEAD requests
+    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
   }
 
   @Test
@@ -203,4 +232,97 @@ public void testInvalidConfigurationThrows() throws Exception {
         () -> S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
   }
 
+  /**
+   * TXT files (SEQUENTIAL format) use the SequentialPrefetcher, which requests the entire 10MB file.
+ * RangeOptimiser splits ranges larger than maxRangeSizeBytes (8MB) using partSizeBytes (8MB) + * The 10MB range gets split into: [0-8MB) and [8MB-10MB) + * Each split range becomes a separate Block, resulting in 2 GET requests: + */ + @Test + public void testLargeFileMultipleGets() throws Throwable { + describe("Large file should trigger multiple GET requests"); + + Path dest = path("large-test-file.txt"); + byte[] data = dataset(10 * S_1M, 256, 255); + writeDataset(getFileSystem(), dest, data, 10 * S_1M, 1024, true); + + byte[] buffer = new byte[S_1M * 10]; + try (FSDataInputStream inputStream = getFileSystem().open(dest)) { + IOStatistics ioStats = inputStream.getIOStatistics(); + inputStream.readFully(buffer); + + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 2); + // Because S3A passes in the meta-data(content length) on file open, + // we expect AAL to make no HEAD requests + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0); + } + } + + @Test + public void testSmallFileSingleGet() throws Throwable { + describe("Small file should trigger only one GET request"); + + Path dest = path("small-test-file.txt"); + byte[] data = dataset(S_1M, 256, 255); + writeDataset(getFileSystem(), dest, data, S_1M, 1024, true); + + byte[] buffer = new byte[S_1M]; + try (FSDataInputStream inputStream = getFileSystem().open(dest)) { + IOStatistics ioStats = inputStream.getIOStatistics(); + inputStream.readFully(buffer); + + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1); + // Because S3A passes in the meta-data(content length) on file open, + // we expect AAL to make no HEAD requests + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0); + } + } + + + @Test + public void testRandomSeekPatternGets() throws Throwable { + describe("Random seek pattern should optimize GET requests"); + + Path dest = path("seek-test.txt"); + byte[] data = dataset(5 * S_1M, 256, 255); + writeDataset(getFileSystem(), dest, data, 5 * S_1M, 1024, true); + + byte[] buffer = new byte[S_1M]; + try (FSDataInputStream inputStream = getFileSystem().open(dest)) { + IOStatistics ioStats = inputStream.getIOStatistics(); + + inputStream.read(buffer); + inputStream.seek(2 * S_1M); + inputStream.read(new byte[512 * S_1K]); + inputStream.seek(3 * S_1M); + inputStream.read(new byte[512 * S_1K]); + + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1); + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0); + } + } + + + @Test + public void testSequentialStreamsNoDuplicateGets() throws Throwable { + describe("Sequential streams reading same object should not duplicate GETs"); + + Path dest = path("sequential-test.txt"); + byte[] data = dataset(S_1M, 256, 255); + writeDataset(getFileSystem(), dest, data, S_1M, 1024, true); + + byte[] buffer = new byte[1024]; + try (FSDataInputStream stream1 = getFileSystem().open(dest); + FSDataInputStream stream2 = getFileSystem().open(dest)) { + + stream1.read(buffer); + stream2.read(buffer); + + IOStatistics stats1 = stream1.getIOStatistics(); + IOStatistics stats2 = stream2.getIOStatistics(); + + verifyStatisticCounterValue(stats1, STREAM_READ_ANALYTICS_GET_REQUESTS, 1); + verifyStatisticCounterValue(stats2, STREAM_READ_ANALYTICS_GET_REQUESTS, 0); + } + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java index 12a1cd7d8f63e..2bc342717a16c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java @@ -45,7 +45,6 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; import static org.apache.hadoop.fs.s3a.S3ATestUtils.disablePrefetching; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES; @@ -81,10 +80,7 @@ protected Configuration createConfiguration() { public void setup() throws Exception { super.setup(); executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS); - // Analytics accelerator currently does not support IOStatisticsContext, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support IOStatisticsContext"); + } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java index 2ae28c74fe5b7..548b30a3b2dcb 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java @@ -28,7 +28,6 @@ import java.io.IOException; import java.io.InputStream; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString; /** @@ -52,10 +51,6 @@ public void testMetricsRegister() @Test public void testStreamStatistics() throws IOException { - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); Path file = path("testStreamStatistics"); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java index 9ad2c0625a094..7a62f76dd0b7d 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java @@ -42,7 +42,6 @@ import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; import org.apache.hadoop.fs.statistics.IOStatisticsLogging; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.s3a.Statistic.ACTION_HTTP_GET_REQUEST; import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_FILES_CREATED; import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT; @@ -174,10 +173,7 @@ private void abortActiveStream() 
throws IOException { public void testCostOfCreatingMagicFile() throws Throwable { describe("Files created under magic paths skip existence checks and marker deletes"); - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support stream statistics"); + S3AFileSystem fs = getFileSystem(); Path destFile = methodSubPath("file.txt"); fs.delete(destFile.getParent(), true); @@ -255,10 +251,6 @@ public void testCostOfCreatingMagicFile() throws Throwable { public void testCostOfSavingLoadingPendingFile() throws Throwable { describe("Verify costs of saving .pending file under a magic path"); - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); Path partDir = methodSubPath("file.pending"); Path destFile = new Path(partDir, "file.pending"); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java index 484d1cc2c3121..e958c98942095 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java @@ -30,7 +30,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.junit.jupiter.api.Assertions.assertEquals; /** @@ -48,10 +47,6 @@ public class ITestS3AFileContextStatistics extends FCStatisticsBaseTest { @BeforeEach public void setUp() throws Exception { conf = new Configuration(); - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(conf, - "Analytics Accelerator currently does not support stream statistics"); fc = S3ATestUtils.createTestFileContext(conf); testRootPath = fileContextTestHelper.getTestRootPath(fc, "test"); fc.mkdir(testRootPath, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java index c1c03ca6e7212..34a7d5a54037b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java @@ -58,7 +58,6 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.s3a.S3ATestUtils.streamType; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED; @@ 
-113,10 +112,7 @@ public Configuration createConfiguration() { @Override public void setup() throws Exception { super.setup(); - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support stream statistics"); + S3AFileSystem fs = getFileSystem(); testFile = methodPath(); @@ -177,6 +173,11 @@ public void testStreamIsNotChecksummed() throws Throwable { // if prefetching is enabled, skip this test assumeNoPrefetching(); + // Skip for Analytics streams - checksum validation only exists in S3AInputStream. + // AnalyticsStream handles data integrity through AWS Analytics Accelerator internally. + if (isAnalyticsStream()) { + skip("Analytics stream doesn't use checksums"); + } S3AFileSystem fs = getFileSystem(); // open the file @@ -261,10 +262,13 @@ public void testOpenFileLongerLengthReadFully() throws Throwable { } }, always(), - // two GET calls were made, one for readFully, - // the second on the read() past the EOF - // the operation has got as far as S3 - probe(!prefetching(), STREAM_READ_OPENED, 1 + 1)); + // Analytics stream: 1 open (persistent connection) + // S3AInputStream: 2 opens (reopen on EOF) + // two GET calls were made, one for readFully, + // the second on the read() past the EOF + // the operation has got as far as S3 + probe(!prefetching() && !isAnalyticsStream(), STREAM_READ_OPENED, 2), + probe(!prefetching() && isAnalyticsStream(), STREAM_READ_OPENED, 1)); // now on a new stream, try a full read from after the EOF verifyMetrics(() -> { @@ -348,7 +352,9 @@ public void testReadPastEOF() throws Throwable { } }, always(), - probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, extra)); + // Analytics streams don't make HTTP requests when reading past EOF + probe(!prefetching && !isAnalyticsStream(), Statistic.ACTION_HTTP_GET_REQUEST, extra), + probe(!prefetching && isAnalyticsStream(), Statistic.ACTION_HTTP_GET_REQUEST, 0)); } /** @@ -461,6 +467,14 @@ private boolean prefetching() { return InputStreamType.Prefetch == streamType(getFileSystem()); } + /** + * Is the current stream type Analytics? + * @return true if Analytics stream is enabled. + */ + private boolean isAnalyticsStream() { + return streamType(getFileSystem()) == InputStreamType.Analytics; + } + /** * Skip the test if prefetching is enabled. 
*/ diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java index 4165f7a6c9cb9..35f28d0eb08af 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java @@ -30,7 +30,6 @@ import org.apache.hadoop.fs.statistics.StreamStatisticNames; import org.apache.hadoop.test.tags.IntegrationTest; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.*; /** @@ -86,10 +85,7 @@ public List outputStreamStatisticKeys() { @Test @Override public void testInputStreamStatisticRead() throws Throwable { - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getContract().getConf(), - "Analytics Accelerator currently does not support stream statistics"); super.testInputStreamStatisticRead(); } + } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java index 2f54cab00b13d..81a719fbea2b2 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java @@ -31,7 +31,6 @@ import org.apache.hadoop.fs.statistics.IOStatisticAssertions; import org.apache.hadoop.fs.statistics.StreamStatisticNames; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase { @@ -44,10 +43,6 @@ public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase { */ @Test public void testBytesReadWithStream() throws IOException { - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); Path filePath = path(getMethodName()); byte[] oneKbBuf = new byte[ONE_KB]; From 63726fbb49c4f777c0840e1c600238ce21de66ee Mon Sep 17 00:00:00 2001 From: Ahmar Suhail Date: Tue, 7 Oct 2025 15:11:00 +0100 Subject: [PATCH 2/4] adds more stats --- .../fs/statistics/StreamStatisticNames.java | 4 + .../hadoop/fs/s3a/S3AInstrumentation.java | 43 ++++- .../streams/AnalyticsRequestCallback.java | 19 ++- .../impl/streams/AnalyticsStreamFactory.java | 2 - .../statistics/S3AInputStreamStatistics.java | 8 + .../impl/EmptyS3AStatisticsContext.java | 27 ++- ...tS3AAnalyticsAcceleratorStreamReading.java | 155 +++++++++++------- 7 files changed, 183 insertions(+), 75 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java index 09c19ad071a98..31378157a17d0 100644 --- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java @@ -489,6 +489,10 @@ public final class StreamStatisticNames { public static final String STREAM_FILE_CACHE_EVICTION = "stream_file_cache_eviction"; + public static final String STREAM_READ_PREFETCHED_BYTES = "stream_read_prefetched_bytes"; + + public static final String STREAM_READ_PARQUET_FOOTER_PARSING_FAILED = "stream_read_parquet_footer_parsing_failed"; + private StreamStatisticNames() { } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index b3c907428ac4e..83c9ebfe4b6e2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -75,11 +75,25 @@ import java.util.concurrent.atomic.AtomicLong; import static org.apache.hadoop.fs.s3a.Constants.STREAM_READ_GAUGE_INPUT_POLICY; +import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_ACTIVE_MEMORY_IN_USE; +import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_ACTIVE_PREFETCH_OPERATIONS; +import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BLOCKS_IN_FILE_CACHE; +import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BLOCK_UPLOADS; +import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BLOCK_UPLOADS_ACTIVE; +import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING; +import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BLOCK_UPLOADS_PENDING; +import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BYTES; +import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_EXCEPTIONS; +import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS; +import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_QUEUE_DURATION; +import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_TOTAL_DATA; +import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_TOTAL_TIME; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics; import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_FAILURES; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.*; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_UNBUFFERED; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore; import static org.apache.hadoop.fs.s3a.Statistic.*; @@ -884,14 +898,18 @@ private InputStreamStatistics( StreamStatisticNames.STREAM_READ_SEEK_BYTES_DISCARDED, StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED, StreamStatisticNames.STREAM_READ_TOTAL_BYTES, - StreamStatisticNames.STREAM_READ_UNBUFFERED, + STREAM_READ_UNBUFFERED, StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES, StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES, StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED, 
StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES, StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE, - StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED) + StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST, + StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED, + StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES, + StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED + ) .withGauges(STREAM_READ_GAUGE_INPUT_POLICY, STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(), STREAM_READ_ACTIVE_PREFETCH_OPERATIONS.getSymbol(), @@ -1128,6 +1146,27 @@ public void readVectoredBytesDiscarded(int discarded) { bytesDiscardedInVectoredIO.addAndGet(discarded); } + @Override + public void getRequestInitiated() { + increment(ACTION_HTTP_GET_REQUEST); + } + + @Override + public void headRequestInitiated() { + increment(StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST); + } + + @Override + public void bytesPrefetched(long size) { + increment(STREAM_READ_PREFETCHED_BYTES, size); + } + + @Override + public void footerParsingFailed() { + increment(STREAM_READ_PARQUET_FOOTER_PARSING_FAILED); + } + + @Override public void executorAcquired(Duration timeInQueue) { // update the duration fields in the IOStatistics. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java index b4d99d8328426..e295abc58ead6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java @@ -18,7 +18,9 @@ package org.apache.hadoop.fs.s3a.impl.streams; +import org.apache.hadoop.fs.s3a.Statistic; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; import org.apache.hadoop.fs.statistics.DurationTracker; import software.amazon.s3.analyticsaccelerator.util.RequestCallback; @@ -38,12 +40,25 @@ public AnalyticsRequestCallback(S3AInputStreamStatistics statistics) { @Override public void onGetRequest() { - statistics.initiateGetRequest(); + statistics.getRequestInitiated(); } @Override public void onHeadRequest() { - statistics.incrementAnalyticsHeadRequests(); + statistics.headRequestInitiated(); } + + @Override + public void onBlockPrefetch(long start, long end) { + System.out.println("BLOCKS READ: " + start + "end: " + end); + statistics.bytesPrefetched(end - start); + } + + @Override + public void footerParsingFailed() { + statistics.footerParsingFailed(); + } + + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java index 50333c68e0cd2..cff743242f135 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java @@ -48,7 +48,6 @@ public class AnalyticsStreamFactory extends AbstractObjectInputStreamFactory { private S3SeekableInputStreamConfiguration seekableInputStreamConfiguration; private LazyAutoCloseableReference s3SeekableInputStreamFactory; - private boolean requireCrt; public AnalyticsStreamFactory() { super("AnalyticsStreamFactory"); @@ -61,7 +60,6 @@ protected void serviceInit(final 
Configuration conf) throws Exception { ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX); this.seekableInputStreamConfiguration = S3SeekableInputStreamConfiguration.fromConfiguration(configuration); - this.requireCrt = false; } @Override diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java index 7ad7cf75367e2..eb8051e9a220e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java @@ -119,6 +119,14 @@ void readVectoredOperationStarted(int numIncomingRanges, */ void readVectoredBytesDiscarded(int discarded); + void getRequestInitiated(); + + void headRequestInitiated(); + + void bytesPrefetched(long size); + + void footerParsingFailed(); + @Override void close(); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java index 48739181efee6..1d85bf81b8047 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java @@ -159,12 +159,7 @@ public void seekForwards(final long skipped, final long bytesReadInSeek) { } - @Override - public void incrementAnalyticsGetRequests() { - } - @Override - public void incrementAnalyticsHeadRequests() { - } + @Override public long streamOpened() { return 0; @@ -217,6 +212,26 @@ public void readVectoredBytesDiscarded(int discarded) { } + @Override + public void getRequestInitiated() { + + } + + @Override + public void headRequestInitiated() { + + } + + @Override + public void bytesPrefetched(long size) { + + } + + @Override + public void footerParsingFailed() { + + } + @Override public void close() { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java index c921fda32a680..bf5c1a438e9d8 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java @@ -50,13 +50,14 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; -import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES; -import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPERATIONS; -import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED; -import static org.apache.hadoop.fs.statistics.StreamStatisticNames.ANALYTICS_STREAM_FACTORY_CLOSED; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST; +import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.*;
 import static org.apache.hadoop.io.Sizes.S_1K;
 import static org.apache.hadoop.io.Sizes.S_1M;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static software.amazon.s3.analyticsaccelerator.util.Constants.ONE_KB;
+import static software.amazon.s3.analyticsaccelerator.util.Constants.ONE_MB;
 
 /**
  * Tests integration of the
@@ -121,6 +122,11 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
     }
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+
+    // Total file size is 21511173, and the read starts from pos 5. Since the policy is WHOLE_FILE,
+    // the whole file starts getting prefetched as soon as the stream to it is opened, so the
+    // prefetched byte count is 21511173 - 5 = 21511168.
+    verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 21511168);
+
     fs.close();
     verifyStatisticCounterValue(fs.getIOStatistics(), ANALYTICS_STREAM_FACTORY_CLOSED, 1);
@@ -131,6 +137,45 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
     verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
   }
 
+  @Test
+  public void testSequentialPrefetching() throws IOException {
+    S3AFileSystem fs =
+        (S3AFileSystem) FileSystem.get(externalTestFile.toUri(), getConfiguration());
+    byte[] buffer = new byte[10 * ONE_MB];
+    IOStatistics ioStats;
+
+    // Here we read through the 21MB external test file, but do not pass in the WHOLE_FILE policy.
+    // Instead, we rely on AAL detecting a sequential read pattern, and then prefetching bytes in a
+    // geometric progression. AAL's sequential prefetching starts prefetching in increments of
+    // 4MB, 8MB, 16MB, etc., depending on how many sequential reads happen.
+    try (FSDataInputStream inputStream = fs.open(externalTestFile)) {
+      ioStats = inputStream.getIOStatistics();
+
+      inputStream.readFully(buffer, 0, ONE_MB);
+      // The first sequential read, so prefetch the next 4MB.
+      inputStream.readFully(buffer, 0, ONE_MB);
+
+      // Since ONE_MB was requested by the reader, the prefetched bytes are 3MB.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 3 * ONE_MB);
+
+      // These next two reads are within the last prefetched bytes, so no further bytes are prefetched.
+      inputStream.readFully(buffer, 0, 2 * ONE_MB);
+      inputStream.readFully(buffer, 0, ONE_MB);
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 3 * ONE_MB);
+
+      // Another sequential read; GP will now prefetch the next 8MB of data.
+      inputStream.readFully(buffer, 0, ONE_MB);
+      // A total of 10MB is prefetched - 3MB and then 7MB.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 10 * ONE_MB);
+
+      inputStream.readFully(buffer, 0, 10 * ONE_MB);
+
+      // Though the next GP step should prefetch 16MB, since the file is ~21MB, only the bytes up to
+      // EOF are prefetched: 6291456 remaining bytes.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 10 * ONE_MB + 6291456);
+    }
+  }
+
   @Test
   public void testMalformedParquetFooter() throws IOException {
     describe("Reading a malformed parquet file should not throw an exception");
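[Reviewer sketch - not part of the patch] A toy model of the geometric-progression
prefetcher assumed by the test above; the method names are invented for illustration,
and only the 4MB starting window, the doubling behaviour, and the EOF cap are taken
from the comments in this test, not from AAL's actual implementation:

    /** Toy model of AAL's geometric-progression prefetcher (assumed behaviour). */
    final class PrefetchModel {
      static final long MB = 1024 * 1024;

      /** Next window size: 4MB on the first sequential trigger, doubling afterwards. */
      static long nextWindow(long previousWindow) {
        return previousWindow == 0 ? 4 * MB : 2 * previousWindow;
      }

      /** Bytes actually prefetched: the window minus what the reader already requested,
       *  capped so it never runs past end-of-file. */
      static long bytesPrefetched(long window, long requestedByReader, long remainingToEof) {
        return Math.min(window - requestedByReader, remainingToEof);
      }
    }

Under this model, bytesPrefetched(4 * MB, ONE_MB, ...) = 3MB and
bytesPrefetched(8 * MB, ONE_MB, ...) = 7MB, matching the 3MB and 10MB totals
asserted above, with the final window truncated at EOF.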
@@ -164,9 +209,31 @@ public void testMalformedParquetFooter() throws IOException {
     byte[] buffer = new byte[500];
     IOStatistics ioStats;
     int bytesRead;
 
     try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
       ioStats = inputStream.getIOStatistics();
       inputStream.seek(5);
       bytesRead = inputStream.read(buffer, 0, 500);
 
       ObjectInputStream objectInputStream = (ObjectInputStream) inputStream.getWrappedStream();
       long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead();
       Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read")
           .isEqualTo(bytesRead);
     }
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
-    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
-    // S3A passes in the metadata on file open, we expect AAL to make no HEAD requests
-    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_HEAD_REQUEST, 0);
+    // This file has a content length of 450. Since it's a parquet file, AAL will prefetch the
+    // footer bytes (last 32KB) as soon as the file is opened, but because the file is < 32KB,
+    // the whole file is prefetched.
+    verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 450);
+
+    // The footer is only prefetched once, but parsing is attempted on each stream open.
+    verifyStatisticCounterValue(ioStats, STREAM_READ_PARQUET_FOOTER_PARSING_FAILED, 1);
+
+    // Open a stream to the object twice, verifying that data is cached and that streams to the
+    // same object do not prefetch the same data twice.
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      ioStats = inputStream.getIOStatistics();
+      inputStream.seek(5);
+      inputStream.read(buffer, 0, 500);
+    }
+
+    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 0);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_HEAD_REQUEST, 0);
+    // No data is prefetched, as it already exists in the cache from the previous factory.
+    verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 0);
+
+    // The footer is only prefetched once, but parsing is attempted on each stream open.
+    verifyStatisticCounterValue(ioStats, STREAM_READ_PARQUET_FOOTER_PARSING_FAILED, 1);
   }
 
   /**
@@ -196,7 +263,8 @@ public void testMultiRowGroupParquet() throws Throwable {
     }
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
-    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
+
     try (FSDataInputStream inputStream = getFileSystem().openFile(dest)
         .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_PARQUET)
         .build().get()) {
       ioStats = inputStream.getIOStatistics();
       inputStream.readFully(buffer, 0, (int) fileStatus.getLen());
@@ -206,13 +274,12 @@ public void testMultiRowGroupParquet() throws Throwable {
       verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, (int) fileStatus.getLen());
       verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
     }
-    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
-    verifyStatisticCounterValue(getFileSystem().getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
 
+    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
     // S3A passes in the metadata (content length) on file open,
     // we expect AAL to make no HEAD requests
-    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_HEAD_REQUEST, 0);
   }
 
   @Test
@@ -232,52 +299,6 @@ public void testInvalidConfigurationThrows() throws Exception {
         () -> S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
   }
 
-  /**
-   * TXT files (SEQUENTIAL format) use the SequentialPrefetcher, which requests the entire 10MB file.
-   * RangeOptimiser splits ranges larger than maxRangeSizeBytes (8MB) using partSizeBytes (8MB)
-   * The 10MB range gets split into: [0-8MB) and [8MB-10MB)
-   * Each split range becomes a separate Block, resulting in 2 GET requests:
-   */
-  @Test
-  public void testLargeFileMultipleGets() throws Throwable {
-    describe("Large file should trigger multiple GET requests");
-
-    Path dest = path("large-test-file.txt");
-    byte[] data = dataset(10 * S_1M, 256, 255);
-    writeDataset(getFileSystem(), dest, data, 10 * S_1M, 1024, true);
-
-    byte[] buffer = new byte[S_1M * 10];
-    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
-      IOStatistics ioStats = inputStream.getIOStatistics();
-      inputStream.readFully(buffer);
-
-      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 2);
-      // Because S3A passes in the meta-data(content length) on file open,
-      // we expect AAL to make no HEAD requests
-      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
-    }
-  }
-
-  @Test
-  public void testSmallFileSingleGet() throws Throwable {
-    describe("Small file should trigger only one GET request");
-
-    Path dest = path("small-test-file.txt");
-    byte[] data = dataset(S_1M, 256, 255);
-    writeDataset(getFileSystem(), dest, data, S_1M, 1024, true);
-
-    byte[] buffer = new byte[S_1M];
-    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
-      IOStatistics ioStats = inputStream.getIOStatistics();
-      inputStream.readFully(buffer);
-
-      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
-      // Because S3A passes in the meta-data(content length) on file open,
-      // we expect AAL to make no HEAD requests
-      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
-    }
-  }
-
-
   @Test
   public void testRandomSeekPatternGets() throws Throwable {
     describe("Random seek pattern should optimize GET requests");
@@ -297,8 +318,8 @@ public void testRandomSeekPatternGets() throws Throwable {
     inputStream.read(buffer);
     inputStream.seek(2 * S_1M);
     inputStream.read(new byte[512 * S_1K]);
-    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
-    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_HEAD_REQUEST, 0);
     }
   }
 
@@ -311,18 +332,26 @@ public void testSequentialStreamsNoDuplicateGets() throws Throwable {
     byte[] data = dataset(S_1M, 256, 255);
     writeDataset(getFileSystem(), dest, data, S_1M, 1024, true);
 
-    byte[] buffer = new byte[1024];
+    byte[] buffer = new byte[ONE_MB];
     try (FSDataInputStream stream1 = getFileSystem().open(dest);
         FSDataInputStream stream2 = getFileSystem().open(dest)) {
 
-      stream1.read(buffer);
+      stream1.read(buffer, 0, 2 * ONE_KB);
       stream2.read(buffer);
+      stream1.read(buffer, 0, 10 * ONE_KB);
 
       IOStatistics stats1 = stream1.getIOStatistics();
       IOStatistics stats2 = stream2.getIOStatistics();
 
-      verifyStatisticCounterValue(stats1, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
-      verifyStatisticCounterValue(stats2, STREAM_READ_ANALYTICS_GET_REQUESTS, 0);
+      verifyStatisticCounterValue(stats1, ACTION_HTTP_GET_REQUEST, 1);
+      verifyStatisticCounterValue(stats2, ACTION_HTTP_HEAD_REQUEST, 0);
+
+      // Since it's a small file (AAL will prefetch the whole file for size < 8MB), the whole file
+      // is prefetched on the first read.
+      verifyStatisticCounterValue(stats1, STREAM_READ_PREFETCHED_BYTES, 1048575);
+
+      // The second stream will not prefetch any bytes, as they have already been prefetched by stream 1.
+      verifyStatisticCounterValue(stats2, STREAM_READ_PREFETCHED_BYTES, 0);
     }
   }
 }

From 6d6ce7fb0c268d32050e337054934301ef2aa138 Mon Sep 17 00:00:00 2001
From: Ahmar Suhail
Date: Wed, 8 Oct 2025 13:32:00 +0100
Subject: [PATCH 3/4] clean-up

---
 .../fs/statistics/StreamStatisticNames.java   |  6 ++
 .../hadoop/fs/s3a/S3AInstrumentation.java     | 22 +----
 .../streams/AnalyticsRequestCallback.java     |  4 -
 .../fs/s3a/impl/streams/AnalyticsStream.java  |  3 +-
 .../statistics/S3AInputStreamStatistics.java  | 13 +++
 .../fs/s3a/performance/ITestS3AOpenCost.java  | 99 ++++++++++++-------
 .../ITestS3AContractStreamIOStatistics.java   |  1 -
 7 files changed, 91 insertions(+), 57 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
index 31378157a17d0..42952e2a42b30 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
@@ -489,8 +489,14 @@ public final class StreamStatisticNames {
   public static final String STREAM_FILE_CACHE_EVICTION
       = "stream_file_cache_eviction";
 
+  /**
+   * Bytes that were prefetched by the stream.
+   */
   public static final String STREAM_READ_PREFETCHED_BYTES = "stream_read_prefetched_bytes";
 
+  /**
+   * Tracks failures in footer parsing.
+ */ public static final String STREAM_READ_PARQUET_FOOTER_PARSING_FAILED = "stream_read_parquet_footer_parsing_failed"; private StreamStatisticNames() { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 83c9ebfe4b6e2..1c93795c9a288 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -75,26 +75,14 @@ import java.util.concurrent.atomic.AtomicLong; import static org.apache.hadoop.fs.s3a.Constants.STREAM_READ_GAUGE_INPUT_POLICY; -import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_ACTIVE_MEMORY_IN_USE; -import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_ACTIVE_PREFETCH_OPERATIONS; -import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BLOCKS_IN_FILE_CACHE; -import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BLOCK_UPLOADS; -import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BLOCK_UPLOADS_ACTIVE; -import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING; -import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BLOCK_UPLOADS_PENDING; -import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BYTES; -import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_EXCEPTIONS; -import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS; -import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_QUEUE_DURATION; -import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_TOTAL_DATA; -import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_TOTAL_TIME; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics; import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_FAILURES; -import static org.apache.hadoop.fs.statistics.StreamStatisticNames.*; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_UNBUFFERED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore; import static org.apache.hadoop.fs.s3a.Statistic.*; @@ -898,7 +886,7 @@ private InputStreamStatistics( StreamStatisticNames.STREAM_READ_SEEK_BYTES_DISCARDED, StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED, StreamStatisticNames.STREAM_READ_TOTAL_BYTES, - STREAM_READ_UNBUFFERED, + StreamStatisticNames.STREAM_READ_UNBUFFERED, StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES, StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES, StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, @@ -907,8 +895,8 @@ private InputStreamStatistics( StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE, StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST, StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED, - StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES, - StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED + STREAM_READ_PREFETCHED_BYTES, 
+          STREAM_READ_PARQUET_FOOTER_PARSING_FAILED
       )
       .withGauges(STREAM_READ_GAUGE_INPUT_POLICY,
           STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(),
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java
index e295abc58ead6..7c5ef99b6a41e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java
@@ -18,10 +18,7 @@
 package org.apache.hadoop.fs.s3a.impl.streams;
 
-import org.apache.hadoop.fs.s3a.Statistic;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
-import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
-import org.apache.hadoop.fs.statistics.DurationTracker;
 import software.amazon.s3.analyticsaccelerator.util.RequestCallback;
 
 /**
@@ -50,7 +47,6 @@ public void onHeadRequest() {
 
   @Override
   public void onBlockPrefetch(long start, long end) {
-    System.out.println("BLOCKS READ: " + start + "end: " + end);
     statistics.bytesPrefetched(end - start);
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
index bca8f122dab09..f8b54eb2ad01f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
@@ -152,9 +152,8 @@ public int readTail(byte[] buf, int off, int len) throws IOException {
   @Override
   public int read(byte[] buf, int off, int len) throws IOException {
     throwIfClosed();
-    long pos = getPos();
 
-    getS3AStreamStatistics().readOperationStarted(pos, len);
+    getS3AStreamStatistics().readOperationStarted(getPos(), len);
 
     int bytesRead;
     try {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
index eb8051e9a220e..f6e06cd04d8f8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
@@ -119,12 +119,25 @@ void readVectoredOperationStarted(int numIncomingRanges,
    */
   void readVectoredBytesDiscarded(int discarded);
 
+  /**
+   * Invoked when the stream initiates an S3 GET request.
+   */
   void getRequestInitiated();
 
+  /**
+   * Invoked when the stream initiates an S3 HEAD request.
+   */
   void headRequestInitiated();
 
+  /**
+   * Invoked when bytes are prefetched by the stream.
+   * @param size number of bytes prefetched.
+   */
   void bytesPrefetched(long size);
 
+  /**
+   * Invoked when Parquet footer parsing fails.
+   */
   void footerParsingFailed();
 
   @Override
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
index 34a7d5a54037b..8fe7c914bd64a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
@@ -70,6 +70,7 @@
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
 
@@ -94,6 +95,16 @@
    */
   private boolean prefetching;
 
+  /**
+   * Is the analytics stream enabled?
+   */
+  private boolean analyticsStream;
+
+  /**
+   * Is the classic input stream enabled?
+   */
+  private boolean classicInputStream;
+
   @Override
   public Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
@@ -112,14 +123,16 @@
   @Override
   public void setup() throws Exception {
     super.setup();
     S3AFileSystem fs = getFileSystem();
     testFile = methodPath();
     writeTextFile(fs, testFile, TEXT, true);
     testFileStatus = fs.getFileStatus(testFile);
     fileLength = (int)testFileStatus.getLen();
     prefetching = prefetching();
+    analyticsStream = isAnalyticsStream();
+    classicInputStream = isClassicInputStream();
   }
 
 /**
@@ -173,11 +184,10 @@ public void testStreamIsNotChecksummed() throws Throwable {
     // if prefetching is enabled, skip this test
     assumeNoPrefetching();
 
-    // Skip for Analytics streams - checksum validation only exists in S3AInputStream.
-    // AnalyticsStream handles data integrity through AWS Analytics Accelerator internally.
-    if (isAnalyticsStream()) {
-      skip("Analytics stream doesn't use checksums");
-    }
+    // If AAL is enabled, skip this test. AAL uses S3A's default S3 client;
+    // if checksumming is disabled on that client, AAL will not enforce it either.
+    assumeNotAnalytics();
+
     S3AFileSystem fs = getFileSystem();
 
     // open the file
@@ -194,13 +204,19 @@
     // open the stream.
     in.read();
+
     // now examine the innermost stream and make sure it doesn't have a checksum
     assertStreamIsNotChecksummed(getS3AInputStream(in));
     }
   }
 
   @Test
   public void testOpenFileShorterLength() throws Throwable {
+
+    // AAL makes a HEAD request to get the file length when no eTag is supplied,
+    // so it cannot use the shorter length passed in the open() call, and the test fails.
+    assumeNotAnalytics();
+
     // do a second read with the length declared as short.
     // we now expect the bytes read to be shorter.
     S3AFileSystem fs = getFileSystem();
@@ -245,7 +261,6 @@ public void testOpenFileLongerLengthReadFully() throws Throwable {
     final int extra = 10;
     long longLen = fileLength + extra;
-
     // assert behaviors of seeking/reading past the file length.
     // there is no attempt at recovery.
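     // verifyMetrics() below evaluates the closure, then checks each probe();
     // a probe only asserts its expected count when its guard condition is true,
     // so the classic and analytics expectations cannot both fire.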
     verifyMetrics(() -> {
@@ -262,13 +277,12 @@
         }
       },
       always(),
-      // Analytics stream: 1 open (persistent connection)
-      // S3AInputStream: 2 opens (reopen on EOF)
-      // two GET calls were made, one for readFully,
-      // the second on the read() past the EOF
-      // the operation has got as far as S3
-      probe(!prefetching() && !isAnalyticsStream(), STREAM_READ_OPENED, 2),
-      probe(!prefetching() && isAnalyticsStream(), STREAM_READ_OPENED, 1));
+        // two GET calls were made, one for readFully,
+        // the second on the read() past the EOF;
+        // the operation has got as far as S3
+        probe(classicInputStream, STREAM_READ_OPENED, 1 + 1),
+        // For AAL, the seek past the content length fails before the GET is made.
+        probe(analyticsStream, STREAM_READ_OPENED, 1));
 
     // now on a new stream, try a full read from after the EOF
     verifyMetrics(() -> {
@@ -279,10 +293,6 @@
         return in.toString();
       }
     },
-      // two GET calls were made, one for readFully,
-      // the second on the read() past the EOF
-      // the operation has got as far as S3
-
       with(STREAM_READ_OPENED, 1));
 }
 
@@ -352,9 +362,9 @@
       }
     },
     always(),
-      // Analytics streams don't make HTTP requests when reading past EOF
-      probe(!prefetching && !isAnalyticsStream(), Statistic.ACTION_HTTP_GET_REQUEST, extra),
-      probe(!prefetching && isAnalyticsStream(), Statistic.ACTION_HTTP_GET_REQUEST, 0));
+      probe(classicInputStream, Statistic.ACTION_HTTP_GET_REQUEST, extra),
+      // AAL won't make the GET call when the read is entirely beyond the EOF
+      probe(analyticsStream, Statistic.ACTION_HTTP_GET_REQUEST, 0));
 }
 
 /**
@@ -445,18 +455,27 @@ public void testVectorReadPastEOF() throws Throwable {
     byte[] buf = new byte[longLen];
     ByteBuffer bb = ByteBuffer.wrap(buf);
     final FileRange range = FileRange.createFileRange(0, longLen);
-    in.readVectored(Arrays.asList(range), (i) -> bb);
-    interceptFuture(EOFException.class,
-        "",
-        ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
-        TimeUnit.SECONDS,
-        range.getData());
-    assertS3StreamClosed(in);
-    return "vector read past EOF with " + in;
+
+    // For AAL, if there is no eTag, the provided length is not passed through and a HEAD request is made instead.
+    // AAL requires the eTag to detect changes to the object and evict cached data if required.
+    if (isAnalyticsStream()) {
+      intercept(EOFException.class, () -> in.readVectored(Arrays.asList(range), (i) -> bb));
+      verifyStatisticCounterValue(in.getIOStatistics(), ACTION_HTTP_HEAD_REQUEST, 1);
+      return "vector read past EOF with " + in;
+    } else {
+      in.readVectored(Arrays.asList(range), (i) -> bb);
+      interceptFuture(EOFException.class,
+          "",
+          ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+          TimeUnit.SECONDS,
+          range.getData());
+      assertS3StreamClosed(in);
+      return "vector read past EOF with " + in;
+    }
   }
 },
 always(),
-  probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1));
+  probe(classicInputStream, Statistic.ACTION_HTTP_GET_REQUEST, 1));
 }
 
 /**
@@ -472,7 +491,15 @@ private boolean prefetching() {
    * @return true if Analytics stream is enabled.
    */
   private boolean isAnalyticsStream() {
-    return streamType(getFileSystem()) == InputStreamType.Analytics;
+    return InputStreamType.Analytics == streamType(getFileSystem());
   }
 
+  /**
+   * Is the current input stream type S3AInputStream?
+   * @return true if the S3AInputStream is being used.
+   */
+  private boolean isClassicInputStream() {
+    return InputStreamType.Classic == streamType(getFileSystem());
+  }
 
   /**
@@ -484,6 +511,15 @@ private void assumeNoPrefetching(){
     }
   }
 
+  /**
+   * Skip the test if the Analytics stream is enabled.
+   */
+  private void assumeNotAnalytics() {
+    if (analyticsStream) {
+      skip("Analytics stream is enabled");
+    }
+  }
+
   /**
    * Assert that the inner S3 Stream is closed.
    * @param in input stream
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
index 35f28d0eb08af..7f363d190c53a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
@@ -87,5 +87,4 @@ public List outputStreamStatisticKeys() {
   public void testInputStreamStatisticRead() throws Throwable {
     super.testInputStreamStatisticRead();
   }
-
 }

From b8fa648a27e8b7be322134c16c8892ba33ce0915 Mon Sep 17 00:00:00 2001
From: Ahmar Suhail
Date: Wed, 8 Oct 2025 15:16:09 +0100
Subject: [PATCH 4/4] fix audit test

---
 .../ITestS3AAnalyticsAcceleratorStreamReading.java | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
index bf5c1a438e9d8..21cfb4e70b0c6 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
@@ -254,6 +254,9 @@ public void testMultiRowGroupParquet() throws Throwable {
     FileStatus fileStatus = getFileSystem().getFileStatus(dest);
 
+    // Audit operations recorded before any reading happens: those required to upload the parquet file.
+    verifyStatisticCounterValue(getFileSystem().getIOStatistics(), AUDIT_REQUEST_EXECUTION, 8);
+
     byte[] buffer = new byte[3000];
     IOStatistics ioStats;
 
@@ -265,6 +268,10 @@
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
     verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
 
+    // S3A makes a HEAD request on stream open(), and AAL then makes a GET
+    // request for the object, bringing the total audit operations to 10.
+    verifyStatisticCounterValue(getFileSystem().getIOStatistics(), AUDIT_REQUEST_EXECUTION, 8 + 2);
+
     try (FSDataInputStream inputStream = getFileSystem().openFile(dest)
         .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_PARQUET)
         .build().get()) {
@@ -280,6 +287,10 @@
     // S3A passes in the meta-data(content length) on file open,
     // we expect AAL to make no HEAD requests
     verifyStatisticCounterValue(ioStats, ACTION_HTTP_HEAD_REQUEST, 0);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 0);
+
+    // S3A makes a HEAD on stream open; no GET request is made since the data is already cached.
+    verifyStatisticCounterValue(getFileSystem().getIOStatistics(), AUDIT_REQUEST_EXECUTION, 10 + 1);
   }
 
   @Test
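For context, a minimal sketch (not part of this patch) of how the behaviour asserted above surfaces to a client through the IOStatistics API. The class name, filesystem, and path are illustrative; it assumes an S3A filesystem configured with fs.s3a.input.stream.type = analytics, and uses withFileStatus() to hand AAL the length and eTag so it needs no HEAD request of its own.

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;

public final class AnalyticsStatsProbe {

  /**
   * Open a file with its FileStatus so S3A passes length and eTag through
   * to AAL, read some bytes, and return the count of bytes AAL prefetched.
   */
  public static long prefetchedBytes(FileSystem fs, Path path) throws Exception {
    FileStatus status = fs.getFileStatus(path);
    try (FSDataInputStream in = fs.openFile(path)
        .withFileStatus(status)   // supplies length + eTag, avoiding a HEAD in AAL
        .build().get()) {
      in.read(new byte[1024]);
      IOStatistics stats = in.getIOStatistics();
      return stats.counters()
          .getOrDefault(StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES, 0L);
    }
  }
}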