diff --git a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixClusterManager.java b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixClusterManager.java
index 1c068f9c44..2aab0fc9aa 100644
--- a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixClusterManager.java
+++ b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixClusterManager.java
@@ -61,7 +61,7 @@
* @see http://helix.apache.org
*/
class HelixClusterManager implements ClusterMap {
- private final Logger logger = LoggerFactory.getLogger(getClass());
+ private static final Logger logger = LoggerFactory.getLogger(HelixClusterManager.class);
private final String clusterName;
private final String selfInstanceName;
private final MetricRegistry metricRegistry;
@@ -590,7 +590,9 @@ private void updateInstanceLiveness(List<LiveInstance> liveInstances) {
liveInstancesSet.add(liveInstance.getInstanceName());
}
for (String instanceName : allInstances) {
- if (liveInstancesSet.contains(instanceName)) {
+ // Treat the self instance as live even when it is absent from the live-instance list. During the server's
+ // startup, the current node should be AVAILABLE, but the live-instance list does not yet include it because the node has not joined the cluster.
+ if (liveInstancesSet.contains(instanceName) || instanceName.equals(selfInstanceName)) {
instanceNameToAmbryDataNode.get(instanceName).setState(HardwareState.AVAILABLE);
} else {
instanceNameToAmbryDataNode.get(instanceName).setState(HardwareState.UNAVAILABLE);
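
A minimal sketch of the liveness rule introduced above, using hypothetical stand-in types (Node, State) rather than the actual AmbryDataNode/HardwareState classes; it only illustrates the self-instance exemption:

    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    // Hypothetical sketch; Node and State are stand-ins, not Ambry classes.
    final class LivenessSketch {
      enum State { AVAILABLE, UNAVAILABLE }

      static final class Node {
        State state = State.UNAVAILABLE;
      }

      static void updateLiveness(Map<String, Node> allNodes, List<String> liveInstanceNames, String selfInstanceName) {
        Set<String> live = new HashSet<>(liveInstanceNames);
        for (Map.Entry<String, Node> entry : allNodes.entrySet()) {
          String instanceName = entry.getKey();
          // The self instance is always marked AVAILABLE: during startup this node has not yet
          // joined Helix, so it is missing from the live-instance list even though it is up.
          boolean isLive = live.contains(instanceName) || instanceName.equals(selfInstanceName);
          entry.getValue().state = isLive ? State.AVAILABLE : State.UNAVAILABLE;
        }
      }
    }

Without the selfInstanceName check, the node would report itself as UNAVAILABLE until the first live-instance notification that arrives after it joins the cluster.
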
diff --git a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/HelixClusterManagerTest.java b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/HelixClusterManagerTest.java
index 246530c320..712cd01d04 100644
--- a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/HelixClusterManagerTest.java
+++ b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/HelixClusterManagerTest.java
@@ -62,7 +62,7 @@
@RunWith(Parameterized.class)
public class HelixClusterManagerTest {
private final HashMap dcsToZkInfo = new HashMap<>();
- private final String dcs[] = new String[]{"DC0", "DC1"};
+ private final String[] dcs = new String[]{"DC0", "DC1"};
private final TestUtils.TestHardwareLayout testHardwareLayout;
private final TestPartitionLayout testPartitionLayout;
private final String clusterNameStatic = "HelixClusterManagerTestCluster";
@@ -70,6 +70,8 @@ public class HelixClusterManagerTest {
private final ClusterMapConfig clusterMapConfig;
private final MockHelixCluster helixCluster;
private final String hostname;
+ private final int portNum;
+ private final String selfInstanceName;
private final String localDc;
private final String remoteDc;
private ClusterMap clusterManager;
@@ -182,11 +184,15 @@ public HelixClusterManagerTest(boolean useComposite, boolean overrideEnabled, bo
}
}
- hostname = "localhost";
+ DataNode currentNode = testHardwareLayout.getRandomDataNodeFromDc(localDc);
+ hostname = currentNode.getHostname();
+ portNum = currentNode.getPort();
+ selfInstanceName = getInstanceName(hostname, portNum);
Properties props = new Properties();
props.setProperty("clustermap.host.name", hostname);
props.setProperty("clustermap.cluster.name", clusterNamePrefixInHelix + clusterNameStatic);
props.setProperty("clustermap.datacenter.name", localDc);
+ props.setProperty("clustermap.port", Integer.toString(portNum));
props.setProperty("clustermap.dcs.zk.connect.strings", zkJson.toString(2));
props.setProperty("clustermap.current.xid", Long.toString(CURRENT_XID));
props.setProperty("clustermap.enable.partition.override", Boolean.toString(overrideEnabled));
@@ -198,10 +204,10 @@ public HelixClusterManagerTest(boolean useComposite, boolean overrideEnabled, bo
new StaticClusterAgentsFactory(clusterMapConfig, hardwareLayoutPath, partitionLayoutPath);
metricRegistry = staticClusterAgentsFactory.getMetricRegistry();
clusterManager = new CompositeClusterManager(staticClusterAgentsFactory.getClusterMap(),
- new HelixClusterManager(clusterMapConfig, hostname, helixManagerFactory, metricRegistry));
+ new HelixClusterManager(clusterMapConfig, selfInstanceName, helixManagerFactory, metricRegistry));
} else {
metricRegistry = new MetricRegistry();
- clusterManager = new HelixClusterManager(clusterMapConfig, hostname, helixManagerFactory, metricRegistry);
+ clusterManager = new HelixClusterManager(clusterMapConfig, selfInstanceName, helixManagerFactory, metricRegistry);
}
}
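
The constructor change above registers the cluster manager under an instance name derived from a real node's hostname and port instead of plain "localhost". A hedged sketch of such a helper, assuming the usual host_port naming convention; real code should call ClusterMapUtils.getInstanceName rather than re-deriving the format:

    // Sketch only: the host_port format is an assumption here.
    final class InstanceNameSketch {
      static String instanceName(String hostname, Integer port) {
        return port == null ? hostname : hostname + "_" + port;
      }
    }

Because the node is picked via testHardwareLayout.getRandomDataNodeFromDc(localDc), the resulting self instance name matches an instance that actually exists in the mock Helix cluster, which is what the liveness fix exercises.
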
@@ -234,13 +240,14 @@ public void instantiationTest() throws Exception {
JSONObject invalidZkJson = constructZkLayoutJSON(zkInfos);
Properties props = new Properties();
props.setProperty("clustermap.host.name", hostname);
+ props.setProperty("clustermap.port", Integer.toString(portNum));
props.setProperty("clustermap.cluster.name", clusterNamePrefixInHelix + clusterNameStatic);
props.setProperty("clustermap.datacenter.name", localDc);
props.setProperty("clustermap.dcs.zk.connect.strings", invalidZkJson.toString(2));
ClusterMapConfig invalidClusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props));
metricRegistry = new MetricRegistry();
- new HelixClusterManager(invalidClusterMapConfig, hostname, new MockHelixManagerFactory(helixCluster, null, null),
- metricRegistry);
+ new HelixClusterManager(invalidClusterMapConfig, selfInstanceName,
+ new MockHelixManagerFactory(helixCluster, null, null), metricRegistry);
assertEquals(0L,
metricRegistry.getGauges().get(HelixClusterManager.class.getName() + ".instantiationFailed").getValue());
assertEquals(1L, metricRegistry.getGauges()
@@ -256,8 +263,8 @@ public void instantiationTest() throws Exception {
invalidClusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props));
metricRegistry = new MetricRegistry();
try {
- new HelixClusterManager(invalidClusterMapConfig, hostname, new MockHelixManagerFactory(helixCluster, null, null),
- metricRegistry);
+ new HelixClusterManager(invalidClusterMapConfig, selfInstanceName,
+ new MockHelixManagerFactory(helixCluster, null, null), metricRegistry);
fail("Instantiation should have failed with invalid zk addresses");
} catch (IOException e) {
assertEquals(1L,
@@ -269,7 +276,7 @@ public void instantiationTest() throws Exception {
metricRegistry = new MetricRegistry();
try {
- new HelixClusterManager(clusterMapConfig, hostname,
+ new HelixClusterManager(clusterMapConfig, selfInstanceName,
new MockHelixManagerFactory(helixCluster, null, new Exception("beBad")), metricRegistry);
fail("Instantiation should fail with a HelixManager factory that throws exception on listener registrations");
} catch (Exception e) {
@@ -289,9 +296,8 @@ public void emptyPartitionOverrideTest() throws Exception {
assumeTrue(overrideEnabled);
metricRegistry = new MetricRegistry();
// create a MockHelixManagerFactory
- ClusterMap clusterManagerWithEmptyRecord =
- new HelixClusterManager(clusterMapConfig, hostname, new MockHelixManagerFactory(helixCluster, null, null),
- metricRegistry);
+ ClusterMap clusterManagerWithEmptyRecord = new HelixClusterManager(clusterMapConfig, selfInstanceName,
+ new MockHelixManagerFactory(helixCluster, null, null), metricRegistry);
Set<String> writableInClusterManager = new HashSet<>();
for (PartitionId partition : clusterManagerWithEmptyRecord.getWritablePartitionIds(null)) {
@@ -314,14 +320,11 @@ public void emptyPartitionOverrideTest() throws Exception {
public void basicInterfaceTest() throws Exception {
assumeTrue(!overrideEnabled);
- for (String metricName : clusterManager.getMetricRegistry().getNames()) {
- System.out.println(metricName);
- }
assertEquals("Incorrect local datacenter ID", 0, clusterManager.getLocalDatacenterId());
testPartitionReplicaConsistency();
testInvalidPartitionId();
testDatacenterDatanodeReplicas();
- assertStateEquivalency();
+ assertStateEquivalency(helixCluster.getDownInstances(), helixCluster.getUpInstances());
}
/**
@@ -334,24 +337,39 @@ public void helixInitiatedLivenessChangeTest() throws Exception {
assumeTrue(!useComposite && !overrideEnabled);
// all instances are up initially.
- assertStateEquivalency();
+ assertStateEquivalency(helixCluster.getDownInstances(), helixCluster.getUpInstances());
- // Bring one instance down in each dc.
+ // Bring one instance (not the current host) down in each dc.
for (String zkAddr : helixCluster.getZkAddrs()) {
- helixCluster.bringInstanceDown(helixCluster.getUpInstances(zkAddr).get(0));
+ String instance =
+ helixCluster.getUpInstances(zkAddr).stream().filter(name -> !name.equals(selfInstanceName)).findFirst().get();
+ helixCluster.bringInstanceDown(instance);
}
- assertStateEquivalency();
+ assertStateEquivalency(helixCluster.getDownInstances(), helixCluster.getUpInstances());
// Bring all instances down in all dcs.
helixCluster.bringAllInstancesDown();
- assertStateEquivalency();
+ Set<String> expectedDownInstances = helixCluster.getDownInstances();
+ Set<String> expectedUpInstances = helixCluster.getUpInstances();
+ expectedDownInstances.remove(selfInstanceName);
+ expectedUpInstances.add(selfInstanceName);
+ assertStateEquivalency(expectedDownInstances, expectedUpInstances);
// Bring one instance up in each dc.
+ boolean selfInstanceIsChosen = false;
for (String zkAddr : helixCluster.getZkAddrs()) {
- helixCluster.bringInstanceUp(helixCluster.getDownInstances(zkAddr).get(0));
+ String instanceName = helixCluster.getDownInstances(zkAddr).get(0);
+ selfInstanceIsChosen = selfInstanceIsChosen || instanceName.equals(selfInstanceName);
+ helixCluster.bringInstanceUp(instanceName);
+ }
+ expectedDownInstances = helixCluster.getDownInstances();
+ expectedUpInstances = helixCluster.getUpInstances();
+ if (!selfInstanceIsChosen) {
+ expectedDownInstances.remove(selfInstanceName);
+ expectedUpInstances.add(selfInstanceName);
}
- assertStateEquivalency();
+ assertStateEquivalency(expectedDownInstances, expectedUpInstances);
}
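
The expected-set bookkeeping in this test reflects the rule that the cluster manager always reports its own node as up, regardless of what Helix says. A sketch of that adjustment as a helper, assuming getUpInstances()/getDownInstances() return mutable copies (the in-place mutation above already relies on this):

    import java.util.Set;

    // Hypothetical helper mirroring the expectation adjustment used in helixInitiatedLivenessChangeTest.
    final class ExpectationSketch {
      static void treatSelfAsUp(Set<String> expectedDown, Set<String> expectedUp, String selfInstanceName) {
        // The cluster manager never marks its own node UNAVAILABLE, so the self instance
        // always belongs in the expected "up" set.
        if (expectedDown.remove(selfInstanceName)) {
          expectedUp.add(selfInstanceName);
        }
      }
    }
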
/**
@@ -422,7 +440,7 @@ public void clientInitiatedLivenessChangeTest() {
// The following does not do anything currently.
clusterManager.onReplicaEvent(replica, ReplicaEventType.Partition_ReadOnly);
- assertStateEquivalency();
+ assertStateEquivalency(helixCluster.getDownInstances(), helixCluster.getUpInstances());
}
/**
@@ -488,7 +506,7 @@ public void sealedReplicaChangeTest() throws Exception {
assumeTrue(!useComposite && !overrideEnabled && listenCrossColo);
// all instances are up initially.
- assertStateEquivalency();
+ assertStateEquivalency(helixCluster.getDownInstances(), helixCluster.getUpInstances());
AmbryPartition partition = (AmbryPartition) clusterManager.getWritablePartitionIds(null).get(0);
List<String> instances = helixCluster.getInstancesForPartition((partition.toPathString()));
@@ -513,7 +531,7 @@ public void sealedReplicaChangeTest() throws Exception {
clusterManager.getWritablePartitionIds(null).contains(partition));
assertEquals("If no replica is SEALED, the whole partition should be Writable", PartitionState.READ_WRITE,
partition.getPartitionState());
- assertStateEquivalency();
+ assertStateEquivalency(helixCluster.getDownInstances(), helixCluster.getUpInstances());
}
/**
@@ -568,7 +586,7 @@ public void clusterMapOverrideEnabledAndDisabledTest() throws Exception {
clusterManager.close();
MockHelixManagerFactory helixManagerFactory = new MockHelixManagerFactory(helixCluster, znRecord, null);
HelixClusterManager clusterManager =
- new HelixClusterManager(clusterMapConfig, hostname, helixManagerFactory, new MetricRegistry());
+ new HelixClusterManager(clusterMapConfig, selfInstanceName, helixManagerFactory, new MetricRegistry());
// Ensure the new RW partition is added
assertEquals("Mismatch in writable partitions when instanceConfig changes", writableInOverrideMap.size() + 1,
clusterManager.getWritablePartitionIds(null).size());
@@ -621,7 +639,7 @@ public void stoppedReplicaChangeTest() {
assumeTrue(!useComposite && !overrideEnabled && listenCrossColo);
// all instances are up initially.
- assertStateEquivalency();
+ assertStateEquivalency(helixCluster.getDownInstances(), helixCluster.getUpInstances());
AmbryPartition partition = (AmbryPartition) clusterManager.getWritablePartitionIds(null).get(0);
List<String> instances = helixCluster.getInstancesForPartition((partition.toPathString()));
@@ -679,11 +697,19 @@ public void xidTest() throws Exception {
MockHelixManagerFactory helixManagerFactory = new MockHelixManagerFactory(helixCluster, null, null);
List<InstanceConfig> instanceConfigs = helixCluster.getAllInstanceConfigs();
int instanceCount = instanceConfigs.size();
- int randomIndex = com.github.ambry.utils.TestUtils.RANDOM.nextInt(instanceConfigs.size());
+ // Find the self instance config and move it to the end of the list so the random selections below never pick it.
+ for (int i = 0; i < instanceCount; ++i) {
+ if (instanceConfigs.get(i).getInstanceName().equals(selfInstanceName)) {
+ Collections.swap(instanceConfigs, i, instanceConfigs.size() - 1);
+ break;
+ }
+ }
+ int randomIndex = com.github.ambry.utils.TestUtils.RANDOM.nextInt(instanceCount - 1);
InstanceConfig aheadInstanceConfig = instanceConfigs.get(randomIndex);
- Collections.swap(instanceConfigs, randomIndex, instanceConfigs.size() - 1);
+ Collections.swap(instanceConfigs, randomIndex, instanceConfigs.size() - 2);
aheadInstanceConfig.getRecord().setSimpleField(XID_STR, Long.toString(CURRENT_XID + 1));
- clusterManager = new HelixClusterManager(clusterMapConfig, hostname, helixManagerFactory, new MetricRegistry());
+ clusterManager =
+ new HelixClusterManager(clusterMapConfig, selfInstanceName, helixManagerFactory, new MetricRegistry());
assertEquals(instanceCount - 1, clusterManager.getDataNodeIds().size());
for (DataNodeId dataNode : clusterManager.getDataNodeIds()) {
String instanceName = ClusterMapUtils.getInstanceName(dataNode.getHostname(), dataNode.getPort());
@@ -696,9 +722,9 @@ public void xidTest() throws Exception {
assertEquals(instanceCount, aheadInstanceClusterManager.getDataNodeIds().size());
}
- // Post-initialization InstanceConfig change:
+ // Post-initialization InstanceConfig change: pick an instance that is neither the previously chosen instance nor the self instance.
InstanceConfig ignoreInstanceConfig =
- instanceConfigs.get(com.github.ambry.utils.TestUtils.RANDOM.nextInt(instanceConfigs.size() - 1));
+ instanceConfigs.get(com.github.ambry.utils.TestUtils.RANDOM.nextInt(instanceCount - 2));
String ignoreInstanceName = ignoreInstanceConfig.getInstanceName();
ignoreInstanceConfig.getRecord().setSimpleField(XID_STR, Long.toString(CURRENT_XID + 2));
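
The index juggling in xidTest follows one pattern: park elements that must not be selected at the tail of the list, then draw a random index from the remaining prefix. A generic sketch of that pattern (pickExcluding is a hypothetical helper, not part of the patch):

    import java.util.Collections;
    import java.util.List;
    import java.util.Random;

    final class PickSketch {
      // Move the excluded element (if present) to the end of the list, then pick uniformly from the
      // prefix so the excluded element can never be returned. The caller must supply a list that
      // contains at least one other element.
      static <T> T pickExcluding(List<T> items, T excluded, Random random) {
        int idx = items.indexOf(excluded);
        if (idx >= 0) {
          Collections.swap(items, idx, items.size() - 1);
        }
        int bound = idx >= 0 ? items.size() - 1 : items.size();
        return items.get(random.nextInt(bound));
      }
    }

The test applies the idea twice: the self instance config is parked last, the randomly chosen aheadInstanceConfig is then parked second-to-last, and the later draw from instanceCount - 2 therefore avoids both.
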
@@ -785,9 +811,11 @@ public void metricsTest() throws Exception {
// live instance trigger happens once initially.
long instanceTriggerCount = dcs.length;
- // Bring one instance down in each dc in order to test the metrics more generally.
+ // Bring one instance (not the current instance) down in each dc in order to test the metrics more generally.
for (String zkAddr : helixCluster.getZkAddrs()) {
- helixCluster.bringInstanceDown(helixCluster.getUpInstances(zkAddr).get(0));
+ String instance =
+ helixCluster.getUpInstances(zkAddr).stream().filter(name -> !name.equals(selfInstanceName)).findFirst().get();
+ helixCluster.bringInstanceDown(instance);
instanceTriggerCount++;
}
@@ -981,11 +1009,10 @@ private void testDatacenterDatanodeReplicas() {
/**
* Assert that the state of datanodes in the cluster manager's view are consistent with their actual states in the
* cluster.
+ * @param expectedDownInstances the expected set of down instances in the cluster manager.
+ * @param expectedUpInstances the expected set of up instances in the cluster manager.
*/
- private void assertStateEquivalency() {
- Set<String> upInstancesInCluster = helixCluster.getUpInstances();
- Set<String> downInstancesInCluster = helixCluster.getDownInstances();
-
+ private void assertStateEquivalency(Set<String> expectedDownInstances, Set<String> expectedUpInstances) {
Set<String> upInstancesInClusterManager = new HashSet<>();
Set<String> downInstancesInClusterManager = new HashSet<>();
for (DataNodeId dataNode : clusterManager.getDataNodeIds()) {
@@ -997,8 +1024,8 @@ private void assertStateEquivalency() {
ClusterMapUtils.getInstanceName(dataNode.getHostname(), dataNode.getPort())));
}
}
- assertEquals(downInstancesInCluster, downInstancesInClusterManager);
- assertEquals(upInstancesInCluster, upInstancesInClusterManager);
+ assertEquals(expectedDownInstances, downInstancesInClusterManager);
+ assertEquals(expectedUpInstances, upInstancesInClusterManager);
Pair<Set<String>, Set<String>> writablePartitionsInTwoPlaces = getWritablePartitions();
assertEquals(writablePartitionsInTwoPlaces.getFirst(), writablePartitionsInTwoPlaces.getSecond());
testAllPartitions();
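
For reference, a hedged sketch of what the reworked assertStateEquivalency amounts to; the middle of the method body is elided from the hunk above, so the node-classification step shown here (in particular the DataNodeId.getState() call) is an assumption based on the surrounding context lines:

    import static org.junit.Assert.assertEquals;

    import java.util.HashSet;
    import java.util.Set;

    // Sketch, not the actual test method: classify the cluster manager's nodes by hardware state and
    // compare the resulting instance-name sets with the caller-supplied expectations. Assumes it lives
    // in the com.github.ambry.clustermap package so ClusterMap, DataNodeId, etc. resolve without imports.
    final class StateEquivalencySketch {
      static void assertStateSketch(ClusterMap clusterManager, Set<String> expectedDown, Set<String> expectedUp) {
        Set<String> up = new HashSet<>();
        Set<String> down = new HashSet<>();
        for (DataNodeId dataNode : clusterManager.getDataNodeIds()) {
          String name = ClusterMapUtils.getInstanceName(dataNode.getHostname(), dataNode.getPort());
          (dataNode.getState() == HardwareState.AVAILABLE ? up : down).add(name);
        }
        assertEquals(expectedDown, down);
        assertEquals(expectedUp, up);
      }
    }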