Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ public class DNConf {
final long heartBeatInterval;
private final long lifelineIntervalMs;
volatile long blockReportInterval;
final long blockReportSplitThreshold;
volatile long blockReportSplitThreshold;
final boolean peerStatsEnabled;
final boolean diskStatsEnabled;
final long outliersReportIntervalMs;
final long ibrInterval;
final long initialBlockReportDelayMs;
volatile long initialBlockReportDelayMs;
volatile long cacheReportInterval;
final long datanodeSlowIoWarningThresholdMs;

Expand Down Expand Up @@ -215,19 +215,7 @@ public DNConf(final Configurable dn) {
this.datanodeSlowIoWarningThresholdMs = getConf().getLong(
DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY,
DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT);

long initBRDelay = getConf().getTimeDuration(
DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT,
TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
if (initBRDelay >= blockReportInterval) {
initBRDelay = 0;
DataNode.LOG.info(DFS_BLOCKREPORT_INITIAL_DELAY_KEY + " is "
+ "greater than or equal to" + DFS_BLOCKREPORT_INTERVAL_MSEC_KEY
+ ". Setting initial delay to 0 msec:");
}
initialBlockReportDelayMs = initBRDelay;

initBlockReportDelay();
heartBeatInterval = getConf().getTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY,
DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS,
TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -311,6 +299,19 @@ public DNConf(final Configurable dn) {
);
}

private void initBlockReportDelay() {
long initBRDelay = getConf().getTimeDuration(
DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT, TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
if (initBRDelay >= blockReportInterval || initBRDelay < 0) {
initBRDelay = 0;
DataNode.LOG.info(DFS_BLOCKREPORT_INITIAL_DELAY_KEY +
" is greater than or equal to " + DFS_BLOCKREPORT_INTERVAL_MSEC_KEY +
". Setting initial delay to 0 msec.");
}
initialBlockReportDelayMs = initBRDelay;
}

// We get minimumNameNodeVersion via a method so it can be mocked out in tests.
String getMinimumNameNodeVersion() {
return this.minimumNameNodeVersion;
Expand Down Expand Up @@ -477,7 +478,8 @@ public long getProcessCommandsThresholdMs() {
}

void setBlockReportInterval(long intervalMs) {
Preconditions.checkArgument(intervalMs > 0);
Preconditions.checkArgument(intervalMs > 0,
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY + " should be larger than 0");
blockReportInterval = intervalMs;
}

Expand All @@ -487,11 +489,22 @@ public long getBlockReportInterval() {

void setCacheReportInterval(long intervalMs) {
Preconditions.checkArgument(intervalMs > 0,
"dfs.cachereport.intervalMsec should be larger than 0");
DFS_CACHEREPORT_INTERVAL_MSEC_KEY + " should be larger than 0");
cacheReportInterval = intervalMs;
}

public long getCacheReportInterval() {
return cacheReportInterval;
}

void setBlockReportSplitThreshold(long threshold) {
Preconditions.checkArgument(threshold >= 0,
DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY + " should be larger than or equal to 0");
blockReportSplitThreshold = threshold;
}

void setInitBRDelayMs(String delayMs) {
dn.getConf().set(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, delayMs);
initBlockReportDelay();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
package org.apache.hadoop.hdfs.server.datanode;


import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING;
Expand Down Expand Up @@ -308,6 +312,8 @@ public class DataNode extends ReconfigurableBase
DFS_DATANODE_DATA_DIR_KEY,
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
DFS_DATANODE_MAX_RECEIVER_THREADS_KEY,
DFS_CACHEREPORT_INTERVAL_MSEC_KEY));

Expand Down Expand Up @@ -620,39 +626,10 @@ public String reconfigurePropertyImpl(String property, String newVal)
}
break;
}
case DFS_BLOCKREPORT_INTERVAL_MSEC_KEY: {
ReconfigurationException rootException = null;
try {
LOG.info("Reconfiguring {} to {}", property, newVal);
long intervalMs;
if (newVal == null) {
// Set to default.
intervalMs = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
} else {
intervalMs = Long.parseLong(newVal);
}
dnConf.setBlockReportInterval(intervalMs);
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
if (bpos != null) {
for (BPServiceActor actor : bpos.getBPServiceActors()) {
actor.getScheduler().setBlockReportIntervalMs(intervalMs);
}
}
}
return Long.toString(intervalMs);
} catch (IllegalArgumentException e) {
rootException = new ReconfigurationException(
property, newVal, getConf().get(property), e);
} finally {
if (rootException != null) {
LOG.warn(String.format(
"Exception in updating block report interval %s to %s",
property, newVal), rootException);
throw rootException;
}
}
break;
}
case DFS_BLOCKREPORT_INTERVAL_MSEC_KEY:
case DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY:
case DFS_BLOCKREPORT_INITIAL_DELAY_KEY:
return reconfBlockReportParameters(property, newVal);
case DFS_DATANODE_MAX_RECEIVER_THREADS_KEY:
return reconfDataXceiverParameters(property, newVal);
case DFS_CACHEREPORT_INTERVAL_MSEC_KEY:
Expand Down Expand Up @@ -698,6 +675,44 @@ private String reconfCacheReportParameters(String property, String newVal)
}
}

private String reconfBlockReportParameters(String property, String newVal)
throws ReconfigurationException {
String result = null;
try {
LOG.info("Reconfiguring {} to {}", property, newVal);
if (property.equals(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY)) {
Preconditions.checkNotNull(dnConf, "DNConf has not been initialized.");
long intervalMs = newVal == null ? DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT :
Long.parseLong(newVal);
result = Long.toString(intervalMs);
dnConf.setBlockReportInterval(intervalMs);
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
if (bpos != null) {
for (BPServiceActor actor : bpos.getBPServiceActors()) {
actor.getScheduler().setBlockReportIntervalMs(intervalMs);
}
}
}
} else if (property.equals(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY)) {
Preconditions.checkNotNull(dnConf, "DNConf has not been initialized.");
long threshold = newVal == null ? DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT :
Long.parseLong(newVal);
result = Long.toString(threshold);
dnConf.setBlockReportSplitThreshold(threshold);
} else if (property.equals(DFS_BLOCKREPORT_INITIAL_DELAY_KEY)) {
Preconditions.checkNotNull(dnConf, "DNConf has not been initialized.");
int initialDelay = newVal == null ? DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT :
Integer.parseInt(newVal);
result = Integer.toString(initialDelay);
dnConf.setInitBRDelayMs(result);
}
LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
return result;
} catch (IllegalArgumentException e) {
throw new ReconfigurationException(property, newVal, getConf().get(property), e);
}
}

/**
* Get a list of the keys of the re-configurable properties in configuration.
*/
Expand Down Expand Up @@ -3944,7 +3959,8 @@ boolean isSlownode() {
return blockPoolManager.isSlownode();
}

BlockPoolManager getBlockPoolManager() {
@VisibleForTesting
public BlockPoolManager getBlockPoolManager() {
return blockPoolManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

package org.apache.hadoop.hdfs.server.datanode;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
Expand Down Expand Up @@ -303,40 +305,49 @@ private void testAcquireOnMaxConcurrentMoversReconfiguration(

@Test
public void testBlockReportIntervalReconfiguration()
throws ReconfigurationException, IOException {
throws ReconfigurationException {
int blockReportInterval = 300 * 1000;
String[] blockReportParameters = {
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
DFS_BLOCKREPORT_INITIAL_DELAY_KEY};

for (int i = 0; i < NUM_DATA_NODE; i++) {
DataNode dn = cluster.getDataNodes().get(i);
BlockPoolManager blockPoolManager = dn.getBlockPoolManager();

// Try invalid values.
for (String blockReportParameter : blockReportParameters) {
try {
dn.reconfigureProperty(blockReportParameter, "text");
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting NumberFormatException",
expected.getCause() instanceof NumberFormatException);
}
}

try {
dn.reconfigureProperty(
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, "text");
dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, String.valueOf(-1));
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting NumberFormatException",
expected.getCause() instanceof NumberFormatException);
assertTrue("expecting IllegalArgumentException",
expected.getCause() instanceof IllegalArgumentException);
}
try {
dn.reconfigureProperty(
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
String.valueOf(-1));
dn.reconfigureProperty(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, String.valueOf(-1));
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting IllegalArgumentException",
expected.getCause() instanceof IllegalArgumentException);
}
dn.reconfigureProperty(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, String.valueOf(-1));
assertEquals(0, dn.getDnConf().initialBlockReportDelayMs);

// Change properties.
// Change properties and verify the change.
dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
String.valueOf(blockReportInterval));

// Verify change.
assertEquals(String.format("%s has wrong value",
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY),
blockReportInterval,
dn.getDnConf().getBlockReportInterval());
for (BPOfferService bpos : dn.getAllBpOs()) {
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
if (bpos != null) {
for (BPServiceActor actor : bpos.getBPServiceActors()) {
assertEquals(String.format("%s has wrong value",
Expand All @@ -347,15 +358,15 @@ public void testBlockReportIntervalReconfiguration()
}
}

// Revert to default.
dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
null);
assertEquals(String.format("%s has wrong value",
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY),
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT,
dn.getDnConf().getBlockReportInterval());
// Verify default.
for (BPOfferService bpos : dn.getAllBpOs()) {
dn.reconfigureProperty(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, String.valueOf(123));
assertEquals(123, dn.getDnConf().blockReportSplitThreshold);

dn.reconfigureProperty(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, "123");
assertEquals(123000, dn.getDnConf().initialBlockReportDelayMs);

// Revert to default and verify default.
dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, null);
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
if (bpos != null) {
for (BPServiceActor actor : bpos.getBPServiceActors()) {
assertEquals(String.format("%s has wrong value",
Expand All @@ -365,9 +376,16 @@ public void testBlockReportIntervalReconfiguration()
}
}
}
assertEquals(String.format("expect %s is not configured",
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY), null, dn
.getConf().get(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY));
assertNull(String.format("expect %s is not configured", DFS_BLOCKREPORT_INTERVAL_MSEC_KEY),
dn.getConf().get(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY));

dn.reconfigureProperty(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, null);
assertNull(String.format("expect %s is not configured", DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY),
dn.getConf().get(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY));

dn.reconfigureProperty(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, null);
assertNull(String.format("expect %s is not configured", DFS_BLOCKREPORT_INITIAL_DELAY_KEY),
dn.getConf().get(DFS_BLOCKREPORT_INITIAL_DELAY_KEY));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException {
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("datanode", address, outs, errs);
assertEquals(6, outs.size());
assertEquals(8, outs.size());
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
}

Expand Down