[CCR] Sync mappings between leader and follow index #30115

Merged
merged 12 commits into from
May 28, 2018

Conversation

martijnvg
Member

The shard changes API returns the minimum IndexMetadata version the follower
index needs to have. If the follower side is behind on IndexMetadata version,
the follow shard task waits before processing write operations until the
mapping has been fetched from the leader index and applied to the follower
index in the background.

The cluster state API is used to fetch the leader mapping, and the put mapping
API is used to apply that mapping to the follower index. This works because the
put mapping API accepts fields that are already defined.

Relates to #30086

@martijnvg martijnvg added review :Distributed Indexing/CCR Issues around the Cross Cluster State Replication features labels Apr 25, 2018
@martijnvg martijnvg requested a review from jasontedor April 25, 2018 09:03
@elasticmachine
Collaborator

Pinging @elastic/es-distributed

* origin/ccr: (166 commits)
  Introduce soft-deletes retention policy based on global checkpoint (elastic#30335)
  Enable MockHttpTransport in ShardChangsIT
  Remove old sha files from dated Lucene snapshot
  Update InternalEngine tests on ccr side for elastic#30121
  Set the new lucene version for 6.4.0
  [ML][TEST] Clean up jobs in ModelPlotIT
  Upgrade to 7.4.0-snapshot-1ed95c097b (elastic#30357)
  Watcher: Ensure trigger service pauses execution (elastic#30363)
  [DOCS] Added coming qualifiers in changelog
  [DOCS] Commented out empty sections in the changelog to fix the doc build. (elastic#30372)
  Security: reduce garbage during index resolution (elastic#30180)
  Make RepositoriesMetaData contents unmodifiable (elastic#30361)
  Change quad tree max levels to 29. Closes elastic#21191 (elastic#29663)
  Test: use trial license in qa tests with security
  [ML] Add integration test for model plots (elastic#30359)
  SQL: Fix bug caused by empty composites (elastic#30343)
  [ML] Account for gaps in data counts after job is reopened (elastic#30294)
  InternalEngineTests.testConcurrentOutOfOrderDocsOnReplica should use two documents (elastic#30121)
  Change signature of Get Repositories Response (elastic#30333)
  Tests: Use different watch ids per test in smoke test (elastic#30331)
  ...
Member

@jasontedor jasontedor left a comment

The general flow looks good; I left one critical comment about avoiding put mapping storms on the follower.

int result = 1;
result += Objects.hashCode(indexMetadataVersion);
result += Arrays.hashCode(operations);
return result;
Member

I think that Objects.hash(indexMetadataVersion, operations) is fine here.

Member Author

Unfortunately, Objects.hash(...) doesn't handle arrays correctly, so I changed it back to use Arrays.hashCode(...).
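
To illustrate the point about arrays, here is a minimal sketch. It assumes a `String[]` as a stand-in for `Translog.Operation[]`, and the class and method names are hypothetical. The varargs `Objects.hash` treats a nested array as a single element and falls back to its identity-based `hashCode()`, while `Arrays.hashCode` hashes the contents.

```java
import java.util.Arrays;
import java.util.Objects;

public class ArrayHashDemo {

    // Objects.hash sees the array as ONE element and uses the array's own
    // identity-based hashCode(), so equal contents can produce different hashes.
    static int identityBasedHash(long version, String[] operations) {
        return Objects.hash(version, operations);
    }

    // Content-based variant, mirroring the shape of the snippet under review.
    static int contentBasedHash(long version, String[] operations) {
        int result = 1;
        result += Objects.hashCode(version);
        result += Arrays.hashCode(operations);
        return result;
    }

    public static void main(String[] args) {
        String[] a = {"index", "delete"};
        String[] b = {"index", "delete"}; // same contents, different array instance

        // Equal contents always give equal content-based hashes.
        System.out.println(contentBasedHash(7L, a) == contentBasedHash(7L, b)); // true

        // The identity-based result depends on the two array instances and is
        // not guaranteed to match, which breaks the equals/hashCode contract.
        System.out.println(identityBasedHash(7L, a) == identityBasedHash(7L, b));
    }
}
```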

@@ -252,7 +266,8 @@ protected Response newResponse() {

private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0];

static Response getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo, long byteLimit) throws IOException {
static Response getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo, long byteLimit,
Member

I think it is a bit odd to push the metadata into this method only so that it can push it out in the response. I would rather see something like the following, I think it gives a clearer separation of concerns:

diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java
index 63314c95db0..38fc7ca38c2 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java
@@ -240,8 +240,10 @@ public class ShardChangesAction extends Action<ShardChangesAction.Request, Shard
             IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
             IndexShard indexShard = indexService.getShard(request.getShard().id());
 
-            IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
-            return getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes, indexMetaData);
+            final long indexMetaDataVersion = clusterService.state().metaData().index(shardId.getIndex()).getVersion();
+            final Translog.Operation[] operations =
+                    getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes);
+            return new Response(indexMetaDataVersion, operations);
         }
 
         @Override
@@ -266,8 +268,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Request, Shard
 
     private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0];
 
-    static Response getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo, long byteLimit,
-                                         IndexMetaData indexMetaData) throws IOException {
+    static Translog.Operation[] getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo, long byteLimit) throws IOException {
         if (indexShard.state() != IndexShardState.STARTED) {
             throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state());
         }
@@ -291,17 +292,17 @@ public class ShardChangesAction extends Action<ShardChangesAction.Request, Shard
                         seenBytes += orderedOp.estimateSize();
                         operations.add(orderedOp);
                         if (nextExpectedSeqNo > maxSeqNo) {
-                            return new Response(indexMetaData.getVersion(), operations.toArray(EMPTY_OPERATIONS_ARRAY));
+                            return operations.toArray(EMPTY_OPERATIONS_ARRAY);
                         }
                     } else {
-                        return new Response(indexMetaData.getVersion(), operations.toArray(EMPTY_OPERATIONS_ARRAY));
+                        return operations.toArray(EMPTY_OPERATIONS_ARRAY);
                     }
                 }
             }
         }
 
         if (nextExpectedSeqNo >= maxSeqNo) {
-            return new Response(indexMetaData.getVersion(), operations.toArray(EMPTY_OPERATIONS_ARRAY));
+            return operations.toArray(EMPTY_OPERATIONS_ARRAY);
         } else {
             String message = "Not all operations between min_seq_no [" + minSeqNo + "] and max_seq_no [" + maxSeqNo +
                     "] found, tracker checkpoint [" + nextExpectedSeqNo + "]";
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java
index 787b3bfc0b8..e98cf2633ca 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java
@@ -53,26 +53,25 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase {
             int min = randomIntBetween(0, numWrites - 1);
             int max = randomIntBetween(min, numWrites - 1);
 
-            final ShardChangesAction.Response r =
-                    ShardChangesAction.getOperationsBetween(indexShard, min, max, Long.MAX_VALUE, indexMetaData);
+            final Translog.Operation[] operations = ShardChangesAction.getOperationsBetween(indexShard, min, max, Long.MAX_VALUE);
             /*
              * We are not guaranteed that operations are returned to us in order they are in the translog (if our read crosses multiple
              * generations) so the best we can assert is that we see the expected operations.
              */
-            final Set<Long> seenSeqNos = Arrays.stream(r.getOperations()).map(Translog.Operation::seqNo).collect(Collectors.toSet());
+            final Set<Long> seenSeqNos = Arrays.stream(operations).map(Translog.Operation::seqNo).collect(Collectors.toSet());
             final Set<Long> expectedSeqNos = LongStream.range(min, max + 1).boxed().collect(Collectors.toSet());
             assertThat(seenSeqNos, equalTo(expectedSeqNos));
         }
 
         // get operations for a range no operations exists:
         Exception e = expectThrows(IllegalStateException.class,
-                () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites, numWrites + 1, Long.MAX_VALUE, indexMetaData));
+                () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites, numWrites + 1, Long.MAX_VALUE));
         assertThat(e.getMessage(), containsString("Not all operations between min_seq_no [" + numWrites + "] and max_seq_no [" +
                 (numWrites + 1) +"] found, tracker checkpoint ["));
 
         // get operations for a range some operations do not exist:
         e = expectThrows(IllegalStateException.class,
-                () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites  - 10, numWrites + 10, Long.MAX_VALUE, indexMetaData));
+                () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites  - 10, numWrites + 10, Long.MAX_VALUE));
         assertThat(e.getMessage(), containsString("Not all operations between min_seq_no [" + (numWrites - 10) + "] and max_seq_no [" +
                 (numWrites + 10) +"] found, tracker checkpoint ["));
     }
@@ -90,8 +89,7 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase {
 
         ShardRouting shardRouting = TestShardRouting.newShardRouting("index", 0, "_node_id", true, ShardRoutingState.INITIALIZING);
         Mockito.when(indexShard.routingEntry()).thenReturn(shardRouting);
-        expectThrows(IndexShardNotStartedException.class,
-                () -> ShardChangesAction.getOperationsBetween(indexShard, 0, 1, Long.MAX_VALUE, indexMetaData));
+        expectThrows(IndexShardNotStartedException.class, () -> ShardChangesAction.getOperationsBetween(indexShard, 0, 1, Long.MAX_VALUE));
     }
 
     public void testGetOperationsBetweenExceedByteLimit() throws Exception {
@@ -107,21 +105,20 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase {
         }
 
         final IndexShard indexShard = indexService.getShard(0);
-        final ShardChangesAction.Response r =
-                ShardChangesAction.getOperationsBetween(indexShard, 0, numWrites - 1, 256, indexService.getMetaData());
-        assertThat(r.getOperations().length, equalTo(12));
-        assertThat(r.getOperations()[0].seqNo(), equalTo(0L));
-        assertThat(r.getOperations()[1].seqNo(), equalTo(1L));
-        assertThat(r.getOperations()[2].seqNo(), equalTo(2L));
-        assertThat(r.getOperations()[3].seqNo(), equalTo(3L));
-        assertThat(r.getOperations()[4].seqNo(), equalTo(4L));
-        assertThat(r.getOperations()[5].seqNo(), equalTo(5L));
-        assertThat(r.getOperations()[6].seqNo(), equalTo(6L));
-        assertThat(r.getOperations()[7].seqNo(), equalTo(7L));
-        assertThat(r.getOperations()[8].seqNo(), equalTo(8L));
-        assertThat(r.getOperations()[9].seqNo(), equalTo(9L));
-        assertThat(r.getOperations()[10].seqNo(), equalTo(10L));
-        assertThat(r.getOperations()[11].seqNo(), equalTo(11L));
+        final Translog.Operation[] operations = ShardChangesAction.getOperationsBetween(indexShard, 0, numWrites - 1, 256);
+        assertThat(operations.length, equalTo(12));
+        assertThat(operations[0].seqNo(), equalTo(0L));
+        assertThat(operations[1].seqNo(), equalTo(1L));
+        assertThat(operations[2].seqNo(), equalTo(2L));
+        assertThat(operations[3].seqNo(), equalTo(3L));
+        assertThat(operations[4].seqNo(), equalTo(4L));
+        assertThat(operations[5].seqNo(), equalTo(5L));
+        assertThat(operations[6].seqNo(), equalTo(6L));
+        assertThat(operations[7].seqNo(), equalTo(7L));
+        assertThat(operations[8].seqNo(), equalTo(8L));
+        assertThat(operations[9].seqNo(), equalTo(9L));
+        assertThat(operations[10].seqNo(), equalTo(10L));
+        assertThat(operations[11].seqNo(), equalTo(11L));
     }
 
 }

Member Author

👍 In the first version of this PR there was a check in getOperationsBetween(...) that used the IndexMetaData (IMD), which is why I initially changed this method to accept it. That check no longer exists, and I should have changed the signature back when I removed the check.

shardFollowNodeTask.updateProcessedGlobalCheckpoint(followGlobalCheckPoint);
prepare(leaderClient, shardFollowNodeTask, params, followGlobalCheckPoint);
}, task::markAsFailed);
IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(params.getLeaderShardId().getIndex(),
Member

I think checker might be better as imdVersionChecker?

Member Author

You mean to rename the class IndexMetadataVersionChecker to Checker?

Member

I mean that the variable name checker is unclear.

}

public void accept(Long minimumRequiredIndexMetadataVersion, Consumer<Exception> handler) {
if (minimumRequiredIndexMetadataVersion <= currentIndexMetadataVersion.get()) {
Member

I think the condition is easier to read along with the log message if it is written as >= (i.e., currentIndexMetadataVersion.get() >= minimumRequiredIndexMetadataVersion).


leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value;
Member

I think an assertion is in order here.

"] times, aborting...", e));
}
return;
}final BulkShardOperationsRequest request = new BulkShardOperationsRequest(followerShard, response.getOperations());
Member

Nit: newline

"] times, aborting...", e));
}
return;
}final BulkShardOperationsRequest request = new BulkShardOperationsRequest(followerShard, response.getOperations());
followerClient.execute(BulkShardOperationsAction.INSTANCE, request, new ActionListener<BulkShardOperationsResponse>() {
Member

I think the indentation of this entire block of code is off by one level.

}

void updateMapping(Consumer<Exception> handler) {
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
Member

I think we want a guard here. Otherwise, concurrent shard changes responses arriving after a mapping update on the leader are all going to put a new mapping on the follower. Maybe we should avoid such an update storm?

Member Author

I've made this change in a separate commit: 9635fe3
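
For readers following along, one way such a guard can look is sketched below. This is not the code from the referenced commit. It is a simplified, synchronous sketch that assumes a single-permit semaphore plus a re-check of the version after the permit is obtained. `GuardedMappingUpdater`, `fetchAndApplyMapping`, and `putMappingCallsFor` are hypothetical names, and the sketch uses `acquireUninterruptibly()` for brevity where the PR uses `acquire()` with asynchronous listeners.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class GuardedMappingUpdater {

    private final Semaphore updateMappingSemaphore = new Semaphore(1);
    private final AtomicLong currentIndexMetadataVersion = new AtomicLong(0);
    // Counts simulated put-mapping calls so the effect of the guard is observable.
    private final AtomicInteger putMappingCalls = new AtomicInteger();

    // Hypothetical stand-in for "fetch the leader mapping and put it on the follower".
    private void fetchAndApplyMapping(long leaderVersion) {
        putMappingCalls.incrementAndGet();
        currentIndexMetadataVersion.set(leaderVersion);
    }

    void ensureVersion(long minimumRequiredIndexMetadataVersion) {
        if (currentIndexMetadataVersion.get() >= minimumRequiredIndexMetadataVersion) {
            return; // fast path: the follower mapping is already recent enough
        }
        updateMappingSemaphore.acquireUninterruptibly();
        try {
            // Re-check after acquiring: another caller may have applied the
            // update while this one was waiting for the permit.
            if (currentIndexMetadataVersion.get() < minimumRequiredIndexMetadataVersion) {
                fetchAndApplyMapping(minimumRequiredIndexMetadataVersion);
            }
        } finally {
            updateMappingSemaphore.release();
        }
    }

    // Runs `callers` concurrent checks for the same leader version and reports
    // how many mapping updates were actually performed.
    static int putMappingCallsFor(int callers, long leaderVersion) {
        GuardedMappingUpdater updater = new GuardedMappingUpdater();
        Thread[] threads = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            threads[i] = new Thread(() -> updater.ensureVersion(leaderVersion));
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return updater.putMappingCalls.get();
    }

    public static void main(String[] args) {
        // Eight concurrent responses that all require version 3 trigger one update.
        System.out.println(putMappingCallsFor(8, 3L)); // 1
    }
}
```

The re-check inside the guarded section is what prevents the storm: callers that were blocked on the permit find the version already current and return without issuing another put mapping.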

martijnvg added 7 commits May 7, 2018 17:30
when a newer imd version is detected for all chunk processors of a
shard follow task executor.
* es/ccr: (37 commits)
  Default to one shard (elastic#30539)
  Unmute IndexUpgradeIT tests
  Forbid expensive query parts in ranking evaluation (elastic#30151)
  Docs: Update HighLevelRestClient migration docs (elastic#30544)
  Clients: Switch to new performRequest (elastic#30543)
  [TEST] Fix typo in MovAvgIT test
  Add missing dependencies on testClasses (elastic#30527)
  [TEST] Mute ML test that needs updating to following ml-cpp changes
  Document woes between auto-expand-replicas and allocation filtering (elastic#30531)
  Moved tokenizers to analysis common module (elastic#30538)
  Adjust copy settings versions
  Mute ShrinkIndexIT suite
  SQL: SYS TABLES ordered according to *DBC specs (elastic#30530)
  Deprecate not copy settings and explicitly disallow (elastic#30404)
  [ML] Improve state persistence log message
  Build: Add mavenPlugin cluster configuration method (elastic#30541)
  Re-enable FlushIT tests
  Bump Gradle heap to 2 GB (elastic#30535)
  SQL: Use request flavored methods in tests (elastic#30345)
  Suppress hdfsFixture if there are spaces in the path (elastic#30302)
  ...
Member

@jasontedor jasontedor left a comment

I'm still thinking about the proposed concurrency model but I left a comment.

try {
updateMappingSemaphore.acquire();
} catch (InterruptedException e) {
handler.accept(e);
Member

There is a missed release of the semaphore here.

Member Author

I wonder whether the release method really needs to be invoked here, but maybe I misunderstand this.

The Javadoc of the acquire method states:

If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies dormant until one of two things happens:
- Some other thread invokes the release method for this semaphore and the current thread is next to be assigned a permit; or
- Some other thread interrupts the current thread.

The way I understand this, if the thread waiting for the semaphore is interrupted (and thus the InterruptedException is thrown), then that thread never had a permit and there is no need to invoke the release method.

Member

You have it correct and I did not; it would be a bug to release here.
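
The conclusion of this thread can be checked with a small standalone sketch. The class and method names are hypothetical. A waiter is interrupted while the only permit is held elsewhere, so its `acquire()` throws without ever obtaining a permit. Releasing in the catch block would then add a permit that was never taken.

```java
import java.util.concurrent.Semaphore;

public class InterruptedAcquireDemo {

    // Returns the permit count after an interrupted waiter has finished and the
    // permit holder has released. A correct run ends with exactly 1 permit.
    static int permitsAfterInterruptedWaiter(boolean releaseOnInterrupt) {
        Semaphore semaphore = new Semaphore(1);
        semaphore.acquireUninterruptibly(); // this thread holds the only permit

        Thread waiter = new Thread(() -> {
            try {
                semaphore.acquire(); // cannot succeed: the permit is held above
                semaphore.release();
            } catch (InterruptedException e) {
                // acquire() threw, so this thread never obtained a permit.
                if (releaseOnInterrupt) {
                    semaphore.release(); // the bug: creates an extra permit
                }
            }
        });
        waiter.start();
        waiter.interrupt();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        semaphore.release(); // the holder gives its permit back
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(permitsAfterInterruptedWaiter(false)); // 1
        System.out.println(permitsAfterInterruptedWaiter(true));  // 2
    }
}
```

With the erroneous release, the semaphore ends up with two permits, which would let two mapping updates run at the same time.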

handler.accept(e);
indexVersionChecker.accept(response.getIndexMetadataVersion(), e -> {
if (e != null) {
if (retryCounter.incrementAndGet() <= PROCESSOR_RETRY_LIMIT) {
Contributor

do we want a retry on every exception type?

Member Author

Good question, I think not.

Member Author

I made this change: b569f88
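
As a rough illustration of retrying only on some exception types, the sketch below combines a retry counter with a classification step. It does not reproduce the referenced commit. The rule that only `IOException` counts as transient, the value of the limit, and the names `SelectiveRetry`, `isTransient`, and `shouldRetry` are all assumptions made for the example.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

public class SelectiveRetry {

    static final int PROCESSOR_RETRY_LIMIT = 5; // assumed value for this sketch

    // Hypothetical classification: only I/O failures are treated as transient.
    static boolean isTransient(Exception e) {
        return e instanceof IOException;
    }

    // Returns true if the caller should retry. Non-transient failures are
    // rejected before the counter is touched, so they never consume a retry.
    static boolean shouldRetry(Exception e, AtomicInteger retryCounter) {
        return isTransient(e) && retryCounter.incrementAndGet() <= PROCESSOR_RETRY_LIMIT;
    }

    public static void main(String[] args) {
        AtomicInteger retryCounter = new AtomicInteger();
        System.out.println(shouldRetry(new IllegalArgumentException("bad mapping"), retryCounter)); // false
        System.out.println(shouldRetry(new IOException("connection reset"), retryCounter));         // true
    }
}
```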

clusterStateRequest.indices(leaderIndex.getName());

leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
Contributor

As a follow-up, we need to make a note somewhere to only fetch the right index metadata; otherwise this can lead to transferring 500MB of data (the entire cluster state metadata) for one index.

Member Author

Contributor

@martijnvg awesome. thx. Also we should probably validate the index uuid.

Contributor

handler.accept(null);
}, e -> {
updateMappingSemaphore.release();
handler.accept(e);
Contributor

a mapping collision here is not an error we should just retry.

Member Author

I don't think we should retry in case of a mapping collision. It means something bad has happened (most likely a bug on our side). Retrying wouldn't help, because the same error would occur in subsequent requests.

In case several tasks update the mapping, no error should occur: only the first mapping update request would alter the mapping, and the other requests would be no-ops.
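
The distinction drawn here, between a repeated update that is a no-op and a collision that is a real error, can be modelled with a toy mapping. The sketch below is a deliberately simplified model and not the Elasticsearch implementation. A mapping is represented as a map from field name to type, and `IdempotentMappingUpdate` and `putMapping` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

public class IdempotentMappingUpdate {

    // Merges the leader's field types into the follower mapping.
    // Returns true if the follower mapping changed, false if the call was a no-op.
    // Throws if a field is already defined with a different type (a collision).
    static boolean putMapping(Map<String, String> followerMapping, Map<String, String> leaderMapping) {
        // Validate first so a collision never leaves a half-applied mapping.
        for (Map.Entry<String, String> field : leaderMapping.entrySet()) {
            String existingType = followerMapping.get(field.getKey());
            if (existingType != null && !existingType.equals(field.getValue())) {
                throw new IllegalArgumentException("field [" + field.getKey() + "] is already mapped as ["
                        + existingType + "], cannot change to [" + field.getValue() + "]");
            }
        }
        boolean changed = false;
        for (Map.Entry<String, String> field : leaderMapping.entrySet()) {
            if (followerMapping.putIfAbsent(field.getKey(), field.getValue()) == null) {
                changed = true;
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<String, String> follower = new HashMap<>();
        Map<String, String> leader = new HashMap<>();
        leader.put("title", "text");

        System.out.println(putMapping(follower, leader)); // true: first request alters the mapping
        System.out.println(putMapping(follower, leader)); // false: repeated request is a no-op
    }
}
```

In this model a collision fails the same way on every attempt, which matches the argument that retrying it would not help.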

private final Client leaderClient;
private final Index leaderIndex;
private final Index followIndex;
private final AtomicLong currentIndexMetadataVersion;
Contributor

it will be good to expose it once we have stats.

Member

@jasontedor jasontedor left a comment

LGTM.

@martijnvg martijnvg merged commit 51caefe into elastic:ccr May 28, 2018
martijnvg added a commit that referenced this pull request May 28, 2018