Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Add new segrep settings #6523

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.recovery.RecoverySettings;
Expand Down Expand Up @@ -42,6 +41,7 @@ public class PrimaryShardReplicationSource implements SegmentReplicationSource {
private final DiscoveryNode sourceNode;
private final DiscoveryNode targetNode;
private final String targetAllocationId;
private final RecoverySettings recoverySettings;

public PrimaryShardReplicationSource(
DiscoveryNode targetNode,
Expand All @@ -59,6 +59,7 @@ public PrimaryShardReplicationSource(
);
this.sourceNode = sourceNode;
this.targetNode = targetNode;
this.recoverySettings = recoverySettings;
}

@Override
Expand All @@ -83,17 +84,6 @@ public void getSegmentFiles(
) {
final Writeable.Reader<GetSegmentFilesResponse> reader = GetSegmentFilesResponse::new;
final ActionListener<GetSegmentFilesResponse> responseListener = ActionListener.map(listener, r -> r);
// Few of the below assumptions and calculations are added for experimental release of segment replication feature in 2.3
// version. These will be changed in next release.

// Storing the size of files to fetch in bytes.
final long sizeOfSegmentFiles = filesToFetch.stream().mapToLong(file -> file.length()).sum();

// Maximum size of files to fetch (segment files) in bytes, that can be processed in 1 minute for a m5.xlarge machine.
long baseSegmentFilesSize = 100000000;

// Formula for calculating time needed to process a replication event's files to fetch process
final long timeToGetSegmentFiles = 1 + (sizeOfSegmentFiles / baseSegmentFilesSize);
final GetSegmentFilesRequest request = new GetSegmentFilesRequest(
replicationId,
targetAllocationId,
Expand All @@ -102,7 +92,7 @@ public void getSegmentFiles(
checkpoint
);
final TransportRequestOptions options = TransportRequestOptions.builder()
.withTimeout(TimeValue.timeValueMinutes(timeToGetSegmentFiles))
.withTimeout(recoverySettings.internalActionLongTimeout())
.build();
transportClient.executeRetryableAction(GET_SEGMENT_FILES, request, options, responseListener, reader);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.opensearch.transport.TestTransportChannel;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestOptions;
import org.opensearch.transport.TransportResponse;
import org.opensearch.transport.TransportService;
import org.junit.After;
Expand Down Expand Up @@ -218,6 +219,17 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
super.onSendRequest(requestId, action, request, destination);
}
}

@Override
protected void onSendRequest(
long requestId,
String action,
TransportRequest request,
DiscoveryNode destination,
TransportRequestOptions options
) {
onSendRequest(requestId, action, request, destination);
}
};
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
TransportService transportService = capturingTransport.createTransportService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class PrimaryShardReplicationSourceTests extends IndexShardTestCase {
private IndexShard indexShard;
private DiscoveryNode sourceNode;

private RecoverySettings recoverySettings;

@Override
public void setUp() throws Exception {
super.setUp();
Expand All @@ -73,6 +75,7 @@ public void setUp() throws Exception {

indexShard = newStartedShard(true);

this.recoverySettings = recoverySettings;
replicationSource = new PrimaryShardReplicationSource(
localNode,
indexShard.routingEntry().allocationId().toString(),
Expand Down Expand Up @@ -130,6 +133,34 @@ public void testGetSegmentFiles() {
assertTrue(capturedRequest.request instanceof GetSegmentFilesRequest);
}

/**
* This test verifies the transport request timeout value for fetching the segment files.
*/
public void testTransportTimeoutForGetSegmentFilesAction() {
long fileSize = (long) (Math.pow(10, 9));
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
indexShard.shardId(),
PRIMARY_TERM,
SEGMENTS_GEN,
SEQ_NO,
VERSION
);
StoreFileMetadata testMetadata = new StoreFileMetadata("testFile", fileSize, "checksum", Version.LATEST);
replicationSource.getSegmentFiles(
REPLICATION_ID,
checkpoint,
Arrays.asList(testMetadata),
mock(Store.class),
mock(ActionListener.class)
);
CapturingTransport.CapturedRequest[] requestList = transport.getCapturedRequestsAndClear();
assertEquals(1, requestList.length);
CapturingTransport.CapturedRequest capturedRequest = requestList[0];
assertEquals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES, capturedRequest.action);
assertEquals(sourceNode, capturedRequest.node);
assertEquals(recoverySettings.internalActionLongTimeout(), capturedRequest.options.timeout());
}

public void testGetSegmentFiles_CancelWhileRequestOpen() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestOptions;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -54,12 +55,14 @@ public static class CapturedRequest {
public final long requestId;
public final String action;
public final TransportRequest request;
public final TransportRequestOptions options;

CapturedRequest(DiscoveryNode node, long requestId, String action, TransportRequest request) {
CapturedRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) {
this.node = node;
this.requestId = requestId;
this.action = action;
this.request = request;
this.options = options;
}
}

Expand Down Expand Up @@ -123,6 +126,16 @@ public void clear() {
}

protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
capturedRequests.add(new CapturingTransport.CapturedRequest(node, requestId, action, request));
capturedRequests.add(new CapturingTransport.CapturedRequest(node, requestId, action, request, null));
}

protected void onSendRequest(
long requestId,
String action,
TransportRequest request,
DiscoveryNode node,
TransportRequestOptions options
) {
capturedRequests.add(new CapturingTransport.CapturedRequest(node, requestId, action, request, options));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,23 @@ public DiscoveryNode getNode() {
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
requests.put(requestId, Tuple.tuple(node, action));
onSendRequest(requestId, action, request, node);
onSendRequest(requestId, action, request, node, options);
}
};
}

protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {}

protected void onSendRequest(
long requestId,
String action,
TransportRequest request,
DiscoveryNode node,
TransportRequestOptions options
) {
onSendRequest(requestId, action, request, node);
}

@Override
public void setMessageListener(TransportMessageListener listener) {
if (this.listener != null) {
Expand Down