diff --git a/server/src/internalClusterTest/java/org/opensearch/index/store/CorruptedFileIT.java b/server/src/internalClusterTest/java/org/opensearch/index/store/CorruptedFileIT.java index 3a5e21fc8ef65..ee2067c591cef 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/store/CorruptedFileIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/store/CorruptedFileIT.java @@ -77,7 +77,7 @@ import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.recovery.PeerRecoveryTargetService; -import org.opensearch.indices.recovery.RecoveryFileChunkRequest; +import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.monitor.fs.FsInfo; import org.opensearch.plugins.Plugin; import org.opensearch.snapshots.SnapshotState; @@ -397,7 +397,7 @@ public void testCorruptionOnNetworkLayerFinalizingRecovery() throws ExecutionExc internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()), (connection, requestId, action, request, options) -> { if (corrupt.get() && action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) { - RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request; + FileChunkRequest req = (FileChunkRequest) request; byte[] array = BytesRef.deepCopyOf(req.content().toBytesRef()).bytes; int i = randomIntBetween(0, req.content().length() - 1); array[i] = (byte) ~array[i]; // flip one byte in the content @@ -474,11 +474,11 @@ public void testCorruptionOnNetworkLayer() throws ExecutionException, Interrupte internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()), (connection, requestId, action, request, options) -> { if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) { - RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request; + FileChunkRequest req = (FileChunkRequest) request; if (truncate && req.length() > 1) { BytesRef bytesRef = req.content().toBytesRef(); BytesArray array = new BytesArray(bytesRef.bytes, bytesRef.offset, (int) req.length() - 1); - request = new RecoveryFileChunkRequest( + request = new FileChunkRequest( req.recoveryId(), req.requestSeqNo(), req.shardId(), diff --git a/server/src/internalClusterTest/java/org/opensearch/recovery/RelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/recovery/RelocationIT.java index 06475f1e7ac9d..1f16cc0363686 100644 --- a/server/src/internalClusterTest/java/org/opensearch/recovery/RelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/recovery/RelocationIT.java @@ -67,7 +67,7 @@ import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.recovery.PeerRecoveryTargetService; -import org.opensearch.indices.recovery.RecoveryFileChunkRequest; +import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.plugins.Plugin; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; @@ -809,7 +809,7 @@ public void sendRequest( TransportRequestOptions options ) throws IOException { if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) { - RecoveryFileChunkRequest chunkRequest = (RecoveryFileChunkRequest) request; + FileChunkRequest chunkRequest = (FileChunkRequest) request; if (chunkRequest.name().startsWith(IndexFileNames.SEGMENTS)) { // corrupting the segments_N files in order to make sure future recovery re-send files logger.debug("corrupting [{}] to {}. 
file name: [{}]", action, connection.getNode(), chunkRequest.name()); diff --git a/server/src/internalClusterTest/java/org/opensearch/recovery/TruncatedRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/recovery/TruncatedRecoveryIT.java index 1708454faf7b3..b5d7bd476059d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/recovery/TruncatedRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/recovery/TruncatedRecoveryIT.java @@ -43,7 +43,7 @@ import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.index.query.QueryBuilders; import org.opensearch.indices.recovery.PeerRecoveryTargetService; -import org.opensearch.indices.recovery.RecoveryFileChunkRequest; +import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.node.RecoverySettingsChunkSizePlugin; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; @@ -146,7 +146,7 @@ public void testCancelRecoveryAndResume() throws Exception { internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()), (connection, requestId, action, request, options) -> { if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) { - RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request; + FileChunkRequest req = (FileChunkRequest) request; logger.info("file chunk [{}] lastChunk: {}", req, req.lastChunk()); if ((req.name().endsWith("cfs") || req.name().endsWith("fdt")) && req.lastChunk() && truncate.get()) { latch.countDown(); diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryFileChunkRequest.java b/server/src/main/java/org/opensearch/indices/recovery/FileChunkRequest.java similarity index 95% rename from server/src/main/java/org/opensearch/indices/recovery/RecoveryFileChunkRequest.java rename to server/src/main/java/org/opensearch/indices/recovery/FileChunkRequest.java index 886de8d56645c..3594495224481 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryFileChunkRequest.java +++ b/server/src/main/java/org/opensearch/indices/recovery/FileChunkRequest.java @@ -43,11 +43,11 @@ import java.io.IOException; /** - * Request for a recovery file chunk + * Request containing a file chunk. 
* * @opensearch.internal */ -public final class RecoveryFileChunkRequest extends RecoveryTransportRequest { +public final class FileChunkRequest extends RecoveryTransportRequest { private final boolean lastChunk; private final long recoveryId; private final ShardId shardId; @@ -58,7 +58,7 @@ public final class RecoveryFileChunkRequest extends RecoveryTransportRequest { private final int totalTranslogOps; - public RecoveryFileChunkRequest(StreamInput in) throws IOException { + public FileChunkRequest(StreamInput in) throws IOException { super(in); recoveryId = in.readLong(); shardId = new ShardId(in); @@ -75,7 +75,7 @@ public RecoveryFileChunkRequest(StreamInput in) throws IOException { sourceThrottleTimeInNanos = in.readLong(); } - public RecoveryFileChunkRequest( + public FileChunkRequest( long recoveryId, final long requestSeqNo, ShardId shardId, diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index e13022afa81ba..85141556657f3 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -36,20 +36,17 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.RateLimiter; -import org.opensearch.ExceptionsHelper; import org.opensearch.LegacyESVersion; import org.opensearch.OpenSearchException; import org.opensearch.OpenSearchTimeoutException; +import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; -import org.opensearch.action.support.ChannelActionListener; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.CheckedFunction; import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.settings.Settings; @@ -60,7 +57,6 @@ import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.engine.RecoveryEngineException; import org.opensearch.index.mapper.MapperException; -import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IllegalIndexShardStateException; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; @@ -71,7 +67,6 @@ import org.opensearch.index.translog.TranslogCorruptedException; import org.opensearch.indices.replication.common.ReplicationCollection; import org.opensearch.indices.replication.common.ReplicationCollection.ReplicationRef; -import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.replication.common.ReplicationTimer; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -148,7 +143,7 @@ public PeerRecoveryTargetService( transportService.registerRequestHandler( Actions.FILE_CHUNK, ThreadPool.Names.GENERIC, - RecoveryFileChunkRequest::new, + FileChunkRequest::new, new FileChunkTransportRequestHandler() ); transportService.registerRequestHandler( @@ -354,12 +349,13 @@ class PrepareForTranslogOperationsRequestHandler implements TransportRequestHand @Override 
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, Task task) { try (ReplicationRef recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { - final ActionListener listener = createOrFinishListener(recoveryRef, channel, Actions.PREPARE_TRANSLOG, request); + final RecoveryTarget recoveryTarget = recoveryRef.get(); + final ActionListener listener = recoveryTarget.createOrFinishListener(channel, Actions.PREPARE_TRANSLOG, request); if (listener == null) { return; } - recoveryRef.get().prepareForTranslogOperations(request.totalTranslogOps(), listener); + recoveryTarget.prepareForTranslogOperations(request.totalTranslogOps(), listener); } } } @@ -369,12 +365,13 @@ class FinalizeRecoveryRequestHandler implements TransportRequestHandler recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { - final ActionListener listener = createOrFinishListener(recoveryRef, channel, Actions.FINALIZE, request); + final RecoveryTarget recoveryTarget = recoveryRef.get(); + final ActionListener listener = recoveryTarget.createOrFinishListener(channel, Actions.FINALIZE, request); if (listener == null) { return; } - recoveryRef.get().finalizeRecovery(request.globalCheckpoint(), request.trimAboveSeqNo(), listener); + recoveryTarget.finalizeRecovery(request.globalCheckpoint(), request.trimAboveSeqNo(), listener); } } } @@ -399,8 +396,7 @@ public void messageReceived(final RecoveryTranslogOperationsRequest request, fin throws IOException { try (ReplicationRef recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { final RecoveryTarget recoveryTarget = recoveryRef.get(); - final ActionListener listener = createOrFinishListener( - recoveryRef, + final ActionListener listener = recoveryTarget.createOrFinishListener( channel, Actions.TRANSLOG_OPS, request, @@ -484,20 +480,20 @@ class FilesInfoRequestHandler implements TransportRequestHandler recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { - final ActionListener listener = createOrFinishListener(recoveryRef, channel, Actions.FILES_INFO, request); + final RecoveryTarget recoveryTarget = recoveryRef.get(); + final ActionListener listener = recoveryTarget.createOrFinishListener(channel, Actions.FILES_INFO, request); if (listener == null) { return; } - recoveryRef.get() - .receiveFileInfo( - request.phase1FileNames, - request.phase1FileSizes, - request.phase1ExistingFileNames, - request.phase1ExistingFileSizes, - request.totalTranslogOps, - listener - ); + recoveryTarget.receiveFileInfo( + request.phase1FileNames, + request.phase1FileSizes, + request.phase1ExistingFileNames, + request.phase1ExistingFileSizes, + request.totalTranslogOps, + listener + ); } } } @@ -507,90 +503,37 @@ class CleanFilesRequestHandler implements TransportRequestHandler recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { - final ActionListener listener = createOrFinishListener(recoveryRef, channel, Actions.CLEAN_FILES, request); + final RecoveryTarget recoveryTarget = recoveryRef.get(); + final ActionListener listener = recoveryTarget.createOrFinishListener(channel, Actions.CLEAN_FILES, request); if (listener == null) { return; } - recoveryRef.get() - .cleanFiles(request.totalTranslogOps(), request.getGlobalCheckpoint(), request.sourceMetaSnapshot(), listener); + recoveryTarget.cleanFiles( + request.totalTranslogOps(), + request.getGlobalCheckpoint(), + request.sourceMetaSnapshot(), + listener + ); } 
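All of the handler rewrites above follow the same duplicate-suppression contract: createOrFinishListener returns null when the request's sequence number was already processed, and the handler returns early instead of re-applying the operation. A framework-free sketch of that contract, using a hypothetical RequestDeduplicator in place of the real seq-no request tracker:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical stand-in for the tracker behind markRequestReceivedAndCreateListener:
// a null return signals "already seen", so transport handlers bail out early.
final class RequestDeduplicator {
    private final Map<Long, Boolean> seen = new ConcurrentHashMap<>();

    /** Returns a completion callback, or null if this seqNo was already received. */
    Consumer<Void> markReceivedAndCreateListener(long requestSeqNo) {
        if (seen.putIfAbsent(requestSeqNo, Boolean.FALSE) != null) {
            return null; // duplicate delivery; the first arrival owns the response channel
        }
        return ignored -> seen.put(requestSeqNo, Boolean.TRUE);
    }
}
```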
} } - class FileChunkTransportRequestHandler implements TransportRequestHandler { + class FileChunkTransportRequestHandler implements TransportRequestHandler { // How many bytes we've copied since we last called RateLimiter.pause final AtomicLong bytesSinceLastPause = new AtomicLong(); @Override - public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel, Task task) throws Exception { + public void messageReceived(final FileChunkRequest request, TransportChannel channel, Task task) throws Exception { try (ReplicationRef recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { final RecoveryTarget recoveryTarget = recoveryRef.get(); - final ActionListener listener = createOrFinishListener(recoveryRef, channel, Actions.FILE_CHUNK, request); - if (listener == null) { - return; - } - - final ReplicationLuceneIndex indexState = recoveryTarget.state().getIndex(); - if (request.sourceThrottleTimeInNanos() != ReplicationLuceneIndex.UNKNOWN) { - indexState.addSourceThrottling(request.sourceThrottleTimeInNanos()); - } - - RateLimiter rateLimiter = recoverySettings.rateLimiter(); - if (rateLimiter != null) { - long bytes = bytesSinceLastPause.addAndGet(request.content().length()); - if (bytes > rateLimiter.getMinPauseCheckBytes()) { - // Time to pause - bytesSinceLastPause.addAndGet(-bytes); - long throttleTimeInNanos = rateLimiter.pause(bytes); - indexState.addTargetThrottling(throttleTimeInNanos); - recoveryTarget.indexShard().recoveryStats().addThrottleTime(throttleTimeInNanos); - } - } - recoveryTarget.writeFileChunk( - request.metadata(), - request.position(), - request.content(), - request.lastChunk(), - request.totalTranslogOps(), - listener - ); + final ActionListener listener = recoveryTarget.createOrFinishListener(channel, Actions.FILE_CHUNK, request); + recoveryTarget.handleFileChunk(request, recoveryTarget, bytesSinceLastPause, recoverySettings.rateLimiter(), listener); } } } - private ActionListener createOrFinishListener( - final ReplicationRef recoveryRef, - final TransportChannel channel, - final String action, - final RecoveryTransportRequest request - ) { - return createOrFinishListener(recoveryRef, channel, action, request, nullVal -> TransportResponse.Empty.INSTANCE); - } - - private ActionListener createOrFinishListener( - final ReplicationRef recoveryRef, - final TransportChannel channel, - final String action, - final RecoveryTransportRequest request, - final CheckedFunction responseFn - ) { - final RecoveryTarget recoveryTarget = recoveryRef.get(); - final ActionListener channelListener = new ChannelActionListener<>(channel, action, request); - final ActionListener voidListener = ActionListener.map(channelListener, responseFn); - - final long requestSeqNo = request.requestSeqNo(); - final ActionListener listener; - if (requestSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { - listener = recoveryTarget.markRequestReceivedAndCreateListener(requestSeqNo, voidListener); - } else { - listener = voidListener; - } - - return listener; - } - class RecoveryRunner extends AbstractRunnable { final long recoveryId; diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java index a3c7adb755145..57208ab029bf4 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java @@ -260,6 +260,7 @@ public Translog getTranslog() { return 
translog; } + @Override public ReplicationTimer getTimer() { return timer; } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index 92897ab19ad64..1735bb015c90c 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -37,6 +37,7 @@ import org.apache.lucene.index.IndexFormatTooOldException; import org.opensearch.Assertions; import org.opensearch.ExceptionsHelper; +import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.cluster.node.DiscoveryNode; @@ -141,7 +142,7 @@ public String description() { } @Override - public void notifyListener(Exception e, boolean sendShardFailure) { + public void notifyListener(OpenSearchException e, boolean sendShardFailure) { listener.onFailure(state(), new RecoveryFailedException(state(), e.getMessage(), e), sendShardFailure); } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java index fd6de6322bb0a..ab6466feb11f8 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -35,38 +35,24 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.store.RateLimiter; -import org.opensearch.LegacyESVersion; import org.opensearch.OpenSearchException; -import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionListener; -import org.opensearch.action.ActionListenerResponseHandler; -import org.opensearch.action.support.RetryableAction; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.common.breaker.CircuitBreakingException; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.Writeable; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.util.CancellableThreads; -import org.opensearch.common.util.concurrent.ConcurrentCollections; -import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; -import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.ConnectTransportException; import org.opensearch.transport.EmptyTransportResponseHandler; -import org.opensearch.transport.RemoteTransportException; -import org.opensearch.transport.SendRequestTransportException; import org.opensearch.transport.TransportRequestOptions; import org.opensearch.transport.TransportResponse; import org.opensearch.transport.TransportService; import java.io.IOException; import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -80,12 +66,10 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { private static final Logger logger = LogManager.getLogger(RemoteRecoveryTargetHandler.class); private final TransportService 
transportService; - private final ThreadPool threadPool; private final long recoveryId; private final ShardId shardId; private final DiscoveryNode targetNode; private final RecoverySettings recoverySettings; - private final Map> onGoingRetryableActions = ConcurrentCollections.newConcurrentMap(); private final TransportRequestOptions translogOpsRequestOptions; private final TransportRequestOptions fileChunkRequestOptions; @@ -94,8 +78,7 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { private final AtomicLong requestSeqNoGenerator = new AtomicLong(0); private final Consumer onSourceThrottle; - private final boolean retriesSupported; - private volatile boolean isCancelled = false; + private final RetryableTransportClient retryableTransportClient; public RemoteRecoveryTargetHandler( long recoveryId, @@ -106,7 +89,15 @@ public RemoteRecoveryTargetHandler( Consumer onSourceThrottle ) { this.transportService = transportService; - this.threadPool = transportService.getThreadPool(); + // It is safe to pass the retry timeout value here because RemoteRecoveryTargetHandler + // created per recovery. Any change to RecoverySettings will be applied on the next + // recovery. + this.retryableTransportClient = new RetryableTransportClient( + transportService, + targetNode, + recoverySettings.internalActionRetryTimeout(), + logger + ); this.recoveryId = recoveryId; this.shardId = shardId; this.targetNode = targetNode; @@ -120,7 +111,6 @@ public RemoteRecoveryTargetHandler( .withType(TransportRequestOptions.Type.RECOVERY) .withTimeout(recoverySettings.internalActionTimeout()) .build(); - this.retriesSupported = targetNode.getVersion().onOrAfter(LegacyESVersion.V_7_9_0); } public DiscoveryNode targetNode() { @@ -137,12 +127,9 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener reader = in -> TransportResponse.Empty.INSTANCE; final ActionListener responseListener = ActionListener.map(listener, r -> null); - executeRetryableAction(action, request, options, responseListener, reader); + retryableTransportClient.executeRetryableAction(action, request, responseListener, reader); } @Override @@ -156,12 +143,9 @@ public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSe globalCheckpoint, trimAboveSeqNo ); - final TransportRequestOptions options = TransportRequestOptions.builder() - .withTimeout(recoverySettings.internalActionLongTimeout()) - .build(); final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; final ActionListener responseListener = ActionListener.map(listener, r -> null); - executeRetryableAction(action, request, options, responseListener, reader); + retryableTransportClient.executeRetryableAction(action, request, responseListener, reader); } @Override @@ -200,7 +184,7 @@ public void indexTranslogOperations( ); final Writeable.Reader reader = RecoveryTranslogOperationsResponse::new; final ActionListener responseListener = ActionListener.map(listener, r -> r.localCheckpoint); - executeRetryableAction(action, request, translogOpsRequestOptions, responseListener, reader); + retryableTransportClient.executeRetryableAction(action, request, translogOpsRequestOptions, responseListener, reader); } @Override @@ -224,12 +208,9 @@ public void receiveFileInfo( phase1ExistingFileSizes, totalTranslogOps ); - final TransportRequestOptions options = TransportRequestOptions.builder() - .withTimeout(recoverySettings.internalActionTimeout()) - .build(); final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; 
final ActionListener responseListener = ActionListener.map(listener, r -> null); - executeRetryableAction(action, request, options, responseListener, reader); + retryableTransportClient.executeRetryableAction(action, request, responseListener, reader); } @Override @@ -249,12 +230,9 @@ public void cleanFiles( totalTranslogOps, globalCheckpoint ); - final TransportRequestOptions options = TransportRequestOptions.builder() - .withTimeout(recoverySettings.internalActionTimeout()) - .build(); final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; final ActionListener responseListener = ActionListener.map(listener, r -> null); - executeRetryableAction(action, request, options, responseListener, reader); + retryableTransportClient.executeRetryableAction(action, request, responseListener, reader); } @Override @@ -294,7 +272,7 @@ public void writeFileChunk( * see how many translog ops we accumulate while copying files across the network. A future optimization * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up. */ - final RecoveryFileChunkRequest request = new RecoveryFileChunkRequest( + final FileChunkRequest request = new FileChunkRequest( recoveryId, requestSeqNo, shardId, @@ -306,71 +284,17 @@ public void writeFileChunk( throttleTimeInNanos ); final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; - executeRetryableAction(action, request, fileChunkRequestOptions, ActionListener.map(listener, r -> null), reader); + retryableTransportClient.executeRetryableAction( + action, + request, + fileChunkRequestOptions, + ActionListener.map(listener, r -> null), + reader + ); } @Override public void cancel() { - isCancelled = true; - if (onGoingRetryableActions.isEmpty()) { - return; - } - final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("recovery was cancelled"); - // Dispatch to generic as cancellation calls can come on the cluster state applier thread - threadPool.generic().execute(() -> { - for (RetryableAction action : onGoingRetryableActions.values()) { - action.cancel(exception); - } - onGoingRetryableActions.clear(); - }); - } - - private void executeRetryableAction( - String action, - RecoveryTransportRequest request, - TransportRequestOptions options, - ActionListener actionListener, - Writeable.Reader reader - ) { - final Object key = new Object(); - final ActionListener removeListener = ActionListener.runBefore(actionListener, () -> onGoingRetryableActions.remove(key)); - final TimeValue initialDelay = TimeValue.timeValueMillis(200); - final TimeValue timeout = recoverySettings.internalActionRetryTimeout(); - final RetryableAction retryableAction = new RetryableAction(logger, threadPool, initialDelay, timeout, removeListener) { - - @Override - public void tryAction(ActionListener listener) { - transportService.sendRequest( - targetNode, - action, - request, - options, - new ActionListenerResponseHandler<>(listener, reader, ThreadPool.Names.GENERIC) - ); - } - - @Override - public boolean shouldRetry(Exception e) { - return retriesSupported && retryableException(e); - } - }; - onGoingRetryableActions.put(key, retryableAction); - retryableAction.run(); - if (isCancelled) { - retryableAction.cancel(new CancellableThreads.ExecutionCancelledException("recovery was cancelled")); - } - } - - private static boolean retryableException(Exception e) { - if (e instanceof ConnectTransportException) { - return true; - } else if (e instanceof SendRequestTransportException) { - final 
Throwable cause = ExceptionsHelper.unwrapCause(e); - return cause instanceof ConnectTransportException; - } else if (e instanceof RemoteTransportException) { - final Throwable cause = ExceptionsHelper.unwrapCause(e); - return cause instanceof CircuitBreakingException || cause instanceof OpenSearchRejectedExecutionException; - } - return false; + retryableTransportClient.cancel(); } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java b/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java new file mode 100644 index 0000000000000..bc10cc80b7fdc --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java @@ -0,0 +1,139 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.recovery; + +import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; +import org.opensearch.LegacyESVersion; +import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionListenerResponseHandler; +import org.opensearch.action.support.RetryableAction; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.breaker.CircuitBreakingException; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.CancellableThreads; +import org.opensearch.common.util.concurrent.ConcurrentCollections; +import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.ConnectTransportException; +import org.opensearch.transport.RemoteTransportException; +import org.opensearch.transport.SendRequestTransportException; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestOptions; +import org.opensearch.transport.TransportResponse; +import org.opensearch.transport.TransportService; + +import java.util.Map; + +/** + * Client that implements retry functionality for transport layer requests. + * + * @opensearch.internal + */ +public final class RetryableTransportClient { + + private final ThreadPool threadPool; + private final Map> onGoingRetryableActions = ConcurrentCollections.newConcurrentMap(); + private volatile boolean isCancelled = false; + private final TransportService transportService; + private final TimeValue retryTimeout; + private final DiscoveryNode targetNode; + + private final Logger logger; + + public RetryableTransportClient(TransportService transportService, DiscoveryNode targetNode, TimeValue retryTimeout, Logger logger) { + this.threadPool = transportService.getThreadPool(); + this.transportService = transportService; + this.retryTimeout = retryTimeout; + this.targetNode = targetNode; + this.logger = logger; + } + + /** + * Execute a retryable action. + * @param action {@link String} Action Name. + * @param request {@link TransportRequest} Transport request to execute. + * @param actionListener {@link ActionListener} Listener to complete + * @param reader {@link Writeable.Reader} Reader to read the response stream. + * @param {@link TransportResponse} type. 
+ */ + public void executeRetryableAction( + String action, + TransportRequest request, + ActionListener actionListener, + Writeable.Reader reader + ) { + final TransportRequestOptions options = TransportRequestOptions.builder().withTimeout(retryTimeout).build(); + executeRetryableAction(action, request, options, actionListener, reader); + } + + void executeRetryableAction( + String action, + TransportRequest request, + TransportRequestOptions options, + ActionListener actionListener, + Writeable.Reader reader + ) { + final Object key = new Object(); + final ActionListener removeListener = ActionListener.runBefore(actionListener, () -> onGoingRetryableActions.remove(key)); + final TimeValue initialDelay = TimeValue.timeValueMillis(200); + final RetryableAction retryableAction = new RetryableAction(logger, threadPool, initialDelay, retryTimeout, removeListener) { + + @Override + public void tryAction(ActionListener listener) { + transportService.sendRequest( + targetNode, + action, + request, + options, + new ActionListenerResponseHandler<>(listener, reader, ThreadPool.Names.GENERIC) + ); + } + + @Override + public boolean shouldRetry(Exception e) { + return targetNode.getVersion().onOrAfter(LegacyESVersion.V_7_9_0) && retryableException(e); + } + }; + onGoingRetryableActions.put(key, retryableAction); + retryableAction.run(); + if (isCancelled) { + retryableAction.cancel(new CancellableThreads.ExecutionCancelledException("retryable action was cancelled")); + } + } + + public void cancel() { + isCancelled = true; + if (onGoingRetryableActions.isEmpty()) { + return; + } + final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("retryable action was cancelled"); + // Dispatch to generic as cancellation calls can come on the cluster state applier thread + threadPool.generic().execute(() -> { + for (RetryableAction action : onGoingRetryableActions.values()) { + action.cancel(exception); + } + onGoingRetryableActions.clear(); + }); + } + + private static boolean retryableException(Exception e) { + if (e instanceof ConnectTransportException) { + return true; + } else if (e instanceof SendRequestTransportException) { + final Throwable cause = ExceptionsHelper.unwrapCause(e); + return cause instanceof ConnectTransportException; + } else if (e instanceof RemoteTransportException) { + final Throwable cause = ExceptionsHelper.unwrapCause(e); + return cause instanceof CircuitBreakingException || cause instanceof OpenSearchRejectedExecutionException; + } + return false; + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java new file mode 100644 index 0000000000000..a73a3b54184da --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
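For reference, a sketch of how a caller is expected to drive the extracted client. It mirrors the RemoteRecoveryTargetHandler wiring above; the method name is illustrative, and `logger` is assumed to be the owning class's Logger:

```java
// Illustrative wiring only (not code from this change): one client per recovery,
// so the retry timeout is fixed for the lifetime of that recovery event.
void finalizeWithRetries(
    TransportService transportService,
    DiscoveryNode targetNode,
    RecoverySettings recoverySettings,
    RecoveryTransportRequest request,
    ActionListener<Void> listener
) {
    RetryableTransportClient client = new RetryableTransportClient(
        transportService,
        targetNode,
        recoverySettings.internalActionRetryTimeout(),
        logger
    );
    // Retries on connect/circuit-breaker failures happen inside the client; the
    // caller only maps the empty response back onto its own listener.
    final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
    client.executeRetryableAction(
        PeerRecoveryTargetService.Actions.FINALIZE,
        request,
        ActionListener.map(listener, r -> null),
        reader
    );
}
```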
+ */ + +package org.opensearch.indices.replication; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.transport.TransportResponse; + +import java.io.IOException; +import java.util.Set; + +/** + * Response returned from a {@link SegmentReplicationSource} that includes the file metadata, and SegmentInfos + * associated with a particular {@link ReplicationCheckpoint}. The {@link SegmentReplicationSource} may determine that + * the requested {@link ReplicationCheckpoint} is behind and return a different {@link ReplicationCheckpoint} in this response. + * + * @opensearch.internal + */ +public class CheckpointInfoResponse extends TransportResponse { + + private final ReplicationCheckpoint checkpoint; + private final Store.MetadataSnapshot snapshot; + private final byte[] infosBytes; + // pendingDeleteFiles are segments that have been merged away in the latest in memory SegmentInfos + // but are still referenced by the latest commit point (Segments_N). + private final Set pendingDeleteFiles; + + public CheckpointInfoResponse( + final ReplicationCheckpoint checkpoint, + final Store.MetadataSnapshot snapshot, + final byte[] infosBytes, + final Set additionalFiles + ) { + this.checkpoint = checkpoint; + this.snapshot = snapshot; + this.infosBytes = infosBytes; + this.pendingDeleteFiles = additionalFiles; + } + + public CheckpointInfoResponse(StreamInput in) throws IOException { + this.checkpoint = new ReplicationCheckpoint(in); + this.snapshot = new Store.MetadataSnapshot(in); + this.infosBytes = in.readByteArray(); + this.pendingDeleteFiles = in.readSet(StoreFileMetadata::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + checkpoint.writeTo(out); + snapshot.writeTo(out); + out.writeByteArray(infosBytes); + out.writeCollection(pendingDeleteFiles); + } + + public ReplicationCheckpoint getCheckpoint() { + return checkpoint; + } + + public Store.MetadataSnapshot getSnapshot() { + return snapshot; + } + + public byte[] getInfosBytes() { + return infosBytes; + } + + public Set getPendingDeleteFiles() { + return pendingDeleteFiles; + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesResponse.java b/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesResponse.java new file mode 100644 index 0000000000000..6dc7e293b2c31 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesResponse.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.transport.TransportResponse; + +import java.io.IOException; +import java.util.List; + +/** + * Response from a {@link SegmentReplicationSource} indicating that a replication event has completed. 
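Since CheckpointInfoResponse above now crosses the wire, a quick serialization round-trip illustrates the intended StreamInput/StreamOutput symmetry. This is a test-style sketch, assuming the four constructor arguments (`checkpoint`, `snapshot`, `infosBytes`, `pendingDeleteFiles`) are already built; BytesStreamOutput is the usual in-memory harness:

```java
// Sketch of a wire round-trip; field-by-field equality checks are elided.
CheckpointInfoResponse original = new CheckpointInfoResponse(checkpoint, snapshot, infosBytes, pendingDeleteFiles);
try (BytesStreamOutput out = new BytesStreamOutput()) {
    original.writeTo(out);
    CheckpointInfoResponse copy = new CheckpointInfoResponse(out.bytes().streamInput());
    assert copy.getPendingDeleteFiles().size() == original.getPendingDeleteFiles().size();
    assert copy.getInfosBytes().length == original.getInfosBytes().length;
}
```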
+ * + * @opensearch.internal + */ +public class GetSegmentFilesResponse extends TransportResponse { + + List files; + + public GetSegmentFilesResponse(List files) { + this.files = files; + } + + public GetSegmentFilesResponse(StreamInput in) throws IOException { + this.files = in.readList(StoreFileMetadata::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeCollection(files); + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java new file mode 100644 index 0000000000000..8628a266ea7d0 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.action.ActionListener; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; + +import java.util.List; + +/** + * Represents the source of a replication event. + * + * @opensearch.internal + */ +public interface SegmentReplicationSource { + + /** + * Get Metadata for a ReplicationCheckpoint. + * + * @param replicationId {@link long} - ID of the replication event. + * @param checkpoint {@link ReplicationCheckpoint} Checkpoint to fetch metadata for. + * @param listener {@link ActionListener} listener that completes with a {@link CheckpointInfoResponse}. + */ + void getCheckpointMetadata(long replicationId, ReplicationCheckpoint checkpoint, ActionListener listener); + + /** + * Fetch the requested segment files. Passes a listener that completes when files are stored locally. + * + * @param replicationId {@link long} - ID of the replication event. + * @param checkpoint {@link ReplicationCheckpoint} Checkpoint to fetch metadata for. + * @param filesToFetch {@link List} List of files to fetch. + * @param store {@link Store} Reference to the local store. + * @param listener {@link ActionListener} Listener that completes with the list of files copied. + */ + void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ); +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java new file mode 100644 index 0000000000000..3ca31503f176d --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.transport.TransportService; + +/** + * Factory to build {@link SegmentReplicationSource} used by {@link SegmentReplicationTargetService}. 
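The SegmentReplicationSource interface above is deliberately small, so a no-op implementation is enough to exercise the target service while the factory's TODO is unfilled. A hypothetical sketch (assumes the usual imports plus java.util.Collections are in scope):

```java
// A stand-in source that fails metadata lookups and returns no files; useful as
// a test double while SegmentReplicationSourceFactory#get still returns null.
SegmentReplicationSource noopSource = new SegmentReplicationSource() {
    @Override
    public void getCheckpointMetadata(long replicationId, ReplicationCheckpoint checkpoint,
                                      ActionListener<CheckpointInfoResponse> listener) {
        listener.onFailure(new UnsupportedOperationException("no-op source"));
    }

    @Override
    public void getSegmentFiles(long replicationId, ReplicationCheckpoint checkpoint,
                                List<StoreFileMetadata> filesToFetch, Store store,
                                ActionListener<GetSegmentFilesResponse> listener) {
        listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList()));
    }
};
```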
+ * + * @opensearch.internal + */ +public class SegmentReplicationSourceFactory { + + private TransportService transportService; + private RecoverySettings recoverySettings; + private ClusterService clusterService; + + public SegmentReplicationSourceFactory( + TransportService transportService, + RecoverySettings recoverySettings, + ClusterService clusterService + ) { + this.transportService = transportService; + this.recoverySettings = recoverySettings; + this.clusterService = clusterService; + } + + public SegmentReplicationSource get(IndexShard shard) { + // TODO: Default to an implementation that uses the primary shard. + return null; + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java new file mode 100644 index 0000000000000..b01016d2a1e62 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java @@ -0,0 +1,84 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; +import org.opensearch.indices.replication.common.ReplicationState; +import org.opensearch.indices.replication.common.ReplicationTimer; + +/** + * ReplicationState implementation to track Segment Replication events. + * + * @opensearch.internal + */ +public class SegmentReplicationState implements ReplicationState { + + /** + * The stage of the recovery state + * + * @opensearch.internal + */ + public enum Stage { + DONE((byte) 0), + + INIT((byte) 1); + + private static final Stage[] STAGES = new Stage[Stage.values().length]; + + static { + for (Stage stage : Stage.values()) { + assert stage.id() < STAGES.length && stage.id() >= 0; + STAGES[stage.id] = stage; + } + } + + private final byte id; + + Stage(byte id) { + this.id = id; + } + + public byte id() { + return id; + } + + public static Stage fromId(byte id) { + if (id < 0 || id >= STAGES.length) { + throw new IllegalArgumentException("No mapping for id [" + id + "]"); + } + return STAGES[id]; + } + } + + public SegmentReplicationState() { + this.stage = Stage.INIT; + } + + private Stage stage; + + @Override + public ReplicationLuceneIndex getIndex() { + // TODO + return null; + } + + @Override + public ReplicationTimer getTimer() { + // TODO + return null; + } + + public Stage getStage() { + return stage; + } + + public void setStage(Stage stage) { + this.stage = stage; + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java new file mode 100644 index 0000000000000..7933ea5f0344b --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -0,0 +1,115 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
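The Stage bytes above are stable identifiers, so fromId is the inverse of id() for every declared stage and rejects anything else, as this short check shows:

```java
// Round-trip holds for all declared stages; undeclared ids are rejected.
for (SegmentReplicationState.Stage stage : SegmentReplicationState.Stage.values()) {
    assert SegmentReplicationState.Stage.fromId(stage.id()) == stage;
}
// SegmentReplicationState.Stage.fromId((byte) 42) throws IllegalArgumentException
```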
+ */ + +package org.opensearch.indices.replication; + +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.util.CancellableThreads; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; +import org.opensearch.indices.replication.common.ReplicationState; +import org.opensearch.indices.replication.common.ReplicationTarget; + +import java.io.IOException; + +/** + * Represents the target of a replication event. + * + * @opensearch.internal + */ +public class SegmentReplicationTarget extends ReplicationTarget { + + private final ReplicationCheckpoint checkpoint; + private final SegmentReplicationSource source; + private final SegmentReplicationState state; + + public SegmentReplicationTarget( + ReplicationCheckpoint checkpoint, + IndexShard indexShard, + SegmentReplicationSource source, + SegmentReplicationTargetService.SegmentReplicationListener listener + ) { + super("replication_target", indexShard, new ReplicationLuceneIndex(), listener); + this.checkpoint = checkpoint; + this.source = source; + this.state = new SegmentReplicationState(); + } + + @Override + protected void closeInternal() { + // TODO + } + + @Override + protected String getPrefix() { + // TODO + return null; + } + + @Override + protected void onDone() { + this.state.setStage(SegmentReplicationState.Stage.DONE); + } + + @Override + protected void onCancel(String reason) { + // TODO + } + + @Override + public ReplicationState state() { + return state; + } + + @Override + public ReplicationTarget retryCopy() { + // TODO + return null; + } + + @Override + public String description() { + // TODO + return null; + } + + @Override + public void notifyListener(OpenSearchException e, boolean sendShardFailure) { + listener.onFailure(state(), e, sendShardFailure); + } + + @Override + public boolean reset(CancellableThreads newTargetCancellableThreads) throws IOException { + // TODO + return false; + } + + @Override + public void writeFileChunk( + StoreFileMetadata metadata, + long position, + BytesReference content, + boolean lastChunk, + int totalTranslogOps, + ActionListener listener + ) { + // TODO + } + + /** + * Start the Replication event. + * @param listener {@link ActionListener} listener. + */ + public void startReplication(ActionListener listener) { + // TODO + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java new file mode 100644 index 0000000000000..1c6053a72a4c5 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -0,0 +1,170 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.indices.replication; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.common.Nullable; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.shard.IndexEventListener; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.recovery.FileChunkRequest; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationCollection; +import org.opensearch.indices.replication.common.ReplicationCollection.ReplicationRef; +import org.opensearch.indices.replication.common.ReplicationListener; +import org.opensearch.indices.replication.common.ReplicationState; +import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequestHandler; +import org.opensearch.transport.TransportService; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Service class that orchestrates replication events on replicas. + * + * @opensearch.internal + */ +public final class SegmentReplicationTargetService implements IndexEventListener { + + private static final Logger logger = LogManager.getLogger(SegmentReplicationTargetService.class); + + private final ThreadPool threadPool; + private final RecoverySettings recoverySettings; + + private final ReplicationCollection onGoingReplications; + + private final SegmentReplicationSourceFactory sourceFactory; + + /** + * The internal actions + * + * @opensearch.internal + */ + public static class Actions { + public static final String FILE_CHUNK = "internal:index/shard/replication/file_chunk"; + } + + public SegmentReplicationTargetService( + final ThreadPool threadPool, + final RecoverySettings recoverySettings, + final TransportService transportService, + final SegmentReplicationSourceFactory sourceFactory + ) { + this.threadPool = threadPool; + this.recoverySettings = recoverySettings; + this.onGoingReplications = new ReplicationCollection<>(logger, threadPool); + this.sourceFactory = sourceFactory; + + transportService.registerRequestHandler( + Actions.FILE_CHUNK, + ThreadPool.Names.GENERIC, + FileChunkRequest::new, + new FileChunkTransportRequestHandler() + ); + } + + @Override + public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { + if (indexShard != null) { + onGoingReplications.cancelForShard(shardId, "shard closed"); + } + } + + public void startReplication( + final ReplicationCheckpoint checkpoint, + final IndexShard indexShard, + final SegmentReplicationListener listener + ) { + startReplication(new SegmentReplicationTarget(checkpoint, indexShard, sourceFactory.get(indexShard), listener)); + } + + public void startReplication(final SegmentReplicationTarget target) { + final long replicationId = onGoingReplications.start(target, recoverySettings.activityTimeout()); + logger.trace(() -> new ParameterizedMessage("Starting replication {}", replicationId)); + threadPool.generic().execute(new ReplicationRunner(replicationId)); + } + + /** + * Listener that runs on changes in Replication state + * + * @opensearch.internal + */ + public 
interface SegmentReplicationListener extends ReplicationListener { + + @Override + default void onDone(ReplicationState state) { + onReplicationDone((SegmentReplicationState) state); + } + + @Override + default void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { + onReplicationFailure((SegmentReplicationState) state, e, sendShardFailure); + } + + void onReplicationDone(SegmentReplicationState state); + + void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure); + } + + /** + * Runnable implementation to trigger a replication event. + */ + private class ReplicationRunner implements Runnable { + + final long replicationId; + + public ReplicationRunner(long replicationId) { + this.replicationId = replicationId; + } + + @Override + public void run() { + start(replicationId); + } + } + + private void start(final long replicationId) { + try (ReplicationRef replicationRef = onGoingReplications.get(replicationId)) { + replicationRef.get().startReplication(new ActionListener<>() { + @Override + public void onResponse(Void o) { + onGoingReplications.markAsDone(replicationId); + } + + @Override + public void onFailure(Exception e) { + onGoingReplications.fail(replicationId, new OpenSearchException("Segment Replication failed", e), true); + } + }); + } + } + + private class FileChunkTransportRequestHandler implements TransportRequestHandler { + + // How many bytes we've copied since we last called RateLimiter.pause + final AtomicLong bytesSinceLastPause = new AtomicLong(); + + @Override + public void messageReceived(final FileChunkRequest request, TransportChannel channel, Task task) throws Exception { + try (ReplicationRef ref = onGoingReplications.getSafe(request.recoveryId(), request.shardId())) { + final SegmentReplicationTarget target = ref.get(); + final ActionListener listener = target.createOrFinishListener(channel, Actions.FILE_CHUNK, request); + target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.rateLimiter(), listener); + } + } + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java index 609825eb5227b..b8295f0685a7f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java @@ -133,7 +133,7 @@ public T reset(final long id, final TimeValue activityTimeout) { } catch (Exception e) { // fail shard to be safe assert oldTarget != null; - oldTarget.notifyListener(e, true); + oldTarget.notifyListener(new OpenSearchException("Unable to reset target", e), true); return null; } } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationState.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationState.java index 7942fa8938dd0..029fcb6a3b690 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationState.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationState.java @@ -14,5 +14,7 @@ * @opensearch.internal */ public interface ReplicationState { + ReplicationLuceneIndex getIndex(); + ReplicationTimer getTimer(); } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java 
b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java index 0192270907fd2..f8dc07f122c02 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java @@ -9,14 +9,25 @@ package org.opensearch.indices.replication.common; import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.RateLimiter; import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; +import org.opensearch.action.support.ChannelActionListener; +import org.opensearch.common.CheckedFunction; +import org.opensearch.common.Nullable; +import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.logging.Loggers; import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.util.concurrent.AbstractRefCounted; +import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.recovery.FileChunkRequest; +import org.opensearch.indices.recovery.RecoveryTransportRequest; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportResponse; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; @@ -64,7 +75,7 @@ public CancellableThreads cancellableThreads() { return cancellableThreads; } - public abstract void notifyListener(Exception e, boolean sendShardFailure); + public abstract void notifyListener(OpenSearchException e, boolean sendShardFailure); public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIndex stateIndex, ReplicationListener listener) { super(name); @@ -98,6 +109,7 @@ public void setLastAccessTime() { lastAccessTime = System.nanoTime(); } + @Nullable public ActionListener markRequestReceivedAndCreateListener(long requestSeqNo, ActionListener listener) { return requestTracker.markReceivedAndCreateListener(requestSeqNo, listener); } @@ -172,4 +184,86 @@ protected void ensureRefCount() { } } + @Nullable + public ActionListener createOrFinishListener( + final TransportChannel channel, + final String action, + final RecoveryTransportRequest request + ) { + return createOrFinishListener(channel, action, request, nullVal -> TransportResponse.Empty.INSTANCE); + } + + @Nullable + public ActionListener createOrFinishListener( + final TransportChannel channel, + final String action, + final RecoveryTransportRequest request, + final CheckedFunction responseFn + ) { + final ActionListener channelListener = new ChannelActionListener<>(channel, action, request); + final ActionListener voidListener = ActionListener.map(channelListener, responseFn); + + final long requestSeqNo = request.requestSeqNo(); + final ActionListener listener; + if (requestSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { + listener = markRequestReceivedAndCreateListener(requestSeqNo, voidListener); + } else { + listener = voidListener; + } + + return listener; + } + + /** + * Handle a FileChunkRequest for a {@link ReplicationTarget}. + * + * @param request {@link FileChunkRequest} Request containing the file chunk. + * @param bytesSinceLastPause {@link AtomicLong} Bytes since the last pause. + * @param rateLimiter {@link RateLimiter} Rate limiter. + * @param listener {@link ActionListener} listener that completes when the chunk has been written. 
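The throttling arithmetic that handleFileChunk centralizes can be read in isolation: bytes accumulate across chunks, and the limiter is only consulted once the running total clears RateLimiter#getMinPauseCheckBytes. A standalone sketch against Lucene's RateLimiter (class and method names here are illustrative, not part of this change):

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.store.RateLimiter;

// Illustrative extraction of the pause bookkeeping in handleFileChunk.
final class ChunkThrottle {
    private final AtomicLong bytesSinceLastPause = new AtomicLong();

    /** Returns nanoseconds actually paused (0 when below the pause threshold). */
    long maybePause(RateLimiter rateLimiter, int chunkLength) throws IOException {
        long bytes = bytesSinceLastPause.addAndGet(chunkLength);
        if (bytes > rateLimiter.getMinPauseCheckBytes()) {
            bytesSinceLastPause.addAndGet(-bytes); // reset the running count
            return rateLimiter.pause(bytes);       // may sleep to honor the configured rate
        }
        return 0L;
    }
}
```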
diff --git a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java
index e54f06937cad3..bda2a910d922e 100644
--- a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java
+++ b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java
@@ -105,7 +105,7 @@ public void testWriteFileChunksConcurrently() throws Exception {
             receiveFileInfoFuture
         );
         receiveFileInfoFuture.actionGet();
-        List<RecoveryFileChunkRequest> requests = new ArrayList<>();
+        List<FileChunkRequest> requests = new ArrayList<>();
         long seqNo = 0;
         for (StoreFileMetadata md : mdFiles) {
             try (IndexInput in = sourceShard.store().directory().openInput(md.name(), IOContext.READONCE)) {
@@ -115,7 +115,7 @@ public void testWriteFileChunksConcurrently() throws Exception {
                 byte[] buffer = new byte[length];
                 in.readBytes(buffer, 0, length);
                 requests.add(
-                    new RecoveryFileChunkRequest(
+                    new FileChunkRequest(
                         0,
                         seqNo++,
                         sourceShard.shardId(),
@@ -132,7 +132,7 @@ public void testWriteFileChunksConcurrently() throws Exception {
             }
         }
         Randomness.shuffle(requests);
-        BlockingQueue<RecoveryFileChunkRequest> queue = new ArrayBlockingQueue<>(requests.size());
+        BlockingQueue<FileChunkRequest> queue = new ArrayBlockingQueue<>(requests.size());
         queue.addAll(requests);
         Thread[] senders = new Thread[between(1, 4)];
         CyclicBarrier barrier = new CyclicBarrier(senders.length);
@@ -140,7 +140,7 @@ public void testWriteFileChunksConcurrently() throws Exception {
             senders[i] = new Thread(() -> {
                 try {
                     barrier.await();
-                    RecoveryFileChunkRequest r;
+                    FileChunkRequest r;
                     while ((r = queue.poll()) != null) {
                         recoveryTarget.writeFileChunk(
                             r.metadata(),
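Note: the test change above keeps exercising the property that matters for the shared writeFileChunk path: chunks may arrive concurrently and out of order, and each must be placed by its absolute position. A self-contained sketch of the test's concurrency pattern (a plain byte array stands in for the shard's store; all names here are invented):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CyclicBarrier;

    // Shuffled chunks drained by several writer threads, as in testWriteFileChunksConcurrently.
    public final class ConcurrentChunkWriteSketch {
        static final class Chunk {
            final int position;
            final byte[] bytes;
            Chunk(int position, byte[] bytes) {
                this.position = position;
                this.bytes = bytes;
            }
        }

        public static void main(String[] args) throws Exception {
            byte[] file = new byte[4 * 1024];
            List<Chunk> chunks = new ArrayList<>();
            for (int pos = 0; pos < file.length; pos += 1024) {
                byte[] b = new byte[1024];
                Arrays.fill(b, (byte) (pos / 1024));
                chunks.add(new Chunk(pos, b));
            }
            Collections.shuffle(chunks); // chunks no longer arrive in file order
            BlockingQueue<Chunk> queue = new ArrayBlockingQueue<>(chunks.size());
            queue.addAll(chunks);
            Thread[] senders = new Thread[3];
            CyclicBarrier barrier = new CyclicBarrier(senders.length);
            for (int i = 0; i < senders.length; i++) {
                senders[i] = new Thread(() -> {
                    try {
                        barrier.await();
                        Chunk c;
                        while ((c = queue.poll()) != null) {
                            // "write" the chunk at its absolute offset, as writeFileChunk must
                            System.arraycopy(c.bytes, 0, file, c.position, c.bytes.length);
                        }
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
                senders[i].start();
            }
            for (Thread t : senders) {
                t.join();
            }
            System.out.println("last block marker: " + file[file.length - 1]); // expect 3
        }
    }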
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java
new file mode 100644
index 0000000000000..aa17dec5767da
--- /dev/null
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java
@@ -0,0 +1,130 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.indices.replication;
+
+import org.junit.Assert;
+import org.mockito.Mockito;
+import org.opensearch.OpenSearchException;
+import org.opensearch.action.ActionListener;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.index.shard.IndexShard;
+import org.opensearch.index.shard.IndexShardTestCase;
+import org.opensearch.indices.recovery.RecoverySettings;
+import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
+import org.opensearch.transport.TransportService;
+
+import java.io.IOException;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
+
+public class SegmentReplicationTargetServiceTests extends IndexShardTestCase {
+
+    private IndexShard indexShard;
+    private ReplicationCheckpoint checkpoint;
+    private SegmentReplicationSource replicationSource;
+    private SegmentReplicationTargetService sut;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        final Settings settings = Settings.builder().put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()).build();
+        final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings);
+        final TransportService transportService = mock(TransportService.class);
+        indexShard = newShard(false, settings);
+        checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 0L, 0L, 0L, 0L);
+        SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class);
+        replicationSource = mock(SegmentReplicationSource.class);
+        when(replicationSourceFactory.get(indexShard)).thenReturn(replicationSource);
+
+        sut = new SegmentReplicationTargetService(threadPool, recoverySettings, transportService, replicationSourceFactory);
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        closeShards(indexShard);
+        super.tearDown();
+    }
+
+    public void testTargetReturnsSuccess_listenerCompletes() throws IOException {
+        final SegmentReplicationTarget target = new SegmentReplicationTarget(
+            checkpoint,
+            indexShard,
+            replicationSource,
+            new SegmentReplicationTargetService.SegmentReplicationListener() {
+                @Override
+                public void onReplicationDone(SegmentReplicationState state) {
+                    assertEquals(SegmentReplicationState.Stage.DONE, state.getStage());
+                }
+
+                @Override
+                public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
+                    Assert.fail();
+                }
+            }
+        );
+        final SegmentReplicationTarget spy = Mockito.spy(target);
+        doAnswer(invocation -> {
+            final ActionListener<Void> listener = invocation.getArgument(0);
+            listener.onResponse(null);
+            return null;
+        }).when(spy).startReplication(any());
+        sut.startReplication(spy);
+        closeShards(indexShard);
+    }
+
+    public void testTargetThrowsException() throws IOException {
+        final OpenSearchException expectedError = new OpenSearchException("Fail");
+        final SegmentReplicationTarget target = new SegmentReplicationTarget(
+            checkpoint,
+            indexShard,
+            replicationSource,
+            new SegmentReplicationTargetService.SegmentReplicationListener() {
+                @Override
+                public void onReplicationDone(SegmentReplicationState state) {
+                    Assert.fail();
+                }
+
+                @Override
+                public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
+                    assertEquals(SegmentReplicationState.Stage.INIT, state.getStage());
+                    assertEquals(expectedError, e.getCause());
+                    assertTrue(sendShardFailure);
+                }
+            }
+        );
+        final SegmentReplicationTarget spy = Mockito.spy(target);
+        doAnswer(invocation -> {
+            final ActionListener<Void> listener = invocation.getArgument(0);
+            listener.onFailure(expectedError);
+            return null;
+        }).when(spy).startReplication(any());
+        sut.startReplication(spy);
+        closeShards(indexShard);
+    }
+
+    public void testBeforeIndexShardClosed_CancelsOngoingReplications() throws IOException {
+        final SegmentReplicationTarget target = new SegmentReplicationTarget(
+            checkpoint,
+            indexShard,
+            replicationSource,
+            mock(SegmentReplicationTargetService.SegmentReplicationListener.class)
+        );
+        final SegmentReplicationTarget spy = Mockito.spy(target);
+        sut.startReplication(spy);
+        sut.beforeIndexShardClosed(indexShard.shardId(), indexShard, Settings.EMPTY);
+        Mockito.verify(spy, times(1)).cancel(any());
+        closeShards(indexShard);
+    }
+}
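Note: taken together, the tests above pin down the surface this change adds. A condensed usage sketch, assuming the caller sits in org.opensearch.indices.replication as the tests do, with threadPool, recoverySettings, transportService, sourceFactory, checkpoint, indexShard, source and listener constructed as in setUp():

    // Condensed from the tests above; every collaborator is built exactly as in setUp().
    SegmentReplicationTargetService service = new SegmentReplicationTargetService(
        threadPool,
        recoverySettings,
        transportService,
        sourceFactory
    );
    // Each replication event is a SegmentReplicationTarget; the service tracks it in its
    // ReplicationCollection, runs it via the ReplicationRunner shown earlier, and marks it
    // done or failed, which in turn fires the SegmentReplicationListener callbacks.
    service.startReplication(new SegmentReplicationTarget(checkpoint, indexShard, source, listener));
    // Closing the shard cancels any replication still in flight (see the last test above).
    service.beforeIndexShardClosed(indexShard.shardId(), indexShard, Settings.EMPTY);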