hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -70,6 +70,7 @@
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;
@@ -1190,7 +1191,7 @@ static class Scheduler {

    private final long heartbeatIntervalMs;
    private final long lifelineIntervalMs;
    private final long blockReportIntervalMs;
    private volatile long blockReportIntervalMs;
    private final long outliersReportIntervalMs;

    Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs,
@@ -1346,6 +1347,15 @@ void setNextBlockReportTime(long nextBlockReportTime) {
      this.nextBlockReportTime.getAndSet(nextBlockReportTime);
    }

    long getBlockReportIntervalMs() {
      return this.blockReportIntervalMs;
    }

    void setBlockReportIntervalMs(long intervalMs) {
      Preconditions.checkArgument(intervalMs > 0);
      this.blockReportIntervalMs = intervalMs;
    }

    /**
     * Wrapped for testing.
     * @return
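The switch from final to volatile above is the load-bearing detail: the reconfiguration request runs on a DataNode RPC handler thread, while each BPServiceActor thread reads the interval when scheduling its next block report, so the new value has to be published safely across threads. A minimal self-contained sketch of the same pattern (hypothetical ReportScheduler class, not the real BPServiceActor.Scheduler):

class ReportScheduler {
  // Written by the reconfiguration RPC thread, read by the actor thread.
  // volatile makes the 64-bit write atomic and immediately visible; plain
  // long writes are allowed to tear under the Java memory model.
  private volatile long blockReportIntervalMs;

  ReportScheduler(long intervalMs) {
    setBlockReportIntervalMs(intervalMs);
  }

  long getBlockReportIntervalMs() {
    return blockReportIntervalMs;
  }

  void setBlockReportIntervalMs(long intervalMs) {
    // Same guard as the Preconditions.checkArgument(intervalMs > 0) above.
    if (intervalMs <= 0) {
      throw new IllegalArgumentException("interval must be positive");
    }
    this.blockReportIntervalMs = intervalMs;
  }

  // Each scheduling pass simply picks up the latest published interval.
  long nextBlockReportTimeMs(long nowMs) {
    return nowMs + blockReportIntervalMs;
  }
}

No lock is needed because the only write is a single assignment; there is no read-modify-write on the reconfiguration path.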
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -74,6 +74,7 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.security.SaslPropertiesResolver;
import org.apache.hadoop.util.Preconditions;

import java.util.concurrent.TimeUnit;

@@ -105,7 +106,7 @@ public class DNConf {
  final long readaheadLength;
  final long heartBeatInterval;
  private final long lifelineIntervalMs;
  final long blockReportInterval;
  volatile long blockReportInterval;
  final long blockReportSplitThreshold;
  final boolean peerStatsEnabled;
  final boolean diskStatsEnabled;
@@ -475,6 +476,11 @@ public long getProcessCommandsThresholdMs() {
    return processCommandsThresholdMs;
  }

  void setBlockReportInterval(long intervalMs) {
    Preconditions.checkArgument(intervalMs > 0);
    blockReportInterval = intervalMs;
  }

  public long getBlockReportInterval() {
    return blockReportInterval;
  }
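DNConf mirrors the Scheduler change: blockReportInterval drops final for volatile, and the package-private setter rejects non-positive values via Preconditions.checkArgument, so a bad dfsadmin value fails the reconfiguration instead of silently breaking scheduling. The interval lives in two places (DNConf as the DataNode-wide view, and each BPServiceActor's Scheduler as its working copy), which is why the reconfigure hook in DataNode below has to push the new value into both.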
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.datanode;


import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING;
@@ -299,7 +301,8 @@ public class DataNode extends ReconfigurableBase
      Collections.unmodifiableList(
          Arrays.asList(
              DFS_DATANODE_DATA_DIR_KEY,
              DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY));
              DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
              DFS_BLOCKREPORT_INTERVAL_MSEC_KEY));

  public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");

@@ -533,78 +536,111 @@ protected Configuration getNewConf() {
  public String reconfigurePropertyImpl(String property, String newVal)
      throws ReconfigurationException {
    switch (property) {
      case DFS_DATANODE_DATA_DIR_KEY: {
        IOException rootException = null;
        try {
          LOG.info("Reconfiguring {} to {}", property, newVal);
          this.refreshVolumes(newVal);
          return getConf().get(DFS_DATANODE_DATA_DIR_KEY);
        } catch (IOException e) {
          rootException = e;
        } finally {
          // Send a full block report to let NN acknowledge the volume changes.
          try {
            triggerBlockReport(
                new BlockReportOptions.Factory().setIncremental(false).build());
          } catch (IOException e) {
            LOG.warn("Exception while sending the block report after refreshing"
                + " volumes {} to {}", property, newVal, e);
            if (rootException == null) {
              rootException = e;
            }
          } finally {
            if (rootException != null) {
              throw new ReconfigurationException(property, newVal,
                  getConf().get(property), rootException);
            }
          }
        }
        break;
      }
      case DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY: {
        ReconfigurationException rootException = null;
        try {
          LOG.info("Reconfiguring {} to {}", property, newVal);
          int movers;
          if (newVal == null) {
            // set to default
            movers = DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
          } else {
            movers = Integer.parseInt(newVal);
            if (movers <= 0) {
              rootException = new ReconfigurationException(
                  property,
                  newVal,
                  getConf().get(property),
                  new IllegalArgumentException(
                      "balancer max concurrent movers must be larger than 0"));
            }
          }
          boolean success = xserver.updateBalancerMaxConcurrentMovers(movers);
          if (!success) {
            rootException = new ReconfigurationException(
                property,
                newVal,
                getConf().get(property),
                new IllegalArgumentException(
                    "Could not modify concurrent moves thread count"));
          }
          return Integer.toString(movers);
        } catch (NumberFormatException nfe) {
          rootException = new ReconfigurationException(
              property, newVal, getConf().get(property), nfe);
        } finally {
          if (rootException != null) {
            LOG.warn(String.format(
                "Exception in updating balancer max concurrent movers %s to %s",
                property, newVal), rootException);
            throw rootException;
          }
        }
        break;
      }
      case DFS_BLOCKREPORT_INTERVAL_MSEC_KEY: {
        ReconfigurationException rootException = null;
        try {
          LOG.info("Reconfiguring {} to {}", property, newVal);
          long intervalMs;
          if (newVal == null) {
            // Set to default.
            intervalMs = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
          } else {
            intervalMs = Long.parseLong(newVal);
          }
          dnConf.setBlockReportInterval(intervalMs);
          for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
            if (bpos != null) {
              for (BPServiceActor actor : bpos.getBPServiceActors()) {
                actor.getScheduler().setBlockReportIntervalMs(intervalMs);
              }
            }
          }
          return Long.toString(intervalMs);
        } catch (IllegalArgumentException e) {
          rootException = new ReconfigurationException(
              property, newVal, getConf().get(property), e);
        } finally {
          if (rootException != null) {
            LOG.warn(String.format(
                "Exception in updating block report interval %s to %s",
                property, newVal), rootException);
            throw rootException;
          }
        }
        break;
      }
      default:
        break;
    }
    throw new ReconfigurationException(
        property, newVal, getConf().get(property));
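With this case in place, the interval of a live DataNode can be changed through the standard reconfiguration flow: hdfs dfsadmin -reconfig datanode <host:ipc_port> start launches the reconfiguration in the background, and hdfs dfsadmin -reconfig datanode <host:ipc_port> status reports the per-property outcome. Removing the key (newVal == null) reverts to DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT. Since NumberFormatException extends IllegalArgumentException, the single catch clause wraps both a non-numeric value (thrown by Long.parseLong) and a non-positive one (rejected by the setters' Preconditions check) into the ReconfigurationException that the new test below expects.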
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java
@@ -18,6 +18,8 @@

package org.apache.hadoop.hdfs.server.datanode;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
import static org.junit.Assert.assertEquals;
@@ -293,4 +295,74 @@ private void testAcquireOnMaxConcurrentMoversReconfiguration(
assertEquals("should not be able to get thread quota", false,
dataNode.xserver.balanceThrottler.acquire());
}

@Test
public void testBlockReportIntervalReconfiguration()
throws ReconfigurationException, IOException {
int blockReportInterval = 300 * 1000;
for (int i = 0; i < NUM_DATA_NODE; i++) {
DataNode dn = cluster.getDataNodes().get(i);

// Try invalid values.
try {
dn.reconfigureProperty(
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, "text");
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting NumberFormatException",
expected.getCause() instanceof NumberFormatException);
}
try {
dn.reconfigureProperty(
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
String.valueOf(-1));
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting IllegalArgumentException",
expected.getCause() instanceof IllegalArgumentException);
}

// Change properties.
dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
String.valueOf(blockReportInterval));

// Verify change.
assertEquals(String.format("%s has wrong value",
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY),
blockReportInterval,
dn.getDnConf().getBlockReportInterval());
for (BPOfferService bpos : dn.getAllBpOs()) {
if (bpos != null) {
for (BPServiceActor actor : bpos.getBPServiceActors()) {
assertEquals(String.format("%s has wrong value",
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY),
blockReportInterval,
actor.getScheduler().getBlockReportIntervalMs());
}
}
}

// Revert to default.
dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
null);
assertEquals(String.format("%s has wrong value",
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY),
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT,
dn.getDnConf().getBlockReportInterval());
// Verify default.
for (BPOfferService bpos : dn.getAllBpOs()) {
if (bpos != null) {
for (BPServiceActor actor : bpos.getBPServiceActors()) {
assertEquals(String.format("%s has wrong value",
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY),
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT,
actor.getScheduler().getBlockReportIntervalMs());
}
}
}
assertEquals(String.format("expect %s is not configured",
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY), null, dn
.getConf().get(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY));
}
}
}
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -336,7 +336,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException {
    final List<String> outs = Lists.newArrayList();
    final List<String> errs = Lists.newArrayList();
    getReconfigurableProperties("datanode", address, outs, errs);
    assertEquals(3, outs.size());
    assertEquals(4, outs.size());
    assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
  }

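The expected size grows from 3 to 4 because the datanode now advertises three reconfigurable properties (dfs.datanode.data.dir, dfs.datanode.balance.max.concurrent.moves, and the new dfs.blockreport.intervalMsec); judging by the test reading the first property at outs.get(1), outs.get(0) holds a header line. The same list is what hdfs dfsadmin -reconfig datanode <host:ipc_port> properties prints.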