From 9635fe3999322d5083a7b8f281258487a99b3162 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 7 May 2018 18:17:03 +0200 Subject: [PATCH] use a `Semaphore` to ensure that only one mapping update is executed when a newer imd version is detected for all chunk processors of a shard follow task executor. --- .../ccr/action/ShardFollowTasksExecutor.java | 33 ++++++++++++++++--- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 9b15c9966808b..038cfa8023ce3 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -46,6 +46,7 @@ import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -102,7 +103,7 @@ protected void nodeOperation(AllocatedPersistentTask task, ShardFollowTask param IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(params.getLeaderShardId().getIndex(), params.getFollowShardId().getIndex(), client, leaderClient); logger.info("[{}] initial leader mapping with follower mapping syncing", params); - checker.updateMapping(e -> { + checker.updateMapping(1L /* Force update, version is initially 0L */, e -> { if (e == null) { logger.info("Starting shard following [{}]", params); fetchGlobalCheckpoint(client, params.getFollowShardId(), @@ -408,6 +409,7 @@ static final class IndexMetadataVersionChecker implements BiConsumer currentIndexMetadataVersion.get(), minimumRequiredIndexMetadataVersion); handler.accept(null); } else { - updateMapping(handler); + updateMapping(minimumRequiredIndexMetadataVersion, handler); } } - void updateMapping(Consumer handler) { + void updateMapping(long minimumRequiredIndexMetadataVersion, Consumer handler) { + try { + updateMappingSemaphore.acquire(); + } catch (InterruptedException e) { + handler.accept(e); + return; + } + if (currentIndexMetadataVersion.get() >= minimumRequiredIndexMetadataVersion) { + updateMappingSemaphore.release(); + LOGGER.debug("Current index metadata version [{}] is higher or equal than minimum required index metadata version [{}]", + currentIndexMetadataVersion.get(), minimumRequiredIndexMetadataVersion); + handler.accept(null); + return; + } + ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); clusterStateRequest.clear(); clusterStateRequest.metaData(true); @@ -443,9 +459,16 @@ void updateMapping(Consumer handler) { putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON); followClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap(putMappingResponse -> { currentIndexMetadataVersion.set(indexMetaData.getVersion()); + updateMappingSemaphore.release(); handler.accept(null); - }, handler)); - }, handler)); + }, e -> { + updateMappingSemaphore.release(); + handler.accept(e); + })); + }, e -> { + updateMappingSemaphore.release(); + handler.accept(e); + })); } }