
Support adding/removing store via AdminRequest #1259

Merged
merged 8 commits into from
Oct 14, 2019

Conversation

jsjtzyy
Contributor

@jsjtzyy jsjtzyy commented Sep 9, 2019

  1. Refactored store control admin request to support adding/removing store.
  2. Changes made in replication manager to support adding/removing replica
  3. Added method in clustermap that allows caller to get detailed info of new replica from Helix PropertyStore.
  4. Changed removeBlobStore in StorageManager to return the store if it is successfully removed.
  5. Moved MockStorageManager to a separate file which can be re-used by other tests.

@jsjtzyy
Contributor Author

jsjtzyy commented Sep 9, 2019

Still working on unit tests and integration tests. Also this PR partially depends on #1249

@jsjtzyy jsjtzyy self-assigned this Sep 9, 2019
@codecov-io

codecov-io commented Sep 9, 2019

Codecov Report

Merging #1259 into master will increase coverage by 0.05%.
The diff coverage is 88.54%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master    #1259      +/-   ##
============================================
+ Coverage     72.09%   72.14%   +0.05%     
- Complexity     6232     6258      +26     
============================================
  Files           449      450       +1     
  Lines         35744    35888     +144     
  Branches       4540     4552      +12     
============================================
+ Hits          25769    25893     +124     
- Misses         8796     8809      +13     
- Partials       1179     1186       +7
Impacted Files Coverage Δ Complexity Δ
...va/com.github.ambry.cloud/CloudTokenPersistor.java 81.81% <ø> (ø) 5 <0> (ø) ⬇️
.../main/java/com.github.ambry.cloud/VcrRequests.java 100% <ø> (ø) 10 <0> (ø) ⬇️
...om.github.ambry.replication/ReplicationEngine.java 82.2% <ø> (ø) 38 <0> (ø) ⬇️
...rc/main/java/com.github.ambry.store/BlobStore.java 90.23% <ø> (ø) 97 <0> (ø) ⬇️
...ithub.ambry.replication/ReplicaTokenPersistor.java 86.2% <ø> (ø) 10 <0> (ø) ⬇️
...m.github.ambry.replication/DiskTokenPersistor.java 77.27% <ø> (ø) 5 <0> (ø) ⬇️
....github.ambry.clustermap/StaticClusterManager.java 71.31% <0%> (+1.35%) 83 <0> (+1) ⬆️
.../com.github.ambry/tools/admin/ServerAdminTool.java 0% <0%> (ø) 0 <0> (ø) ⬇️
...thub.ambry.clustermap/CompositeClusterManager.java 61.7% <0%> (-0.45%) 22 <0> (ø)
....github.ambry.protocol/BlobStoreControlAction.java 100% <100%> (ø) 1 <1> (?)
... and 27 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update a154380...aaa32c4. Read the comment docs.

@jsjtzyy jsjtzyy force-pushed the control-store-by-tool branch from c8a8b5f to 3ccfb32 Compare September 12, 2019 05:32
@jsjtzyy jsjtzyy marked this pull request as ready for review September 12, 2019 22:32
@jsjtzyy jsjtzyy requested review from ankagrawal and cgtz September 12, 2019 22:32
@jsjtzyy
Contributor Author

jsjtzyy commented Sep 12, 2019

I may add more tests to this PR, but the production code is ready for review.

@jsjtzyy
Contributor Author

jsjtzyy commented Sep 16, 2019

This PR contains lots of changes and refactoring in tests. Reviewers can start with the production code first. (The PR is now completely ready for review.)

@jsjtzyy jsjtzyy requested a review from zzmao September 20, 2019 20:57
* @param replicaId the replica to remove
* @return {@code true} if replica is successfully removed. {@code false} otherwise
*/
public boolean removeReplica(ReplicaId replicaId) {
Contributor Author

The same method signature is intentionally adopted for this PR and #1262, which allows us to put an abstract method in ReplicationEngine that is shared by both the regular ReplicationManager and the VcrReplicationManager.

@jsjtzyy jsjtzyy force-pushed the control-store-by-tool branch from 7ffbd3a to a8588df Compare September 24, 2019 20:32
@zzmao
Contributor

zzmao commented Sep 25, 2019

I'm having some difficulty understanding populateRemoteReplicaAndPartitionInfos and getNewReplica. We need to talk offline.

@jsjtzyy jsjtzyy force-pushed the control-store-by-tool branch from a8588df to 6d88f02 Compare September 25, 2019 16:54
Contributor

@zzmao zzmao left a comment

Some comments left. Most of the logic LGTM!

@@ -122,6 +122,15 @@
*/
JSONObject getSnapshot();

/**
* Get new replica of certain partition that resides on given host.
Contributor

@zzmao zzmao Sep 26, 2019

Is it possible that one host has multiple new replicas? If not, can you add some comments regarding the assumption?

Contributor Author

It's possible that one host has several new replicas, but they should come from different partitions. In this method, neither the argument partitionIdStr nor the return value ReplicaId is a collection, because the method is supposed to be called by the state model, where each partition gets its new replica separately. I will add some comments to this method.

/**
* Get new replica of certain partition that resides on given host.
* @param partitionIdStr the partition id string
* @param hostname the host on which replica should reside.
Contributor

currentNode.hostname and currentNode.port are passed in; why not use DataNode as the argument?

Contributor Author

good idea. I will figure out a way to pass in DataNodeId

Collaborator

If you decide to do this, then you can use dataNodeId::compareTo methods in places you are comparing hostname and port combination.

Contributor Author

sure, thanks for letting me know.

controlRequestType =
stream.readByte() == 1 ? BlobStoreControlRequestType.StartStore : BlobStoreControlRequestType.StopStore;
break;
case VERSION_V2:
Contributor

Is VERSION_V2 needed? It looks like BlobStoreControlRequestType.values()[stream.readByte()] can fully cover VERSION_V1.

Collaborator

In what case will controlRequestType for VERSION_V1 not be 1? What are the other valid values it can have?

Collaborator

Maybe you don't need different cases for the two versions.

Contributor Author

Thanks @zzmao @ankagrawal for your inputs here. You are right; there is no need to keep the version here. I will simplify this piece of code.
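The version-simplification idea above hinges on the enum's ordinal being the wire format. A minimal sketch of the `values()[stream.readByte()]` round trip (the enum names and their order here are illustrative, not the actual Ambry ordering):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class EnumOrdinalSerde {
    // The relative order of these constants must never change:
    // the ordinal goes directly into the serialized form.
    enum BlobStoreControlAction { StopStore, StartStore, AddStore, RemoveStore }

    static byte[] serialize(BlobStoreControlAction action) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new DataOutputStream(bytes).writeByte(action.ordinal());
        return bytes.toByteArray();
    }

    static BlobStoreControlAction deserialize(byte[] data) throws IOException {
        DataInputStream stream = new DataInputStream(new ByteArrayInputStream(data));
        // One code path covers every action, so no per-version switch is
        // needed just to widen a boolean into an enum.
        return BlobStoreControlAction.values()[stream.readByte()];
    }

    public static void main(String[] args) throws IOException {
        for (BlobStoreControlAction action : BlobStoreControlAction.values()) {
            System.out.println(action + " -> " + deserialize(serialize(action)));
        }
    }
}
```

Because the old V1 byte (1 for start, 0 for stop) maps onto the same positions, the enum stays wire-compatible as long as new actions are only appended.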

@@ -533,6 +533,12 @@ public JSONObject getSnapshot() {
return snapshot;
}

@Override
public ReplicaId getNewReplica(String partitionIdStr, String hostname, Integer port) {
logger.warn("Adding new replica is currently not supported in static cluster manager. Return null here");
Contributor

Throw UnsupportedOperationException?

Collaborator

+1

Contributor Author

fixed

* @param replicaId the local replica
* @return list of {@link RemoteReplicaInfo} associated with local replica.
*/
private List<RemoteReplicaInfo> populateRemoteReplicaAndPartitionInfos(List<? extends ReplicaId> peerReplicas,
Contributor

It's good to split this function into two: createRemoteReplicaInfos() and updatePartitionInfoMaps().

Contributor Author

done

@@ -231,12 +231,13 @@ void shutdown() throws InterruptedException {

/**
* @param id the {@link PartitionId} to find the store for.
* @param skipStateCheck whether to skip checking state of store.
Contributor

Introducing skipStateCheck makes the interface complex and hard to understand. Can we always return the store and check isStarted() in the caller?

if (peerReplicas != null) {
remoteReplicaInfos = populateRemoteReplicaAndPartitionInfos(peerReplicas, replicaId);
}
logger.info("Assigning thread for {}", replicaId.getPartitionId());
Contributor

Move the logic below under if (peerReplicas != null) {?

List<RemoteReplicaInfo> remoteReplicaInfos = partitionInfo.getRemoteReplicaInfos();
logger.info("Removing remote replicas of {} from replica threads", replicaId.getPartitionId());
removeRemoteReplicaInfoFromReplicaThread(remoteReplicaInfos);
mountPathToPartitionInfos.get(replicaId.getMountPath()).remove(partitionInfo);
Contributor

computeIfPresent is atomic for a ConcurrentMap. This can be used to avoid the heavyweight CopyOnWriteArrayList.

Contributor Author

Good point. Let me remove CopyOnWriteArrayList.
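A minimal sketch of the computeIfPresent suggestion, with partition infos reduced to plain strings and a map name mirroring the one in the diff (this is illustrative, not the real ReplicationEngine code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ComputeIfPresentDemo {
    // Hypothetical stand-in for ReplicationEngine's mountPathToPartitionInfos map.
    static final ConcurrentMap<String, List<String>> mountPathToPartitionInfos = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        mountPathToPartitionInfos.put("/mnt/0", new ArrayList<>(Arrays.asList("partition-1", "partition-2")));
        // computeIfPresent runs the remapping function atomically per key, so a
        // plain ArrayList suffices; no CopyOnWriteArrayList is needed for this removal.
        mountPathToPartitionInfos.computeIfPresent("/mnt/0", (mountPath, infos) -> {
            infos.remove("partition-1");
            return infos.isEmpty() ? null : infos; // returning null drops the mapping entirely
        });
        System.out.println(mountPathToPartitionInfos.get("/mnt/0"));
    }
}
```

The atomicity guarantee applies per key: concurrent mutations of the same mount path serialize through the map's internal locking, while different mount paths proceed in parallel.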

Collaborator

@ankagrawal ankagrawal left a comment

Initial comments. Will continue to review.

@@ -533,6 +533,12 @@ public JSONObject getSnapshot() {
return snapshot;
}

@Override
public ReplicaId getNewReplica(String partitionIdStr, String hostname, Integer port) {
logger.warn("Adding new replica is currently not supported in static cluster manager. Return null here");
Collaborator

+1

for (PartitionId partitionId : partitions.values()) {
List<? extends ReplicaId> replicaIds = partitionId.getReplicaIds();
for (ReplicaId replicaId : replicaIds) {
if (replicaId.getDataNodeId().getHostname().compareTo(dataNodeId.getHostname()) == 0
if (replicaId.getDataNodeId().getHostname().equals(dataNodeId.getHostname())
Collaborator

My latest commit has changed the way compareTo is implemented for datanode. You can use replicaId.getDataNodeId().compareTo(dataNodeId.getHostName()) == 0

Contributor Author

cool, will take your advice.

/**
* Get new replica of certain partition that resides on given host.
* @param partitionIdStr the partition id string
* @param hostname the host on which replica should reside.
Collaborator

If you decide to do this, then you can use dataNodeId::compareTo methods in places you are comparing hostname and port combination.

@Override
public ReplicaId getNewReplica(String partitionIdStr, String hostname, Integer port) {
ReplicaId newReplica = null;
for (Map.Entry<Long, PartitionId> entry : partitions.entrySet()) {
Collaborator

We can simplify this by converting partitionIdStr to a long partitionId and then doing partitions.get(partitionId).

Contributor Author

Correct; I made the changes as you suggested.

controlRequestType =
stream.readByte() == 1 ? BlobStoreControlRequestType.StartStore : BlobStoreControlRequestType.StopStore;
break;
case VERSION_V2:
Collaborator

In what case will controlRequestType for VERSION_V1 not be 1? What are the other valid values it can have?

}

public BlobStoreControlAdminRequest(short numReplicasCaughtUpPerPartition, boolean enable,
AdminRequest adminRequest) {
public BlobStoreControlAdminRequest(short numReplicasCaughtUpPerPartition,
Collaborator

javadoc for this constructor?

Contributor Author

sure

controlRequestType =
stream.readByte() == 1 ? BlobStoreControlRequestType.StartStore : BlobStoreControlRequestType.StopStore;
break;
case VERSION_V2:
Collaborator

Maybe you don't need different cases for the two versions.

* @return the associated {@link Store}, or {@code null} if the partition is not on this disk, or the store is not
* started.
*/
Store getStore(PartitionId id) {
Collaborator

There are a lot of getStore(PartitionId, false) calls throughout the code. I would suggest overloading getStore as getStore(PartitionId) and getStore(PartitionId, boolean), where getStore(PartitionId) just calls getStore(PartitionId, false).

Contributor Author

right, I made the change as you suggested.

Contributor

Is getStore(PartitionId, false) really needed? Can we add the extra check after calling getStore(PartitionId)?

A smaller interface is better than a larger one.

Contributor Author

I see your point here. For now, I don't want to change the current logic in most places where getStore(partitionId) is called. However, I do need to get a store that has already been shut down when removing a replica. Note that remove replica always happens after stop replica; hence, this method allows us to get a "stopped" store.
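The overload delegation discussed above can be sketched as follows, using simplified stand-ins for Store and PartitionId rather than the real Ambry classes:

```java
import java.util.HashMap;
import java.util.Map;

public class StoreManagerSketch {
    // Simplified stand-in for BlobStore: only tracks started/stopped state.
    static class Store {
        private final boolean started;
        Store(boolean started) { this.started = started; }
        boolean isStarted() { return started; }
    }

    private final Map<String, Store> stores = new HashMap<>();

    StoreManagerSketch() {
        stores.put("p1", new Store(true));
        stores.put("p2", new Store(false)); // stopped store, e.g. awaiting removal
    }

    /** Existing call sites keep this signature: only started stores are returned. */
    Store getStore(String partitionId) {
        return getStore(partitionId, false);
    }

    /** Overload: skipStateCheck=true also returns stopped stores (used when removing a replica). */
    Store getStore(String partitionId, boolean skipStateCheck) {
        Store store = stores.get(partitionId);
        return (store != null && (skipStateCheck || store.isStarted())) ? store : null;
    }

    public static void main(String[] args) {
        StoreManagerSketch manager = new StoreManagerSketch();
        System.out.println(manager.getStore("p2") == null);       // stopped store filtered out
        System.out.println(manager.getStore("p2", true) != null); // state check skipped
    }
}
```

The one-argument form keeps every existing call site unchanged, while the two-argument form is reserved for the remove-replica path that must see a stopped store.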

@jsjtzyy jsjtzyy force-pushed the control-store-by-tool branch 2 times, most recently from 4161813 to 10a5ef7 Compare September 27, 2019 19:01
@jsjtzyy
Contributor Author

jsjtzyy commented Oct 8, 2019

Gentle reminder to review. Thanks in advance.

@jsjtzyy jsjtzyy force-pushed the control-store-by-tool branch from 68517e2 to 927e0e5 Compare October 9, 2019 05:12
Collaborator

@ankagrawal ankagrawal left a comment

A few minor concerns. Let's quickly discuss.

new AmbryPartition(Long.valueOf(partitionIdStr), partitionClass, helixClusterManagerCallback);
}
// Check if data node or disk is in current cluster map, if not, set newReplica to null.
AmbryDataNode dataNode = instanceNameToAmbryDataNode.get(instanceName);
Collaborator

Who calls getNewReplica? Can it be called for a node when the partition doesn't even exist on the node per the clustermap?

Contributor Author

Discussed this offline. Currently AmbryRequests calls this method. Eventually, state model of each partition will call this method.

String mountPathFromHelix = replicaInfos.get(instanceName);
Set<AmbryDisk> disks = dataNode != null ? ambryDataNodeToAmbryDisks.get(dataNode) : null;
Optional<AmbryDisk> potentialDisk =
disks != null ? disks.stream().filter(d -> d.getMountPath().equals(mountPathFromHelix)).findAny()
Collaborator

So when someone creates a new partition, they will ensure that the mount path exists on the node?

Contributor Author

The SRE is supposed to guarantee that a newly added replica has a valid mount path; otherwise the creation should fail.

v.remove(partitionInfo);
return v;
});
//mountPathToPartitionInfos.get(replicaId.getMountPath()).remove(partitionInfo);
Collaborator

Remove this commented-out line?

Contributor Author

done

if (!partitionToPartitionInfo.containsKey(replicaId.getPartitionId())) {
logger.error("{} doesn't exist in replication manager, skipping removing replica request.",
replicaId.getPartitionId());
return false;
Collaborator

Can the add/remove replica code cause thread-safety concerns? Let's discuss! There are some subtle differences between ConcurrentMap and ConcurrentHashMap, and we should explicitly use ConcurrentHashMap if we want certain guarantees.

Contributor Author

Good point. I decided to add ReadWriteLock in replication manager and disk manager.

Contributor

@cgtz cgtz left a comment

Continuing to review

@@ -122,6 +122,14 @@
*/
JSONObject getSnapshot();

/**
* Attempt to get new replica of certain partition that resides on given data node.
Contributor

Could you explain what a "new replica" is in this javadoc? i.e. what makes a "new" replica different from a regular replica.

Contributor Author

This is actually a "new replica" that will be added to the specified data node. Currently, it is used in HelixClusterManager only, because the target node needs the replica's detailed info to create it. The replica info is stored in the Helix PropertyStore. Before moving a replica, we use the Helix Bootstrap tool to upload new replica infos to the Helix PropertyStore, so the target node can fetch this info to create the new replica. Let me know if you come up with a better name to disambiguate in this context. Thanks.

Optional<AmbryDisk> potentialDisk =
disks != null ? disks.stream().filter(d -> d.getMountPath().equals(mountPathFromHelix)).findAny()
: Optional.empty();
if (dataNode != null && potentialDisk.isPresent()) {
Contributor

Can you remove the dataNode != null check since you check that on line 407?

Contributor Author

sure, fair point.

@Override
public ReplicaId getNewReplica(String partitionIdStr, DataNodeId dataNodeId) {
throw new UnsupportedOperationException(
"Adding new replica is currently not supported in static cluster manager. Return null here");
Contributor

Should this return null like the exception message says?

Contributor Author

Updated the exception message. I feel like returning null is unneeded.

throw new IllegalStateException("Unrecognized version for BlobStoreControlAdminRequest: " + versionId);
}
// read the version
stream.readShort();
Contributor

So, are we just erasing all remnants of V1 of this request? I guess that's OK since this is an admin operation that isn't called by the router in a live environment. However, you should probably add a note explaining why V1 is not present.

Do you still want to check that the version number matches the expected value (2)?

Contributor Author

I feel like maybe we can still use V1 for this request. The current change doesn't really modify the serialize/deserialize logic of BlobStoreControlAdminRequest; it interprets stream.readByte() as a newly introduced enum type instead of a boolean value. I think continuing to use V1 makes sense here (also, we can keep checking the version number in this method).

Contributor

Yep, I agree that in this circumstance it makes sense to keep using v1

* The order of these enums should not be changed since their relative position goes into the serialized form of
* requests.
*/
public enum BlobStoreControlRequestType {
Contributor

BlobStoreControlAction?

Contributor Author

makes more sense, will change as you suggested.

*/
public boolean removeReplica(ReplicaId replicaId) {
boolean succeed = false;
rwLock.readLock().lock();
Contributor

Why do all of these methods that mutate the data structures take the read lock? Shouldn't they take the write lock, since they mutate the data structures?

Contributor

I don't see any usages of the write lock in this class. Typically a ReadWriteLock is used to allow concurrent readers during normal operation (via the read lock), but to lock the entire data structure when editing: the write lock ensures that all readers finish their reads and makes future readers wait.

Contributor Author

right, will change this to simple ReentrantLock.

Contributor

With just a simple mutex, wouldn't you have to synchronize all reads/writes to these data structures? Would it be better to take a read lock when reading from the shared data structures and taking a write lock when editing them? We can discuss in person.
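The read-versus-write lock distinction discussed above can be illustrated with a small sketch (the class and its methods are simplified stand-ins, not the actual ReplicationManager):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class RwLockSketch {
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final List<String> replicas = new ArrayList<>();

    /** Mutations take the write lock: all in-flight readers drain first. */
    void addReplica(String replicaId) {
        rwLock.writeLock().lock();
        try {
            replicas.add(replicaId);
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    boolean removeReplica(String replicaId) {
        rwLock.writeLock().lock();
        try {
            return replicas.remove(replicaId);
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    /** Reads take the read lock, so they can proceed concurrently with each other. */
    boolean hasReplica(String replicaId) {
        rwLock.readLock().lock();
        try {
            return replicas.contains(replicaId);
        } finally {
            rwLock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        RwLockSketch manager = new RwLockSketch();
        manager.addReplica("replica-1");
        System.out.println(manager.removeReplica("replica-1")); // removed
        System.out.println(manager.removeReplica("replica-1")); // already gone
    }
}
```

With a plain ReentrantLock instead, every read would also serialize behind the single mutex, which is the trade-off raised in the comment above.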

FindToken findToken =
this.tokenHelper.getFindTokenFactoryFromReplicaType(remoteReplica.getReplicaType()).getNewFindToken();
RemoteReplicaInfo remoteReplicaInfo = new RemoteReplicaInfo(remoteReplica, replicaId, store, findToken,
storeConfig.storeDataFlushIntervalSeconds * SystemTime.MsPerSec * Replication_Delay_Multiplier,
Contributor

TimeUnit.SECONDS.toMillis(storeConfig.storeDataFlushIntervalSeconds)
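The suggested TimeUnit form can be sketched as follows (the config value and multiplier are made up for illustration):

```java
import java.util.concurrent.TimeUnit;

public class TimeUnitDemo {
    static long tokenReloadDelayMs(int flushIntervalSeconds, int delayMultiplier) {
        // Same arithmetic as flushIntervalSeconds * MsPerSec * delayMultiplier,
        // but the seconds-to-millis conversion is explicit in the call itself.
        return TimeUnit.SECONDS.toMillis(flushIntervalSeconds) * delayMultiplier;
    }

    public static void main(String[] args) {
        System.out.println(tokenReloadDelayMs(60, 5)); // 300000
    }
}
```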

BlobStore store =
new BlobStore(replica, storeConfig, scheduler, longLivedTaskScheduler, diskIOScheduler, diskSpaceAllocator,
storeMainMetrics, storeUnderCompactionMetrics, keyFactory, recovery, hardDelete, replicaStatusDelegate,
time);
rwLock.readLock().lock();
Contributor

Same comment for this class.

}
} else {
logger.debug("Validate request fails for {} with error code {} when trying to stop store", partitionId,
error);
errorCode = ServerErrorCode.Unknown_Error;
Contributor

For these handle* methods, it might be more readable to intersperse the method with returns since otherwise there are a ton of nested layers of indentation.
e.g.

if (!success1) {
  return errorCode1;
}
if (!success2) {
  return errorCode2;
}
...
return No_Error

logger.info("Partition {} is currently not present in cluster map, creating a new partition.",
partitionIdStr);
mappedPartition =
new AmbryPartition(Long.valueOf(partitionIdStr), partitionClass, helixClusterManagerCallback);
Contributor

intellij suggests using Long.parseLong here to avoid unnecessary boxing

Contributor Author

fixed
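For illustration, the boxing point behind the IntelliJ suggestion (the string value here is arbitrary):

```java
public class ParseVsValueOf {
    public static void main(String[] args) {
        String partitionIdStr = "42";
        long partitionId = Long.parseLong(partitionIdStr); // primitive long, no boxing
        Long boxedId = Long.valueOf(partitionIdStr);       // produces a boxed Long object
        System.out.println(partitionId == boxedId);        // auto-unboxes for the comparison
    }
}
```

When the result feeds a primitive-typed constructor argument or arithmetic, parseLong avoids an allocation that valueOf may perform.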

Map<String, Map<String, String>> partitionToReplicas = zNRecord.getMapFields();
Map<String, String> replicaInfos = partitionToReplicas.get(partitionIdStr);
if (replicaInfos != null && replicaInfos.containsKey(instanceName)) {
long replicaCapacity = Long.valueOf(replicaInfos.get(REPLICAS_CAPACITY_STR));
Contributor

also here

Contributor Author

fixed

// Check if data node or disk is in current cluster map, if not, set newReplica to null.
AmbryDataNode dataNode = instanceNameToAmbryDataNode.get(instanceName);
String mountPathFromHelix = replicaInfos.get(instanceName);
Set<AmbryDisk> disks = dataNode != null ? ambryDataNodeToAmbryDisks.get(dataNode) : null;
Contributor

Could also do

        Optional<AmbryDisk> potentialDisk =
            Optional.ofNullable(dataNode != null ? ambryDataNodeToAmbryDisks.get(dataNode) : null)
                .flatMap(disks -> disks.stream().filter(d -> d.getMountPath().equals(mountPathFromHelix)).findAny());

logger.info("Removing remote replicas of {} from replica threads", replicaId.getPartitionId());
removeRemoteReplicaInfoFromReplicaThread(remoteReplicaInfos);
mountPathToPartitionInfos.computeIfPresent(replicaId.getMountPath(), (k, v) -> {
v.remove(partitionInfo);
Contributor

Since you are removing arbitrary elements from v, maybe it should be a HashSet instead of an ArrayList.
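A minimal sketch of the Set-based alternative, combined with a concurrent key set so it stays safe under the map's atomic operations (names mirror the diff but the code is illustrative):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class KeySetDemo {
    // Hypothetical stand-in for the mount-path-to-partition-infos map.
    static final ConcurrentMap<String, Set<String>> mountPathToPartitionInfos = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        // ConcurrentHashMap.newKeySet() backs a Set with a concurrent map, giving
        // O(1) removal of arbitrary elements versus O(n) shifting in an ArrayList.
        mountPathToPartitionInfos.computeIfAbsent("/mnt/0", key -> ConcurrentHashMap.newKeySet()).add("partition-1");
        mountPathToPartitionInfos.computeIfAbsent("/mnt/0", key -> ConcurrentHashMap.newKeySet()).add("partition-2");
        mountPathToPartitionInfos.get("/mnt/0").remove("partition-1");
        System.out.println(mountPathToPartitionInfos.get("/mnt/0"));
    }
}
```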

} else {
List<? extends ReplicaId> peerReplicas = replicaId.getPeerReplicaIds();
List<RemoteReplicaInfo> remoteReplicaInfos = new ArrayList<>();
if (peerReplicas != null) {
Contributor

What is the case where peerReplicas is null?

Contributor

@cgtz cgtz left a comment

Approved, just a couple more comments to optionally address

@@ -175,9 +177,9 @@ void addReplica(PartitionId partitionId) throws ReplicationException {
partitionToPartitionInfo.put(partitionId, partitionInfo);
mountPathToPartitionInfos.compute(cloudReplica.getMountPath(), (key, value) -> {
Contributor

You can simplify this to the way you did it in CloudTokenPersistorTest:

mountPathToPartitionInfoList.computeIfAbsent(cloudReplicaId.getMountPath(), key -> ConcurrentHashMap.newKeySet()).add(partitionInfo);

* residing on hosts that are not present in clustermap.
* The ZNRecord of REPLICA_ADDITION_ZNODE has following format in mapFields.
* <p/>
* "mapFields": {
Contributor

minor: put the JSON in <pre></pre> tags. Also, for the paragraph separator, I think javadoc nags if you use <p/> instead of <p>.

@zzmao zzmao merged commit b22b487 into linkedin:master Oct 14, 2019