From c79d9d8b2137aaab1c5801ae725674e3dbd1e145 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 10 Apr 2019 11:26:55 +0200 Subject: [PATCH 1/4] Simplify TransortShardBulkAction#performOnReplica * Resolve TODO since 8.0 doesn't have to worry about pre 6.x nodes * Remove test for removed method since the logic is now completely internal to `performOnReplica` --- .../action/bulk/TransportShardBulkAction.java | 84 ++++--------------- .../bulk/TransportShardBulkActionTests.java | 43 ---------- 2 files changed, 15 insertions(+), 112 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index da30dedfe5e60..9eca339a95cbd 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -338,9 +338,8 @@ private static boolean isConflictException(final Exception e) { /** * Creates a new bulk item result from the given requests and result of performing the update operation on the shard. */ - static BulkItemResponse processUpdateResponse(final UpdateRequest updateRequest, final String concreteIndex, - BulkItemResponse operationResponse, - final UpdateHelper.Result translate) { + private static BulkItemResponse processUpdateResponse(final UpdateRequest updateRequest, final String concreteIndex, + BulkItemResponse operationResponse, final UpdateHelper.Result translate) { final BulkItemResponse response; DocWriteResponse.Result translatedResult = translate.getResponseResult(); if (operationResponse.isFailed()) { @@ -382,54 +381,6 @@ static BulkItemResponse processUpdateResponse(final UpdateRequest updateRequest, return response; } - - /** Modes for executing item request on replica depending on corresponding primary execution result */ - public enum ReplicaItemExecutionMode { - - /** - * When primary execution succeeded - */ - NORMAL, - - /** - * When primary execution failed before sequence no was generated - * or primary execution was a noop (only possible when request is originating from pre-6.0 nodes) - */ - NOOP, - - /** - * When primary execution failed after sequence no was generated - */ - FAILURE - } - - /** - * Determines whether a bulk item request should be executed on the replica. - * - * @return {@link ReplicaItemExecutionMode#NORMAL} upon normal primary execution with no failures - * {@link ReplicaItemExecutionMode#FAILURE} upon primary execution failure after sequence no generation - * {@link ReplicaItemExecutionMode#NOOP} upon primary execution failure before sequence no generation or - * when primary execution resulted in noop (only possible for write requests from pre-6.0 nodes) - */ - static ReplicaItemExecutionMode replicaItemExecutionMode(final BulkItemRequest request, final int index) { - final BulkItemResponse primaryResponse = request.getPrimaryResponse(); - assert primaryResponse != null : "expected primary response to be set for item [" + index + "] request [" + request.request() + "]"; - if (primaryResponse.isFailed()) { - return primaryResponse.getFailure().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO - ? ReplicaItemExecutionMode.FAILURE // we have a seq no generated with the failure, replicate as no-op - : ReplicaItemExecutionMode.NOOP; // no seq no generated, ignore replication - } else { - // TODO: once we know for sure that every operation that has been processed on the primary is assigned a seq# - // (i.e., all nodes on the cluster are on v6.0.0 or higher) we can use the existence of a seq# to indicate whether - // an operation should be processed or be treated as a noop. This means we could remove this method and the - // ReplicaItemExecutionMode enum and have a simple boolean check for seq != UNASSIGNED_SEQ_NO which will work for - // both failures and indexing operations. - return primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP - ? ReplicaItemExecutionMode.NORMAL // execution successful on primary - : ReplicaItemExecutionMode.NOOP; // ignore replication - } - } - @Override public WriteReplicaResult shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { final Translog.Location location = performOnReplica(request, replica); @@ -442,25 +393,20 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index BulkItemRequest item = request.items()[i]; final Engine.Result operationResult; DocWriteRequest docWriteRequest = item.request(); - switch (replicaItemExecutionMode(item, i)) { - case NORMAL: - final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse(); - operationResult = performOpOnReplica(primaryResponse, docWriteRequest, replica); - assert operationResult != null : "operation result must never be null when primary response has no failure"; - location = syncOperationResultOrThrow(operationResult, location); - break; - case NOOP: - break; - case FAILURE: - final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure(); - assert failure.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "seq no must be assigned"; - operationResult = replica.markSeqNoAsNoop(failure.getSeqNo(), failure.getMessage()); - assert operationResult != null : "operation result must never be null when primary response has no failure"; - location = syncOperationResultOrThrow(operationResult, location); - break; - default: - throw new IllegalStateException("illegal replica item execution mode for: " + docWriteRequest); + final BulkItemResponse response = item.getPrimaryResponse(); + final BulkItemResponse.Failure failure = response.getFailure(); + final DocWriteResponse writeResponse = response.getResponse(); + final long seqNum = failure == null ? writeResponse.getSeqNo() : failure.getSeqNo(); + if (seqNum == SequenceNumbers.UNASSIGNED_SEQ_NO) { + continue; + } + if (failure == null) { + operationResult = performOpOnReplica(writeResponse, docWriteRequest, replica); + } else { + operationResult = replica.markSeqNoAsNoop(seqNum, failure.getMessage()); } + assert operationResult != null : "operation result must never be null when primary response has no failure"; + location = syncOperationResultOrThrow(operationResult, location); } return location; } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 62217f7873138..55078840153f2 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -26,7 +26,6 @@ import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.LatchedActionListener; -import org.elasticsearch.action.bulk.TransportShardBulkAction.ReplicaItemExecutionMode; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; @@ -59,7 +58,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; -import static org.elasticsearch.action.bulk.TransportShardBulkAction.replicaItemExecutionMode; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.notNullValue; @@ -96,47 +94,6 @@ private IndexMetaData indexMetaData() throws IOException { .primaryTerm(0, 1).build(); } - public void testShouldExecuteReplicaItem() throws Exception { - // Successful index request should be replicated - DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id") - .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); - DocWriteResponse response = new IndexResponse(shardId, "type", "id", 1, 17, 1, randomBoolean()); - BulkItemRequest request = new BulkItemRequest(0, writeRequest); - request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, response)); - assertThat(replicaItemExecutionMode(request, 0), - equalTo(ReplicaItemExecutionMode.NORMAL)); - - // Failed index requests without sequence no should not be replicated - writeRequest = new IndexRequest("index", "_doc", "id") - .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); - request = new BulkItemRequest(0, writeRequest); - request.setPrimaryResponse( - new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, - new BulkItemResponse.Failure("index", "type", "id", - new IllegalArgumentException("i died")))); - assertThat(replicaItemExecutionMode(request, 0), - equalTo(ReplicaItemExecutionMode.NOOP)); - - // Failed index requests with sequence no should be replicated - request = new BulkItemRequest(0, writeRequest); - request.setPrimaryResponse( - new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, - new BulkItemResponse.Failure("index", "type", "id", - new IllegalArgumentException( - "i died after sequence no was generated"), - 1))); - assertThat(replicaItemExecutionMode(request, 0), - equalTo(ReplicaItemExecutionMode.FAILURE)); - // NOOP requests should not be replicated - DocWriteRequest updateRequest = new UpdateRequest("index", "type", "id"); - response = new UpdateResponse(shardId, "type", "id", 1, DocWriteResponse.Result.NOOP); - request = new BulkItemRequest(0, updateRequest); - request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.UPDATE, - response)); - assertThat(replicaItemExecutionMode(request, 0), - equalTo(ReplicaItemExecutionMode.NOOP)); - } - public void testExecuteBulkIndexRequest() throws Exception { IndexShard shard = newStartedShard(true); From 23ea5d5c74963de0ddf974d6ca6712c3f796b70e Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 12 Apr 2019 08:29:08 +0200 Subject: [PATCH 2/4] CR: add assertion --- .../org/elasticsearch/action/bulk/TransportShardBulkAction.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 9eca339a95cbd..ab440c14acf85 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -398,6 +398,7 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index final DocWriteResponse writeResponse = response.getResponse(); final long seqNum = failure == null ? writeResponse.getSeqNo() : failure.getSeqNo(); if (seqNum == SequenceNumbers.UNASSIGNED_SEQ_NO) { + assert failure != null; continue; } if (failure == null) { From 316dc15b9daed9ca5c5fb5ea25273601d54be2df Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 12 Apr 2019 09:40:45 +0200 Subject: [PATCH 3/4] Revert "CR: add assertion" This reverts commit 23ea5d5c74963de0ddf974d6ca6712c3f796b70e. --- .../org/elasticsearch/action/bulk/TransportShardBulkAction.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index ab440c14acf85..9eca339a95cbd 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -398,7 +398,6 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index final DocWriteResponse writeResponse = response.getResponse(); final long seqNum = failure == null ? writeResponse.getSeqNo() : failure.getSeqNo(); if (seqNum == SequenceNumbers.UNASSIGNED_SEQ_NO) { - assert failure != null; continue; } if (failure == null) { From b42e82ee43f23e1b5414055505ea627fb6d5cb8a Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sun, 14 Apr 2019 08:13:13 +0200 Subject: [PATCH 4/4] add relaxed assertion --- .../org/elasticsearch/action/bulk/TransportShardBulkAction.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 9eca339a95cbd..f49521a37463e 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -398,6 +398,8 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index final DocWriteResponse writeResponse = response.getResponse(); final long seqNum = failure == null ? writeResponse.getSeqNo() : failure.getSeqNo(); if (seqNum == SequenceNumbers.UNASSIGNED_SEQ_NO) { + assert failure != null || writeResponse.getResult() == DocWriteResponse.Result.NOOP + || writeResponse.getResult() == DocWriteResponse.Result.NOT_FOUND; continue; } if (failure == null) {