Skip to content

Commit

Permalink
Revert "[Segment Replication] Use engine codec and replica shard Repl…
Browse files Browse the repository at this point in the history
…icationCheckpoint for replication events (opensearch-project#7732)"

This reverts commit adf7e2c.
  • Loading branch information
dreamer-89 committed Jun 2, 2023
1 parent c0be2bc commit 1379730
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,14 @@ public ReplicationCheckpoint getCheckpoint() {
return this.checkpoint;
}

public SegmentReplicationTarget(IndexShard indexShard, SegmentReplicationSource source, ReplicationListener listener) {
public SegmentReplicationTarget(
ReplicationCheckpoint checkpoint,
IndexShard indexShard,
SegmentReplicationSource source,
ReplicationListener listener
) {
super("replication_target", indexShard, new ReplicationLuceneIndex(), listener);
this.checkpoint = indexShard.getLatestReplicationCheckpoint();
this.checkpoint = checkpoint;
this.source = source;
this.state = new SegmentReplicationState(
indexShard.routingEntry(),
Expand Down Expand Up @@ -96,7 +101,7 @@ public SegmentReplicationState state() {
}

public SegmentReplicationTarget retryCopy() {
return new SegmentReplicationTarget(indexShard, source, listener);
return new SegmentReplicationTarget(checkpoint, indexShard, source, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
}
final Thread thread = Thread.currentThread();
if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) {
startReplication(replicaShard, new SegmentReplicationListener() {
startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace(
Expand Down Expand Up @@ -301,8 +301,17 @@ protected void updateLatestReceivedCheckpoint(ReplicationCheckpoint receivedChec
}
}

public SegmentReplicationTarget startReplication(final IndexShard indexShard, final SegmentReplicationListener listener) {
final SegmentReplicationTarget target = new SegmentReplicationTarget(indexShard, sourceFactory.get(indexShard), listener);
public SegmentReplicationTarget startReplication(
final ReplicationCheckpoint checkpoint,
final IndexShard indexShard,
final SegmentReplicationListener listener
) {
final SegmentReplicationTarget target = new SegmentReplicationTarget(
checkpoint,
indexShard,
sourceFactory.get(indexShard),
listener
);
startReplication(target);
return target;
}
Expand Down Expand Up @@ -420,49 +429,57 @@ public void messageReceived(final ForceSyncRequest request, TransportChannel cha
channel.sendResponse(TransportResponse.Empty.INSTANCE);
return;
}
startReplication(indexShard, new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication complete to {}, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
indexShard.getLatestReplicationCheckpoint(),
state.getTimingData()
)
);
try {
// Promote engine type for primary target
if (indexShard.recoveryState().getPrimary() == true) {
indexShard.resetToWriteableEngine();
startReplication(
ReplicationCheckpoint.empty(request.getShardId(), indexShard.getDefaultCodecName()),
indexShard,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication complete to {}, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
indexShard.getLatestReplicationCheckpoint(),
state.getTimingData()
)
);
try {
// Promote engine type for primary target
if (indexShard.recoveryState().getPrimary() == true) {
indexShard.resetToWriteableEngine();
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (InterruptedException | TimeoutException | IOException e) {
throw new RuntimeException(e);
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (InterruptedException | TimeoutException | IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
if (sendShardFailure == true) {
indexShard.failShard("replication failure", e);
}
try {
channel.sendResponse(e);
} catch (IOException ex) {
throw new RuntimeException(ex);
@Override
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
if (sendShardFailure == true) {
indexShard.failShard("replication failure", e);
}
try {
channel.sendResponse(e);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
});
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.SnapshotMatchers;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.CheckpointInfoResponse;
Expand Down Expand Up @@ -295,7 +294,7 @@ public void testPublishCheckpointAfterRelocationHandOff() throws IOException {
public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
IndexShard primaryShard = newStartedShard(true);
SegmentReplicationTargetService sut;
sut = prepareForReplication(primaryShard, null, mock(TransportService.class), mock(IndicesService.class));
sut = prepareForReplication(primaryShard, null);
SegmentReplicationTargetService spy = spy(sut);

// Starting a new shard in PrimaryMode and shard routing primary.
Expand All @@ -315,7 +314,7 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, Codec.getDefault().getName()), spyShard);

// Verify that checkpoint is not processed as shard routing is primary.
verify(spy, times(0)).startReplication(any(), any());
verify(spy, times(0)).startReplication(any(), any(), any());
closeShards(primaryShard);
}

Expand Down Expand Up @@ -1028,10 +1027,7 @@ private void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCoun

private void resolveCheckpointInfoResponseListener(ActionListener<CheckpointInfoResponse> listener, IndexShard primary) {
try {
final CopyState copyState = new CopyState(
ReplicationCheckpoint.empty(primary.shardId, primary.getLatestReplicationCheckpoint().getCodec()),
primary
);
final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId, primary.getDefaultCodecName()), primary);
listener.onResponse(
new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes())
);
Expand All @@ -1045,6 +1041,7 @@ private void startReplicationAndAssertCancellation(IndexShard replica, SegmentRe
throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
final SegmentReplicationTarget target = targetService.startReplication(
ReplicationCheckpoint.empty(replica.shardId, replica.getDefaultCodecName()),
replica,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
Expand Down
Loading

0 comments on commit 1379730

Please sign in to comment.