diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 323d79a6487..fcd72cf30a0 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -417,16 +417,17 @@ private void updateDatanodeOpState(DatanodeDetails reportedDn) || scmStatus.getOpStateExpiryEpochSeconds() != reportedDn.getPersistedOpStateExpiryEpochSec()) { LOG.info("Scheduling a command to update the operationalState " + - "persisted on the datanode as the reported value ({}, {}) does not " + + "persisted on {} as the reported value does not " + "match the value stored in SCM ({}, {})", - reportedDn.getPersistedOpState(), - reportedDn.getPersistedOpStateExpiryEpochSec(), + reportedDn, scmStatus.getOperationalState(), scmStatus.getOpStateExpiryEpochSeconds()); - commandQueue.addCommand(reportedDn.getUuid(), + + onMessage(new CommandForDatanode(reportedDn.getUuid(), new SetNodeOperationalStateCommand( Time.monotonicNow(), scmStatus.getOperationalState(), - scmStatus.getOpStateExpiryEpochSeconds())); + scmStatus.getOpStateExpiryEpochSeconds()) + ), null); } DatanodeDetails scmDnd = nodeStateManager.getNode(reportedDn); scmDnd.setPersistedOpStateExpiryEpochSec( diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java index bf848fead7b..197955058ca 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java @@ -39,6 +39,8 @@ import org.apache.hadoop.util.Time; import com.google.common.collect.ImmutableSet; + +import static java.util.stream.Collectors.toList; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reregisterCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -129,6 +131,10 @@ public void onMessage(CommandForDatanode commandForDatanode, public List processHeartbeat(DatanodeDetails datanodeDetails) { // Update heartbeat map with current time datanodeHeartbeatMap.put(datanodeDetails.getUuid(), Time.now()); - return super.processHeartbeat(datanodeDetails); + + List cmds = super.processHeartbeat(datanodeDetails); + return cmds.stream() + .filter(c -> ALLOWED_COMMANDS.contains(c.getType())) + .collect(toList()); } } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java index c934caef22e..ee7f8caf5e8 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java @@ -26,16 +26,22 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.List; import java.util.UUID; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.scm.net.NetworkTopology; import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.hdds.utils.db.DBStoreBuilder; import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -91,6 +97,25 @@ public void testReconNodeDB() throws IOException { assertEquals(1, reconNodeManager.getAllNodes().size()); assertNotNull(reconNodeManager.getNodeByUuid(uuidString)); + // If any commands are added to the eventQueue without using the onMessage + // interface, then they should be filtered out and not returned to the DN + // when it heartbeats. + // This command should never be returned by Recon + reconNodeManager.addDatanodeCommand(datanodeDetails.getUuid(), + new SetNodeOperationalStateCommand(1234, + HddsProtos.NodeOperationalState.DECOMMISSIONING, 0)); + + // This one should be returned + reconNodeManager.addDatanodeCommand(datanodeDetails.getUuid(), + new ReregisterCommand()); + + // Upon processing the heartbeat, the illegal command should be filtered out + List returnedCmds = + reconNodeManager.processHeartbeat(datanodeDetails); + assertEquals(1, returnedCmds.size()); + assertEquals(SCMCommandProto.Type.reregisterCommand, + returnedCmds.get(0).getType()); + // Close the DB, and recreate the instance of Recon Node Manager. eventQueue.close(); reconNodeManager.close();