Skip to content

Commit

Permalink
Added batch mode setting
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Chandani <chngau@amazon.com>
  • Loading branch information
Gaurav614 committed Sep 28, 2023
1 parent 432efc1 commit eecf79f
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterInfoService;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.RestoreInProgress;
Expand All @@ -55,6 +56,7 @@
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.Settings;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.PriorityComparator;
import org.opensearch.snapshots.SnapshotsInfoService;
Expand All @@ -73,6 +75,7 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.cluster.routing.allocation.ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE_ENABLED;

/**
* This service manages the node allocation of a cluster. For this reason the
Expand All @@ -87,6 +90,7 @@ public class AllocationService {
private static final Logger logger = LogManager.getLogger(AllocationService.class);

private final AllocationDeciders allocationDeciders;
private Settings settings;
private Map<String, ExistingShardsAllocator> existingShardsAllocators;
private final ShardsAllocator shardsAllocator;
private final ClusterInfoService clusterInfoService;
Expand Down Expand Up @@ -114,6 +118,22 @@ public AllocationService(
this.shardsAllocator = shardsAllocator;
this.clusterInfoService = clusterInfoService;
this.snapshotsInfoService = snapshotsInfoService;
this.settings = Settings.EMPTY;
}

public AllocationService(
AllocationDeciders allocationDeciders,
ShardsAllocator shardsAllocator,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService,
Settings settings

) {
this.allocationDeciders = allocationDeciders;
this.shardsAllocator = shardsAllocator;
this.clusterInfoService = clusterInfoService;
this.snapshotsInfoService = snapshotsInfoService;
this.settings = settings;
}

/**
Expand Down Expand Up @@ -547,12 +567,13 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
existingShardsAllocator.beforeAllocation(allocation);
}
// batch Mode enabled setting to be added
boolean batchModeEnabled = true;
if (batchModeEnabled) {

Boolean batchModeEnabled = EXISTING_SHARDS_ALLOCATOR_BATCH_MODE_ENABLED.get(settings);

if (batchModeEnabled && allocation.nodes().getMinNodeVersion().onOrAfter(Version.CURRENT)) {
// since allocators is per index setting, to have batch assignment verify allocators same for all shards
// if not fallback to single assignment
ExistingShardsAllocator allocator = verifySameAllocatorForAllShards(allocation);
ExistingShardsAllocator allocator = verifySameAllocatorForAllUnassignedShards(allocation);
if (allocator != null) {
allocator.allocateUnassignedBatch(allocation, true);
for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
Expand All @@ -563,7 +584,6 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
}
}


final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator();
while (primaryIterator.hasNext()) {
final ShardRouting shardRouting = primaryIterator.next();
Expand All @@ -585,20 +605,26 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
}
}

private ExistingShardsAllocator verifySameAllocatorForAllShards(RoutingAllocation allocation) {
/**
* Verify if all unassigned shards are allocated by the same allocator, if yes then return the allocator, else
* return null
* @param allocation {@link RoutingAllocation}
* @return {@link ExistingShardsAllocator} or null
*/
private ExistingShardsAllocator verifySameAllocatorForAllUnassignedShards(RoutingAllocation allocation) {
// if there is a single Allocator set in Allocation Service then use it for all shards
if (existingShardsAllocators.size() == 1) {
return existingShardsAllocators.values().iterator().next();
}
RoutingNodes.UnassignedShards unassignedShards = allocation.routingNodes().unassigned();
RoutingNodes.UnassignedShards.UnassignedIterator iterator = unassignedShards.iterator();
ExistingShardsAllocator currentAllocatorForShard =null;
ExistingShardsAllocator currentAllocatorForShard = null;
if (unassignedShards.size() > 0) {
ShardRouting shard = iterator.next();
currentAllocatorForShard= getAllocatorForShard(shard, allocation);
while (iterator.hasNext()){
currentAllocatorForShard = getAllocatorForShard(shard, allocation);
while (iterator.hasNext()) {
ExistingShardsAllocator allocatorForShard = getAllocatorForShard(iterator.next(), allocation);
if (currentAllocatorForShard.getClass().getName().equals(allocatorForShard.getClass().getName())==false){
if (currentAllocatorForShard.getClass().getName().equals(allocatorForShard.getClass().getName()) == false) {
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ public interface ExistingShardsAllocator {
Setting.Property.PrivateIndex
);

public static final Setting<Boolean> EXISTING_SHARDS_ALLOCATOR_BATCH_MODE_ENABLED = Setting.boolSetting(
"cluster.allocator.existing_shards_allocator.batch_enable",
true,
Setting.Property.NodeScope
);


/**
* Called before starting a round of allocation, allowing the allocator to invalidate some caches if appropriate.
*/
Expand Down

0 comments on commit eecf79f

Please sign in to comment.