-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding CheckpointRefreshListener to trigger when Segment replication is turned on and Primary shard refreshes #3108
Changes from 15 commits
4addc10
37f49d2
3210e33
730b601
b216116
4928270
7777d1e
fa39d47
5907f0b
e40dc41
5a9d544
46557bb
7ac0b90
ff91dec
9bbfe20
4c97019
bd17e4f
6c75f6d
d78e8ae
cf7c92e
591e608
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.index.shard; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.apache.lucene.search.ReferenceManager; | ||
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; | ||
|
||
import java.io.IOException; | ||
|
||
/** | ||
* A {@link ReferenceManager.RefreshListener} that publishes a checkpoint to be consumed by replicas. | ||
* This class is only used with Segment Replication enabled. | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public class CheckpointRefreshListener implements ReferenceManager.RefreshListener { | ||
|
||
protected static Logger logger = LogManager.getLogger(CheckpointRefreshListener.class); | ||
|
||
private final IndexShard shard; | ||
private final SegmentReplicationCheckpointPublisher publisher; | ||
|
||
public CheckpointRefreshListener(IndexShard shard, SegmentReplicationCheckpointPublisher publisher) { | ||
this.shard = shard; | ||
this.publisher = publisher; | ||
} | ||
|
||
@Override | ||
public void beforeRefresh() throws IOException { | ||
// Do nothing | ||
} | ||
|
||
@Override | ||
public void afterRefresh(boolean didRefresh) throws IOException { | ||
if (didRefresh) { | ||
publisher.publish(shard); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -159,6 +159,9 @@ | |
import org.opensearch.indices.recovery.RecoveryFailedException; | ||
import org.opensearch.indices.recovery.RecoveryState; | ||
import org.opensearch.indices.recovery.RecoveryTarget; | ||
import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest; | ||
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; | ||
import org.opensearch.indices.replication.copy.ReplicationCheckpoint; | ||
import org.opensearch.repositories.RepositoriesService; | ||
import org.opensearch.repositories.Repository; | ||
import org.opensearch.rest.RestStatus; | ||
|
@@ -298,6 +301,7 @@ Runnable getGlobalCheckpointSyncer() { | |
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>(); | ||
private final RefreshPendingLocationListener refreshPendingLocationListener; | ||
private volatile boolean useRetentionLeasesInPeerRecovery; | ||
private final ReferenceManager.RefreshListener checkpointRefreshListener; | ||
|
||
public IndexShard( | ||
final ShardRouting shardRouting, | ||
|
@@ -319,7 +323,8 @@ public IndexShard( | |
final List<IndexingOperationListener> listeners, | ||
final Runnable globalCheckpointSyncer, | ||
final RetentionLeaseSyncer retentionLeaseSyncer, | ||
final CircuitBreakerService circuitBreakerService | ||
final CircuitBreakerService circuitBreakerService, | ||
final SegmentReplicationCheckpointPublisher checkpointPublisher | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm thinking we should overload the constructor here (or subclass indexShard) and conditionally use it if segrep is enabled when IndexShards are created from IndexService. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The logic for creating an |
||
) throws IOException { | ||
super(shardRouting.shardId(), indexSettings); | ||
assert shardRouting.initializing(); | ||
|
@@ -402,6 +407,11 @@ public boolean shouldCache(Query query) { | |
persistMetadata(path, indexSettings, shardRouting, null, logger); | ||
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases(); | ||
this.refreshPendingLocationListener = new RefreshPendingLocationListener(); | ||
if (indexSettings.isSegRepEnabled() == true) { | ||
this.checkpointRefreshListener = new CheckpointRefreshListener(this, checkpointPublisher); | ||
kartg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} else { | ||
this.checkpointRefreshListener = null; | ||
Rishikesh1159 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
public ThreadPool getThreadPool() { | ||
|
@@ -1362,6 +1372,15 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti | |
} | ||
} | ||
|
||
public ReplicationCheckpoint getLatestReplicationCheckpoint() { | ||
Rishikesh1159 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return new ReplicationCheckpoint(shardId, 0, 0, 0, 0); | ||
} | ||
|
||
public synchronized void onNewCheckpoint(final PublishCheckpointRequest request) { | ||
assert shardRouting.primary() == false; | ||
// TODO | ||
} | ||
|
||
/** | ||
* gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call in all lifecycle of the index shard, | ||
* without having to worry about the current state of the engine and concurrent flushes. | ||
|
@@ -3105,6 +3124,13 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { | |
} | ||
}; | ||
|
||
final List<ReferenceManager.RefreshListener> internalRefreshListener; | ||
if (this.checkpointRefreshListener != null) { | ||
internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener); | ||
} else { | ||
internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't used anywhere? It should be wired up in Should this be an internal or external listener? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I will wire this up. But not sure if it should be external or internal listener. I am using internal listener for now, we can have a discussion on this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lets push this down to the creation of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I liked this idea at first, but I think we should maybe leave this block while the feature is still behind feature flags. Once the feature flag is removed then we wire up the listener even if its a NoOp. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nitpick - can be written as:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @kartg this we how we have it now, The above highlighted one is outdated.
Also if we try to write it as your suggested way, then internalRefreshListeners cannot be final, we have been using it as final even before this PR. Not sure if changing internalRefreshListener to a non-final variable is a good idea. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given that this block declares and defines the If you'd really like to retain the
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @kartg I tried to incorporate your code block, but it throws me UnsupportedOperation Exception when we are adding, |
||
} | ||
|
||
return this.engineConfigFactory.newEngineConfig( | ||
shardId, | ||
threadPool, | ||
|
@@ -3121,7 +3147,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { | |
translogConfig, | ||
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), | ||
Arrays.asList(refreshListeners, refreshPendingLocationListener), | ||
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)), | ||
internalRefreshListener, | ||
indexSort, | ||
circuitBreakerService, | ||
globalCheckpointSupplier, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be gated by the feature flag too? Alternatively, should
indexSettings.isSegRepEnabled()
check the feature flag internally?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As @mch2 said we will do this in a separate change not in this PR