From ee5f8c452264d47181600d00427b32d08b0d60e2 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 18 Jan 2017 16:58:39 -0500 Subject: [PATCH] Consolidate some reindex utility classes (#22666) Everything that extended `AbstractAsyncBulkByScrollAction` also extended `AbstractAsyncBulkIndexByScrollAction` so this removes `AbstractAsyncBulkIndexByScrollAction`, merging it into `AbstractAsyncBulkByScrollAction`. --- .../AbstractAsyncBulkByScrollAction.java | 507 +++++++++++++++- .../AbstractAsyncBulkIndexByScrollAction.java | 548 ------------------ .../reindex/TransportDeleteByQueryAction.java | 16 +- .../index/reindex/TransportReindexAction.java | 12 +- .../reindex/TransportUpdateByQueryAction.java | 13 +- ...ncBulkByScrollActionMetadataTestCase.java} | 6 +- ...syncBulkByScrollActionScriptTestCase.java} | 14 +- ...tractAsyncBulkByScrollActionTestCase.java} | 2 +- .../reindex/AsyncBulkByScrollActionTests.java | 75 +-- .../index/reindex/ReindexMetadataTests.java | 14 +- .../index/reindex/ReindexScriptTests.java | 8 +- .../reindex/UpdateByQueryMetadataTests.java | 6 +- .../reindex/UpdateByQueryWithScriptTests.java | 8 +- 13 files changed, 591 insertions(+), 638 deletions(-) delete mode 100644 modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexByScrollAction.java rename modules/reindex/src/test/java/org/elasticsearch/index/reindex/{AbstractAsyncBulkIndexbyScrollActionMetadataTestCase.java => AbstractAsyncBulkByScrollActionMetadataTestCase.java} (82%) rename modules/reindex/src/test/java/org/elasticsearch/index/reindex/{AbstractAsyncBulkIndexByScrollActionScriptTestCase.java => AbstractAsyncBulkByScrollActionScriptTestCase.java} (86%) rename modules/reindex/src/test/java/org/elasticsearch/index/reindex/{AbstractAsyncBulkIndexByScrollActionTestCase.java => AbstractAsyncBulkByScrollActionTestCase.java} (96%) diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java index 0328e606d9edb..07f1d943fcde7 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; @@ -30,27 +31,50 @@ import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.Retry; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.ParentTaskAssigningClient; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.mapper.IdFieldMapper; +import org.elasticsearch.index.mapper.IndexFieldMapper; +import org.elasticsearch.index.mapper.ParentFieldMapper; +import org.elasticsearch.index.mapper.RoutingFieldMapper; +import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.index.mapper.TypeFieldMapper; +import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure; +import org.elasticsearch.script.CompiledScript; +import org.elasticsearch.script.ExecutableScript; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptContext; +import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiFunction; import static java.lang.Math.max; import static java.lang.Math.min; import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableList; import static org.elasticsearch.action.bulk.BackoffPolicy.exponentialBackoff; import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; @@ -66,6 +90,9 @@ public abstract class AbstractAsyncBulkByScrollActionrequest variables all representing child * requests of this mainRequest. @@ -80,17 +107,28 @@ public abstract class AbstractAsyncBulkByScrollAction, ScrollableHitSource.Hit, RequestWrapper> scriptApplier; + public AbstractAsyncBulkByScrollAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, - ThreadPool threadPool, Request mainRequest, ActionListener listener) { + ThreadPool threadPool, Request mainRequest, ScriptService scriptService, ClusterState clusterState, + ActionListener listener) { this.task = task; this.logger = logger; this.client = client; this.threadPool = threadPool; + this.scriptService = scriptService; + this.clusterState = clusterState; this.mainRequest = mainRequest; this.listener = listener; BackoffPolicy backoffPolicy = buildBackoffPolicy(); bulkRetry = Retry.on(EsRejectedExecutionException.class).policy(BackoffPolicy.wrap(backoffPolicy, task::countBulkRetry)); scrollSource = buildScrollableResultSource(backoffPolicy); + scriptApplier = Objects.requireNonNull(buildScriptApplier(), "script applier must not be null"); /* * Default to sorting by doc. We can't do this in the request itself because it is normal to *add* to the sorts rather than replace * them and if we add _doc as the first sort by default then sorts will never work.... So we add it here, only if there isn't @@ -103,12 +141,71 @@ public AbstractAsyncBulkByScrollAction(WorkingBulkByScrollTask task, Logger logg mainRequest.getSearchRequest().source().version(needsSourceDocumentVersions()); } + /** + * Build the {@link BiFunction} to apply to all {@link RequestWrapper}. + */ + protected BiFunction, ScrollableHitSource.Hit, RequestWrapper> buildScriptApplier() { + // The default script applier executes a no-op + return (request, searchHit) -> request; + } + /** * Does this operation need the versions of the source documents? */ protected abstract boolean needsSourceDocumentVersions(); - protected abstract BulkRequest buildBulk(Iterable docs); + /** + * Build the {@link RequestWrapper} for a single search hit. This shouldn't handle + * metadata or scripting. That will be handled by copyMetadata and + * apply functions that can be overridden. + */ + protected abstract RequestWrapper buildRequest(ScrollableHitSource.Hit doc); + + /** + * Copies the metadata from a hit to the request. + */ + protected RequestWrapper copyMetadata(RequestWrapper request, ScrollableHitSource.Hit doc) { + request.setParent(doc.getParent()); + copyRouting(request, doc.getRouting()); + return request; + } + + /** + * Copy the routing from a search hit to the request. + */ + protected void copyRouting(RequestWrapper request, String routing) { + request.setRouting(routing); + } + + /** + * Used to accept or ignore a search hit. Ignored search hits will be excluded + * from the bulk request. It is also where we fail on invalid search hits, like + * when the document has no source but it's required. + */ + protected boolean accept(ScrollableHitSource.Hit doc) { + if (doc.getSource() == null) { + /* + * Either the document didn't store _source or we didn't fetch it for some reason. Since we don't allow the user to + * change the "fields" part of the search request it is unlikely that we got here because we didn't fetch _source. + * Thus the error message assumes that it wasn't stored. + */ + throw new IllegalArgumentException("[" + doc.getIndex() + "][" + doc.getType() + "][" + doc.getId() + "] didn't store _source"); + } + return true; + } + + private BulkRequest buildBulk(Iterable docs) { + BulkRequest bulkRequest = new BulkRequest(); + for (ScrollableHitSource.Hit doc : docs) { + if (accept(doc)) { + RequestWrapper request = scriptApplier.apply(copyMetadata(buildRequest(doc), doc), doc); + if (request != null) { + bulkRequest.add(request.self()); + } + } + } + return bulkRequest; + } protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy) { return new ClientScrollableHitSource(logger, backoffPolicy, threadPool, task::countSearchRetry, this::finishHim, client, @@ -397,4 +494,410 @@ void addDestinationIndices(Collection indices) { void setScroll(String scroll) { scrollSource.setScroll(scroll); } + + /** + * Wrapper for the {@link DocWriteRequest} that are used in this action class. + */ + interface RequestWrapper> { + + void setIndex(String index); + + String getIndex(); + + void setType(String type); + + String getType(); + + void setId(String id); + + String getId(); + + void setVersion(long version); + + long getVersion(); + + void setVersionType(VersionType versionType); + + void setParent(String parent); + + String getParent(); + + void setRouting(String routing); + + String getRouting(); + + void setSource(Map source); + + Map getSource(); + + Self self(); + } + + /** + * {@link RequestWrapper} for {@link IndexRequest} + */ + public static class IndexRequestWrapper implements RequestWrapper { + + private final IndexRequest request; + + IndexRequestWrapper(IndexRequest request) { + this.request = Objects.requireNonNull(request, "Wrapped IndexRequest can not be null"); + } + + @Override + public void setIndex(String index) { + request.index(index); + } + + @Override + public String getIndex() { + return request.index(); + } + + @Override + public void setType(String type) { + request.type(type); + } + + @Override + public String getType() { + return request.type(); + } + + @Override + public void setId(String id) { + request.id(id); + } + + @Override + public String getId() { + return request.id(); + } + + @Override + public void setVersion(long version) { + request.version(version); + } + + @Override + public long getVersion() { + return request.version(); + } + + @Override + public void setVersionType(VersionType versionType) { + request.versionType(versionType); + } + + @Override + public void setParent(String parent) { + request.parent(parent); + } + + @Override + public String getParent() { + return request.parent(); + } + + @Override + public void setRouting(String routing) { + request.routing(routing); + } + + @Override + public String getRouting() { + return request.routing(); + } + + @Override + public Map getSource() { + return request.sourceAsMap(); + } + + @Override + public void setSource(Map source) { + request.source(source); + } + + @Override + public IndexRequest self() { + return request; + } + } + + /** + * Wraps a {@link IndexRequest} in a {@link RequestWrapper} + */ + static RequestWrapper wrap(IndexRequest request) { + return new IndexRequestWrapper(request); + } + + /** + * {@link RequestWrapper} for {@link DeleteRequest} + */ + public static class DeleteRequestWrapper implements RequestWrapper { + + private final DeleteRequest request; + + DeleteRequestWrapper(DeleteRequest request) { + this.request = Objects.requireNonNull(request, "Wrapped DeleteRequest can not be null"); + } + + @Override + public void setIndex(String index) { + request.index(index); + } + + @Override + public String getIndex() { + return request.index(); + } + + @Override + public void setType(String type) { + request.type(type); + } + + @Override + public String getType() { + return request.type(); + } + + @Override + public void setId(String id) { + request.id(id); + } + + @Override + public String getId() { + return request.id(); + } + + @Override + public void setVersion(long version) { + request.version(version); + } + + @Override + public long getVersion() { + return request.version(); + } + + @Override + public void setVersionType(VersionType versionType) { + request.versionType(versionType); + } + + @Override + public void setParent(String parent) { + request.parent(parent); + } + + @Override + public String getParent() { + return request.parent(); + } + + @Override + public void setRouting(String routing) { + request.routing(routing); + } + + @Override + public String getRouting() { + return request.routing(); + } + + @Override + public Map getSource() { + throw new UnsupportedOperationException("unable to get source from action request [" + request.getClass() + "]"); + } + + @Override + public void setSource(Map source) { + throw new UnsupportedOperationException("unable to set [source] on action request [" + request.getClass() + "]"); + } + + @Override + public DeleteRequest self() { + return request; + } + } + + /** + * Wraps a {@link DeleteRequest} in a {@link RequestWrapper} + */ + static RequestWrapper wrap(DeleteRequest request) { + return new DeleteRequestWrapper(request); + } + + /** + * Apply a {@link Script} to a {@link RequestWrapper} + */ + public abstract class ScriptApplier implements BiFunction, ScrollableHitSource.Hit, RequestWrapper> { + + private final WorkingBulkByScrollTask task; + private final ScriptService scriptService; + private final Script script; + private final Map params; + + private ExecutableScript executable; + private Map context; + + public ScriptApplier(WorkingBulkByScrollTask task, ScriptService scriptService, Script script, + Map params) { + this.task = task; + this.scriptService = scriptService; + this.script = script; + this.params = params; + } + + @Override + @SuppressWarnings("unchecked") + public RequestWrapper apply(RequestWrapper request, ScrollableHitSource.Hit doc) { + if (script == null) { + return request; + } + if (executable == null) { + CompiledScript compiled = scriptService.compile(script, ScriptContext.Standard.UPDATE, emptyMap()); + executable = scriptService.executable(compiled, params); + } + if (context == null) { + context = new HashMap<>(); + } else { + context.clear(); + } + + context.put(IndexFieldMapper.NAME, doc.getIndex()); + context.put(TypeFieldMapper.NAME, doc.getType()); + context.put(IdFieldMapper.NAME, doc.getId()); + Long oldVersion = doc.getVersion(); + context.put(VersionFieldMapper.NAME, oldVersion); + String oldParent = doc.getParent(); + context.put(ParentFieldMapper.NAME, oldParent); + String oldRouting = doc.getRouting(); + context.put(RoutingFieldMapper.NAME, oldRouting); + context.put(SourceFieldMapper.NAME, request.getSource()); + + OpType oldOpType = OpType.INDEX; + context.put("op", oldOpType.toString()); + + executable.setNextVar("ctx", context); + executable.run(); + + Map resultCtx = (Map) executable.unwrap(context); + String newOp = (String) resultCtx.remove("op"); + if (newOp == null) { + throw new IllegalArgumentException("Script cleared operation type"); + } + + /* + * It'd be lovely to only set the source if we know its been modified + * but it isn't worth keeping two copies of it around just to check! + */ + request.setSource((Map) resultCtx.remove(SourceFieldMapper.NAME)); + + Object newValue = resultCtx.remove(IndexFieldMapper.NAME); + if (false == doc.getIndex().equals(newValue)) { + scriptChangedIndex(request, newValue); + } + newValue = resultCtx.remove(TypeFieldMapper.NAME); + if (false == doc.getType().equals(newValue)) { + scriptChangedType(request, newValue); + } + newValue = resultCtx.remove(IdFieldMapper.NAME); + if (false == doc.getId().equals(newValue)) { + scriptChangedId(request, newValue); + } + newValue = resultCtx.remove(VersionFieldMapper.NAME); + if (false == Objects.equals(oldVersion, newValue)) { + scriptChangedVersion(request, newValue); + } + newValue = resultCtx.remove(ParentFieldMapper.NAME); + if (false == Objects.equals(oldParent, newValue)) { + scriptChangedParent(request, newValue); + } + /* + * Its important that routing comes after parent in case you want to + * change them both. + */ + newValue = resultCtx.remove(RoutingFieldMapper.NAME); + if (false == Objects.equals(oldRouting, newValue)) { + scriptChangedRouting(request, newValue); + } + + OpType newOpType = OpType.fromString(newOp); + if (newOpType != oldOpType) { + return scriptChangedOpType(request, oldOpType, newOpType); + } + + if (false == resultCtx.isEmpty()) { + throw new IllegalArgumentException("Invalid fields added to context [" + String.join(",", resultCtx.keySet()) + ']'); + } + return request; + } + + protected RequestWrapper scriptChangedOpType(RequestWrapper request, OpType oldOpType, OpType newOpType) { + switch (newOpType) { + case NOOP: + task.countNoop(); + return null; + case DELETE: + RequestWrapper delete = wrap(new DeleteRequest(request.getIndex(), request.getType(), request.getId())); + delete.setVersion(request.getVersion()); + delete.setVersionType(VersionType.INTERNAL); + delete.setParent(request.getParent()); + delete.setRouting(request.getRouting()); + return delete; + default: + throw new IllegalArgumentException("Unsupported operation type change from [" + oldOpType + "] to [" + newOpType + "]"); + } + } + + protected abstract void scriptChangedIndex(RequestWrapper request, Object to); + + protected abstract void scriptChangedType(RequestWrapper request, Object to); + + protected abstract void scriptChangedId(RequestWrapper request, Object to); + + protected abstract void scriptChangedVersion(RequestWrapper request, Object to); + + protected abstract void scriptChangedRouting(RequestWrapper request, Object to); + + protected abstract void scriptChangedParent(RequestWrapper request, Object to); + + } + + public enum OpType { + + NOOP("noop"), + INDEX("index"), + DELETE("delete"); + + private final String id; + + OpType(String id) { + this.id = id; + } + + public static OpType fromString(String opType) { + String lowerOpType = opType.toLowerCase(Locale.ROOT); + switch (lowerOpType) { + case "noop": + return OpType.NOOP; + case "index": + return OpType.INDEX; + case "delete": + return OpType.DELETE; + default: + throw new IllegalArgumentException("Operation type [" + lowerOpType + "] not allowed, only " + + Arrays.toString(values()) + " are allowed"); + } + } + + @Override + public String toString() { + return id.toLowerCase(Locale.ROOT); + } + } } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexByScrollAction.java deleted file mode 100644 index 34ad6e2ee25ee..0000000000000 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexByScrollAction.java +++ /dev/null @@ -1,548 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.reindex; - -import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.ParentTaskAssigningClient; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.index.VersionType; -import org.elasticsearch.index.mapper.IdFieldMapper; -import org.elasticsearch.index.mapper.IndexFieldMapper; -import org.elasticsearch.index.mapper.ParentFieldMapper; -import org.elasticsearch.index.mapper.RoutingFieldMapper; -import org.elasticsearch.index.mapper.SourceFieldMapper; -import org.elasticsearch.index.mapper.TypeFieldMapper; -import org.elasticsearch.index.mapper.VersionFieldMapper; -import org.elasticsearch.script.CompiledScript; -import org.elasticsearch.script.ExecutableScript; -import org.elasticsearch.script.Script; -import org.elasticsearch.script.ScriptContext; -import org.elasticsearch.script.ScriptService; -import org.elasticsearch.threadpool.ThreadPool; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Locale; -import java.util.Map; -import java.util.Objects; -import java.util.function.BiFunction; - -import static java.util.Collections.emptyMap; - -/** - * Abstract base for scrolling across a search and executing bulk indexes on all - * results. - */ -public abstract class AbstractAsyncBulkIndexByScrollAction> - extends AbstractAsyncBulkByScrollAction { - - protected final ScriptService scriptService; - protected final ClusterState clusterState; - - /** - * This BiFunction is used to apply various changes depending of the Reindex action and the search hit, - * from copying search hit metadata (parent, routing, etc) to potentially transforming the - * {@link RequestWrapper} completely. - */ - private final BiFunction, ScrollableHitSource.Hit, RequestWrapper> scriptApplier; - - public AbstractAsyncBulkIndexByScrollAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, - ThreadPool threadPool, Request mainRequest, - ActionListener listener, - ScriptService scriptService, ClusterState clusterState) { - super(task, logger, client, threadPool, mainRequest, listener); - this.scriptService = scriptService; - this.clusterState = clusterState; - this.scriptApplier = Objects.requireNonNull(buildScriptApplier(), "script applier must not be null"); - } - - /** - * Build the {@link BiFunction} to apply to all {@link RequestWrapper}. - */ - protected BiFunction, ScrollableHitSource.Hit, RequestWrapper> buildScriptApplier() { - // The default script applier executes a no-op - return (request, searchHit) -> request; - } - - @Override - protected BulkRequest buildBulk(Iterable docs) { - BulkRequest bulkRequest = new BulkRequest(); - for (ScrollableHitSource.Hit doc : docs) { - if (accept(doc)) { - RequestWrapper request = scriptApplier.apply(copyMetadata(buildRequest(doc), doc), doc); - if (request != null) { - bulkRequest.add(request.self()); - } - } - } - return bulkRequest; - } - - /** - * Used to accept or ignore a search hit. Ignored search hits will be excluded - * from the bulk request. It is also where we fail on invalid search hits, like - * when the document has no source but it's required. - */ - protected boolean accept(ScrollableHitSource.Hit doc) { - if (doc.getSource() == null) { - /* - * Either the document didn't store _source or we didn't fetch it for some reason. Since we don't allow the user to - * change the "fields" part of the search request it is unlikely that we got here because we didn't fetch _source. - * Thus the error message assumes that it wasn't stored. - */ - throw new IllegalArgumentException("[" + doc.getIndex() + "][" + doc.getType() + "][" + doc.getId() + "] didn't store _source"); - } - return true; - } - - /** - * Build the {@link RequestWrapper} for a single search hit. This shouldn't handle - * metadata or scripting. That will be handled by copyMetadata and - * apply functions that can be overridden. - */ - protected abstract RequestWrapper buildRequest(ScrollableHitSource.Hit doc); - - /** - * Copies the metadata from a hit to the request. - */ - protected RequestWrapper copyMetadata(RequestWrapper request, ScrollableHitSource.Hit doc) { - request.setParent(doc.getParent()); - copyRouting(request, doc.getRouting()); - return request; - } - - /** - * Copy the routing from a search hit to the request. - */ - protected void copyRouting(RequestWrapper request, String routing) { - request.setRouting(routing); - } - - /** - * Wrapper for the {@link DocWriteRequest} that are used in this action class. - */ - interface RequestWrapper> { - - void setIndex(String index); - - String getIndex(); - - void setType(String type); - - String getType(); - - void setId(String id); - - String getId(); - - void setVersion(long version); - - long getVersion(); - - void setVersionType(VersionType versionType); - - void setParent(String parent); - - String getParent(); - - void setRouting(String routing); - - String getRouting(); - - void setSource(Map source); - - Map getSource(); - - Self self(); - } - - /** - * {@link RequestWrapper} for {@link IndexRequest} - */ - public static class IndexRequestWrapper implements RequestWrapper { - - private final IndexRequest request; - - IndexRequestWrapper(IndexRequest request) { - this.request = Objects.requireNonNull(request, "Wrapped IndexRequest can not be null"); - } - - @Override - public void setIndex(String index) { - request.index(index); - } - - @Override - public String getIndex() { - return request.index(); - } - - @Override - public void setType(String type) { - request.type(type); - } - - @Override - public String getType() { - return request.type(); - } - - @Override - public void setId(String id) { - request.id(id); - } - - @Override - public String getId() { - return request.id(); - } - - @Override - public void setVersion(long version) { - request.version(version); - } - - @Override - public long getVersion() { - return request.version(); - } - - @Override - public void setVersionType(VersionType versionType) { - request.versionType(versionType); - } - - @Override - public void setParent(String parent) { - request.parent(parent); - } - - @Override - public String getParent() { - return request.parent(); - } - - @Override - public void setRouting(String routing) { - request.routing(routing); - } - - @Override - public String getRouting() { - return request.routing(); - } - - @Override - public Map getSource() { - return request.sourceAsMap(); - } - - @Override - public void setSource(Map source) { - request.source(source); - } - - @Override - public IndexRequest self() { - return request; - } - } - - /** - * Wraps a {@link IndexRequest} in a {@link RequestWrapper} - */ - static RequestWrapper wrap(IndexRequest request) { - return new IndexRequestWrapper(request); - } - - /** - * {@link RequestWrapper} for {@link DeleteRequest} - */ - public static class DeleteRequestWrapper implements RequestWrapper { - - private final DeleteRequest request; - - DeleteRequestWrapper(DeleteRequest request) { - this.request = Objects.requireNonNull(request, "Wrapped DeleteRequest can not be null"); - } - - @Override - public void setIndex(String index) { - request.index(index); - } - - @Override - public String getIndex() { - return request.index(); - } - - @Override - public void setType(String type) { - request.type(type); - } - - @Override - public String getType() { - return request.type(); - } - - @Override - public void setId(String id) { - request.id(id); - } - - @Override - public String getId() { - return request.id(); - } - - @Override - public void setVersion(long version) { - request.version(version); - } - - @Override - public long getVersion() { - return request.version(); - } - - @Override - public void setVersionType(VersionType versionType) { - request.versionType(versionType); - } - - @Override - public void setParent(String parent) { - request.parent(parent); - } - - @Override - public String getParent() { - return request.parent(); - } - - @Override - public void setRouting(String routing) { - request.routing(routing); - } - - @Override - public String getRouting() { - return request.routing(); - } - - @Override - public Map getSource() { - throw new UnsupportedOperationException("unable to get source from action request [" + request.getClass() + "]"); - } - - @Override - public void setSource(Map source) { - throw new UnsupportedOperationException("unable to set [source] on action request [" + request.getClass() + "]"); - } - - @Override - public DeleteRequest self() { - return request; - } - } - - /** - * Wraps a {@link DeleteRequest} in a {@link RequestWrapper} - */ - static RequestWrapper wrap(DeleteRequest request) { - return new DeleteRequestWrapper(request); - } - - /** - * Apply a {@link Script} to a {@link RequestWrapper} - */ - public abstract class ScriptApplier implements BiFunction, ScrollableHitSource.Hit, RequestWrapper> { - - private final WorkingBulkByScrollTask task; - private final ScriptService scriptService; - private final Script script; - private final Map params; - - private ExecutableScript executable; - private Map context; - - public ScriptApplier(WorkingBulkByScrollTask task, ScriptService scriptService, Script script, - Map params) { - this.task = task; - this.scriptService = scriptService; - this.script = script; - this.params = params; - } - - @Override - @SuppressWarnings("unchecked") - public RequestWrapper apply(RequestWrapper request, ScrollableHitSource.Hit doc) { - if (script == null) { - return request; - } - if (executable == null) { - CompiledScript compiled = scriptService.compile(script, ScriptContext.Standard.UPDATE, emptyMap()); - executable = scriptService.executable(compiled, params); - } - if (context == null) { - context = new HashMap<>(); - } else { - context.clear(); - } - - context.put(IndexFieldMapper.NAME, doc.getIndex()); - context.put(TypeFieldMapper.NAME, doc.getType()); - context.put(IdFieldMapper.NAME, doc.getId()); - Long oldVersion = doc.getVersion(); - context.put(VersionFieldMapper.NAME, oldVersion); - String oldParent = doc.getParent(); - context.put(ParentFieldMapper.NAME, oldParent); - String oldRouting = doc.getRouting(); - context.put(RoutingFieldMapper.NAME, oldRouting); - context.put(SourceFieldMapper.NAME, request.getSource()); - - OpType oldOpType = OpType.INDEX; - context.put("op", oldOpType.toString()); - - executable.setNextVar("ctx", context); - executable.run(); - - Map resultCtx = (Map) executable.unwrap(context); - String newOp = (String) resultCtx.remove("op"); - if (newOp == null) { - throw new IllegalArgumentException("Script cleared operation type"); - } - - /* - * It'd be lovely to only set the source if we know its been modified - * but it isn't worth keeping two copies of it around just to check! - */ - request.setSource((Map) resultCtx.remove(SourceFieldMapper.NAME)); - - Object newValue = resultCtx.remove(IndexFieldMapper.NAME); - if (false == doc.getIndex().equals(newValue)) { - scriptChangedIndex(request, newValue); - } - newValue = resultCtx.remove(TypeFieldMapper.NAME); - if (false == doc.getType().equals(newValue)) { - scriptChangedType(request, newValue); - } - newValue = resultCtx.remove(IdFieldMapper.NAME); - if (false == doc.getId().equals(newValue)) { - scriptChangedId(request, newValue); - } - newValue = resultCtx.remove(VersionFieldMapper.NAME); - if (false == Objects.equals(oldVersion, newValue)) { - scriptChangedVersion(request, newValue); - } - newValue = resultCtx.remove(ParentFieldMapper.NAME); - if (false == Objects.equals(oldParent, newValue)) { - scriptChangedParent(request, newValue); - } - /* - * Its important that routing comes after parent in case you want to - * change them both. - */ - newValue = resultCtx.remove(RoutingFieldMapper.NAME); - if (false == Objects.equals(oldRouting, newValue)) { - scriptChangedRouting(request, newValue); - } - - OpType newOpType = OpType.fromString(newOp); - if (newOpType != oldOpType) { - return scriptChangedOpType(request, oldOpType, newOpType); - } - - if (false == resultCtx.isEmpty()) { - throw new IllegalArgumentException("Invalid fields added to context [" + String.join(",", resultCtx.keySet()) + ']'); - } - return request; - } - - protected RequestWrapper scriptChangedOpType(RequestWrapper request, OpType oldOpType, OpType newOpType) { - switch (newOpType) { - case NOOP: - task.countNoop(); - return null; - case DELETE: - RequestWrapper delete = wrap(new DeleteRequest(request.getIndex(), request.getType(), request.getId())); - delete.setVersion(request.getVersion()); - delete.setVersionType(VersionType.INTERNAL); - delete.setParent(request.getParent()); - delete.setRouting(request.getRouting()); - return delete; - default: - throw new IllegalArgumentException("Unsupported operation type change from [" + oldOpType + "] to [" + newOpType + "]"); - } - } - - protected abstract void scriptChangedIndex(RequestWrapper request, Object to); - - protected abstract void scriptChangedType(RequestWrapper request, Object to); - - protected abstract void scriptChangedId(RequestWrapper request, Object to); - - protected abstract void scriptChangedVersion(RequestWrapper request, Object to); - - protected abstract void scriptChangedRouting(RequestWrapper request, Object to); - - protected abstract void scriptChangedParent(RequestWrapper request, Object to); - - } - - public enum OpType { - - NOOP("noop"), - INDEX("index"), - DELETE("delete"); - - private final String id; - - OpType(String id) { - this.id = id; - } - - public static OpType fromString(String opType) { - String lowerOpType = opType.toLowerCase(Locale.ROOT); - switch (lowerOpType) { - case "noop": - return OpType.NOOP; - case "index": - return OpType.INDEX; - case "delete": - return OpType.DELETE; - default: - throw new IllegalArgumentException("Operation type [" + lowerOpType + "] not allowed, only " + - Arrays.toString(values()) + " are allowed"); - } - } - - @Override - public String toString() { - return id.toLowerCase(Locale.ROOT); - } - } -} diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java index 9ee5d63d01b71..dd01f8b9889db 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java @@ -59,8 +59,8 @@ public void doExecute(Task task, DeleteByQueryRequest request, ActionListener { - + static class AsyncDeleteBySearchAction extends AbstractAsyncBulkByScrollAction { public AsyncDeleteBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, - ThreadPool threadPool, DeleteByQueryRequest request, ActionListener listener, - ScriptService scriptService, ClusterState clusterState) { - super(task, logger, client, threadPool, request, listener, scriptService, clusterState); + ThreadPool threadPool, DeleteByQueryRequest request, ScriptService scriptService, ClusterState clusterState, + ActionListener listener) { + super(task, logger, client, threadPool, request, scriptService, clusterState, listener); } @Override @@ -107,8 +106,7 @@ protected RequestWrapper buildRequest(ScrollableHitSource.Hit doc } /** - * Overrides the parent {@link AbstractAsyncBulkIndexByScrollAction#copyMetadata(RequestWrapper, ScrollableHitSource.Hit)} - * method that is much more Update/Reindex oriented and so also copies things like timestamp/ttl which we + * Overrides the parent's implementation is much more Update/Reindex oriented and so also copies things like timestamp/ttl which we * don't care for a deletion. */ @Override diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java index 183b396b6def5..4e0562668f8c7 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java @@ -113,8 +113,8 @@ protected void doExecute(Task task, ReindexRequest request, ActionListener { + static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction { /** * List of threads created by this process. Usually actions don't create threads in Elasticsearch. Instead they use the builtin * {@link ThreadPool}s. But reindex-from-remote uses Elasticsearch's {@link RestClient} which doesn't use the @@ -240,9 +240,9 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollActi private List createdThreads = emptyList(); public AsyncIndexBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, - ThreadPool threadPool, ReindexRequest request, ActionListener listener, - ScriptService scriptService, ClusterState clusterState) { - super(task, logger, client, threadPool, request, listener, scriptService, clusterState); + ThreadPool threadPool, ReindexRequest request, ScriptService scriptService, ClusterState clusterState, + ActionListener listener) { + super(task, logger, client, threadPool, request, scriptService, clusterState, listener); } @Override diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java index 009ffabb73bff..04e676a03cfa1 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java @@ -70,8 +70,8 @@ protected void doExecute(Task task, UpdateByQueryRequest request, ActionListener } else { ClusterState state = clusterService.state(); ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task); - new AsyncIndexBySearchAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, listener, scriptService, - state).start(); + new AsyncIndexBySearchAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, scriptService, state, + listener).start(); } } @@ -83,12 +83,11 @@ protected void doExecute(UpdateByQueryRequest request, ActionListener { - + static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction { public AsyncIndexBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, - ThreadPool threadPool, UpdateByQueryRequest request, ActionListener listener, - ScriptService scriptService, ClusterState clusterState) { - super(task, logger, client, threadPool, request, listener, scriptService, clusterState); + ThreadPool threadPool, UpdateByQueryRequest request, ScriptService scriptService, ClusterState clusterState, + ActionListener listener) { + super(task, logger, client, threadPool, request, scriptService, clusterState, listener); } @Override diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexbyScrollActionMetadataTestCase.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionMetadataTestCase.java similarity index 82% rename from modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexbyScrollActionMetadataTestCase.java rename to modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionMetadataTestCase.java index cb9ec0c273ba2..81a9aa46abcd0 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexbyScrollActionMetadataTestCase.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionMetadataTestCase.java @@ -19,14 +19,14 @@ package org.elasticsearch.index.reindex; -public abstract class AbstractAsyncBulkIndexbyScrollActionMetadataTestCase< +public abstract class AbstractAsyncBulkByScrollActionMetadataTestCase< Request extends AbstractBulkIndexByScrollRequest, Response extends BulkIndexByScrollResponse> - extends AbstractAsyncBulkIndexByScrollActionTestCase { + extends AbstractAsyncBulkByScrollActionTestCase { protected ScrollableHitSource.BasicHit doc() { return new ScrollableHitSource.BasicHit("index", "type", "id", 0); } - protected abstract AbstractAsyncBulkIndexByScrollAction action(); + protected abstract AbstractAsyncBulkByScrollAction action(); } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexByScrollActionScriptTestCase.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionScriptTestCase.java similarity index 86% rename from modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexByScrollActionScriptTestCase.java rename to modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionScriptTestCase.java index 36c392622bc9b..f065eba40a274 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexByScrollActionScriptTestCase.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionScriptTestCase.java @@ -22,8 +22,8 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.index.reindex.AbstractAsyncBulkIndexByScrollAction.OpType; -import org.elasticsearch.index.reindex.AbstractAsyncBulkIndexByScrollAction.RequestWrapper; +import org.elasticsearch.index.reindex.AbstractAsyncBulkByScrollAction.OpType; +import org.elasticsearch.index.reindex.AbstractAsyncBulkByScrollAction.RequestWrapper; import org.elasticsearch.script.CompiledScript; import org.elasticsearch.script.ExecutableScript; import org.elasticsearch.script.Script; @@ -40,10 +40,10 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public abstract class AbstractAsyncBulkIndexByScrollActionScriptTestCase< +public abstract class AbstractAsyncBulkByScrollActionScriptTestCase< Request extends AbstractBulkIndexByScrollRequest, Response extends BulkIndexByScrollResponse> - extends AbstractAsyncBulkIndexByScrollActionTestCase { + extends AbstractAsyncBulkByScrollActionTestCase { private static final Script EMPTY_SCRIPT = new Script(""); @@ -62,8 +62,8 @@ protected T applyScript(Consumer> when(scriptService.executable(any(CompiledScript.class), Matchers.>any())) .thenReturn(executableScript); - AbstractAsyncBulkIndexByScrollAction action = action(scriptService, request().setScript(EMPTY_SCRIPT)); - RequestWrapper result = action.buildScriptApplier().apply(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc); + AbstractAsyncBulkByScrollAction action = action(scriptService, request().setScript(EMPTY_SCRIPT)); + RequestWrapper result = action.buildScriptApplier().apply(AbstractAsyncBulkByScrollAction.wrap(index), doc); return (result != null) ? (T) result.self() : null; } @@ -104,5 +104,5 @@ public void testSetOpTypeUnknown() throws Exception { assertThat(e.getMessage(), equalTo("Operation type [unknown] not allowed, only [noop, index, delete] are allowed")); } - protected abstract AbstractAsyncBulkIndexByScrollAction action(ScriptService scriptService, Request request); + protected abstract AbstractAsyncBulkByScrollAction action(ScriptService scriptService, Request request); } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexByScrollActionTestCase.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionTestCase.java similarity index 96% rename from modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexByScrollActionTestCase.java rename to modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionTestCase.java index 3df61f369153e..0fba30bd422e8 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexByScrollActionTestCase.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionTestCase.java @@ -27,7 +27,7 @@ import org.junit.After; import org.junit.Before; -public abstract class AbstractAsyncBulkIndexByScrollActionTestCase< +public abstract class AbstractAsyncBulkByScrollActionTestCase< Request extends AbstractBulkIndexByScrollRequest, Response extends BulkIndexByScrollResponse> extends ESTestCase { diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java index de33688a4c8a4..23192f083d987 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java @@ -55,6 +55,7 @@ import org.elasticsearch.client.FilterClient; import org.elasticsearch.client.ParentTaskAssigningClient; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.text.Text; import org.elasticsearch.common.unit.TimeValue; @@ -170,7 +171,7 @@ private String scrollId() { public void testStartRetriesOnRejectionAndSucceeds() throws Exception { client.searchesToReject = randomIntBetween(0, testRequest.getMaxRetries() - 1); - DummyAbstractAsyncBulkByScrollAction action = new DummyActionWithoutBackoff(); + DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff(); action.start(); assertBusy(() -> assertEquals(client.searchesToReject + 1, client.searchAttempts.get())); if (listener.isDone()) { @@ -183,7 +184,7 @@ public void testStartRetriesOnRejectionAndSucceeds() throws Exception { public void testStartRetriesOnRejectionButFailsOnTooManyRejections() throws Exception { client.searchesToReject = testRequest.getMaxRetries() + randomIntBetween(1, 100); - DummyAbstractAsyncBulkByScrollAction action = new DummyActionWithoutBackoff(); + DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff(); action.start(); assertBusy(() -> assertEquals(testRequest.getMaxRetries() + 1, client.searchAttempts.get())); assertBusy(() -> assertTrue(listener.isDone())); @@ -195,7 +196,7 @@ public void testStartRetriesOnRejectionButFailsOnTooManyRejections() throws Exce public void testStartNextScrollRetriesOnRejectionAndSucceeds() throws Exception { client.scrollsToReject = randomIntBetween(0, testRequest.getMaxRetries() - 1); - DummyAbstractAsyncBulkByScrollAction action = new DummyActionWithoutBackoff(); + DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff(); action.setScroll(scrollId()); action.startNextScroll(timeValueNanos(System.nanoTime()), 0); assertBusy(() -> assertEquals(client.scrollsToReject + 1, client.scrollAttempts.get())); @@ -209,7 +210,7 @@ public void testStartNextScrollRetriesOnRejectionAndSucceeds() throws Exception public void testStartNextScrollRetriesOnRejectionButFailsOnTooManyRejections() throws Exception { client.scrollsToReject = testRequest.getMaxRetries() + randomIntBetween(1, 100); - DummyAbstractAsyncBulkByScrollAction action = new DummyActionWithoutBackoff(); + DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff(); action.setScroll(scrollId()); action.startNextScroll(timeValueNanos(System.nanoTime()), 0); assertBusy(() -> assertEquals(testRequest.getMaxRetries() + 1, client.scrollAttempts.get())); @@ -226,7 +227,7 @@ public void testScrollResponseSetsTotal() { long total = randomIntBetween(0, Integer.MAX_VALUE); ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), total, emptyList(), null); - simulateScrollResponse(new DummyAbstractAsyncBulkByScrollAction(), timeValueSeconds(0), 0, response); + simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueSeconds(0), 0, response); assertEquals(total, testTask.getStatus().getTotal()); } @@ -238,7 +239,7 @@ public void testScrollResponseBatchingBehavior() throws Exception { for (int batches = 1; batches < maxBatches; batches++) { Hit hit = new ScrollableHitSource.BasicHit("index", "type", "id", 0); ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 1, singletonList(hit), null); - DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction(); + DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction(); simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 0, response); // Use assert busy because the update happens on another thread @@ -291,7 +292,7 @@ public void testBulkResponseSetsLotsOfStatus() { opType, new IndexResponse(shardId, "type", "id" + i, randomInt(20), randomInt(), createdResponse)); } - new DummyAbstractAsyncBulkByScrollAction().onBulkResponse(timeValueNanos(System.nanoTime()), new BulkResponse(responses, 0)); + new DummyAsyncBulkByScrollAction().onBulkResponse(timeValueNanos(System.nanoTime()), new BulkResponse(responses, 0)); assertEquals(versionConflicts, testTask.getStatus().getVersionConflicts()); assertEquals(updated, testTask.getStatus().getUpdated()); assertEquals(created, testTask.getStatus().getCreated()); @@ -316,7 +317,7 @@ public ScheduledFuture schedule(TimeValue delay, String name, Runnable comman } }); ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 0, emptyList(), null); - simulateScrollResponse(new DummyAbstractAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 10, response); + simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 10, response); ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get()); assertThat(e.getMessage(), equalTo("EsRejectedExecutionException[test]")); assertThat(client.scrollsCleared, contains(scrollId)); @@ -333,7 +334,7 @@ public void testShardFailuresAbortRequest() throws Exception { SearchFailure shardFailure = new SearchFailure(new RuntimeException("test")); ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(false, singletonList(shardFailure), 0, emptyList(), null); - simulateScrollResponse(new DummyAbstractAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse); + simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse); BulkIndexByScrollResponse response = listener.get(); assertThat(response.getBulkFailures(), empty()); assertThat(response.getSearchFailures(), contains(shardFailure)); @@ -347,7 +348,7 @@ public void testShardFailuresAbortRequest() throws Exception { */ public void testSearchTimeoutsAbortRequest() throws Exception { ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(true, emptyList(), 0, emptyList(), null); - simulateScrollResponse(new DummyAbstractAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse); + simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse); BulkIndexByScrollResponse response = listener.get(); assertThat(response.getBulkFailures(), empty()); assertThat(response.getSearchFailures(), empty()); @@ -361,7 +362,7 @@ public void testSearchTimeoutsAbortRequest() throws Exception { */ public void testBulkFailuresAbortRequest() throws Exception { Failure failure = new Failure("index", "type", "id", new RuntimeException("test")); - DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction(); + DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction(); BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[] {new BulkItemResponse(0, DocWriteRequest.OpType.CREATE, failure)}, randomLong()); action.onBulkResponse(timeValueNanos(System.nanoTime()), bulkResponse); @@ -374,14 +375,15 @@ public void testBulkFailuresAbortRequest() throws Exception { /** * Mimicks script failures or general wrongness by implementers. */ - public void testListenerReceiveBuildBulkExceptions() throws Exception { - DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction() { + public void testBuildRequestThrowsException() throws Exception { + DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction() { @Override - protected BulkRequest buildBulk(Iterable docs) { + protected AbstractAsyncBulkByScrollAction.RequestWrapper buildRequest(Hit doc) { throw new RuntimeException("surprise"); } }; - Hit hit = new ScrollableHitSource.BasicHit("index", "type", "id", 0); + ScrollableHitSource.BasicHit hit = new ScrollableHitSource.BasicHit("index", "type", "id", 0); + hit.setSource(new BytesArray("{}")); ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 1, singletonList(hit), null); simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 0, response); ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get()); @@ -426,7 +428,7 @@ public ScheduledFuture schedule(TimeValue delay, String name, Runnable comman } }); - DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction(); + DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction(); action.setScroll(scrollId()); // Set the base for the scroll to wait - this is added to the figure we calculate below @@ -478,7 +480,7 @@ private void bulkRetryTestCase(boolean failWithRejection) throws Exception { * deal with it. We just wait for it to happen. */ CountDownLatch successLatch = new CountDownLatch(1); - DummyAbstractAsyncBulkByScrollAction action = new DummyActionWithoutBackoff() { + DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff() { @Override void startNextScroll(TimeValue lastBatchStartTime, int lastBatchSize) { successLatch.countDown(); @@ -504,7 +506,7 @@ void startNextScroll(TimeValue lastBatchStartTime, int lastBatchSize) { * The default retry time matches what we say it is in the javadoc for the request. */ public void testDefaultRetryTimes() { - Iterator policy = new DummyAbstractAsyncBulkByScrollAction().buildBackoffPolicy().iterator(); + Iterator policy = new DummyAsyncBulkByScrollAction().buildBackoffPolicy().iterator(); long millis = 0; while (policy.hasNext()) { millis += policy.next().millis(); @@ -537,7 +539,7 @@ private void refreshTestCase(Boolean refresh, boolean addDestinationIndexes, boo if (refresh != null) { testRequest.setRefresh(refresh); } - DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction(); + DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction(); if (addDestinationIndexes) { action.addDestinationIndices(singleton("foo")); } @@ -550,34 +552,34 @@ private void refreshTestCase(Boolean refresh, boolean addDestinationIndexes, boo } public void testCancelBeforeInitialSearch() throws Exception { - cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.start()); + cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.start()); } public void testCancelBeforeScrollResponse() throws Exception { // We bail so early we don't need to pass in a half way valid response. - cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1, + cancelTaskCase((DummyAsyncBulkByScrollAction action) -> simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1, null)); } public void testCancelBeforeSendBulkRequest() throws Exception { // We bail so early we don't need to pass in a half way valid request. - cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.sendBulkRequest(timeValueNanos(System.nanoTime()), null)); + cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.sendBulkRequest(timeValueNanos(System.nanoTime()), null)); } public void testCancelBeforeOnBulkResponse() throws Exception { // We bail so early we don't need to pass in a half way valid response. - cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> + cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.onBulkResponse(timeValueNanos(System.nanoTime()), new BulkResponse(new BulkItemResponse[0], 0))); } public void testCancelBeforeStartNextScroll() throws Exception { - cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.startNextScroll(timeValueNanos(System.nanoTime()), 0)); + cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.startNextScroll(timeValueNanos(System.nanoTime()), 0)); } public void testCancelBeforeRefreshAndFinish() throws Exception { // Refresh or not doesn't matter - we don't try to refresh. testRequest.setRefresh(usually()); - cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.refreshAndFinish(emptyList(), emptyList(), false)); + cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.refreshAndFinish(emptyList(), emptyList(), false)); assertNull("No refresh was attempted", client.lastRefreshRequest.get()); } @@ -610,7 +612,7 @@ public ScheduledFuture schedule(TimeValue delay, String name, Runnable comman }); // Send the scroll response which will trigger the custom thread pool above, canceling the request before running the response - DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction(); + DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction(); boolean previousScrollSet = usually(); if (previousScrollSet) { action.setScroll(scrollId()); @@ -629,8 +631,8 @@ public ScheduledFuture schedule(TimeValue delay, String name, Runnable comman } } - private void cancelTaskCase(Consumer testMe) throws Exception { - DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction(); + private void cancelTaskCase(Consumer testMe) throws Exception { + DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction(); boolean previousScrollSet = usually(); if (previousScrollSet) { action.setScroll(scrollId()); @@ -648,17 +650,16 @@ private void cancelTaskCase(Consumer testM /** * Simulate a scroll response by setting the scroll id and firing the onScrollResponse method. */ - private void simulateScrollResponse(DummyAbstractAsyncBulkByScrollAction action, TimeValue lastBatchTime, int lastBatchSize, + private void simulateScrollResponse(DummyAsyncBulkByScrollAction action, TimeValue lastBatchTime, int lastBatchSize, ScrollableHitSource.Response response) { action.setScroll(scrollId()); action.onScrollResponse(lastBatchTime, lastBatchSize, response); } - private class DummyAbstractAsyncBulkByScrollAction - extends AbstractAsyncBulkByScrollAction { - public DummyAbstractAsyncBulkByScrollAction() { + private class DummyAsyncBulkByScrollAction extends AbstractAsyncBulkByScrollAction { + public DummyAsyncBulkByScrollAction() { super(testTask, AsyncBulkByScrollActionTests.this.logger, new ParentTaskAssigningClient(client, localNode, testTask), - client.threadPool(), testRequest, listener); + client.threadPool(), testRequest, null, null, listener); } @Override @@ -667,15 +668,15 @@ protected boolean needsSourceDocumentVersions() { } @Override - protected BulkRequest buildBulk(Iterable docs) { - return new BulkRequest(); + protected AbstractAsyncBulkByScrollAction.RequestWrapper buildRequest(Hit doc) { + throw new UnsupportedOperationException("Use another override to test this."); } } /** - * An extension to {@linkplain DummyAbstractAsyncBulkByScrollAction} that uses a 0 delaying backoff policy. + * An extension to {@linkplain DummyAsyncBulkByScrollAction} that uses a 0 delaying backoff policy. */ - private class DummyActionWithoutBackoff extends DummyAbstractAsyncBulkByScrollAction { + private class DummyActionWithoutBackoff extends DummyAsyncBulkByScrollAction { @Override BackoffPolicy buildBackoffPolicy() { // Force a backoff time of 0 to prevent sleeping diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexMetadataTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexMetadataTests.java index dab0cab8d8a3b..b24ab339e4d35 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexMetadataTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexMetadataTests.java @@ -25,10 +25,10 @@ /** * Index-by-search test for ttl, timestamp, and routing. */ -public class ReindexMetadataTests extends AbstractAsyncBulkIndexbyScrollActionMetadataTestCase { +public class ReindexMetadataTests extends AbstractAsyncBulkByScrollActionMetadataTestCase { public void testRoutingCopiedByDefault() throws Exception { IndexRequest index = new IndexRequest(); - action().copyMetadata(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc().setRouting("foo")); + action().copyMetadata(AbstractAsyncBulkByScrollAction.wrap(index), doc().setRouting("foo")); assertEquals("foo", index.routing()); } @@ -36,7 +36,7 @@ public void testRoutingCopiedIfRequested() throws Exception { TransportReindexAction.AsyncIndexBySearchAction action = action(); action.mainRequest.getDestination().routing("keep"); IndexRequest index = new IndexRequest(); - action.copyMetadata(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc().setRouting("foo")); + action.copyMetadata(AbstractAsyncBulkByScrollAction.wrap(index), doc().setRouting("foo")); assertEquals("foo", index.routing()); } @@ -44,7 +44,7 @@ public void testRoutingDiscardedIfRequested() throws Exception { TransportReindexAction.AsyncIndexBySearchAction action = action(); action.mainRequest.getDestination().routing("discard"); IndexRequest index = new IndexRequest(); - action.copyMetadata(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc().setRouting("foo")); + action.copyMetadata(AbstractAsyncBulkByScrollAction.wrap(index), doc().setRouting("foo")); assertEquals(null, index.routing()); } @@ -52,7 +52,7 @@ public void testRoutingSetIfRequested() throws Exception { TransportReindexAction.AsyncIndexBySearchAction action = action(); action.mainRequest.getDestination().routing("=cat"); IndexRequest index = new IndexRequest(); - action.copyMetadata(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc().setRouting("foo")); + action.copyMetadata(AbstractAsyncBulkByScrollAction.wrap(index), doc().setRouting("foo")); assertEquals("cat", index.routing()); } @@ -60,13 +60,13 @@ public void testRoutingSetIfWithDegenerateValue() throws Exception { TransportReindexAction.AsyncIndexBySearchAction action = action(); action.mainRequest.getDestination().routing("==]"); IndexRequest index = new IndexRequest(); - action.copyMetadata(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc().setRouting("foo")); + action.copyMetadata(AbstractAsyncBulkByScrollAction.wrap(index), doc().setRouting("foo")); assertEquals("=]", index.routing()); } @Override protected TransportReindexAction.AsyncIndexBySearchAction action() { - return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request(), listener(), null, null); + return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request(), null, null, listener()); } @Override diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexScriptTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexScriptTests.java index 66b681b149477..957322557ac6c 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexScriptTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexScriptTests.java @@ -31,7 +31,7 @@ /** * Tests index-by-search with a script modifying the documents. */ -public class ReindexScriptTests extends AbstractAsyncBulkIndexByScrollActionScriptTestCase { +public class ReindexScriptTests extends AbstractAsyncBulkByScrollActionScriptTestCase { public void testSetIndex() throws Exception { Object dest = randomFrom(new Object[] {234, 234L, "pancake"}); @@ -109,8 +109,8 @@ protected ReindexRequest request() { } @Override - protected AbstractAsyncBulkIndexByScrollAction action(ScriptService scriptService, ReindexRequest request) { - return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request, listener(), scriptService, - null); + protected TransportReindexAction.AsyncIndexBySearchAction action(ScriptService scriptService, ReindexRequest request) { + return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request, scriptService, null, + listener()); } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryMetadataTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryMetadataTests.java index 6ebb0749792da..c31d7ae1deb51 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryMetadataTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryMetadataTests.java @@ -23,16 +23,16 @@ import org.elasticsearch.action.search.SearchRequest; public class UpdateByQueryMetadataTests - extends AbstractAsyncBulkIndexbyScrollActionMetadataTestCase { + extends AbstractAsyncBulkByScrollActionMetadataTestCase { public void testRoutingIsCopied() throws Exception { IndexRequest index = new IndexRequest(); - action().copyMetadata(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc().setRouting("foo")); + action().copyMetadata(AbstractAsyncBulkByScrollAction.wrap(index), doc().setRouting("foo")); assertEquals("foo", index.routing()); } @Override protected TransportUpdateByQueryAction.AsyncIndexBySearchAction action() { - return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request(), listener(), null, null); + return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request(), null, null, listener()); } @Override diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java index 5ff54e4e06d80..6b8ca186145a6 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java @@ -28,7 +28,7 @@ import static org.hamcrest.Matchers.containsString; public class UpdateByQueryWithScriptTests - extends AbstractAsyncBulkIndexByScrollActionScriptTestCase { + extends AbstractAsyncBulkByScrollActionScriptTestCase { public void testModifyingCtxNotAllowed() { /* @@ -53,8 +53,8 @@ protected UpdateByQueryRequest request() { } @Override - protected AbstractAsyncBulkIndexByScrollAction action(ScriptService scriptService, UpdateByQueryRequest request) { - return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request, listener(), - scriptService, null); + protected TransportUpdateByQueryAction.AsyncIndexBySearchAction action(ScriptService scriptService, UpdateByQueryRequest request) { + return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request, scriptService, null, + listener()); } }