-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allocation service changes for batch assignment #8888
base: main
Are you sure you want to change the base?
Changes from 3 commits
432efc1
eecf79f
c2be078
3bb7dd8
6e645a0
b1994fa
257f36d
1574fc2
d3b1140
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,6 +35,7 @@ | |
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.apache.logging.log4j.message.ParameterizedMessage; | ||
import org.opensearch.Version; | ||
import org.opensearch.cluster.ClusterInfoService; | ||
import org.opensearch.cluster.ClusterState; | ||
import org.opensearch.cluster.RestoreInProgress; | ||
|
@@ -55,6 +56,7 @@ | |
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; | ||
import org.opensearch.cluster.routing.allocation.decider.Decision; | ||
import org.opensearch.common.collect.ImmutableOpenMap; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.gateway.GatewayAllocator; | ||
import org.opensearch.gateway.PriorityComparator; | ||
import org.opensearch.snapshots.SnapshotsInfoService; | ||
|
@@ -73,6 +75,7 @@ | |
import static java.util.Collections.emptyList; | ||
import static java.util.Collections.singletonList; | ||
import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; | ||
import static org.opensearch.cluster.routing.allocation.ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE_ENABLED; | ||
|
||
/** | ||
* This service manages the node allocation of a cluster. For this reason the | ||
|
@@ -87,6 +90,7 @@ public class AllocationService { | |
private static final Logger logger = LogManager.getLogger(AllocationService.class); | ||
|
||
private final AllocationDeciders allocationDeciders; | ||
private Settings settings; | ||
private Map<String, ExistingShardsAllocator> existingShardsAllocators; | ||
private final ShardsAllocator shardsAllocator; | ||
private final ClusterInfoService clusterInfoService; | ||
|
@@ -114,6 +118,22 @@ public AllocationService( | |
this.shardsAllocator = shardsAllocator; | ||
this.clusterInfoService = clusterInfoService; | ||
this.snapshotsInfoService = snapshotsInfoService; | ||
this.settings = Settings.EMPTY; | ||
} | ||
|
||
public AllocationService( | ||
AllocationDeciders allocationDeciders, | ||
ShardsAllocator shardsAllocator, | ||
ClusterInfoService clusterInfoService, | ||
SnapshotsInfoService snapshotsInfoService, | ||
Settings settings | ||
|
||
) { | ||
this.allocationDeciders = allocationDeciders; | ||
this.shardsAllocator = shardsAllocator; | ||
this.clusterInfoService = clusterInfoService; | ||
this.snapshotsInfoService = snapshotsInfoService; | ||
this.settings = settings; | ||
} | ||
|
||
/** | ||
|
@@ -548,6 +568,22 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { | |
existingShardsAllocator.beforeAllocation(allocation); | ||
} | ||
|
||
Boolean batchModeEnabled = EXISTING_SHARDS_ALLOCATOR_BATCH_MODE_ENABLED.get(settings); | ||
|
||
if (batchModeEnabled && allocation.nodes().getMinNodeVersion().onOrAfter(Version.CURRENT)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to set this to the version in which we plan to release this feature. Please make a note of it. I think its fine to keep it current for safety purpose to avoid merging old version in a higher version release. But make sure it gets changed before merge. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You may want to control it better with mixed cluster rest/integ test. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ya, right now it's CURRENT for main branch. Basically 3 phased commits. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Gaurav614 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for the main branch, it has to be 3.0 and not Version.CURRENT There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. tests are missing to check if batch/ non batch mode is picked correctly. |
||
// since allocators is per index setting, to have batch assignment verify allocators same for all shards | ||
// if not fallback to single assignment | ||
ExistingShardsAllocator allocator = verifySameAllocatorForAllUnassignedShards(allocation); | ||
if (allocator != null) { | ||
allocator.allocateUnassignedBatch(allocation, true); | ||
for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) { | ||
existingShardsAllocator.afterPrimariesBeforeReplicas(allocation); | ||
} | ||
allocator.allocateUnassignedBatch(allocation, false); | ||
return; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If allocator was found null, then execution would come out of this if block and continue with single shard allocation. Is this intended? Shouldn't we be throwing an exception from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not it wont assign a single shard, but try to assign All unassigned Shards There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this method ultimately allocates all unassigned shards. And So, we can change it to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't like this complication. If the cluster is using single ExistingShardAllocator, then use batch mode otherwise let it fall back to single shard allocation. This should change later to batch mode code which is dealing with single ShardRouting once there is more confidence in the new code with batch mode. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a warning log that it can't use batch due to multiple ExistingShardAllocator, it is recommending to use one so that batch logic can run effectively. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. So we are currently not allowing any CustomAllocator that implements the new method that we introduced to |
||
} | ||
|
||
final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator(); | ||
while (primaryIterator.hasNext()) { | ||
final ShardRouting shardRouting = primaryIterator.next(); | ||
|
@@ -569,6 +605,33 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { | |
} | ||
} | ||
|
||
/** | ||
* Verify if all unassigned shards are allocated by the same allocator, if yes then return the allocator, else | ||
* return null | ||
* @param allocation {@link RoutingAllocation} | ||
* @return {@link ExistingShardsAllocator} or null | ||
*/ | ||
private ExistingShardsAllocator verifySameAllocatorForAllUnassignedShards(RoutingAllocation allocation) { | ||
// if there is a single Allocator set in Allocation Service then use it for all shards | ||
if (existingShardsAllocators.size() == 1) { | ||
return existingShardsAllocators.values().iterator().next(); | ||
} | ||
RoutingNodes.UnassignedShards unassignedShards = allocation.routingNodes().unassigned(); | ||
RoutingNodes.UnassignedShards.UnassignedIterator iterator = unassignedShards.iterator(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can be clubbed into single line ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ack There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not possible, we need both variables later |
||
ExistingShardsAllocator currentAllocatorForShard = null; | ||
if (unassignedShards.size() > 0) { | ||
ShardRouting shard = iterator.next(); | ||
currentAllocatorForShard = getAllocatorForShard(shard, allocation); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Single line ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ack |
||
while (iterator.hasNext()) { | ||
ExistingShardsAllocator allocatorForShard = getAllocatorForShard(iterator.next(), allocation); | ||
if (currentAllocatorForShard.getClass().getName().equals(allocatorForShard.getClass().getName()) == false) { | ||
return null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you plan to do with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It wont be IllegalStateException with respect to AllocationService since it will fallback to non batched version. probably I will rename it to |
||
} | ||
} | ||
} | ||
return currentAllocatorForShard; | ||
} | ||
|
||
private void disassociateDeadNodes(RoutingAllocation allocation) { | ||
for (Iterator<RoutingNode> it = allocation.routingNodes().mutableIterator(); it.hasNext();) { | ||
RoutingNode node = it.next(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -60,6 +60,13 @@ public interface ExistingShardsAllocator { | |
Setting.Property.PrivateIndex | ||
); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Java doc here, we are adding a new public setting. Highly recommend adding multi-line comment explaining the purpose of the setting. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ack |
||
public static final Setting<Boolean> EXISTING_SHARDS_ALLOCATOR_BATCH_MODE_ENABLED = Setting.boolSetting( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Rename to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
"cluster.allocator.existing_shards_allocator.batch_enable", | ||
true, | ||
Setting.Property.NodeScope | ||
shwetathareja marked this conversation as resolved.
Show resolved
Hide resolved
|
||
); | ||
|
||
|
||
/** | ||
* Called before starting a round of allocation, allowing the allocator to invalidate some caches if appropriate. | ||
*/ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use constructor chaining for Line 117 - 121:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack