Skip to content

Commit

Permalink
Adding test for null checkpoint publisher and addreesing PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
  • Loading branch information
Rishikesh1159 committed May 23, 2022
1 parent bd17e4f commit 6c75f6d
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 19 deletions.
4 changes: 1 addition & 3 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -533,9 +533,7 @@ public synchronized IndexShard createShard(
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService,
this.indexSettings.isSegRepEnabled() && routing.primary()
? checkpointPublisher
: SegmentReplicationCheckpointPublisher.EMPTY
this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
12 changes: 9 additions & 3 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.copy.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.rest.RestStatus;
Expand Down Expand Up @@ -324,7 +324,7 @@ public IndexShard(
final Runnable globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final CircuitBreakerService circuitBreakerService,
final SegmentReplicationCheckpointPublisher checkpointPublisher
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -407,7 +407,7 @@ public boolean shouldCache(Query query) {
persistMetadata(path, indexSettings, shardRouting, null, logger);
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
if (indexSettings.isSegRepEnabled() == true) {
if (checkpointPublisher != null) {
this.checkpointRefreshListener = new CheckpointRefreshListener(this, checkpointPublisher);
} else {
this.checkpointRefreshListener = null;
Expand Down Expand Up @@ -1372,10 +1372,16 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
}
}

/**
* Returns the lastest Replication Checkpoint that shard received
*/
public ReplicationCheckpoint getLatestReplicationCheckpoint() {
return new ReplicationCheckpoint(shardId, 0, 0, 0, 0);
}

/**
* Invoked when a new checkpoint is received from a primary shard. Starts the copy process.
*/
public synchronized void onNewCheckpoint(final PublishCheckpointRequest request) {
assert shardRouting.primary() == false;
// TODO
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.opensearch.action.support.replication.ReplicationRequest;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.indices.replication.copy.ReplicationCheckpoint;

import java.io.IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.indices.replication.copy;
package org.opensearch.indices.replication.checkpoint;

import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.StreamInput;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3440,6 +3440,17 @@ public void testCheckpointRefreshListener() throws IOException {
closeShards(shard);
}

/**
* here we are mocking a SegmentReplicationcheckpointPublisher and testing that when a refresh happens on index shard if CheckpointRefreshListener is added to the InternalrefreshListerners List
*/
public void testCheckpointRefreshListenerWithNull() throws IOException {
IndexShard shard = newStartedShard(p -> newShard(null), true);
shard.refresh("test");
List<ReferenceManager.RefreshListener> refreshListeners = shard.getEngine().config().getInternalRefreshListener();
assertFalse(refreshListeners.stream().anyMatch(e -> e instanceof CheckpointRefreshListener));
closeShards(shard);
}

/**
* creates a new initializing shard. The shard will will be put in its proper path under the
* current node id the shard is assigned to.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.copy.ReplicationCheckpoint;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.transport.CapturingTransport;
import org.opensearch.threadpool.TestThreadPool;
Expand Down

0 comments on commit 6c75f6d

Please sign in to comment.