diff --git a/client/rest/src/test/java/org/opensearch/client/RestClientSingleHostIntegTests.java b/client/rest/src/test/java/org/opensearch/client/RestClientSingleHostIntegTests.java index beee1c5ca21a0..3d6700118ea57 100644 --- a/client/rest/src/test/java/org/opensearch/client/RestClientSingleHostIntegTests.java +++ b/client/rest/src/test/java/org/opensearch/client/RestClientSingleHostIntegTests.java @@ -48,6 +48,7 @@ import org.apache.hc.core5.http.ContentType; import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.http.io.entity.EntityUtils; import org.apache.hc.core5.http.io.entity.StringEntity; import org.apache.hc.core5.http.message.BasicHeader; @@ -73,6 +74,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; +import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -298,37 +300,70 @@ public void testRequestResetAndAbort() throws Exception { httpGet.reset(); assertFalse(httpGet.isAborted()); - Future future = client.execute(getRequestProducer(httpGet, httpHost), getResponseConsumer(), null); - httpGet.setDependency((org.apache.hc.core5.concurrent.Cancellable) future); - httpGet.abort(); + final Phaser phaser = new Phaser(2); + phaser.register(); try { - future.get(); - fail("expected cancellation exception"); - } catch (CancellationException e) { - // expected + Future future = client.execute( + getRequestProducer(httpGet, httpHost), + getResponseConsumer(phaser), + null + ); + httpGet.setDependency((org.apache.hc.core5.concurrent.Cancellable) future); + httpGet.abort(); + + try { + phaser.arriveAndDeregister(); + future.get(); + fail("expected cancellation exception"); + } catch (CancellationException e) { + // expected + } + assertTrue(future.isCancelled()); + } finally { + // Forcing termination since the AsyncResponseConsumer may not be reached, + // the request is aborted right before + phaser.forceTermination(); } - assertTrue(future.isCancelled()); } { - httpGet.reset(); - Future future = client.execute(getRequestProducer(httpGet, httpHost), getResponseConsumer(), null); - assertFalse(httpGet.isAborted()); - httpGet.setDependency((org.apache.hc.core5.concurrent.Cancellable) future); - httpGet.abort(); - assertTrue(httpGet.isAborted()); + final Phaser phaser = new Phaser(2); + phaser.register(); + try { - assertTrue(future.isCancelled()); - future.get(); - throw new AssertionError("exception should have been thrown"); - } catch (CancellationException e) { - // expected + httpGet.reset(); + Future future = client.execute( + getRequestProducer(httpGet, httpHost), + getResponseConsumer(phaser), + null + ); + assertFalse(httpGet.isAborted()); + httpGet.setDependency((org.apache.hc.core5.concurrent.Cancellable) future); + httpGet.abort(); + assertTrue(httpGet.isAborted()); + try { + phaser.arriveAndDeregister(); + assertTrue(future.isCancelled()); + future.get(); + throw new AssertionError("exception should have been thrown"); + } catch (CancellationException e) { + // expected + } + } finally { + // Forcing termination since the AsyncResponseConsumer may not be reached, + // the request is aborted right before + phaser.forceTermination(); } } { httpGet.reset(); assertFalse(httpGet.isAborted()); - Future future = client.execute(getRequestProducer(httpGet, httpHost), getResponseConsumer(), null); + final Phaser phaser = new Phaser(0); + Future future = client.execute( + getRequestProducer(httpGet, httpHost), + getResponseConsumer(phaser), + null + ); assertFalse(httpGet.isAborted()); assertEquals(200, future.get().getCode()); assertFalse(future.isCancelled()); @@ -554,8 +589,15 @@ private Response bodyTest(RestClient restClient, String method, int statusCode, return esResponse; } - private AsyncResponseConsumer getResponseConsumer() { - return new HeapBufferedAsyncResponseConsumer(1024); + private AsyncResponseConsumer getResponseConsumer(Phaser phaser) { + phaser.register(); + return new HeapBufferedAsyncResponseConsumer(1024) { + @Override + protected ClassicHttpResponse buildResult(HttpResponse response, byte[] entity, ContentType contentType) { + phaser.arriveAndAwaitAdvance(); + return super.buildResult(response, entity, contentType); + } + }; } private HttpUriRequestProducer getRequestProducer(HttpUriRequestBase request, HttpHost host) { diff --git a/release-notes/opensearch.release-notes-1.3.12.md b/release-notes/opensearch.release-notes-1.3.12.md new file mode 100644 index 0000000000000..88551642e3c6d --- /dev/null +++ b/release-notes/opensearch.release-notes-1.3.12.md @@ -0,0 +1,8 @@ +## 2023-08-09 Version 1.3.12 Release Notes + +### Upgrades +- Upgrade `org.bouncycastle:bcprov-jdk15on` to `org.bouncycastle:bcprov-jdk15to18` version 1.75 ([#8247](https://github.com/opensearch-project/OpenSearch/pull/8247)) +- Upgrade `org.bouncycastle:bcmail-jdk15on` to `org.bouncycastle:bcmail-jdk15to18` version 1.75 ([#8247](https://github.com/opensearch-project/OpenSearch/pull/8247)) +- Upgrade `org.bouncycastle:bcpkix-jdk15on` to `org.bouncycastle:bcpkix-jdk15to18` version 1.75 ([#8247](https://github.com/opensearch-project/OpenSearch/pull/8247)) +- Upgrade `netty` from 4.1.94.Final to 4.1.96.Final ([#9030](https://github.com/opensearch-project/OpenSearch/pull/9030)) +- Upgrade bundled OpenJDK (July 2023 Patch releases) ([#8872](https://github.com/opensearch-project/OpenSearch/pull/8872)) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index 44f4119b20a40..e6834a4a667c9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -9,6 +9,10 @@ package org.opensearch.remotestore; import org.junit.After; +import org.opensearch.action.bulk.BulkItemResponse; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.WriteRequest; @@ -33,10 +37,10 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING; -import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING; import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING; +import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING; import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING; +import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase { @@ -76,13 +80,18 @@ protected Map indexData(int numberOfIterations, boolean invokeFlus indexingStats.put(MAX_SEQ_NO_REFRESHED_OR_FLUSHED + "-shard-" + shardId, maxSeqNoRefreshedOrFlushed); refreshedOrFlushedOperations = totalOperations; int numberOfOperations = randomIntBetween(20, 50); - for (int j = 0; j < numberOfOperations; j++) { - IndexResponse response = indexSingleDoc(index); - maxSeqNo = response.getSeqNo(); - shardId = response.getShardId().id(); - indexingStats.put(MAX_SEQ_NO_TOTAL + "-shard-" + shardId, maxSeqNo); + int numberOfBulk = randomIntBetween(1, 5); + for (int j = 0; j < numberOfBulk; j++) { + BulkResponse res = indexBulk(index, numberOfOperations); + for (BulkItemResponse singleResp : res.getItems()) { + indexingStats.put( + MAX_SEQ_NO_TOTAL + "-shard-" + singleResp.getResponse().getShardId().id(), + singleResp.getResponse().getSeqNo() + ); + maxSeqNo = singleResp.getResponse().getSeqNo(); + } + totalOperations += numberOfOperations; } - totalOperations += numberOfOperations; } indexingStats.put(TOTAL_OPERATIONS, totalOperations); @@ -132,6 +141,18 @@ protected IndexResponse indexSingleDoc(String indexName, boolean forceRefresh) { return indexRequestBuilder.get(); } + protected BulkResponse indexBulk(String indexName, int numDocs) { + BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < numDocs; i++) { + final IndexRequest request = client().prepareIndex(indexName) + .setId(UUIDs.randomBase64UUID()) + .setSource(documentKeys.get(randomIntBetween(0, documentKeys.size() - 1)), randomAlphaOfLength(5)) + .request(); + bulkRequest.add(request); + } + return client().bulk(bulkRequest).actionGet(); + } + public static Settings remoteStoreClusterSettings(String segmentRepoName) { return remoteStoreClusterSettings(segmentRepoName, segmentRepoName); } @@ -179,10 +200,11 @@ protected Settings remoteStoreIndexSettings(int numberOfReplicas) { return remoteStoreIndexSettings(numberOfReplicas, 1); } - protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFieldLimit) { + protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFieldLimit, int refresh) { return Settings.builder() .put(remoteStoreIndexSettings(numberOfReplicas)) .put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), totalFieldLimit) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), String.valueOf(refresh)) .build(); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 1bd0915a45048..b7bb0cc6608d0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -16,7 +16,6 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.common.settings.Settings; -import org.opensearch.index.shard.RemoteStoreRefreshListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; @@ -29,12 +28,10 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.oneOf; -import static org.hamcrest.Matchers.comparesEqualTo; import static org.hamcrest.Matchers.comparesEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.oneOf; +import static org.opensearch.index.shard.RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @@ -148,10 +145,9 @@ public void testRemoteTranslogCleanup() throws Exception { verifyRemoteStoreCleanup(); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8658") public void testStaleCommitDeletionWithInvokeFlush() throws Exception { - internalCluster().startDataOnlyNodes(3); - createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l)); + internalCluster().startDataOnlyNodes(1); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1)); int numberOfIterations = randomIntBetween(5, 15); indexData(numberOfIterations, true, INDEX_NAME); String indexUUID = client().admin() @@ -163,20 +159,22 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception { // Delete is async. assertBusy(() -> { int actualFileCount = getFileCount(indexPath); - if (numberOfIterations <= RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP) { - MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations, numberOfIterations + 1))); + if (numberOfIterations <= LAST_N_METADATA_FILES_TO_KEEP) { + MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1))); } else { // As delete is async its possible that the file gets created before the deletion or after // deletion. - MatcherAssert.assertThat(actualFileCount, is(oneOf(10, 11))); + MatcherAssert.assertThat( + actualFileCount, + is(oneOf(LAST_N_METADATA_FILES_TO_KEEP - 1, LAST_N_METADATA_FILES_TO_KEEP, LAST_N_METADATA_FILES_TO_KEEP + 1)) + ); } }, 30, TimeUnit.SECONDS); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8658") public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception { - internalCluster().startDataOnlyNodes(3); - createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l)); + internalCluster().startDataOnlyNodes(1); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1)); int numberOfIterations = randomIntBetween(5, 15); indexData(numberOfIterations, false, INDEX_NAME); String indexUUID = client().admin() @@ -187,6 +185,6 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception { Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata"); int actualFileCount = getFileCount(indexPath); // We also allow (numberOfIterations + 1) as index creation also triggers refresh. - MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations, numberOfIterations + 1))); + MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1))); } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java index b68fd1f764a63..275197ec831d3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java @@ -122,7 +122,6 @@ public void testPromoteReplicaToPrimary() throws Exception { assertHitCount(client().prepareSearch(indexName).setSize(0).get(), numOfDocs); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9130") public void testFailoverWhileIndexing() throws Exception { internalCluster().startNode(); internalCluster().startNode(); @@ -143,7 +142,7 @@ public void testFailoverWhileIndexing() throws Exception { .setSource("field", numAutoGenDocs.get()) .get(); - if (indexResponse.status() == RestStatus.CREATED || indexResponse.status() == RestStatus.ACCEPTED) { + if (indexResponse.status() == RestStatus.CREATED || indexResponse.status() == RestStatus.OK) { numAutoGenDocs.incrementAndGet(); if (numAutoGenDocs.get() == docCount / 2) { if (random().nextInt(3) == 0) {