Support adding/removing store via AdminRequest #1259
jsjtzyy commented Sep 9, 2019 (edited)
- Refactored store control admin request to support adding/removing store.
- Changes made in replication manager to support adding/removing replica
- Added method in clustermap that allows caller to get detailed info of new replica from Helix PropertyStore.
- Changed removeBlobStore in StorageManager to return the store if it is successfully removed.
- Moved MockStorageManager to a separate file which can be re-used by other tests.
Still working on unit tests and integration tests. Also, this PR partially depends on #1249.
Codecov Report
@@             Coverage Diff              @@
##             master    #1259      +/-   ##
============================================
+ Coverage     72.09%   72.14%    +0.05%
- Complexity     6232     6258       +26
============================================
  Files           449      450        +1
  Lines         35744    35888      +144
  Branches       4540     4552       +12
============================================
+ Hits          25769    25893      +124
- Misses         8796     8809       +13
- Partials       1179     1186        +7
Continue to review full report at Codecov.
c8a8b5f to 3ccfb32
I may add more tests into this PR but the production code is ready for review.
This PR contains lots of changes and refactoring in testing. Reviewers can start with production code first. (Now the PR is completely ready for review.)
* @param replicaId the replica to remove
* @return {@code true} if replica is successfully removed. {@code false} otherwise
*/
public boolean removeReplica(ReplicaId replicaId) {
The same method signature is intentionally adopted in this PR and #1262, which allows us to put an abstract method in ReplicationEngine for both the regular ReplicationManager and VcrReplicationManager.
7ffbd3a to a8588df
Have some difficulties to understand
a8588df to 6d88f02
Some comments left. Most of the logic LGTM!
@@ -122,6 +122,15 @@
*/
JSONObject getSnapshot();

/**
* Get new replica of certain partition that resides on given host.
Is it possible that one host has multiple new replicas? If not, can you add some comments regarding the assumption?
It's possible that one host has several new replicas, but they should come from different partitions. In this method, neither the argument partitionIdStr nor the return value ReplicaId is a collection, because the method is supposed to be called by the state model, where each partition gets its new replica separately. I will add some comments for this method.
/**
* Get new replica of certain partition that resides on given host.
* @param partitionIdStr the partition id string
* @param hostname the host on which replica should reside.
currentNode.hostname and currentNode.port are passed in, why not use DataNode as argument?
good idea. I will figure out a way to pass in DataNodeId
If you decide to do this, then you can use dataNodeId::compareTo methods in places you are comparing hostname and port combination.
sure, thanks for letting me know.
controlRequestType =
stream.readByte() == 1 ? BlobStoreControlRequestType.StartStore : BlobStoreControlRequestType.StopStore;
break;
case VERSION_V2:
Is VERSION_V2 needed? Looks like BlobStoreControlRequestType.values()[stream.readByte()] can fully cover VERSION_V1.
In what case will controlRequestType for VERSION_V1 be not 1? What are the other valid values it can have?
maybe you don't need different cases for two versions.
Thanks @zzmao @ankagrawal for your inputs here. You are right, no need to keep version here. I will simplify this piece of code.
@@ -533,6 +533,12 @@ public JSONObject getSnapshot() {
return snapshot;
}

@Override
public ReplicaId getNewReplica(String partitionIdStr, String hostname, Integer port) {
logger.warn("Adding new replica is currently not supported in static cluster manager. Return null here");
throw UnsupportedOperationException?
+1
fixed
* @param replicaId the local replica
* @return list of {@link RemoteReplicaInfo} associated with local replica.
*/
private List<RemoteReplicaInfo> populateRemoteReplicaAndPartitionInfos(List<? extends ReplicaId> peerReplicas,
It would be good to split this function into two: createRemoteReplicaInfos() and updatePartitionInfoMaps()
done
@@ -231,12 +231,13 @@ void shutdown() throws InterruptedException {

/**
* @param id the {@link PartitionId} to find the store for.
* @param skipStateCheck whether to skip checking state of store.
Introducing skipStateCheck makes the interface complex and hard to understand. Can we always return the store and check isStarted() in the caller?
if (peerReplicas != null) {
remoteReplicaInfos = populateRemoteReplicaAndPartitionInfos(peerReplicas, replicaId);
}
logger.info("Assigning thread for {}", replicaId.getPartitionId());
Move the logic below under if (peerReplicas != null) {?
List<RemoteReplicaInfo> remoteReplicaInfos = partitionInfo.getRemoteReplicaInfos();
logger.info("Removing remote replicas of {} from replica threads", replicaId.getPartitionId());
removeRemoteReplicaInfoFromReplicaThread(remoteReplicaInfos);
mountPathToPartitionInfos.get(replicaId.getMountPath()).remove(partitionInfo);
computeIfPresent is atomic for a concurrent map. This can be used to avoid the heavy CopyOnWriteArrayList.
good point. Let me remove CopyOnWriteArrayList
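A minimal sketch of the point made in this thread, assuming a map from mount path to a plain list of partition names (the class and method names are hypothetical, not Ambry's). When every mutation of the per-key list goes through ConcurrentHashMap's compute methods, each update is atomic for that key, so the list itself does not need to be copy-on-write:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the mount-path index; not the actual ReplicationManager code.
class MountPathIndexSketch {
  private final ConcurrentHashMap<String, List<String>> mountPathToPartitions = new ConcurrentHashMap<>();

  void add(String mountPath, String partition) {
    // compute() runs atomically for the given key on a ConcurrentHashMap.
    mountPathToPartitions.compute(mountPath, (key, list) -> {
      List<String> updated = list == null ? new ArrayList<>() : list;
      updated.add(partition);
      return updated;
    });
  }

  void remove(String mountPath, String partition) {
    // No-op when the mount path is absent; otherwise the removal is atomic for that key.
    mountPathToPartitions.computeIfPresent(mountPath, (key, list) -> {
      list.remove(partition);
      return list;
    });
  }

  int count(String mountPath) {
    // Read the size inside computeIfPresent so it cannot interleave with a mutation of the same key.
    int[] size = {0};
    mountPathToPartitions.computeIfPresent(mountPath, (key, list) -> {
      size[0] = list.size();
      return list;
    });
    return size[0];
  }
}
```

Code that iterates the list outside the map's compute methods would still need its own protection, which is one reason a concurrent set may be a better value type.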
Initial comments. Will continue to review.
for (PartitionId partitionId : partitions.values()) {
List<? extends ReplicaId> replicaIds = partitionId.getReplicaIds();
for (ReplicaId replicaId : replicaIds) {
if (replicaId.getDataNodeId().getHostname().compareTo(dataNodeId.getHostname()) == 0
if (replicaId.getDataNodeId().getHostname().equals(dataNodeId.getHostname())
My latest commit has changed the way compareTo is implemented for the data node. You can use replicaId.getDataNodeId().compareTo(dataNodeId) == 0
cool, will take your advice.
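A rough sketch of why a compareTo on the data node can replace separate hostname and port checks. The class, field names, and ordering below are assumptions for illustration only and may differ from the real DataNodeId implementations:

```java
// Hypothetical data node identity; not Ambry's DataNodeId.
class DataNodeSketch implements Comparable<DataNodeSketch> {
  private final String hostname;
  private final int port;

  DataNodeSketch(String hostname, int port) {
    this.hostname = hostname;
    this.port = port;
  }

  @Override
  public int compareTo(DataNodeSketch other) {
    // Order by hostname first, then by port, so 0 means "same host and same port".
    int byHost = hostname.compareTo(other.hostname);
    return byHost != 0 ? byHost : Integer.compare(port, other.port);
  }
}
```

With such an ordering, `a.compareTo(b) == 0` covers both comparisons in one call.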
@Override
public ReplicaId getNewReplica(String partitionIdStr, String hostname, Integer port) {
ReplicaId newReplica = null;
for (Map.Entry<Long, PartitionId> entry : partitions.entrySet()) {
we can simplify this by converting partitionIdStr to long partitionId and then do partitions.get(partitionId).
correct, changes are made as you suggested
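The simplification suggested here could look roughly like the sketch below, which assumes the partitions are kept in a map keyed by a numeric id (the class name, method names, and String values are placeholders, not the real clustermap types):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical lookup helper; the real code maps Long ids to PartitionId objects.
class PartitionLookupSketch {
  /** Returns the partition for the given id string, or null if the id is not in the map. */
  static String lookup(Map<Long, String> partitions, String partitionIdStr) {
    // Parse once and do a direct map lookup instead of iterating over every entry.
    return partitions.get(Long.parseLong(partitionIdStr));
  }

  /** Builds a small sample map for demonstration. */
  static Map<Long, String> sample() {
    Map<Long, String> partitions = new HashMap<>();
    partitions.put(0L, "Partition[0]");
    partitions.put(1L, "Partition[1]");
    return partitions;
  }
}
```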
}

public BlobStoreControlAdminRequest(short numReplicasCaughtUpPerPartition, boolean enable,
AdminRequest adminRequest) {
public BlobStoreControlAdminRequest(short numReplicasCaughtUpPerPartition,
javadoc for this constructor?
sure
* @return the associated {@link Store}, or {@code null} if the partition is not on this disk, or the store is not
* started.
*/
Store getStore(PartitionId id) {
There are a lot of "getStore(PartitionId, false)" calls throughout the code. I would suggest overloading getStore as getStore(PartitionId) and getStore(PartitionId, boolean), so that getStore(PartitionId) just calls getStore(PartitionId, false).
right, I made the change as you suggested.
Is getStore(PartitionId, false) really needed? Can we add extra logic to check after getStore(PartitionId)?
A smaller interface is better than a larger one.
I see your point here. For now I don't want to change the current logic in most cases where getStore(partitionId) is called. However, I do need to get a store that is already shut down when removing a replica. Note that removing a replica always happens after stopping it. Hence, this method allows us to get a "stopped" store.
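The overload discussed in this thread can be sketched as follows. The types are simplified placeholders (String partition ids, a tiny store class), so this only illustrates the delegation pattern and is not the DiskManager code:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical store with only the state needed for the example.
class StoreSketch {
  private final boolean started;

  StoreSketch(boolean started) {
    this.started = started;
  }

  boolean isStarted() {
    return started;
  }
}

// Hypothetical manager showing the one-argument overload delegating to the two-argument one.
class DiskManagerSketch {
  private final Map<String, StoreSketch> stores = new HashMap<>();

  void addStore(String partition, StoreSketch store) {
    stores.put(partition, store);
  }

  /** Existing behavior: only started stores are returned. */
  StoreSketch getStore(String partition) {
    return getStore(partition, false);
  }

  /** With skipStateCheck set, a stopped store is returned too (needed when removing a replica). */
  StoreSketch getStore(String partition, boolean skipStateCheck) {
    StoreSketch store = stores.get(partition);
    if (store == null) {
      return null;
    }
    return (skipStateCheck || store.isStarted()) ? store : null;
  }
}
```

Existing callers keep the "started only" semantics, while the remove path opts in to seeing a stopped store.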
4161813 to 10a5ef7
Gentle reminder to review. Thanks in advance.
68517e2 to 927e0e5
A few minor concerns. Let's quickly discuss.
new AmbryPartition(Long.valueOf(partitionIdStr), partitionClass, helixClusterManagerCallback);
}
// Check if data node or disk is in current cluster map, if not, set newReplica to null.
AmbryDataNode dataNode = instanceNameToAmbryDataNode.get(instanceName);
Who calls getNewReplica? Can it be called for a node when the partition doesn't even exist on the node as per clustermap?
Discussed this offline. Currently AmbryRequests calls this method. Eventually, the state model of each partition will call this method.
String mountPathFromHelix = replicaInfos.get(instanceName);
Set<AmbryDisk> disks = dataNode != null ? ambryDataNodeToAmbryDisks.get(dataNode) : null;
Optional<AmbryDisk> potentialDisk =
disks != null ? disks.stream().filter(d -> d.getMountPath().equals(mountPathFromHelix)).findAny()
so when someone creates a new partition, he/she will ensure that the mount path exists on the node?
SRE is supposed to guarantee that a newly added replica has a valid mount path; otherwise the creation should fail.
v.remove(partitionInfo);
return v;
});
//mountPathToPartitionInfos.get(replicaId.getMountPath()).remove(partitionInfo);
remove this commented line?
done
if (!partitionToPartitionInfo.containsKey(replicaId.getPartitionId())) {
logger.error("{} doesn't exist in replication manager, skipping removing replica request.",
replicaId.getPartitionId());
return false;
Can the add/remove replica code cause thread safety concerns? Let's discuss! There are some subtle differences between ConcurrentMap and ConcurrentHashMap, and we should explicitly use ConcurrentHashMap if we want certain guarantees.
Good point. I decided to add ReadWriteLock in replication manager and disk manager.
Continuing to review
@@ -122,6 +122,14 @@
*/
JSONObject getSnapshot();

/**
* Attempt to get new replica of certain partition that resides on given data node.
Could you explain what a "new replica" is in this javadoc? i.e. what makes a "new" replica different from a regular replica.
This is actually a "new replica" that will be added to the specified data node. Currently it is used only in HelixClusterManager, because the target node needs the replica's detailed info to create it. The replica info is stored in the Helix PropertyStore: before moving a replica, we use the Helix Bootstrap tool to upload the new replica info to the PropertyStore, so that the target node can fetch it and create the new replica. Let me know if you come up with a better name to disambiguate in this context. Thanks
Optional<AmbryDisk> potentialDisk =
disks != null ? disks.stream().filter(d -> d.getMountPath().equals(mountPathFromHelix)).findAny()
: Optional.empty();
if (dataNode != null && potentialDisk.isPresent()) {
Can you remove the dataNode != null check since you check that on line 407?
sure, fair point.
@Override
public ReplicaId getNewReplica(String partitionIdStr, DataNodeId dataNodeId) {
throw new UnsupportedOperationException(
"Adding new replica is currently not supported in static cluster manager. Return null here");
Should this return null like the exception message says?
updated the exception message. Feel like return null is unneeded.
throw new IllegalStateException("Unrecognized version for BlobStoreControlAdminRequest: " + versionId);
}
// read the version
stream.readShort();
So, are we just erasing all remnants of V1 of this request? I guess that's OK since this is an admin operation that isn't called by the router in a live environment. However, you should probably add some notes that say why V1 is not present.
Do you still want to have the check that the version number matches the expected value (2)?
I feel like maybe we can still use V1 for this request. The current change doesn't really modify the serialize/deserialize logic of BlobStoreControlAdminRequest; it interprets stream.readByte() as a newly introduced enum type instead of a boolean value. I think continuing to use V1 makes sense here (also, we can keep checking the version number in this method).
Yep, I agree that in this circumstance it makes sense to keep using v1
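To illustrate why the existing version can be kept, here is a hedged sketch of decoding the byte as an enum ordinal. The enum name and its member order are assumptions for this example; the key point is that the two values an old boolean-style payload could carry (0 for stop, 1 for start) land on the same actions:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical action enum. StopStore and StartStore keep ordinals 0 and 1 so old payloads still decode.
enum ControlActionSketch {
  StopStore, StartStore, AddStore, RemoveStore
}

class ControlActionCodecSketch {
  /** Reads one byte and maps it to an action by ordinal, rejecting out-of-range values. */
  static ControlActionSketch readAction(DataInputStream stream) throws IOException {
    int ordinal = stream.readByte();
    ControlActionSketch[] actions = ControlActionSketch.values();
    if (ordinal < 0 || ordinal >= actions.length) {
      throw new IllegalStateException("Unrecognized store control action: " + ordinal);
    }
    return actions[ordinal];
  }

  /** Convenience wrapper for decoding a single byte without checked exceptions. */
  static ControlActionSketch decode(byte value) {
    try {
      return readAction(new DataInputStream(new ByteArrayInputStream(new byte[]{value})));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

This is also why the enum's javadoc warns against reordering: the ordinal is the wire format.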
* The order of these enums should not be changed since their relative position goes into the serialized form of
* requests.
*/
public enum BlobStoreControlRequestType {
BlobStoreControlAction?
makes more sense, will change as you suggested.
*/
public boolean removeReplica(ReplicaId replicaId) {
boolean succeed = false;
rwLock.readLock().lock();
why do all of these methods that mutate the data structures take the read lock? shouldn't they take the write lock since they mutate the data structure?
I don't see any usages of the write lock in this class. Typically a ReadWriteLock is used to allow concurrent readers during normal scenarios (by using the read lock), but to lock the entire data structure when editing (by using the write lock, which ensures that all readers finish their reads and makes future readers wait).
right, will change this to simple ReentrantLock.
With just a simple mutex, wouldn't you have to synchronize all reads/writes to these data structures? Would it be better to take a read lock when reading from the shared data structures and taking a write lock when editing them? We can discuss in person.
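The locking discipline the reviewer describes can be sketched as below: lookups take the read lock, while add and remove take the write lock. The registry class and its String-based fields are hypothetical and only stand in for the shared data structures in the replication and disk managers:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical registry guarded by a read/write lock; not the actual ReplicationManager.
class ReplicaRegistrySketch {
  private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
  private final Map<String, String> partitionToMountPath = new HashMap<>();

  /** Mutation: takes the write lock, so no reader observes a partial update. */
  boolean addReplica(String partition, String mountPath) {
    rwLock.writeLock().lock();
    try {
      return partitionToMountPath.putIfAbsent(partition, mountPath) == null;
    } finally {
      rwLock.writeLock().unlock();
    }
  }

  /** Mutation: also takes the write lock. */
  boolean removeReplica(String partition) {
    rwLock.writeLock().lock();
    try {
      return partitionToMountPath.remove(partition) != null;
    } finally {
      rwLock.writeLock().unlock();
    }
  }

  /** Lookup: takes the read lock, so many readers can proceed concurrently. */
  String getMountPath(String partition) {
    rwLock.readLock().lock();
    try {
      return partitionToMountPath.get(partition);
    } finally {
      rwLock.readLock().unlock();
    }
  }
}
```

Compared with a single ReentrantLock, this keeps the common read path concurrent while add and remove, which are rare admin operations, are exclusive.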
FindToken findToken =
this.tokenHelper.getFindTokenFactoryFromReplicaType(remoteReplica.getReplicaType()).getNewFindToken();
RemoteReplicaInfo remoteReplicaInfo = new RemoteReplicaInfo(remoteReplica, replicaId, store, findToken,
storeConfig.storeDataFlushIntervalSeconds * SystemTime.MsPerSec * Replication_Delay_Multiplier,
TimeUnit.SECONDS.toMillis(storeConfig.storeDataFlushIntervalSeconds)
BlobStore store =
new BlobStore(replica, storeConfig, scheduler, longLivedTaskScheduler, diskIOScheduler, diskSpaceAllocator,
storeMainMetrics, storeUnderCompactionMetrics, keyFactory, recovery, hardDelete, replicaStatusDelegate,
time);
rwLock.readLock().lock();
Same comment for this class.
}
} else {
logger.debug("Validate request fails for {} with error code {} when trying to stop store", partitionId,
error);
errorCode = ServerErrorCode.Unknown_Error;
For these handle* methods, it might be more readable to intersperse the method with returns, since otherwise there are a ton of nested layers of indentation, e.g.:
if (!success1) {
return errorCode1;
}
if (!success2) {
return errorCode2;
}
...
return No_Error
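Spelled out as compilable Java, the early-return style might look like the sketch below. The error codes, the method name, and the boolean inputs are placeholders invented for the example; the real handlers call into the request validation, replication, and storage layers:

```java
// Hypothetical error codes for the example only.
enum ErrorCodeSketch {
  No_Error, Bad_Request, Replica_Unavailable, Unknown_Error
}

class StopStoreHandlerSketch {
  /** Each failed step returns immediately, so the success path stays at one indentation level. */
  static ErrorCodeSketch handleStopStore(boolean requestValid, boolean replicationStopped, boolean storeShutDown) {
    if (!requestValid) {
      return ErrorCodeSketch.Bad_Request;
    }
    if (!replicationStopped) {
      return ErrorCodeSketch.Replica_Unavailable;
    }
    if (!storeShutDown) {
      return ErrorCodeSketch.Unknown_Error;
    }
    return ErrorCodeSketch.No_Error;
  }
}
```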
logger.info("Partition {} is currently not present in cluster map, creating a new partition.",
partitionIdStr);
mappedPartition =
new AmbryPartition(Long.valueOf(partitionIdStr), partitionClass, helixClusterManagerCallback);
IntelliJ suggests using Long.parseLong here to avoid unnecessary boxing
fixed
Map<String, Map<String, String>> partitionToReplicas = zNRecord.getMapFields();
Map<String, String> replicaInfos = partitionToReplicas.get(partitionIdStr);
if (replicaInfos != null && replicaInfos.containsKey(instanceName)) {
long replicaCapacity = Long.valueOf(replicaInfos.get(REPLICAS_CAPACITY_STR));
also here
fixed
// Check if data node or disk is in current cluster map, if not, set newReplica to null.
AmbryDataNode dataNode = instanceNameToAmbryDataNode.get(instanceName);
String mountPathFromHelix = replicaInfos.get(instanceName);
Set<AmbryDisk> disks = dataNode != null ? ambryDataNodeToAmbryDisks.get(dataNode) : null;
Could also do
Optional<AmbryDisk> potentialDisk =
Optional.ofNullable(dataNode != null ? ambryDataNodeToAmbryDisks.get(dataNode) : null)
.flatMap(disks -> disks.stream().filter(d -> d.getMountPath().equals(mountPathFromHelix)).findAny());
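A self-contained version of this Optional chain, using plain strings in place of AmbryDataNode and AmbryDisk (all names below are placeholders for illustration), shows how a missing node, a node with no disks, and a missing mount path all collapse to an empty Optional:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical lookup; nodes and disks are plain strings here.
class DiskLookupSketch {
  static Optional<String> findDisk(Map<String, Set<String>> nodeToMountPaths, String node, String mountPath) {
    // ofNullable handles both "node is null" and "node has no disk set"; flatMap searches only when a set exists.
    return Optional.ofNullable(node != null ? nodeToMountPaths.get(node) : null)
        .flatMap(disks -> disks.stream().filter(d -> d.equals(mountPath)).findAny());
  }

  /** Builds a small sample mapping with one node that owns one mount path. */
  static Map<String, Set<String>> sample() {
    Map<String, Set<String>> nodeToMountPaths = new HashMap<>();
    Set<String> disks = new HashSet<>();
    disks.add("/mnt0");
    nodeToMountPaths.put("node1", disks);
    return nodeToMountPaths;
  }
}
```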
logger.info("Removing remote replicas of {} from replica threads", replicaId.getPartitionId());
removeRemoteReplicaInfoFromReplicaThread(remoteReplicaInfos);
mountPathToPartitionInfos.computeIfPresent(replicaId.getMountPath(), (k, v) -> {
v.remove(partitionInfo);
since you are removing arbitrary elements from v, maybe it should be a HashSet instead of ArrayList
} else {
List<? extends ReplicaId> peerReplicas = replicaId.getPeerReplicaIds();
List<RemoteReplicaInfo> remoteReplicaInfos = new ArrayList<>();
if (peerReplicas != null) {
what is the case where peerReplicas is null?
Approved, just a couple more comments to optionally address
@@ -175,9 +177,9 @@ void addReplica(PartitionId partitionId) throws ReplicationException {
partitionToPartitionInfo.put(partitionId, partitionInfo);
mountPathToPartitionInfos.compute(cloudReplica.getMountPath(), (key, value) -> {
You can simplify this to the way you did it in CloudTokenPersistorTest:
mountPathToPartitionInfoList.computeIfAbsent(cloudReplicaId.getMountPath(), key -> ConcurrentHashMap.newKeySet()).add(partitionInfo);
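As a runnable illustration of that one-liner, the sketch below uses strings instead of PartitionInfo objects (the class and field names are made up for the example). computeIfAbsent creates the concurrent set only for the first entry of a mount path, and the returned set is then updated directly:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical index from mount path to a concurrent set of partition infos.
class PartitionInfoIndexSketch {
  private final ConcurrentHashMap<String, Set<String>> mountPathToPartitionInfos = new ConcurrentHashMap<>();

  /** Returns true if the partition info was not already registered under the mount path. */
  boolean add(String mountPath, String partitionInfo) {
    // ConcurrentHashMap.newKeySet() yields a thread-safe set backed by a ConcurrentHashMap.
    return mountPathToPartitionInfos.computeIfAbsent(mountPath, key -> ConcurrentHashMap.newKeySet())
        .add(partitionInfo);
  }

  int size(String mountPath) {
    Set<String> infos = mountPathToPartitionInfos.get(mountPath);
    return infos == null ? 0 : infos.size();
  }
}
```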
* residing on hosts that are not present in clustermap.
* The ZNRecord of REPLICA_ADDITION_ZNODE has following format in mapFields.
* <p/>
* "mapFields": {
minor: put the json in <pre></pre> tags. Also, for the paragraph separator, I think javadoc nags if you use <p/> instead of <p>.