-
Notifications
You must be signed in to change notification settings - Fork 275
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support adding/removing store via AdminRequest #1259
Changes from 2 commits
07d5ac1
927e0e5
dafd89e
d74c737
da18827
b7eba21
9423783
aaa32c4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,7 @@ | |
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.CountDownLatch; | ||
|
@@ -87,6 +88,7 @@ class HelixClusterManager implements ClusterMap { | |
final HelixClusterManagerMetrics helixClusterManagerMetrics; | ||
private final PartitionSelectionHelper partitionSelectionHelper; | ||
private final Map<String, Map<String, String>> partitionOverrideInfoMap = new HashMap<>(); | ||
private ZkHelixPropertyStore<ZNRecord> helixPropertyStoreInLocalDc = null; | ||
// The current xid currently does not change after instantiation. This can change in the future, allowing the cluster | ||
// manager to dynamically incorporate newer changes in the cluster. This variable is atomic so that the gauge metric | ||
// reflects the current value. | ||
|
@@ -216,13 +218,13 @@ private HelixManager initializeHelixManagerAndPropertyStoreInLocalDC(Map<String, | |
DcZkInfo dcZkInfo = dataCenterToZkAddress.get(clusterMapConfig.clusterMapDatacenterName); | ||
String zkConnectStr = dcZkInfo.getZkConnectStr(); | ||
HelixManager manager; | ||
ZkHelixPropertyStore<ZNRecord> helixPropertyStore; | ||
manager = helixFactory.getZKHelixManager(clusterName, instanceName, InstanceType.SPECTATOR, zkConnectStr); | ||
logger.info("Connecting to Helix manager in local zookeeper at {}", zkConnectStr); | ||
manager.connect(); | ||
logger.info("Established connection to Helix manager in local zookeeper at {}", zkConnectStr); | ||
helixPropertyStore = manager.getHelixPropertyStore(); | ||
logger.info("HelixPropertyStore from local datacenter {} is: {}", dcZkInfo.getDcName(), helixPropertyStore); | ||
helixPropertyStoreInLocalDc = manager.getHelixPropertyStore(); | ||
logger.info("HelixPropertyStore from local datacenter {} is: {}", dcZkInfo.getDcName(), | ||
helixPropertyStoreInLocalDc); | ||
IZkDataListener dataListener = new IZkDataListener() { | ||
@Override | ||
public void handleDataChange(String dataPath, Object data) { | ||
|
@@ -235,10 +237,9 @@ public void handleDataDeleted(String dataPath) { | |
} | ||
}; | ||
logger.info("Subscribing data listener to HelixPropertyStore."); | ||
helixPropertyStore.subscribeDataChanges(ClusterMapUtils.PARTITION_OVERRIDE_ZNODE_PATH, dataListener); | ||
logger.info("Getting ZNRecord from HelixPropertyStore"); | ||
ZNRecord zNRecord = | ||
helixPropertyStore.get(ClusterMapUtils.PARTITION_OVERRIDE_ZNODE_PATH, null, AccessOption.PERSISTENT); | ||
helixPropertyStoreInLocalDc.subscribeDataChanges(PARTITION_OVERRIDE_ZNODE_PATH, dataListener); | ||
logger.info("Getting PartitionOverride ZNRecord from HelixPropertyStore"); | ||
ZNRecord zNRecord = helixPropertyStoreInLocalDc.get(PARTITION_OVERRIDE_ZNODE_PATH, null, AccessOption.PERSISTENT); | ||
if (clusterMapConfig.clusterMapEnablePartitionOverride) { | ||
if (zNRecord != null) { | ||
partitionOverrideInfoMap.putAll(zNRecord.getMapFields()); | ||
|
@@ -381,6 +382,55 @@ public List<PartitionId> getAllPartitionIds(String partitionClass) { | |
return partitionSelectionHelper.getPartitions(partitionClass); | ||
} | ||
|
||
@Override | ||
public ReplicaId getNewReplica(String partitionIdStr, DataNodeId dataNodeId) { | ||
ReplicaId newReplica = null; | ||
logger.info("Getting ReplicaAddition ZNRecord from HelixPropertyStore in local DC."); | ||
ZNRecord zNRecord = helixPropertyStoreInLocalDc.get(REPLICA_ADDITION_ZNODE_PATH, null, AccessOption.PERSISTENT); | ||
if (zNRecord != null) { | ||
String instanceName = getInstanceName(dataNodeId.getHostname(), dataNodeId.getPort()); | ||
Map<String, Map<String, String>> partitionToReplicas = zNRecord.getMapFields(); | ||
Map<String, String> replicaInfos = partitionToReplicas.get(partitionIdStr); | ||
if (replicaInfos != null && replicaInfos.containsKey(instanceName)) { | ||
long replicaCapacity = Long.valueOf(replicaInfos.get(REPLICAS_CAPACITY_STR)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
String partitionClass = replicaInfos.get(PARTITION_CLASS_STR); | ||
AmbryPartition mappedPartition = partitionNameToAmbryPartition.get(partitionIdStr); | ||
if (mappedPartition == null) { | ||
logger.info("Partition {} is currently not present in cluster map, creating a new partition.", | ||
partitionIdStr); | ||
mappedPartition = | ||
new AmbryPartition(Long.valueOf(partitionIdStr), partitionClass, helixClusterManagerCallback); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. intellij suggests using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
} | ||
// Check if data node or disk is in current cluster map, if not, set newReplica to null. | ||
AmbryDataNode dataNode = instanceNameToAmbryDataNode.get(instanceName); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Who calls getNewReplica? Can it be called for a node when the partition doesnt even exist on the node as per clustermap? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Discussed this offline. Currently |
||
String mountPathFromHelix = replicaInfos.get(instanceName); | ||
Set<AmbryDisk> disks = dataNode != null ? ambryDataNodeToAmbryDisks.get(dataNode) : null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could also do Optional<AmbryDisk> potentialDisk =
Optional.ofNullable(dataNode != null ? ambryDataNodeToAmbryDisks.get(dataNode) : null)
.flatMap(disks -> disks.stream().filter(d -> d.getMountPath().equals(mountPathFromHelix)).findAny()); |
||
Optional<AmbryDisk> potentialDisk = | ||
disks != null ? disks.stream().filter(d -> d.getMountPath().equals(mountPathFromHelix)).findAny() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so when someone creates a new partition, he/she will ensure that the mount path exists on the node? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SRE is supposed to guarantee new added replica has valid mount path otherwise the creation should fail. |
||
: Optional.empty(); | ||
if (dataNode != null && potentialDisk.isPresent()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you remove the dataNode != null check since you check that on line 407? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sure, fair point. |
||
try { | ||
newReplica = | ||
new AmbryReplica(clusterMapConfig, mappedPartition, potentialDisk.get(), true, replicaCapacity, false); | ||
} catch (Exception e) { | ||
logger.error("Failed to create new replica for partition {} on {} due to exception: ", partitionIdStr, | ||
instanceName, e); | ||
newReplica = null; | ||
} | ||
} else { | ||
logger.error( | ||
"Either datanode or disk that associated with new replica is not found in cluster map. Cannot create new replica."); | ||
} | ||
} else { | ||
logger.warn("Partition {} or replica on host {} is not found in replica info map", partitionIdStr, | ||
instanceName); | ||
} | ||
} else { | ||
logger.warn("ZNRecord from HelixPropertyStore is NULL, partition to replicaInfo map doesn't exist."); | ||
} | ||
return newReplica; | ||
} | ||
|
||
/** | ||
* Disconnect from the HelixManagers associated with each and every datacenter. | ||
*/ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -533,6 +533,12 @@ public JSONObject getSnapshot() { | |
return snapshot; | ||
} | ||
|
||
@Override | ||
public ReplicaId getNewReplica(String partitionIdStr, DataNodeId dataNodeId) { | ||
throw new UnsupportedOperationException( | ||
"Adding new replica is currently not supported in static cluster manager. Return null here"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this return null like the exception message says? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated the exception message. Feel like |
||
} | ||
|
||
@Override | ||
public void close() { | ||
// No-op. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you explain what a "new replica" is in this javadoc? i.e. what makes a "new" replica different from a regular replica.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually "new replica" that will be added to specified datanode. Currently, it is used in HelixClusterManager only because target node needs replica detailed info to create it. The replica info is stored in Helix PropertyStore. Before moving replica, we use Helix Bootstrap tool to upload new replica infos onto Helix PropertyStore, thus target node can fetch this info to create new replica. Let me know if you come up with a better name to disambiguate in this context. Thanks