diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 9098f280da708..10dcf6943f867 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -218,8 +218,6 @@ import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.seqno.RetentionLeaseActions; -import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction; -import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; import org.elasticsearch.persistent.CompletionPersistentTaskAction; @@ -548,8 +546,6 @@ public void reg // internal actions actions.register(GlobalCheckpointSyncAction.TYPE, GlobalCheckpointSyncAction.class); - actions.register(RetentionLeaseBackgroundSyncAction.TYPE, RetentionLeaseBackgroundSyncAction.class); - actions.register(RetentionLeaseSyncAction.TYPE, RetentionLeaseSyncAction.class); actions.register(TransportNodesSnapshotsStatus.TYPE, TransportNodesSnapshotsStatus.class); actions.register(TransportNodesListGatewayMetaState.TYPE, TransportNodesListGatewayMetaState.class); actions.register(TransportVerifyShardBeforeCloseAction.TYPE, TransportVerifyShardBeforeCloseAction.class); diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java index 1ef9a484a2777..58e48f0215586 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java @@ -91,7 +91,7 @@ public synchronized void rescheduleIfNecessary() { if (logger.isTraceEnabled()) { logger.trace("scheduling {} every {}", toString(), interval); } - cancellable = threadPool.schedule(this, interval, getThreadPool()); + cancellable = threadPool.schedule(threadPool.preserveContext(this), interval, getThreadPool()); isScheduledOrRunning = true; } else { logger.trace("scheduled {} disabled", toString()); diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 0427d9c152dc8..1c8599f66cf31 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -827,10 +827,7 @@ private boolean invariant() { assert checkpoints.get(aId) != null : "aId [" + aId + "] is pending in sync but isn't tracked"; } - if (primaryMode - && indexSettings.isSoftDeleteEnabled() - && indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN - && hasAllPeerRecoveryRetentionLeases) { + if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases) { // all tracked shard copies have a corresponding peer-recovery retention lease for (final ShardRouting shardRouting : routingTable.assignedShards()) { if (checkpoints.get(shardRouting.allocationId().getId()).tracked) { @@ -898,7 +895,9 @@ public ReplicationTracker( this.pendingInSync = new HashSet<>(); this.routingTable = null; this.replicationGroup = null; - this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0); + this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0) || + (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) && + indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN); this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings()); this.safeCommitInfoSupplier = safeCommitInfoSupplier; assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false; @@ -1011,34 +1010,32 @@ private void addPeerRecoveryRetentionLeaseForSolePrimary() { assert primaryMode; assert Thread.holdsLock(this); - if (indexSettings().getIndexMetaData().getState() == IndexMetaData.State.OPEN) { - final ShardRouting primaryShard = routingTable.primaryShard(); - final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard); - if (retentionLeases.get(leaseId) == null) { - if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard))) { - assert primaryShard.allocationId().getId().equals(shardAllocationId) - : routingTable.assignedShards() + " vs " + shardAllocationId; - // Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication - // group. - logger.trace("addPeerRecoveryRetentionLeaseForSolePrimary: adding lease [{}]", leaseId); - innerAddRetentionLease(leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1), - PEER_RECOVERY_RETENTION_LEASE_SOURCE); - hasAllPeerRecoveryRetentionLeases = true; - } else { - /* - * We got here here via a rolling upgrade from an older version that doesn't create peer recovery retention - * leases for every shard copy, but in this case we do not expect any leases to exist. - */ - assert hasAllPeerRecoveryRetentionLeases == false : routingTable + " vs " + retentionLeases; - logger.debug("{} becoming primary of {} with missing lease: {}", primaryShard, routingTable, retentionLeases); - } - } else if (hasAllPeerRecoveryRetentionLeases == false && routingTable.assignedShards().stream().allMatch(shardRouting -> - retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) - || checkpoints.get(shardRouting.allocationId().getId()).tracked == false)) { - // Although this index is old enough not to have all the expected peer recovery retention leases, in fact it does, so we - // don't need to do any more work. + final ShardRouting primaryShard = routingTable.primaryShard(); + final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard); + if (retentionLeases.get(leaseId) == null) { + if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard))) { + assert primaryShard.allocationId().getId().equals(shardAllocationId) + : routingTable.assignedShards() + " vs " + shardAllocationId; + // Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication + // group. + logger.trace("addPeerRecoveryRetentionLeaseForSolePrimary: adding lease [{}]", leaseId); + innerAddRetentionLease(leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1), + PEER_RECOVERY_RETENTION_LEASE_SOURCE); hasAllPeerRecoveryRetentionLeases = true; + } else { + /* + * We got here here via a rolling upgrade from an older version that doesn't create peer recovery retention + * leases for every shard copy, but in this case we do not expect any leases to exist. + */ + assert hasAllPeerRecoveryRetentionLeases == false : routingTable + " vs " + retentionLeases; + logger.debug("{} becoming primary of {} with missing lease: {}", primaryShard, routingTable, retentionLeases); } + } else if (hasAllPeerRecoveryRetentionLeases == false && routingTable.assignedShards().stream().allMatch(shardRouting -> + retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) + || checkpoints.get(shardRouting.allocationId().getId()).tracked == false)) { + // Although this index is old enough not to have all the expected peer recovery retention leases, in fact it does, so we + // don't need to do any more work. + hasAllPeerRecoveryRetentionLeases = true; } } @@ -1356,10 +1353,7 @@ private synchronized void setHasAllPeerRecoveryRetentionLeases() { * prior to {@link Version#V_7_4_0} that does not create peer-recovery retention leases. */ public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener listener) { - if (indexSettings().isSoftDeleteEnabled() - && indexSettings().getIndexMetaData().getState() == IndexMetaData.State.OPEN - && hasAllPeerRecoveryRetentionLeases == false) { - + if (indexSettings().isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases == false) { final List shardRoutings = routingTable.assignedShards(); final GroupedActionListener groupedActionListener = new GroupedActionListener<>(ActionListener.wrap(vs -> { setHasAllPeerRecoveryRetentionLeases(); diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java index d93500a5c6a72..e3d3fed4a5107 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java @@ -21,12 +21,15 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.action.support.replication.ReplicationTask; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -37,12 +40,19 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.node.NodeClosedException; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Map; import java.util.Objects; /** @@ -56,9 +66,7 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi RetentionLeaseBackgroundSyncAction.Request, ReplicationResponse> { - public static String ACTION_NAME = "indices:admin/seq_no/retention_lease_background_sync"; - public static ActionType TYPE = new ActionType<>(ACTION_NAME, ReplicationResponse::new); - + public static final String ACTION_NAME = "indices:admin/seq_no/retention_lease_background_sync"; private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseSyncAction.class); protected Logger getLogger() { @@ -90,6 +98,52 @@ public RetentionLeaseBackgroundSyncAction( ThreadPool.Names.MANAGEMENT); } + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + assert false : "use RetentionLeaseBackgroundSyncAction#backgroundSync"; + } + + final void backgroundSync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases) { + final Request request = new Request(shardId, retentionLeases); + final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_background_sync", request); + transportService.sendChildRequest(clusterService.localNode(), transportPrimaryAction, + new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm), + task, + transportOptions, + new TransportResponseHandler() { + @Override + public ReplicationResponse read(StreamInput in) throws IOException { + return newResponseInstance(in); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public void handleResponse(ReplicationResponse response) { + task.setPhase("finished"); + taskManager.unregister(task); + } + + @Override + public void handleException(TransportException e) { + task.setPhase("finished"); + taskManager.unregister(task); + if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) { + // node shutting down + return; + } + if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) != null) { + // the shard is closed + return; + } + getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), e); + } + }); + } + @Override protected void shardOperationOnPrimary( final Request request, @@ -137,6 +191,11 @@ public void writeTo(final StreamOutput out) throws IOException { retentionLeases.writeTo(out); } + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ReplicationTask(id, type, action, "retention_lease_background_sync shardId=" + shardId, parentTaskId, headers); + } + @Override public String toString() { return "RetentionLeaseBackgroundSyncAction.Request{" + diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index 69de0ed64f9ce..b93deccd51474 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -21,13 +21,16 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteResponse; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.action.support.replication.ReplicationTask; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -39,12 +42,18 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Map; import java.util.Objects; /** @@ -54,9 +63,7 @@ public class RetentionLeaseSyncAction extends TransportWriteAction { - public static String ACTION_NAME = "indices:admin/seq_no/retention_lease_sync"; - public static ActionType TYPE = new ActionType<>(ACTION_NAME, Response::new); - + public static final String ACTION_NAME = "indices:admin/seq_no/retention_lease_sync"; private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseSyncAction.class); protected Logger getLogger() { @@ -88,6 +95,49 @@ public RetentionLeaseSyncAction( ThreadPool.Names.MANAGEMENT, false); } + @Override + protected void doExecute(Task parentTask, Request request, ActionListener listener) { + assert false : "use RetentionLeaseSyncAction#sync"; + } + + final void sync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases, + ActionListener listener) { + final Request request = new Request(shardId, retentionLeases); + final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_sync", request); + transportService.sendChildRequest(clusterService.localNode(), transportPrimaryAction, + new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm), + task, + transportOptions, + new TransportResponseHandler() { + @Override + public ReplicationResponse read(StreamInput in) throws IOException { + return newResponseInstance(in); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public void handleResponse(ReplicationResponse response) { + task.setPhase("finished"); + taskManager.unregister(task); + listener.onResponse(response); + } + + @Override + public void handleException(TransportException e) { + if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { + getLogger().warn(new ParameterizedMessage("{} retention lease sync failed", shardId), e); + } + task.setPhase("finished"); + taskManager.unregister(task); + listener.onFailure(e); + } + }); + } + @Override protected void shardOperationOnPrimary(Request request, IndexShard primary, ActionListener> listener) { @@ -141,6 +191,11 @@ public void writeTo(final StreamOutput out) throws IOException { retentionLeases.writeTo(out); } + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ReplicationTask(id, type, action, "retention_lease_sync shardId=" + shardId, parentTaskId, headers); + } + @Override public String toString() { return "RetentionLeaseSyncAction.Request{" + diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java index 7de6bad3f1102..40f80fee2b01d 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java @@ -21,36 +21,52 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.common.inject.Inject; import org.elasticsearch.index.shard.ShardId; -/** - * A functional interface that represents a method for syncing retention leases to replica shards after a new retention lease is added on - * the primary. - */ -public interface RetentionLeaseSyncer { +import java.util.Objects; - /** - * Represents a method that when invoked syncs retention leases to replica shards after a new retention lease is added on the primary. - * The specified listener is invoked when the syncing completes with success or failure. - * - * @param shardId the shard ID - * @param retentionLeases the retention leases to sync - * @param listener the callback when sync completes - */ - void sync(ShardId shardId, RetentionLeases retentionLeases, ActionListener listener); +public class RetentionLeaseSyncer { + private final SyncAction syncAction; + private final BackgroundSyncAction backgroundSyncAction; - void backgroundSync(ShardId shardId, RetentionLeases retentionLeases); + @Inject + public RetentionLeaseSyncer(RetentionLeaseSyncAction syncAction, RetentionLeaseBackgroundSyncAction backgroundSyncAction) { + this(syncAction::sync, backgroundSyncAction::backgroundSync); + } - RetentionLeaseSyncer EMPTY = new RetentionLeaseSyncer() { - @Override - public void sync(final ShardId shardId, final RetentionLeases retentionLeases, final ActionListener listener) { - listener.onResponse(new ReplicationResponse()); - } + public RetentionLeaseSyncer(SyncAction syncAction, BackgroundSyncAction backgroundSyncAction) { + this.syncAction = Objects.requireNonNull(syncAction); + this.backgroundSyncAction = Objects.requireNonNull(backgroundSyncAction); + } - @Override - public void backgroundSync(final ShardId shardId, final RetentionLeases retentionLeases) { + public static final RetentionLeaseSyncer EMPTY = new RetentionLeaseSyncer( + (shardId, primaryAllocationId, primaryTerm, retentionLeases, listener) -> listener.onResponse(new ReplicationResponse()), + (shardId, primaryAllocationId, primaryTerm, retentionLeases) -> { }); - } - }; + public void sync(ShardId shardId, String primaryAllocationId, long primaryTerm, + RetentionLeases retentionLeases, ActionListener listener) { + syncAction.sync(shardId, primaryAllocationId, primaryTerm, retentionLeases, listener); + } + public void backgroundSync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases) { + backgroundSyncAction.backgroundSync(shardId, primaryAllocationId, primaryTerm, retentionLeases); + } + + /** + * Represents an action that is invoked to sync retention leases to replica shards after a retention lease is added + * or removed on the primary. The specified listener is invoked when the syncing completes with success or failure. + */ + public interface SyncAction { + void sync(ShardId shardId, String primaryAllocationId, long primaryTerm, + RetentionLeases retentionLeases, ActionListener listener); + } + + /** + * Represents an action that is invoked periodically to sync retention leases to replica shards after some retention + * lease has been renewed or expired. + */ + public interface BackgroundSyncAction { + void backgroundSync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases); + } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 6de1c0cd052be..d160d2035bff8 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -340,7 +340,7 @@ public IndexShard( UNASSIGNED_SEQ_NO, globalCheckpointListeners::globalCheckpointUpdated, threadPool::absoluteTimeInMillis, - (retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, retentionLeases, listener), + (retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, aId, getPendingPrimaryTerm(), retentionLeases, listener), this::getSafeCommitInfo); // the query cache is a node-level thing, however we want the most popular filters @@ -2182,6 +2182,8 @@ public void syncRetentionLeases() { logger.trace("syncing retention leases [{}] after expiration check", retentionLeases.v2()); retentionLeaseSyncer.sync( shardId, + shardRouting.allocationId().getId(), + getPendingPrimaryTerm(), retentionLeases.v2(), ActionListener.wrap( r -> {}, @@ -2191,7 +2193,8 @@ public void syncRetentionLeases() { e))); } else { logger.trace("background syncing retention leases [{}] after expiration check", retentionLeases.v2()); - retentionLeaseSyncer.backgroundSync(shardId, retentionLeases.v2()); + retentionLeaseSyncer.backgroundSync( + shardId, shardRouting.allocationId().getId(), getPendingPrimaryTerm(), retentionLeases.v2()); } } diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesModule.java b/server/src/main/java/org/elasticsearch/indices/IndicesModule.java index dae7e1ed4e9b4..7584fda21c329 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -56,6 +56,9 @@ import org.elasticsearch.index.mapper.TextFieldMapper; import org.elasticsearch.index.mapper.TypeFieldMapper; import org.elasticsearch.index.mapper.VersionFieldMapper; +import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction; +import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.flush.SyncedFlushService; @@ -238,6 +241,9 @@ protected void configure() { bind(SyncedFlushService.class).asEagerSingleton(); bind(TransportResyncReplicationAction.class).asEagerSingleton(); bind(PrimaryReplicaSyncer.class).asEagerSingleton(); + bind(RetentionLeaseSyncAction.class).asEagerSingleton(); + bind(RetentionLeaseBackgroundSyncAction.class).asEagerSingleton(); + bind(RetentionLeaseSyncer.class).asEagerSingleton(); } /** diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 9c267e4e97399..50a3e2bea351f 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -26,7 +26,6 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -58,10 +57,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.seqno.ReplicationTracker; -import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction; -import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; import org.elasticsearch.index.seqno.RetentionLeaseSyncer; -import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; @@ -77,7 +73,6 @@ import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoveryState; -import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.search.SearchService; import org.elasticsearch.snapshots.SnapshotShardsService; @@ -91,7 +86,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -104,7 +98,7 @@ import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.NO_LONGER_ASSIGNED; import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.REOPENED; -public class IndicesClusterStateService extends AbstractLifecycleComponent implements ClusterStateApplier, RetentionLeaseSyncer { +public class IndicesClusterStateService extends AbstractLifecycleComponent implements ClusterStateApplier { private static final Logger logger = LogManager.getLogger(IndicesClusterStateService.class); final AllocatedIndices> indicesService; @@ -127,6 +121,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final boolean sendRefreshMapping; private final List buildInIndexListener; private final PrimaryReplicaSyncer primaryReplicaSyncer; + private final RetentionLeaseSyncer retentionLeaseSyncer; private final NodeClient client; @Inject @@ -144,6 +139,7 @@ public IndicesClusterStateService( final PeerRecoverySourceService peerRecoverySourceService, final SnapshotShardsService snapshotShardsService, final PrimaryReplicaSyncer primaryReplicaSyncer, + final RetentionLeaseSyncer retentionLeaseSyncer, final NodeClient client) { this( settings, @@ -159,6 +155,7 @@ public IndicesClusterStateService( peerRecoverySourceService, snapshotShardsService, primaryReplicaSyncer, + retentionLeaseSyncer, client); } @@ -177,6 +174,7 @@ public IndicesClusterStateService( final PeerRecoverySourceService peerRecoverySourceService, final SnapshotShardsService snapshotShardsService, final PrimaryReplicaSyncer primaryReplicaSyncer, + final RetentionLeaseSyncer retentionLeaseSyncer, final NodeClient client) { this.settings = settings; this.buildInIndexListener = @@ -194,6 +192,7 @@ public IndicesClusterStateService( this.nodeMappingRefreshAction = nodeMappingRefreshAction; this.repositoriesService = repositoriesService; this.primaryReplicaSyncer = primaryReplicaSyncer; + this.retentionLeaseSyncer = retentionLeaseSyncer; this.sendRefreshMapping = settings.getAsBoolean("indices.cluster.send_refresh_mapping", true); this.client = client; } @@ -300,54 +299,6 @@ protected void updateGlobalCheckpointForShard(final ShardId shardId) { } } - @Override - public void sync(ShardId shardId, RetentionLeases retentionLeases, ActionListener listener) { - Objects.requireNonNull(shardId); - Objects.requireNonNull(retentionLeases); - Objects.requireNonNull(listener); - final ThreadContext threadContext = threadPool.getThreadContext(); - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - // we have to execute under the system context so that if security is enabled the sync is authorized - threadContext.markAsSystemContext(); - client.executeLocally(RetentionLeaseSyncAction.TYPE, - new RetentionLeaseSyncAction.Request(shardId, retentionLeases), - ActionListener.wrap( - listener::onResponse, - e -> { - if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { - getLogger().warn(new ParameterizedMessage("{} retention lease sync failed", shardId), e); - } - listener.onFailure(e); - })); - } - } - - @Override - public void backgroundSync(ShardId shardId, RetentionLeases retentionLeases) { - Objects.requireNonNull(shardId); - Objects.requireNonNull(retentionLeases); - final ThreadContext threadContext = threadPool.getThreadContext(); - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - // we have to execute under the system context so that if security is enabled the sync is authorized - threadContext.markAsSystemContext(); - client.executeLocally(RetentionLeaseBackgroundSyncAction.TYPE, - new RetentionLeaseBackgroundSyncAction.Request(shardId, retentionLeases), - ActionListener.wrap( - r -> {}, - e -> { - if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) { - // node shutting down - return; - } - if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) != null) { - // the shard is closed - return; - } - getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), e); - })); - } - } - // overrideable by tests Logger getLogger() { return logger; @@ -670,7 +621,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR repositoriesService, failedShardHandler, this::updateGlobalCheckpointForShard, - this); + retentionLeaseSyncer); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 1d45d048c9ba4..c372cc4571a7c 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -38,7 +38,6 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.replication.ReplicationResponse; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.CheckedSupplier; @@ -154,8 +153,7 @@ public void recoverToTarget(ActionListener listener) { IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e)); }; - final boolean useRetentionLeases = shard.indexSettings().isSoftDeleteEnabled() - && shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE; + final boolean softDeletesEnabled = shard.indexSettings().isSoftDeleteEnabled(); final SetOnce retentionLeaseRef = new SetOnce<>(); runUnderPrimaryPermit(() -> { @@ -167,7 +165,7 @@ public void recoverToTarget(ActionListener listener) { throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); } assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; - retentionLeaseRef.set(useRetentionLeases ? shard.getRetentionLeases().get( + retentionLeaseRef.set(softDeletesEnabled ? shard.getRetentionLeases().get( ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)) : null); }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger); @@ -178,7 +176,7 @@ public void recoverToTarget(ActionListener listener) { = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && isTargetSameHistory() && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo()) - && (useRetentionLeases == false + && (softDeletesEnabled == false || (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo())); // NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease, // because when doing a rolling upgrade from earlier than 7.4 we may create some leases that are initially unsatisfied. It's @@ -186,7 +184,7 @@ && isTargetSameHistory() // Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery // without having a complete history. - if (isSequenceNumberBasedRecovery && useRetentionLeases) { + if (isSequenceNumberBasedRecovery && softDeletesEnabled) { // all the history we need is retained by an existing retention lease, so we do not need a separate retention lock retentionLock.close(); logger.trace("history is retained by {}", retentionLeaseRef.get()); @@ -225,7 +223,7 @@ && isTargetSameHistory() // advances and not when creating a new safe commit. In any case this is a best-effort thing since future recoveries can // always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled // down. - startingSeqNo = useRetentionLeases + startingSeqNo = softDeletesEnabled ? Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L : 0; logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo); @@ -243,7 +241,7 @@ && isTargetSameHistory() }); final StepListener deleteRetentionLeaseStep = new StepListener<>(); - if (useRetentionLeases) { + if (softDeletesEnabled) { runUnderPrimaryPermit(() -> { try { // If the target previously had a copy of this shard then a file-based recovery might move its global @@ -266,7 +264,7 @@ && isTargetSameHistory() assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]"); final Consumer> createRetentionLeaseAsync; - if (useRetentionLeases) { + if (softDeletesEnabled) { createRetentionLeaseAsync = l -> createRetentionLease(startingSeqNo, l); } else { createRetentionLeaseAsync = l -> l.onResponse(null); @@ -304,7 +302,7 @@ && isTargetSameHistory() final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo); resources.add(phase2Snapshot); - if (useRetentionLeases == false || isSequenceNumberBasedRecovery == false) { + if (softDeletesEnabled == false || isSequenceNumberBasedRecovery == false) { // we can release the retention lock here because the snapshot itself will retain the required operations. retentionLock.close(); } diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java index 554b75a3dc539..35064b0063676 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java @@ -323,6 +323,40 @@ public void testDoNotCancelRecoveryForBrokenNode() throws Exception { transportService.clearAllRules(); } + public void testPeerRecoveryForClosedIndices() throws Exception { + String indexName = "peer_recovery_closed_indices"; + internalCluster().ensureAtLeastNumDataNodes(1); + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms") + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms") + .build()); + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(1, 100)) + .mapToObj(n -> client().prepareIndex(indexName).setSource("num", n)).collect(Collectors.toList())); + ensureActivePeerRecoveryRetentionLeasesAdvanced(indexName); + assertAcked(client().admin().indices().prepareClose(indexName)); + int numberOfReplicas = randomIntBetween(1, 2); + internalCluster().ensureAtLeastNumDataNodes(2 + numberOfReplicas); + assertAcked(client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas))); + ensureGreen(indexName); + ensureActivePeerRecoveryRetentionLeasesAdvanced(indexName); + assertAcked(client().admin().cluster().prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put("cluster.routing.allocation.enable", "primaries").build())); + internalCluster().fullRestart(); + ensureYellow(indexName); + if (randomBoolean()) { + assertAcked(client().admin().indices().prepareOpen(indexName)); + client().admin().indices().prepareForceMerge(indexName).get(); + } + assertAcked(client().admin().cluster().prepareUpdateSettings() + .setPersistentSettings(Settings.builder().putNull("cluster.routing.allocation.enable").build())); + ensureGreen(indexName); + assertNoOpRecoveries(indexName); + } + private void ensureActivePeerRecoveryRetentionLeasesAdvanced(String indexName) throws Exception { assertBusy(() -> { Index index = resolveIndex(indexName); diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 6c28c7bc2dc2c..3590c52e505bf 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -21,7 +21,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; @@ -30,7 +29,6 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; -import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -52,7 +50,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; -import org.elasticsearch.index.seqno.RetentionLeases; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; @@ -484,12 +482,9 @@ private IndicesClusterStateService createIndicesClusterStateService(DiscoveryNod null, null, primaryReplicaSyncer, + RetentionLeaseSyncer.EMPTY, client) { @Override - public void sync(ShardId shardId, RetentionLeases retentionLeases, ActionListener listener) {} - @Override - public void backgroundSync(ShardId shardId, RetentionLeases retentionLeases) {} - @Override protected void updateGlobalCheckpointForShard(final ShardId shardId) {} }; } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceTests.java deleted file mode 100644 index 6a35f268c987e..0000000000000 --- a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceTests.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indices.cluster; - -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.store.AlreadyClosedException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.TransportAction; -import org.elasticsearch.action.support.replication.ReplicationResponse; -import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction; -import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; -import org.elasticsearch.index.seqno.RetentionLeases; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.IndexShardClosedException; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.node.NodeClosedException; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskManager; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.SendRequestTransportException; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportService; -import org.junit.Before; -import org.mockito.ArgumentCaptor; - -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.elasticsearch.mock.orig.Mockito.verifyNoMoreInteractions; -import static org.elasticsearch.mock.orig.Mockito.when; -import static org.hamcrest.Matchers.arrayContaining; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.sameInstance; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.same; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; - -public class IndicesClusterStateServiceTests extends ESTestCase { - - ThreadPool threadPool; - - @Before - public void createServices() { - threadPool = mock(ThreadPool.class); - when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); - } - - public void testRetentionLeaseBackgroundSyncExecution() { - final IndicesService indicesService = mock(IndicesService.class); - - final Index index = new Index("index", "uuid"); - final IndexService indexService = mock(IndexService.class); - when(indicesService.indexServiceSafe(index)).thenReturn(indexService); - - final int id = randomIntBetween(0, 4); - final IndexShard indexShard = mock(IndexShard.class); - when(indexService.getShard(id)).thenReturn(indexShard); - - final ShardId shardId = new ShardId(index, id); - when(indexShard.shardId()).thenReturn(shardId); - - final Logger mockLogger = mock(Logger.class); - final TaskManager taskManager = mock(TaskManager.class); - when(taskManager.registerAndExecute(any(), any(), any(), any(), any())).thenCallRealMethod(); - final TransportService transportService = mock(TransportService.class); - when(transportService.getTaskManager()).thenReturn(taskManager); - - final RetentionLeases retentionLeases = mock(RetentionLeases.class); - final AtomicBoolean invoked = new AtomicBoolean(); - final RetentionLeaseBackgroundSyncAction action = new RetentionLeaseBackgroundSyncAction( - Settings.EMPTY, - transportService, - null, - indicesService, - threadPool, - null, - new ActionFilters(Collections.emptySet()), - new IndexNameExpressionResolver()) { - - @Override - protected void doExecute(Task task, RetentionLeaseBackgroundSyncAction.Request request, - ActionListener listener) { - assertTrue(threadPool.getThreadContext().isSystemContext()); - assertThat(request.shardId(), sameInstance(indexShard.shardId())); - assertThat(request.getRetentionLeases(), sameInstance(retentionLeases)); - if (randomBoolean()) { - listener.onResponse(new ReplicationResponse()); - } else { - final Exception e = randomFrom( - new AlreadyClosedException("closed"), - new IndexShardClosedException(indexShard.shardId()), - new TransportException("failed"), - new SendRequestTransportException(null, randomFrom( - "some-action", - "indices:admin/seq_no/retention_lease_background_sync[p]" - ), new NodeClosedException((DiscoveryNode) null)), - new RuntimeException("failed")); - listener.onFailure(e); - if (e.getMessage().equals("failed")) { - final ArgumentCaptor captor = ArgumentCaptor.forClass(ParameterizedMessage.class); - verify(mockLogger).warn(captor.capture(), same(e)); - final ParameterizedMessage message = captor.getValue(); - assertThat(message.getFormat(), equalTo("{} retention lease background sync failed")); - assertThat(message.getParameters(), arrayContaining(indexShard.shardId())); - } - verifyNoMoreInteractions(mockLogger); - } - invoked.set(true); - } - }; - NodeClient client = new NodeClient(Settings.EMPTY, null); - Map actions = Collections.singletonMap(RetentionLeaseBackgroundSyncAction.TYPE, action); - client.initialize(actions, taskManager, null, null); - IndicesClusterStateService service = new IndicesClusterStateService(Settings.EMPTY, null, null, threadPool, null, null, null, - null, null, null, null, null, null, client) { - @Override - protected Logger getLogger() { - return mockLogger; - } - }; - - service.backgroundSync(indexShard.shardId(), retentionLeases); - assertTrue(invoked.get()); - } - - - public void testRetentionLeaseSyncExecution() { - final IndicesService indicesService = mock(IndicesService.class); - - final Index index = new Index("index", "uuid"); - final IndexService indexService = mock(IndexService.class); - when(indicesService.indexServiceSafe(index)).thenReturn(indexService); - final int id = randomIntBetween(0, 4); - final IndexShard indexShard = mock(IndexShard.class); - when(indexService.getShard(id)).thenReturn(indexShard); - - final ShardId shardId = new ShardId(index, id); - when(indexShard.shardId()).thenReturn(shardId); - - final Logger mockLogger = mock(Logger.class); - final TaskManager taskManager = mock(TaskManager.class); - when(taskManager.registerAndExecute(any(), any(), any(), any(), any())).thenCallRealMethod(); - final TransportService transportService = mock(TransportService.class); - when(transportService.getTaskManager()).thenReturn(taskManager); - - final RetentionLeases retentionLeases = mock(RetentionLeases.class); - final AtomicBoolean invoked = new AtomicBoolean(); - final RetentionLeaseSyncAction action = new RetentionLeaseSyncAction( - Settings.EMPTY, - transportService, - null, - indicesService, - threadPool, - null, - new ActionFilters(Collections.emptySet()), - new IndexNameExpressionResolver()) { - - @Override - protected void doExecute(Task task, Request request, ActionListener listener) { - assertTrue(threadPool.getThreadContext().isSystemContext()); - assertThat(request.shardId(), sameInstance(indexShard.shardId())); - assertThat(request.getRetentionLeases(), sameInstance(retentionLeases)); - if (randomBoolean()) { - listener.onResponse(new Response()); - } else { - final Exception e = randomFrom( - new AlreadyClosedException("closed"), - new IndexShardClosedException(indexShard.shardId()), - new RuntimeException("failed")); - listener.onFailure(e); - if (e instanceof AlreadyClosedException == false && e instanceof IndexShardClosedException == false) { - final ArgumentCaptor captor = ArgumentCaptor.forClass(ParameterizedMessage.class); - verify(mockLogger).warn(captor.capture(), same(e)); - final ParameterizedMessage message = captor.getValue(); - assertThat(message.getFormat(), equalTo("{} retention lease sync failed")); - assertThat(message.getParameters(), arrayContaining(indexShard.shardId())); - } - verifyNoMoreInteractions(mockLogger); - } - invoked.set(true); - } - }; - NodeClient client = new NodeClient(Settings.EMPTY, null); - Map actions = Collections.singletonMap(RetentionLeaseSyncAction.TYPE, action); - client.initialize(actions, taskManager, null, null); - IndicesClusterStateService service = new IndicesClusterStateService(Settings.EMPTY, null, null, threadPool, null, null, null, - null, null, null, null, null, null, client) { - @Override - protected Logger getLogger() { - return mockLogger; - } - }; - - // execution happens on the test thread, so no need to register an actual listener to callback - service.sync(indexShard.shardId(), retentionLeases, ActionListener.wrap(() -> {})); - assertTrue(invoked.get()); - } - -} diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 454a407103382..b4ba7fbc9269b 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -139,8 +139,7 @@ import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; -import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction; -import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; import org.elasticsearch.indices.IndicesModule; import org.elasticsearch.indices.IndicesService; @@ -1081,12 +1080,6 @@ public void onFailure(final Exception e) { actions.put(GlobalCheckpointSyncAction.TYPE, new GlobalCheckpointSyncAction(settings, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, indexNameExpressionResolver)); - actions.put(RetentionLeaseBackgroundSyncAction.TYPE, - new RetentionLeaseBackgroundSyncAction(settings, transportService, clusterService, indicesService, threadPool, - shardStateAction, actionFilters, indexNameExpressionResolver)); - actions.put(RetentionLeaseSyncAction.TYPE, - new RetentionLeaseSyncAction(settings, transportService, clusterService, indicesService, threadPool, - shardStateAction, actionFilters, indexNameExpressionResolver)); final MetaDataMappingService metaDataMappingService = new MetaDataMappingService(clusterService, indicesService); indicesClusterStateService = new IndicesClusterStateService( settings, @@ -1112,6 +1105,7 @@ public void onFailure(final Exception e) { shardStateAction, actionFilters, indexNameExpressionResolver)), + RetentionLeaseSyncer.EMPTY, client); final MetaDataCreateIndexService metaDataCreateIndexService = new MetaDataCreateIndexService(settings, clusterService, indicesService, diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index 0837f431fff9c..db3818832f6e4 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -383,7 +383,7 @@ public Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, @Override public Runnable preserveContext(Runnable command) { - throw new UnsupportedOperationException(); + return command; } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 413af96615638..62f624335c864 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -178,21 +178,15 @@ protected class ReplicationGroup implements AutoCloseable, Iterable } }); - private final RetentionLeaseSyncer retentionLeaseSyncer = new RetentionLeaseSyncer() { - @Override - public void sync(ShardId shardId, RetentionLeases retentionLeases, ActionListener listener) { - syncRetentionLeases(shardId, retentionLeases, listener); - } - - @Override - public void backgroundSync(ShardId shardId, RetentionLeases retentionLeases) { - sync(shardId, retentionLeases, ActionListener.wrap( + private final RetentionLeaseSyncer retentionLeaseSyncer = new RetentionLeaseSyncer( + (shardId, primaryAllocationId, primaryTerm, retentionLeases, listener) -> + syncRetentionLeases(shardId, retentionLeases, listener), + (shardId, primaryAllocationId, primaryTerm, retentionLeases) -> syncRetentionLeases(shardId, retentionLeases, + ActionListener.wrap( r -> { }, e -> { throw new AssertionError("failed to background sync retention lease", e); - })); - } - }; + }))); protected ReplicationGroup(final IndexMetaData indexMetaData) throws IOException { final ShardRouting primaryRouting = this.createShardRouting("s0", true); diff --git a/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java index c9c74e658f4fd..9f41ae758bf94 100644 --- a/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java +++ b/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -202,7 +202,7 @@ public void testForgetFollower() throws IOException { assertOK(client().performRequest(new Request("POST", "/" + forgetFollower + "/_ccr/pause_follow"))); - try (RestClient leaderClient = buildLeaderClient(restClientSettings())) { + try (RestClient leaderClient = buildLeaderClient(restAdminSettings())) { final Request request = new Request("POST", "/" + forgetLeader + "/_ccr/forget_follower"); final String requestBody = "{" + "\"follower_cluster\":\"follow-cluster\"," +