From cfc429af3cebd885103d92529f2e2d8a25cd436f Mon Sep 17 00:00:00 2001
From: RS146BIJAY
Date: Wed, 14 Feb 2024 23:50:59 +0530
Subject: [PATCH] Fixing flaky test cases

Signed-off-by: RS146BIJAY
---
 .../decider/DiskThresholdDeciderIT.java       |  4 +-
 .../cluster/shards/ClusterShardLimitIT.java   | 15 ++++---
 .../index/engine/InternalEngineTests.java     | 41 +++++++++++++++++--
 3 files changed, 47 insertions(+), 13 deletions(-)

diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java
index 089a91a30dd17..cc8747e5f5666 100644
--- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java
@@ -245,8 +245,10 @@ public void testIndexCreateBlockIsRemovedWhenAnyNodesNotExceedHighWatermarkWithA
             (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES)
         );
 
-        // Validate if index create block is removed on the cluster
+        // Validate that the index create block is removed from the cluster. The cluster info also needs to be
+        // refreshed periodically, so that the node is removed from the high watermark breached list.
         assertBusy(() -> {
+            clusterInfoService.refresh();
             ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).get().getState();
             assertFalse(state1.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
         }, 30L, TimeUnit.SECONDS);
diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/shards/ClusterShardLimitIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/shards/ClusterShardLimitIT.java
index fb97ae59aae91..5eef7074e1dd6 100644
--- a/server/src/internalClusterTest/java/org/opensearch/cluster/shards/ClusterShardLimitIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/cluster/shards/ClusterShardLimitIT.java
@@ -245,23 +245,22 @@ public void testIndexCreationOverLimitForDotIndexesFail() {
         assertFalse(clusterState.getMetadata().hasIndex(".test-index"));
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6287")
     public void testCreateIndexWithMaxClusterShardSetting() {
-        int dataNodes = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size();
-        ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
-        setMaxShardLimit(dataNodes, shardsPerNodeKey);
+        int maxAllowedShardsPerNode = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size();
+        setMaxShardLimit(maxAllowedShardsPerNode, shardsPerNodeKey);
 
-        int maxAllowedShards = dataNodes + 1;
-        int extraShardCount = maxAllowedShards + 1;
+        // Always keep the cluster-wide shard limit well above the number of shards this test creates.
+        int maxAllowedShardsPerCluster = maxAllowedShardsPerNode * 1000;
+        int extraShardCount = 1;
         // Getting total active shards in the cluster.
         int currentActiveShards = client().admin().cluster().prepareHealth().get().getActiveShards();
         try {
-            setMaxShardLimit(maxAllowedShards, SETTING_MAX_SHARDS_PER_CLUSTER_KEY);
+            setMaxShardLimit(maxAllowedShardsPerCluster, SETTING_MAX_SHARDS_PER_CLUSTER_KEY);
             prepareCreate("test_index_with_cluster_shard_limit").setSettings(
                 Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, extraShardCount).put(SETTING_NUMBER_OF_REPLICAS, 0).build()
             ).get();
         } catch (final IllegalArgumentException ex) {
-            verifyException(Math.min(maxAllowedShards, dataNodes * dataNodes), currentActiveShards, extraShardCount, ex);
+            verifyException(maxAllowedShardsPerCluster, currentActiveShards, extraShardCount, ex);
         } finally {
             setMaxShardLimit(-1, SETTING_MAX_SHARDS_PER_CLUSTER_KEY);
         }
diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java
index 5b586524d0bfc..cc927a19fd01a 100644
--- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java
+++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java
@@ -78,6 +78,7 @@
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.Lock;
+import org.apache.lucene.tests.index.ForceMergePolicy;
 import org.apache.lucene.tests.mockfile.ExtrasFS;
 import org.apache.lucene.tests.store.MockDirectoryWrapper;
 import org.apache.lucene.util.Bits;
@@ -152,6 +153,7 @@
 import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
 import org.opensearch.index.translog.TranslogException;
 import org.opensearch.index.translog.listener.TranslogEventListener;
+import org.opensearch.test.DummyShardLock;
 import org.opensearch.test.IndexSettingsModule;
 import org.opensearch.test.MockLogAppender;
 import org.opensearch.test.VersionUtils;
@@ -3278,12 +3280,15 @@ public void onFailedEngine(String reason, Exception e) {
         final AtomicReference<RetentionLeases> retentionLeasesHolder = new AtomicReference<>(
             new RetentionLeases(primaryTerm, retentionLeasesVersion.get(), Collections.emptyList())
         );
+
+        // Only allow force merges, so that a regular background merge does not close
+        // the shard before any other operation gets a chance to run.
         InternalEngine engine = createEngine(
             config(
                 defaultSettings,
                 store,
                 createTempDir(),
-                newMergePolicy(),
+                newForceMergePolicy(),
                 null,
                 null,
                 null,
@@ -3377,7 +3382,7 @@ public void onFailedEngine(String reason, Exception e) {
                 defaultSettings,
                 store,
                 createTempDir(),
-                newMergePolicy(),
+                newForceMergePolicy(),
                 null,
                 null,
                 null,
@@ -3446,7 +3451,8 @@ public void eval(MockDirectoryWrapper dir) throws IOException {
         wrapper.failOn(fail);
         MockLogAppender mockAppender = MockLogAppender.createForLoggers(Loggers.getLogger(Engine.class, shardId));
         try {
-            Store store = createStore(wrapper);
+            // Create a store whose directory is closed during unreferenced file cleanup.
+            Store store = createFailingDirectoryStore(wrapper);
             final Engine.EventListener eventListener = new Engine.EventListener() {
                 @Override
                 public void onFailedEngine(String reason, Exception e) {
@@ -3473,7 +3479,7 @@ public void onFailedEngine(String reason, Exception e) {
                 defaultSettings,
                 store,
                 createTempDir(),
-                newMergePolicy(),
+                newForceMergePolicy(),
                 null,
                 null,
                 null,
@@ -3534,6 +3540,33 @@ public void testSettings() {
         assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName());
     }
 
+    /**
+     * Creates a merge policy which only supports force merge.
+     * @return a merge policy which only supports force merge.
+     */
+    private MergePolicy newForceMergePolicy() {
+        return new ForceMergePolicy(new TieredMergePolicy());
+    }
+
+    /**
+     * Creates a store whose directory throws {@link AlreadyClosedException} when referenced during unreferenced file cleanup.
+     *
+     * @param directory the directory used for creating the store.
+     * @return a store whose directory fails when referenced during unreferenced file cleanup.
+     */
+    private Store createFailingDirectoryStore(final Directory directory) {
+        return new Store(shardId, INDEX_SETTINGS, directory, new DummyShardLock(shardId)) {
+            @Override
+            public Directory directory() {
+                if (callStackContainsAnyOf("cleanUpUnreferencedFiles")) {
+                    throw new AlreadyClosedException("store is already closed");
+                }
+
+                return super.directory();
+            }
+        };
+    }
+
     public void testCurrentTranslogUUIIDIsCommitted() throws IOException {
         final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
         try (Store store = createStore()) {
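Notes on the techniques used in this patch (reviewer commentary, not part of the applied diff):

The DiskThresholdDeciderIT change works because, per the new comment, a node only leaves the high watermark breached list when cluster info is refreshed; asserting on the cluster state alone can spin until the timeout. The resulting polling pattern, restated from the hunk above (clusterInfoService is the test's mock cluster info service):

    // Refresh cluster info on every assertBusy iteration so the disk threshold
    // monitor re-evaluates disk usage and lifts the create-index block.
    assertBusy(() -> {
        clusterInfoService.refresh();
        ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
        assertFalse(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
    }, 30L, TimeUnit.SECONDS);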
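newForceMergePolicy() wraps a TieredMergePolicy in Lucene's test-framework ForceMergePolicy, which only returns merge specifications for explicit force merges, so no background merge can race with the failures these tests inject. A standalone sketch of that behavior, assuming lucene-core and lucene-test-framework on the classpath (the index contents and method name are illustrative):

    // Segments accumulate across flushes; only an explicit forceMerge() merges.
    public void forceMergeOnlySketch() throws IOException {
        try (Directory dir = new ByteBuffersDirectory()) {
            IndexWriterConfig iwc = new IndexWriterConfig().setMergePolicy(new ForceMergePolicy(new TieredMergePolicy()));
            try (IndexWriter writer = new IndexWriter(dir, iwc)) {
                for (int i = 0; i < 50; i++) {
                    Document doc = new Document();
                    doc.add(new StringField("id", Integer.toString(i), Field.Store.NO));
                    writer.addDocument(doc);
                    writer.flush(); // cuts a new segment; no natural merge is scheduled
                }
                writer.forceMerge(1); // the only operation that merges under this policy
            }
        }
    }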
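createFailingDirectoryStore() uses call-stack-based failure injection: LuceneTestCase#callStackContainsAnyOf scans the current thread's stack for a method name, so the overridden Store#directory() fails only when reached from cleanUpUnreferencedFiles and behaves normally for every other caller. The same pattern can target any dependency; a sketch with a hypothetical caller name, assuming the enclosing test class extends LuceneTestCase and `delegate` is any real Directory:

    // Fail openInput only when invoked from the (hypothetical) "recoverFromTranslog"
    // code path; all other readers see an unmodified directory.
    Directory failing = new FilterDirectory(delegate) {
        @Override
        public IndexInput openInput(String name, IOContext context) throws IOException {
            if (callStackContainsAnyOf("recoverFromTranslog")) {
                throw new IOException("simulated I/O failure during recovery");
            }
            return super.openInput(name, context);
        }
    };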