Skip to content

Commit

Permalink
[Remote Store] Add capability of doing flush as determined by the tra…
Browse files Browse the repository at this point in the history
…nslog (#12992)

Signed-off-by: Shubh Sahu <shubhvs@amazon.com>
  • Loading branch information
astute-decipher authored Apr 25, 2024
1 parent c055a3d commit 9e9ab6b
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))
- [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174))
- Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179))
- [Remote Store] Add capability of doing refresh as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.Translog.Durability;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.RemoteStoreSettings;
Expand Down Expand Up @@ -63,6 +64,7 @@
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
import static org.opensearch.index.shard.IndexShardTestCase.getTranslog;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
import static org.opensearch.test.OpenSearchTestCase.getShardLevelBlobPath;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -859,4 +861,45 @@ public void testLocalOnlyTranslogCleanupOnNodeRestart() throws Exception {
refresh(INDEX_NAME);
assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), searchableDocs + 15);
}

public void testFlushOnTooManyRemoteTranslogFiles() throws Exception {
internalCluster().startClusterManagerOnlyNode();
String datanode = internalCluster().startDataOnlyNodes(1).get(0);
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1));
ensureGreen(INDEX_NAME);

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "100")
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

IndexShard indexShard = getIndexShard(datanode, INDEX_NAME);
Path translogLocation = getTranslog(indexShard).location();
assertFalse(indexShard.shouldPeriodicallyFlush());

try (Stream<Path> files = Files.list(translogLocation)) {
long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count();
assertEquals(totalFiles, 1L);
}

// indexing 100 documents (100 bulk requests), no flush will be triggered yet
for (int i = 0; i < 100; i++) {
indexBulk(INDEX_NAME, 1);
}

try (Stream<Path> files = Files.list(translogLocation)) {
long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count();
assertEquals(totalFiles, 101L);
}
// Will flush and trim the translog readers
indexBulk(INDEX_NAME, 1);

assertBusy(() -> {
try (Stream<Path> files = Files.list(translogLocation)) {
long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count();
assertEquals(totalFiles, 1L);
}
}, 30, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,8 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2917,7 +2917,7 @@ public void restoreFromRepository(Repository repository, ActionListener<Boolean>
*
* @return {@code true} if the engine should be flushed
*/
boolean shouldPeriodicallyFlush() {
public boolean shouldPeriodicallyFlush() {
final Engine engine = getEngineOrNull();
if (engine != null) {
try {
Expand Down Expand Up @@ -4493,6 +4493,7 @@ public Durability getTranslogDurability() {
/**
* Schedules a flush or translog generation roll if needed but will not schedule more than one concurrently. The operation will be
* executed asynchronously on the flush thread pool.
* Can also schedule a flush if decided by translog manager
*/
public void afterWriteOperation() {
if (shouldPeriodicallyFlush() || shouldRollTranslogGeneration()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,12 @@ public String getTranslogUUID() {
* @return if the translog should be flushed
*/
public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long flushThreshold) {
/*
* This can trigger flush depending upon translog's implementation
*/
if (translog.shouldFlush()) {
return true;
}
// This is the minimum seqNo that is referred in translog and considered for calculating translog size
long minTranslogRefSeqNo = translog.getMinUnreferencedSeqNoInSegments(localCheckpointOfLastCommit + 1);
final long minReferencedTranslogGeneration = translog.getMinGenerationForSeqNo(minTranslogRefSeqNo).translogFileGeneration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -675,4 +675,15 @@ public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommi
int availablePermits() {
return syncPermit.availablePermits();
}

/**
* Checks whether or not the shard should be flushed based on translog files.
* This checks if number of translog files breaches the threshold count determined by
* {@code cluster.remote_store.translog.max_readers} setting
* @return {@code true} if the shard should be flushed
*/
@Override
protected boolean shouldFlush() {
return readers.size() >= translogTransferManager.getMaxRemoteTranslogReadersSettings();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2082,4 +2082,13 @@ public static String createEmptyTranslog(
public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) {
return minUnrefCheckpointInLastCommit;
}

/**
* Checks whether or not the shard should be flushed based on translog files.
* each translog type can have it's own decider
* @return {@code true} if the shard should be flushed
*/
protected boolean shouldFlush() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -585,4 +585,8 @@ public void onFailure(Exception e) {
throw e;
}
}

public int getMaxRemoteTranslogReadersSettings() {
return this.remoteStoreSettings.getMaxRemoteTranslogReaders();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,23 @@ public class RemoteStoreSettings {
Property.Dynamic
);

/**
* Controls the maximum referenced remote translog files. If breached the shard will be flushed.
*/
public static final Setting<Integer> CLUSTER_REMOTE_MAX_TRANSLOG_READERS = Setting.intSetting(
"cluster.remote_store.translog.max_readers",
1000,
100,
Property.Dynamic,
Property.NodeScope
);

private volatile TimeValue clusterRemoteTranslogBufferInterval;
private volatile int minRemoteSegmentMetadataFiles;
private volatile TimeValue clusterRemoteTranslogTransferTimeout;
private volatile RemoteStoreEnums.PathType pathType;
private volatile RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm;
private volatile int maxRemoteTranslogReaders;

public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings);
Expand All @@ -124,6 +136,9 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {

pathHashAlgorithm = clusterSettings.get(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING, this::setPathHashAlgorithm);

maxRemoteTranslogReaders = CLUSTER_REMOTE_MAX_TRANSLOG_READERS.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_MAX_TRANSLOG_READERS, this::setMaxRemoteTranslogReaders);
}

public TimeValue getClusterRemoteTranslogBufferInterval() {
Expand Down Expand Up @@ -167,4 +182,12 @@ private void setPathType(RemoteStoreEnums.PathType pathType) {
private void setPathHashAlgorithm(RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm) {
this.pathHashAlgorithm = pathHashAlgorithm;
}

public int getMaxRemoteTranslogReaders() {
return maxRemoteTranslogReaders;
}

private void setMaxRemoteTranslogReaders(int maxRemoteTranslogReaders) {
this.maxRemoteTranslogReaders = maxRemoteTranslogReaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,24 @@ public void testClusterRemoteTranslogTransferTimeout() {
);
assertEquals(TimeValue.timeValueSeconds(40), remoteStoreSettings.getClusterRemoteTranslogTransferTimeout());
}

public void testMaxRemoteReferencedTranslogFiles() {
// Test default value
assertEquals(1000, remoteStoreSettings.getMaxRemoteTranslogReaders());

// Test override with valid value
clusterSettings.applySettings(
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "500").build()
);
assertEquals(500, remoteStoreSettings.getMaxRemoteTranslogReaders());

// Test override with value less than minimum
assertThrows(
IllegalArgumentException.class,
() -> clusterSettings.applySettings(
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "99").build()
)
);
assertEquals(500, remoteStoreSettings.getMaxRemoteTranslogReaders());
}
}

0 comments on commit 9e9ab6b

Please sign in to comment.