Support adding/removing store via AdminRequest (#1259)
Add/Remove replica via ServerAdminTool
jsjtzyy authored and zzmao committed Oct 14, 2019
1 parent 1dcfdcf commit b22b487
Showing 43 changed files with 1,759 additions and 891 deletions.
@@ -122,6 +122,16 @@ public interface ClusterMap extends AutoCloseable {
*/
JSONObject getSnapshot();

/**
* Attempt to get a bootstrap replica of a certain partition that is supposed to be added to the specified data node.
* This method fetches detailed info about the bootstrap replica and creates an instance of it. The purpose is to
* support dynamically adding a new replica to the specified data node.
* @param partitionIdStr the partition id string
* @param dataNodeId the {@link DataNodeId} on which the bootstrap replica is placed
* @return {@link ReplicaId} if there is a new replica satisfying the given partition and data node; {@code null} otherwise.
*/
ReplicaId getBootstrapReplica(String partitionIdStr, DataNodeId dataNodeId);

/**
* Close the cluster map. Any cleanups should be done in this call.
*/
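As a minimal illustration of the new contract (not part of this commit — the helper class, package imports, and exception choice are assumptions), a caller such as a server-side admin handler might use the method like this:

import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.DataNodeId;
import com.github.ambry.clustermap.ReplicaId;

// Hypothetical helper showing how a component might consume getBootstrapReplica().
class BootstrapReplicaHelper {
  /**
   * @return the bootstrap replica for {@code partitionIdStr} on {@code node}.
   * @throws IllegalStateException if no replica addition is registered for this partition/node pair.
   */
  static ReplicaId fetchBootstrapReplica(ClusterMap clusterMap, String partitionIdStr, DataNodeId node) {
    ReplicaId replica = clusterMap.getBootstrapReplica(partitionIdStr, node);
    if (replica == null) {
      // The contract above allows null when no matching replica addition exists.
      throw new IllegalStateException("No bootstrap replica for partition " + partitionIdStr + " on " + node);
    }
    return replica;
  }
}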
@@ -27,7 +27,7 @@ public class ReplicationConfig {
/**
* The factory class the replication uses to create cloud token
*/
@Config("replication.cloudtoken.factory")
@Config("replication.cloud.token.factory")
@Default("com.github.ambry.cloud.CloudFindTokenFactory")
public final String replicationCloudTokenFactory;

@@ -134,15 +134,15 @@ public class ReplicationConfig {
/**
* The version of metadata request to be used for replication.
*/
@Config("replication.metadatarequest.version")
@Config("replication.metadata.request.version")
@Default("1")
public final short replicaMetadataRequestVersion;

public ReplicationConfig(VerifiableProperties verifiableProperties) {

replicationStoreTokenFactory =
verifiableProperties.getString("replication.token.factory", "com.github.ambry.store.StoreFindTokenFactory");
-replicationCloudTokenFactory = verifiableProperties.getString("replication.cloudtoken.factory",
+replicationCloudTokenFactory = verifiableProperties.getString("replication.cloud.token.factory",
"com.github.ambry.cloud.CloudFindTokenFactory");
replicationNumOfIntraDCReplicaThreads =
verifiableProperties.getInt("replication.no.of.intra.dc.replica.threads", 1);
@@ -172,6 +172,6 @@ public ReplicationConfig(VerifiableProperties verifiableProperties) {
replicationTrackPerPartitionLagFromRemote =
verifiableProperties.getBoolean("replication.track.per.partition.lag.from.remote", false);
replicaMetadataRequestVersion =
verifiableProperties.getShortInRange("replication.metadatarequest.version", (short) 1, (short) 1, (short) 2);
verifiableProperties.getShortInRange("replication.metadata.request.version", (short) 1, (short) 1, (short) 2);
}
}
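A quick sketch of the renamed properties in use (illustrative only; it assumes the Ambry config classes shown in this diff are importable from com.github.ambry.config):

import java.util.Properties;
import com.github.ambry.config.ReplicationConfig;
import com.github.ambry.config.VerifiableProperties;

// Exercises the renamed keys "replication.cloud.token.factory" and
// "replication.metadata.request.version" introduced by this commit.
public class ReplicationConfigExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("replication.cloud.token.factory", "com.github.ambry.cloud.CloudFindTokenFactory");
    props.setProperty("replication.metadata.request.version", "1");
    ReplicationConfig config = new ReplicationConfig(new VerifiableProperties(props));
    // Prints the configured factory; the old dotless keys would now fall back to defaults.
    System.out.println(config.replicationCloudTokenFactory);
  }
}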
@@ -26,6 +26,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -49,7 +50,7 @@ public class CloudTokenPersistor extends ReplicaTokenPersistor {
* @param clusterMap the {@link ClusterMap} to deserialize tokens.
* @param tokenHelper the {@link FindTokenHelper} to deserialize tokens.
*/
public CloudTokenPersistor(String replicaTokenFileName, Map<String, List<PartitionInfo>> partitionGroupedByMountPath,
public CloudTokenPersistor(String replicaTokenFileName, Map<String, Set<PartitionInfo>> partitionGroupedByMountPath,
ReplicationMetrics replicationMetrics, ClusterMap clusterMap, FindTokenHelper tokenHelper,
CloudDestination cloudDestination) {
super(partitionGroupedByMountPath, replicationMetrics, clusterMap, tokenHelper);
@@ -42,6 +42,7 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.helix.InstanceType;
@@ -173,12 +174,9 @@ void addReplica(PartitionId partitionId) throws ReplicationException {
}
PartitionInfo partitionInfo = new PartitionInfo(remoteReplicaInfos, partitionId, store, cloudReplica);
partitionToPartitionInfo.put(partitionId, partitionInfo);
-mountPathToPartitionInfos.compute(cloudReplica.getMountPath(), (key, value) -> {
-  // For CloudBackupManager, at most one PartitionInfo in the list.
-  List<PartitionInfo> retList = (value == null) ? new ArrayList<>() : value;
-  retList.add(partitionInfo);
-  return retList;
-});
+// For CloudBackupManager, at most one PartitionInfo in the set.
+mountPathToPartitionInfos.computeIfAbsent(cloudReplica.getMountPath(), key -> ConcurrentHashMap.newKeySet())
+    .add(partitionInfo);
partitionStoreMap.put(partitionId.toPathString(), store);
} else {
try {
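The change above replaces a List built under compute() with a concurrent Set built under computeIfAbsent(). A self-contained sketch of this thread-safe multimap pattern (illustrative names, not Ambry code):

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Thread-safe "multimap": mount path -> set of items on that path.
public class ConcurrentMultimapExample {
  private final Map<String, Set<String>> byMountPath = new ConcurrentHashMap<>();

  // computeIfAbsent is atomic on ConcurrentHashMap and newKeySet() returns a
  // concurrent Set, so concurrent adds for the same mount path cannot race
  // and duplicate entries are ignored automatically.
  void add(String mountPath, String item) {
    byMountPath.computeIfAbsent(mountPath, key -> ConcurrentHashMap.newKeySet()).add(item);
  }

  boolean remove(String mountPath, String item) {
    Set<String> items = byMountPath.get(mountPath);
    return items != null && items.remove(item);
  }
}

Compared with the earlier List-based version, the Set also makes removing an entry a constant-time operation rather than a list scan, which matters once replicas can be dynamically removed.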
@@ -53,7 +53,7 @@ public VcrRequests(StoreManager storeManager, RequestResponseChannel requestResp
NotificationSystem notification, ReplicationEngine replicationEngine, StoreKeyFactory storageKeyFactory,
boolean enableDataPrefetch, StoreKeyConverterFactory storeKeyConverterFactory) {
super(storeManager, requestResponseChannel, clusterMap, currentNode, registry, serverMetrics, findTokenHelper,
-notification, replicationEngine, storageKeyFactory, enableDataPrefetch, storeKeyConverterFactory);
+notification, replicationEngine, storageKeyFactory, enableDataPrefetch, storeKeyConverterFactory, null);
}

@Override
@@ -36,6 +36,8 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.junit.Assert;
import org.junit.Test;

@@ -52,7 +54,7 @@ public void basicTest() throws Exception {
ClusterMapConfig clusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props));
ClusterMap clusterMap = new MockClusterMap();
DataNodeId dataNodeId = new CloudDataNode(cloudConfig, clusterMapConfig);
Map<String, List<PartitionInfo>> mountPathToPartitionInfoList = new HashMap<>();
Map<String, Set<PartitionInfo>> mountPathToPartitionInfoList = new HashMap<>();
BlobIdFactory blobIdFactory = new BlobIdFactory(clusterMap);
StoreFindTokenFactory factory = new StoreFindTokenFactory(blobIdFactory);
PartitionId partitionId = clusterMap.getAllPartitionIds(null).get(0);
@@ -70,7 +72,7 @@ public void basicTest() throws Exception {
replicaTokenInfos.add(new RemoteReplicaInfo.ReplicaTokenInfo(remoteReplicaInfo));
}
PartitionInfo partitionInfo = new PartitionInfo(remoteReplicas, partitionId, null, cloudReplicaId);
-mountPathToPartitionInfoList.computeIfAbsent(cloudReplicaId.getMountPath(), key -> new ArrayList<>())
+mountPathToPartitionInfoList.computeIfAbsent(cloudReplicaId.getMountPath(), key -> ConcurrentHashMap.newKeySet())
.add(partitionInfo);

LatchBasedInMemoryCloudDestination cloudDestination =
@@ -287,6 +287,11 @@ public JSONObject getSnapshot() {
return staticClusterManager.getSnapshot();
}

@Override
public ReplicaId getBootstrapReplica(String partitionIdStr, DataNodeId dataNodeId) {
return helixClusterManager.getBootstrapReplica(partitionIdStr, dataNodeId);
}

@Override
public void close() {
staticClusterManager.close();
@@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -87,6 +88,7 @@ class HelixClusterManager implements ClusterMap {
final HelixClusterManagerMetrics helixClusterManagerMetrics;
private final PartitionSelectionHelper partitionSelectionHelper;
private final Map<String, Map<String, String>> partitionOverrideInfoMap = new HashMap<>();
private ZkHelixPropertyStore<ZNRecord> helixPropertyStoreInLocalDc = null;
// The current xid currently does not change after instantiation. This can change in the future, allowing the cluster
// manager to dynamically incorporate newer changes in the cluster. This variable is atomic so that the gauge metric
// reflects the current value.
@@ -216,13 +218,13 @@ private HelixManager initializeHelixManagerAndPropertyStoreInLocalDC(Map<String,
DcZkInfo dcZkInfo = dataCenterToZkAddress.get(clusterMapConfig.clusterMapDatacenterName);
String zkConnectStr = dcZkInfo.getZkConnectStr();
HelixManager manager;
ZkHelixPropertyStore<ZNRecord> helixPropertyStore;
manager = helixFactory.getZKHelixManager(clusterName, instanceName, InstanceType.SPECTATOR, zkConnectStr);
logger.info("Connecting to Helix manager in local zookeeper at {}", zkConnectStr);
manager.connect();
logger.info("Established connection to Helix manager in local zookeeper at {}", zkConnectStr);
-helixPropertyStore = manager.getHelixPropertyStore();
-logger.info("HelixPropertyStore from local datacenter {} is: {}", dcZkInfo.getDcName(), helixPropertyStore);
+helixPropertyStoreInLocalDc = manager.getHelixPropertyStore();
+logger.info("HelixPropertyStore from local datacenter {} is: {}", dcZkInfo.getDcName(),
+    helixPropertyStoreInLocalDc);
IZkDataListener dataListener = new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) {
Expand All @@ -235,10 +237,9 @@ public void handleDataDeleted(String dataPath) {
}
};
logger.info("Subscribing data listener to HelixPropertyStore.");
-helixPropertyStore.subscribeDataChanges(ClusterMapUtils.PARTITION_OVERRIDE_ZNODE_PATH, dataListener);
-logger.info("Getting ZNRecord from HelixPropertyStore");
-ZNRecord zNRecord =
-    helixPropertyStore.get(ClusterMapUtils.PARTITION_OVERRIDE_ZNODE_PATH, null, AccessOption.PERSISTENT);
+helixPropertyStoreInLocalDc.subscribeDataChanges(PARTITION_OVERRIDE_ZNODE_PATH, dataListener);
+logger.info("Getting PartitionOverride ZNRecord from HelixPropertyStore");
+ZNRecord zNRecord = helixPropertyStoreInLocalDc.get(PARTITION_OVERRIDE_ZNODE_PATH, null, AccessOption.PERSISTENT);
if (clusterMapConfig.clusterMapEnablePartitionOverride) {
if (zNRecord != null) {
partitionOverrideInfoMap.putAll(zNRecord.getMapFields());
@@ -381,6 +382,77 @@ public List<PartitionId> getAllPartitionIds(String partitionClass) {
return partitionSelectionHelper.getPartitions(partitionClass);
}

/**
* {@inheritDoc}
* To create a bootstrap replica, {@link HelixClusterManager} needs to fetch replica info (i.e. capacity, mount path)
* from the Helix PropertyStore. This method looks up the ZNode in the local datacenter and performs some validation.
* Currently, {@link HelixClusterManager} supports getting a bootstrap replica of a new partition, but it doesn't
* support getting a replica residing on hosts that are not present in the clustermap.
* The ZNRecord of REPLICA_ADDITION_ZNODE has the following format in mapFields:
* <pre>
* "mapFields": {
* "1": {
* "replicaCapacityInBytes": 107374182400,
* "partitionClass": "max-replicas-all-datacenters",
* "localhost1_17088": "/tmp/c/1",
* "localhost2_17088": "/tmp/d/1"
* },
* "2": {
* "replicaCapacityInBytes": 107374182400,
* "partitionClass": "max-replicas-all-datacenters",
* "localhost3_17088": "/tmp/e/1"
* }
* }
* </pre>
* In the above example, two bootstrap replicas of partition[1] will be added to localhost1 and localhost2 respectively.
* Each host name maps to the mount path on which the bootstrap replica should be placed.
*/
@Override
public ReplicaId getBootstrapReplica(String partitionIdStr, DataNodeId dataNodeId) {
ReplicaId bootstrapReplica = null;
logger.info("Getting ReplicaAddition ZNRecord from HelixPropertyStore in local DC.");
ZNRecord zNRecord = helixPropertyStoreInLocalDc.get(REPLICA_ADDITION_ZNODE_PATH, null, AccessOption.PERSISTENT);
if (zNRecord == null) {
logger.warn("ZNRecord from HelixPropertyStore is NULL, partition to replicaInfo map doesn't exist.");
return null;
}
String instanceName = getInstanceName(dataNodeId.getHostname(), dataNodeId.getPort());
Map<String, Map<String, String>> partitionToReplicas = zNRecord.getMapFields();
Map<String, String> replicaInfos = partitionToReplicas.get(partitionIdStr);
if (replicaInfos == null || !replicaInfos.containsKey(instanceName)) {
logger.warn("Partition {} or replica on host {} is not found in replica info map", partitionIdStr, instanceName);
return null;
}
long replicaCapacity = Long.parseLong(replicaInfos.get(REPLICAS_CAPACITY_STR));
String partitionClass = replicaInfos.get(PARTITION_CLASS_STR);
AmbryPartition mappedPartition = partitionNameToAmbryPartition.get(partitionIdStr);
if (mappedPartition == null) {
logger.info("Partition {} is currently not present in cluster map, creating a new partition.", partitionIdStr);
mappedPartition = new AmbryPartition(Long.parseLong(partitionIdStr), partitionClass, helixClusterManagerCallback);
}
// Check if data node or disk is in current cluster map, if not, set bootstrapReplica to null.
AmbryDataNode dataNode = instanceNameToAmbryDataNode.get(instanceName);
String mountPathFromHelix = replicaInfos.get(instanceName);
Set<AmbryDisk> disks = dataNode != null ? ambryDataNodeToAmbryDisks.get(dataNode) : null;
Optional<AmbryDisk> potentialDisk =
disks != null ? disks.stream().filter(d -> d.getMountPath().equals(mountPathFromHelix)).findAny()
: Optional.empty();
if (potentialDisk.isPresent()) {
try {
bootstrapReplica =
new AmbryReplica(clusterMapConfig, mappedPartition, potentialDisk.get(), true, replicaCapacity, false);
} catch (Exception e) {
logger.error("Failed to create bootstrap replica for partition {} on {} due to exception: ", partitionIdStr,
instanceName, e);
bootstrapReplica = null;
}
} else {
logger.error(
"Either datanode or disk that associated with bootstrap replica is not found in cluster map. Cannot create the replica.");
}
return bootstrapReplica;
}
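For context, a hedged sketch of how the replica-addition ZNRecord documented above might be populated by an admin tool or a test. The mapFields layout follows the javadoc; the record id, znode path argument, and class name are assumptions:

import java.util.HashMap;
import java.util.Map;
import org.apache.helix.AccessOption;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;

// Registers a bootstrap replica of partition "1" for two hosts.
public class ReplicaAdditionWriter {
  static void registerReplicaAddition(ZkHelixPropertyStore<ZNRecord> propertyStore, String replicaAdditionZNodePath) {
    ZNRecord record = new ZNRecord("replica_addition");
    Map<String, String> replicaInfos = new HashMap<>();
    replicaInfos.put("replicaCapacityInBytes", "107374182400");
    replicaInfos.put("partitionClass", "max-replicas-all-datacenters");
    replicaInfos.put("localhost1_17088", "/tmp/c/1");
    replicaInfos.put("localhost2_17088", "/tmp/d/1");
    record.setMapField("1", replicaInfos); // keyed by partition id string
    propertyStore.set(replicaAdditionZNodePath, record, AccessOption.PERSISTENT);
  }
}

getBootstrapReplica then reads this mapping back from REPLICA_ADDITION_ZNODE_PATH and validates the instance name and mount path against the current cluster map before constructing the replica.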

/**
* Disconnect from the HelixManagers associated with each and every datacenter.
*/
@@ -533,6 +533,12 @@ public JSONObject getSnapshot() {
return snapshot;
}

@Override
public ReplicaId getBootstrapReplica(String partitionIdStr, DataNodeId dataNodeId) {
throw new UnsupportedOperationException(
"Adding new replica is currently not supported in static cluster manager.");
}

@Override
public void close() {
// No-op.