Skip to content

Commit a9dbd7d

Browse files
ahmarsuhail authored and steveloughran committed
HADOOP-18190. Collect IOStatistics during S3A prefetching (#4458)
This adds IOStatistics collection to the S3PrefetchingInputStream class, with new statistic names in StreamStatisticNames. This stream is not (yet) IOStatisticsContext aware. Contributed by Ahmar Suhail.
1 parent 6a3b9f1 commit a9dbd7d

File tree

18 files changed

+471
-63
lines changed

18 files changed

+471
-63
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,46 @@ public final class StreamStatisticNames {
387387
public static final String BLOCKS_RELEASED
388388
= "blocks_released";
389389

390+
/**
391+
* Total number of prefetching operations executed.
392+
*/
393+
public static final String STREAM_READ_PREFETCH_OPERATIONS
394+
= "stream_read_prefetch_operations";
395+
396+
/**
397+
* Total number of blocks in the disk cache.
398+
*/
399+
public static final String STREAM_READ_BLOCKS_IN_FILE_CACHE
400+
= "stream_read_blocks_in_cache";
401+
402+
/**
403+
* Total number of active prefetch operations.
404+
*/
405+
public static final String STREAM_READ_ACTIVE_PREFETCH_OPERATIONS
406+
= "stream_read_active_prefetch_operations";
407+
408+
/**
409+
* Total bytes of memory in use by this input stream.
410+
*/
411+
public static final String STREAM_READ_ACTIVE_MEMORY_IN_USE
412+
= "stream_read_active_memory_in_use";
413+
414+
/**
415+
* count/duration of reading a remote block.
416+
*
417+
* Value: {@value}.
418+
*/
419+
public static final String STREAM_READ_REMOTE_BLOCK_READ
420+
= "stream_read_block_read";
421+
422+
/**
423+
* count/duration of acquiring a buffer and reading to it.
424+
*
425+
* Value: {@value}.
426+
*/
427+
public static final String STREAM_READ_BLOCK_ACQUIRE_AND_READ
428+
= "stream_read_block_acquire_read";
429+
390430
private StreamStatisticNames() {
391431
}
392432

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.slf4j.Logger;
3232
import org.slf4j.LoggerFactory;
3333

34+
import static java.util.Objects.requireNonNull;
35+
3436
/**
3537
* Manages a fixed pool of {@code ByteBuffer} instances.
3638
*
@@ -56,26 +58,32 @@ public class BufferPool implements Closeable {
5658
// Allows associating metadata to each buffer in the pool.
5759
private Map<BufferData, ByteBuffer> allocated;
5860

61+
private PrefetchingStatistics prefetchingStatistics;
62+
5963
/**
6064
* Initializes a new instance of the {@code BufferPool} class.
6165
*
6266
* @param size number of buffers in this pool.
6367
* @param bufferSize size in bytes of each buffer.
68+
* @param prefetchingStatistics statistics for this stream.
6469
*
6570
* @throws IllegalArgumentException if size is zero or negative.
6671
* @throws IllegalArgumentException if bufferSize is zero or negative.
6772
*/
68-
public BufferPool(int size, int bufferSize) {
73+
public BufferPool(int size, int bufferSize, PrefetchingStatistics prefetchingStatistics) {
6974
Validate.checkPositiveInteger(size, "size");
7075
Validate.checkPositiveInteger(bufferSize, "bufferSize");
7176

7277
this.size = size;
7378
this.bufferSize = bufferSize;
7479
this.allocated = new IdentityHashMap<BufferData, ByteBuffer>();
80+
this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
7581
this.pool = new BoundedResourcePool<ByteBuffer>(size) {
7682
@Override
7783
public ByteBuffer createNew() {
78-
return ByteBuffer.allocate(bufferSize);
84+
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
85+
prefetchingStatistics.memoryAllocated(bufferSize);
86+
return buffer;
7987
}
8088
};
8189
}
@@ -236,11 +244,15 @@ public synchronized void close() {
236244
}
237245
}
238246

239-
this.pool.close();
240-
this.pool = null;
247+
int currentPoolSize = pool.numCreated();
248+
249+
pool.close();
250+
pool = null;
251+
252+
allocated.clear();
253+
allocated = null;
241254

242-
this.allocated.clear();
243-
this.allocated = null;
255+
prefetchingStatistics.memoryFreed(currentPoolSize * bufferSize);
244256
}
245257

246258
// For debugging purposes.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java

Lines changed: 50 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import java.io.IOException;
2323
import java.nio.ByteBuffer;
24+
import java.time.Duration;
25+
import java.time.Instant;
2426
import java.util.concurrent.CompletableFuture;
2527
import java.util.concurrent.Future;
2628
import java.util.concurrent.TimeUnit;
@@ -31,6 +33,10 @@
3133
import org.slf4j.Logger;
3234
import org.slf4j.LoggerFactory;
3335

36+
import org.apache.hadoop.fs.statistics.DurationTracker;
37+
38+
import static java.util.Objects.requireNonNull;
39+
3440
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
3541

3642
/**
@@ -70,33 +76,37 @@ public abstract class CachingBlockManager extends BlockManager {
7076
// Once set to true, any further caching requests will be ignored.
7177
private final AtomicBoolean cachingDisabled;
7278

79+
private final PrefetchingStatistics prefetchingStatistics;
80+
7381
/**
7482
* Constructs an instance of a {@code CachingBlockManager}.
7583
*
7684
* @param futurePool asynchronous tasks are performed in this pool.
7785
* @param blockData information about each block of the underlying file.
7886
* @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
87+
* @param prefetchingStatistics statistics for this stream.
7988
*
80-
* @throws IllegalArgumentException if futurePool is null.
8189
* @throws IllegalArgumentException if bufferPoolSize is zero or negative.
8290
*/
8391
public CachingBlockManager(
8492
ExecutorServiceFuturePool futurePool,
8593
BlockData blockData,
86-
int bufferPoolSize) {
94+
int bufferPoolSize,
95+
PrefetchingStatistics prefetchingStatistics) {
8796
super(blockData);
8897

89-
Validate.checkNotNull(futurePool, "futurePool");
9098
Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize");
9199

92-
this.futurePool = futurePool;
100+
this.futurePool = requireNonNull(futurePool);
93101
this.bufferPoolSize = bufferPoolSize;
94102
this.numCachingErrors = new AtomicInteger();
95103
this.numReadErrors = new AtomicInteger();
96104
this.cachingDisabled = new AtomicBoolean();
105+
this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
97106

98107
if (this.getBlockData().getFileSize() > 0) {
99-
this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize());
108+
this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(),
109+
this.prefetchingStatistics);
100110
this.cache = this.createCache();
101111
}
102112

@@ -249,7 +259,7 @@ public void requestPrefetch(int blockNumber) {
249259
}
250260

251261
BlockOperations.Operation op = this.ops.requestPrefetch(blockNumber);
252-
PrefetchTask prefetchTask = new PrefetchTask(data, this);
262+
PrefetchTask prefetchTask = new PrefetchTask(data, this, Instant.now());
253263
Future<Void> prefetchFuture = this.futurePool.executeFunction(prefetchTask);
254264
data.setPrefetch(prefetchFuture);
255265
this.ops.end(op);
@@ -279,8 +289,10 @@ private void read(BufferData data) throws IOException {
279289
}
280290
}
281291

282-
private void prefetch(BufferData data) throws IOException {
292+
private void prefetch(BufferData data, Instant taskQueuedStartTime) throws IOException {
283293
synchronized (data) {
294+
prefetchingStatistics.executorAcquired(
295+
Duration.between(taskQueuedStartTime, Instant.now()));
284296
this.readBlock(
285297
data,
286298
true,
@@ -297,6 +309,7 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
297309
}
298310

299311
BlockOperations.Operation op = null;
312+
DurationTracker tracker = null;
300313

301314
synchronized (data) {
302315
try {
@@ -318,6 +331,7 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
318331
}
319332

320333
if (isPrefetch) {
334+
tracker = prefetchingStatistics.prefetchOperationStarted();
321335
op = this.ops.prefetch(data.getBlockNumber());
322336
} else {
323337
op = this.ops.getRead(data.getBlockNumber());
@@ -333,13 +347,25 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
333347
} catch (Exception e) {
334348
String message = String.format("error during readBlock(%s)", data.getBlockNumber());
335349
LOG.error(message, e);
350+
351+
if (isPrefetch && tracker != null) {
352+
tracker.failed();
353+
}
354+
336355
this.numReadErrors.incrementAndGet();
337356
data.setDone();
338357
throw e;
339358
} finally {
340359
if (op != null) {
341360
this.ops.end(op);
342361
}
362+
363+
if (isPrefetch) {
364+
prefetchingStatistics.prefetchOperationCompleted();
365+
if (tracker != null) {
366+
tracker.close();
367+
}
368+
}
343369
}
344370
}
345371
}
@@ -350,16 +376,18 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
350376
private static class PrefetchTask implements Supplier<Void> {
351377
private final BufferData data;
352378
private final CachingBlockManager blockManager;
379+
private final Instant taskQueuedStartTime;
353380

354-
PrefetchTask(BufferData data, CachingBlockManager blockManager) {
381+
PrefetchTask(BufferData data, CachingBlockManager blockManager, Instant taskQueuedStartTime) {
355382
this.data = data;
356383
this.blockManager = blockManager;
384+
this.taskQueuedStartTime = taskQueuedStartTime;
357385
}
358386

359387
@Override
360388
public Void get() {
361389
try {
362-
this.blockManager.prefetch(data);
390+
this.blockManager.prefetch(data, taskQueuedStartTime);
363391
} catch (Exception e) {
364392
LOG.error("error during prefetch", e);
365393
}
@@ -420,14 +448,18 @@ public void requestCaching(BufferData data) {
420448
blockFuture = cf;
421449
}
422450

423-
CachePutTask task = new CachePutTask(data, blockFuture, this);
451+
CachePutTask task = new CachePutTask(data, blockFuture, this, Instant.now());
424452
Future<Void> actionFuture = this.futurePool.executeFunction(task);
425453
data.setCaching(actionFuture);
426454
this.ops.end(op);
427455
}
428456
}
429457

430-
private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture) {
458+
private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
459+
Instant taskQueuedStartTime) {
460+
prefetchingStatistics.executorAcquired(
461+
Duration.between(taskQueuedStartTime, Instant.now()));
462+
431463
if (this.closed) {
432464
return;
433465
}
@@ -493,7 +525,7 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture) {
493525
}
494526

495527
protected BlockCache createCache() {
496-
return new SingleFilePerBlockCache();
528+
return new SingleFilePerBlockCache(prefetchingStatistics);
497529
}
498530

499531
protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
@@ -513,18 +545,22 @@ private static class CachePutTask implements Supplier<Void> {
513545
// Block manager that manages this block.
514546
private final CachingBlockManager blockManager;
515547

548+
private final Instant taskQueuedStartTime;
549+
516550
CachePutTask(
517551
BufferData data,
518552
Future<Void> blockFuture,
519-
CachingBlockManager blockManager) {
553+
CachingBlockManager blockManager,
554+
Instant taskQueuedStartTime) {
520555
this.data = data;
521556
this.blockFuture = blockFuture;
522557
this.blockManager = blockManager;
558+
this.taskQueuedStartTime = taskQueuedStartTime;
523559
}
524560

525561
@Override
526562
public Void get() {
527-
this.blockManager.addToCacheAndRelease(this.data, this.blockFuture);
563+
this.blockManager.addToCacheAndRelease(this.data, this.blockFuture, taskQueuedStartTime);
528564
return null;
529565
}
530566
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.hadoop.fs.common;
21+
22+
import java.time.Duration;
23+
24+
import org.apache.hadoop.fs.statistics.DurationTracker;
25+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
26+
27+
public interface PrefetchingStatistics extends IOStatisticsSource {
28+
29+
/**
30+
* A prefetch operation has started.
31+
* @return duration tracker
32+
*/
33+
DurationTracker prefetchOperationStarted();
34+
35+
/**
36+
* A block has been saved to the file cache.
37+
*/
38+
void blockAddedToFileCache();
39+
40+
/**
41+
* A block has been removed from the file cache.
42+
*/
43+
void blockRemovedFromFileCache();
44+
45+
/**
46+
* A prefetch operation has completed.
47+
*/
48+
void prefetchOperationCompleted();
49+
50+
/**
51+
* An executor has been acquired, either for prefetching or caching.
52+
* @param timeInQueue time taken to acquire an executor.
53+
*/
54+
void executorAcquired(Duration timeInQueue);
55+
56+
/**
57+
* A new buffer has been added to the buffer pool.
58+
* @param size size of the new buffer
59+
*/
60+
void memoryAllocated(int size);
61+
62+
/**
63+
* Previously allocated memory has been freed.
64+
* @param size size of memory freed.
65+
*/
66+
void memoryFreed(int size);
67+
}

0 commit comments

Comments
 (0)