Support adding/removing store via AdminRequest #1259

Merged: 8 commits, Oct 14, 2019
Changes from 2 commits
@@ -122,6 +122,14 @@ public interface ClusterMap extends AutoCloseable {
*/
JSONObject getSnapshot();

/**
* Attempt to get new replica of certain partition that resides on given data node.
Contributor:

Could you explain what a "new replica" is in this javadoc? i.e. what makes a "new" replica different from a regular replica.

Contributor Author:
This is actually the "new replica" that will be added to the specified data node. Currently it is used in HelixClusterManager only, because the target node needs the replica's detailed info to create it. The replica info is stored in the Helix PropertyStore: before moving a replica, we use the Helix Bootstrap tool to upload the new replica info to the PropertyStore, so the target node can fetch it and create the new replica. Let me know if you come up with a better name to disambiguate in this context. Thanks.

* @param partitionIdStr the partition id string
* @param dataNodeId the {@link DataNodeId} on which new replica is placed
* @return {@link ReplicaId} if there is a new replica satisfying given partition and data node. {@code null} otherwise.
*/
ReplicaId getNewReplica(String partitionIdStr, DataNodeId dataNodeId);

/**
* Close the cluster map. Any cleanups should be done in this call.
*/
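As a usage sketch for this new interface method: the flow described in the discussion above is that replica info is first uploaded to the Helix PropertyStore, and the target node then fetches it to create the store. A hypothetical caller might look like the following (handleAddStoreRequest, clusterMap, storeManager, and addBlobStore are illustrative assumptions, not part of this PR):

    // Hypothetical sketch: a target node adding a store for a partition it does
    // not host yet, e.g. in response to an AdminRequest.
    void handleAddStoreRequest(String partitionIdStr, DataNodeId localNode) {
      // Look up the replica-addition info previously uploaded to the Helix PropertyStore.
      ReplicaId newReplica = clusterMap.getNewReplica(partitionIdStr, localNode);
      if (newReplica == null) {
        // No addition info exists for this partition/node combination.
        throw new IllegalStateException("No new replica found for partition " + partitionIdStr);
      }
      // Create and start the store backing the new replica (illustrative call).
      storeManager.addBlobStore(newReplica);
    }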
@@ -27,7 +27,7 @@ public class ReplicationConfig {
/**
* The factory class the replication uses to create cloud token
*/
@Config("replication.cloudtoken.factory")
@Config("replication.cloud.token.factory")
@Default("com.github.ambry.cloud.CloudFindTokenFactory")
public final String replicationCloudTokenFactory;

@@ -134,15 +134,15 @@ public class ReplicationConfig {
/**
* The version of metadata request to be used for replication.
*/
@Config("replication.metadatarequest.version")
@Config("replication.metadata.request.version")
@Default("1")
public final short replicaMetadataRequestVersion;

public ReplicationConfig(VerifiableProperties verifiableProperties) {

replicationStoreTokenFactory =
verifiableProperties.getString("replication.token.factory", "com.github.ambry.store.StoreFindTokenFactory");
replicationCloudTokenFactory = verifiableProperties.getString("replication.cloudtoken.factory",
replicationCloudTokenFactory = verifiableProperties.getString("replication.cloud.token.factory",
"com.github.ambry.cloud.CloudFindTokenFactory");
replicationNumOfIntraDCReplicaThreads =
verifiableProperties.getInt("replication.no.of.intra.dc.replica.threads", 1);
@@ -172,6 +172,6 @@ public ReplicationConfig(VerifiableProperties verifiableProperties) {
replicationTrackPerPartitionLagFromRemote =
verifiableProperties.getBoolean("replication.track.per.partition.lag.from.remote", false);
replicaMetadataRequestVersion =
verifiableProperties.getShortInRange("replication.metadatarequest.version", (short) 1, (short) 1, (short) 2);
verifiableProperties.getShortInRange("replication.metadata.request.version", (short) 1, (short) 1, (short) 2);
}
}
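As a quick reference for the renamed keys, here is a minimal sketch of building the config (the values shown are the defaults from this diff; the Properties-based VerifiableProperties constructor is assumed):

    Properties props = new Properties();
    props.setProperty("replication.cloud.token.factory", "com.github.ambry.cloud.CloudFindTokenFactory");
    props.setProperty("replication.metadata.request.version", "1");
    ReplicationConfig config = new ReplicationConfig(new VerifiableProperties(props));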
@@ -53,7 +53,7 @@ public VcrRequests(StoreManager storeManager, RequestResponseChannel requestResp
NotificationSystem notification, ReplicationEngine replicationEngine, StoreKeyFactory storageKeyFactory,
boolean enableDataPrefetch, StoreKeyConverterFactory storeKeyConverterFactory) {
super(storeManager, requestResponseChannel, clusterMap, currentNode, registry, serverMetrics, findTokenHelper,
-     notification, replicationEngine, storageKeyFactory, enableDataPrefetch, storeKeyConverterFactory);
+     notification, replicationEngine, storageKeyFactory, enableDataPrefetch, storeKeyConverterFactory, null);
}

@Override
@@ -287,6 +287,11 @@ public JSONObject getSnapshot() {
return staticClusterManager.getSnapshot();
}

@Override
public ReplicaId getNewReplica(String partitionIdStr, DataNodeId dataNodeId) {
return helixClusterManager.getNewReplica(partitionIdStr, dataNodeId);
}

@Override
public void close() {
staticClusterManager.close();
@@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -87,6 +88,7 @@ class HelixClusterManager implements ClusterMap {
final HelixClusterManagerMetrics helixClusterManagerMetrics;
private final PartitionSelectionHelper partitionSelectionHelper;
private final Map<String, Map<String, String>> partitionOverrideInfoMap = new HashMap<>();
private ZkHelixPropertyStore<ZNRecord> helixPropertyStoreInLocalDc = null;
// The current xid currently does not change after instantiation. This can change in the future, allowing the cluster
// manager to dynamically incorporate newer changes in the cluster. This variable is atomic so that the gauge metric
// reflects the current value.
@@ -216,13 +218,13 @@ private HelixManager initializeHelixManagerAndPropertyStoreInLocalDC(Map<String,
DcZkInfo dcZkInfo = dataCenterToZkAddress.get(clusterMapConfig.clusterMapDatacenterName);
String zkConnectStr = dcZkInfo.getZkConnectStr();
HelixManager manager;
- ZkHelixPropertyStore<ZNRecord> helixPropertyStore;
manager = helixFactory.getZKHelixManager(clusterName, instanceName, InstanceType.SPECTATOR, zkConnectStr);
logger.info("Connecting to Helix manager in local zookeeper at {}", zkConnectStr);
manager.connect();
logger.info("Established connection to Helix manager in local zookeeper at {}", zkConnectStr);
- helixPropertyStore = manager.getHelixPropertyStore();
- logger.info("HelixPropertyStore from local datacenter {} is: {}", dcZkInfo.getDcName(), helixPropertyStore);
+ helixPropertyStoreInLocalDc = manager.getHelixPropertyStore();
+ logger.info("HelixPropertyStore from local datacenter {} is: {}", dcZkInfo.getDcName(),
+     helixPropertyStoreInLocalDc);
IZkDataListener dataListener = new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) {
@@ -235,10 +237,9 @@ public void handleDataDeleted(String dataPath) {
}
};
logger.info("Subscribing data listener to HelixPropertyStore.");
- helixPropertyStore.subscribeDataChanges(ClusterMapUtils.PARTITION_OVERRIDE_ZNODE_PATH, dataListener);
- logger.info("Getting ZNRecord from HelixPropertyStore");
- ZNRecord zNRecord =
-     helixPropertyStore.get(ClusterMapUtils.PARTITION_OVERRIDE_ZNODE_PATH, null, AccessOption.PERSISTENT);
+ helixPropertyStoreInLocalDc.subscribeDataChanges(PARTITION_OVERRIDE_ZNODE_PATH, dataListener);
+ logger.info("Getting PartitionOverride ZNRecord from HelixPropertyStore");
+ ZNRecord zNRecord = helixPropertyStoreInLocalDc.get(PARTITION_OVERRIDE_ZNODE_PATH, null, AccessOption.PERSISTENT);
if (clusterMapConfig.clusterMapEnablePartitionOverride) {
if (zNRecord != null) {
partitionOverrideInfoMap.putAll(zNRecord.getMapFields());
@@ -381,6 +382,55 @@ public List<PartitionId> getAllPartitionIds(String partitionClass) {
return partitionSelectionHelper.getPartitions(partitionClass);
}

@Override
public ReplicaId getNewReplica(String partitionIdStr, DataNodeId dataNodeId) {
ReplicaId newReplica = null;
logger.info("Getting ReplicaAddition ZNRecord from HelixPropertyStore in local DC.");
ZNRecord zNRecord = helixPropertyStoreInLocalDc.get(REPLICA_ADDITION_ZNODE_PATH, null, AccessOption.PERSISTENT);
if (zNRecord != null) {
String instanceName = getInstanceName(dataNodeId.getHostname(), dataNodeId.getPort());
Map<String, Map<String, String>> partitionToReplicas = zNRecord.getMapFields();
Map<String, String> replicaInfos = partitionToReplicas.get(partitionIdStr);
if (replicaInfos != null && replicaInfos.containsKey(instanceName)) {
long replicaCapacity = Long.valueOf(replicaInfos.get(REPLICAS_CAPACITY_STR));
Contributor:
also here (the same Long.parseLong suggestion as below)

Contributor Author:
fixed

String partitionClass = replicaInfos.get(PARTITION_CLASS_STR);
AmbryPartition mappedPartition = partitionNameToAmbryPartition.get(partitionIdStr);
if (mappedPartition == null) {
logger.info("Partition {} is currently not present in cluster map, creating a new partition.",
partitionIdStr);
mappedPartition =
new AmbryPartition(Long.valueOf(partitionIdStr), partitionClass, helixClusterManagerCallback);
Contributor:
IntelliJ suggests using Long.parseLong here to avoid unnecessary boxing.

Contributor Author:
fixed

}
// Check if data node or disk is in current cluster map, if not, set newReplica to null.
AmbryDataNode dataNode = instanceNameToAmbryDataNode.get(instanceName);
Collaborator:
Who calls getNewReplica? Can it be called for a node when the partition doesn't even exist on the node as per the cluster map?

Contributor Author:
Discussed this offline. Currently AmbryRequests calls this method; eventually, the state model of each partition will call it.

String mountPathFromHelix = replicaInfos.get(instanceName);
Set<AmbryDisk> disks = dataNode != null ? ambryDataNodeToAmbryDisks.get(dataNode) : null;
Optional<AmbryDisk> potentialDisk =
    disks != null ? disks.stream().filter(d -> d.getMountPath().equals(mountPathFromHelix)).findAny()
        : Optional.empty();

Contributor:
Could also do

        Optional<AmbryDisk> potentialDisk =
            Optional.ofNullable(dataNode != null ? ambryDataNodeToAmbryDisks.get(dataNode) : null)
                .flatMap(disks -> disks.stream().filter(d -> d.getMountPath().equals(mountPathFromHelix)).findAny());

Collaborator:
So when someone creates a new partition, will he/she ensure that the mount path exists on the node?

Contributor Author:
SRE is supposed to guarantee that a newly added replica has a valid mount path; otherwise the creation should fail.
if (dataNode != null && potentialDisk.isPresent()) {
Contributor:
Can you remove the dataNode != null check since you check that on line 407?

Contributor Author:
Sure, fair point.

try {
newReplica =
new AmbryReplica(clusterMapConfig, mappedPartition, potentialDisk.get(), true, replicaCapacity, false);
} catch (Exception e) {
logger.error("Failed to create new replica for partition {} on {} due to exception: ", partitionIdStr,
instanceName, e);
newReplica = null;
}
} else {
logger.error(
"Either datanode or disk that associated with new replica is not found in cluster map. Cannot create new replica.");
}
} else {
logger.warn("Partition {} or replica on host {} is not found in replica info map", partitionIdStr,
instanceName);
}
} else {
logger.warn("ZNRecord from HelixPropertyStore is NULL, partition to replicaInfo map doesn't exist.");
}
return newReplica;
}

/**
* Disconnect from the HelixManagers associated with each and every datacenter.
*/
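Piecing together the reads in getNewReplica above, the ReplicaAddition ZNRecord is keyed by partition id, and each partition's map field holds the replica capacity, the partition class, and an instanceName-to-mount-path entry. A rough sketch of the record the Helix Bootstrap tool would upload (the literal sample values are made up; REPLICAS_CAPACITY_STR, PARTITION_CLASS_STR, and REPLICA_ADDITION_ZNODE_PATH come from ClusterMapUtils):

    ZNRecord record = new ZNRecord("ReplicaAddition");
    Map<String, String> replicaInfos = new HashMap<>();
    replicaInfos.put(REPLICAS_CAPACITY_STR, "107374182400");      // capacity in bytes (sample value)
    replicaInfos.put(PARTITION_CLASS_STR, "default-partition-class");
    replicaInfos.put("localhost_15088", "/mnt/u001");             // instanceName -> mount path
    record.setMapField("101", replicaInfos);                      // partition id -> replica info
    helixPropertyStore.set(REPLICA_ADDITION_ZNODE_PATH, record, AccessOption.PERSISTENT);

As noted in the review thread above, whoever uploads this record (SRE, via the Bootstrap tool) is responsible for ensuring the mount path actually exists on the target node.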
@@ -533,6 +533,12 @@ public JSONObject getSnapshot() {
return snapshot;
}

@Override
public ReplicaId getNewReplica(String partitionIdStr, DataNodeId dataNodeId) {
throw new UnsupportedOperationException(
"Adding new replica is currently not supported in static cluster manager. Return null here");
Contributor:
Should this return null like the exception message says?

Contributor Author:
Updated the exception message. I feel like returning null is unneeded.

}

@Override
public void close() {
// No-op.