diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java index 37835a5add3d6..6792ff339b5a4 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java @@ -23,7 +23,7 @@ import org.opensearch.common.inject.Inject; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.index.IndexService; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.remote.RemoteSegmentTransferTracker; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardNotFoundException; @@ -50,7 +50,7 @@ public class TransportRemoteStoreStatsAction extends TransportBroadcastByNodeAct private final IndicesService indicesService; - private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; + private final RemoteStorePressureService remoteStorePressureService; @Inject public TransportRemoteStoreStatsAction( @@ -59,7 +59,7 @@ public TransportRemoteStoreStatsAction( IndicesService indicesService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + RemoteStorePressureService remoteStorePressureService ) { super( RemoteStoreStatsAction.NAME, @@ -71,7 +71,7 @@ public TransportRemoteStoreStatsAction( ThreadPool.Names.MANAGEMENT ); this.indicesService = indicesService; - this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService; + this.remoteStorePressureService = remoteStorePressureService; } /** @@ -153,7 +153,7 @@ protected RemoteStoreStats shardOperation(RemoteStoreStatsRequest request, Shard throw new ShardNotFoundException(indexShard.shardId()); } - RemoteSegmentTransferTracker remoteSegmentTransferTracker = remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker( + RemoteSegmentTransferTracker remoteSegmentTransferTracker = remoteStorePressureService.getRemoteRefreshSegmentTracker( indexShard.shardId() ); assert Objects.nonNull(remoteSegmentTransferTracker); diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index aee8819606e93..a0d973070eda6 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -89,7 +89,7 @@ import org.opensearch.index.mapper.MapperException; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.mapper.SourceToParse; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.core.index.shard.ShardId; @@ -137,7 +137,7 @@ public class TransportShardBulkAction extends TransportWriteAction globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final SegmentReplicationCheckpointPublisher checkpointPublisher, - final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + final RemoteStorePressureService remoteStorePressureService ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); /* @@ -509,7 +509,7 @@ public synchronized IndexShard createShard( translogFactorySupplier, this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null, remoteStore, - remoteRefreshSegmentPressureService + remoteStorePressureService ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java similarity index 98% rename from server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java rename to server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java index e10f8ccd2fddf..f2bbb94fd02ee 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java @@ -30,9 +30,9 @@ * * @opensearch.internal */ -public class RemoteRefreshSegmentPressureService implements IndexEventListener { +public class RemoteStorePressureService implements IndexEventListener { - private static final Logger logger = LogManager.getLogger(RemoteRefreshSegmentPressureService.class); + private static final Logger logger = LogManager.getLogger(RemoteStorePressureService.class); /** * Keeps map of remote-backed index shards and their corresponding backpressure tracker. @@ -47,7 +47,7 @@ public class RemoteRefreshSegmentPressureService implements IndexEventListener { private final List lagValidators; @Inject - public RemoteRefreshSegmentPressureService(ClusterService clusterService, Settings settings) { + public RemoteStorePressureService(ClusterService clusterService, Settings settings) { pressureSettings = new RemoteStorePressureSettings(clusterService, settings, this); lagValidators = Arrays.asList( new ConsecutiveFailureValidator(pressureSettings), diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureSettings.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureSettings.java index d1e5beb6ec495..3f665890d43e9 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureSettings.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureSettings.java @@ -106,7 +106,7 @@ private static class Defaults { public RemoteStorePressureSettings( ClusterService clusterService, Settings settings, - RemoteRefreshSegmentPressureService remoteUploadPressureService + RemoteStorePressureService remoteUploadPressureService ) { ClusterSettings clusterSettings = clusterService.getClusterSettings(); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 440f175aaf46d..7b32b8af3be3e 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -144,7 +144,7 @@ import org.opensearch.index.merge.MergeStats; import org.opensearch.index.recovery.RecoveryStats; import org.opensearch.index.refresh.RefreshStats; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.remote.RemoteSegmentStats; import org.opensearch.index.search.stats.SearchStats; import org.opensearch.index.search.stats.ShardSearchStats; @@ -333,8 +333,7 @@ Runnable getGlobalCheckpointSyncer() { private final Store remoteStore; private final BiFunction translogFactorySupplier; private final boolean isTimeSeriesIndex; - - private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; + private final RemoteStorePressureService remoteStorePressureService; private final List internalRefreshListener = new ArrayList<>(); @@ -362,7 +361,7 @@ public IndexShard( final BiFunction translogFactorySupplier, @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher, @Nullable final Store remoteStore, - final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + final RemoteStorePressureService remoteStorePressureService ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -457,7 +456,7 @@ public boolean shouldCache(Query query) { this.isTimeSeriesIndex = (mapperService == null || mapperService.documentMapper() == null) ? false : mapperService.documentMapper().mappers().containsTimeStampField(); - this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService; + this.remoteStorePressureService = remoteStorePressureService; } public ThreadPool getThreadPool() { @@ -547,8 +546,8 @@ public QueryCachingPolicy getQueryCachingPolicy() { } /** Only used for testing **/ - protected RemoteRefreshSegmentPressureService getRemoteRefreshSegmentPressureService() { - return remoteRefreshSegmentPressureService; + protected RemoteStorePressureService getRemoteStorePressureService() { + return remoteStorePressureService; } @Override @@ -3697,7 +3696,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro this, // Add the checkpoint publisher if the Segment Replciation via remote store is enabled. indexSettings.isSegRepWithRemoteEnabled() ? this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY, - remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId()) + remoteStorePressureService.getRemoteRefreshSegmentTracker(shardId()) ) ); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesModule.java b/server/src/main/java/org/opensearch/indices/IndicesModule.java index 9d2eef5f67a86..a10c93d9c1580 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesModule.java +++ b/server/src/main/java/org/opensearch/indices/IndicesModule.java @@ -70,7 +70,7 @@ import org.opensearch.index.mapper.SourceFieldMapper; import org.opensearch.index.mapper.TextFieldMapper; import org.opensearch.index.mapper.VersionFieldMapper; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.seqno.RetentionLeaseBackgroundSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -289,7 +289,7 @@ protected void configure() { bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton(); bind(SegmentReplicationPressureService.class).asEagerSingleton(); if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) { - bind(RemoteRefreshSegmentPressureService.class).asEagerSingleton(); + bind(RemoteStorePressureService.class).asEagerSingleton(); } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index f1cd0d1d1f291..e7c1502191aee 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -119,7 +119,7 @@ import org.opensearch.index.query.QueryRewriteContext; import org.opensearch.index.recovery.RecoveryStats; import org.opensearch.index.refresh.RefreshStats; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.search.stats.SearchStats; import org.opensearch.index.seqno.RetentionLeaseStats; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -927,7 +927,7 @@ public IndexShard createShard( final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, final DiscoveryNode sourceNode, - final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + final RemoteStorePressureService remoteStorePressureService ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); ensureChangesAllowed(); @@ -939,7 +939,7 @@ public IndexShard createShard( globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher, - remoteRefreshSegmentPressureService + remoteStorePressureService ); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> { diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index a527ae09d7666..2daae9e7d81ea 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -65,7 +65,7 @@ import org.opensearch.index.IndexComponent; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.seqno.GlobalCheckpointSyncAction; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -149,7 +149,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final SegmentReplicationCheckpointPublisher checkpointPublisher; - private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; + private final RemoteStorePressureService remoteStorePressureService; @Inject public IndicesClusterStateService( @@ -170,7 +170,7 @@ public IndicesClusterStateService( final GlobalCheckpointSyncAction globalCheckpointSyncAction, final RetentionLeaseSyncer retentionLeaseSyncer, final SegmentReplicationCheckpointPublisher checkpointPublisher, - final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + final RemoteStorePressureService remoteStorePressureService ) { this( settings, @@ -190,7 +190,7 @@ public IndicesClusterStateService( primaryReplicaSyncer, globalCheckpointSyncAction::updateGlobalCheckpointForShard, retentionLeaseSyncer, - remoteRefreshSegmentPressureService + remoteStorePressureService ); } @@ -213,7 +213,7 @@ public IndicesClusterStateService( final PrimaryReplicaSyncer primaryReplicaSyncer, final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, - final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + final RemoteStorePressureService remoteStorePressureService ) { this.settings = settings; this.checkpointPublisher = checkpointPublisher; @@ -225,7 +225,7 @@ public IndicesClusterStateService( indexEventListeners.add(segmentReplicationSourceService); // if remote store feature is not enabled, do not wire the remote upload pressure service as an IndexEventListener. if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) { - indexEventListeners.add(remoteRefreshSegmentPressureService); + indexEventListeners.add(remoteStorePressureService); } this.segmentReplicationTargetService = segmentReplicationTargetService; this.builtInIndexListener = Collections.unmodifiableList(indexEventListeners); @@ -240,7 +240,7 @@ public IndicesClusterStateService( this.globalCheckpointSyncer = globalCheckpointSyncer; this.retentionLeaseSyncer = Objects.requireNonNull(retentionLeaseSyncer); this.sendRefreshMapping = settings.getAsBoolean("indices.cluster.send_refresh_mapping", true); - this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService; + this.remoteStorePressureService = remoteStorePressureService; } @Override @@ -683,7 +683,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR retentionLeaseSyncer, nodes.getLocalNode(), sourceNode, - remoteRefreshSegmentPressureService + remoteStorePressureService ); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); @@ -1042,7 +1042,7 @@ T createShard( RetentionLeaseSyncer retentionLeaseSyncer, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode, - RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + RemoteStorePressureService remoteStorePressureService ) throws IOException; /** diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java index aa3e7ab1fb2c7..8b1f838732f8f 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java @@ -28,7 +28,7 @@ import org.opensearch.core.index.Index; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.remote.RemoteSegmentTransferTracker; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.indices.IndicesService; @@ -52,7 +52,7 @@ public class TransportRemoteStoreStatsActionTests extends IndexShardTestCase { private IndicesService indicesService; - private RemoteRefreshSegmentPressureService pressureService; + private RemoteStorePressureService pressureService; private IndexMetadata remoteStoreIndexMetadata; private TransportService transportService; private ClusterService clusterService; @@ -66,7 +66,7 @@ public void setUp() throws Exception { indicesService = mock(IndicesService.class); IndexService indexService = mock(IndexService.class); clusterService = mock(ClusterService.class); - pressureService = mock(RemoteRefreshSegmentPressureService.class); + pressureService = mock(RemoteStorePressureService.class); MockTransport mockTransport = new MockTransport(); localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT); remoteStoreIndexMetadata = IndexMetadata.builder(INDEX.getName()) diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java index 73bf93bcf5a9c..3706f8c3848c5 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java @@ -77,7 +77,7 @@ import org.opensearch.index.mapper.Mapping; import org.opensearch.index.mapper.MetadataFieldMapper; import org.opensearch.index.mapper.RootObjectMapper; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; @@ -1073,7 +1073,7 @@ public void testHandlePrimaryTermValidationRequestWithDifferentAllocationId() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), - mock(RemoteRefreshSegmentPressureService.class), + mock(RemoteStorePressureService.class), mock(SystemIndices.class) ); action.handlePrimaryTermValidationRequest( @@ -1104,7 +1104,7 @@ public void testHandlePrimaryTermValidationRequestWithOlderPrimaryTerm() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), - mock(RemoteRefreshSegmentPressureService.class), + mock(RemoteStorePressureService.class), mock(SystemIndices.class) ); action.handlePrimaryTermValidationRequest( @@ -1135,7 +1135,7 @@ public void testHandlePrimaryTermValidationRequestSuccess() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), - mock(RemoteRefreshSegmentPressureService.class), + mock(RemoteStorePressureService.class), mock(SystemIndices.class) ); action.handlePrimaryTermValidationRequest( @@ -1177,7 +1177,7 @@ private TransportShardBulkAction createAction() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), - mock(RemoteRefreshSegmentPressureService.class), + mock(RemoteStorePressureService.class), mock(SystemIndices.class) ); } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java index f3ce27c9f31fa..94934d5b4dca6 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java @@ -48,7 +48,7 @@ public void setUp() throws Exception { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool ); - pressureSettings = new RemoteStorePressureSettings(clusterService, Settings.EMPTY, mock(RemoteRefreshSegmentPressureService.class)); + pressureSettings = new RemoteStorePressureSettings(clusterService, Settings.EMPTY, mock(RemoteStorePressureService.class)); shardId = new ShardId("index", "uuid", 0); directoryFileTransferTracker = new DirectoryFileTransferTracker(); } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java similarity index 91% rename from server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java rename to server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java index c7d42d8366617..d79e5ae99b696 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java @@ -31,7 +31,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class RemoteRefreshSegmentPressureServiceTests extends OpenSearchTestCase { +public class RemoteStorePressureServiceTests extends OpenSearchTestCase { private ClusterService clusterService; @@ -39,7 +39,7 @@ public class RemoteRefreshSegmentPressureServiceTests extends OpenSearchTestCase private ShardId shardId; - private RemoteRefreshSegmentPressureService pressureService; + private RemoteStorePressureService pressureService; @Override public void setUp() throws Exception { @@ -60,7 +60,7 @@ public void tearDown() throws Exception { } public void testIsSegmentsUploadBackpressureEnabled() { - pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); + pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); assertFalse(pressureService.isSegmentsUploadBackpressureEnabled()); Settings newSettings = Settings.builder() @@ -73,21 +73,21 @@ public void testIsSegmentsUploadBackpressureEnabled() { public void testAfterIndexShardCreatedForRemoteBackedIndex() { IndexShard indexShard = createIndexShard(shardId, true); - pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); + pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); pressureService.afterIndexShardCreated(indexShard); assertNotNull(pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId())); } public void testAfterIndexShardCreatedForNonRemoteBackedIndex() { IndexShard indexShard = createIndexShard(shardId, false); - pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); + pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); pressureService.afterIndexShardCreated(indexShard); assertNull(pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId())); } public void testAfterIndexShardClosed() { IndexShard indexShard = createIndexShard(shardId, true); - pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); + pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); pressureService.afterIndexShardCreated(indexShard); assertNotNull(pressureService.getRemoteRefreshSegmentTracker(shardId)); @@ -98,7 +98,7 @@ public void testAfterIndexShardClosed() { public void testValidateSegmentUploadLag() { // Create the pressure tracker IndexShard indexShard = createIndexShard(shardId, true); - pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); + pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); pressureService.afterIndexShardCreated(indexShard); RemoteSegmentTransferTracker pressureTracker = pressureService.getRemoteRefreshSegmentTracker(shardId); diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java index fc323bfa7230a..9c5ec69cf6be9 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java @@ -48,7 +48,7 @@ public void testGetDefaultSettings() { RemoteStorePressureSettings pressureSettings = new RemoteStorePressureSettings( clusterService, Settings.EMPTY, - mock(RemoteRefreshSegmentPressureService.class) + mock(RemoteStorePressureService.class) ); // Check remote refresh segment pressure enabled is false @@ -86,7 +86,7 @@ public void testGetConfiguredSettings() { RemoteStorePressureSettings pressureSettings = new RemoteStorePressureSettings( clusterService, settings, - mock(RemoteRefreshSegmentPressureService.class) + mock(RemoteStorePressureService.class) ); // Check remote refresh segment pressure enabled is true @@ -115,7 +115,7 @@ public void testUpdateAfterGetDefaultSettings() { RemoteStorePressureSettings pressureSettings = new RemoteStorePressureSettings( clusterService, Settings.EMPTY, - mock(RemoteRefreshSegmentPressureService.class) + mock(RemoteStorePressureService.class) ); Settings newSettings = Settings.builder() @@ -164,7 +164,7 @@ public void testUpdateAfterGetConfiguredSettings() { RemoteStorePressureSettings pressureSettings = new RemoteStorePressureSettings( clusterService, settings, - mock(RemoteRefreshSegmentPressureService.class) + mock(RemoteStorePressureService.class) ); Settings newSettings = Settings.builder() @@ -208,7 +208,7 @@ public void testUpdateTriggeredInRemotePressureServiceOnUpdateSettings() { AtomicInteger updatedUploadBytesPerSecWindowSize = new AtomicInteger(); AtomicInteger updatedUploadTimeWindowSize = new AtomicInteger(); - RemoteRefreshSegmentPressureService pressureService = mock(RemoteRefreshSegmentPressureService.class); + RemoteStorePressureService pressureService = mock(RemoteStorePressureService.class); // Upload bytes doAnswer(invocation -> { diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 73bfc12bffb8d..0386eed2a6597 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -89,7 +89,6 @@ import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.Assertions; -import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.MediaTypeRegistry; @@ -1822,7 +1821,7 @@ public void testShardStatsWithRemoteStoreEnabled() throws IOException { .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) .build() ); - RemoteSegmentTransferTracker remoteRefreshSegmentTracker = shard.getRemoteRefreshSegmentPressureService() + RemoteSegmentTransferTracker remoteRefreshSegmentTracker = shard.getRemoteStorePressureService() .getRemoteRefreshSegmentTracker(shard.shardId); populateSampleRemoteStoreStats(remoteRefreshSegmentTracker); ShardStats shardStats = new ShardStats( diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 2b2b1d744d061..549fca5d81d13 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -27,7 +27,7 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.engine.NRTReplicationEngineFactory; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.remote.RemoteSegmentTransferTracker; import org.opensearch.index.store.RemoteDirectory; import org.opensearch.index.store.RemoteSegmentStoreDirectory; @@ -60,7 +60,7 @@ public class RemoteStoreRefreshListenerTests extends IndexShardTestCase { private IndexShard indexShard; private ClusterService clusterService; private RemoteStoreRefreshListener remoteStoreRefreshListener; - private RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; + private RemoteStorePressureService remoteStorePressureService; public void setup(boolean primary, int numberOfDocs) throws IOException { indexShard = newStartedShard( @@ -84,9 +84,9 @@ public void setup(boolean primary, int numberOfDocs) throws IOException { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool ); - remoteRefreshSegmentPressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); - remoteRefreshSegmentPressureService.afterIndexShardCreated(indexShard); - RemoteSegmentTransferTracker tracker = remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + remoteStorePressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); + remoteStorePressureService.afterIndexShardCreated(indexShard); + RemoteSegmentTransferTracker tracker = remoteStorePressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard, SegmentReplicationCheckpointPublisher.EMPTY, tracker); } @@ -317,14 +317,14 @@ public void testRefreshSuccessOnFirstAttempt() throws Exception { // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down CountDownLatch successLatch = new CountDownLatch(3); - Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( succeedOnAttempt, refreshCountLatch, successLatch ); assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount())); - RemoteRefreshSegmentPressureService pressureService = tuple.v2(); + RemoteStorePressureService pressureService = tuple.v2(); RemoteSegmentTransferTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); assertNoLagAndTotalUploadsFailed(segmentTracker, 0); } @@ -338,14 +338,14 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception { // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down CountDownLatch successLatch = new CountDownLatch(3); - Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( succeedOnAttempt, refreshCountLatch, successLatch ); assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount())); - RemoteRefreshSegmentPressureService pressureService = tuple.v2(); + RemoteStorePressureService pressureService = tuple.v2(); RemoteSegmentTransferTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); assertNoLagAndTotalUploadsFailed(segmentTracker, 1); } @@ -384,14 +384,14 @@ public void testRefreshSuccessOnThirdAttempt() throws Exception { // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down CountDownLatch successLatch = new CountDownLatch(3); - Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( succeedOnAttempt, refreshCountLatch, successLatch ); assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount())); - RemoteRefreshSegmentPressureService pressureService = tuple.v2(); + RemoteStorePressureService pressureService = tuple.v2(); RemoteSegmentTransferTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); assertNoLagAndTotalUploadsFailed(segmentTracker, 2); } @@ -406,9 +406,9 @@ private void assertNoLagAndTotalUploadsFailed(RemoteSegmentTransferTracker segme } public void testTrackerData() throws Exception { - Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh(1); + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh(1); RemoteStoreRefreshListener listener = tuple.v1(); - RemoteRefreshSegmentPressureService pressureService = tuple.v2(); + RemoteStorePressureService pressureService = tuple.v2(); RemoteSegmentTransferTracker tracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); assertNoLag(tracker); indexDocs(100, randomIntBetween(100, 200)); @@ -431,13 +431,12 @@ private void assertNoLag(RemoteSegmentTransferTracker tracker) { assertEquals(0, tracker.getTotalUploadsFailed()); } - private Tuple mockIndexShardWithRetryAndScheduleRefresh( - int succeedOnAttempt - ) throws IOException { + private Tuple mockIndexShardWithRetryAndScheduleRefresh(int succeedOnAttempt) + throws IOException { return mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, null, null); } - private Tuple mockIndexShardWithRetryAndScheduleRefresh( + private Tuple mockIndexShardWithRetryAndScheduleRefresh( int succeedOnAttempt, CountDownLatch refreshCountLatch, CountDownLatch successLatch @@ -446,7 +445,7 @@ private Tuple m return mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch, 1, noOpLatch); } - private Tuple mockIndexShardWithRetryAndScheduleRefresh( + private Tuple mockIndexShardWithRetryAndScheduleRefresh( int succeedOnAttempt, CountDownLatch refreshCountLatch, CountDownLatch successLatch, @@ -533,17 +532,14 @@ private Tuple m new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool ); - RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService = new RemoteRefreshSegmentPressureService( - clusterService, - Settings.EMPTY - ); + RemoteStorePressureService remoteStorePressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); when(shard.indexSettings()).thenReturn(indexShard.indexSettings()); when(shard.shardId()).thenReturn(indexShard.shardId()); - remoteRefreshSegmentPressureService.afterIndexShardCreated(shard); - RemoteSegmentTransferTracker tracker = remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + remoteStorePressureService.afterIndexShardCreated(shard); + RemoteSegmentTransferTracker tracker = remoteStorePressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, emptyCheckpointPublisher, tracker); refreshListener.afterRefresh(true); - return Tuple.tuple(refreshListener, remoteRefreshSegmentPressureService); + return Tuple.tuple(refreshListener, remoteStorePressureService); } public static class TestFilterDirectory extends FilterDirectory { diff --git a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 3cc374fa1bfbf..f0db8880dd25b 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -46,7 +46,7 @@ import org.opensearch.core.index.Index; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; @@ -264,7 +264,7 @@ public MockIndexShard createShard( final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, final DiscoveryNode sourceNode, - final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + final RemoteStorePressureService remoteStorePressureService ) throws IOException { failRandomly(); RecoveryState recoveryState = new RecoveryState(shardRouting, targetNode, sourceNode); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index c3784028d5ba8..106fe9098562f 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -177,7 +177,7 @@ import org.opensearch.index.IndexingPressureService; import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.analysis.AnalysisRegistry; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.seqno.GlobalCheckpointSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.PrimaryReplicaSyncer; @@ -2125,7 +2125,7 @@ public void onFailure(final Exception e) { ), RetentionLeaseSyncer.EMPTY, SegmentReplicationCheckpointPublisher.EMPTY, - mock(RemoteRefreshSegmentPressureService.class) + mock(RemoteStorePressureService.class) ); final SystemIndices systemIndices = new SystemIndices(emptyMap()); @@ -2176,7 +2176,7 @@ public void onFailure(final Exception e) { mock(ShardStateAction.class), mock(ThreadPool.class) ), - mock(RemoteRefreshSegmentPressureService.class), + mock(RemoteStorePressureService.class), new SystemIndices(emptyMap()) ); actions.put( diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index a880aa25a4b4e..001421af03d46 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -98,7 +98,7 @@ import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.mapper.SourceToParse; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -639,7 +639,7 @@ protected IndexShard newShard( clusterSettings ); Store remoteStore = null; - RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService = null; + RemoteStorePressureService remoteStorePressureService = null; RepositoriesService mockRepoSvc = mock(RepositoriesService.class); if (indexSettings.isRemoteStoreEnabled()) { @@ -654,7 +654,7 @@ protected IndexShard newShard( remoteStore = createRemoteStore(remotePath, routing, indexMetadata); - remoteRefreshSegmentPressureService = new RemoteRefreshSegmentPressureService(clusterService, indexSettings.getSettings()); + remoteStorePressureService = new RemoteStorePressureService(clusterService, indexSettings.getSettings()); BlobStoreRepository repo = createRepository(remotePath); when(mockRepoSvc.repository(any())).thenAnswer(invocationOnMock -> repo); } @@ -694,11 +694,11 @@ protected IndexShard newShard( translogFactorySupplier, checkpointPublisher, remoteStore, - remoteRefreshSegmentPressureService + remoteStorePressureService ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); - if (remoteRefreshSegmentPressureService != null) { - remoteRefreshSegmentPressureService.afterIndexShardCreated(indexShard); + if (remoteStorePressureService != null) { + remoteStorePressureService.afterIndexShardCreated(indexShard); } success = true; } finally {