Skip to content

Commit

Permalink
Move timestamp pinning settings to RemoteStoreSettings (#15294) (#15548)
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
sachinpkale committed Aug 31, 2024
1 parent 10b955d commit a58d09b
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void testTimestampPinUnpin() throws Exception {
remoteStorePinnedTimestampService.pinTimestamp(timestamp2, "ss3", noOpActionListener);
remoteStorePinnedTimestampService.pinTimestamp(timestamp3, "ss4", noOpActionListener);

remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueSeconds(1));
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));

assertBusy(() -> {
Tuple<Long, Set<Long>> pinnedTimestampWithFetchTimestamp_2 = RemoteStorePinnedTimestampService.getPinnedTimestamps();
Expand All @@ -63,7 +63,7 @@ public void testTimestampPinUnpin() throws Exception {
assertEquals(Set.of(timestamp1, timestamp2, timestamp3), pinnedTimestampWithFetchTimestamp_2.v2());
});

remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueMinutes(3));
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));

// This should be a no-op as pinning entity is different
remoteStorePinnedTimestampService.unpinTimestamp(timestamp1, "no-snapshot", noOpActionListener);
Expand All @@ -72,7 +72,7 @@ public void testTimestampPinUnpin() throws Exception {
// Adding different entity to already pinned timestamp
remoteStorePinnedTimestampService.pinTimestamp(timestamp3, "ss5", noOpActionListener);

remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueSeconds(1));
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));

assertBusy(() -> {
Tuple<Long, Set<Long>> pinnedTimestampWithFetchTimestamp_3 = RemoteStorePinnedTimestampService.getPinnedTimestamps();
Expand All @@ -81,6 +81,6 @@ public void testTimestampPinUnpin() throws Exception {
assertEquals(Set.of(timestamp1, timestamp3), pinnedTimestampWithFetchTimestamp_3.v2());
});

remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueMinutes(3));
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@
import org.opensearch.node.Node.DiscoverySettings;
import org.opensearch.node.NodeRoleSettings;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.node.resource.tracker.ResourceTrackerSettings;
import org.opensearch.persistent.PersistentTasksClusterService;
import org.opensearch.persistent.decider.EnableAssignmentDecider;
Expand Down Expand Up @@ -763,7 +762,9 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA,
SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_LOOKBACK_INTERVAL,

SystemTemplatesService.SETTING_APPLICATION_BASED_CONFIGURATION_TEMPLATES_ENABLED,

// WorkloadManagement settings
Expand All @@ -772,7 +773,7 @@ public void apply(Settings value, Settings current, Settings previous) {
WorkloadManagementSettings.NODE_LEVEL_MEMORY_REJECTION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD,

RemoteStorePinnedTimestampService.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL,
SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING,

// Composite index settings
CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,27 @@ public class RemoteStoreSettings {
Property.Dynamic
);

/**
* Controls pinned timestamp scheduler interval
*/
public static final Setting<TimeValue> CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL = Setting.timeSetting(
"cluster.remote_store.pinned_timestamps.scheduler_interval",
TimeValue.timeValueMinutes(3),
TimeValue.timeValueMinutes(1),
Setting.Property.NodeScope
);

/**
* Controls allowed timestamp values to be pinned from past
*/
public static final Setting<TimeValue> CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_LOOKBACK_INTERVAL = Setting.timeSetting(
"cluster.remote_store.pinned_timestamps.lookback_interval",
TimeValue.timeValueMinutes(1),
TimeValue.timeValueMinutes(1),
TimeValue.timeValueMinutes(5),
Setting.Property.NodeScope
);

private volatile TimeValue clusterRemoteTranslogBufferInterval;
private volatile int minRemoteSegmentMetadataFiles;
private volatile TimeValue clusterRemoteTranslogTransferTimeout;
Expand All @@ -142,6 +163,8 @@ public class RemoteStoreSettings {
private volatile RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm;
private volatile int maxRemoteTranslogReaders;
private volatile boolean isTranslogMetadataEnabled;
private static volatile TimeValue pinnedTimestampsSchedulerInterval;
private static volatile TimeValue pinnedTimestampsLookbackInterval;

public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings);
Expand Down Expand Up @@ -179,6 +202,9 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING,
this::setClusterRemoteSegmentTransferTimeout
);

pinnedTimestampsSchedulerInterval = CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL.get(settings);
pinnedTimestampsLookbackInterval = CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_LOOKBACK_INTERVAL.get(settings);
}

public TimeValue getClusterRemoteTranslogBufferInterval() {
Expand Down Expand Up @@ -246,4 +272,12 @@ public int getMaxRemoteTranslogReaders() {
private void setMaxRemoteTranslogReaders(int maxRemoteTranslogReaders) {
this.maxRemoteTranslogReaders = maxRemoteTranslogReaders;
}

public static TimeValue getPinnedTimestampsSchedulerInterval() {
return pinnedTimestampsSchedulerInterval;
}

public static TimeValue getPinnedTimestampsLookbackInterval() {
return pinnedTimestampsLookbackInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractAsyncTask;
Expand All @@ -24,6 +23,7 @@
import org.opensearch.gateway.remote.model.RemotePinnedTimestamps.PinnedTimestamps;
import org.opensearch.gateway.remote.model.RemoteStorePinnedTimestampsBlobStore;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.node.Node;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -61,30 +61,8 @@ public class RemoteStorePinnedTimestampService implements Closeable {
private BlobStoreTransferService blobStoreTransferService;
private RemoteStorePinnedTimestampsBlobStore pinnedTimestampsBlobStore;
private AsyncUpdatePinnedTimestampTask asyncUpdatePinnedTimestampTask;
private volatile TimeValue pinnedTimestampsSchedulerInterval;
private final Semaphore updateTimetampPinningSemaphore = new Semaphore(1);

/**
* Controls pinned timestamp scheduler interval
*/
public static final Setting<TimeValue> CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL = Setting.timeSetting(
"cluster.remote_store.pinned_timestamps.scheduler_interval",
TimeValue.timeValueMinutes(3),
TimeValue.timeValueMinutes(1),
Setting.Property.NodeScope
);

/**
* Controls allowed timestamp values to be pinned from past
*/
public static final Setting<TimeValue> CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_LOOKBACK_INTERVAL = Setting.timeSetting(
"cluster.remote_store.pinned_timestamps.lookback_interval",
TimeValue.timeValueMinutes(1),
TimeValue.timeValueMinutes(1),
TimeValue.timeValueMinutes(5),
Setting.Property.NodeScope
);

public RemoteStorePinnedTimestampService(
Supplier<RepositoriesService> repositoriesService,
Settings settings,
Expand All @@ -95,8 +73,6 @@ public RemoteStorePinnedTimestampService(
this.settings = settings;
this.threadPool = threadPool;
this.clusterService = clusterService;

pinnedTimestampsSchedulerInterval = CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL.get(settings);
}

/**
Expand All @@ -107,7 +83,7 @@ public RemoteStorePinnedTimestampService(
public void start() {
validateRemoteStoreConfiguration();
initializeComponents();
startAsyncUpdateTask();
startAsyncUpdateTask(RemoteStoreSettings.getPinnedTimestampsSchedulerInterval());
}

private void validateRemoteStoreConfiguration() {
Expand All @@ -132,7 +108,7 @@ private void initializeComponents() {
);
}

private void startAsyncUpdateTask() {
private void startAsyncUpdateTask(TimeValue pinnedTimestampsSchedulerInterval) {
asyncUpdatePinnedTimestampTask = new AsyncUpdatePinnedTimestampTask(logger, threadPool, pinnedTimestampsSchedulerInterval, true);
}

Expand All @@ -147,8 +123,8 @@ private void startAsyncUpdateTask() {
public void pinTimestamp(long timestamp, String pinningEntity, ActionListener<Void> listener) {
// If a caller uses current system time to pin the timestamp, following check will almost always fail.
// So, we allow pinning timestamp in the past upto some buffer
long lookbackIntervalInMills = CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_LOOKBACK_INTERVAL.get(settings).millis();
if (timestamp < TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - lookbackIntervalInMills) {
long lookbackIntervalInMills = RemoteStoreSettings.getPinnedTimestampsLookbackInterval().millis();
if (timestamp < (System.currentTimeMillis() - lookbackIntervalInMills)) {
throw new IllegalArgumentException(
"Timestamp to be pinned is less than current timestamp - value of cluster.remote_store.pinned_timestamps.lookback_interval"
);
Expand Down Expand Up @@ -256,17 +232,12 @@ public void close() throws IOException {
asyncUpdatePinnedTimestampTask.close();
}

// Visible for testing
public void setPinnedTimestampsSchedulerInterval(TimeValue pinnedTimestampsSchedulerInterval) {
this.pinnedTimestampsSchedulerInterval = pinnedTimestampsSchedulerInterval;
rescheduleAsyncUpdatePinnedTimestampTask();
}

private void rescheduleAsyncUpdatePinnedTimestampTask() {
// Used in integ tests
public void rescheduleAsyncUpdatePinnedTimestampTask(TimeValue pinnedTimestampsSchedulerInterval) {
if (pinnedTimestampsSchedulerInterval != null) {
pinnedTimestampsSet = new Tuple<>(-1L, Set.of());
asyncUpdatePinnedTimestampTask.close();
startAsyncUpdateTask();
startAsyncUpdateTask(pinnedTimestampsSchedulerInterval);
}
}

Expand Down

0 comments on commit a58d09b

Please sign in to comment.