Use the remaining scroll response documents on update by query bulk requests

In update by query requests where max_docs < size and conflicts=proceed,
we weren't using the remaining documents from the scroll response in
cases where there were conflicts and the first bulk request ended up
with fewer than max_docs successful updates. This commit addresses that
problem and uses the remaining documents from the scroll response
instead of requesting a new page.
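
To illustrate the fix, here is a minimal, hypothetical sketch of the intended control flow; the names below are illustrative and do not match the production code in AbstractAsyncBulkByScrollAction:

import java.util.ArrayDeque;
import java.util.Deque;

// Simplified sketch, not the actual implementation.
class ScrollPageConsumerSketch {
    private final Deque<Object> currentPageHits = new ArrayDeque<>(); // hits left over from the current scroll page
    private int successfulOps;
    private final int maxDocs;

    ScrollPageConsumerSketch(int maxDocs) {
        this.maxDocs = maxDocs;
    }

    // Called after each bulk response; with conflicts=proceed, version
    // conflicts are counted instead of failing the whole request.
    void onBulkResponse(int newSuccessfulOps) {
        successfulOps += newSuccessfulOps;
        if (successfulOps >= maxDocs) {
            finish(); // reached max_docs successful operations
        } else if (currentPageHits.isEmpty() == false) {
            sendNextBulk(); // the fix: drain the hits we already have...
        } else {
            refillFromScroll(); // ...and only then request a new scroll page
        }
    }

    private void finish() {}
    private void sendNextBulk() {}
    private void refillFromScroll() {}
}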

Closes elastic#63671
Backport of elastic#71430
fcofdez committed Apr 20, 2021
1 parent 6e513f4 commit e555cde
Showing 6 changed files with 500 additions and 25 deletions.
5 changes: 4 additions & 1 deletion docs/reference/docs/delete-by-query.asciidoc
@@ -84,7 +84,10 @@ and all failed requests are returned in the response. Any delete requests that
completed successfully still stick, they are not rolled back.

You can opt to count version conflicts instead of halting and returning by
-setting `conflicts` to `proceed`.
+setting `conflicts` to `proceed`. Note that if you opt to count version conflicts,
+the operation could attempt to delete more documents from the source
+than `max_docs` until it has successfully deleted `max_docs` documents, or until it has gone through
+every document in the source query.
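
As a concrete illustration, here is a hedged sketch using the Java high-level REST client's request class; the index name and the max_docs value are made up:

import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;

// Sketch only: "my-index" and the cap of 10 are hypothetical.
class DeleteByQueryConflictsExample {
    static DeleteByQueryRequest buildRequest() {
        DeleteByQueryRequest request = new DeleteByQueryRequest("my-index");
        request.setQuery(QueryBuilders.matchAllQuery());
        // Count version conflicts instead of aborting: the operation may
        // attempt more than 10 deletes before 10 of them succeed, or it
        // may run out of matching documents first.
        request.setConflicts("proceed");
        request.setMaxDocs(10);
        return request;
    }
}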

===== Refreshing shards

8 changes: 7 additions & 1 deletion docs/reference/docs/reindex.asciidoc
@@ -130,6 +130,9 @@ By default, version conflicts abort the `_reindex` process.
To continue reindexing if there are conflicts, set the `"conflicts"` request body parameter to `proceed`.
In this case, the response includes a count of the version conflicts that were encountered.
Note that the handling of other error types is unaffected by the `"conflicts"` parameter.
+Additionally, if you opt to count version conflicts, the operation could attempt to reindex more documents
+from the source than `max_docs` until it has successfully indexed `max_docs` documents into the target, or
+until it has gone through every document in the source query.
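
A hedged sketch of such a request with the Java high-level REST client; the index names and the max_docs value are made up:

import org.elasticsearch.index.reindex.ReindexRequest;

// Sketch only: "source", "dest", and the cap of 10 are hypothetical.
class ReindexConflictsExample {
    static ReindexRequest buildRequest() {
        ReindexRequest request = new ReindexRequest();
        request.setSourceIndices("source");
        request.setDestIndex("dest");
        // Count version conflicts instead of aborting: reindex may read more
        // than 10 source documents while trying to successfully index 10 of
        // them into the destination.
        request.setConflicts("proceed");
        request.setMaxDocs(10);
        return request;
    }
}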

[[docs-reindex-task-api]]
===== Running reindex asynchronously
@@ -497,6 +500,7 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=max_docs]
[[docs-reindex-api-request-body]]
==== {api-request-body-title}

+[[conflicts]]
`conflicts`::
(Optional, enum) Set to `proceed` to continue reindexing even if there are conflicts.
Defaults to `abort`.
@@ -507,7 +511,9 @@ Defaults to `abort`.
Also accepts a comma-separated list to reindex from multiple sources.

`max_docs`:::
-(Optional, integer) The maximum number of documents to reindex.
+(Optional, integer) The maximum number of documents to reindex. If <<conflicts, conflicts>> is equal to
+`proceed`, reindex could attempt to reindex more documents from the source than `max_docs` until it has successfully
+indexed `max_docs` documents into the target, or until it has gone through every document in the source query.

`query`:::
(Optional, <<query-dsl, query object>>) Specifies the documents to reindex using the Query DSL.
5 changes: 4 additions & 1 deletion docs/reference/docs/update-by-query.asciidoc
@@ -69,7 +69,10 @@ When the versions match, the document is updated and the version number is incremented.
If a document changes between the time that the snapshot is taken and
the update operation is processed, it results in a version conflict and the operation fails.
You can opt to count version conflicts instead of halting and returning by
-setting `conflicts` to `proceed`.
+setting `conflicts` to `proceed`. Note that if you opt to count
+version conflicts, the operation could attempt to update more documents from the source than
+`max_docs` until it has successfully updated `max_docs` documents, or until it has gone through every document
+in the source query.
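
Again as a hedged sketch with the Java high-level REST client; the index name, script, and max_docs value are made up:

import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.script.Script;

// Sketch only: "my-index", the script, and the cap of 10 are hypothetical.
class UpdateByQueryConflictsExample {
    static UpdateByQueryRequest buildRequest() {
        UpdateByQueryRequest request = new UpdateByQueryRequest("my-index");
        request.setScript(new Script("ctx._source['touched'] = true"));
        // Count version conflicts instead of aborting: the operation may
        // process more than 10 documents before 10 updates succeed, or it
        // may exhaust the documents matching the query first.
        request.setConflicts("proceed");
        request.setMaxDocs(10);
        return request;
    }
}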

NOTE: Documents with a version equal to 0 cannot be updated using update by
query because `internal` versioning does not support 0 as a valid
@@ -0,0 +1,304 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.index.reindex;

import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Before;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.function.BiConsumer;
import java.util.function.Function;

import static org.elasticsearch.common.lucene.uid.Versions.MATCH_DELETED;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class BulkByScrollUsesAllScrollDocumentsAfterConflictsIntegTests extends ReindexTestCase {
    private static final String SCRIPT_LANG = "fake_lang";
    private static final String NOOP_GENERATOR = "modificationScript";
    private static final String RETURN_NOOP_FIELD = "return_noop";
    private static final String SORTING_FIELD = "num";

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return CollectionUtils.appendToCopy(super.nodePlugins(), CustomScriptPlugin.class);
    }

    public static class CustomScriptPlugin extends MockScriptPlugin {
        @Override
        @SuppressWarnings("unchecked")
        protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
            return org.elasticsearch.common.collect.Map.of(NOOP_GENERATOR, (vars) -> {
                final Map<String, Object> ctx = (Map<String, Object>) vars.get("ctx");
                final Map<String, Object> source = (Map<String, Object>) ctx.get("_source");
                if (source.containsKey(RETURN_NOOP_FIELD)) {
                    ctx.put("op", "noop");
                }
                return vars;
            });
        }

        @Override
        public String pluginScriptLang() {
            return SCRIPT_LANG;
        }
    }

    @Before
    public void setUpCluster() {
        internalCluster().startMasterOnlyNode();
        // Use a single thread pool for writes so we can enforce a consistent ordering
        internalCluster().startDataOnlyNode(Settings.builder().put("thread_pool.write.size", 1).build());
    }

    public void testUpdateByQuery() throws Exception {
        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
        final boolean scriptEnabled = randomBoolean();
        executeConcurrentUpdatesOnSubsetOfDocs(indexName,
            indexName,
            scriptEnabled,
            updateByQuery(),
            true,
            (bulkByScrollResponse, updatedDocCount) -> {
                assertThat(bulkByScrollResponse.getUpdated(), is((long) updatedDocCount));
            });
    }

    public void testReindex() throws Exception {
        final String sourceIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
        final String targetIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
        createIndexWithSingleShard(targetIndex);

        final ReindexRequestBuilder reindexRequestBuilder = reindex();
        reindexRequestBuilder.destination(targetIndex);
        reindexRequestBuilder.destination().setVersionType(VersionType.INTERNAL);
        // Force a MATCH_DELETED version so we get reindex conflicts
        reindexRequestBuilder.destination().setVersion(MATCH_DELETED);

        final boolean scriptEnabled = randomBoolean();
        executeConcurrentUpdatesOnSubsetOfDocs(sourceIndex,
            targetIndex,
            scriptEnabled,
            reindexRequestBuilder,
            false,
            (bulkByScrollResponse, reindexDocCount) -> {
                assertThat(bulkByScrollResponse.getCreated(), is((long) reindexDocCount));
            });
    }

    public void testDeleteByQuery() throws Exception {
        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
        executeConcurrentUpdatesOnSubsetOfDocs(indexName,
            indexName,
            false,
            deleteByQuery(),
            true,
            (bulkByScrollResponse, deletedDocCount) -> {
                assertThat(bulkByScrollResponse.getDeleted(), is((long) deletedDocCount));
            });
    }

    // Shared scenario: block the single write thread, enqueue concurrent
    // conflicting writes for a random subset of the documents, then run the
    // by-scroll request with max_docs <= scroll size and conflicts=proceed,
    // and verify that it still performs max_docs successful operations.
    <R extends AbstractBulkByScrollRequest<R>, Self extends AbstractBulkByScrollRequestBuilder<R, Self>>
    void executeConcurrentUpdatesOnSubsetOfDocs(String sourceIndex,
                                                String targetIndex,
                                                boolean scriptEnabled,
                                                AbstractBulkByScrollRequestBuilder<R, Self> requestBuilder,
                                                boolean useOptimisticConcurrency,
                                                BiConsumer<BulkByScrollResponse, Integer> resultConsumer) throws Exception {
        createIndexWithSingleShard(sourceIndex);

        final int numDocs = 100;
        final int maxDocs = 10;
        final int scrollSize = randomIntBetween(maxDocs, numDocs);

        List<IndexRequestBuilder> indexRequests = new ArrayList<>(numDocs);
        int noopDocs = 0;
        for (int i = numDocs; i > 0; i--) {
            Map<String, Object> source = new HashMap<>();
            source.put(SORTING_FIELD, i);
            // Ensure that the first maxDocs documents are turned into noops
            if (scriptEnabled && noopDocs < maxDocs) {
                // Add a marker to the document to signal that this
                // document should return a noop in the script
                source.put(RETURN_NOOP_FIELD, true);
                noopDocs++;
            }
            indexRequests.add(client().prepareIndex(sourceIndex, "_doc").setId(Integer.toString(i)).setSource(source));
        }
        indexRandom(true, indexRequests);

        final ThreadPool threadPool = internalCluster().getDataNodeInstance(ThreadPool.class);

        final int writeThreads = threadPool.info(ThreadPool.Names.WRITE).getMax();
        assertThat(writeThreads, equalTo(1));
        final EsThreadPoolExecutor writeThreadPool = (EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.WRITE);
        final CyclicBarrier barrier = new CyclicBarrier(writeThreads + 1);
        final CountDownLatch latch = new CountDownLatch(1);

        // Block the write thread pool
        writeThreadPool.submit(() -> {
            try {
                barrier.await();
                latch.await();
            } catch (Exception e) {
                throw new AssertionError(e);
            }
        });
        // Ensure that the write thread blocking task is currently executing
        barrier.await();

        final SearchResponse searchResponse = client().prepareSearch(sourceIndex)
            .setSize(numDocs) // Get all indexed docs
            .addSort(SORTING_FIELD, SortOrder.DESC)
            .execute()
            .actionGet();

        // Modify a subset of the target documents concurrently
        final List<SearchHit> originalDocs = Arrays.asList(searchResponse.getHits().getHits());
        int conflictingOps = randomIntBetween(maxDocs, numDocs);
        final List<SearchHit> docsModifiedConcurrently = randomSubsetOf(conflictingOps, originalDocs);

        BulkRequest conflictingUpdatesBulkRequest = new BulkRequest();
        for (SearchHit searchHit : docsModifiedConcurrently) {
            // Noop documents never reach the write phase, so they cannot conflict
            if (scriptEnabled && searchHit.getSourceAsMap().containsKey(RETURN_NOOP_FIELD)) {
                conflictingOps--;
            }
            conflictingUpdatesBulkRequest.add(createUpdatedIndexRequest(searchHit, targetIndex, useOptimisticConcurrency));
        }

        // The bulk request is enqueued before the update by query
        final ActionFuture<BulkResponse> bulkFuture = client().bulk(conflictingUpdatesBulkRequest);

        // Ensure that the concurrent writes are enqueued before the update by query request is sent
        assertBusy(() -> assertThat(writeThreadPool.getQueue().size(), equalTo(1)));

        requestBuilder.source(sourceIndex)
            .maxDocs(maxDocs)
            .abortOnVersionConflict(false); // conflicts=proceed

        if (scriptEnabled) {
            final Script script = new Script(ScriptType.INLINE, SCRIPT_LANG, NOOP_GENERATOR, Collections.emptyMap());
            ((AbstractBulkIndexByScrollRequestBuilder) requestBuilder).script(script);
        }

        final SearchRequestBuilder source = requestBuilder.source();
        source.setSize(scrollSize);
        source.addSort(SORTING_FIELD, SortOrder.DESC);
        source.setQuery(QueryBuilders.matchAllQuery());
        final ActionFuture<BulkByScrollResponse> updateByQueryResponse = requestBuilder.execute();

        // Both the conflicting bulk request and the first by-scroll bulk request are now enqueued
        assertBusy(() -> assertThat(writeThreadPool.getQueue().size(), equalTo(2)));

        // Allow tasks from the write thread to make progress
        latch.countDown();

        final BulkResponse bulkItemResponses = bulkFuture.actionGet();
        for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
            assertThat(Strings.toString(bulkItemResponses), bulkItemResponse.isFailed(), is(false));
        }

        final BulkByScrollResponse bulkByScrollResponse = updateByQueryResponse.actionGet();
        assertThat(bulkByScrollResponse.getVersionConflicts(), lessThanOrEqualTo((long) conflictingOps));
        // When scripts are enabled, the first maxDocs operations are noops
        final int candidateOps = scriptEnabled ? numDocs - maxDocs : numDocs;
        int successfulOps = Math.min(candidateOps - conflictingOps, maxDocs);
        assertThat(bulkByScrollResponse.getNoops(), is((long) (scriptEnabled ? maxDocs : 0)));
        resultConsumer.accept(bulkByScrollResponse, successfulOps);
    }

    private void createIndexWithSingleShard(String index) throws Exception {
        final Settings indexSettings = Settings.builder()
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
            .build();
        final XContentBuilder mappings = jsonBuilder();
        {
            mappings.startObject();
            {
                mappings.startObject("properties");
                {
                    mappings.startObject(SORTING_FIELD);
                    mappings.field("type", "integer");
                    mappings.endObject();
                }
                {
                    mappings.startObject(RETURN_NOOP_FIELD);
                    mappings.field("type", "boolean");
                    mappings.endObject();
                }
                mappings.endObject();
            }
            mappings.endObject();
        }

        // Use explicit mappings so we don't have to create them on demand;
        // otherwise the task ordering could change while waiting for mapping updates
        assertAcked(
            prepareCreate(index)
                .setSettings(indexSettings)
                .addMapping("type", mappings)
        );
    }

    private IndexRequest createUpdatedIndexRequest(SearchHit searchHit, String targetIndex, boolean useOptimisticUpdate) {
        final BytesReference sourceRef = searchHit.getSourceRef();
        final XContentType xContentType = sourceRef != null ? XContentHelper.xContentType(sourceRef) : null;
        IndexRequest indexRequest = new IndexRequest();
        indexRequest.index(targetIndex);
        indexRequest.id(searchHit.getId());
        indexRequest.source(sourceRef, xContentType);
        if (useOptimisticUpdate) {
            indexRequest.setIfSeqNo(searchHit.getSeqNo());
            indexRequest.setIfPrimaryTerm(searchHit.getPrimaryTerm());
        } else {
            // MATCH_DELETED only succeeds if the document does not exist yet
            indexRequest.version(MATCH_DELETED);
        }
        return indexRequest;
    }
}