[CCR] Sync mappings between leader and follow index #30115
Pinging @elastic/es-distributed
The shard changes API returns the minimum IndexMetadata version the leader index needs to have. If the leader side is behind on the IndexMetadata version, the follow shard task holds off processing write operations until the mapping has been fetched from the leader index and applied to the follower index in the background. The cluster state API is used to fetch the leader mapping, and the put mapping API is used to apply it to the follower index. This works because the put mapping API accepts fields that are already defined. Relates to elastic#30086
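The flow described above can be pictured with a small sketch. This is not the PR's code: `MappingVersionGate`, its constructor arguments, and the synchronous `syncMappingFromLeader` callback are hypothetical stand-ins for the asynchronous cluster state and put mapping calls. It only illustrates the idea that write operations proceed immediately when the required version has already been applied, and otherwise wait for one mapping sync.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical, simplified model of the version check; not the PR's class.
class MappingVersionGate {
    // Highest leader IndexMetadata version whose mapping has been applied locally.
    private final AtomicLong appliedVersion;
    // Stand-in for "fetch the leader mapping and put it on the follower";
    // returns the leader version that was synced.
    private final LongSupplier syncMappingFromLeader;

    MappingVersionGate(long initialVersion, LongSupplier syncMappingFromLeader) {
        this.appliedVersion = new AtomicLong(initialVersion);
        this.syncMappingFromLeader = syncMappingFromLeader;
    }

    // Calls handler with null once the required version is applied, or with the failure.
    void accept(long minimumRequiredVersion, Consumer<Exception> handler) {
        if (appliedVersion.get() >= minimumRequiredVersion) {
            handler.accept(null); // already up to date, operations can be processed
            return;
        }
        try {
            long syncedVersion = syncMappingFromLeader.getAsLong();
            // Never move the applied version backwards.
            appliedVersion.accumulateAndGet(syncedVersion, Math::max);
        } catch (RuntimeException e) {
            handler.accept(e);
            return;
        }
        handler.accept(null);
    }

    long appliedVersion() {
        return appliedVersion.get();
    }
}
```

A caller would pass the version carried in each shard changes response to `accept(...)` and apply the operations from inside the handler when the argument is `null`.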
* origin/ccr: (166 commits) Introduce soft-deletes retention policy based on global checkpoint (elastic#30335) Enable MockHttpTransport in ShardChangsIT Remove old sha files from dated Lucene snapshot Update InternalEngine tests on ccr side for elastic#30121 Set the new lucene version for 6.4.0 [ML][TEST] Clean up jobs in ModelPlotIT Upgrade to 7.4.0-snapshot-1ed95c097b (elastic#30357) Watcher: Ensure trigger service pauses execution (elastic#30363) [DOCS] Added coming qualifiers in changelog [DOCS] Commented out empty sections in the changelog to fix the doc build. (elastic#30372) Security: reduce garbage during index resolution (elastic#30180) Make RepositoriesMetaData contents unmodifiable (elastic#30361) Change quad tree max levels to 29. Closes elastic#21191 (elastic#29663) Test: use trial license in qa tests with security [ML] Add integration test for model plots (elastic#30359) SQL: Fix bug caused by empty composites (elastic#30343) [ML] Account for gaps in data counts after job is reopened (elastic#30294) InternalEngineTests.testConcurrentOutOfOrderDocsOnReplica should use two documents (elastic#30121) Change signature of Get Repositories Response (elastic#30333) Tests: Use different watch ids per test in smoke test (elastic#30331) ...
The general flow looks good; I left one critical comment about avoiding put mapping storms on the follower.
int result = 1;
result += Objects.hashCode(indexMetadataVersion);
result += Arrays.hashCode(operations);
return result;
I think that Objects.hash(indexMetadataVersion, operations) is fine here.
Unfortunately Objects.hash(...) doesn't handle arrays correctly, so I changed it back to use Arrays.hashCode(...).
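To illustrate the point about arrays: `Objects.hash(...)` treats an array argument as a single opaque object and uses its identity-based `hashCode()`, so two responses with equal contents can hash differently. The sketch below is only a demonstration; `ArrayHashDemo` is a hypothetical class, `String[]` stands in for `Translog.Operation[]`, and the conventional `31 *` multiplier is used rather than the `+=` form quoted above.

```java
import java.util.Arrays;
import java.util.Objects;

// Hypothetical demo class; String[] stands in for Translog.Operation[].
class ArrayHashDemo {
    // Content-based hash: equal array contents give equal hashes.
    static int contentHash(long version, String[] operations) {
        int result = 1;
        result = 31 * result + Long.hashCode(version);
        result = 31 * result + Arrays.hashCode(operations);
        return result;
    }

    // Objects.hash sees the array as one element and calls its identity-based hashCode().
    static int identityHash(long version, String[] operations) {
        return Objects.hash(version, operations);
    }

    public static void main(String[] args) {
        String[] a = {"index", "delete"};
        String[] b = {"index", "delete"};
        System.out.println(contentHash(1L, a) == contentHash(1L, b));   // equal contents, equal hash
        System.out.println(identityHash(1L, a) == identityHash(1L, b)); // depends on object identity
    }
}
```

The second comparison is not guaranteed to print either value, which is exactly why a content-based `hashCode()` has to go through `Arrays.hashCode(...)`.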
@@ -252,7 +266,8 @@ protected Response newResponse() {

private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0];

- static Response getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo, long byteLimit) throws IOException {
+ static Response getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo, long byteLimit,
I think it is a bit odd to push the metadata into this method only so that it can push it out again in the response. I would rather see something like the following; I think it gives a clearer separation of concerns:
diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java
index 63314c95db0..38fc7ca38c2 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java
@@ -240,8 +240,10 @@ public class ShardChangesAction extends Action<ShardChangesAction.Request, Shard
IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
IndexShard indexShard = indexService.getShard(request.getShard().id());
- IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
- return getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes, indexMetaData);
+ final long indexMetaDataVersion = clusterService.state().metaData().index(shardId.getIndex()).getVersion();
+ final Translog.Operation[] operations =
+ getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes);
+ return new Response(indexMetaDataVersion, operations);
}
@Override
@@ -266,8 +268,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Request, Shard
private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0];
- static Response getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo, long byteLimit,
- IndexMetaData indexMetaData) throws IOException {
+ static Translog.Operation[] getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo, long byteLimit) throws IOException {
if (indexShard.state() != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state());
}
@@ -291,17 +292,17 @@ public class ShardChangesAction extends Action<ShardChangesAction.Request, Shard
seenBytes += orderedOp.estimateSize();
operations.add(orderedOp);
if (nextExpectedSeqNo > maxSeqNo) {
- return new Response(indexMetaData.getVersion(), operations.toArray(EMPTY_OPERATIONS_ARRAY));
+ return operations.toArray(EMPTY_OPERATIONS_ARRAY);
}
} else {
- return new Response(indexMetaData.getVersion(), operations.toArray(EMPTY_OPERATIONS_ARRAY));
+ return operations.toArray(EMPTY_OPERATIONS_ARRAY);
}
}
}
}
if (nextExpectedSeqNo >= maxSeqNo) {
- return new Response(indexMetaData.getVersion(), operations.toArray(EMPTY_OPERATIONS_ARRAY));
+ return operations.toArray(EMPTY_OPERATIONS_ARRAY);
} else {
String message = "Not all operations between min_seq_no [" + minSeqNo + "] and max_seq_no [" + maxSeqNo +
"] found, tracker checkpoint [" + nextExpectedSeqNo + "]";
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java
index 787b3bfc0b8..e98cf2633ca 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java
@@ -53,26 +53,25 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase {
int min = randomIntBetween(0, numWrites - 1);
int max = randomIntBetween(min, numWrites - 1);
- final ShardChangesAction.Response r =
- ShardChangesAction.getOperationsBetween(indexShard, min, max, Long.MAX_VALUE, indexMetaData);
+ final Translog.Operation[] operations = ShardChangesAction.getOperationsBetween(indexShard, min, max, Long.MAX_VALUE);
/*
* We are not guaranteed that operations are returned to us in order they are in the translog (if our read crosses multiple
* generations) so the best we can assert is that we see the expected operations.
*/
- final Set<Long> seenSeqNos = Arrays.stream(r.getOperations()).map(Translog.Operation::seqNo).collect(Collectors.toSet());
+ final Set<Long> seenSeqNos = Arrays.stream(operations).map(Translog.Operation::seqNo).collect(Collectors.toSet());
final Set<Long> expectedSeqNos = LongStream.range(min, max + 1).boxed().collect(Collectors.toSet());
assertThat(seenSeqNos, equalTo(expectedSeqNos));
}
// get operations for a range no operations exists:
Exception e = expectThrows(IllegalStateException.class,
- () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites, numWrites + 1, Long.MAX_VALUE, indexMetaData));
+ () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites, numWrites + 1, Long.MAX_VALUE));
assertThat(e.getMessage(), containsString("Not all operations between min_seq_no [" + numWrites + "] and max_seq_no [" +
(numWrites + 1) +"] found, tracker checkpoint ["));
// get operations for a range some operations do not exist:
e = expectThrows(IllegalStateException.class,
- () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites - 10, numWrites + 10, Long.MAX_VALUE, indexMetaData));
+ () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites - 10, numWrites + 10, Long.MAX_VALUE));
assertThat(e.getMessage(), containsString("Not all operations between min_seq_no [" + (numWrites - 10) + "] and max_seq_no [" +
(numWrites + 10) +"] found, tracker checkpoint ["));
}
@@ -90,8 +89,7 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase {
ShardRouting shardRouting = TestShardRouting.newShardRouting("index", 0, "_node_id", true, ShardRoutingState.INITIALIZING);
Mockito.when(indexShard.routingEntry()).thenReturn(shardRouting);
- expectThrows(IndexShardNotStartedException.class,
- () -> ShardChangesAction.getOperationsBetween(indexShard, 0, 1, Long.MAX_VALUE, indexMetaData));
+ expectThrows(IndexShardNotStartedException.class, () -> ShardChangesAction.getOperationsBetween(indexShard, 0, 1, Long.MAX_VALUE));
}
public void testGetOperationsBetweenExceedByteLimit() throws Exception {
@@ -107,21 +105,20 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase {
}
final IndexShard indexShard = indexService.getShard(0);
- final ShardChangesAction.Response r =
- ShardChangesAction.getOperationsBetween(indexShard, 0, numWrites - 1, 256, indexService.getMetaData());
- assertThat(r.getOperations().length, equalTo(12));
- assertThat(r.getOperations()[0].seqNo(), equalTo(0L));
- assertThat(r.getOperations()[1].seqNo(), equalTo(1L));
- assertThat(r.getOperations()[2].seqNo(), equalTo(2L));
- assertThat(r.getOperations()[3].seqNo(), equalTo(3L));
- assertThat(r.getOperations()[4].seqNo(), equalTo(4L));
- assertThat(r.getOperations()[5].seqNo(), equalTo(5L));
- assertThat(r.getOperations()[6].seqNo(), equalTo(6L));
- assertThat(r.getOperations()[7].seqNo(), equalTo(7L));
- assertThat(r.getOperations()[8].seqNo(), equalTo(8L));
- assertThat(r.getOperations()[9].seqNo(), equalTo(9L));
- assertThat(r.getOperations()[10].seqNo(), equalTo(10L));
- assertThat(r.getOperations()[11].seqNo(), equalTo(11L));
+ final Translog.Operation[] operations = ShardChangesAction.getOperationsBetween(indexShard, 0, numWrites - 1, 256);
+ assertThat(operations.length, equalTo(12));
+ assertThat(operations[0].seqNo(), equalTo(0L));
+ assertThat(operations[1].seqNo(), equalTo(1L));
+ assertThat(operations[2].seqNo(), equalTo(2L));
+ assertThat(operations[3].seqNo(), equalTo(3L));
+ assertThat(operations[4].seqNo(), equalTo(4L));
+ assertThat(operations[5].seqNo(), equalTo(5L));
+ assertThat(operations[6].seqNo(), equalTo(6L));
+ assertThat(operations[7].seqNo(), equalTo(7L));
+ assertThat(operations[8].seqNo(), equalTo(8L));
+ assertThat(operations[9].seqNo(), equalTo(9L));
+ assertThat(operations[10].seqNo(), equalTo(10L));
+ assertThat(operations[11].seqNo(), equalTo(11L));
}
}
👍 In a first version of this PR there was a check in getOperationsBetween(...) that used IMD, which is why I initially changed this method to accept IMD. That check no longer exists, and I should have reverted the signature when I removed it.
shardFollowNodeTask.updateProcessedGlobalCheckpoint(followGlobalCheckPoint);
prepare(leaderClient, shardFollowNodeTask, params, followGlobalCheckPoint);
}, task::markAsFailed);
IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(params.getLeaderShardId().getIndex(),
I think checker might be better as imdVersionChecker?
You mean to rename the class IndexMetadataVersionChecker to Checker?
I mean that the variable name checker is unclear.
}

public void accept(Long minimumRequiredIndexMetadataVersion, Consumer<Exception> handler) {
if (minimumRequiredIndexMetadataVersion <= currentIndexMetadataVersion.get()) {
I think the condition is easier to read along with the log message if it is written as >= (i.e., currentIndexMetadataVersion.get() >= minimumRequiredIndexMetadataVersion).
leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value;
I think an assertion is in order here.
"] times, aborting...", e));
}
return;
}
final BulkShardOperationsRequest request = new BulkShardOperationsRequest(followerShard, response.getOperations());
Nit: newline
"] times, aborting...", e));
}
return;
}
final BulkShardOperationsRequest request = new BulkShardOperationsRequest(followerShard, response.getOperations());
followerClient.execute(BulkShardOperationsAction.INSTANCE, request, new ActionListener<BulkShardOperationsResponse>() {
I think the indentation of this entire block of code is off by one level.
}

void updateMapping(Consumer<Exception> handler) {
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
I think we want a guard here. Otherwise, after a mapping update on the leader, concurrent shard changes responses will each put a new mapping on the follower. Maybe we should avoid such an update storm?
I've made this change in a separate commit: 9635fe3
when a newer imd version is detected for all chunk processors of a shard follow task executor.
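One way to picture such a guard is a single-permit semaphore combined with a second version check after the permit is obtained. The sketch below is an assumption about the shape of the idea, not the code from the referenced commit: `GuardedMappingUpdater`, `ensureVersion`, and the `putMappingCalls` counter are hypothetical, and the real update is asynchronous rather than blocking.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical blocking model of a guard against put mapping storms.
class GuardedMappingUpdater {
    private final Semaphore updatePermit = new Semaphore(1);
    private final AtomicLong currentVersion = new AtomicLong(0);
    // Counts how often the (simulated) put mapping call was made.
    final AtomicInteger putMappingCalls = new AtomicInteger();

    void ensureVersion(long minimumRequiredVersion) throws InterruptedException {
        if (currentVersion.get() >= minimumRequiredVersion) {
            return; // fast path: nothing to update
        }
        updatePermit.acquire();
        try {
            // Re-check under the permit: another caller may have updated while we waited.
            if (currentVersion.get() >= minimumRequiredVersion) {
                return;
            }
            putMappingCalls.incrementAndGet(); // stand-in for fetching and putting the mapping
            currentVersion.set(minimumRequiredVersion);
        } finally {
            updatePermit.release();
        }
    }

    // Runs threadCount concurrent callers that all observed the same new leader version.
    static int putMappingCallsFor(int threadCount, long leaderVersion) {
        GuardedMappingUpdater updater = new GuardedMappingUpdater();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                try {
                    updater.ensureVersion(leaderVersion);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return updater.putMappingCalls.get();
    }
}
```

With the re-check in place, only the first caller to obtain the permit performs the update; every later caller finds the version already applied and returns without touching the mapping.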
* es/ccr: (37 commits) Default to one shard (elastic#30539) Unmute IndexUpgradeIT tests Forbid expensive query parts in ranking evaluation (elastic#30151) Docs: Update HighLevelRestClient migration docs (elastic#30544) Clients: Switch to new performRequest (elastic#30543) [TEST] Fix typo in MovAvgIT test Add missing dependencies on testClasses (elastic#30527) [TEST] Mute ML test that needs updating to following ml-cpp changes Document woes between auto-expand-replicas and allocation filtering (elastic#30531) Moved tokenizers to analysis common module (elastic#30538) Adjust copy settings versions Mute ShrinkIndexIT suite SQL: SYS TABLES ordered according to *DBC specs (elastic#30530) Deprecate not copy settings and explicitly disallow (elastic#30404) [ML] Improve state persistence log message Build: Add mavenPlugin cluster configuration method (elastic#30541) Re-enable FlushIT tests Bump Gradle heap to 2 GB (elastic#30535) SQL: Use request flavored methods in tests (elastic#30345) Suppress hdfsFixture if there are spaces in the path (elastic#30302) ...
I'm still thinking about the proposed concurrency model but I left a comment.
try {
updateMappingSemaphore.acquire();
} catch (InterruptedException e) {
handler.accept(e);
There is a missed release of the semaphore here.
So I wonder whether the release method needs to be invoked here, but maybe I misunderstand this.
The Javadoc of the acquire method states:
If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies dormant until one of two things happens:
* Some other thread invokes the release method for this semaphore and the current thread is next to be assigned a permit; or
* Some other thread interrupts the current thread.
The way I understand this, if the thread is waiting for the semaphore and is interrupted (and thus the InterruptedException is thrown), then that thread never had a permit and there is no need to invoke the release method.
You have it correct and I did not; it would be a bug to release here.
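The conclusion can be checked with a small, self-contained demonstration. `InterruptedAcquireDemo` is a hypothetical class; for simplicity it sets the interrupt flag before calling `acquire()` instead of interrupting a thread that is already waiting, which `java.util.concurrent.Semaphore` documents as having the same outcome (an `InterruptedException` and no permit taken).

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo: an interrupted acquire() does not consume a permit.
class InterruptedAcquireDemo {
    // Returns the number of available permits after acquire() was interrupted.
    static int permitsAfterInterruptedAcquire() {
        Semaphore semaphore = new Semaphore(1);
        Thread.currentThread().interrupt(); // set the interrupt flag before acquiring
        try {
            semaphore.acquire();
            semaphore.release(); // only reached if a permit was actually obtained
            throw new IllegalStateException("acquire() unexpectedly succeeded");
        } catch (InterruptedException e) {
            // No permit was taken, so releasing here would add a spurious extra permit.
            return semaphore.availablePermits();
        }
    }

    public static void main(String[] args) {
        System.out.println(permitsAfterInterruptedAcquire());
    }
}
```

Calling `release()` in the catch block would raise the permit count above its intended maximum of one and let two mapping updates run concurrently.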
handler.accept(e);
indexVersionChecker.accept(response.getIndexMetadataVersion(), e -> {
if (e != null) {
if (retryCounter.incrementAndGet() <= PROCESSOR_RETRY_LIMIT) {
Do we want to retry on every exception type?
Good question, I think not.
I made this change: b569f88
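A rough sketch of what retrying only on selected exception types could look like follows. It does not reproduce the referenced commit: `RetryPolicy`, `isTransient`, and the choice of `IOException` as the only retryable type are assumptions made for illustration, and the retry limit is passed in because the value of `PROCESSOR_RETRY_LIMIT` is not shown in this conversation.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical retry policy: retry transient failures only, up to a limit.
class RetryPolicy {
    private final int retryLimit;
    private final AtomicInteger retryCounter = new AtomicInteger();

    RetryPolicy(int retryLimit) {
        this.retryLimit = retryLimit;
    }

    // Assumption for this sketch: only I/O failures are considered transient.
    static boolean isTransient(Exception e) {
        return e instanceof IOException;
    }

    // True if the failed operation should be attempted again.
    boolean shouldRetry(Exception e) {
        if (!isTransient(e)) {
            return false; // fail fast without consuming a retry
        }
        return retryCounter.incrementAndGet() <= retryLimit;
    }
}
```

Non-transient failures return `false` without incrementing the counter, so a permanent error fails the task on its first occurrence instead of after the full retry budget.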
clusterStateRequest.indices(leaderIndex.getName());

leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
As a follow-up, we need to make a note somewhere to fetch only the relevant index metadata. As written, this can lead to transferring 500MB of data (the entire cluster state metadata) for one index.
@bleskes Only the IMD of a single index is being fetched now: https://github.com/elastic/elasticsearch/pull/30115/files#diff-27bdc34d556e302e5a2f3801100e1af7R494
@martijnvg awesome. thx. Also we should probably validate the index uuid.
Sorry, I was too terse. I was saying this in the context of https://github.com/elastic/elasticsearch/pull/30115/files#diff-27bdc34d556e302e5a2f3801100e1af7R379
handler.accept(null);
}, e -> {
updateMappingSemaphore.release();
handler.accept(e);
a mapping collision here is not an error we should just retry.
I don't think we should retry in case of a mapping collision. It means something bad has happened (most likely a bug on our side). Retrying wouldn't help, because the same error would occur in subsequent requests.
When several tasks update the mapping, no error should occur: only the first mapping update request alters the mapping, and the other requests are no-ops.
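The behaviour described here (repeated identical updates are no-ops, while a conflicting definition is a hard error) can be modelled with a toy in-memory mapping. This is not Elasticsearch's put mapping implementation; `FollowerMapping` and its field-name-to-type map are hypothetical simplifications.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of an idempotent mapping update; not Elasticsearch code.
class FollowerMapping {
    private final Map<String, String> fieldTypes = new HashMap<>();

    // Applies the update and returns true if the mapping changed.
    // Throws IllegalArgumentException if a field is redefined with a different type.
    boolean putMapping(Map<String, String> update) {
        // Validate first so that a conflicting update leaves the mapping untouched.
        for (Map.Entry<String, String> field : update.entrySet()) {
            String existingType = fieldTypes.get(field.getKey());
            if (existingType != null && !existingType.equals(field.getValue())) {
                throw new IllegalArgumentException("field [" + field.getKey() + "] is already mapped as ["
                    + existingType + "], cannot change to [" + field.getValue() + "]");
            }
        }
        boolean changed = false;
        for (Map.Entry<String, String> field : update.entrySet()) {
            if (fieldTypes.putIfAbsent(field.getKey(), field.getValue()) == null) {
                changed = true;
            }
        }
        return changed;
    }
}
```

In this model a second task that sends the same mapping simply gets `false` back, whereas a collision throws on every attempt, which is why retrying it would not help.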
private final Client leaderClient;
private final Index leaderIndex;
private final Index followIndex;
private final AtomicLong currentIndexMetadataVersion;
it will be good to expose it once we have stats.
LGTM.