-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding onNewCheckpoint to Start Replication on Replica Shard when Segment Replication is turned on #3540
Adding onNewCheckpoint to Start Replication on Replica Shard when Segment Replication is turned on #3540
Changes from 10 commits
8b288d8
02c1be3
45f8026
2052ff2
e9168e8
c2d57b2
3cda46c
27173d6
d585645
2a1d718
9aec215
512ba04
f6686f1
8b22a66
33564c3
37bf71b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -160,7 +160,6 @@ | |
import org.opensearch.indices.recovery.RecoveryListener; | ||
import org.opensearch.indices.recovery.RecoveryState; | ||
import org.opensearch.indices.recovery.RecoveryTarget; | ||
import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest; | ||
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; | ||
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; | ||
import org.opensearch.repositories.RepositoriesService; | ||
|
@@ -1377,19 +1376,52 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti | |
} | ||
} | ||
|
||
/** | ||
* Returns the latest segmentInfos | ||
*/ | ||
public SegmentInfos getLatestSegmentInfos() { | ||
return getEngine().getSegmentInfosSnapshot().get(); | ||
} | ||
|
||
/** | ||
* Returns the latest Replication Checkpoint that the shard received | ||
*/ | ||
public ReplicationCheckpoint getLatestReplicationCheckpoint() { | ||
return new ReplicationCheckpoint(shardId, 0, 0, 0, 0); | ||
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos(); | ||
return new ReplicationCheckpoint( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For a NRTReplicationEngine that is starting up, it will not yet have anything returned from |
||
this.shardId, | ||
getOperationPrimaryTerm(), | ||
latestSegmentInfos.getGeneration(), | ||
getProcessedLocalCheckpoint(), | ||
latestSegmentInfos.getVersion() | ||
); | ||
} | ||
|
||
/** | ||
* Invoked when a new checkpoint is received from a primary shard. Starts the copy process. | ||
*/ | ||
public synchronized void onNewCheckpoint(final PublishCheckpointRequest request) { | ||
assert shardRouting.primary() == false; | ||
// TODO | ||
* Checks if checkpoint should be processed | ||
* | ||
* @param requestCheckpoint received checkpoint that is checked for processing | ||
* @return true if checkpoint should be processed | ||
*/ | ||
public boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit - this method can be final. |
||
if (state().equals(IndexShardState.STARTED) == false) { | ||
logger.trace("Ignoring new replication checkpoint - shard is not started {}", state()); | ||
return false; | ||
} | ||
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); | ||
if (localCheckpoint.isAheadOf(requestCheckpoint)) { | ||
logger.trace( | ||
kartg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}", | ||
localCheckpoint, | ||
requestCheckpoint | ||
); | ||
return false; | ||
} | ||
if (localCheckpoint.equals(requestCheckpoint)) { | ||
logger.trace("Ignoring new replication checkpoint - Shard is already on checkpoint {}", requestCheckpoint); | ||
return false; | ||
} | ||
return true; | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ | |
import org.opensearch.OpenSearchException; | ||
import org.opensearch.action.ActionListener; | ||
import org.opensearch.common.Nullable; | ||
import org.opensearch.common.inject.Inject; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.index.shard.IndexEventListener; | ||
import org.opensearch.index.shard.IndexShard; | ||
|
@@ -38,7 +39,7 @@ | |
* | ||
* @opensearch.internal | ||
*/ | ||
public final class SegmentReplicationTargetService implements IndexEventListener { | ||
mch2 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
public class SegmentReplicationTargetService implements IndexEventListener { | ||
|
||
private static final Logger logger = LogManager.getLogger(SegmentReplicationTargetService.class); | ||
|
||
|
@@ -58,6 +59,7 @@ public static class Actions { | |
public static final String FILE_CHUNK = "internal:index/shard/replication/file_chunk"; | ||
} | ||
|
||
@Inject | ||
public SegmentReplicationTargetService( | ||
final ThreadPool threadPool, | ||
final RecoverySettings recoverySettings, | ||
|
@@ -84,6 +86,37 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh | |
} | ||
} | ||
|
||
/** | ||
* Invoked when a new checkpoint is received from a primary shard. | ||
* It checks if a new checkpoint should be processed or not and starts replication if needed. | ||
* @param requestCheckpoint received checkpoint that is checked for processing | ||
* @param indexShard replica shard on which checkpoint is received | ||
*/ | ||
public synchronized void onNewCheckpoint(final ReplicationCheckpoint requestCheckpoint, final IndexShard indexShard) { | ||
mch2 marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If this class is no longer final, this method can be. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is still not possible, as shardOperationOnReplica() in PublishCheckpointAction still needs to assert onNewCheckpoint(). We possibly need to use mockito-inline to make this method or the entire class final.
Rishikesh1159 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (onGoingReplications.isShardReplicating(indexShard.shardId())) { | ||
logger.trace( | ||
mch2 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}", | ||
indexShard.getLatestReplicationCheckpoint() | ||
); | ||
return; | ||
} | ||
if (indexShard.shouldProcessCheckpoint(requestCheckpoint)) { | ||
startReplication(requestCheckpoint, indexShard, new SegmentReplicationListener() { | ||
@Override | ||
public void onReplicationDone(SegmentReplicationState state) {} | ||
|
||
@Override | ||
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { | ||
if (sendShardFailure == true) { | ||
logger.error("replication failure", e); | ||
indexShard.failShard("replication failure", e); | ||
} | ||
} | ||
}); | ||
|
||
} | ||
} | ||
|
||
public void startReplication( | ||
final ReplicationCheckpoint checkpoint, | ||
final IndexShard indexShard, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -235,6 +235,16 @@ public boolean cancelForShard(ShardId shardId, String reason) { | |
return cancelled; | ||
} | ||
|
||
/** | ||
* check if a shard is currently replicating | ||
* | ||
* @param shardId shardId for which to check if replicating | ||
* @return true if shard is currently replicating | ||
*/ | ||
public boolean isShardReplicating(ShardId shardId) { | ||
return onGoingTargetEvents.values().stream().anyMatch(t -> t.indexShard.shardId().equals(shardId)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit-pick - this is a problem to be solved down the line:
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, we can do that, but we would have to change it in multiple places, so I am thinking of doing that in a different PR. |
||
} | ||
|
||
/** | ||
* a reference to {@link ReplicationTarget}, which implements {@link AutoCloseable}. closing the reference | ||
* causes {@link ReplicationTarget#decRef()} to be called. This makes sure that the underlying resources | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,6 +38,8 @@ | |
import org.apache.lucene.util.SetOnce; | ||
import org.opensearch.common.util.FeatureFlags; | ||
import org.opensearch.index.IndexingPressureService; | ||
import org.opensearch.indices.replication.SegmentReplicationSourceFactory; | ||
import org.opensearch.indices.replication.SegmentReplicationTargetService; | ||
import org.opensearch.indices.replication.SegmentReplicationSourceService; | ||
import org.opensearch.watcher.ResourceWatcherService; | ||
import org.opensearch.Assertions; | ||
|
@@ -935,6 +937,15 @@ protected Node( | |
.toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings)); | ||
b.bind(PeerRecoveryTargetService.class) | ||
.toInstance(new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService)); | ||
b.bind(SegmentReplicationTargetService.class) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if you are binding here, you do not need There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this binding also be gated by the feature flag, like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes removed |
||
.toInstance( | ||
new SegmentReplicationTargetService( | ||
threadPool, | ||
recoverySettings, | ||
transportService, | ||
new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService) | ||
) | ||
); | ||
if (FeatureFlags.isEnabled(REPLICATION_TYPE)) { | ||
b.bind(SegmentReplicationSourceService.class) | ||
.toInstance(new SegmentReplicationSourceService(indicesService, transportService, recoverySettings)); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I missed this before - `getSegmentInfosSnapshot` returns a `GatedCloseable`, so we should use try-with-resources. There is also a `getSegmentInfosSnapshot` method in IndexShard, so we don't have to fetch it off the engine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to the note above. How is this different from the existing `getSegmentInfosSnapshot`?
OpenSearch/server/src/main/java/org/opensearch/index/shard/IndexShard.java
Lines 4029 to 4031 in 04e43a7
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is the same as getSegmentInfosSnapshot(), so we can call that directly.