diff --git a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/BulkByScrollUsesAllScrollDocumentsAfterConflictsIntegTests.java b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/BulkByScrollUsesAllScrollDocumentsAfterConflictsIntegTests.java index 37093565fa086..5247efdf416eb 100644 --- a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/BulkByScrollUsesAllScrollDocumentsAfterConflictsIntegTests.java +++ b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/BulkByScrollUsesAllScrollDocumentsAfterConflictsIntegTests.java @@ -48,7 +48,9 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.stream.Collectors; import static org.elasticsearch.common.lucene.uid.Versions.MATCH_DELETED; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -57,6 +59,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class BulkByScrollUsesAllScrollDocumentsAfterConflictsIntegTests extends ReindexTestCase { @@ -105,12 +108,8 @@ public void testUpdateByQuery() throws Exception { scriptEnabled, updateByQuery(), true, - (bulkByScrollResponse, maxDocs, conflictingUpdates) -> { - assertThat(bulkByScrollResponse.getUpdated(), is((long) maxDocs)); - assertThat(bulkByScrollResponse.getVersionConflicts(), is((long) conflictingUpdates)); - if (scriptEnabled) { - assertThat(bulkByScrollResponse.getNoops(), is((long) maxDocs)); - } + (bulkByScrollResponse, updatedDocCount) -> { + assertThat(bulkByScrollResponse.getUpdated(), is((long) updatedDocCount)); }); } @@ -131,12 +130,8 @@ public void testReindex() throws Exception { scriptEnabled, reindexRequestBuilder, false, - (bulkByScrollResponse, maxDocs, conflictingUpdates) -> { - assertThat(bulkByScrollResponse.getCreated(), is((long) maxDocs)); - assertThat(bulkByScrollResponse.getVersionConflicts(), is((long) conflictingUpdates)); - if (scriptEnabled) { - assertThat(bulkByScrollResponse.getNoops(), is((long) maxDocs)); - } + (bulkByScrollResponse, reindexDocCount) -> { + assertThat(bulkByScrollResponse.getCreated(), is((long) reindexDocCount)); }); } @@ -147,9 +142,8 @@ public void testDeleteByQuery() throws Exception { false, deleteByQuery(), true, - (bulkByScrollResponse, maxDocs, conflictingUpdates) -> { - assertThat(bulkByScrollResponse.getDeleted(), is((long) maxDocs)); - assertThat(bulkByScrollResponse.getVersionConflicts(), is((long) conflictingUpdates)); + (bulkByScrollResponse, deletedDocCount) -> { + assertThat(bulkByScrollResponse.getDeleted(), is((long) deletedDocCount)); }); } @@ -159,12 +153,12 @@ Self extends AbstractBulkByScrollRequestBuilder> void executeConcurrent boolean scriptEnabled, AbstractBulkByScrollRequestBuilder requestBuilder, boolean useOptimisticConcurrency, - TriConsumer resultConsumer) throws Exception { + BiConsumer resultConsumer) throws Exception { createIndexWithSingleShard(sourceIndex); final int numDocs = 100; - final int scrollSize = 50; final int maxDocs = 10; + final int scrollSize = randomIntBetween(maxDocs, numDocs); List indexRequests = new ArrayList<>(numDocs); int noopDocs = 0; @@ -203,43 +197,21 @@ Self extends AbstractBulkByScrollRequestBuilder> void executeConcurrent barrier.await(); final SearchResponse searchResponse = client().prepareSearch(sourceIndex) - .setSize(scrollSize) + .setSize(numDocs) // Get all indexed docs .addSort(SORTING_FIELD, SortOrder.DESC) .execute() .actionGet(); // Modify a subset of the target documents concurrently final List originalDocs = Arrays.asList(searchResponse.getHits().getHits()); - final List docsModifiedConcurrently = new ArrayList<>(); - if (randomBoolean()) { - // The entire first scroll request conflicts, forcing a second request - docsModifiedConcurrently.addAll(originalDocs); - } else { - if (randomBoolean()) { - // We can satisfy the entire update by query request with a single scroll response - // Only the first maxDocs documents would conflict - docsModifiedConcurrently.addAll(originalDocs.subList(0, maxDocs)); - } else { - // Only a subset of maxDocs can be updated using the first scroll response - // Only 1 document per "page" is not modified concurrently - final int pages = scrollSize / maxDocs; - for (int i = 0; i < pages; i++) { - final int offset = i * 10; - docsModifiedConcurrently.addAll(originalDocs.subList(offset, offset + maxDocs - 1)); - } + int conflictingOps = randomIntBetween(maxDocs, numDocs); + final List docsModifiedConcurrently = randomSubsetOf(conflictingOps, originalDocs); - // Force to request an additional scroll page - assertThat(pages, lessThan(maxDocs)); - } - } - - int conflictingUpdates = 0; BulkRequest conflictingUpdatesBulkRequest = new BulkRequest(); for (SearchHit searchHit : docsModifiedConcurrently) { - if (searchHit.getSourceAsMap().containsKey(RETURN_NOOP_FIELD) == false) { - conflictingUpdates++; + if (scriptEnabled && searchHit.getSourceAsMap().containsKey(RETURN_NOOP_FIELD)) { + conflictingOps--; } - conflictingUpdatesBulkRequest.add(createUpdatedIndexRequest(searchHit, targetIndex, useOptimisticConcurrency)); } @@ -275,7 +247,12 @@ Self extends AbstractBulkByScrollRequestBuilder> void executeConcurrent } final BulkByScrollResponse bulkByScrollResponse = updateByQueryResponse.actionGet(); - resultConsumer.apply(bulkByScrollResponse, maxDocs, conflictingUpdates); + assertThat(bulkByScrollResponse.getVersionConflicts(), lessThanOrEqualTo((long) conflictingOps)); + // When scripts are enabled, the first maxDocs are a NoOp + final int candidateOps = scriptEnabled ? numDocs - maxDocs : numDocs; + int successfulOps = Math.min(candidateOps - conflictingOps, maxDocs); + assertThat(bulkByScrollResponse.getNoops(), is((long) (scriptEnabled ? maxDocs : 0))); + resultConsumer.accept(bulkByScrollResponse, successfulOps); } private void createIndexWithSingleShard(String index) throws Exception {