diff --git a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt
index ecc1f888..39ba84eb 100644
--- a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt
+++ b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt
@@ -144,12 +144,14 @@ import java.util.Optional
 import java.util.function.Supplier
 import org.opensearch.index.engine.NRTReplicationEngine
+import org.opensearch.replication.util.ValidationUtil
 
 @OpenForTesting
 internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, RepositoryPlugin, EnginePlugin {
 
     private lateinit var client: Client
+    private lateinit var clusterService: ClusterService
     private lateinit var threadPool: ThreadPool
     private lateinit var replicationMetadataManager: ReplicationMetadataManager
     private lateinit var replicationSettings: ReplicationSettings
@@ -207,6 +209,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
                                  repositoriesService: Supplier<RepositoriesService>): Collection<Any> {
         this.client = client
         this.threadPool = threadPool
+        this.clusterService = clusterService
         this.replicationMetadataManager = ReplicationMetadataManager(clusterService, client,
                 ReplicationMetadataStore(client, clusterService, xContentRegistry))
         this.replicationSettings = ReplicationSettings(clusterService)
@@ -379,9 +382,15 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
     }
 
     override fun getCustomTranslogDeletionPolicyFactory(): Optional<TranslogDeletionPolicyFactory> {
-        return Optional.of(TranslogDeletionPolicyFactory{
-                indexSettings, retentionLeasesSupplier -> ReplicationTranslogDeletionPolicy(indexSettings, retentionLeasesSupplier)
-        })
+        // We don't need a retention lease translog deletion policy for remote store enabled clusters as
+        // we fetch the operations directly from Lucene in such cases.
+        return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService) == false) {
+            Optional.of(TranslogDeletionPolicyFactory { indexSettings, retentionLeasesSupplier ->
+                ReplicationTranslogDeletionPolicy(indexSettings, retentionLeasesSupplier)
+            })
+        } else {
+            Optional.empty()
+        }
     }
 
     override fun onIndexModule(indexModule: IndexModule) {
diff --git a/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt b/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt
index dbee183d..392555b5 100644
--- a/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt
+++ b/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt
@@ -27,6 +27,7 @@ import org.opensearch.core.common.io.stream.StreamInput
 import org.opensearch.core.common.io.stream.Writeable
 import org.opensearch.common.unit.TimeValue
 import org.opensearch.core.index.shard.ShardId
+import org.opensearch.index.shard.IndexShard
 import org.opensearch.index.translog.Translog
 import org.opensearch.indices.IndicesService
 import org.opensearch.replication.ReplicationPlugin.Companion.REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING
@@ -34,10 +35,7 @@ import org.opensearch.replication.ReplicationPlugin.Companion.REPLICATION_EXECUT
 import org.opensearch.replication.seqno.RemoteClusterStats
 import org.opensearch.replication.seqno.RemoteClusterTranslogService
 import org.opensearch.replication.seqno.RemoteShardMetric
-import org.opensearch.replication.util.completeWith
-import org.opensearch.replication.util.coroutineContext
-import org.opensearch.replication.util.stackTraceToString
-import org.opensearch.replication.util.waitForGlobalCheckpoint
+import org.opensearch.replication.util.*
 import org.opensearch.threadpool.ThreadPool
 import org.opensearch.transport.TransportActionProxy
 import org.opensearch.transport.TransportService
@@ -79,7 +77,8 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
             indexMetric.lastFetchTime.set(relativeStartNanos)
 
             val indexShard = indicesService.indexServiceSafe(shardId.index).getShard(shardId.id)
-            if (indexShard.lastSyncedGlobalCheckpoint < request.fromSeqNo) {
+            val isRemoteStoreEnabled = ValidationUtil.isRemoteStoreEnabledCluster(clusterService)
+            if (lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled) < request.fromSeqNo) {
                 // There are no new operations to sync. Do a long poll and wait for GlobalCheckpoint to advance. If
                 // the checkpoint doesn't advance by the timeout this throws an ESTimeoutException which the caller
                 // should catch and start a new poll.
@@ -88,18 +87,18 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
                 // At this point indexShard.lastKnownGlobalCheckpoint has advanced but it may not yet have been synced
                 // to the translog, which means we can't return those changes. Return to the caller to retry.
                 // TODO: Figure out a better way to wait for the global checkpoint to be synced to the translog
-                if (indexShard.lastSyncedGlobalCheckpoint < request.fromSeqNo) {
-                    assert(gcp > indexShard.lastSyncedGlobalCheckpoint) { "Checkpoint didn't advance at all" }
+                if (lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled) < request.fromSeqNo) {
+                    assert(gcp > lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled)) { "Checkpoint didn't advance at all $gcp ${lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled)}" }
                     throw OpenSearchTimeoutException("global checkpoint not synced. Retry after a few miliseconds...")
                 }
             }
 
             relativeStartNanos = System.nanoTime()
             // At this point lastSyncedGlobalCheckpoint is at least fromSeqNo
-            val toSeqNo = min(indexShard.lastSyncedGlobalCheckpoint, request.toSeqNo)
+            val toSeqNo = min(lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled), request.toSeqNo)
             var ops: List<Translog.Operation> = listOf()
 
-            var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId)
+            var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId) && isRemoteStoreEnabled == false
             if(fetchFromTranslog) {
                 try {
                     ops = translogService.getHistoryOfOperations(indexShard, request.fromSeqNo, toSeqNo)
@@ -137,12 +136,22 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
                 indexMetric.ops.addAndGet(ops.size.toLong())
 
                 ops.stream().forEach{op -> indexMetric.bytesRead.addAndGet(op.estimateSize()) }
-
-                GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, indexShard.lastSyncedGlobalCheckpoint)
+                GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled))
             }
         }
     }
 
+    private fun lastGlobalCheckpoint(indexShard: IndexShard, isRemoteStoreEnabled: Boolean): Long {
+        // We rely on lastSyncedGlobalCheckpoint as it has been durably written to disk. In case of remote store
+        // enabled clusters, the semantics are slightly different, and we can't use lastSyncedGlobalCheckpoint. Falling back to
+        // lastKnownGlobalCheckpoint in such cases.
+        return if (isRemoteStoreEnabled) {
+            indexShard.lastKnownGlobalCheckpoint
+        } else {
+            indexShard.lastSyncedGlobalCheckpoint
+        }
+    }
+
     private fun isTranslogPruningByRetentionLeaseEnabled(shardId: ShardId): Boolean {
         val enabled = clusterService.state().metadata.indices.get(shardId.indexName)
@@ -162,7 +171,9 @@
     }
 
     override fun shards(state: ClusterState, request: InternalRequest): ShardsIterator {
+        val shardIt = state.routingTable().shardRoutingTable(request.request().shardId)
         // Random active shards
-        return state.routingTable().shardRoutingTable(request.request().shardId).activeInitializingShardsRandomIt()
+        return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService)) shardIt.primaryShardIt()
+        else shardIt.activeInitializingShardsRandomIt()
     }
 }
\ No newline at end of file
diff --git a/src/main/kotlin/org/opensearch/replication/repository/RemoteClusterRepository.kt b/src/main/kotlin/org/opensearch/replication/repository/RemoteClusterRepository.kt
index 832977b2..8ea986c7 100644
--- a/src/main/kotlin/org/opensearch/replication/repository/RemoteClusterRepository.kt
+++ b/src/main/kotlin/org/opensearch/replication/repository/RemoteClusterRepository.kt
@@ -250,6 +250,9 @@ class RemoteClusterRepository(private val repositoryMetadata: RepositoryMetadata
 
         // Remove translog pruning for the follower index
         builder.remove(REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING.key)
+        builder.remove(IndexMetadata.SETTING_REMOTE_STORE_ENABLED)
+        builder.remove(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY)
+        builder.remove(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY)
         val indexMdBuilder = IndexMetadata.builder(indexMetadata).settings(builder)
 
         indexMetadata.aliases.values.forEach {
diff --git a/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt b/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt
index 1d6b8c2e..3aad9665 100644
--- a/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt
+++ b/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt
@@ -24,8 +24,11 @@ import org.opensearch.env.Environment
 import org.opensearch.index.IndexNotFoundException
 import java.io.UnsupportedEncodingException
 import org.opensearch.cluster.service.ClusterService
+import org.opensearch.node.Node
+import org.opensearch.node.remotestore.RemoteStoreNodeAttribute
 import org.opensearch.replication.ReplicationPlugin.Companion.KNN_INDEX_SETTING
 import org.opensearch.replication.ReplicationPlugin.Companion.KNN_PLUGIN_PRESENT_SETTING
+import org.opensearch.replication.action.changes.TransportGetChangesAction
 import java.nio.file.Files
 import java.nio.file.Path
 import java.util.Locale
@@ -154,4 +157,8 @@ object ValidationUtil {
 
     }
 
+    fun isRemoteStoreEnabledCluster(clusterService: ClusterService): Boolean {
+        return clusterService.settings.getByPrefix(Node.NODE_ATTRIBUTES.key + RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX).isEmpty == false
+    }
+
 }
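
Note on TransportGetChangesAction: for remote store enabled clusters the patch changes two decisions, which global checkpoint to trust (lastKnownGlobalCheckpoint instead of lastSyncedGlobalCheckpoint, since the translog is no longer the durability point) and whether changes may be served from the translog at all (never, on remote store clusters). A minimal pure-Kotlin sketch of those two decisions, not part of the patch and detached from IndexShard, with the checkpoint values passed in as plain longs:

// Sketch only: mirrors the decision logic of lastGlobalCheckpoint() and fetchFromTranslog above.
fun effectiveGlobalCheckpoint(lastSynced: Long, lastKnown: Long, isRemoteStoreEnabled: Boolean): Long =
    if (isRemoteStoreEnabled) lastKnown else lastSynced     // remote store: translog is not the durability point

fun shouldFetchFromTranslog(pruningByRetentionLease: Boolean, isRemoteStoreEnabled: Boolean): Boolean =
    pruningByRetentionLease && !isRemoteStoreEnabled        // never read changes from the translog on remote store clusters

fun main() {
    println(effectiveGlobalCheckpoint(lastSynced = 10, lastKnown = 12, isRemoteStoreEnabled = true))  // 12
    println(shouldFetchFromTranslog(pruningByRetentionLease = true, isRemoteStoreEnabled = true))     // false
}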
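
Note on RemoteClusterRepository: the three builder.remove(...) calls ensure the follower index does not inherit the leader's remote store index settings when the leader metadata is copied over. A small sketch of that cleanup, not part of the patch, using assumed literal setting keys in place of the IndexMetadata.SETTING_REMOTE_* constants:

import org.opensearch.common.settings.Settings

fun main() {
    // Leader index settings as they might arrive in the restore path; the remote_store keys
    // below are assumed literals standing in for the IndexMetadata.SETTING_REMOTE_* constants.
    val builder = Settings.builder()
        .put("index.number_of_shards", 1)
        .put("index.remote_store.enabled", true)
        .put("index.remote_store.segment.repository", "segment-repo")
        .put("index.remote_store.translog.repository", "translog-repo")

    // Strip the remote store settings before the follower's IndexMetadata is built,
    // mirroring the builder.remove(...) calls in the patch.
    listOf(
        "index.remote_store.enabled",
        "index.remote_store.segment.repository",
        "index.remote_store.translog.repository"
    ).forEach { builder.remove(it) }

    println(builder.build().keySet())   // only index.number_of_shards remains
}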
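
Note on ValidationUtil.isRemoteStoreEnabledCluster: the cluster is treated as remote store enabled when the node settings carry any attribute under the remote store node-attribute prefix. A minimal sketch of that prefix check, not part of the patch, with the prefix and the node attribute spelled out as assumed literals rather than the Node.NODE_ATTRIBUTES and RemoteStoreNodeAttribute constants the patch uses:

import org.opensearch.common.settings.Settings

// Sketch only: non-empty settings under the remote store prefix mean "remote store enabled".
// "node.attr.remote_store." is an assumed literal for Node.NODE_ATTRIBUTES.key +
// RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX.
fun looksRemoteStoreEnabled(nodeSettings: Settings): Boolean =
    nodeSettings.getByPrefix("node.attr.remote_store.").isEmpty == false

fun main() {
    val remoteStoreNode = Settings.builder()
        .put("node.attr.remote_store.segment.repository", "my-segment-repo")   // hypothetical node attribute
        .build()
    println(looksRemoteStoreEnabled(remoteStoreNode))   // true
    println(looksRemoteStoreEnabled(Settings.EMPTY))    // false
}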