Skip to content

Commit

Permalink
Allocation service changes for batch assignment
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Chandani <chngau@amazon.com>
  • Loading branch information
Gaurav614 committed Jul 26, 2023
1 parent e1da84d commit 432efc1
Showing 1 changed file with 37 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,22 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
existingShardsAllocator.beforeAllocation(allocation);
}
// batch Mode enabled setting to be added
boolean batchModeEnabled = true;
if (batchModeEnabled) {
// since allocators is per index setting, to have batch assignment verify allocators same for all shards
// if not fallback to single assignment
ExistingShardsAllocator allocator = verifySameAllocatorForAllShards(allocation);
if (allocator != null) {
allocator.allocateUnassignedBatch(allocation, true);
for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
existingShardsAllocator.afterPrimariesBeforeReplicas(allocation);
}
allocator.allocateUnassignedBatch(allocation, false);
return;
}
}


final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator();
while (primaryIterator.hasNext()) {
Expand All @@ -569,6 +585,27 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
}
}

private ExistingShardsAllocator verifySameAllocatorForAllShards(RoutingAllocation allocation) {
// if there is a single Allocator set in Allocation Service then use it for all shards
if (existingShardsAllocators.size() == 1) {
return existingShardsAllocators.values().iterator().next();
}
RoutingNodes.UnassignedShards unassignedShards = allocation.routingNodes().unassigned();
RoutingNodes.UnassignedShards.UnassignedIterator iterator = unassignedShards.iterator();
ExistingShardsAllocator currentAllocatorForShard =null;
if (unassignedShards.size() > 0) {
ShardRouting shard = iterator.next();
currentAllocatorForShard= getAllocatorForShard(shard, allocation);
while (iterator.hasNext()){
ExistingShardsAllocator allocatorForShard = getAllocatorForShard(iterator.next(), allocation);
if (currentAllocatorForShard.getClass().getName().equals(allocatorForShard.getClass().getName())==false){
return null;
}
}
}
return currentAllocatorForShard;
}

private void disassociateDeadNodes(RoutingAllocation allocation) {
for (Iterator<RoutingNode> it = allocation.routingNodes().mutableIterator(); it.hasNext();) {
RoutingNode node = it.next();
Expand Down

0 comments on commit 432efc1

Please sign in to comment.