@@ -459,8 +459,8 @@ public enum Statistic {
"Gauge of active memory in use",
TYPE_GAUGE),

/* Stream Write statistics */
STREAM_WRITE_EXCEPTIONS(
StreamStatisticNames.STREAM_WRITE_EXCEPTIONS,
"Count of stream write failures reported",
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a.impl.streams;

import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;
import software.amazon.s3.analyticsaccelerator.util.RequestCallback;

/**
* Implementation of AAL's RequestCallback interface that tracks the GET and HEAD
* requests made by analytics streams.
*/
public class AnalyticsRequestCallback implements RequestCallback {
private final S3AInputStreamStatistics statistics;

/**
* Create a new callback instance.
* @param statistics the statistics to update
*/
public AnalyticsRequestCallback(S3AInputStreamStatistics statistics) {
this.statistics = statistics;
}

@Override
public void onGetRequest() {
statistics.initiateGetRequest();
}

@Override
public void onHeadRequest() {
statistics.incrementAnalyticsHeadRequests();
}
}

@@ -40,6 +40,7 @@
import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
import software.amazon.s3.analyticsaccelerator.util.S3URI;
import software.amazon.s3.analyticsaccelerator.util.RequestCallback;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +73,7 @@ public AnalyticsStream(final ObjectReadParameters parameters,
final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException {
super(InputStreamType.Analytics, parameters);
S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();

this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(),
s3Attributes.getKey()), buildOpenStreamInformation(parameters));
getS3AStreamStatistics().streamOpened(InputStreamType.Analytics);
@@ -80,13 +82,21 @@ public AnalyticsStream(final ObjectReadParameters parameters,
@Override
public int read() throws IOException {
throwIfClosed();

getS3AStreamStatistics().readOperationStarted(getPos(), 1);

int bytesRead;
try {
bytesRead = inputStream.read();
} catch (IOException ioe) {
onReadFailure(ioe);
throw ioe;
}

if (bytesRead != -1) {
incrementBytesRead(1);
}

return bytesRead;
}

@@ -122,26 +132,42 @@ public synchronized long getPos() {
*/
public int readTail(byte[] buf, int off, int len) throws IOException {
throwIfClosed();
getS3AStreamStatistics().readOperationStarted(getPos(), len);

int bytesRead;
try {
bytesRead = inputStream.readTail(buf, off, len);
} catch (IOException ioe) {
onReadFailure(ioe);
throw ioe;
}

if (bytesRead > 0) {
incrementBytesRead(bytesRead);
}

return bytesRead;
}

@Override
public int read(byte[] buf, int off, int len) throws IOException {
throwIfClosed();
long pos = getPos();

getS3AStreamStatistics().readOperationStarted(pos, len);

int bytesRead;
try {
bytesRead = inputStream.read(buf, off, len);
} catch (IOException ioe) {
onReadFailure(ioe);
throw ioe;
}

if (bytesRead > 0) {
incrementBytesRead(bytesRead);
}

return bytesRead;
}

@@ -247,10 +273,13 @@ private void onReadFailure(IOException ioe) throws IOException {
}

private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters parameters) {

final RequestCallback requestCallback = new AnalyticsRequestCallback(getS3AStreamStatistics());

OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder =
OpenStreamInformation.builder()
.inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
.getInputPolicy())).requestCallback(requestCallback);

if (parameters.getObjectAttributes().getETag() != null) {
openStreamInformationBuilder.objectMetadata(ObjectMetadata.builder()
@@ -300,4 +329,16 @@ protected void throwIfClosed() throws IOException {
throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
}
}

/**
* Increment the bytes read counter if there is a stats instance
* and the number of bytes read is more than zero.
* @param bytesRead number of bytes read
*/
private void incrementBytesRead(long bytesRead) {
getS3AStreamStatistics().bytesRead(bytesRead);
if (getContext().getStats() != null && bytesRead > 0) {
getContext().getStats().incrementBytesRead(bytesRead);
}
}
}
@@ -159,7 +159,12 @@ public void seekForwards(final long skipped,
final long bytesReadInSeek) {

}

@Override
public void incrementAnalyticsGetRequests() {
}

@Override
public void incrementAnalyticsHeadRequests() {
}

@Override
public long streamOpened() {
return 0;
@@ -43,13 +43,19 @@
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.fs.audit.AuditStatisticNames.AUDIT_REQUEST_EXECUTION;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPERATIONS;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.ANALYTICS_STREAM_FACTORY_CLOSED;
import static org.apache.hadoop.io.Sizes.S_1K;
import static org.apache.hadoop.io.Sizes.S_1M;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

/**
@@ -105,6 +111,13 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
Assertions.assertThat(objectInputStream.getInputPolicy())
.isEqualTo(S3AInputPolicy.Sequential);

verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, 500);
verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);

long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead();
Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read")
.isEqualTo(500);
}

verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
@@ -136,14 +149,24 @@ public void testMalformedParquetFooter() throws IOException {

byte[] buffer = new byte[500];
IOStatistics ioStats;
int bytesRead;

try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
ioStats = inputStream.getIOStatistics();
inputStream.seek(5);
bytesRead = inputStream.read(buffer, 0, 500);

ObjectInputStream objectInputStream = (ObjectInputStream) inputStream.getWrappedStream();
long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead();
Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read")
.isEqualTo(bytesRead);

}

verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
// S3A passes in the metadata on file open, so we expect AAL to make no HEAD
// requests (see the sketch after this test).
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
}
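
// A minimal sketch of the open-time metadata hand-off that the HEAD-request
// assertion above relies on. The objectMetadata()/etag() builder calls appear
// in AnalyticsStream#buildOpenStreamInformation in this diff; the
// contentLength() setter and the s3Attributes accessor are assumptions about
// the AAL ObjectMetadata builder and S3ObjectAttributes, not verified here.
//
// OpenStreamInformation info = OpenStreamInformation.builder()
//     .objectMetadata(ObjectMetadata.builder()
//         .contentLength(s3Attributes.getLen())
//         .etag(s3Attributes.getETag())
//         .build())
//     .build();
//
// With the content length supplied up front, AAL has no need to issue its own
// HEAD request to discover the object size.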

/**
@@ -173,17 +196,23 @@ public void testMultiRowGroupParquet() throws Throwable {
}

verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);

verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
try (FSDataInputStream inputStream = getFileSystem().openFile(dest)
.must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_PARQUET)
.build().get()) {
ioStats = inputStream.getIOStatistics();
inputStream.readFully(buffer, 0, (int) fileStatus.getLen());

verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, (int) fileStatus.getLen());
verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
}
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);

verifyStatisticCounterValue(getFileSystem().getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);

// S3A passes in the metadata (content length) on file open,
// so we expect AAL to make no HEAD requests.
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
}

@Test
@@ -203,4 +232,97 @@ public void testInvalidConfigurationThrows() throws Exception {
() -> S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
}

/**
* TXT files (SEQUENTIAL format) use the SequentialPrefetcher, which requests
* the entire 10MB file. The RangeOptimiser splits ranges larger than
* maxRangeSizeBytes (8MB) into parts of partSizeBytes (8MB), so the 10MB range
* is split into [0-8MB) and [8MB-10MB). Each split range becomes a separate
* Block, resulting in 2 GET requests (see the sketch after this test).
*/
@Test
public void testLargeFileMultipleGets() throws Throwable {
describe("Large file should trigger multiple GET requests");

Path dest = path("large-test-file.txt");
byte[] data = dataset(10 * S_1M, 256, 255);
writeDataset(getFileSystem(), dest, data, 10 * S_1M, 1024, true);

byte[] buffer = new byte[S_1M * 10];
try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
IOStatistics ioStats = inputStream.getIOStatistics();
inputStream.readFully(buffer);

verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 2);
// Because S3A passes in the metadata (content length) on file open,
// we expect AAL to make no HEAD requests.
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
}
}
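
// A minimal sketch of the split arithmetic described in the javadoc above,
// assuming a range is divided greedily into parts of at most partSize bytes.
// This helper is illustrative only; it is not AAL's RangeOptimiser.
// Example: expectedGetCount(10L * S_1M, 8L * S_1M) == 2,
// matching the [0-8MB) and [8MB-10MB) blocks asserted in the test above.
private static long expectedGetCount(long rangeSize, long partSize) {
// ceiling division: each full or partial part becomes one GET request
return (rangeSize + partSize - 1) / partSize;
}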

@Test
public void testSmallFileSingleGet() throws Throwable {
describe("Small file should trigger only one GET request");

Path dest = path("small-test-file.txt");
byte[] data = dataset(S_1M, 256, 255);
writeDataset(getFileSystem(), dest, data, S_1M, 1024, true);

byte[] buffer = new byte[S_1M];
try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
IOStatistics ioStats = inputStream.getIOStatistics();
inputStream.readFully(buffer);

verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
// Because S3A passes in the metadata (content length) on file open,
// we expect AAL to make no HEAD requests.
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
}
}


@Test
public void testRandomSeekPatternGets() throws Throwable {
describe("Random seek pattern should optimize GET requests");

Path dest = path("seek-test.txt");
byte[] data = dataset(5 * S_1M, 256, 255);
writeDataset(getFileSystem(), dest, data, 5 * S_1M, 1024, true);

byte[] buffer = new byte[S_1M];
try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
IOStatistics ioStats = inputStream.getIOStatistics();

inputStream.read(buffer);
inputStream.seek(2 * S_1M);
inputStream.read(new byte[512 * S_1K]);
inputStream.seek(3 * S_1M);
inputStream.read(new byte[512 * S_1K]);

verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
}
}


@Test
public void testSequentialStreamsNoDuplicateGets() throws Throwable {
describe("Sequential streams reading same object should not duplicate GETs");

Path dest = path("sequential-test.txt");
byte[] data = dataset(S_1M, 256, 255);
writeDataset(getFileSystem(), dest, data, S_1M, 1024, true);

byte[] buffer = new byte[1024];
try (FSDataInputStream stream1 = getFileSystem().open(dest);
FSDataInputStream stream2 = getFileSystem().open(dest)) {

stream1.read(buffer);
stream2.read(buffer);

IOStatistics stats1 = stream1.getIOStatistics();
IOStatistics stats2 = stream2.getIOStatistics();

verifyStatisticCounterValue(stats1, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
verifyStatisticCounterValue(stats2, STREAM_READ_ANALYTICS_GET_REQUESTS, 0);
}
}
}
@@ -45,7 +45,6 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disablePrefetching;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
@@ -81,10 +80,7 @@ protected Configuration createConfiguration() {
public void setup() throws Exception {
super.setup();
executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
// Analytics accelerator currently does not support IOStatisticsContext, this will be added as
// part of https://issues.apache.org/jira/browse/HADOOP-19364
skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
"Analytics Accelerator currently does not support IOStatisticsContext");


}
