Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Extend FileChunkWriter to allow cancel on transport client #4386

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- [Segment Replication] Bump segment infos counter before commit during replica promotion ([#4365](https://github.com/opensearch-project/OpenSearch/pull/4365))
- Bugs for dependabot changelog verifier workflow ([#4364](https://github.com/opensearch-project/OpenSearch/pull/4364))
- Fix flaky random test `NRTReplicationEngineTests.testUpdateSegments` ([#4352](https://github.com/opensearch-project/OpenSearch/pull/4352))
- [Segment Replication] Extend FileChunkWriter to allow cancel on transport client ([#4386](https://github.com/opensearch-project/OpenSearch/pull/4386))
- [Segment Replication] Add check to cancel ongoing replication with old primary on onNewCheckpoint on replica ([#4363](https://github.com/opensearch-project/OpenSearch/pull/4363))

### Security
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,23 @@
import org.opensearch.index.engine.Segment;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Collection;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -65,6 +71,11 @@ public static void assumeFeatureFlag() {
assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE)));
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}

@Override
public Settings indexSettings() {
return Settings.builder()
Expand Down Expand Up @@ -318,6 +329,65 @@ public void testReplicationAfterForceMerge() throws Exception {
}
}

public void testCancellation() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
ensureYellow(INDEX_NAME);

final String replicaNode = internalCluster().startNode();

final SegmentReplicationSourceService segmentReplicationSourceService = internalCluster().getInstance(
SegmentReplicationSourceService.class,
primaryNode
);
final IndexShard primaryShard = getIndexShard(primaryNode);

CountDownLatch latch = new CountDownLatch(1);

MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
primaryNode
));
mockTransportService.addSendBehavior(
internalCluster().getInstance(TransportService.class, replicaNode),
(connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) {
FileChunkRequest req = (FileChunkRequest) request;
logger.debug("file chunk [{}] lastChunk: {}", req, req.lastChunk());
if (req.name().endsWith("cfs") && req.lastChunk()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this specific file?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea was to execute if block only once, so added one random file check as well along with lastChunk()

try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
connection.sendRequest(requestId, action, request, options);
}
);

final int docCount = scaledRandomIntBetween(0, 200);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
"_doc",
client(),
-1,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
random()
)
) {
indexer.start(docCount);
waitForDocs(docCount, indexer);

flush(INDEX_NAME);
}
segmentReplicationSourceService.beforeIndexShardClosed(primaryShard.shardId(), primaryShard, indexSettings());
latch.countDown();
assertDocCounts(docCount, primaryNode);
}

public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ void writeFileChunk(
int totalTranslogOps,
ActionListener<Void> listener
);

default void cancel() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentF

/**
* Prepare for a Replication event. This method constructs a {@link CopyState} holding files to be sent off of the current
* nodes's store. This state is intended to be sent back to Replicas before copy is initiated so the replica can perform a diff against its
* node's store. This state is intended to be sent back to Replicas before copy is initiated so the replica can perform a diff against its
* local store. It will then build a handler to orchestrate the segment copy that will be stored locally and started on a subsequent request from replicas
* with the list of required files.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,9 @@ public void writeFileChunk(
reader
);
}

@Override
public void cancel() {
retryableTransportClient.cancel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class SegmentReplicationSourceHandler {
private final DiscoveryNode targetNode;
private final String allocationId;

private final FileChunkWriter writer;

/**
* Constructor.
*
Expand Down Expand Up @@ -96,6 +98,7 @@ class SegmentReplicationSourceHandler {
);
this.allocationId = allocationId;
this.copyState = copyState;
this.writer = writer;
}

/**
Expand Down Expand Up @@ -186,9 +189,10 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene
}

/**
* Cancels the recovery and interrupts all eligible threads.
* Cancels the replication and interrupts all eligible threads.
*/
public void cancel(String reason) {
writer.cancel();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does order matter here at all? Should we cancel threads first?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the transport requests are taken up in TransportService thread. So, these two actions can be in any order.

cancellableThreads.cancel(reason);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.concurrent.TimeUnit;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;

public class SegmentReplicationSourceHandlerTests extends IndexShardTestCase {

Expand Down Expand Up @@ -241,6 +243,7 @@ public void onFailure(Exception e) {
}
});
latch.await(2, TimeUnit.SECONDS);
verify(chunkWriter, times(1)).cancel();
assertEquals("listener should have resolved with failure", 0, latch.getCount());
}
}