Skip to content

Commit

Permalink
Rename RemoteRefreshSegmentPressureService to RemoteStorePressureService
Browse files Browse the repository at this point in the history
Signed-off-by: Bhumika Saini <sabhumik@amazon.com>
  • Loading branch information
Bhumika Saini committed Aug 14, 2023
1 parent 50e886e commit 46e8338
Show file tree
Hide file tree
Showing 19 changed files with 93 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.IndexService;
import org.opensearch.index.remote.RemoteRefreshSegmentPressureService;
import org.opensearch.index.remote.RemoteStorePressureService;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardNotFoundException;
Expand All @@ -50,7 +50,7 @@ public class TransportRemoteStoreStatsAction extends TransportBroadcastByNodeAct

private final IndicesService indicesService;

private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService;
private final RemoteStorePressureService remoteStorePressureService;

@Inject
public TransportRemoteStoreStatsAction(
Expand All @@ -59,7 +59,7 @@ public TransportRemoteStoreStatsAction(
IndicesService indicesService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService
RemoteStorePressureService remoteStorePressureService
) {
super(
RemoteStoreStatsAction.NAME,
Expand All @@ -71,7 +71,7 @@ public TransportRemoteStoreStatsAction(
ThreadPool.Names.MANAGEMENT
);
this.indicesService = indicesService;
this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService;
this.remoteStorePressureService = remoteStorePressureService;
}

/**
Expand Down Expand Up @@ -153,7 +153,7 @@ protected RemoteStoreStats shardOperation(RemoteStoreStatsRequest request, Shard
throw new ShardNotFoundException(indexShard.shardId());
}

RemoteSegmentTransferTracker remoteSegmentTransferTracker = remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(
RemoteSegmentTransferTracker remoteSegmentTransferTracker = remoteStorePressureService.getRemoteRefreshSegmentTracker(
indexShard.shardId()
);
assert Objects.nonNull(remoteSegmentTransferTracker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
import org.opensearch.index.mapper.MapperException;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.SourceToParse;
import org.opensearch.index.remote.RemoteRefreshSegmentPressureService;
import org.opensearch.index.remote.RemoteStorePressureService;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.core.index.shard.ShardId;
Expand Down Expand Up @@ -137,7 +137,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
private final UpdateHelper updateHelper;
private final MappingUpdatedAction mappingUpdatedAction;
private final SegmentReplicationPressureService segmentReplicationPressureService;
private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService;
private final RemoteStorePressureService remoteStorePressureService;

/**
* This action is used for performing primary term validation. With remote translog enabled, the translogs would
Expand All @@ -161,7 +161,7 @@ public TransportShardBulkAction(
ActionFilters actionFilters,
IndexingPressureService indexingPressureService,
SegmentReplicationPressureService segmentReplicationPressureService,
RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService,
RemoteStorePressureService remoteStorePressureService,
SystemIndices systemIndices
) {
super(
Expand All @@ -183,7 +183,7 @@ public TransportShardBulkAction(
this.updateHelper = updateHelper;
this.mappingUpdatedAction = mappingUpdatedAction;
this.segmentReplicationPressureService = segmentReplicationPressureService;
this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService;
this.remoteStorePressureService = remoteStorePressureService;

this.transportPrimaryTermValidationAction = ACTION_NAME + "[validate_primary_term]";

Expand Down Expand Up @@ -539,9 +539,8 @@ protected Releasable checkPrimaryLimits(BulkShardRequest request, boolean rerout
}
// TODO - While removing remote store flag, this can be encapsulated to single class with common interface for backpressure
// service
if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)
&& remoteRefreshSegmentPressureService.isSegmentsUploadBackpressureEnabled()) {
remoteRefreshSegmentPressureService.validateSegmentsUploadLag(request.shardId());
if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE) && remoteStorePressureService.isSegmentsUploadBackpressureEnabled()) {
remoteStorePressureService.validateSegmentsUploadLag(request.shardId());
}
}
return super.checkPrimaryLimits(request, rerouteWasLocal, localRerouteInitiatedByNodeClient);
Expand Down
6 changes: 3 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.query.SearchIndexNameMatcher;
import org.opensearch.index.remote.RemoteRefreshSegmentPressureService;
import org.opensearch.index.remote.RemoteStorePressureService;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
Expand Down Expand Up @@ -440,7 +440,7 @@ public synchronized IndexShard createShard(
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService
final RemoteStorePressureService remoteStorePressureService
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
Expand Down Expand Up @@ -509,7 +509,7 @@ public synchronized IndexShard createShard(
translogFactorySupplier,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore,
remoteRefreshSegmentPressureService
remoteStorePressureService
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
*
* @opensearch.internal
*/
public class RemoteRefreshSegmentPressureService implements IndexEventListener {
public class RemoteStorePressureService implements IndexEventListener {

private static final Logger logger = LogManager.getLogger(RemoteRefreshSegmentPressureService.class);
private static final Logger logger = LogManager.getLogger(RemoteStorePressureService.class);

/**
* Keeps map of remote-backed index shards and their corresponding backpressure tracker.
Expand All @@ -47,7 +47,7 @@ public class RemoteRefreshSegmentPressureService implements IndexEventListener {
private final List<LagValidator> lagValidators;

@Inject
public RemoteRefreshSegmentPressureService(ClusterService clusterService, Settings settings) {
public RemoteStorePressureService(ClusterService clusterService, Settings settings) {
pressureSettings = new RemoteStorePressureSettings(clusterService, settings, this);
lagValidators = Arrays.asList(
new ConsecutiveFailureValidator(pressureSettings),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private static class Defaults {
public RemoteStorePressureSettings(
ClusterService clusterService,
Settings settings,
RemoteRefreshSegmentPressureService remoteUploadPressureService
RemoteStorePressureService remoteUploadPressureService
) {
ClusterSettings clusterSettings = clusterService.getClusterSettings();

Expand Down
15 changes: 7 additions & 8 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@
import org.opensearch.index.merge.MergeStats;
import org.opensearch.index.recovery.RecoveryStats;
import org.opensearch.index.refresh.RefreshStats;
import org.opensearch.index.remote.RemoteRefreshSegmentPressureService;
import org.opensearch.index.remote.RemoteStorePressureService;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.index.search.stats.SearchStats;
import org.opensearch.index.search.stats.ShardSearchStats;
Expand Down Expand Up @@ -333,8 +333,7 @@ Runnable getGlobalCheckpointSyncer() {
private final Store remoteStore;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private final boolean isTimeSeriesIndex;

private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService;
private final RemoteStorePressureService remoteStorePressureService;

private final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();

Expand Down Expand Up @@ -362,7 +361,7 @@ public IndexShard(
final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final Store remoteStore,
final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService
final RemoteStorePressureService remoteStorePressureService
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -457,7 +456,7 @@ public boolean shouldCache(Query query) {
this.isTimeSeriesIndex = (mapperService == null || mapperService.documentMapper() == null)
? false
: mapperService.documentMapper().mappers().containsTimeStampField();
this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService;
this.remoteStorePressureService = remoteStorePressureService;
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -547,8 +546,8 @@ public QueryCachingPolicy getQueryCachingPolicy() {
}

/** Only used for testing **/
protected RemoteRefreshSegmentPressureService getRemoteRefreshSegmentPressureService() {
return remoteRefreshSegmentPressureService;
protected RemoteStorePressureService getRemoteStorePressureService() {
return remoteStorePressureService;
}

@Override
Expand Down Expand Up @@ -3697,7 +3696,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
this,
// Add the checkpoint publisher if the Segment Replciation via remote store is enabled.
indexSettings.isSegRepWithRemoteEnabled() ? this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY,
remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId())
remoteStorePressureService.getRemoteRefreshSegmentTracker(shardId())
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
import org.opensearch.index.mapper.SourceFieldMapper;
import org.opensearch.index.mapper.TextFieldMapper;
import org.opensearch.index.mapper.VersionFieldMapper;
import org.opensearch.index.remote.RemoteRefreshSegmentPressureService;
import org.opensearch.index.remote.RemoteStorePressureService;
import org.opensearch.index.seqno.RetentionLeaseBackgroundSyncAction;
import org.opensearch.index.seqno.RetentionLeaseSyncAction;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
Expand Down Expand Up @@ -289,7 +289,7 @@ protected void configure() {
bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton();
bind(SegmentReplicationPressureService.class).asEagerSingleton();
if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) {
bind(RemoteRefreshSegmentPressureService.class).asEagerSingleton();
bind(RemoteStorePressureService.class).asEagerSingleton();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@
import org.opensearch.index.query.QueryRewriteContext;
import org.opensearch.index.recovery.RecoveryStats;
import org.opensearch.index.refresh.RefreshStats;
import org.opensearch.index.remote.RemoteRefreshSegmentPressureService;
import org.opensearch.index.remote.RemoteStorePressureService;
import org.opensearch.index.search.stats.SearchStats;
import org.opensearch.index.seqno.RetentionLeaseStats;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
Expand Down Expand Up @@ -927,7 +927,7 @@ public IndexShard createShard(
final RetentionLeaseSyncer retentionLeaseSyncer,
final DiscoveryNode targetNode,
final DiscoveryNode sourceNode,
final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService
final RemoteStorePressureService remoteStorePressureService
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
ensureChangesAllowed();
Expand All @@ -939,7 +939,7 @@ public IndexShard createShard(
globalCheckpointSyncer,
retentionLeaseSyncer,
checkpointPublisher,
remoteRefreshSegmentPressureService
remoteStorePressureService
);
indexShard.addShardFailureCallback(onShardFailure);
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
import org.opensearch.index.IndexComponent;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.remote.RemoteRefreshSegmentPressureService;
import org.opensearch.index.remote.RemoteStorePressureService;
import org.opensearch.index.seqno.GlobalCheckpointSyncAction;
import org.opensearch.index.seqno.ReplicationTracker;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
Expand Down Expand Up @@ -149,7 +149,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple

private final SegmentReplicationCheckpointPublisher checkpointPublisher;

private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService;
private final RemoteStorePressureService remoteStorePressureService;

@Inject
public IndicesClusterStateService(
Expand All @@ -170,7 +170,7 @@ public IndicesClusterStateService(
final GlobalCheckpointSyncAction globalCheckpointSyncAction,
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService
final RemoteStorePressureService remoteStorePressureService
) {
this(
settings,
Expand All @@ -190,7 +190,7 @@ public IndicesClusterStateService(
primaryReplicaSyncer,
globalCheckpointSyncAction::updateGlobalCheckpointForShard,
retentionLeaseSyncer,
remoteRefreshSegmentPressureService
remoteStorePressureService
);
}

Expand All @@ -213,7 +213,7 @@ public IndicesClusterStateService(
final PrimaryReplicaSyncer primaryReplicaSyncer,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService
final RemoteStorePressureService remoteStorePressureService
) {
this.settings = settings;
this.checkpointPublisher = checkpointPublisher;
Expand All @@ -225,7 +225,7 @@ public IndicesClusterStateService(
indexEventListeners.add(segmentReplicationSourceService);
// if remote store feature is not enabled, do not wire the remote upload pressure service as an IndexEventListener.
if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) {
indexEventListeners.add(remoteRefreshSegmentPressureService);
indexEventListeners.add(remoteStorePressureService);
}
this.segmentReplicationTargetService = segmentReplicationTargetService;
this.builtInIndexListener = Collections.unmodifiableList(indexEventListeners);
Expand All @@ -240,7 +240,7 @@ public IndicesClusterStateService(
this.globalCheckpointSyncer = globalCheckpointSyncer;
this.retentionLeaseSyncer = Objects.requireNonNull(retentionLeaseSyncer);
this.sendRefreshMapping = settings.getAsBoolean("indices.cluster.send_refresh_mapping", true);
this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService;
this.remoteStorePressureService = remoteStorePressureService;
}

@Override
Expand Down Expand Up @@ -683,7 +683,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR
retentionLeaseSyncer,
nodes.getLocalNode(),
sourceNode,
remoteRefreshSegmentPressureService
remoteStorePressureService
);
} catch (Exception e) {
failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);
Expand Down Expand Up @@ -1042,7 +1042,7 @@ T createShard(
RetentionLeaseSyncer retentionLeaseSyncer,
DiscoveryNode targetNode,
@Nullable DiscoveryNode sourceNode,
RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService
RemoteStorePressureService remoteStorePressureService
) throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.remote.RemoteRefreshSegmentPressureService;
import org.opensearch.index.remote.RemoteStorePressureService;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
import org.opensearch.index.shard.IndexShardTestCase;
import org.opensearch.indices.IndicesService;
Expand All @@ -52,7 +52,7 @@

public class TransportRemoteStoreStatsActionTests extends IndexShardTestCase {
private IndicesService indicesService;
private RemoteRefreshSegmentPressureService pressureService;
private RemoteStorePressureService pressureService;
private IndexMetadata remoteStoreIndexMetadata;
private TransportService transportService;
private ClusterService clusterService;
Expand All @@ -66,7 +66,7 @@ public void setUp() throws Exception {
indicesService = mock(IndicesService.class);
IndexService indexService = mock(IndexService.class);
clusterService = mock(ClusterService.class);
pressureService = mock(RemoteRefreshSegmentPressureService.class);
pressureService = mock(RemoteStorePressureService.class);
MockTransport mockTransport = new MockTransport();
localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT);
remoteStoreIndexMetadata = IndexMetadata.builder(INDEX.getName())
Expand Down
Loading

0 comments on commit 46e8338

Please sign in to comment.