Skip to content

Commit

Permalink
Add more test scenarios
Browse files · Browse the repository at this point in the history
  • Loading branch information
fcofdez committed Apr 19, 2021
1 parent da4c320 commit 9fe5874
Showing 1 changed file with 22 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.elasticsearch.common.lucene.uid.Versions.MATCH_DELETED;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
Expand All @@ -57,6 +59,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class BulkByScrollUsesAllScrollDocumentsAfterConflictsIntegTests extends ReindexTestCase {
Expand Down Expand Up @@ -105,12 +108,8 @@ public void testUpdateByQuery() throws Exception {
scriptEnabled,
updateByQuery(),
true,
(bulkByScrollResponse, maxDocs, conflictingUpdates) -> {
assertThat(bulkByScrollResponse.getUpdated(), is((long) maxDocs));
assertThat(bulkByScrollResponse.getVersionConflicts(), is((long) conflictingUpdates));
if (scriptEnabled) {
assertThat(bulkByScrollResponse.getNoops(), is((long) maxDocs));
}
(bulkByScrollResponse, updatedDocCount) -> {
assertThat(bulkByScrollResponse.getUpdated(), is((long) updatedDocCount));
});
}

Expand All @@ -131,12 +130,8 @@ public void testReindex() throws Exception {
scriptEnabled,
reindexRequestBuilder,
false,
(bulkByScrollResponse, maxDocs, conflictingUpdates) -> {
assertThat(bulkByScrollResponse.getCreated(), is((long) maxDocs));
assertThat(bulkByScrollResponse.getVersionConflicts(), is((long) conflictingUpdates));
if (scriptEnabled) {
assertThat(bulkByScrollResponse.getNoops(), is((long) maxDocs));
}
(bulkByScrollResponse, reindexDocCount) -> {
assertThat(bulkByScrollResponse.getCreated(), is((long) reindexDocCount));
});
}

Expand All @@ -147,9 +142,8 @@ public void testDeleteByQuery() throws Exception {
false,
deleteByQuery(),
true,
(bulkByScrollResponse, maxDocs, conflictingUpdates) -> {
assertThat(bulkByScrollResponse.getDeleted(), is((long) maxDocs));
assertThat(bulkByScrollResponse.getVersionConflicts(), is((long) conflictingUpdates));
(bulkByScrollResponse, deletedDocCount) -> {
assertThat(bulkByScrollResponse.getDeleted(), is((long) deletedDocCount));
});
}

Expand All @@ -159,12 +153,12 @@ Self extends AbstractBulkByScrollRequestBuilder<R, Self>> void executeConcurrent
boolean scriptEnabled,
AbstractBulkByScrollRequestBuilder<R, Self> requestBuilder,
boolean useOptimisticConcurrency,
TriConsumer<BulkByScrollResponse, Integer, Integer> resultConsumer) throws Exception {
BiConsumer<BulkByScrollResponse, Integer> resultConsumer) throws Exception {
createIndexWithSingleShard(sourceIndex);

final int numDocs = 100;
final int scrollSize = 50;
final int maxDocs = 10;
final int scrollSize = randomIntBetween(maxDocs, numDocs);

List<IndexRequestBuilder> indexRequests = new ArrayList<>(numDocs);
int noopDocs = 0;
Expand Down Expand Up @@ -203,43 +197,21 @@ Self extends AbstractBulkByScrollRequestBuilder<R, Self>> void executeConcurrent
barrier.await();

final SearchResponse searchResponse = client().prepareSearch(sourceIndex)
.setSize(scrollSize)
.setSize(numDocs) // Get all indexed docs
.addSort(SORTING_FIELD, SortOrder.DESC)
.execute()
.actionGet();

// Modify a subset of the target documents concurrently
final List<SearchHit> originalDocs = Arrays.asList(searchResponse.getHits().getHits());
final List<SearchHit> docsModifiedConcurrently = new ArrayList<>();
if (randomBoolean()) {
// The entire first scroll request conflicts, forcing a second request
docsModifiedConcurrently.addAll(originalDocs);
} else {
if (randomBoolean()) {
// We can satisfy the entire update by query request with a single scroll response
// Only the first maxDocs documents would conflict
docsModifiedConcurrently.addAll(originalDocs.subList(0, maxDocs));
} else {
// Only a subset of maxDocs can be updated using the first scroll response
// Only 1 document per "page" is not modified concurrently
final int pages = scrollSize / maxDocs;
for (int i = 0; i < pages; i++) {
final int offset = i * 10;
docsModifiedConcurrently.addAll(originalDocs.subList(offset, offset + maxDocs - 1));
}
int conflictingOps = randomIntBetween(maxDocs, numDocs);
final List<SearchHit> docsModifiedConcurrently = randomSubsetOf(conflictingOps, originalDocs);

// Force to request an additional scroll page
assertThat(pages, lessThan(maxDocs));
}
}

int conflictingUpdates = 0;
BulkRequest conflictingUpdatesBulkRequest = new BulkRequest();
for (SearchHit searchHit : docsModifiedConcurrently) {
if (searchHit.getSourceAsMap().containsKey(RETURN_NOOP_FIELD) == false) {
conflictingUpdates++;
if (scriptEnabled && searchHit.getSourceAsMap().containsKey(RETURN_NOOP_FIELD)) {
conflictingOps--;
}

conflictingUpdatesBulkRequest.add(createUpdatedIndexRequest(searchHit, targetIndex, useOptimisticConcurrency));
}

Expand Down Expand Up @@ -275,7 +247,12 @@ Self extends AbstractBulkByScrollRequestBuilder<R, Self>> void executeConcurrent
}

final BulkByScrollResponse bulkByScrollResponse = updateByQueryResponse.actionGet();
resultConsumer.apply(bulkByScrollResponse, maxDocs, conflictingUpdates);
assertThat(bulkByScrollResponse.getVersionConflicts(), lessThanOrEqualTo((long) conflictingOps));
// When scripts are enabled, the first maxDocs are a NoOp
final int candidateOps = scriptEnabled ? numDocs - maxDocs : numDocs;
int successfulOps = Math.min(candidateOps - conflictingOps, maxDocs);
assertThat(bulkByScrollResponse.getNoops(), is((long) (scriptEnabled ? maxDocs : 0)));
resultConsumer.accept(bulkByScrollResponse, successfulOps);
}

private void createIndexWithSingleShard(String index) throws Exception {
Expand Down

0 comments on commit 9fe5874

Please sign in to comment.