From 77d0c6c1ca303bb5f5391b8e85804151175a1c1f Mon Sep 17 00:00:00 2001 From: Gaurav Bafna <85113518+gbbafna@users.noreply.github.com> Date: Thu, 13 Jun 2024 22:59:07 +0530 Subject: [PATCH] Revert "[Remote Store] Fix sleep time bug during remote store sync (#14037)" (#14274) This reverts commit afeddc228ba7791a549fb7c6ef94349d432c0824. Signed-off-by: Gaurav Bafna --- .../MigrationBaseTestCase.java | 32 ++----------------- .../RemotePrimaryRelocationIT.java | 23 +++++++++++-- .../opensearch/index/shard/IndexShard.java | 2 +- 3 files changed, 25 insertions(+), 32 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java index 9dcbe380477dc..901b36f872622 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java @@ -9,8 +9,6 @@ package org.opensearch.remotemigration; import org.opensearch.action.DocWriteResponse; -import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; -import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; import org.opensearch.action.bulk.BulkRequest; @@ -18,15 +16,11 @@ import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; -import org.opensearch.client.Requests; import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.routing.RoutingNode; -import org.opensearch.common.Priority; import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.TimeValue; import org.opensearch.repositories.fs.ReloadableFsRepository; import org.opensearch.test.OpenSearchIntegTestCase; import org.junit.Before; @@ -45,7 +39,6 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; -import static org.hamcrest.Matchers.equalTo; public class MigrationBaseTestCase extends OpenSearchIntegTestCase { protected static final String REPOSITORY_NAME = "test-remote-store-repo"; @@ -121,10 +114,6 @@ public void initDocRepToRemoteMigration() { ); } - public ClusterHealthStatus ensureGreen(String... indices) { - return ensureGreen(TimeValue.timeValueSeconds(60), indices); - } - public BulkResponse indexBulk(String indexName, int numDocs) { BulkRequest bulkRequest = new BulkRequest(); for (int i = 0; i < numDocs; i++) { @@ -192,12 +181,14 @@ private Thread getIndexingThread() { long currentDocCount = indexedDocs.incrementAndGet(); if (currentDocCount > 0 && currentDocCount % refreshFrequency == 0) { if (rarely()) { + logger.info("--> [iteration {}] flushing index", currentDocCount); client().admin().indices().prepareFlush(indexName).get(); - logger.info("Completed ingestion of {} docs. Flushing now", currentDocCount); } else { + logger.info("--> [iteration {}] refreshing index", currentDocCount); client().admin().indices().prepareRefresh(indexName).get(); } } + logger.info("Completed ingestion of {} docs", currentDocCount); } }); } @@ -227,21 +218,4 @@ public void stopShardRebalancing() { .get() ); } - - public ClusterHealthStatus waitForRelocation() { - ClusterHealthRequest request = Requests.clusterHealthRequest() - .waitForNoRelocatingShards(true) - .timeout(TimeValue.timeValueSeconds(60)) - .waitForEvents(Priority.LANGUID); - ClusterHealthResponse actionGet = client().admin().cluster().health(request).actionGet(); - if (actionGet.isTimedOut()) { - logger.info( - "waitForRelocation timed out, cluster state:\n{}\n{}", - client().admin().cluster().prepareState().get().getState(), - client().admin().cluster().preparePendingClusterTasks().get() - ); - assertThat("timed out waiting for relocation", actionGet.isTimedOut(), equalTo(false)); - } - return actionGet.getStatus(); - } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java index fa3b9368ded47..cea653c0ead4b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java @@ -99,7 +99,16 @@ public void testRemotePrimaryRelocation() throws Exception { .add(new MoveAllocationCommand("test", 0, primaryNodeName("test"), remoteNode)) .execute() .actionGet(); - waitForRelocation(); + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setTimeout(TimeValue.timeValueSeconds(60)) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .execute() + .actionGet(); + + assertEquals(0, clusterHealthResponse.getRelocatingShards()); assertEquals(remoteNode, primaryNodeName("test")); logger.info("--> relocation from docrep to remote complete"); @@ -114,7 +123,16 @@ public void testRemotePrimaryRelocation() throws Exception { .add(new MoveAllocationCommand("test", 0, remoteNode, remoteNode2)) .execute() .actionGet(); - waitForRelocation(); + clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setTimeout(TimeValue.timeValueSeconds(60)) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .execute() + .actionGet(); + + assertEquals(0, clusterHealthResponse.getRelocatingShards()); assertEquals(remoteNode2, primaryNodeName("test")); logger.info("--> relocation from remote to remote complete"); @@ -137,6 +155,7 @@ public void testRemotePrimaryRelocation() throws Exception { public void testMixedModeRelocation_RemoteSeedingFail() throws Exception { String docRepNode = internalCluster().startNode(); + Client client = internalCluster().client(docRepNode); ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")); assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 82b68b32f3bf8..49cb710c915fc 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2146,7 +2146,7 @@ public void waitForRemoteStoreSync(Runnable onProgress) throws IOException { segmentUploadeCount = directory.getSegmentsUploadedToRemoteStore().size(); } try { - Thread.sleep(TimeValue.timeValueSeconds(30).millis()); + Thread.sleep(TimeValue.timeValueSeconds(30).seconds()); } catch (InterruptedException ie) { throw new OpenSearchException("Interrupted waiting for completion of [{}]", ie); }