Skip to content

Commit

Permalink
use a Semaphore to ensure that only one mapping update is executed
Browse files Browse the repository at this point in the history
when a newer imd version is detected for all chunk processors of a
shard follow task executor.
  • Loading branch information
martijnvg committed May 7, 2018
1 parent 5ea5c4d commit 9635fe3
Showing 1 changed file with 28 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -102,7 +103,7 @@ protected void nodeOperation(AllocatedPersistentTask task, ShardFollowTask param
IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(params.getLeaderShardId().getIndex(),
params.getFollowShardId().getIndex(), client, leaderClient);
logger.info("[{}] initial leader mapping with follower mapping syncing", params);
checker.updateMapping(e -> {
checker.updateMapping(1L /* Force update, version is initially 0L */, e -> {
if (e == null) {
logger.info("Starting shard following [{}]", params);
fetchGlobalCheckpoint(client, params.getFollowShardId(),
Expand Down Expand Up @@ -408,6 +409,7 @@ static final class IndexMetadataVersionChecker implements BiConsumer<Long, Consu
private final Index leaderIndex;
private final Index followIndex;
private final AtomicLong currentIndexMetadataVersion;
private final Semaphore updateMappingSemaphore = new Semaphore(1);

IndexMetadataVersionChecker(Index leaderIndex, Index followIndex, Client followClient, Client leaderClient) {
this.followClient = followClient;
Expand All @@ -423,11 +425,25 @@ public void accept(Long minimumRequiredIndexMetadataVersion, Consumer<Exception>
currentIndexMetadataVersion.get(), minimumRequiredIndexMetadataVersion);
handler.accept(null);
} else {
updateMapping(handler);
updateMapping(minimumRequiredIndexMetadataVersion, handler);
}
}

void updateMapping(Consumer<Exception> handler) {
void updateMapping(long minimumRequiredIndexMetadataVersion, Consumer<Exception> handler) {
try {
updateMappingSemaphore.acquire();
} catch (InterruptedException e) {
handler.accept(e);
return;
}
if (currentIndexMetadataVersion.get() >= minimumRequiredIndexMetadataVersion) {
updateMappingSemaphore.release();
LOGGER.debug("Current index metadata version [{}] is higher or equal than minimum required index metadata version [{}]",
currentIndexMetadataVersion.get(), minimumRequiredIndexMetadataVersion);
handler.accept(null);
return;
}

ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
Expand All @@ -443,9 +459,16 @@ void updateMapping(Consumer<Exception> handler) {
putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON);
followClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap(putMappingResponse -> {
currentIndexMetadataVersion.set(indexMetaData.getVersion());
updateMappingSemaphore.release();
handler.accept(null);
}, handler));
}, handler));
}, e -> {
updateMappingSemaphore.release();
handler.accept(e);
}));
}, e -> {
updateMappingSemaphore.release();
handler.accept(e);
}));
}
}

Expand Down

0 comments on commit 9635fe3

Please sign in to comment.