From 06f4d3cff696d409c411483b223df125d22d302f Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Fri, 24 May 2019 14:56:37 +0200 Subject: [PATCH 1/6] Replica allocation consider no-op This is a first step away from sync-ids. We now check if replica and primary are identical using sequence numbers when determining where to allocate a replica shard. If an index is no longer indexed into, issuing a regular flush will now be enough to ensure a no-op recovery is done. This has the nice side-effect of ensuring that closed indices and frozen indices choose existing shard copies with identical data over file-overlap comparison, increasing the chance that we end up doing a no-op recovery (only no-op and file-based recovery is supported by closed indices). Relates #41400 and #33888 Supersedes #41784 --- .../elasticsearch/upgrades/RecoveryIT.java | 35 +++++ .../allocation/NodeAllocationResult.java | 6 + .../gateway/AsyncShardFetch.java | 7 + .../gateway/GatewayAllocator.java | 47 ++++++ .../gateway/ReplicaShardAllocator.java | 52 +++++-- .../elasticsearch/index/shard/IndexShard.java | 119 +++++++++++++- .../org/elasticsearch/index/store/Store.java | 140 ++++++++++++++++- .../index/translog/Translog.java | 27 ++++ .../indices/recovery/RecoveryTarget.java | 45 +----- .../TransportNodesListShardStoreMetaData.java | 53 +++++-- .../ClusterAllocationExplainIT.java | 30 ++-- .../gateway/AsyncShardFetchTests.java | 79 ++++++++++ .../gateway/ReplicaShardAllocatorIT.java | 145 ++++++++++++++++++ .../gateway/ReplicaShardAllocatorTests.java | 55 ++++++- .../index/shard/IndexShardTests.java | 39 +++++ .../elasticsearch/index/store/StoreTests.java | 43 +++++- .../index/translog/TranslogTests.java | 16 +- .../indices/state/CloseIndexIT.java | 53 ++++++- .../test/InternalTestCluster.java | 33 ++-- 19 files changed, 913 insertions(+), 111 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index bbc6d27472467..8457073c7be45 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -48,6 +48,7 @@ import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING; import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; @@ -359,6 +360,7 @@ public void testRecoveryClosedIndex() throws Exception { .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms") .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster .build()); + indexDocs(indexName, 0, randomInt(10)); ensureGreen(indexName); closeIndex(indexName); } @@ -369,6 +371,9 @@ public void testRecoveryClosedIndex() throws Exception { // so we expect the index to be closed and replicated ensureGreen(indexName); assertClosedIndex(indexName, true); + if (minimumNodeVersion().onOrAfter(Version.V_8_0_0)) { // todo: change to 7_X once backported. + assertNoFileBasedRecovery(indexName); + } } else { assertClosedIndex(indexName, false); } @@ -480,4 +485,34 @@ private void assertClosedIndex(final String index, final boolean checkRoutingTab assertThat(XContentMapValues.extractValue("index.verified_before_close", settings), nullValue()); } } + + private void assertNoFileBasedRecovery(String indexName) throws IOException { + Map recoveries = entityAsMap(client() + .performRequest(new Request("GET", indexName + "/_recovery?detailed=true"))); + + @SuppressWarnings("unchecked") + List> shards = (List>) XContentMapValues.extractValue(indexName + ".shards", recoveries); + assertNotNull(shards); + boolean foundReplica = false; + for (Map shard : shards) { + if (shard.get("primary") == Boolean.FALSE) { + List details = (List) XContentMapValues.extractValue("index.files.details", shard); + // once detailed recoveries works, remove this if. + if (details == null) { + long totalFiles = ((Number) XContentMapValues.extractValue("index.files.total", shard)).longValue(); + long reusedFiles = ((Number) XContentMapValues.extractValue("index.files.reused", shard)).longValue(); + assertEquals(totalFiles, reusedFiles); + } else { + assertNotNull(details); + assertThat(details, empty()); + } + + long translogRecovered = ((Number) XContentMapValues.extractValue("translog.recovered", shard)).longValue(); + assertEquals("must be noop", 0, translogRecovered); + foundReplica = true; + } + } + + assertTrue("must find replica", foundReplica); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationResult.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationResult.java index 8b97f1357fa00..bc0526ffca1b5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationResult.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationResult.java @@ -228,6 +228,8 @@ public String getAllocationId() { * matching sync ids are irrelevant. */ public boolean hasMatchingSyncId() { + // TODO: this method needs a rename, leaving it for now to not make too many iterations on that until we have full seqno + // based recovery. return matchingBytes == Long.MAX_VALUE; } @@ -274,6 +276,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("allocation_id", allocationId); } if (matchingBytes >= 0) { + // TODO: we should eventually either distinguish between sync-id and non sync-id equivalent closed shard allocation or + // rename this to synced_match + // left this for now, since it changes the API and should preferably be handled together with seqno based + // replica shard allocation, consisting of whether this will be ops based and how many ops to recover. if (hasMatchingSyncId()) { builder.field("matching_sync_id", true); } else { diff --git a/server/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java index d03f6abf7d9bd..0374884c05f2b 100644 --- a/server/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java @@ -231,6 +231,13 @@ protected synchronized void processAsyncFetch(List responses, List> asyncFetchStore = ConcurrentCollections.newConcurrentMap(); + // contains ephemeralIds + private volatile Set lastDataNodes = Collections.emptySet(); + @Inject public GatewayAllocator(RoutingService routingService, TransportNodesListGatewayStartedShards startedAction, @@ -101,6 +110,7 @@ public void applyFailedShards(final RoutingAllocation allocation, final List clearCacheForPrimary(fetch, allocation)); + // recalc to also (lazily) clear out old nodes. + Set newDataNodes = new HashSet<>(nodes.getDataNodes().size()); + for (Iterator iterator = nodes.getDataNodes().valuesIt(); iterator.hasNext(); ) { + newDataNodes.add(iterator.next().getEphemeralId()); + } + this.lastDataNodes = newDataNodes; + } + } + + private void clearCacheForPrimary(AsyncShardFetch fetch, + RoutingAllocation allocation) { + ShardRouting primary = allocation.routingNodes().activePrimary(fetch.shardId); + if (primary != null) { + fetch.clearCacheForNode(primary.currentNodeId()); + } + } + + private boolean hasNewNodes(DiscoveryNodes nodes, Set lastDataNodes) { + for (Iterator iterator = nodes.getDataNodes().valuesIt(); iterator.hasNext(); ) { + DiscoveryNode node = iterator.next(); + if (lastDataNodes.contains(node.getEphemeralId()) == false) { + logger.trace("new node {} found, clearing primary async-fetch-store cache", node); + return true; + } + } + + return false; + } + class InternalAsyncFetch extends AsyncShardFetch { InternalAsyncFetch(Logger logger, String type, ShardId shardId, Lister, T> action) { diff --git a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index 10bd6115b4c74..07c7d8a45f6db 100644 --- a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -49,7 +49,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; @@ -101,17 +100,16 @@ public void processExistingRecoveries(RoutingAllocation allocation) { DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId()); DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch(); // current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider - final String currentSyncId; + final TransportNodesListShardStoreMetaData.StoreFilesMetaData currentStore; if (shardStores.getData().containsKey(currentNode)) { - currentSyncId = shardStores.getData().get(currentNode).storeFilesMetaData().syncId(); + currentStore = shardStores.getData().get(currentNode).storeFilesMetaData(); } else { - currentSyncId = null; + currentStore = null; } if (currentNode.equals(nodeWithHighestMatch) == false - && Objects.equals(currentSyncId, primaryStore.syncId()) == false - && matchingNodes.isNodeMatchBySyncID(nodeWithHighestMatch)) { - // we found a better match that has a full sync id match, the existing allocation is not fully synced - // so we found a better one, cancel this one + && isNoopRecovery(primaryStore, currentStore) == false + && matchingNodes.isNoopRecovery(nodeWithHighestMatch)) { + // we found a better match that can do a fast recovery, cancel current recovery logger.debug("cancelling allocation of replica on [{}], sync id match found on node [{}]", currentNode, nodeWithHighestMatch); UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA, @@ -363,10 +361,7 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al private static long computeMatchingBytes(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData) { - String primarySyncId = primaryStore.syncId(); - String replicaSyncId = storeFilesMetaData.syncId(); - // see if we have a sync id we can make use of - if (replicaSyncId != null && replicaSyncId.equals(primarySyncId)) { + if (isNoopRecovery(primaryStore, storeFilesMetaData)) { return Long.MAX_VALUE; } else { long sizeMatched = 0; @@ -380,6 +375,34 @@ private static long computeMatchingBytes(TransportNodesListShardStoreMetaData.St } } + /** + * Is a "noop recovery", which means expecting no operations to recover (though with sync-id, we could in principle still + * have a few). + */ + private static boolean isNoopRecovery(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, + TransportNodesListShardStoreMetaData.StoreFilesMetaData candidateStore) { + // keeping syncIdMatch for 7.x to remain backwards compatible with pre-7.2 versions, but will remove for 8.0. + return syncIdMatch(primaryStore, candidateStore) + || noopMatch(primaryStore, candidateStore); + } + + private static boolean syncIdMatch(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, + TransportNodesListShardStoreMetaData.StoreFilesMetaData candidateStore) { + String primarySyncId = primaryStore.syncId(); + String replicaSyncId = candidateStore.syncId(); + return (replicaSyncId != null && replicaSyncId.equals(primarySyncId)); + } + + private static boolean noopMatch(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, + TransportNodesListShardStoreMetaData.StoreFilesMetaData candidateStore) { + // We need the maxSeqNo conditions until we support non-noop recovery for closed indices (and preferably also have + // retention leases in place to ensure ops based recovery will actually be performed). + return primaryStore.hasSeqNoInfo() + && primaryStore.maxSeqNo() == candidateStore.maxSeqNo() + && primaryStore.provideRecoverySeqNo() <= candidateStore.requireRecoverySeqNo() + && candidateStore.requireRecoverySeqNo() == primaryStore.maxSeqNo() + 1; + } + protected abstract AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation); /** @@ -418,7 +441,10 @@ public DiscoveryNode getNodeWithHighestMatch() { return this.nodeWithHighestMatch; } - public boolean isNodeMatchBySyncID(DiscoveryNode node) { + /** + * Is supplied node a no-operations recovery, either sync-id match or sequence number match. + */ + public boolean isNoopRecovery(DiscoveryNode node) { return nodesToSize.get(node) == Long.MAX_VALUE; } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index ee67597efe31a..ac82cbfef0902 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -23,7 +23,10 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.CheckIndex; +import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexFormatTooNewException; +import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.Term; import org.apache.lucene.search.Query; @@ -51,6 +54,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.CheckedRunnable; +import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -222,6 +226,15 @@ Runnable getGlobalCheckpointSyncer() { private RecoveryState recoveryState; private final RecoveryStats recoveryStats = new RecoveryStats(); + + /** + * The store recovery metadata to use for snapshotStoreRecoveryMetadata requests until engine is opened. + * + * This protects us from the destructive nature of recovery and allows us to serve this information without risk of getting file + * errors due to concurrent writes. + */ + private volatile CheckedSupplier noEngineStoreRecoveryMetadata; + private final MeanMetric refreshMetric = new MeanMetric(); private final MeanMetric externalRefreshMetric = new MeanMetric(); private final MeanMetric flushMetric = new MeanMetric(); @@ -352,6 +365,7 @@ public boolean shouldCache(Query query) { searcherWrapper = indexSearcherWrapper; refreshListeners = buildRefreshListeners(); lastSearcherAccess.set(threadPool.relativeTimeInMillis()); + this.noEngineStoreRecoveryMetadata = calculateNoEngineRecoveryMetadata(store, path); persistMetadata(path, indexSettings, shardRouting, null, logger); } @@ -1204,6 +1218,47 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { } } + /** + * Similar to snapshotStoreMetadata, but extended with additional info about sequence numbers for recovery. + * + * @see #snapshotStoreMetadata() for info on lifecycle and exceptions. + */ + public Store.RecoveryMetadataSnapshot snapshotStoreRecoveryMetadata() throws IOException { + Engine engine; + // We primarily take mutex to avoid seeing the interim ReadOnlyEngine used during resetEngineToGlobalCheckpoint. + synchronized (mutex) { + engine = getEngineOrNull(); + if (engine == null) { + return noEngineStoreRecoveryMetadata.get(); + } + } + // safe towards concurrent engine close. + return snapshotStoreRecoveryMetadataFromEngine(engine); + } + + private Store.RecoveryMetadataSnapshot snapshotStoreRecoveryMetadataFromEngine(Engine engine) throws IOException { + try (Engine.IndexCommitRef indexCommit = engine.acquireLastIndexCommit(false)) { + SeqNoStats seqNoStats = engine.getSeqNoStats(-1); + MetadataSnapshot metadata = store.getMetadata(indexCommit.getIndexCommit()); + // before shard is started, we cannot provide anything so use Long.MAX_VALUE + long provideSeqNo = state() == IndexShardState.POST_RECOVERY || state() == IndexShardState.STARTED + ? seqNoStats.getLocalCheckpoint() + 1 + : Long.MAX_VALUE; + return new Store.RecoveryMetadataSnapshot(metadata, provideSeqNo, seqNoStats.getLocalCheckpoint() + 1, + seqNoStats.getMaxSeqNo()); + } + } + + private static CheckedSupplier calculateNoEngineRecoveryMetadata(Store store, + ShardPath shardPath) { + try { + Store.RecoveryMetadataSnapshot recoveryMetadata = store.getRecoveryMetadata(shardPath, true); + return () -> recoveryMetadata; + } catch (IOException | RuntimeException e) { + return () -> { throw e; }; + } + } + /** * Fails the shard and marks the shard store as corrupted if * e is caused by index corruption @@ -1248,6 +1303,9 @@ public void close(String reason, boolean flushEngine) throws IOException { changeState(IndexShardState.CLOSED, reason); } finally { final Engine engine = this.currentEngineReference.getAndSet(null); + if (engine != null) { + noEngineStoreRecoveryMetadata = () -> snapshotStoreRecoveryMetadataFromEngine(engine); + } try { if (engine != null && flushEngine) { engine.flushAndClose(); @@ -1292,6 +1350,64 @@ public void prepareForIndexRecovery() { assert currentEngineReference.get() == null; } + /** + * Finalize index recovery, called after copied files are in place. Cleans up old files, generates new empty translog and does other + * housekeeping for retention leases and maintaining the metadata to use for replica shard allocation. + */ + public void finalizeIndexRecovery(long globalCheckpoint, MetadataSnapshot sourceMetaData) throws IOException { + assert getEngineOrNull() == null; + + final Store store = store(); + store.incRef(); + try { + store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData); + assert globalCheckpoint >= Long.parseLong(sourceMetaData.getCommitUserData().get(SequenceNumbers.MAX_SEQ_NO)) + || indexSettings().getIndexVersionCreated().before(Version.V_7_2_0) : + "invalid global checkpoint[" + globalCheckpoint + "] source_meta_data [" + sourceMetaData.getCommitUserData() + "]"; + final String translogUUID = Translog.createEmptyTranslog( + shardPath().resolveTranslog(), globalCheckpoint, shardId, this.getPendingPrimaryTerm()); + store.associateIndexWithNewTranslog(translogUUID); + + if (getRetentionLeases().leases().isEmpty()) { + // if empty, may be a fresh IndexShard, so write an empty leases file to disk + persistRetentionLeases(); + assert loadRetentionLeases().leases().isEmpty(); + } else { + assert assertRetentionLeasesPersisted(); + } + + SequenceNumbers.CommitInfo commitInfo = + SequenceNumbers.loadSeqNoInfoFromLuceneCommit(sourceMetaData.getCommitUserData().entrySet()); + // we know this is a replica under recovery so we cannot provide anything, therefore use provideSeqNo MAX_VALUE + Store.RecoveryMetadataSnapshot newRecoveryMetadata = new Store.RecoveryMetadataSnapshot(sourceMetaData, + Long.MAX_VALUE, commitInfo.localCheckpoint + 1, commitInfo.maxSeqNo); + this.noEngineStoreRecoveryMetadata = () -> newRecoveryMetadata; + } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { + // this is a fatal exception at this stage. + // this means we transferred files from the remote that have not be checksummed and they are + // broken. We have to clean up this shard entirely, remove all files and bubble it up to the + // source shard since this index might be broken there as well? The Source can handle this and checks + // its content on disk if possible. + try { + try { + store.removeCorruptionMarker(); + } finally { + Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files + } + } catch (Exception e) { + logger.debug("Failed to clean lucene index", e); + ex.addSuppressed(e); + } + this.noEngineStoreRecoveryMetadata = () -> { throw ex; }; + throw ex; + } catch (RuntimeException | IOException e) { + this.noEngineStoreRecoveryMetadata = () -> { throw e; }; + throw e; + } finally { + store.decRef(); + } + } + public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) { getEngine().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo); } @@ -1455,6 +1571,7 @@ private void innerOpenEngineAndTranslog() throws IOException { // We set active because we are now writing operations to the engine; this way, // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. active.set(true); + noEngineStoreRecoveryMetadata = null; } // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. @@ -2026,7 +2143,7 @@ public void persistRetentionLeases() throws WriteStateException { replicationTracker.persistRetentionLeases(path.getShardStatePath()); } - public boolean assertRetentionLeasesPersisted() throws IOException { + private boolean assertRetentionLeasesPersisted() throws IOException { return replicationTracker.assertRetentionLeasesPersisted(path.getShardStatePath()); } diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 65d2f8d7812f8..9ce8cced46942 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -81,6 +81,7 @@ import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.translog.Translog; import java.io.Closeable; @@ -247,7 +248,7 @@ final void ensureOpen() { * Note that this method requires the caller verify it has the right to access the store and * no concurrent file changes are happening. If in doubt, you probably want to use one of the following: * - * {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking + * {@link #readRecoveryMetadataSnapshot(ShardPath, NodeEnvironment.ShardLocker, Logger)} to read meta data while locking * {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard * {@link IndexShard#acquireLastIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed * @param commit the index commit to read the snapshot from or null if the latest snapshot should be read from the @@ -271,7 +272,7 @@ public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException { * Note that this method requires the caller verify it has the right to access the store and * no concurrent file changes are happening. If in doubt, you probably want to use one of the following: * - * {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking + * {@link #readRecoveryMetadataSnapshot(ShardPath, NodeEnvironment.ShardLocker, Logger)} to read meta data while locking * {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard * {@link IndexShard#acquireLastIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed * @@ -305,6 +306,55 @@ public MetadataSnapshot getMetadata(IndexCommit commit, boolean lockDirectory) t } } + /** + * Returns a new RecoveryMetadataSnapshot for the shard store. + * + * Note that this method requires the caller verify it has the right to access the store and + * no concurrent file changes are happening. If in doubt, you probably want to use one of the following: + * + * {@link #readRecoveryMetadataSnapshot(ShardPath, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking + * {@link IndexShard#snapshotStoreRecoveryMetadata()} to safely read from an existing shard + * {@link IndexShard#acquireLastIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed + * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an + * unexpected exception when opening the index reading the segments file. + * @throws IndexFormatTooOldException if the lucene index is too old to be opened. + * @throws IndexFormatTooNewException if the lucene index is too new to be opened. + * @throws FileNotFoundException if one or more files referenced by a commit are not present. + * @throws NoSuchFileException if one or more files referenced by a commit are not present. + * @throws IndexNotFoundException if the commit point can't be found in this store + */ + public RecoveryMetadataSnapshot getRecoveryMetadata(ShardPath shardPath, boolean lockDirectory) throws IOException { + ensureOpen(); + failIfCorrupted(); + // if we lock the directory we also acquire the write lock since that makes sure that nobody else tries to lock the IW + // on this store at the same time. + java.util.concurrent.locks.Lock lock = lockDirectory ? metadataLock.writeLock() : metadataLock.readLock(); + lock.lock(); + try (Closeable ignored = lockDirectory ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : () -> {} ) { + return readRecoveryMetadataNoLock(shardPath, directory, logger); + } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { + markStoreCorrupted(ex); + throw ex; + } finally { + lock.unlock(); + } + } + + private static RecoveryMetadataSnapshot readRecoveryMetadataNoLock(ShardPath shardPath, Directory directory, + Logger logger) throws IOException { + MetadataSnapshot lastCommit = new MetadataSnapshot(null, directory, logger); + final String translogUUID = lastCommit.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY); + final long globalCheckpoint = Translog.readGlobalCheckpoint(shardPath.resolveTranslog(), translogUUID); + final List existingCommits = DirectoryReader.listCommits(directory); + IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint); + final SequenceNumbers.CommitInfo seqNoStats = Store.loadSeqNoInfo(safeCommit); + + return new RecoveryMetadataSnapshot(lastCommit, + Long.MAX_VALUE, // non-started shard cannot provide anything yet. + seqNoStats.localCheckpoint + 1, + Math.max(Translog.readMaxSeqNo(shardPath.resolveTranslog(), translogUUID), seqNoStats.maxSeqNo)); + } + /** * Renames all the given files from the key of the map to the * value of the map. All successfully renamed files are removed from the map in-place. @@ -442,24 +492,27 @@ private void closeInternal() { } /** - * Reads a MetadataSnapshot from the given index locations or returns an empty snapshot if it can't be read. + * Reads a RecoveryMetadataSnapshot from the given index locations or returns an empty snapshot if it can't be read. * * @throws IOException if the index we try to read is corrupted */ - public static MetadataSnapshot readMetadataSnapshot(Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker, - Logger logger) throws IOException { + public static RecoveryMetadataSnapshot readRecoveryMetadataSnapshot(ShardPath shardPath, + NodeEnvironment.ShardLocker shardLocker, + Logger logger) throws IOException { + ShardId shardId = shardPath.getShardId(); try (ShardLock lock = shardLocker.lock(shardId, "read metadata snapshot", TimeUnit.SECONDS.toMillis(5)); - Directory dir = new SimpleFSDirectory(indexLocation)) { + Directory dir = new SimpleFSDirectory(shardPath.resolveIndex())) { failIfCorrupted(dir, shardId); - return new MetadataSnapshot(null, dir, logger); + return readRecoveryMetadataNoLock(shardPath, dir, logger); } catch (IndexNotFoundException ex) { // that's fine - happens all the time no need to log + logger.trace("{} node reported index not found, responding with empty", shardId); } catch (FileNotFoundException | NoSuchFileException ex) { logger.info("Failed to open / find files while reading metadata snapshot", ex); } catch (ShardLockObtainFailedException ex) { logger.info(() -> new ParameterizedMessage("{}: failed to obtain shard lock", shardId), ex); } - return MetadataSnapshot.EMPTY; + return RecoveryMetadataSnapshot.EMPTY; } /** @@ -1112,6 +1165,77 @@ public String getSyncId() { } } + /** + * Extended meta data snapshot including sequence number information: + *
    + *
  • provideRecoverySeqNo: the sequence number from which this copy can provide all operations (currently)
  • + *
  • requireRecoverySeqNo: the sequence number from which all operations are required by this copy in order to perform + * operations based recovery
  • + *
  • maxSeqNo: the maximum sequence number this copy knows of
  • + *
+ * @see MetadataSnapshot + */ + public static final class RecoveryMetadataSnapshot implements Writeable { + public static final RecoveryMetadataSnapshot EMPTY = new RecoveryMetadataSnapshot(); + private final MetadataSnapshot lastCommit; + private final long provideRecoverySeqNo; + private final long requireRecoverySeqNo; + private final long maxSeqNo; + + public RecoveryMetadataSnapshot(MetadataSnapshot lastCommit, long provideRecoverySeqNo, long requireRecoverySeqNo, long maxSeqNo) { + this.lastCommit = lastCommit; + this.provideRecoverySeqNo = provideRecoverySeqNo; + this.requireRecoverySeqNo = requireRecoverySeqNo; + this.maxSeqNo = maxSeqNo; + } + + public RecoveryMetadataSnapshot(StreamInput in) throws IOException { + this.lastCommit = new MetadataSnapshot(in); + if (in.getVersion().onOrAfter(org.elasticsearch.Version.V_8_0_0)) { // todo: change to 7_X when backported + this.provideRecoverySeqNo = in.readLong(); + this.requireRecoverySeqNo = in.readLong(); + this.maxSeqNo = in.readLong(); + } else { + this.provideRecoverySeqNo = Long.MAX_VALUE; + this.requireRecoverySeqNo = SequenceNumbers.NO_OPS_PERFORMED; + this.maxSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + } + } + + private RecoveryMetadataSnapshot() { + this(MetadataSnapshot.EMPTY, Long.MAX_VALUE, SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.UNASSIGNED_SEQ_NO); + } + + public MetadataSnapshot lastCommit() { + return lastCommit; + } + + public long provideRecoverySeqNo() { + return provideRecoverySeqNo; + } + + public long requireRecoverySeqNo() { + return requireRecoverySeqNo; + } + + /** + * @return max sequence number or UNASSIGNED_SEQ_NO if not available. + */ + public long maxSeqNo() { + return maxSeqNo; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + lastCommit.writeTo(out); + if (out.getVersion().onOrAfter(org.elasticsearch.Version.V_8_0_0)) { // todo: change to 7_X when backported + out.writeLong(provideRecoverySeqNo); + out.writeLong(requireRecoverySeqNo); + out.writeLong(maxSeqNo); + } + } + } + /** * A class representing the diff between a recovery source and recovery target * diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 841201b321549..0837da572ab04 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1797,6 +1797,30 @@ public static long readGlobalCheckpoint(final Path location, final String expect return checkpoint.globalCheckpoint; } + /** + * Reads the maximum sequence number from all active generations of the translog. + * Checks that the translogUUID matches + * + * Notice that trimOperations calls are not taken into account and therefore, the maxSeqNo returned can be higher than the max(seqNo) + * of all operations in translog + * + * @param location the location of the translog + * @return the maximum sequence number in translog or -1 if no operations. + * @throws IOException if an I/O exception occurred reading the checkpoint + * @throws TranslogCorruptedException if the translog is corrupted or mismatched with the given uuid + */ + public static long readMaxSeqNo(final Path location, final String expectedTranslogUUID) throws IOException { + final Checkpoint checkpoint = readCheckpoint(location, expectedTranslogUUID); + + assert checkpoint.minTranslogGeneration >= 0 : "missing minTranslogGeneration"; + long maxSeqNo = checkpoint.maxSeqNo; + for (long i = checkpoint.generation - 1; i >= checkpoint.minTranslogGeneration; i--) { + Checkpoint previous = Checkpoint.read(location.resolve(getCommitCheckpointFileName(i))); + maxSeqNo = Math.max(maxSeqNo, previous.maxSeqNo); + } + return maxSeqNo; + } + private static Checkpoint readCheckpoint(Path location, String expectedTranslogUUID) throws IOException { final Checkpoint checkpoint = readCheckpoint(location); // We need to open at least one translog header to validate the translogUUID. @@ -1835,6 +1859,9 @@ public String getTranslogUUID() { /** * Returns the max seq_no of translog operations found in this translog. Since this value is calculated based on the current * existing readers, this value is not necessary to be the max seq_no of all operations have been stored in this translog. + * + * Notice that trimOperations calls are not taken into account and therefore, the maxSeqNo returned can be higher than the max(seqNo) + * of all operations in translog */ public long getMaxSeqNo() { try (ReleasableLock ignored = readLock.acquire()) { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index ce1eb3ac85589..20dfb5b5cb0f4 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -20,9 +20,6 @@ package org.elasticsearch.indices.recovery; import org.apache.logging.log4j.Logger; -import org.apache.lucene.index.CorruptIndexException; -import org.apache.lucene.index.IndexFormatTooNewException; -import org.apache.lucene.index.IndexFormatTooOldException; import org.elasticsearch.Assertions; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; @@ -32,14 +29,12 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLeases; -import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardNotRecoveringException; import org.elasticsearch.index.shard.IndexShardState; @@ -392,50 +387,12 @@ public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.Metada // the actual names, its ok if we have a corrupted index here, since we have replicas // to recover from in case of a full cluster shutdown just when this code executes... multiFileWriter.renameAllTempFiles(); - final Store store = store(); - store.incRef(); try { - store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData); - assert globalCheckpoint >= Long.parseLong(sourceMetaData.getCommitUserData().get(SequenceNumbers.MAX_SEQ_NO)) - || indexShard.indexSettings().getIndexVersionCreated().before(Version.V_7_2_0) : - "invalid global checkpoint[" + globalCheckpoint + "] source_meta_data [" + sourceMetaData.getCommitUserData() + "]"; - final String translogUUID = Translog.createEmptyTranslog( - indexShard.shardPath().resolveTranslog(), globalCheckpoint, shardId, indexShard.getPendingPrimaryTerm()); - store.associateIndexWithNewTranslog(translogUUID); - - if (indexShard.getRetentionLeases().leases().isEmpty()) { - // if empty, may be a fresh IndexShard, so write an empty leases file to disk - indexShard.persistRetentionLeases(); - assert indexShard.loadRetentionLeases().leases().isEmpty(); - } else { - assert indexShard.assertRetentionLeasesPersisted(); - } - - } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { - // this is a fatal exception at this stage. - // this means we transferred files from the remote that have not be checksummed and they are - // broken. We have to clean up this shard entirely, remove all files and bubble it up to the - // source shard since this index might be broken there as well? The Source can handle this and checks - // its content on disk if possible. - try { - try { - store.removeCorruptionMarker(); - } finally { - Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files - } - } catch (Exception e) { - logger.debug("Failed to clean lucene index", e); - ex.addSuppressed(e); - } - RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex); - fail(rfe, true); - throw rfe; + indexShard.finalizeIndexRecovery(globalCheckpoint, sourceMetaData); } catch (Exception ex) { RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex); fail(rfe, true); throw rfe; - } finally { - store.decRef(); } } diff --git a/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index 5ebbdab39835b..1955baef501f8 100644 --- a/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -43,6 +43,7 @@ import org.elasticsearch.gateway.AsyncShardFetch; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; @@ -124,7 +125,7 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException IndexShard indexShard = indexService.getShardOrNull(shardId.id()); if (indexShard != null) { exists = true; - return new StoreFilesMetaData(shardId, indexShard.snapshotStoreMetadata()); + return new StoreFilesMetaData(shardId, indexShard.snapshotStoreRecoveryMetadata()); } } // try and see if we an list unallocated @@ -138,20 +139,20 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException } if (metaData == null) { logger.trace("{} node doesn't have meta data for the requests index, responding with empty", shardId); - return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY); + return new StoreFilesMetaData(shardId, Store.RecoveryMetadataSnapshot.EMPTY); } final IndexSettings indexSettings = indexService != null ? indexService.getIndexSettings() : new IndexSettings(metaData, settings); final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings); if (shardPath == null) { - return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY); + logger.trace("{} node doesn't have shard path for the requested shard, responding with empty", shardId); + return new StoreFilesMetaData(shardId, Store.RecoveryMetadataSnapshot.EMPTY); } // note that this may fail if it can't get access to the shard lock. Since we check above there is an active shard, this means: // 1) a shard is being constructed, which means the master will not use a copy of this replica // 2) A shard is shutting down and has not cleared it's content within lock timeout. In this case the master may not // reuse local resources. - return new StoreFilesMetaData(shardId, Store.readMetadataSnapshot(shardPath.resolveIndex(), shardId, - nodeEnv::shardLock, logger)); + return new StoreFilesMetaData(shardId, Store.readRecoveryMetadataSnapshot(shardPath, nodeEnv::shardLock, logger)); } finally { TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS); if (exists) { @@ -164,12 +165,12 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException public static class StoreFilesMetaData implements Iterable, Streamable { private ShardId shardId; - Store.MetadataSnapshot metadataSnapshot; + private Store.RecoveryMetadataSnapshot metadataSnapshot; StoreFilesMetaData() { } - public StoreFilesMetaData(ShardId shardId, Store.MetadataSnapshot metadataSnapshot) { + public StoreFilesMetaData(ShardId shardId, Store.RecoveryMetadataSnapshot metadataSnapshot) { this.shardId = shardId; this.metadataSnapshot = metadataSnapshot; } @@ -179,20 +180,20 @@ public ShardId shardId() { } public boolean isEmpty() { - return metadataSnapshot.size() == 0; + return metadataSnapshot.lastCommit().size() == 0; } @Override public Iterator iterator() { - return metadataSnapshot.iterator(); + return metadataSnapshot.lastCommit().iterator(); } public boolean fileExists(String name) { - return metadataSnapshot.asMap().containsKey(name); + return metadataSnapshot.lastCommit().asMap().containsKey(name); } public StoreFileMetaData file(String name) { - return metadataSnapshot.asMap().get(name); + return metadataSnapshot.lastCommit().asMap().get(name); } public static StoreFilesMetaData readStoreFilesMetaData(StreamInput in) throws IOException { @@ -204,7 +205,7 @@ public static StoreFilesMetaData readStoreFilesMetaData(StreamInput in) throws I @Override public void readFrom(StreamInput in) throws IOException { shardId = ShardId.readShardId(in); - this.metadataSnapshot = new Store.MetadataSnapshot(in); + this.metadataSnapshot = new Store.RecoveryMetadataSnapshot(in); } @Override @@ -217,14 +218,38 @@ public void writeTo(StreamOutput out) throws IOException { * @return commit sync id if exists, else null */ public String syncId() { - return metadataSnapshot.getSyncId(); + return metadataSnapshot.lastCommit().getSyncId(); + } + + public long provideRecoverySeqNo() { + return metadataSnapshot.provideRecoverySeqNo(); + } + + public long requireRecoverySeqNo() { + return metadataSnapshot.requireRecoverySeqNo(); + } + + /** + * @return max sequence number or UNASSIGNED_SEQ_NO if not available. + */ + public long maxSeqNo() { + return metadataSnapshot.maxSeqNo(); + } + + public boolean hasSeqNoInfo() { + return maxSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO; } @Override public String toString() { return "StoreFilesMetaData{" + ", shardId=" + shardId + - ", metadataSnapshot{size=" + metadataSnapshot.size() + ", syncId=" + metadataSnapshot.getSyncId() + "}" + + ", metadataSnapshot{size=" + metadataSnapshot.lastCommit().size() + + ", syncId=" + syncId() + + ", requireSeqNo=" + requireRecoverySeqNo() + + ", provideSeqNo=" + provideRecoverySeqNo() + + ", maxSeqNo=" + maxSeqNo() + + "}" + '}'; } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java index 941ad3c658aba..b0bdb7ec0a164 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java @@ -263,7 +263,7 @@ public void testUnassignedReplicaDelayedAllocation() throws Exception { nodes.put(primaryNodeName, AllocationDecision.NO); String[] currentNodes = internalCluster().getNodeNames(); nodes.put(currentNodes[0].equals(primaryNodeName) ? currentNodes[1] : currentNodes[0], AllocationDecision.YES); - verifyNodeDecisions(parser, nodes, includeYesDecisions, true); + verifyNodeDecisions(parser, nodes, includeYesDecisions, true, true); assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -383,7 +383,7 @@ public void testUnassignedReplicaWithPriorCopy() throws Exception { for (String nodeName : internalCluster().getNodeNames()) { nodeDecisions.put(nodeName, AllocationDecision.NO); } - verifyNodeDecisions(parser, nodeDecisions, includeYesDecisions, true); + verifyNodeDecisions(parser, nodeDecisions, includeYesDecisions, true, true); assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -476,7 +476,7 @@ public void testAllocationFilteringOnIndexCreation() throws Exception { for (String nodeName : internalCluster().getNodeNames()) { nodeDecisions.put(nodeName, AllocationDecision.NO); } - verifyNodeDecisions(parser, nodeDecisions, includeYesDecisions, false); + verifyNodeDecisions(parser, nodeDecisions, includeYesDecisions, false, false); assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -584,7 +584,7 @@ public void testAllocationFilteringPreventsShardMove() throws Exception { assertEquals("move_explanation", parser.currentName()); parser.nextToken(); assertEquals("cannot move shard to another node, even though it is not allowed to remain on its current node", parser.text()); - verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.NO, true), includeYesDecisions, false); + verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.NO, true), includeYesDecisions, false, false); assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -696,7 +696,7 @@ public void testRebalancingNotAllowed() throws Exception { parser.nextToken(); assertEquals("rebalancing is not allowed, even though there is at least one node on which the shard can be allocated", parser.text()); - verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.YES, true), includeYesDecisions, false); + verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.YES, true), includeYesDecisions, false, false); assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -799,7 +799,7 @@ public void testWorseBalance() throws Exception { parser.nextToken(); assertEquals("cannot rebalance as no target node exists that can both allocate this shard and improve the cluster balance", parser.text()); - verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.WORSE_BALANCE, true), includeYesDecisions, false); + verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.WORSE_BALANCE, true), includeYesDecisions, false, false); assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -909,7 +909,7 @@ public void testBetterBalanceButCannotAllocate() throws Exception { parser.nextToken(); assertEquals("cannot rebalance as no target node exists that can both allocate this shard and improve the cluster balance", parser.text()); - verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.NO, true), includeYesDecisions, false); + verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.NO, true), includeYesDecisions, false, false); assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -1006,7 +1006,7 @@ public void testAssignedReplicaOnSpecificNode() throws Exception { assertEquals("rebalance_explanation", parser.currentName()); parser.nextToken(); assertEquals("rebalancing is not allowed", parser.text()); - verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.NO, false), includeYesDecisions, false); + verifyNodeDecisions(parser, allNodeDecisions(AllocationDecision.NO, false), includeYesDecisions, false, false); assertEquals(Token.END_OBJECT, parser.nextToken()); } } @@ -1326,7 +1326,7 @@ private void verifyStaleShardCopyNodeDecisions(XContentParser parser, int numNod } private void verifyNodeDecisions(XContentParser parser, Map expectedNodeDecisions, - boolean includeYesDecisions, boolean reuseStore) throws IOException { + boolean includeYesDecisions, boolean reuseStore, boolean synced) throws IOException { parser.nextToken(); assertEquals("node_allocation_decisions", parser.currentName()); assertEquals(Token.START_ARRAY, parser.nextToken()); @@ -1346,9 +1346,15 @@ private void verifyNodeDecisions(XContentParser parser, Map fetchData = test.fetchData(nodes, emptySet()); + assertThat(fetchData.hasData(), equalTo(false)); + assertThat(test.reroute.get(), equalTo(0)); + + test.fireSimulationAndWait(node1.getId()); + assertThat(test.reroute.get(), equalTo(1)); + + // verify we get back right data from node + fetchData = test.fetchData(nodes, emptySet()); + assertThat(fetchData.hasData(), equalTo(true)); + assertThat(fetchData.getData().size(), equalTo(1)); + assertThat(fetchData.getData().get(node1), sameInstance(response1)); + + // second fetch gets same data + fetchData = test.fetchData(nodes, emptySet()); + assertThat(fetchData.hasData(), equalTo(true)); + assertThat(fetchData.getData().size(), equalTo(1)); + assertThat(fetchData.getData().get(node1), sameInstance(response1)); + + test.clearCacheForNode(node1.getId()); + + // prepare next request + test.addSimulation(node1.getId(), response1_2); + + // no fetched data, new request on going + fetchData = test.fetchData(nodes, emptySet()); + assertThat(fetchData.hasData(), equalTo(false)); + + test.fireSimulationAndWait(node1.getId()); + assertThat(test.reroute.get(), equalTo(2)); + + // verify we get new data back + fetchData = test.fetchData(nodes, emptySet()); + assertThat(fetchData.hasData(), equalTo(true)); + assertThat(fetchData.getData().size(), equalTo(1)); + assertThat(fetchData.getData().get(node1), sameInstance(response1_2)); + } + + public void testConcurrentRequestAndClearCache() throws Exception { + DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).build(); + test.addSimulation(node1.getId(), response1); + + // no fetched data, request still on going + AsyncShardFetch.FetchResult fetchData = test.fetchData(nodes, emptySet()); + assertThat(fetchData.hasData(), equalTo(false)); + assertThat(test.reroute.get(), equalTo(0)); + + // clear cache while request is still on going, before it is processed + test.clearCacheForNode(node1.getId()); + + test.fireSimulationAndWait(node1.getId()); + assertThat(test.reroute.get(), equalTo(1)); + + // prepare next request + test.addSimulation(node1.getId(), response1_2); + + // verify still no fetched data, request still on going + fetchData = test.fetchData(nodes, emptySet()); + assertThat(fetchData.hasData(), equalTo(false)); + + test.fireSimulationAndWait(node1.getId()); + assertThat(test.reroute.get(), equalTo(2)); + + // verify we get new data back + fetchData = test.fetchData(nodes, emptySet()); + assertThat(fetchData.hasData(), equalTo(true)); + assertThat(fetchData.getData().size(), equalTo(1)); + assertThat(fetchData.getData().get(node1), sameInstance(response1_2)); + + } static class TestFetch extends AsyncShardFetch { static class Entry { diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java new file mode 100644 index 0000000000000..8c8334f36942b --- /dev/null +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java @@ -0,0 +1,145 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.gateway; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.MergePolicyConfig; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalSettingsPlugin; +import org.elasticsearch.test.InternalTestCluster; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.stream.IntStream; + +import static java.util.stream.Collectors.toList; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +public class ReplicaShardAllocatorIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(InternalSettingsPlugin.class); + } + + public void testRecentPrimaryData() throws Exception { + final String indexName = "test"; + internalCluster().startMasterOnlyNode(); + String primary = internalCluster().startDataOnlyNode(); + + assertAcked(client().admin().indices().prepareCreate(indexName) + .setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) + .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none") + // disable merges to keep segments the same + .put(MergePolicyConfig.INDEX_MERGE_ENABLED, "false")) + .get()); + + String replica1 = internalCluster().startDataOnlyNode(); + + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(100, 200)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + + flush(indexName); + + final String initialReplica = allocatedToReplica(indexName); + + String replicaWithFileOverlap = internalCluster().startDataOnlyNode(); + ensureGreen(); + + // this extra node ensures allocation deciders say yes. But due to delayed allocation, it will not be used until we are + // done. + internalCluster().startDataOnlyNode(); + + internalCluster().restartNode(replicaWithFileOverlap, new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + // disabled below for now, since it makes test succeed. Problem is that when a node leaves the cluster, + // GatewayAllocator.applyFailedShards is not called. Likewise, when TransportWriteAction marks stale, it does not call + // it because it is no longer in the routing table anyway. + // without below the test triggers the cache issue half the time (since it picks one of the nodes with matching seqno + // randomly). + + // ensure replicaWithFileOverlap is outdated seqno-wise compared to primary +// indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(1, 5)) +// .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + + // this should fix above, but cannot do, since it picks wrong node to start (data folder issue). + // clear cache, since this is not done when replica1 dies below (bug?). +// internalCluster().restartNode(primary, new InternalTestCluster.RestartCallback()); + + internalCluster().restartNode(replica1, new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries")).get()); + + assertAcked(client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)) + .get()); + + // todo: can we wait for first reroute being done, including async-fetching and subsequent processing? + Thread.sleep(100); + + // invalidate primary data compared to cache on master. + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(1, 5)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + + return super.onNodeStopped(nodeName); + } + }); + return super.onNodeStopped(nodeName); + } + }); + + assertThat(client().admin().cluster().prepareHealth(indexName).get().getStatus(), is(ClusterHealthStatus.YELLOW)); + + logger.info("--> Re-enabling allocation"); + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String) null)).get()); + ensureGreen(); + + String after = allocatedToReplica(indexName); + logger.info("--> Now allocated to {}, was {}", after, initialReplica); + assertThat(after, not(equalTo(initialReplica))); + } + + private String allocatedToReplica(String indexName) { + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + List replicas = + clusterState.routingTable().allShards(indexName).stream() + .filter(r -> r.primary() == false).filter(ShardRouting::started).map(ShardRouting::currentNodeId).collect(toList()); + assertEquals(1, replicas.size()); + return replicas.get(0); + } +} diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java index d30f7eafce4a8..17cd9b6bb1df5 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -47,11 +48,11 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; -import org.elasticsearch.cluster.ESAllocationTestCase; import org.junit.Before; import java.util.Arrays; @@ -60,6 +61,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import static java.util.Collections.unmodifiableMap; import static org.hamcrest.Matchers.equalTo; @@ -74,6 +76,8 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase { private TestAllocator testAllocator; + private AtomicLong sequenceGenerator = new AtomicLong(); + @Before public void buildTestAllocator() { this.testAllocator = new TestAllocator(); @@ -145,6 +149,34 @@ public void testSyncIdMatch() { equalTo(nodeToMatch.getId())); } + /** + * Verifies that if sequence numbers allow a noop recovery, this is preferred over file match. + */ + public void testSequenceNumberNoopMatch() { + RoutingAllocation allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders()); + DiscoveryNode nodeToMatch = randomBoolean() ? node2 : node3; + DiscoveryNode nodeNotToMatch = nodeToMatch == node2 ? node3 : node2; + long maxSeqNo = randomNonNegativeLong(); + testAllocator + .addData(node1, "MATCH", randomLongBetween(0, maxSeqNo + 1), randomLongBetween(0, maxSeqNo + 1), maxSeqNo, + new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)) + .addData(nodeToMatch, "NO_MATCH", randomLongBetween(0, maxSeqNo + 1), maxSeqNo + 1, maxSeqNo, + new StoreFileMetaData("file1", 10, "NO_MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + if (randomBoolean()) { + testAllocator.addData(nodeNotToMatch, "NO_MATCH", randomLongBetween(0, maxSeqNo + 1), randomLongBetween(0, maxSeqNo), maxSeqNo, + new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + } else { + testAllocator + .addData(nodeNotToMatch, "NO_MATCH", + randomLongBetween(0, maxSeqNo + 1), maxSeqNo + 1, randomLongBetween(maxSeqNo+1, Long.MAX_VALUE), + new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + } + testAllocator.allocateUnassigned(allocation); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(nodeToMatch.getId())); + } + /** * Verifies that when there is no sync id match but files match, we allocate it to matching node. */ @@ -298,10 +330,13 @@ private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders decide } private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders, Settings settings, UnassignedInfo.Reason reason) { + IndexMetaData.State indexState = randomFrom(IndexMetaData.State.values()); + ShardRouting primaryShard = TestShardRouting.newShardRouting(shardId, node1.getId(), true, ShardRoutingState.STARTED); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(Version.CURRENT).put(settings)) .numberOfShards(1).numberOfReplicas(1) + .state(indexState) .putInSyncAllocationIds(0, Sets.newHashSet(primaryShard.allocationId().getId()))) .build(); // mark shard as delayed if reason is NODE_LEFT @@ -369,6 +404,21 @@ public boolean getFetchDataCalledAndClean() { } public TestAllocator addData(DiscoveryNode node, String syncId, StoreFileMetaData... files) { + long requireSeqNo = SequenceNumbers.NO_OPS_PERFORMED; + long provideSeqNo = Long.MAX_VALUE; + long maxSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + if (randomBoolean()) { + // generate unique sequence numbers, validating that if replica.requireSeqNo != primary.maxSeqNo + 1, it has no effect. + long seqNo = sequenceGenerator.incrementAndGet(); + requireSeqNo = seqNo + 1; + provideSeqNo = seqNo + 1; + maxSeqNo = seqNo; + } + return addData(node, syncId, provideSeqNo, requireSeqNo, maxSeqNo, files); + } + + public TestAllocator addData(DiscoveryNode node, String syncId, long provideSeqNo, long requireSeqNo, long maxSeqNo, + StoreFileMetaData... files) { if (data == null) { data = new HashMap<>(); } @@ -381,7 +431,8 @@ public TestAllocator addData(DiscoveryNode node, String syncId, StoreFileMetaDat commitData.put(Engine.SYNC_COMMIT_ID, syncId); } data.put(node, new TransportNodesListShardStoreMetaData.StoreFilesMetaData(shardId, - new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()))); + new Store.RecoveryMetadataSnapshot(new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), + unmodifiableMap(commitData), randomInt()), provideSeqNo, requireSeqNo, maxSeqNo))); return this; } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index b34f364bbed2c..dd583563ee2f1 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1285,32 +1285,43 @@ public void testSnapshotStore() throws IOException { final IndexShard shard = newStartedShard(true); indexDoc(shard, "_doc", "0"); flushShard(shard); + SeqNoStats seqNoStats = shard.seqNoStats(); final IndexShard newShard = reinitShard(shard); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); Store.MetadataSnapshot snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); + verifySnapshotRecoveryStoreMetadata(newShard, snapshot, seqNoStats, false); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); + verifySnapshotRecoveryStoreMetadata(newShard, snapshot, seqNoStats, false); assertTrue(newShard.recoverFromStore()); snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); + verifySnapshotRecoveryStoreMetadata(newShard, snapshot, seqNoStats, true); IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); + verifySnapshotRecoveryStoreMetadata(newShard, snapshot, seqNoStats, true); newShard.close("test", false); snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); + verifySnapshotRecoveryStoreMetadata(newShard, snapshot, seqNoStats, false); + + Store.RecoveryMetadataSnapshot recoveryMetadataSnapshot = Store.readRecoveryMetadataSnapshot(shard.shardPath(), + (id, l, d) -> new DummyShardLock(id), logger); + assertThat(recoveryMetadataSnapshot.lastCommit().getSegmentsFile().name(), equalTo("segments_3")); + verifySnapshotRecoveryStoreMetadata(recoveryMetadataSnapshot, snapshot, seqNoStats, false); closeShards(newShard); } @@ -2933,11 +2944,14 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept } indexDoc(indexShard, "_doc", "1", "{}"); indexShard.flush(new FlushRequest()); + SeqNoStats seqNoStats = indexShard.seqNoStats(); + updateGlobalCheckpointOnReplica(indexShard, seqNoStats.getLocalCheckpoint()); closeShards(indexShard); final IndexShard newShard = reinitShard(indexShard); Store.MetadataSnapshot storeFileMetaDatas = newShard.snapshotStoreMetadata(); assertTrue("at least 2 files, commit and data: " +storeFileMetaDatas.toString(), storeFileMetaDatas.size() > 1); + verifySnapshotRecoveryStoreMetadata(newShard, storeFileMetaDatas, seqNoStats, false); AtomicBoolean stop = new AtomicBoolean(false); CountDownLatch latch = new CountDownLatch(1); expectThrows(AlreadyClosedException.class, () -> newShard.getEngine()); // no engine @@ -2949,6 +2963,7 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept assertEquals(0, storeFileMetaDatas.recoveryDiff(readMeta).different.size()); assertEquals(0, storeFileMetaDatas.recoveryDiff(readMeta).missing.size()); assertEquals(storeFileMetaDatas.size(), storeFileMetaDatas.recoveryDiff(readMeta).identical.size()); + verifySnapshotRecoveryStoreMetadata(newShard, storeFileMetaDatas, seqNoStats, false); } catch (IOException e) { throw new AssertionError(e); } @@ -3100,6 +3115,8 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception { } } indexShard.flush(new FlushRequest()); + SeqNoStats seqNoStats = indexShard.seqNoStats(); + updateGlobalCheckpointOnReplica(indexShard, seqNoStats.getLocalCheckpoint()); closeShards(indexShard); final ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(indexShard.routingEntry(), @@ -3126,6 +3143,7 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception { assertThat(storeFileMetaDatas.recoveryDiff(readMeta).different.size(), equalTo(0)); assertThat(storeFileMetaDatas.recoveryDiff(readMeta).missing.size(), equalTo(0)); assertThat(storeFileMetaDatas.recoveryDiff(readMeta).identical.size(), equalTo(storeFileMetaDatas.size())); + verifySnapshotRecoveryStoreMetadata(newShard, storeFileMetaDatas, seqNoStats, false); } catch (IOException e) { throw new AssertionError(e); } @@ -3982,4 +4000,25 @@ protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) { assertThat(readonlyShard.docStats().getCount(), equalTo(numDocs)); closeShards(readonlyShard); } + + private void updateGlobalCheckpointOnReplica(IndexShard indexShard, long localCheckpoint) { + if (indexShard.routingEntry().primary() == false) { + indexShard.updateGlobalCheckpointOnReplica(localCheckpoint, "test"); + } + } + + private void verifySnapshotRecoveryStoreMetadata(IndexShard shard, Store.MetadataSnapshot snapshot, SeqNoStats seqNoStats, + boolean engineOn) throws IOException { + Store.RecoveryMetadataSnapshot recoveryMetadataSnapshot = shard.snapshotStoreRecoveryMetadata(); + verifySnapshotRecoveryStoreMetadata(recoveryMetadataSnapshot, snapshot, seqNoStats, engineOn); + } + + private void verifySnapshotRecoveryStoreMetadata(Store.RecoveryMetadataSnapshot recoveryMetadataSnapshot, + Store.MetadataSnapshot snapshot, SeqNoStats seqNoStats, boolean engineOn) { + assertEquals(snapshot.asMap().keySet(), recoveryMetadataSnapshot.lastCommit().asMap().keySet()); + assertEquals(snapshot.getCommitUserData(), recoveryMetadataSnapshot.lastCommit().getCommitUserData()); + assertEquals(seqNoStats.getLocalCheckpoint() + 1, recoveryMetadataSnapshot.requireRecoverySeqNo()); + assertEquals(seqNoStats.getMaxSeqNo(), recoveryMetadataSnapshot.maxSeqNo()); + assertEquals(engineOn ? seqNoStats.getLocalCheckpoint() + 1 : Long.MAX_VALUE, recoveryMetadataSnapshot.provideRecoverySeqNo()); + } } diff --git a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java index 9e8fae209dd81..2513e0925a01c 100644 --- a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -58,6 +58,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -72,6 +73,7 @@ import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.test.VersionUtils; import org.hamcrest.Matchers; import java.io.ByteArrayInputStream; @@ -89,9 +91,12 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.test.VersionUtils.randomVersion; +import static org.elasticsearch.test.VersionUtils.randomVersionBetween; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; @@ -861,10 +866,11 @@ public void testStreamStoreFilesMetaData() throws Exception { Store.MetadataSnapshot metadataSnapshot = createMetaDataSnapshot(); TransportNodesListShardStoreMetaData.StoreFilesMetaData outStoreFileMetaData = new TransportNodesListShardStoreMetaData.StoreFilesMetaData(new ShardId("test", "_na_", 0), - metadataSnapshot); + new Store.RecoveryMetadataSnapshot(metadataSnapshot, randomLong(), randomLong(), randomLong())); ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); - org.elasticsearch.Version targetNodeVersion = randomVersion(random()); + org.elasticsearch.Version targetNodeVersion = + randomVersionBetween(random(), org.elasticsearch.Version.V_8_0_0, org.elasticsearch.Version.CURRENT); out.setVersion(targetNodeVersion); outStoreFileMetaData.writeTo(out); ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); @@ -877,6 +883,39 @@ public void testStreamStoreFilesMetaData() throws Exception { assertThat(inFile.name(), equalTo(outFiles.next().name())); } assertThat(outStoreFileMetaData.syncId(), equalTo(inStoreFileMetaData.syncId())); + assertEquals(outStoreFileMetaData.maxSeqNo(), inStoreFileMetaData.maxSeqNo()); + assertEquals(outStoreFileMetaData.provideRecoverySeqNo(), inStoreFileMetaData.provideRecoverySeqNo()); + assertEquals(outStoreFileMetaData.requireRecoverySeqNo(), inStoreFileMetaData.requireRecoverySeqNo()); + } + + public void testStreamCompatibilityForRecoveryMetaDataSnapshot() throws IOException { + Store.MetadataSnapshot metadataSnapshot = createMetaDataSnapshot(); + Store.RecoveryMetadataSnapshot recoverySnapshot = new Store.RecoveryMetadataSnapshot(metadataSnapshot, randomLong(), randomLong() + , randomLong()); + + verifyStreamCompatibility(metadataSnapshot, Function.identity(), in -> new Store.RecoveryMetadataSnapshot(in).lastCommit()); + verifyStreamCompatibility(recoverySnapshot, Store.RecoveryMetadataSnapshot::lastCommit, Store.MetadataSnapshot::new); + } + + private void verifyStreamCompatibility(S source, + Function toMetaData, + Writeable.Reader reader) throws IOException { + // todo: change to V_7_X once backported. + org.elasticsearch.Version targetNodeVersion = + randomFrom(VersionUtils.allVersions().stream().filter(org.elasticsearch.Version.V_8_0_0::after).collect(Collectors.toList())); + ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); + OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); + out.setVersion(targetNodeVersion); + source.writeTo(out); + + ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); + InputStreamStreamInput in = new InputStreamStreamInput(inBuffer); + in.setVersion(targetNodeVersion); + + Store.MetadataSnapshot inData = reader.read(in); + Store.MetadataSnapshot outData = toMetaData.apply(source); + assertEquals(outData.asMap().keySet(), inData.asMap().keySet()); + assertEquals(outData.getCommitUserData(), inData.getCommitUserData()); } public void testMarkCorruptedOnTruncatedSegmentsFile() throws IOException { diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index ceca8a811a67f..5c0db9ce92690 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.translog; import com.carrotsearch.randomizedtesting.generators.RandomPicks; - import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.document.Field; @@ -3137,6 +3136,7 @@ void callCloseOnTragicEvent() { } public void testMaxSeqNo() throws Exception { + final String translogUUID = translog.getTranslogUUID(); Map maxSeqNoPerGeneration = new HashMap<>(); for (int iterations = between(1, 10), i = 0; i < iterations; i++) { long startSeqNo = randomLongBetween(0, Integer.MAX_VALUE); @@ -3152,13 +3152,21 @@ public void testMaxSeqNo() throws Exception { translog.rollGeneration(); } translog.sync(); - assertThat(translog.getMaxSeqNo(), - equalTo(maxSeqNoPerGeneration.isEmpty() ? SequenceNumbers.NO_OPS_PERFORMED : Collections.max(maxSeqNoPerGeneration.values()))); + + long expectedMaxSeqNo = maxSeqNoPerGeneration.isEmpty() + ? SequenceNumbers.NO_OPS_PERFORMED + : Collections.max(maxSeqNoPerGeneration.values()); + assertThat(translog.getMaxSeqNo(), equalTo(expectedMaxSeqNo)); + assertThat(Translog.readMaxSeqNo(translogDir, translogUUID), equalTo(expectedMaxSeqNo)); + long minRetainedGen = commit(translog, randomLongBetween(1, translog.currentFileGeneration()), translog.currentFileGeneration()); - long expectedMaxSeqNo = maxSeqNoPerGeneration.entrySet().stream() + expectedMaxSeqNo = maxSeqNoPerGeneration.entrySet().stream() .filter(e -> e.getKey() >= minRetainedGen).mapToLong(e -> e.getValue()) .max().orElse(SequenceNumbers.NO_OPS_PERFORMED); assertThat(translog.getMaxSeqNo(), equalTo(expectedMaxSeqNo)); + + assertThat(Translog.readMaxSeqNo(translogDir, translogUUID), equalTo(expectedMaxSeqNo)); + expectThrows(TranslogCorruptedException.class, () -> Translog.readMaxSeqNo(translogDir, UUIDs.randomBase64UUID())); } static class SortedSnapshot implements Translog.Snapshot { diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index 740034f12ecc5..01c6500634e1d 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -421,6 +422,56 @@ public Settings onNodeStopped(String nodeName) throws Exception { } } + /** + * Verify that if we have two shard copies around, we prefer one with identical sequence numbers and do + * a noop recovery. + */ + public void testClosedIndexRecoversFast() throws Exception { + final String indexName = "closed-index-fast-recovery"; + internalCluster().ensureAtLeastNumDataNodes(3); + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build()); + + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(1, 50)) + .mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList())); + ensureGreen(indexName); + if (randomBoolean()) { + flush(indexName); + } + + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(1, 50)) + .mapToObj(i -> client().prepareIndex(indexName, "_doc", "Extra" + i).setSource("num", i)).collect(toList())); + ensureGreen(); + + assertAcked(client().admin().indices().prepareClose(indexName).get()); + ensureGreen(); + + // disable replica allocation to ensure ReplicaShardAllocator sees both options at the same time. + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries")).get()); + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + ensureYellow(); + return super.onNodeStopped(nodeName); + } + }); + return super.onNodeStopped(nodeName); + } + }); + + assertThat(client().admin().cluster().prepareHealth(indexName).get().getStatus(), is(ClusterHealthStatus.YELLOW)); + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String) null)).get()); + ensureGreen(); + assertNoFileBasedRecovery(indexName); + } + static void assertIndexIsClosed(final String... indices) { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); for (String index : indices) { @@ -467,7 +518,7 @@ static void assertException(final Throwable throwable, final String indexName) { } } - void assertNoFileBasedRecovery(String indexName) { + private void assertNoFileBasedRecovery(String indexName) { for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) { if (recovery.getPrimary() == false) { assertThat(recovery.getIndex().fileDetails(), empty()); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 7ff928c4413d2..a460db7868cfa 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -148,8 +148,8 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_TYPE_SETTING; import static org.elasticsearch.discovery.DiscoveryModule.ZEN2_DISCOVERY_TYPE; -import static org.elasticsearch.node.Node.INITIAL_STATE_TIMEOUT_SETTING; import static org.elasticsearch.discovery.FileBasedSeedHostsProvider.UNICAST_HOSTS_FILE; +import static org.elasticsearch.node.Node.INITIAL_STATE_TIMEOUT_SETTING; import static org.elasticsearch.test.ESTestCase.assertBusy; import static org.elasticsearch.test.ESTestCase.awaitBusy; import static org.elasticsearch.test.ESTestCase.getTestTransportType; @@ -562,9 +562,14 @@ private NodeAndClient getRandomNodeAndClient() { return getRandomNodeAndClient(nc -> true); } - private synchronized NodeAndClient getRandomNodeAndClient(Predicate predicate) { + private NodeAndClient getRandomNodeAndClient(Predicate predicate) { + return getRandomNodeAndClientIncludingClosed(((Predicate) nc -> nc.isClosed() == false).and(predicate)); + } + + private synchronized NodeAndClient getRandomNodeAndClientIncludingClosed(Predicate predicate) { ensureOpen(); - List values = nodes.values().stream().filter(predicate).collect(Collectors.toList()); + List values = nodes.values().stream().filter(predicate) + .collect(Collectors.toList()); if (values.isEmpty() == false) { return randomFrom(random, values); } @@ -1003,6 +1008,10 @@ public void close() throws IOException { } } + public boolean isClosed() { + return closed.get(); + } + private void markNodeDataDirsAsPendingForWipe(Node node) { assert Thread.holdsLock(InternalTestCluster.this); NodeEnvironment nodeEnv = node.getNodeEnvironment(); @@ -1178,10 +1187,11 @@ public synchronized void validateClusterFormed() { /** ensure a cluster is formed with all published nodes, but do so by using the client of the specified node */ private synchronized void validateClusterFormed(String viaNode) { - Set expectedNodes = new HashSet<>(); - for (NodeAndClient nodeAndClient : nodes.values()) { - expectedNodes.add(getInstanceFromNode(ClusterService.class, nodeAndClient.node()).localNode()); - } + Set expectedNodes = + nodes.values().stream() + .filter(nc -> nc.isClosed() == false) + .map(nc -> getInstanceFromNode(ClusterService.class, nc.node()).localNode()) + .collect(Collectors.toSet()); logger.trace("validating cluster formed via [{}], expecting {}", viaNode, expectedNodes); final Client client = client(viaNode); try { @@ -1515,7 +1525,7 @@ public T getMasterNodeInstance(Class clazz) { } private synchronized T getInstance(Class clazz, Predicate predicate) { - NodeAndClient randomNodeAndClient = getRandomNodeAndClient(predicate); + NodeAndClient randomNodeAndClient = getRandomNodeAndClientIncludingClosed(predicate); assert randomNodeAndClient != null; return getInstanceFromNode(clazz, randomNodeAndClient.node); } @@ -1533,7 +1543,7 @@ private static T getInstanceFromNode(Class clazz, Node node) { @Override public int size() { - return nodes.size(); + return Math.toIntExact(nodes.values().stream().filter(nc -> nc.isClosed() == false).count()); } @Override @@ -2085,7 +2095,10 @@ private static int getMinMasterNodes(int eligibleMasterNodes) { } private int getMasterNodesCount() { - return (int) nodes.values().stream().filter(n -> Node.NODE_MASTER_SETTING.get(n.node().settings())).count(); + return (int) nodes.values().stream() + .filter(n -> n.isClosed() == false) + .filter(n -> Node.NODE_MASTER_SETTING.get(n.node().settings())) + .count(); } public String startMasterOnlyNode() { From 94a3c8d104197f122093ade9d85997c3bb2caa36 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Tue, 28 May 2019 12:26:34 +0200 Subject: [PATCH 2/6] Fixed rolling-upgrade test --- .../elasticsearch/upgrades/RecoveryIT.java | 54 +++++++++++++++---- 1 file changed, 45 insertions(+), 9 deletions(-) diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index 8fd58bf3c4329..b93afcb9fe61d 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -26,6 +26,7 @@ import org.elasticsearch.client.ResponseException; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -396,10 +397,9 @@ public void testRecoveryClosedIndex() throws Exception { // then delayed allocation will kick in. When the node comes back, the master will search for a copy // but the recovering copy will be seen as invalid and the cluster health won't return to GREEN // before timing out - .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "30s") + .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms") .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster .build()); - indexDocs(indexName, 0, randomInt(10)); ensureGreen(indexName); closeIndex(indexName); } @@ -410,9 +410,6 @@ public void testRecoveryClosedIndex() throws Exception { // so we expect the index to be closed and replicated ensureGreen(indexName); assertClosedIndex(indexName, true); - if (minimumNodeVersion().onOrAfter(Version.V_8_0_0)) { // todo: change to 7_X once backported. - assertNoFileBasedRecovery(indexName); - } } else { assertClosedIndex(indexName, false); } @@ -447,6 +444,44 @@ public void testCloseIndexDuringRollingUpgrade() throws Exception { } } + + /** + * We test that a closed index makes no-op replica allocation only. + */ + public void testClosedIndexReplicaAllocation() throws Exception { + final String indexName = "closed_index_replica_allocation"; + if (CLUSTER_TYPE == ClusterType.OLD) { + createIndex(indexName, Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none") + .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "30s") + .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster + .put("index.routing.allocation.include._name", "node-0") + .build()); + indexDocs(indexName, 0, randomInt(10)); + // allocate replica to node-2 + updateIndexSettings(indexName, Settings.builder().put("index.routing.allocation.include._name", + "node-0,node-2,upgraded-node-*")); + ensureGreen(indexName); + closeIndex(indexName); + } + + final Version indexVersionCreated = indexVersionCreated(indexName); + if (indexVersionCreated.onOrAfter(Version.V_7_2_0)) { + // index was created on a version that supports the replication of closed indices, + // so we expect the index to be closed and replicated + ensureGreen(indexName); + assertClosedIndex(indexName, true); + // todo: change to 7_X once backported. + if (CLUSTER_TYPE != ClusterType.OLD && minimumNodeVersion().onOrAfter(Version.V_8_0_0)) { + assertNoFileBasedRecovery(indexName, s-> s.startsWith("upgraded")); + } + } else { + assertClosedIndex(indexName, false); + } + + } /** * Returns the version in which the given index has been created */ @@ -573,7 +608,7 @@ private void ensureGlobalCheckpointSynced(String index) throws Exception { }, 60, TimeUnit.SECONDS); } - private void assertNoFileBasedRecovery(String indexName) throws IOException { + private void assertNoFileBasedRecovery(String indexName, Predicate targetNode) throws IOException { Map recoveries = entityAsMap(client() .performRequest(new Request("GET", indexName + "/_recovery?detailed=true"))); @@ -582,20 +617,21 @@ private void assertNoFileBasedRecovery(String indexName) throws IOException { assertNotNull(shards); boolean foundReplica = false; for (Map shard : shards) { - if (shard.get("primary") == Boolean.FALSE) { + if (shard.get("primary") == Boolean.FALSE + && targetNode.test((String) XContentMapValues.extractValue("target.name", shard))) { List details = (List) XContentMapValues.extractValue("index.files.details", shard); // once detailed recoveries works, remove this if. if (details == null) { long totalFiles = ((Number) XContentMapValues.extractValue("index.files.total", shard)).longValue(); long reusedFiles = ((Number) XContentMapValues.extractValue("index.files.reused", shard)).longValue(); - assertEquals(totalFiles, reusedFiles); + assertEquals("must reuse all files, recoveries [" + recoveries + "]", totalFiles, reusedFiles); } else { assertNotNull(details); assertThat(details, empty()); } long translogRecovered = ((Number) XContentMapValues.extractValue("translog.recovered", shard)).longValue(); - assertEquals("must be noop", 0, translogRecovered); + assertEquals("must be noop, recoveries [" + recoveries + "]", 0, translogRecovered); foundReplica = true; } } From b01c3fc1716b8d2ae29aaeff8eafdffd7596283d Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Tue, 28 May 2019 15:44:35 +0200 Subject: [PATCH 3/6] Increase delayed allocation time Hopefully this makes test succeed in CI too. --- .../test/java/org/elasticsearch/upgrades/RecoveryIT.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index b93afcb9fe61d..102e479d0142e 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -455,14 +455,13 @@ public void testClosedIndexReplicaAllocation() throws Exception { .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none") - .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "30s") - .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster + .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "120s") .put("index.routing.allocation.include._name", "node-0") .build()); indexDocs(indexName, 0, randomInt(10)); // allocate replica to node-2 - updateIndexSettings(indexName, Settings.builder().put("index.routing.allocation.include._name", - "node-0,node-2,upgraded-node-*")); + updateIndexSettings(indexName, + Settings.builder().put("index.routing.allocation.include._name", "node-0,node-2,upgraded-node-*")); ensureGreen(indexName); closeIndex(indexName); } From 0c9da5be8fe01ee6d256a4a3c18031a931ad7274 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Tue, 4 Jun 2019 11:06:14 +0200 Subject: [PATCH 4/6] Lock during cleanup files Now lock during cleanup files to protect snapshotRecoveryMetadata from seeing half copied data. snapshotRecoveryMetadata now handles peer recovery and existing store recovery specifically, returning empty snapshot in other recovery types (local shards, restore snapshot). --- .../elasticsearch/index/shard/IndexShard.java | 172 +++++++++-------- .../indices/recovery/RecoveryTarget.java | 8 +- .../index/shard/IndexShardTests.java | 182 +++++++++++++++--- 3 files changed, 248 insertions(+), 114 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 50fbc3447cf6e..01694746f0b26 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -53,8 +53,8 @@ import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Booleans; +import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.CheckedRunnable; -import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -227,14 +227,6 @@ Runnable getGlobalCheckpointSyncer() { private final RecoveryStats recoveryStats = new RecoveryStats(); - /** - * The store recovery metadata to use for snapshotStoreRecoveryMetadata requests until engine is opened. - * - * This protects us from the destructive nature of recovery and allows us to serve this information without risk of getting file - * errors due to concurrent writes. - */ - private volatile CheckedSupplier noEngineStoreRecoveryMetadata; - private final MeanMetric refreshMetric = new MeanMetric(); private final MeanMetric externalRefreshMetric = new MeanMetric(); private final MeanMetric flushMetric = new MeanMetric(); @@ -365,7 +357,6 @@ public boolean shouldCache(Query query) { searcherWrapper = indexSearcherWrapper; refreshListeners = buildRefreshListeners(); lastSearcherAccess.set(threadPool.relativeTimeInMillis()); - this.noEngineStoreRecoveryMetadata = calculateNoEngineRecoveryMetadata(store, path); persistMetadata(path, indexSettings, shardRouting, null, logger); } @@ -1196,65 +1187,88 @@ public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException { * @throws java.nio.file.NoSuchFileException if one or more files referenced by a commit are not present. */ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { - Engine.IndexCommitRef indexCommit = null; + return snapshot(this::snapshotStoreMetadataFromEngine, recoverySource -> store.getMetadata(null, true)); + } + + /** + * Similar to snapshotStoreMetadata, but extended with additional info about sequence numbers for recovery. + * + * @see #snapshotStoreMetadata() for info on lifecycle and exceptions. + */ + public Store.RecoveryMetadataSnapshot snapshotStoreRecoveryMetadata() throws IOException { + return snapshot(this::snapshotStoreRecoveryMetadataFromEngine, this::snapshotStoreRecoveryMetadataFromStore); + } + + /** + * snapshot data from either engine if available or else from store + * @param engineSnapper snap data from engine, must be safe towards concurrent close. + * @param storeSnapper snap data from store, will be run under mutex. Recovery source will be null if shard is closed. + */ + private T snapshot(CheckedFunction engineSnapper, + CheckedFunction storeSnapper) throws IOException { store.incRef(); try { Engine engine; + // We primarily take mutex to avoid seeing the interim ReadOnlyEngine used during resetEngineToGlobalCheckpoint. synchronized (mutex) { // if the engine is not running, we can access the store directly, but we need to make sure no one starts // the engine on us. If the engine is running, we can get a snapshot via the deletion policy which is initialized. // That can be done out of mutex, since the engine can be closed half way. engine = getEngineOrNull(); if (engine == null) { - return store.getMetadata(null, true); + // take recovery source from shard routing since we are then sure it is available from the beginning of the IndexShard + // lifecycle. + assert shardRouting.initializing() || state == IndexShardState.CLOSED; + if (shardRouting.initializing()) { + return storeSnapper.apply(shardRouting.recoverySource()); + } else { + return storeSnapper.apply(null); + } } } - indexCommit = engine.acquireLastIndexCommit(false); - return store.getMetadata(indexCommit.getIndexCommit()); + // engineSnaper must be safe towards concurrent engine close. + return engineSnapper.apply(engine); } finally { store.decRef(); - IOUtils.close(indexCommit); } } - /** - * Similar to snapshotStoreMetadata, but extended with additional info about sequence numbers for recovery. - * - * @see #snapshotStoreMetadata() for info on lifecycle and exceptions. - */ - public Store.RecoveryMetadataSnapshot snapshotStoreRecoveryMetadata() throws IOException { - Engine engine; - // We primarily take mutex to avoid seeing the interim ReadOnlyEngine used during resetEngineToGlobalCheckpoint. - synchronized (mutex) { - engine = getEngineOrNull(); - if (engine == null) { - return noEngineStoreRecoveryMetadata.get(); - } + private Store.MetadataSnapshot snapshotStoreMetadataFromEngine(Engine engine) throws IOException { + try (Engine.IndexCommitRef indexCommit = engine.acquireLastIndexCommit(false)) { + return store.getMetadata(indexCommit.getIndexCommit()); } - // safe towards concurrent engine close. - return snapshotStoreRecoveryMetadataFromEngine(engine); } private Store.RecoveryMetadataSnapshot snapshotStoreRecoveryMetadataFromEngine(Engine engine) throws IOException { - try (Engine.IndexCommitRef indexCommit = engine.acquireLastIndexCommit(false)) { + try (Engine.IndexCommitRef lastCommit = engine.acquireLastIndexCommit(false)) { SeqNoStats seqNoStats = engine.getSeqNoStats(-1); - MetadataSnapshot metadata = store.getMetadata(indexCommit.getIndexCommit()); - // before shard is started, we cannot provide anything so use Long.MAX_VALUE - long provideSeqNo = state() == IndexShardState.POST_RECOVERY || state() == IndexShardState.STARTED - ? seqNoStats.getLocalCheckpoint() + 1 - : Long.MAX_VALUE; - return new Store.RecoveryMetadataSnapshot(metadata, provideSeqNo, seqNoStats.getLocalCheckpoint() + 1, - seqNoStats.getMaxSeqNo()); + MetadataSnapshot metadata = store.getMetadata(lastCommit.getIndexCommit()); + try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) { + SequenceNumbers.CommitInfo safeCommitSeqNoInfo = Store.loadSeqNoInfo(safeCommit.getIndexCommit()); + // before shard is started, we cannot provide anything so use Long.MAX_VALUE + long provideSeqNo = state() == IndexShardState.POST_RECOVERY || state() == IndexShardState.STARTED + ? safeCommitSeqNoInfo.localCheckpoint + 1 + : Long.MAX_VALUE; + return new Store.RecoveryMetadataSnapshot(metadata, provideSeqNo, safeCommitSeqNoInfo.localCheckpoint + 1, + seqNoStats.getMaxSeqNo()); + } } } - private static CheckedSupplier calculateNoEngineRecoveryMetadata(Store store, - ShardPath shardPath) { - try { - Store.RecoveryMetadataSnapshot recoveryMetadata = store.getRecoveryMetadata(shardPath, true); - return () -> recoveryMetadata; - } catch (IOException | RuntimeException e) { - return () -> { throw e; }; + private Store.RecoveryMetadataSnapshot snapshotStoreRecoveryMetadataFromStore(RecoverySource recoverySource) throws IOException { + assert Thread.holdsLock(mutex); + // if closed, PEER recovery or existing store, we know files are intact/not concurrently modified while holding mutex (except for + // smaller modifications done under store writeLock). + // "shouldBootstrapNewHistory" could really go in both buckets (not important since replica shard allocator will re-fetch data + // from primary after it is started). + if (recoverySource == null + || recoverySource.getType() == RecoverySource.Type.PEER + || recoverySource.getType() == RecoverySource.Type.EXISTING_STORE) { + return store.getRecoveryMetadata(path, true); + } else { + // During restore from snapshot we have to wait until recovery is done before serving any meaningful data. + // Likewise for local shards, though in principle, we could be smarter about those in the future. + return Store.RecoveryMetadataSnapshot.EMPTY; } } @@ -1302,9 +1316,6 @@ public void close(String reason, boolean flushEngine) throws IOException { changeState(IndexShardState.CLOSED, reason); } finally { final Engine engine = this.currentEngineReference.getAndSet(null); - if (engine != null) { - noEngineStoreRecoveryMetadata = () -> snapshotStoreRecoveryMetadataFromEngine(engine); - } try { if (engine != null && flushEngine) { engine.flushAndClose(); @@ -1350,19 +1361,43 @@ public void prepareForIndexRecovery() { } /** - * Finalize index recovery, called after copied files are in place. Cleans up old files, generates new empty translog and does other - * housekeeping for retention leases and maintaining the metadata to use for replica shard allocation. + * Finalize index recovery. Manipulate store files, clean up old files, generate new empty translog and do other + * housekeeping for retention leases. */ - public void finalizeIndexRecovery(long globalCheckpoint, MetadataSnapshot sourceMetaData) throws IOException { + public void finalizeIndexRecovery(CheckedRunnable manipulateStore, long globalCheckpoint, + MetadataSnapshot sourceMetaData) throws IOException { assert getEngineOrNull() == null; final Store store = store(); store.incRef(); try { - store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData); - final String translogUUID = Translog.createEmptyTranslog( - shardPath().resolveTranslog(), globalCheckpoint, shardId, this.getPendingPrimaryTerm()); - store.associateIndexWithNewTranslog(translogUUID); + // protect snapshotRecoveryMetadata readers from seeing invalid/half copied index data + synchronized (mutex) { + manipulateStore.run(); + try { + store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData); + final String translogUUID = Translog.createEmptyTranslog( + shardPath().resolveTranslog(), globalCheckpoint, shardId, this.getPendingPrimaryTerm()); + store.associateIndexWithNewTranslog(translogUUID); + } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { + // this is a fatal exception at this stage. + // this means we transferred files from the remote that have not be checksummed and they are + // broken. We have to clean up this shard entirely, remove all files and bubble it up to the + // source shard since this index might be broken there as well? The Source can handle this and checks + // its content on disk if possible. + try { + try { + store.removeCorruptionMarker(); + } finally { + Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files + } + } catch (Exception e) { + logger.debug("Failed to clean lucene index", e); + ex.addSuppressed(e); + } + throw ex; + } + } if (getRetentionLeases().leases().isEmpty()) { // if empty, may be a fresh IndexShard, so write an empty leases file to disk @@ -1371,33 +1406,7 @@ public void finalizeIndexRecovery(long globalCheckpoint, MetadataSnapshot source } else { assert assertRetentionLeasesPersisted(); } - - SequenceNumbers.CommitInfo commitInfo = - SequenceNumbers.loadSeqNoInfoFromLuceneCommit(sourceMetaData.getCommitUserData().entrySet()); - // we know this is a replica under recovery so we cannot provide anything, therefore use provideSeqNo MAX_VALUE - Store.RecoveryMetadataSnapshot newRecoveryMetadata = new Store.RecoveryMetadataSnapshot(sourceMetaData, - Long.MAX_VALUE, commitInfo.localCheckpoint + 1, commitInfo.maxSeqNo); - this.noEngineStoreRecoveryMetadata = () -> newRecoveryMetadata; - } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { - // this is a fatal exception at this stage. - // this means we transferred files from the remote that have not be checksummed and they are - // broken. We have to clean up this shard entirely, remove all files and bubble it up to the - // source shard since this index might be broken there as well? The Source can handle this and checks - // its content on disk if possible. - try { - try { - store.removeCorruptionMarker(); - } finally { - Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files - } - } catch (Exception e) { - logger.debug("Failed to clean lucene index", e); - ex.addSuppressed(e); - } - this.noEngineStoreRecoveryMetadata = () -> { throw ex; }; - throw ex; } catch (RuntimeException | IOException e) { - this.noEngineStoreRecoveryMetadata = () -> { throw e; }; throw e; } finally { store.decRef(); @@ -1567,7 +1576,6 @@ private void innerOpenEngineAndTranslog() throws IOException { // We set active because we are now writing operations to the engine; this way, // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. active.set(true); - noEngineStoreRecoveryMetadata = null; } // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 4261fd618470b..945956df7242f 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -389,12 +389,10 @@ public void receiveFileInfo(List phase1FileNames, @Override public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException { state().getTranslog().totalOperations(totalTranslogOps); - // first, we go and move files that were created with the recovery id suffix to - // the actual names, its ok if we have a corrupted index here, since we have replicas - // to recover from in case of a full cluster shutdown just when this code executes... - multiFileWriter.renameAllTempFiles(); try { - indexShard.finalizeIndexRecovery(globalCheckpoint, sourceMetaData); + // rename files under lock to ensure that we do not concurrently try to read same files from store. + // rename is not atomic, but in case this fails/stops halfway through, a subsequent future recovery will repair. + indexShard.finalizeIndexRecovery(multiFileWriter::renameAllTempFiles, globalCheckpoint, sourceMetaData); } catch (Exception ex) { RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex); fail(rfe, true); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 5ca1277a8e12d..fb5af626a3cd0 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -30,6 +30,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Constants; import org.elasticsearch.Assertions; @@ -63,6 +64,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; @@ -107,6 +109,7 @@ import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.index.store.StoreUtils; import org.elasticsearch.index.translog.TestTranslog; @@ -116,6 +119,7 @@ import org.elasticsearch.indices.IndicesQueryCache; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; +import org.elasticsearch.indices.recovery.MultiFileWriter; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.repositories.IndexId; @@ -1351,43 +1355,44 @@ public void testSnapshotStore() throws IOException { final IndexShard shard = newStartedShard(true); indexDoc(shard, "_doc", "0"); flushShard(shard); - SeqNoStats seqNoStats = shard.seqNoStats(); final IndexShard newShard = reinitShard(shard); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); Store.MetadataSnapshot snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); - verifySnapshotRecoveryStoreMetadata(newShard, snapshot, seqNoStats, false); + Store.RecoveryMetadataSnapshot cannotProvideRecoveryMetadata = new Store.RecoveryMetadataSnapshot(snapshot, Long.MAX_VALUE, 1, 0); + Store.RecoveryMetadataSnapshot canProvideRecoveryMetadata = new Store.RecoveryMetadataSnapshot(snapshot, 1, 1, 0); + assertIdenticalRecoveryMetadataSnapshot(cannotProvideRecoveryMetadata, newShard.snapshotStoreRecoveryMetadata()); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); - verifySnapshotRecoveryStoreMetadata(newShard, snapshot, seqNoStats, false); + assertIdenticalRecoveryMetadataSnapshot(cannotProvideRecoveryMetadata, newShard.snapshotStoreRecoveryMetadata()); assertTrue(newShard.recoverFromStore()); snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); - verifySnapshotRecoveryStoreMetadata(newShard, snapshot, seqNoStats, true); + assertIdenticalRecoveryMetadataSnapshot(canProvideRecoveryMetadata, newShard.snapshotStoreRecoveryMetadata()); IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); - verifySnapshotRecoveryStoreMetadata(newShard, snapshot, seqNoStats, true); + assertIdenticalRecoveryMetadataSnapshot(canProvideRecoveryMetadata, newShard.snapshotStoreRecoveryMetadata()); newShard.close("test", false); snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); - verifySnapshotRecoveryStoreMetadata(newShard, snapshot, seqNoStats, false); + assertIdenticalRecoveryMetadataSnapshot(cannotProvideRecoveryMetadata, newShard.snapshotStoreRecoveryMetadata()); Store.RecoveryMetadataSnapshot recoveryMetadataSnapshot = Store.readRecoveryMetadataSnapshot(shard.shardPath(), (id, l, d) -> new DummyShardLock(id), logger); assertThat(recoveryMetadataSnapshot.lastCommit().getSegmentsFile().name(), equalTo("segments_3")); - verifySnapshotRecoveryStoreMetadata(recoveryMetadataSnapshot, snapshot, seqNoStats, false); + assertIdenticalRecoveryMetadataSnapshot(cannotProvideRecoveryMetadata, newShard.snapshotStoreRecoveryMetadata()); closeShards(newShard); } @@ -2038,7 +2043,9 @@ public void testRecoverFromStore() throws IOException { translogOps = 0; } String historyUUID = shard.getHistoryUUID(); + Store.RecoveryMetadataSnapshot beforeSnapshot = shard.snapshotStoreRecoveryMetadata(); IndexShard newShard = reinitShard(shard); + verifySnapshotRecoveryStoreMetadata(beforeSnapshot, false, newShard); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); assertTrue(newShard.recoverFromStore()); @@ -2046,6 +2053,8 @@ public void testRecoverFromStore() throws IOException { assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart()); assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); + verifySnapshotRecoveryStoreMetadata(newShard.snapshotStoreMetadata(), beforeSnapshot.maxSeqNo() + 1, beforeSnapshot.maxSeqNo() + 1, beforeSnapshot.maxSeqNo(), + newShard.snapshotStoreRecoveryMetadata()); IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); // check that local checkpoint of new primary is properly tracked after recovery assertThat(newShard.getLocalCheckpoint(), equalTo(totalOps - 1L)); @@ -2160,6 +2169,7 @@ public void testRecoverFromCleanStore() throws IOException { IndexShard newShard = reinitShard(shard, ShardRoutingHelper.initWithSameId(shardRouting, RecoverySource.EmptyStoreRecoverySource.INSTANCE) ); + assertEquals(Store.RecoveryMetadataSnapshot.EMPTY, newShard.snapshotStoreRecoveryMetadata()); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); @@ -2168,6 +2178,8 @@ public void testRecoverFromCleanStore() throws IOException { assertEquals(0, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(0, newShard.recoveryState().getTranslog().totalOperationsOnStart()); assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); + verifySnapshotRecoveryStoreMetadata(newShard.snapshotStoreMetadata(), 0, 0, -1, + newShard.snapshotStoreRecoveryMetadata()); IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); assertDocCount(newShard, 0); closeShards(newShard); @@ -2304,6 +2316,7 @@ public void testRestoreShard() throws IOException { routing = ShardRoutingHelper.newWithRestoreSource(routing, new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, "test")); target = reinitShard(target, routing); + assertEquals(Store.RecoveryMetadataSnapshot.EMPTY, target.snapshotStoreRecoveryMetadata()); Store sourceStore = source.store(); Store targetStore = target.store(); @@ -2329,12 +2342,14 @@ public void restoreShard(Store store, SnapshotId snapshotId, assertThat(target.getLocalCheckpoint(), equalTo(2L)); assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L)); assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L)); + verifySnapshotRecoveryStoreMetadata(target.snapshotStoreMetadata(), 1, 1, 2, target.snapshotStoreRecoveryMetadata()); IndexShardTestCase.updateRoutingEntry(target, routing.moveToStarted()); assertThat(target.getReplicationTracker().getTrackedLocalCheckpointForShard( target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(2L)); assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(2L)); assertDocs(target, "0", "2"); + verifySnapshotRecoveryStoreMetadata(target.snapshotStoreMetadata(), 1, 1, 2, target.snapshotStoreRecoveryMetadata()); closeShard(source, false); closeShards(target); @@ -2817,6 +2832,7 @@ public void testRecoverFromLocalShard() throws IOException { Map requestedMappingUpdates = ConcurrentCollections.newConcurrentMap(); { targetShard = newShard(targetRouting); + assertEquals(Store.RecoveryMetadataSnapshot.EMPTY, targetShard.snapshotStoreRecoveryMetadata()); targetShard.markAsRecovering("store", new RecoveryState(targetShard.routingEntry(), localNode, null)); BiConsumer mappingConsumer = (type, mapping) -> { @@ -2844,6 +2860,11 @@ public void testRecoverFromLocalShard() throws IOException { // check that local checkpoint of new primary is properly tracked after recovery assertThat(targetShard.getLocalCheckpoint(), equalTo(1L)); assertThat(targetShard.getReplicationTracker().getGlobalCheckpoint(), equalTo(1L)); + + long maxSeqNo = sourceShard.seqNoStats().getMaxSeqNo(); + verifySnapshotRecoveryStoreMetadata(targetShard.snapshotStoreMetadata(), maxSeqNo + 1, maxSeqNo + 1, maxSeqNo, + targetShard.snapshotStoreRecoveryMetadata()); + IndexShardTestCase.updateRoutingEntry(targetShard, ShardRoutingHelper.moveToStarted(targetShard.routingEntry())); assertThat(targetShard.getReplicationTracker().getTrackedLocalCheckpointForShard( targetShard.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(1L)); @@ -3044,15 +3065,17 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept indexShard.refresh("test"); } indexDoc(indexShard, "_doc", "1", "{}"); + updateGlobalCheckpointOnReplica(indexShard); indexShard.flush(new FlushRequest()); - SeqNoStats seqNoStats = indexShard.seqNoStats(); - updateGlobalCheckpointOnReplica(indexShard, seqNoStats.getLocalCheckpoint()); + Store.RecoveryMetadataSnapshot before = indexShard.snapshotStoreRecoveryMetadata(); closeShards(indexShard); final IndexShard newShard = reinitShard(indexShard); Store.MetadataSnapshot storeFileMetaDatas = newShard.snapshotStoreMetadata(); assertTrue("at least 2 files, commit and data: " +storeFileMetaDatas.toString(), storeFileMetaDatas.size() > 1); - verifySnapshotRecoveryStoreMetadata(newShard, storeFileMetaDatas, seqNoStats, false); + Store.RecoveryMetadataSnapshot expectedRecoveryMetadata = + new Store.RecoveryMetadataSnapshot(storeFileMetaDatas, Long.MAX_VALUE, before.requireRecoverySeqNo(), before.maxSeqNo()); + assertIdenticalRecoveryMetadataSnapshot(expectedRecoveryMetadata, newShard.snapshotStoreRecoveryMetadata()); AtomicBoolean stop = new AtomicBoolean(false); CountDownLatch latch = new CountDownLatch(1); expectThrows(AlreadyClosedException.class, () -> newShard.getEngine()); // no engine @@ -3064,7 +3087,7 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept assertEquals(0, storeFileMetaDatas.recoveryDiff(readMeta).different.size()); assertEquals(0, storeFileMetaDatas.recoveryDiff(readMeta).missing.size()); assertEquals(storeFileMetaDatas.size(), storeFileMetaDatas.recoveryDiff(readMeta).identical.size()); - verifySnapshotRecoveryStoreMetadata(newShard, storeFileMetaDatas, seqNoStats, false); + assertIdenticalRecoveryMetadataSnapshot(expectedRecoveryMetadata, newShard.snapshotStoreRecoveryMetadata()); } catch (IOException e) { throw new AssertionError(e); } @@ -3215,9 +3238,9 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception { indexShard.refresh("test"); } } + updateGlobalCheckpointOnReplica(indexShard); indexShard.flush(new FlushRequest()); - SeqNoStats seqNoStats = indexShard.seqNoStats(); - updateGlobalCheckpointOnReplica(indexShard, seqNoStats.getLocalCheckpoint()); + Store.RecoveryMetadataSnapshot before = indexShard.snapshotStoreRecoveryMetadata(); closeShards(indexShard); final ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(indexShard.routingEntry(), @@ -3233,6 +3256,8 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception { Store.MetadataSnapshot storeFileMetaDatas = newShard.snapshotStoreMetadata(); assertTrue("at least 2 files, commit and data: " + storeFileMetaDatas.toString(), storeFileMetaDatas.size() > 1); + Store.RecoveryMetadataSnapshot expectedRecoveryMetadata = + new Store.RecoveryMetadataSnapshot(storeFileMetaDatas, Long.MAX_VALUE, before.requireRecoverySeqNo(), before.maxSeqNo()); AtomicBoolean stop = new AtomicBoolean(false); CountDownLatch latch = new CountDownLatch(1); Thread snapshotter = new Thread(() -> { @@ -3244,7 +3269,7 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception { assertThat(storeFileMetaDatas.recoveryDiff(readMeta).different.size(), equalTo(0)); assertThat(storeFileMetaDatas.recoveryDiff(readMeta).missing.size(), equalTo(0)); assertThat(storeFileMetaDatas.recoveryDiff(readMeta).identical.size(), equalTo(storeFileMetaDatas.size())); - verifySnapshotRecoveryStoreMetadata(newShard, storeFileMetaDatas, seqNoStats, false); + assertIdenticalRecoveryMetadataSnapshot(expectedRecoveryMetadata, newShard.snapshotStoreRecoveryMetadata()); } catch (IOException e) { throw new AssertionError(e); } @@ -3895,6 +3920,7 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover try { readyToSnapshotLatch.await(); shard.snapshotStoreMetadata(); + shard.snapshotStoreRecoveryMetadata(); try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(randomBoolean())) { shard.store().getMetadata(indexCommitRef.getIndexCommit()); } @@ -3988,6 +4014,100 @@ public void onFailure(final Exception e) { closeShard(replica, false); } + /** + * Go through the steps of peer recovery and check that snapshotRecoveryMetadata returns correct answers at each step. + */ + public void testSnapshotRecoveryMetadataDuringFileBasedPeerRecovery() throws IOException { + final IndexShard primary = newStartedShard(true); + int numDocs = randomInt(10); + for (int i = 0; i < numDocs; ++i) { + indexDoc(primary, "_doc", "x" + i); + } + + if (randomBoolean()) { + flushShard(primary); + } + + IndexShard replica = newShard(primary.shardId(), false); + recoverReplica(replica, primary, true); + Store.RecoveryMetadataSnapshot originalReplicaMetadata = replica.snapshotStoreRecoveryMetadata(); + long maxSeqNo = primary.seqNoStats().getMaxSeqNo(); + verifySnapshotRecoveryStoreMetadata(replica.snapshotStoreMetadata(), maxSeqNo + 1, maxSeqNo + 1, maxSeqNo, originalReplicaMetadata); + replica = reinitShard(replica); + verifySnapshotRecoveryStoreMetadata(originalReplicaMetadata, false, replica); + + numDocs = randomInt(10); + for (int i = 0; i < numDocs; ++i) { + indexDoc(primary, "_doc", "y" + i); + } + primary.updateShardState(primary.routingEntry(), primary.getPendingPrimaryTerm(), null, 1000L, + Collections.singleton(primary.routingEntry().allocationId().getId()), + new IndexShardRoutingTable.Builder(primary.shardId()).addShard(primary.routingEntry()).build()); + flushShard(primary); + Store.RecoveryMetadataSnapshot flushedMetadata = primary.snapshotStoreRecoveryMetadata(); + + numDocs = randomInt(10); + for (int i = 0; i < numDocs; ++i) { + indexDoc(primary, "_doc", "z" + i); + } + DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + replica.markAsRecovering("testing", new RecoveryState(replica.routingEntry(), localNode, localNode)); + verifySnapshotRecoveryStoreMetadata(originalReplicaMetadata, false, replica); + + replica.prepareForIndexRecovery(); + verifySnapshotRecoveryStoreMetadata(originalReplicaMetadata, false, replica); + + RecoveryState.Index indexRecoveryState = new RecoveryState.Index(); + MultiFileWriter writer = new MultiFileWriter(replica.store(), indexRecoveryState, "tmp_", logger, () -> {}); + try (Engine.IndexCommitRef phase1Snapshot = primary.acquireSafeIndexCommit()) { + byte[] buffer = new byte[8192]; + Store.MetadataSnapshot metadata = primary.store().getMetadata(phase1Snapshot.getIndexCommit()); + for (StoreFileMetaData md : metadata) { + indexRecoveryState.addFileDetail(md.name(), md.length(), false); + try (IndexInput input = primary.store().directory().openInput(md.name(), IOContext.DEFAULT)) { + InputStreamIndexInput in = new InputStreamIndexInput(input, md.length()); + long position = 0; + int bytesRead; + while ((bytesRead = in.read(buffer, 0, buffer.length)) != -1) { + final boolean lastChunk = position + bytesRead == md.length(); + writer.writeFileChunk(md, position, new BytesArray(buffer, 0, bytesRead), lastChunk); + position += bytesRead; + } + } + } + verifySnapshotRecoveryStoreMetadata(originalReplicaMetadata, false, replica); + + replica.finalizeIndexRecovery(writer::renameAllTempFiles, primary.getGlobalCheckpoint(), metadata); + } + + Store.RecoveryMetadataSnapshot actualAfterCleanFiles = replica.snapshotStoreRecoveryMetadata(); + Store.RecoveryMetadataSnapshot expectedAfterCleanFiles = new Store.RecoveryMetadataSnapshot(replica.snapshotStoreMetadata(), + Long.MAX_VALUE, flushedMetadata.requireRecoverySeqNo(), flushedMetadata.maxSeqNo()); + assertIdenticalRecoveryMetadataSnapshot(expectedAfterCleanFiles, actualAfterCleanFiles); + + replica.openEngineAndSkipTranslogRecovery(); + assertIdenticalRecoveryMetadataSnapshot(expectedAfterCleanFiles, replica.snapshotStoreRecoveryMetadata()); + + try (final Translog.Snapshot phase2Snapshot = primary.getHistoryOperations("peer-recovery", 0)) { + Translog.Operation operation; + while ((operation = phase2Snapshot.next()) != null) { + replica.applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY); + } + } + Store.RecoveryMetadataSnapshot primarySnapshot = primary.snapshotStoreRecoveryMetadata(); + Store.RecoveryMetadataSnapshot expectedAfterTranslog = + new Store.RecoveryMetadataSnapshot(replica.snapshotStoreMetadata(), + primarySnapshot.provideRecoverySeqNo(), primarySnapshot.requireRecoverySeqNo(), primarySnapshot.maxSeqNo()); + verifySnapshotRecoveryStoreMetadata(expectedAfterTranslog, false, replica); + replica.finalizeRecovery(); + verifySnapshotRecoveryStoreMetadata(expectedAfterTranslog, false, replica); + + replica.postRecovery("testing"); + verifySnapshotRecoveryStoreMetadata(expectedAfterTranslog, true, replica); + + closeShards(replica, primary); + } + @Override public Settings threadPoolSettings() { return Settings.builder().put(super.threadPoolSettings()).put("thread_pool.estimated_time_interval", "5ms").build(); @@ -4102,24 +4222,32 @@ protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) { closeShards(readonlyShard); } - private void updateGlobalCheckpointOnReplica(IndexShard indexShard, long localCheckpoint) { + private void updateGlobalCheckpointOnReplica(IndexShard indexShard) { if (indexShard.routingEntry().primary() == false) { - indexShard.updateGlobalCheckpointOnReplica(localCheckpoint, "test"); + indexShard.updateGlobalCheckpointOnReplica(indexShard.seqNoStats().getLocalCheckpoint(), "test"); } } - private void verifySnapshotRecoveryStoreMetadata(IndexShard shard, Store.MetadataSnapshot snapshot, SeqNoStats seqNoStats, - boolean engineOn) throws IOException { - Store.RecoveryMetadataSnapshot recoveryMetadataSnapshot = shard.snapshotStoreRecoveryMetadata(); - verifySnapshotRecoveryStoreMetadata(recoveryMetadataSnapshot, snapshot, seqNoStats, engineOn); + private void verifySnapshotRecoveryStoreMetadata(Store.MetadataSnapshot snapshot, long provide, long require, long maxSeqNo, + Store.RecoveryMetadataSnapshot actual) { + assertIdenticalRecoveryMetadataSnapshot(new Store.RecoveryMetadataSnapshot(snapshot, provide, require, maxSeqNo), + actual); + } + + private void verifySnapshotRecoveryStoreMetadata(Store.RecoveryMetadataSnapshot expected, boolean canProvide, + IndexShard verifyShard) throws IOException { + if (canProvide == false) { + expected = new Store.RecoveryMetadataSnapshot(expected.lastCommit(), + Long.MAX_VALUE, expected.requireRecoverySeqNo(), expected.maxSeqNo()); + } + assertIdenticalRecoveryMetadataSnapshot(expected, verifyShard.snapshotStoreRecoveryMetadata()); } - private void verifySnapshotRecoveryStoreMetadata(Store.RecoveryMetadataSnapshot recoveryMetadataSnapshot, - Store.MetadataSnapshot snapshot, SeqNoStats seqNoStats, boolean engineOn) { - assertEquals(snapshot.asMap().keySet(), recoveryMetadataSnapshot.lastCommit().asMap().keySet()); - assertEquals(snapshot.getCommitUserData(), recoveryMetadataSnapshot.lastCommit().getCommitUserData()); - assertEquals(seqNoStats.getLocalCheckpoint() + 1, recoveryMetadataSnapshot.requireRecoverySeqNo()); - assertEquals(seqNoStats.getMaxSeqNo(), recoveryMetadataSnapshot.maxSeqNo()); - assertEquals(engineOn ? seqNoStats.getLocalCheckpoint() + 1 : Long.MAX_VALUE, recoveryMetadataSnapshot.provideRecoverySeqNo()); + private void assertIdenticalRecoveryMetadataSnapshot(Store.RecoveryMetadataSnapshot expected, Store.RecoveryMetadataSnapshot actual) { + assertEquals(expected.lastCommit().asMap().keySet(), actual.lastCommit().asMap().keySet()); + assertEquals(expected.lastCommit().getCommitUserData(), actual.lastCommit().getCommitUserData()); + assertEquals(expected.requireRecoverySeqNo(), actual.requireRecoverySeqNo()); + assertEquals(expected.provideRecoverySeqNo(), actual.provideRecoverySeqNo()); + assertEquals(expected.maxSeqNo(), actual.maxSeqNo()); } } From 647f21dec6b38e1ec5b0697ce53cc068f34cc939 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Tue, 4 Jun 2019 11:15:50 +0200 Subject: [PATCH 5/6] Check style fixes --- .../java/org/elasticsearch/index/shard/IndexShardTests.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index fb5af626a3cd0..07b758ff1563a 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2053,7 +2053,8 @@ public void testRecoverFromStore() throws IOException { assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart()); assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); - verifySnapshotRecoveryStoreMetadata(newShard.snapshotStoreMetadata(), beforeSnapshot.maxSeqNo() + 1, beforeSnapshot.maxSeqNo() + 1, beforeSnapshot.maxSeqNo(), + verifySnapshotRecoveryStoreMetadata(newShard.snapshotStoreMetadata(), + beforeSnapshot.maxSeqNo() + 1, beforeSnapshot.maxSeqNo() + 1, beforeSnapshot.maxSeqNo(), newShard.snapshotStoreRecoveryMetadata()); IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); // check that local checkpoint of new primary is properly tracked after recovery @@ -4088,7 +4089,7 @@ public void testSnapshotRecoveryMetadataDuringFileBasedPeerRecovery() throws IOE replica.openEngineAndSkipTranslogRecovery(); assertIdenticalRecoveryMetadataSnapshot(expectedAfterCleanFiles, replica.snapshotStoreRecoveryMetadata()); - try (final Translog.Snapshot phase2Snapshot = primary.getHistoryOperations("peer-recovery", 0)) { + try (Translog.Snapshot phase2Snapshot = primary.getHistoryOperations("peer-recovery", 0)) { Translog.Operation operation; while ((operation = phase2Snapshot.next()) != null) { replica.applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY); From a7dd7ee784a6e0bfd1ced170e43d249c627d53cb Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Tue, 4 Jun 2019 14:13:51 +0200 Subject: [PATCH 6/6] Fixed a few outdated comments. --- .../cluster/routing/allocation/NodeAllocationResult.java | 2 +- .../java/org/elasticsearch/gateway/ReplicaShardAllocator.java | 2 +- .../src/main/java/org/elasticsearch/index/shard/IndexShard.java | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationResult.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationResult.java index bc0526ffca1b5..7014d08b03f69 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationResult.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationResult.java @@ -276,7 +276,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("allocation_id", allocationId); } if (matchingBytes >= 0) { - // TODO: we should eventually either distinguish between sync-id and non sync-id equivalent closed shard allocation or + // TODO: we should eventually either distinguish between sync-id and non sync-id equivalent shard allocation or // rename this to synced_match // left this for now, since it changes the API and should preferably be handled together with seqno based // replica shard allocation, consisting of whether this will be ops based and how many ops to recover. diff --git a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index 28879f42ef7f3..59aa02527c675 100644 --- a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -381,7 +381,7 @@ private static long computeMatchingBytes(TransportNodesListShardStoreMetaData.St */ private static boolean isNoopRecovery(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, TransportNodesListShardStoreMetaData.StoreFilesMetaData candidateStore) { - // keeping syncIdMatch for 7.x to remain backwards compatible with pre-7.2 versions, but will remove for 8.0. + // keeping syncIdMatch for 7.x to remain backwards compatible with pre-7.3 versions, but will remove for 8.0. return syncIdMatch(primaryStore, candidateStore) || noopMatch(primaryStore, candidateStore); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 01694746f0b26..7689df2d81e63 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1209,7 +1209,6 @@ private T snapshot(CheckedFunction engineSnapper, store.incRef(); try { Engine engine; - // We primarily take mutex to avoid seeing the interim ReadOnlyEngine used during resetEngineToGlobalCheckpoint. synchronized (mutex) { // if the engine is not running, we can access the store directly, but we need to make sure no one starts // the engine on us. If the engine is running, we can get a snapshot via the deletion policy which is initialized.