Allocation service changes for batch assignment #8888

Open: wants to merge 9 commits into base: main (showing changes from 3 commits)

@@ -150,7 +150,7 @@ public ClusterModule(
this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadContext);
- this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService);
+ this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, settings);
}

public static List<Entry> getNamedWriteables() {
@@ -35,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterInfoService;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.RestoreInProgress;
@@ -55,6 +56,7 @@
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.Settings;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.PriorityComparator;
import org.opensearch.snapshots.SnapshotsInfoService;
@@ -73,6 +75,7 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.cluster.routing.allocation.ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE_ENABLED;

/**
* This service manages the node allocation of a cluster. For this reason the
@@ -87,6 +90,7 @@ public class AllocationService {
private static final Logger logger = LogManager.getLogger(AllocationService.class);

private final AllocationDeciders allocationDeciders;
private Settings settings;
private Map<String, ExistingShardsAllocator> existingShardsAllocators;
private final ShardsAllocator shardsAllocator;
private final ClusterInfoService clusterInfoService;
@@ -114,6 +118,22 @@ public AllocationService(
this.shardsAllocator = shardsAllocator;
this.clusterInfoService = clusterInfoService;
this.snapshotsInfoService = snapshotsInfoService;
this.settings = Settings.EMPTY;

We can use constructor chaining for Line 117 - 121:

this(allocationDeciders, 
    shardsAllocator, 
    clusterInfoService, 
    snapshotsInfoService, 
    Settings.EMPTY);

Contributor Author

Ack
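
A minimal sketch of the constructor chaining suggested above. The class name `ChainedService`, the `String` allocator field, and the `Map`-based settings are hypothetical stand-ins rather than the OpenSearch types; only the `this(...)` delegation pattern is the point.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a service whose settings argument is optional.
final class ChainedService {
    // Stand-in for Settings.EMPTY (the real type is org.opensearch.common.settings.Settings).
    static final Map<String, String> EMPTY_SETTINGS = Collections.emptyMap();

    private final String allocator;
    private final Map<String, String> settings;

    // The short constructor delegates to the full one, so fields are assigned in one place.
    ChainedService(String allocator) {
        this(allocator, EMPTY_SETTINGS);
    }

    ChainedService(String allocator, Map<String, String> settings) {
        this.allocator = Objects.requireNonNull(allocator);
        this.settings = Objects.requireNonNull(settings);
    }

    String allocator() {
        return allocator;
    }

    Map<String, String> settings() {
        return settings;
    }

    public static void main(String[] args) {
        ChainedService service = new ChainedService("balanced");
        System.out.println(service.allocator() + " " + service.settings().isEmpty());
    }
}
```

One side effect of chaining is that the settings field can be declared `final`, since every constructor path assigns it exactly once.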

}

public AllocationService(
AllocationDeciders allocationDeciders,
ShardsAllocator shardsAllocator,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService,
Settings settings

) {
this.allocationDeciders = allocationDeciders;
this.shardsAllocator = shardsAllocator;
this.clusterInfoService = clusterInfoService;
this.snapshotsInfoService = snapshotsInfoService;
this.settings = settings;
}

/**
@@ -548,6 +568,22 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
existingShardsAllocator.beforeAllocation(allocation);
}

Boolean batchModeEnabled = EXISTING_SHARDS_ALLOCATOR_BATCH_MODE_ENABLED.get(settings);

if (batchModeEnabled && allocation.nodes().getMinNodeVersion().onOrAfter(Version.CURRENT)) {
Contributor

We need to set this to the version in which we plan to release this feature; please make a note of it. I think it's fine to keep it at CURRENT for now as a safety measure, so that an older version is not merged into a higher-version release, but make sure it is changed before merge.

Contributor

You may want to cover this with a mixed-cluster REST/integration test.

Contributor

Yes, right now it's CURRENT on the main branch.
When it goes to 2.x, it will use the 2.12 version.
Then main will be changed to 2.12 as well.

Basically, three phased commits.

Contributor

@Gaurav614
There must be unit or integ test verifying the if-else logic. Based on commits, UT/integ checks will also be changed.

Member

For the main branch, it has to be 3.0 and not Version.CURRENT.

Member

Tests are missing to check whether batch or non-batch mode is picked correctly.
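
The gating condition discussed in this thread can be sketched in isolation as follows. This is a simplified model, not the PR's code or test: `NodeVersion` is a hypothetical stand-in for `org.opensearch.Version`, `FEATURE_VERSION` is assumed to be 2.12 based on the comment above, and returning `false` for an empty node list is an assumption made for the sketch.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class BatchModeGate {
    // Hypothetical minimal version type; the real code uses org.opensearch.Version.
    static final class NodeVersion implements Comparable<NodeVersion> {
        final int major;
        final int minor;

        NodeVersion(int major, int minor) {
            this.major = major;
            this.minor = minor;
        }

        @Override
        public int compareTo(NodeVersion other) {
            int byMajor = Integer.compare(major, other.major);
            return byMajor != 0 ? byMajor : Integer.compare(minor, other.minor);
        }

        boolean onOrAfter(NodeVersion other) {
            return compareTo(other) >= 0;
        }
    }

    // Assumed release version of the feature (2.12, per the review discussion).
    static final NodeVersion FEATURE_VERSION = new NodeVersion(2, 12);

    // Batch mode is chosen only if the setting is on and the oldest node is new enough.
    static boolean useBatchMode(boolean batchModeEnabled, List<NodeVersion> nodeVersions) {
        if (!batchModeEnabled || nodeVersions.isEmpty()) {
            return false; // assumption: no nodes means no batch mode
        }
        NodeVersion minNodeVersion = Collections.min(nodeVersions);
        return minNodeVersion.onOrAfter(FEATURE_VERSION);
    }

    public static void main(String[] args) {
        List<NodeVersion> mixed = Arrays.asList(new NodeVersion(2, 11), new NodeVersion(2, 12));
        List<NodeVersion> upgraded = Arrays.asList(new NodeVersion(2, 12), new NodeVersion(3, 0));
        System.out.println(useBatchMode(true, mixed));     // false: one node predates the feature
        System.out.println(useBatchMode(true, upgraded));  // true: every node is on or after 2.12
        System.out.println(useBatchMode(false, upgraded)); // false: setting disabled
    }
}
```

A unit test for the real if-else branch would presumably assert the same three cases: mixed cluster, fully upgraded cluster, and setting disabled.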

// since allocators is per index setting, to have batch assignment verify allocators same for all shards
// if not fallback to single assignment
ExistingShardsAllocator allocator = verifySameAllocatorForAllUnassignedShards(allocation);
if (allocator != null) {
allocator.allocateUnassignedBatch(allocation, true);
for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
existingShardsAllocator.afterPrimariesBeforeReplicas(allocation);
}
allocator.allocateUnassignedBatch(allocation, false);
return;
}
Contributor

@vikasvb90 Dec 7, 2023

If the allocator is null, execution leaves this if block and continues with single shard allocation. Is this intended? Shouldn't we throw an exception from verifySameAllocatorForAllUnassignedShards and remove this null check altogether?

Contributor Author

@Gaurav614 Dec 20, 2023

No, it won't assign a single shard; it will try to assign all unassigned shards.
@amkhar I think you suggested renaming allocateUnassignedBatch to allocateAllUnassignedShards?

Contributor

Yes, this method ultimately allocates all unassigned shards. And UnassignedBatch feels like a single batch is getting assigned.

So, we can change it to allocateAllUnassignedShards. Feel free to suggest other better/relevant names.

Member

I don't like this complication. If the cluster is using a single ExistingShardsAllocator, use batch mode; otherwise let it fall back to single shard allocation. Later, once there is more confidence in the new batch mode code, this should change so that the batch mode code also handles a single ShardRouting.

Member

Add a warning log saying that batch mode can't be used because there are multiple ExistingShardsAllocators, and recommending a single one so that the batch logic can run effectively.

Contributor Author

Done. For now, we do not allow any custom allocator that implements the newly introduced allocateUnassignedBatch method (renamed to allocateAllUnassignedShards) to run its own implementation; we use our own implementation instead.
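
The simplified rule requested in this thread (batch mode only when exactly one allocator is registered, otherwise a warning and a fallback) might look roughly like the sketch below. The `Allocator` interface, the `Mode` enum, and the log wording are hypothetical; the real code works on `ExistingShardsAllocator` instances and uses Log4j rather than `java.util.logging`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

final class AllocationModeChooser {
    private static final Logger LOGGER = Logger.getLogger(AllocationModeChooser.class.getName());

    enum Mode { BATCH, SINGLE_SHARD }

    // Hypothetical stand-in for ExistingShardsAllocator.
    interface Allocator {}

    // Batch mode needs exactly one registered allocator; anything else falls back with a warning.
    static Mode chooseMode(Map<String, ? extends Allocator> allocators) {
        if (allocators.size() == 1) {
            return Mode.BATCH;
        }
        LOGGER.warning(
            "Falling back to single shard allocation: found " + allocators.size()
                + " existing shards allocators, batch mode requires exactly one"
        );
        return Mode.SINGLE_SHARD;
    }

    public static void main(String[] args) {
        Map<String, Allocator> allocators = new HashMap<>();
        allocators.put("gateway_allocator", new Allocator() {});
        System.out.println(chooseMode(allocators)); // BATCH

        allocators.put("custom_allocator", new Allocator() {});
        System.out.println(chooseMode(allocators)); // SINGLE_SHARD, after logging a warning
    }
}
```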

}

final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator();
while (primaryIterator.hasNext()) {
final ShardRouting shardRouting = primaryIterator.next();
@@ -569,6 +605,33 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
}
}

/**
* Verify if all unassigned shards are allocated by the same allocator, if yes then return the allocator, else
* return null
* @param allocation {@link RoutingAllocation}
* @return {@link ExistingShardsAllocator} or null
*/
private ExistingShardsAllocator verifySameAllocatorForAllUnassignedShards(RoutingAllocation allocation) {
// if there is a single Allocator set in Allocation Service then use it for all shards
if (existingShardsAllocators.size() == 1) {
return existingShardsAllocators.values().iterator().next();
}
RoutingNodes.UnassignedShards unassignedShards = allocation.routingNodes().unassigned();
RoutingNodes.UnassignedShards.UnassignedIterator iterator = unassignedShards.iterator();
Contributor

Can these be combined into a single line?

Contributor Author

ack

Contributor Author

Not possible; we need both variables later.

ExistingShardsAllocator currentAllocatorForShard = null;
if (unassignedShards.size() > 0) {
ShardRouting shard = iterator.next();
currentAllocatorForShard = getAllocatorForShard(shard, allocation);
Contributor

Single line ?

Contributor Author

ack

while (iterator.hasNext()) {
ExistingShardsAllocator allocatorForShard = getAllocatorForShard(iterator.next(), allocation);
if (currentAllocatorForShard.getClass().getName().equals(allocatorForShard.getClass().getName()) == false) {
return null;
Contributor

What do you plan to do with null if a different allocator is found? That shouldn't be possible, right, assuming there's no code bug? Do you think IllegalStateException would be a better fit?

Contributor Author

It won't be an IllegalStateException as far as AllocationService is concerned, since it will fall back to the non-batched version.
Null is returned because the shards don't all have the same allocator set.

I will probably rename it to getAndVerifySameAllocatorForAllUnassignedShards.
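
As an illustration of the "no common allocator is a normal outcome, not an error" point above, here is a sketch of the same check that returns an `Optional` instead of null. This is an alternative shape, not what the PR does. The generic helper `commonAllocator` and its parameters are hypothetical, and it compares `Class` objects where the PR compares class names.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

final class SameAllocatorCheck {
    // Returns the allocator of the first shard if every shard resolves to an allocator of the
    // same class; returns empty if there are no shards or the allocator classes differ.
    static <S, A> Optional<A> commonAllocator(List<S> unassignedShards, Function<S, A> allocatorForShard) {
        Iterator<S> iterator = unassignedShards.iterator();
        if (!iterator.hasNext()) {
            return Optional.empty();
        }
        A first = allocatorForShard.apply(iterator.next());
        while (iterator.hasNext()) {
            A next = allocatorForShard.apply(iterator.next());
            if (!first.getClass().equals(next.getClass())) {
                return Optional.empty(); // mixed allocators: caller falls back to non-batch mode
            }
        }
        return Optional.of(first);
    }

    public static void main(String[] args) {
        // Toy lookup: shard names starting with "a" use a String "allocator", others an Integer.
        Function<String, Object> lookup = shard -> shard.startsWith("a") ? (Object) "gateway" : (Object) Integer.valueOf(1);
        System.out.println(commonAllocator(Arrays.asList("a1", "a2"), lookup).isPresent()); // true
        System.out.println(commonAllocator(Arrays.asList("a1", "b1"), lookup).isPresent()); // false
    }
}
```

With this shape the caller can express the fallback directly, for example by running the batch path only when the returned `Optional` is present.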

}
}
}
return currentAllocatorForShard;
}

private void disassociateDeadNodes(RoutingAllocation allocation) {
for (Iterator<RoutingNode> it = allocation.routingNodes().mutableIterator(); it.hasNext();) {
RoutingNode node = it.next();
@@ -60,6 +60,13 @@ public interface ExistingShardsAllocator {
Setting.Property.PrivateIndex
);


Javadoc is needed here since we are adding a new public setting. I highly recommend adding a multi-line comment explaining the purpose of the setting.

Contributor Author

Ack

public static final Setting<Boolean> EXISTING_SHARDS_ALLOCATOR_BATCH_MODE_ENABLED = Setting.boolSetting(

nit: Rename to EXISTING_SHARDS_ALLOCATOR_BATCH_MODE ?

Contributor Author

done

"cluster.allocator.existing_shards_allocator.batch_enable",
true,
Setting.Property.NodeScope
shwetathareja marked this conversation as resolved.
);


/**
* Called before starting a round of allocation, allowing the allocator to invalidate some caches if appropriate.
*/
@@ -79,6 +79,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.OperationRouting;
import org.opensearch.cluster.routing.allocation.DiskThresholdSettings;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
@@ -246,6 +247,7 @@ public void apply(Settings value, Settings current, Settings previous) {
DanglingIndicesState.AUTO_IMPORT_DANGLING_INDICES_SETTING,
EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING,
EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING,
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE_ENABLED,
FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP_SETTING,
FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING,
FilterAllocationDecider.CLUSTER_ROUTING_REQUIRE_GROUP_SETTING,