From 6c75f6d2b52c287f461a5718bc982ee69d5f034d Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Mon, 23 May 2022 18:52:05 +0000 Subject: [PATCH] Adding test for null checkpoint publisher and addreesing PR comments Signed-off-by: Rishikesh1159 --- .../main/java/org/opensearch/index/IndexService.java | 4 +--- .../java/org/opensearch/index/shard/IndexShard.java | 12 +++++++++--- .../checkpoint/PublishCheckpointRequest.java | 1 - .../{copy => checkpoint}/ReplicationCheckpoint.java | 2 +- .../indices/replication/copy/package-info.java | 10 ---------- .../org/opensearch/index/shard/IndexShardTests.java | 11 +++++++++++ .../checkpoint/PublishCheckpointActionTests.java | 1 - 7 files changed, 22 insertions(+), 19 deletions(-) rename server/src/main/java/org/opensearch/indices/replication/{copy => checkpoint}/ReplicationCheckpoint.java (98%) delete mode 100644 server/src/main/java/org/opensearch/indices/replication/copy/package-info.java diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 37508ea040d73..0a6d1501f2bea 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -533,9 +533,7 @@ public synchronized IndexShard createShard( () -> globalCheckpointSyncer.accept(shardId), retentionLeaseSyncer, circuitBreakerService, - this.indexSettings.isSegRepEnabled() && routing.primary() - ? checkpointPublisher - : SegmentReplicationCheckpointPublisher.EMPTY + this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 87436094bcc9b..8bccaadd697d6 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -161,7 +161,7 @@ import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; -import org.opensearch.indices.replication.copy.ReplicationCheckpoint; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.rest.RestStatus; @@ -324,7 +324,7 @@ public IndexShard( final Runnable globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final CircuitBreakerService circuitBreakerService, - final SegmentReplicationCheckpointPublisher checkpointPublisher + @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -407,7 +407,7 @@ public boolean shouldCache(Query query) { persistMetadata(path, indexSettings, shardRouting, null, logger); this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases(); this.refreshPendingLocationListener = new RefreshPendingLocationListener(); - if (indexSettings.isSegRepEnabled() == true) { + if (checkpointPublisher != null) { this.checkpointRefreshListener = new CheckpointRefreshListener(this, checkpointPublisher); } else { this.checkpointRefreshListener = null; @@ -1372,10 +1372,16 @@ public GatedCloseable acquireSafeIndexCommit() throws EngineExcepti } } + /** + * Returns the lastest Replication Checkpoint that shard received + */ public ReplicationCheckpoint getLatestReplicationCheckpoint() { return new ReplicationCheckpoint(shardId, 0, 0, 0, 0); } + /** + * Invoked when a new checkpoint is received from a primary shard. Starts the copy process. + */ public synchronized void onNewCheckpoint(final PublishCheckpointRequest request) { assert shardRouting.primary() == false; // TODO diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java index 356ba9969644d..740fd3bccb7c4 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java @@ -11,7 +11,6 @@ import org.opensearch.action.support.replication.ReplicationRequest; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.indices.replication.copy.ReplicationCheckpoint; import java.io.IOException; diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java similarity index 98% rename from server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java rename to server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index ba7da34596666..98ab9cc4c1708 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.indices.replication.copy; +package org.opensearch.indices.replication.checkpoint; import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.StreamInput; diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/package-info.java b/server/src/main/java/org/opensearch/indices/replication/copy/package-info.java deleted file mode 100644 index 8d810f88dde00..0000000000000 --- a/server/src/main/java/org/opensearch/indices/replication/copy/package-info.java +++ /dev/null @@ -1,10 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/** Package containing classes to implement a copy of replication */ -package org.opensearch.indices.replication.copy; diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index ff5cf8393cec9..0ce571b80cdc8 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -3440,6 +3440,17 @@ public void testCheckpointRefreshListener() throws IOException { closeShards(shard); } + /** + * here we are mocking a SegmentReplicationcheckpointPublisher and testing that when a refresh happens on index shard if CheckpointRefreshListener is added to the InternalrefreshListerners List + */ + public void testCheckpointRefreshListenerWithNull() throws IOException { + IndexShard shard = newStartedShard(p -> newShard(null), true); + shard.refresh("test"); + List refreshListeners = shard.getEngine().config().getInternalRefreshListener(); + assertFalse(refreshListeners.stream().anyMatch(e -> e instanceof CheckpointRefreshListener)); + closeShards(shard); + } + /** * creates a new initializing shard. The shard will will be put in its proper path under the * current node id the shard is assigned to. diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index 2842c3f260d90..074b5ff613b08 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -23,7 +23,6 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.RecoverySettings; -import org.opensearch.indices.replication.copy.ReplicationCheckpoint; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.CapturingTransport; import org.opensearch.threadpool.TestThreadPool;