From 354a9455dbbee5819ba01c69a20f4e0ce8a6d481 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Tue, 17 Oct 2023 11:59:01 +0530 Subject: [PATCH] Added UT to test new batches are created for failed shards Signed-off-by: Shivansh Arora --- .../opensearch/gateway/GatewayAllocator.java | 16 +- .../gateway/GatewayAllocatorTests.java | 330 ++++++++++++++++++ .../test/gateway/TestGatewayAllocator.java | 7 + 3 files changed, 350 insertions(+), 3 deletions(-) create mode 100644 server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java diff --git a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java index 27ce635b03d1e..8b3e1af8ec7e7 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java @@ -76,8 +76,8 @@ public class GatewayAllocator implements ExistingShardsAllocator { private final RerouteService rerouteService; - private final PrimaryShardAllocator primaryShardAllocator; - private final ReplicaShardAllocator replicaShardAllocator; + private PrimaryShardAllocator primaryShardAllocator; + private ReplicaShardAllocator replicaShardAllocator; private final ConcurrentMap< ShardId, @@ -113,6 +113,17 @@ protected GatewayAllocator() { this.replicaShardAllocator = null; } + // for tests + protected void setShardAllocators(PrimaryShardAllocator primaryShardAllocator, + ReplicaShardAllocator replicaShardAllocator, + PrimaryShardBatchAllocator primaryShardBatchAllocator, + ReplicaShardBatchAllocator replicaBatchShardAllocator) { + this.primaryShardAllocator = primaryShardAllocator; + this.replicaShardAllocator = replicaShardAllocator; + this.primaryBatchShardAllocator = primaryShardBatchAllocator; + this.replicaBatchShardAllocator = replicaBatchShardAllocator; + } + @Override public int getNumberOfInFlightFetches() { int count = 0; @@ -190,7 +201,6 @@ public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting assert routingAllocation.debugDecision(); boolean batchMode = routingAllocation.nodes().getMinNodeVersion().onOrAfter(Version.CURRENT); if (batchMode) { - // TODO add integ test for testing this behaviour when shard is unassigned but failed many times and is ultimately removed from batch if (getBatchId(unassignedShard, unassignedShard.primary()) == null) { createAndUpdateBatches(routingAllocation, unassignedShard.primary()); } diff --git a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java new file mode 100644 index 0000000000000..0f74965dc0a80 --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java @@ -0,0 +1,330 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.Before; +import org.opensearch.Version; +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.ClusterInfo; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.AllocationId; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.RecoverySource; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.routing.allocation.FailedShard; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.snapshots.SnapshotShardSizeInfo; +import org.opensearch.test.gateway.TestGatewayAllocator; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; +import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED; + +public class GatewayAllocatorTests extends OpenSearchAllocationTestCase { + + private final Logger logger = LogManager.getLogger(GatewayAllocatorTests.class); + TestGatewayAllocator testGatewayAllocator =null; + ClusterState clusterState =null; + RoutingAllocation testAllocation = null; + String indexPrefix = "TEST"; + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + testGatewayAllocator = new TestGatewayAllocator(); + } + public void testSingleBatchCreation(){ + createIndexAndUpdateClusterState(1, 3, 1); + createBatchesAndAssert(1); + } + + public void testTwoBatchCreation(){ + createIndexAndUpdateClusterState(2,1020, 1); + createBatchesAndAssert(2); + + List listOfBatches = new ArrayList<>(testGatewayAllocator.getBatchIdToStartedShardBatch().values()); + assertNotEquals(listOfBatches.get(0), listOfBatches.get(1)); + + // test for replicas + listOfBatches = new ArrayList<>(testGatewayAllocator.getBatchIdToStoreShardBatch().values()); + assertNotEquals(listOfBatches.get(0), listOfBatches.get(1)); + } + + public void testNonDuplicationOfBatch(){ + createIndexAndUpdateClusterState(1, 3, 1); + Tuple, Set> batches = createBatchesAndAssert(1); + assertEquals(1, batches.v1().size()); + assertEquals(1, batches.v2().size()); + + // again try to create batch and verify no new batch is created since shard is already batched and no new unassigned shard + assertEquals(batches.v1(), testGatewayAllocator.createAndUpdateBatches(testAllocation, true)); + assertEquals(batches.v2(), testGatewayAllocator.createAndUpdateBatches(testAllocation, false)); + } + + public void testCorrectnessOfBatch(){ + createIndexAndUpdateClusterState(2, 1020,1); + createBatchesAndAssert(2); + Set shardsSet1 = clusterState.routingTable().index(indexPrefix+0).getShards().values().stream().map(IndexShardRoutingTable::getShardId).collect(Collectors.toSet()); + Set shardsSet2 = clusterState.routingTable().index(indexPrefix+1).getShards().values().stream().map(IndexShardRoutingTable::getShardId).collect(Collectors.toSet()); + shardsSet1.addAll(shardsSet2); + + Set shardsInAllbatches = testGatewayAllocator.getBatchIdToStartedShardBatch().values().stream().map(GatewayAllocator.ShardsBatch::getBatchedShards). + flatMap(Set::stream).collect(Collectors.toSet()); + assertEquals(shardsInAllbatches, shardsSet1); + shardsInAllbatches = testGatewayAllocator.getBatchIdToStoreShardBatch().values().stream().map(GatewayAllocator.ShardsBatch::getBatchedShards). + flatMap(Set::stream).collect(Collectors.toSet()); + assertEquals(shardsInAllbatches, shardsSet1); + + Set primariesInAllBatches = testGatewayAllocator.getBatchIdToStartedShardBatch().values().stream().map(GatewayAllocator.ShardsBatch::getBatchedShardRoutings). + flatMap(Set::stream).collect(Collectors.toSet()); + primariesInAllBatches.forEach(shardRouting -> assertTrue(shardRouting.unassigned() && shardRouting.primary()==true)); + + Set replicasInAllBatches = testGatewayAllocator.getBatchIdToStoreShardBatch().values().stream().map(GatewayAllocator.ShardsBatch::getBatchedShardRoutings). + flatMap(Set::stream).collect(Collectors.toSet()); + + replicasInAllBatches.forEach(shardRouting -> assertTrue(shardRouting.unassigned() && shardRouting.primary()==false)); + } + + public void testAsyncFetcherCreationInBatch(){ + createIndexAndUpdateClusterState(1, 3, 1); + Tuple, Set> batchesTuple = createBatchesAndAssert(1); + Set primaryBatches = batchesTuple.v1(); + Set replicaBatches = batchesTuple.v2(); + + GatewayAllocator.ShardsBatch shardsBatch = testGatewayAllocator.getBatchIdToStartedShardBatch().get(primaryBatches.iterator().next()); + AsyncShardFetch asyncFetcher = shardsBatch.getAsyncFetcher(); + // assert asyncFetcher is not null + assertNotNull(asyncFetcher); + shardsBatch = testGatewayAllocator.getBatchIdToStoreShardBatch().get(replicaBatches.iterator().next()); + asyncFetcher = shardsBatch.getAsyncFetcher(); + assertNotNull(asyncFetcher); + } + + public void testSafelyRemoveShardFromBatch(){ + createIndexAndUpdateClusterState(2, 1023, 1); + + Tuple, Set> batchesTuple = createBatchesAndAssert(2); + Set primaryBatches = batchesTuple.v1(); + Set replicaBatches = batchesTuple.v2(); + + GatewayAllocator.ShardsBatch primaryShardsBatch = testGatewayAllocator.getBatchIdToStartedShardBatch().get(primaryBatches.iterator().next()); + ShardRouting primaryShardRouting= primaryShardsBatch.getBatchedShardRoutings().iterator().next(); + assertEquals(2, replicaBatches.size()); + GatewayAllocator.ShardsBatch replicaShardsBatch = testGatewayAllocator.getBatchIdToStoreShardBatch().get(replicaBatches.iterator().next()); + ShardRouting replicaShardRouting = replicaShardsBatch.getBatchedShardRoutings().iterator().next(); + + // delete 1 shard routing from each batch + testGatewayAllocator.safelyRemoveShardFromBatch(primaryShardRouting); + + testGatewayAllocator.safelyRemoveShardFromBatch(replicaShardRouting); + // verify that shard routing is removed from both batches + assertFalse(primaryShardsBatch.getBatchedShards().contains(primaryShardRouting.shardId())); + assertFalse(replicaShardsBatch.getBatchedShards().contains(replicaShardRouting.shardId())); + + // try to remove that shard again to see if its no op and doent result in exception + testGatewayAllocator.safelyRemoveShardFromBatch(primaryShardRouting); + testGatewayAllocator.safelyRemoveShardFromBatch(replicaShardRouting); + + // now remove all shard routings to verify that batch only gets deleted + primaryShardsBatch.getBatchedShardRoutings().forEach(testGatewayAllocator::safelyRemoveShardFromBatch); + replicaShardsBatch.getBatchedShardRoutings().forEach(testGatewayAllocator::safelyRemoveShardFromBatch); + + assertFalse(testGatewayAllocator.getBatchIdToStartedShardBatch().containsKey(primaryShardsBatch.getBatchId())); + assertFalse(testGatewayAllocator.getBatchIdToStoreShardBatch().containsKey(replicaShardsBatch.getBatchId())); + assertEquals(1, testGatewayAllocator.getBatchIdToStartedShardBatch().size()); + assertEquals(1, testGatewayAllocator.getBatchIdToStoreShardBatch().size()); + } + + public void testSafelyRemoveShardFromBothBatch(){ + createIndexAndUpdateClusterState(1,3,1); + createBatchesAndAssert(1); + GatewayAllocator.ShardsBatch primaryShardsBatch = testGatewayAllocator.getBatchIdToStartedShardBatch().values().iterator().next(); + GatewayAllocator.ShardsBatch replicaShardsBatch = testGatewayAllocator.getBatchIdToStoreShardBatch().values().iterator().next(); + + ShardRouting anyPrimary = primaryShardsBatch.getBatchedShardRoutings().iterator().next(); + // remove first shard routing from both batches + testGatewayAllocator.safelyRemoveShardFromBothBatch(anyPrimary); + + // verify that shard routing is removed from both batches + assertFalse(primaryShardsBatch.getBatchedShards().contains(anyPrimary.shardId())); + assertFalse(replicaShardsBatch.getBatchedShards().contains(anyPrimary.shardId())); + + // try to remove that shard again to see if its no op and doesnt result in exception + testGatewayAllocator.safelyRemoveShardFromBothBatch(anyPrimary); + + // now remove all shard routings to verify that batch gets deleted + primaryShardsBatch.getBatchedShardRoutings().forEach(testGatewayAllocator::safelyRemoveShardFromBothBatch); + replicaShardsBatch.getBatchedShardRoutings().forEach(testGatewayAllocator::safelyRemoveShardFromBothBatch); + + + assertFalse(testGatewayAllocator.getBatchIdToStartedShardBatch().containsKey(primaryShardsBatch.getBatchId())); + assertFalse(testGatewayAllocator.getBatchIdToStoreShardBatch().containsKey(replicaShardsBatch.getBatchId())); + assertEquals(0, testGatewayAllocator.getBatchIdToStartedShardBatch().size()); + assertEquals(0, testGatewayAllocator.getBatchIdToStoreShardBatch().size()); + } + + public void testGetBatchIdExisting() { + createIndexAndUpdateClusterState(2, 1020, 1); + // get all shardsRoutings for test index + List allShardRoutings1 = clusterState.routingTable().index(indexPrefix +0).getShards().values().stream().map(IndexShardRoutingTable::getShards) + .flatMap(List::stream).collect(Collectors.toList()); + List allShardRouting2 = clusterState.routingTable().index(indexPrefix+1).getShards().values().stream().map(IndexShardRoutingTable::getShards) + .flatMap(List::stream).collect(Collectors.toList()); + + Tuple, Set> batchesTuple = createBatchesAndAssert(2); + Set primaryBatches = batchesTuple.v1(); + Set replicaBatches = batchesTuple.v2(); + + // create a map of shards to batch id for primaries + + Map shardIdToBatchIdForStartedShards = new HashMap<>(); + allShardRoutings1.addAll(allShardRouting2); + assertEquals(4080, allShardRoutings1.size()); + for (ShardRouting shardRouting : allShardRoutings1) { + for (String batchId : primaryBatches) { + if (shardRouting.primary() == true && testGatewayAllocator.getBatchIdToStartedShardBatch().get(batchId).getBatchedShards().contains(shardRouting.shardId())) { + if (shardIdToBatchIdForStartedShards.containsKey(shardRouting.shardId())) { + fail("found duplicate shard routing for shard. One shard cant be in multiple batches " + shardRouting.shardId()); + } + assertTrue(shardRouting.primary()); + shardIdToBatchIdForStartedShards.put(shardRouting.shardId(), batchId); + } + } + } + Map shardIdToBatchIdForStoreShards = new HashMap<>(); + + for (ShardRouting shardRouting : allShardRoutings1) { + for (String batchId : replicaBatches) { + if (shardRouting.primary() == false && testGatewayAllocator.getBatchIdToStoreShardBatch().get(batchId).getBatchedShards().contains(shardRouting.shardId())) { + if (shardIdToBatchIdForStoreShards.containsKey(shardRouting.shardId())) { + fail("found duplicate shard routing for shard. One shard cant be in multiple batches " + shardRouting.shardId()); + } + assertFalse(shardRouting.primary()); + shardIdToBatchIdForStoreShards.put(shardRouting.shardId(), batchId); + } + } + } + + assertEquals(4080, shardIdToBatchIdForStartedShards.size() + shardIdToBatchIdForStoreShards.size()); + // now compare the maps with getBatchId() call + for (ShardRouting shardRouting : allShardRoutings1) { + if(shardRouting .primary()) { + assertEquals(shardIdToBatchIdForStartedShards.get(shardRouting.shardId()), testGatewayAllocator.getBatchId(shardRouting, true)); + } + else { + assertEquals(shardIdToBatchIdForStoreShards.get(shardRouting.shardId()), testGatewayAllocator.getBatchId(shardRouting, false)); + } + } + } + + public void testGetBatchIdNonExisting(){ + createIndexAndUpdateClusterState(1, 1, 1); + List allShardRoutings = clusterState.routingTable().index(indexPrefix +0).getShards().values().stream().map(IndexShardRoutingTable::getShards) + .flatMap(List::stream).collect(Collectors.toList()); + allShardRoutings.forEach(shard -> assertNull(testGatewayAllocator.getBatchId(shard, shard.primary()))); + } + + public void testExplainUnassignedForFailedShard() { + createIndexAndUpdateClusterState(2, 1, 1); + AllocationService allocation = createAllocationService(); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))) + .build(); + clusterState = allocation.reroute(clusterState, "reroute"); + // starting primaries + clusterState = startInitializingShardsAndReroute(allocation, clusterState); + // starting replicas + clusterState = startInitializingShardsAndReroute(allocation, clusterState); + // fail shard + ShardRouting shardToFail = clusterState.getRoutingNodes().shardsWithState(STARTED).get(0); + clusterState = allocation.applyFailedShards( + clusterState, + Collections.singletonList(new FailedShard(shardToFail, "test fail", null, randomBoolean())) + ); + // assert that batches are empty + assertEquals(0, testGatewayAllocator.getNumberOfStartedShardBatches()); + assertEquals(0, testGatewayAllocator.getNumberOfStoreShardBatches()); + + // now calling allocation explain to ensure that batches are getting created + testAllocation.debugDecision(true); + logger.info(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)); + testGatewayAllocator.explainUnassignedShardAllocation(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0), testAllocation); + + // assert that new batches are created for failed shard + assertEquals(1, testGatewayAllocator.getNumberOfStartedShardBatches()); + assertEquals(1, testGatewayAllocator.getNumberOfStoreShardBatches()); + + } + + private void createIndexAndUpdateClusterState(int count, int numberOfShards, int numberOfReplicas){ + if (count == 0) + return; + Metadata.Builder metadata = Metadata.builder(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + for(int i=0;i, Set> createBatchesAndAssert(int expectedBatchSize) { + Set primaryBatches = testGatewayAllocator.createAndUpdateBatches(testAllocation, true); + Set replicaBatches = testGatewayAllocator.createAndUpdateBatches(testAllocation, false); + assertEquals(expectedBatchSize,primaryBatches.size()); + assertEquals(expectedBatchSize, replicaBatches.size()); + assertEquals(expectedBatchSize, testGatewayAllocator.getBatchIdToStartedShardBatch().size()); + assertEquals(expectedBatchSize, testGatewayAllocator.getBatchIdToStoreShardBatch().size()); + assertEquals(testGatewayAllocator.getBatchIdToStartedShardBatch().keySet(), primaryBatches); + assertEquals(testGatewayAllocator.getBatchIdToStoreShardBatch().keySet(), replicaBatches); + return new Tuple<>(primaryBatches, replicaBatches); + } +} diff --git a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java index 107e42ce43487..8ce96806dfcb1 100644 --- a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java +++ b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java @@ -35,6 +35,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.FailedShard; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.core.index.shard.ShardId; @@ -171,4 +172,10 @@ public String getReplicationCheckPointKey(ShardId shardId, String nodeName) { public void addReplicationCheckpoint(ShardId shardId, String nodeName, ReplicationCheckpoint replicationCheckpoint) { shardIdNodeToReplicationCheckPointMap.putIfAbsent(getReplicationCheckPointKey(shardId, nodeName), replicationCheckpoint); } + + @Override + public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting unassignedShard, RoutingAllocation routingAllocation) { + setShardAllocators(primaryShardAllocator, replicaShardAllocator, primaryBatchShardAllocator, replicaBatchShardAllocator); + return super.explainUnassignedShardAllocation(unassignedShard, routingAllocation); + } }