diff --git a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixClusterManager.java b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixClusterManager.java
index 1c068f9c44..2aab0fc9aa 100644
--- a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixClusterManager.java
+++ b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixClusterManager.java
@@ -61,7 +61,7 @@
  * @see http://helix.apache.org
  */
 class HelixClusterManager implements ClusterMap {
-  private final Logger logger = LoggerFactory.getLogger(getClass());
+  private static final Logger logger = LoggerFactory.getLogger(HelixClusterManager.class);
   private final String clusterName;
   private final String selfInstanceName;
   private final MetricRegistry metricRegistry;
@@ -590,7 +590,9 @@ private void updateInstanceLiveness(List<LiveInstance> liveInstances) {
       liveInstancesSet.add(liveInstance.getInstanceName());
     }
     for (String instanceName : allInstances) {
-      if (liveInstancesSet.contains(instanceName)) {
+      // Treat the self instance as AVAILABLE regardless of the live instance list. During the server's startup, the
+      // current node should be AVAILABLE, but the live instance list doesn't include it since it hasn't joined yet.
+      if (liveInstancesSet.contains(instanceName) || instanceName.equals(selfInstanceName)) {
         instanceNameToAmbryDataNode.get(instanceName).setState(HardwareState.AVAILABLE);
       } else {
         instanceNameToAmbryDataNode.get(instanceName).setState(HardwareState.UNAVAILABLE);
diff --git a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/HelixClusterManagerTest.java b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/HelixClusterManagerTest.java
index 246530c320..712cd01d04 100644
--- a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/HelixClusterManagerTest.java
+++ b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/HelixClusterManagerTest.java
@@ -62,7 +62,7 @@
 @RunWith(Parameterized.class)
 public class HelixClusterManagerTest {
   private final HashMap dcsToZkInfo = new HashMap<>();
-  private final String dcs[] = new String[]{"DC0", "DC1"};
+  private final String[] dcs = new String[]{"DC0", "DC1"};
   private final TestUtils.TestHardwareLayout testHardwareLayout;
   private final TestPartitionLayout testPartitionLayout;
   private final String clusterNameStatic = "HelixClusterManagerTestCluster";
@@ -70,6 +70,8 @@ public class HelixClusterManagerTest {
   private final ClusterMapConfig clusterMapConfig;
   private final MockHelixCluster helixCluster;
   private final String hostname;
+  private final int portNum;
+  private final String selfInstanceName;
   private final String localDc;
   private final String remoteDc;
   private ClusterMap clusterManager;
@@ -182,11 +184,15 @@ public HelixClusterManagerTest(boolean useComposite, boolean overrideEnabled, bo
       }
     }
-    hostname = "localhost";
+    DataNode currentNode = testHardwareLayout.getRandomDataNodeFromDc(localDc);
+    hostname = currentNode.getHostname();
+    portNum = currentNode.getPort();
+    selfInstanceName = getInstanceName(hostname, portNum);
     Properties props = new Properties();
     props.setProperty("clustermap.host.name", hostname);
     props.setProperty("clustermap.cluster.name", clusterNamePrefixInHelix + clusterNameStatic);
     props.setProperty("clustermap.datacenter.name", localDc);
+    props.setProperty("clustermap.port", Integer.toString(portNum));
     props.setProperty("clustermap.dcs.zk.connect.strings", zkJson.toString(2));
     props.setProperty("clustermap.current.xid", Long.toString(CURRENT_XID));
     props.setProperty("clustermap.enable.partition.override", Boolean.toString(overrideEnabled));
@@ -198,10 +204,10 @@ public HelixClusterManagerTest(boolean useComposite, boolean overrideEnabled, bo
           new StaticClusterAgentsFactory(clusterMapConfig, hardwareLayoutPath, partitionLayoutPath);
       metricRegistry = staticClusterAgentsFactory.getMetricRegistry();
       clusterManager = new CompositeClusterManager(staticClusterAgentsFactory.getClusterMap(),
-          new HelixClusterManager(clusterMapConfig, hostname, helixManagerFactory, metricRegistry));
+          new HelixClusterManager(clusterMapConfig, selfInstanceName, helixManagerFactory, metricRegistry));
     } else {
       metricRegistry = new MetricRegistry();
-      clusterManager = new HelixClusterManager(clusterMapConfig, hostname, helixManagerFactory, metricRegistry);
+      clusterManager = new HelixClusterManager(clusterMapConfig, selfInstanceName, helixManagerFactory, metricRegistry);
     }
   }
@@ -234,13 +240,14 @@ public void instantiationTest() throws Exception {
     JSONObject invalidZkJson = constructZkLayoutJSON(zkInfos);
     Properties props = new Properties();
     props.setProperty("clustermap.host.name", hostname);
+    props.setProperty("clustermap.port", Integer.toString(portNum));
     props.setProperty("clustermap.cluster.name", clusterNamePrefixInHelix + clusterNameStatic);
     props.setProperty("clustermap.datacenter.name", localDc);
     props.setProperty("clustermap.dcs.zk.connect.strings", invalidZkJson.toString(2));
     ClusterMapConfig invalidClusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props));
     metricRegistry = new MetricRegistry();
-    new HelixClusterManager(invalidClusterMapConfig, hostname, new MockHelixManagerFactory(helixCluster, null, null),
-        metricRegistry);
+    new HelixClusterManager(invalidClusterMapConfig, selfInstanceName,
+        new MockHelixManagerFactory(helixCluster, null, null), metricRegistry);
     assertEquals(0L,
         metricRegistry.getGauges().get(HelixClusterManager.class.getName() + ".instantiationFailed").getValue());
     assertEquals(1L, metricRegistry.getGauges()
@@ -256,8 +263,8 @@
     invalidClusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props));
     metricRegistry = new MetricRegistry();
     try {
-      new HelixClusterManager(invalidClusterMapConfig, hostname, new MockHelixManagerFactory(helixCluster, null, null),
-          metricRegistry);
+      new HelixClusterManager(invalidClusterMapConfig, selfInstanceName,
+          new MockHelixManagerFactory(helixCluster, null, null), metricRegistry);
       fail("Instantiation should have failed with invalid zk addresses");
     } catch (IOException e) {
       assertEquals(1L,
@@ -269,7 +276,7 @@
     metricRegistry = new MetricRegistry();
     try {
-      new HelixClusterManager(clusterMapConfig, hostname,
+      new HelixClusterManager(clusterMapConfig, selfInstanceName,
          new MockHelixManagerFactory(helixCluster, null, new Exception("beBad")), metricRegistry);
       fail("Instantiation should fail with a HelixManager factory that throws exception on listener registrations");
     } catch (Exception e) {
@@ -289,9 +296,8 @@ public void emptyPartitionOverrideTest() throws Exception {
     assumeTrue(overrideEnabled);
     metricRegistry = new MetricRegistry();
     // create a MockHelixManagerFactory
-    ClusterMap clusterManagerWithEmptyRecord =
-        new HelixClusterManager(clusterMapConfig, hostname, new MockHelixManagerFactory(helixCluster, null, null),
-            metricRegistry);
+    ClusterMap clusterManagerWithEmptyRecord = new HelixClusterManager(clusterMapConfig, selfInstanceName,
+        new MockHelixManagerFactory(helixCluster, null, null), metricRegistry);
     Set writableInClusterManager = new HashSet<>();
     for (PartitionId partition : clusterManagerWithEmptyRecord.getWritablePartitionIds(null)) {
@@ -314,14 +320,11 @@ public void basicInterfaceTest() throws Exception {
     assumeTrue(!overrideEnabled);
-    for (String metricName : clusterManager.getMetricRegistry().getNames()) {
-      System.out.println(metricName);
-    }
     assertEquals("Incorrect local datacenter ID", 0, clusterManager.getLocalDatacenterId());
     testPartitionReplicaConsistency();
     testInvalidPartitionId();
     testDatacenterDatanodeReplicas();
-    assertStateEquivalency();
+    assertStateEquivalency(helixCluster.getDownInstances(), helixCluster.getUpInstances());
   }

   /**
@@ -334,24 +337,39 @@ public void helixInitiatedLivenessChangeTest() throws Exception {
     assumeTrue(!useComposite && !overrideEnabled);
     // all instances are up initially.
-    assertStateEquivalency();
+    assertStateEquivalency(helixCluster.getDownInstances(), helixCluster.getUpInstances());

-    // Bring one instance down in each dc.
+    // Bring one instance (other than the current host) down in each dc.
     for (String zkAddr : helixCluster.getZkAddrs()) {
-      helixCluster.bringInstanceDown(helixCluster.getUpInstances(zkAddr).get(0));
+      String instance =
+          helixCluster.getUpInstances(zkAddr).stream().filter(name -> !name.equals(selfInstanceName)).findFirst().get();
+      helixCluster.bringInstanceDown(instance);
     }
-    assertStateEquivalency();
+    assertStateEquivalency(helixCluster.getDownInstances(), helixCluster.getUpInstances());

     // Bring all instances down in all dcs.
     helixCluster.bringAllInstancesDown();
-    assertStateEquivalency();
+    Set<String> expectedDownInstances = helixCluster.getDownInstances();
+    Set<String> expectedUpInstances = helixCluster.getUpInstances();
+    expectedDownInstances.remove(selfInstanceName);
+    expectedUpInstances.add(selfInstanceName);
+    assertStateEquivalency(expectedDownInstances, expectedUpInstances);

     // Bring one instance up in each dc.
+    boolean selfInstanceIsChosen = false;
     for (String zkAddr : helixCluster.getZkAddrs()) {
-      helixCluster.bringInstanceUp(helixCluster.getDownInstances(zkAddr).get(0));
+      String instanceName = helixCluster.getDownInstances(zkAddr).get(0);
+      selfInstanceIsChosen = instanceName.equals(selfInstanceName);
+      helixCluster.bringInstanceUp(instanceName);
+    }
+    expectedDownInstances = helixCluster.getDownInstances();
+    expectedUpInstances = helixCluster.getUpInstances();
+    if (!selfInstanceIsChosen) {
+      expectedDownInstances.remove(selfInstanceName);
+      expectedUpInstances.add(selfInstanceName);
     }
-    assertStateEquivalency();
+    assertStateEquivalency(expectedDownInstances, expectedUpInstances);
   }

   /**
@@ -422,7 +440,7 @@ public void clientInitiatedLivenessChangeTest() {
     // The following does not do anything currently.
     clusterManager.onReplicaEvent(replica, ReplicaEventType.Partition_ReadOnly);
-    assertStateEquivalency();
+    assertStateEquivalency(helixCluster.getDownInstances(), helixCluster.getUpInstances());
   }

   /**
@@ -488,7 +506,7 @@ public void sealedReplicaChangeTest() throws Exception {
     assumeTrue(!useComposite && !overrideEnabled && listenCrossColo);
     // all instances are up initially.
-    assertStateEquivalency();
+    assertStateEquivalency(helixCluster.getDownInstances(), helixCluster.getUpInstances());
     AmbryPartition partition = (AmbryPartition) clusterManager.getWritablePartitionIds(null).get(0);
     List instances = helixCluster.getInstancesForPartition((partition.toPathString()));
@@ -513,7 +531,7 @@
         clusterManager.getWritablePartitionIds(null).contains(partition));
     assertEquals("If no replica is SEALED, the whole partition should be Writable", PartitionState.READ_WRITE,
         partition.getPartitionState());
-    assertStateEquivalency();
+    assertStateEquivalency(helixCluster.getDownInstances(), helixCluster.getUpInstances());
   }

   /**
@@ -568,7 +586,7 @@ public void clusterMapOverrideEnabledAndDisabledTest() throws Exception {
     clusterManager.close();
     MockHelixManagerFactory helixManagerFactory = new MockHelixManagerFactory(helixCluster, znRecord, null);
     HelixClusterManager clusterManager =
-        new HelixClusterManager(clusterMapConfig, hostname, helixManagerFactory, new MetricRegistry());
+        new HelixClusterManager(clusterMapConfig, selfInstanceName, helixManagerFactory, new MetricRegistry());
     // Ensure the new RW partition is added
     assertEquals("Mismatch in writable partitions when instanceConfig changes", writableInOverrideMap.size() + 1,
         clusterManager.getWritablePartitionIds(null).size());
@@ -621,7 +639,7 @@ public void stoppedReplicaChangeTest() {
     assumeTrue(!useComposite && !overrideEnabled && listenCrossColo);
     // all instances are up initially.
-    assertStateEquivalency();
+    assertStateEquivalency(helixCluster.getDownInstances(), helixCluster.getUpInstances());
     AmbryPartition partition = (AmbryPartition) clusterManager.getWritablePartitionIds(null).get(0);
     List instances = helixCluster.getInstancesForPartition((partition.toPathString()));
@@ -679,11 +697,19 @@ public void xidTest() throws Exception {
     MockHelixManagerFactory helixManagerFactory = new MockHelixManagerFactory(helixCluster, null, null);
     List<InstanceConfig> instanceConfigs = helixCluster.getAllInstanceConfigs();
     int instanceCount = instanceConfigs.size();
-    int randomIndex = com.github.ambry.utils.TestUtils.RANDOM.nextInt(instanceConfigs.size());
+    // Find the self instance config and move it to the end of the list, so that the rest of the test never picks it.
+    for (int i = 0; i < instanceCount; ++i) {
+      if (instanceConfigs.get(i).getInstanceName().equals(selfInstanceName)) {
+        Collections.swap(instanceConfigs, i, instanceConfigs.size() - 1);
+        break;
+      }
+    }
+    int randomIndex = com.github.ambry.utils.TestUtils.RANDOM.nextInt(instanceCount - 1);
     InstanceConfig aheadInstanceConfig = instanceConfigs.get(randomIndex);
-    Collections.swap(instanceConfigs, randomIndex, instanceConfigs.size() - 1);
+    Collections.swap(instanceConfigs, randomIndex, instanceConfigs.size() - 2);
     aheadInstanceConfig.getRecord().setSimpleField(XID_STR, Long.toString(CURRENT_XID + 1));
-    clusterManager = new HelixClusterManager(clusterMapConfig, hostname, helixManagerFactory, new MetricRegistry());
+    clusterManager =
+        new HelixClusterManager(clusterMapConfig, selfInstanceName, helixManagerFactory, new MetricRegistry());
     assertEquals(instanceCount - 1, clusterManager.getDataNodeIds().size());
     for (DataNodeId dataNode : clusterManager.getDataNodeIds()) {
       String instanceName = ClusterMapUtils.getInstanceName(dataNode.getHostname(), dataNode.getPort());
@@ -696,9 +722,9 @@ public void xidTest() throws Exception {
       assertEquals(instanceCount, aheadInstanceClusterManager.getDataNodeIds().size());
     }

-    // Post-initialization InstanceConfig change:
+    // Post-initialization InstanceConfig change: pick an instance that is neither the previous instance nor the self instance.
     InstanceConfig ignoreInstanceConfig =
-        instanceConfigs.get(com.github.ambry.utils.TestUtils.RANDOM.nextInt(instanceConfigs.size() - 1));
+        instanceConfigs.get(com.github.ambry.utils.TestUtils.RANDOM.nextInt(instanceCount - 2));
     String ignoreInstanceName = ignoreInstanceConfig.getInstanceName();
     ignoreInstanceConfig.getRecord().setSimpleField(XID_STR, Long.toString(CURRENT_XID + 2));
@@ -785,9 +811,11 @@ public void metricsTest() throws Exception {
     // live instance trigger happens once initially.
     long instanceTriggerCount = dcs.length;

-    // Bring one instance down in each dc in order to test the metrics more generally.
+    // Bring one instance (other than the current instance) down in each dc in order to test the metrics more generally.
     for (String zkAddr : helixCluster.getZkAddrs()) {
-      helixCluster.bringInstanceDown(helixCluster.getUpInstances(zkAddr).get(0));
+      String instance =
+          helixCluster.getUpInstances(zkAddr).stream().filter(name -> !name.equals(selfInstanceName)).findFirst().get();
+      helixCluster.bringInstanceDown(instance);
       instanceTriggerCount++;
     }
@@ -981,11 +1009,10 @@ private void testDatacenterDatanodeReplicas() {
   /**
    * Assert that the state of datanodes in the cluster manager's view are consistent with their actual states in the
    * cluster.
+   * @param expectedDownInstances the expected set of down instances in the cluster manager.
+   * @param expectedUpInstances the expected set of up instances in the cluster manager.
    */
-  private void assertStateEquivalency() {
-    Set<String> upInstancesInCluster = helixCluster.getUpInstances();
-    Set<String> downInstancesInCluster = helixCluster.getDownInstances();
-
+  private void assertStateEquivalency(Set<String> expectedDownInstances, Set<String> expectedUpInstances) {
     Set<String> upInstancesInClusterManager = new HashSet<>();
     Set<String> downInstancesInClusterManager = new HashSet<>();
     for (DataNodeId dataNode : clusterManager.getDataNodeIds()) {
@@ -997,8 +1024,8 @@ private void assertStateEquivalency() {
               ClusterMapUtils.getInstanceName(dataNode.getHostname(), dataNode.getPort())));
       }
     }
-    assertEquals(downInstancesInCluster, downInstancesInClusterManager);
-    assertEquals(upInstancesInCluster, upInstancesInClusterManager);
+    assertEquals(expectedDownInstances, downInstancesInClusterManager);
+    assertEquals(expectedUpInstances, upInstancesInClusterManager);
     Pair<Set<String>, Set<String>> writablePartitionsInTwoPlaces = getWritablePartitions();
     assertEquals(writablePartitionsInTwoPlaces.getFirst(), writablePartitionsInTwoPlaces.getSecond());
     testAllPartitions();
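For readers skimming the patch, here is a minimal self-contained sketch of the liveness rule that the change to `updateInstanceLiveness` introduces: an instance is AVAILABLE if it appears in the live-instance list, and the self instance is additionally always treated as AVAILABLE, because during startup the current node has not yet joined the cluster and so is missing from that list. This is not Ambry code; the class name, helper method, and instance names below are illustrative stand-ins.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of the liveness rule: live instances are AVAILABLE, everything else is UNAVAILABLE,
// except the self instance, which is always AVAILABLE even when absent from the live list.
public class LivenessSketch {
  enum HardwareState { AVAILABLE, UNAVAILABLE }

  static Map<String, HardwareState> updateLiveness(Set<String> allInstances, Set<String> liveInstances,
      String selfInstanceName) {
    Map<String, HardwareState> states = new HashMap<>();
    for (String instanceName : allInstances) {
      boolean available = liveInstances.contains(instanceName) || instanceName.equals(selfInstanceName);
      states.put(instanceName, available ? HardwareState.AVAILABLE : HardwareState.UNAVAILABLE);
    }
    return states;
  }

  public static void main(String[] args) {
    Set<String> all = new HashSet<>(List.of("host1_15088", "host2_15088", "host3_15088"));
    Set<String> live = new HashSet<>(List.of("host2_15088"));
    // host1_15088 is the self instance: it stays AVAILABLE even though it is not in the live list yet.
    System.out.println(updateLiveness(all, live, "host1_15088"));
  }
}
```

This is also why the tests above carve the self instance out of their expected up/down sets: no matter how the mock cluster toggles liveness, the cluster manager reports the self instance as up.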