DFSConfigKeys.java
@@ -674,6 +674,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.datanode.max.disks.to.report";
public static final int DFS_DATANODE_MAX_DISKS_TO_REPORT_DEFAULT =
5;
public static final String DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY =
"dfs.datanode.max.slowdisks.to.exclude";
public static final int DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_DEFAULT =
0;
public static final String DFS_DATANODE_HOST_NAME_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_DATANODE_HOST_NAME_KEY;
public static final String DFS_NAMENODE_CHECKPOINT_DIR_KEY =
FsDatasetImpl.java
@@ -362,7 +362,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
RoundRobinVolumeChoosingPolicy.class,
VolumeChoosingPolicy.class), conf);
volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
blockChooserImpl);
blockChooserImpl, datanode.getDiskMetrics());
asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode, conf);
deletingBlock = new HashMap<String, Set<Long>>();
FsVolumeList.java
@@ -32,6 +32,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
@@ -41,6 +42,7 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeDiskMetrics;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.AutoCloseableLock;
@@ -62,13 +64,16 @@ class FsVolumeList {
private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
private final BlockScanner blockScanner;

private final DataNodeDiskMetrics diskMetrics;

FsVolumeList(List<VolumeFailureInfo> initialVolumeFailureInfos,
BlockScanner blockScanner,
VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
VolumeChoosingPolicy<FsVolumeImpl> blockChooser, DataNodeDiskMetrics dataNodeDiskMetrics) {
this.blockChooser = blockChooser;
this.blockScanner = blockScanner;
this.checkDirsLock = new AutoCloseableLock();
this.checkDirsLockCondition = checkDirsLock.newCondition();
this.diskMetrics = dataNodeDiskMetrics;
for (VolumeFailureInfo volumeFailureInfo: initialVolumeFailureInfos) {
volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
volumeFailureInfo);
@@ -84,6 +89,15 @@ List<FsVolumeImpl> getVolumes() {

private FsVolumeReference chooseVolume(List<FsVolumeImpl> list,
long blockSize, String storageId) throws IOException {

// Exclude slow disks when choosing a volume.
if (diskMetrics != null) {
List<String> slowDisksToExclude = diskMetrics.getSlowDisksToExclude();
list = list.stream()
.filter(volume -> !slowDisksToExclude.contains(volume.getBaseURI().getPath()))
.collect(Collectors.toList());
}

while (true) {
FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize,
storageId);
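
The filter added to chooseVolume matches each volume's base URI path against the list published by DataNodeDiskMetrics#getSlowDisksToExclude(). A minimal standalone sketch of that matching step, with plain strings standing in for FsVolumeImpl and illustrative paths:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SlowDiskFilterSketch {
  public static void main(String[] args) {
    // Stand-ins for volume.getBaseURI().getPath() of each candidate volume
    // (illustrative paths).
    List<String> volumePaths = Arrays.asList(
        "/data/disk0/", "/data/disk1/", "/data/disk2/");
    // Stand-in for DataNodeDiskMetrics#getSlowDisksToExclude().
    List<String> slowDisksToExclude = Arrays.asList("/data/disk0/");

    // Same filtering step as in FsVolumeList#chooseVolume: drop any volume
    // whose base path is currently reported as slow.
    List<String> candidates = volumePaths.stream()
        .filter(path -> !slowDisksToExclude.contains(path))
        .collect(Collectors.toList());

    System.out.println(candidates);  // prints [/data/disk1/, /data/disk2/]
  }
}
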
DataNodeDiskMetrics.java
@@ -34,9 +34,13 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* This class detects and maintains DataNode disk outliers and their
@@ -69,6 +73,14 @@ public class DataNodeDiskMetrics {
* Threshold in milliseconds below which a disk is definitely not slow.
*/
private final long lowThresholdMs;
/**
* The maximum number of slow disks to exclude when choosing a volume.
*/
private int maxSlowDisksToExclude;
/**
* List of slow disks that need to be excluded.
*/
private List<String> slowDisksToExclude = new ArrayList<>();

public DataNodeDiskMetrics(DataNode dn, long diskOutlierDetectionIntervalMs,
Configuration conf) {
@@ -80,6 +92,9 @@ public DataNodeDiskMetrics(DataNode dn, long diskOutlierDetectionIntervalMs,
lowThresholdMs =
conf.getLong(DFSConfigKeys.DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY,
DFSConfigKeys.DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_DEFAULT);
maxSlowDisksToExclude =
conf.getInt(DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY,
DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_DEFAULT);
slowDiskDetector =
new OutlierDetector(minOutlierDetectionDisks, lowThresholdMs);
shouldRun = true;
@@ -127,6 +142,21 @@ public void run() {

detectAndUpdateDiskOutliers(metadataOpStats, readIoStats,
writeIoStats);

// Sort the slow disks by latency and keep the top maxSlowDisksToExclude entries.
if (maxSlowDisksToExclude > 0) {
ArrayList<DiskLatency> diskLatencies = new ArrayList<>();
for (Map.Entry<String, Map<DiskOp, Double>> diskStats :
diskOutliersStats.entrySet()) {
diskLatencies.add(new DiskLatency(diskStats.getKey(), diskStats.getValue()));
}

Collections.sort(diskLatencies, (o1, o2)
-> Double.compare(o2.getMaxLatency(), o1.getMaxLatency()));

slowDisksToExclude = diskLatencies.stream().limit(maxSlowDisksToExclude)
.map(DiskLatency::getSlowDisk).collect(Collectors.toList());
}
}

try {
@@ -171,6 +201,35 @@ private void detectAndUpdateDiskOutliers(Map<String, Double> metadataOpStats,
}
}

/**
* This structure is a wrapper over disk latencies.
*/
public static class DiskLatency {
private final String slowDisk;
private final Map<DiskOp, Double> latencyMap;

public DiskLatency(
String slowDiskID,
Map<DiskOp, Double> latencyMap) {
this.slowDisk = slowDiskID;
this.latencyMap = latencyMap;
}

double getMaxLatency() {
double maxLatency = 0;
for (double latency : latencyMap.values()) {
if (latency > maxLatency) {
maxLatency = latency;
}
}
return maxLatency;
}

public String getSlowDisk() {
return slowDisk;
}
}

private void addDiskStat(Map<String, Map<DiskOp, Double>> diskStats,
String disk, DiskOp diskOp, double latency) {
if (!diskStats.containsKey(disk)) {
@@ -206,4 +265,8 @@ public void addSlowDiskForTesting(String slowDiskPath,
diskOutliersStats.put(slowDiskPath, latencies);
}
}

public List<String> getSlowDisksToExclude() {
return slowDisksToExclude;
}
}
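
The detection thread above keeps only the worst maxSlowDisksToExclude disks, ranked by each disk's maximum latency across READ, WRITE and METADATA operations. A minimal standalone sketch of that selection (not part of the patch, with illustrative latencies standing in for the DiskLatency entries):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TopSlowDisksSketch {
  public static void main(String[] args) {
    // Illustrative worst-case latencies (ms) per disk, standing in for
    // DiskLatency#getMaxLatency() over the per-operation stats.
    Map<String, Double> maxLatencyByDisk = new HashMap<>();
    maxLatencyByDisk.put("/data/disk0/", 2.0);
    maxLatencyByDisk.put("/data/disk1/", 1.0);
    maxLatencyByDisk.put("/data/disk2/", 1.5);
    int maxSlowDisksToExclude = 1;

    // Sort descending by latency and keep the first N entries, mirroring
    // what the detection thread does before publishing slowDisksToExclude.
    List<String> slowDisksToExclude = maxLatencyByDisk.entrySet().stream()
        .sorted(Map.Entry.<String, Double>comparingByValue().reversed())
        .limit(maxSlowDisksToExclude)
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());

    System.out.println(slowDisksToExclude);  // prints [/data/disk0/]
  }
}
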
hdfs-default.xml
@@ -2434,6 +2434,15 @@
</description>
</property>

<property>
<name>dfs.datanode.max.slowdisks.to.exclude</name>
<value>0</value>
<description>
The maximum number of slow disks to exclude when choosing a volume. By default, this parameter is set
to 0, which disables excluding slow disks when choosing a volume.
</description>
</property>
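
For example, a deployment that wants each DataNode to exclude its single slowest disk can pair this key with a non-zero file I/O profiling sampling percentage, without which no disk statistics are collected. A minimal sketch of the programmatic form, mirroring the configuration used in the new test below; the values are illustrative, and the same property names can be set in hdfs-site.xml:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class SlowDiskExclusionConfSketch {
  public static Configuration newConf() {
    Configuration conf = new HdfsConfiguration();
    // Disk statistics are only gathered when file I/O profiling is sampled;
    // 30 is the illustrative percentage the new test uses.
    conf.setInt(
        DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY, 30);
    // Exclude at most one slow disk per DataNode when choosing a volume
    // (0, the default, keeps the feature disabled).
    conf.setInt(DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY, 1);
    return conf;
  }
}
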

<property>
<name>hadoop.user.group.metrics.percentiles.intervals</name>
<value></value>
TestFsVolumeList.java
@@ -21,6 +21,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
@@ -32,12 +33,16 @@
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -53,6 +58,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DU_RESERVED_PERCENTAGE_KEY;
import static org.junit.Assert.assertEquals;
@@ -73,6 +79,10 @@ public class TestFsVolumeList {
private FsDatasetImpl dataset = null;
private String baseDir;
private BlockScanner blockScanner;
private final static int NUM_DATANODES = 3;
private final static int STORAGES_PER_DATANODE = 3;
private final static int DEFAULT_BLOCK_SIZE = 102400;
private final static int BUFFER_LENGTH = 1024;

@Before
public void setUp() {
@@ -88,7 +98,8 @@ public void setUp() {
@Test(timeout=30000)
public void testGetNextVolumeWithClosedVolume() throws IOException {
FsVolumeList volumeList = new FsVolumeList(
Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser);
Collections.<VolumeFailureInfo>emptyList(),
blockScanner, blockChooser, null);
final List<FsVolumeImpl> volumes = new ArrayList<>();
for (int i = 0; i < 3; i++) {
File curDir = new File(baseDir, "nextvolume-" + i);
@@ -131,7 +142,7 @@ public Boolean get() {
@Test(timeout=30000)
public void testReleaseVolumeRefIfNoBlockScanner() throws IOException {
FsVolumeList volumeList = new FsVolumeList(
Collections.<VolumeFailureInfo>emptyList(), null, blockChooser);
Collections.<VolumeFailureInfo>emptyList(), null, blockChooser, null);
File volDir = new File(baseDir, "volume-0");
volDir.mkdirs();
FsVolumeImpl volume = new FsVolumeImplBuilder()
@@ -452,4 +463,99 @@ public void testGetCachedVolumeCapacity() throws IOException {
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_FIXED_VOLUME_SIZE_KEY,
DFSConfigKeys.DFS_DATANODE_FIXED_VOLUME_SIZE_DEFAULT);
}

@Test
public void testExcludeSlowDiskWhenChoosingVolume() throws Exception {
conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
// Set datanode outliers report interval to 1s.
conf.setStrings(DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, "1s");
// Enable datanode disk metrics collector.
conf.setInt(DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY, 30);
// Enable excluding slow disks when choosing a volume.
conf.setInt(DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY, 1);
// Ensure that each volume capacity is larger than the DEFAULT_BLOCK_SIZE.
long capacity = 10 * DEFAULT_BLOCK_SIZE;
long[][] capacities = new long[NUM_DATANODES][STORAGES_PER_DATANODE];
String[] hostnames = new String[NUM_DATANODES];
for (int i = 0; i < NUM_DATANODES; i++) {
hostnames[i] = i + "." + i + "." + i + "." + i;
for (int j = 0; j < STORAGES_PER_DATANODE; j++) {
capacities[i][j] = capacity;
}
}

MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.hosts(hostnames)
.numDataNodes(NUM_DATANODES)
.storagesPerDatanode(STORAGES_PER_DATANODE)
.storageCapacities(capacities).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();

// Create file for each datanode.
ArrayList<DataNode> dataNodes = cluster.getDataNodes();
DataNode dn0 = dataNodes.get(0);
DataNode dn1 = dataNodes.get(1);
DataNode dn2 = dataNodes.get(2);

// Mock the first disk of each datanode as the slowest disk.
String slowDisk0OnDn0 = dn0.getFSDataset().getFsVolumeReferences().getReference(0)
.getVolume().getBaseURI().getPath();
String slowDisk0OnDn1 = dn1.getFSDataset().getFsVolumeReferences().getReference(0)
.getVolume().getBaseURI().getPath();
String slowDisk0OnDn2 = dn2.getFSDataset().getFsVolumeReferences().getReference(0)
.getVolume().getBaseURI().getPath();

String slowDisk1OnDn0 = dn0.getFSDataset().getFsVolumeReferences().getReference(1)
.getVolume().getBaseURI().getPath();
String slowDisk1OnDn1 = dn1.getFSDataset().getFsVolumeReferences().getReference(1)
.getVolume().getBaseURI().getPath();
String slowDisk1OnDn2 = dn2.getFSDataset().getFsVolumeReferences().getReference(1)
.getVolume().getBaseURI().getPath();

dn0.getDiskMetrics().addSlowDiskForTesting(slowDisk0OnDn0, ImmutableMap.of(
SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.5,
SlowDiskReports.DiskOp.METADATA, 2.0));
dn1.getDiskMetrics().addSlowDiskForTesting(slowDisk0OnDn1, ImmutableMap.of(
SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.5,
SlowDiskReports.DiskOp.METADATA, 2.0));
dn2.getDiskMetrics().addSlowDiskForTesting(slowDisk0OnDn2, ImmutableMap.of(
SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.5,
SlowDiskReports.DiskOp.METADATA, 2.0));

dn0.getDiskMetrics().addSlowDiskForTesting(slowDisk1OnDn0, ImmutableMap.of(
SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.0,
SlowDiskReports.DiskOp.METADATA, 1.0));
dn1.getDiskMetrics().addSlowDiskForTesting(slowDisk1OnDn1, ImmutableMap.of(
SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.0,
SlowDiskReports.DiskOp.METADATA, 1.0));
dn2.getDiskMetrics().addSlowDiskForTesting(slowDisk1OnDn2, ImmutableMap.of(
SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.0,
SlowDiskReports.DiskOp.METADATA, 1.0));

// Wait until the data on the slow disk is collected successfully.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override public Boolean get() {
return dn0.getDiskMetrics().getSlowDisksToExclude().size() == 1 &&
dn1.getDiskMetrics().getSlowDisksToExclude().size() == 1 &&
dn2.getDiskMetrics().getSlowDisksToExclude().size() == 1;
}
}, 1000, 5000);

// Create a file with 3 replicas.
DFSTestUtil.createFile(fs, new Path("/file0"), false, BUFFER_LENGTH, 1000,
DEFAULT_BLOCK_SIZE, (short) 3, 0, false, null);

// Assert that the number of blocks created on the slowest disk of each datanode is 0.
Assert.assertEquals(0, dn0.getVolumeReport().stream()
.filter(v -> (v.getPath() + "/").equals(slowDisk0OnDn0)).collect(Collectors.toList()).get(0)
.getNumBlocks());
Assert.assertEquals(0, dn1.getVolumeReport().stream()
.filter(v -> (v.getPath() + "/").equals(slowDisk0OnDn1)).collect(Collectors.toList()).get(0)
.getNumBlocks());
Assert.assertEquals(0, dn2.getVolumeReport().stream()
.filter(v -> (v.getPath() + "/").equals(slowDisk0OnDn2)).collect(Collectors.toList()).get(0)
.getNumBlocks());
}
}