Make number of segment metadata files in remote segment store configurable #11329

Merged
Changes from 3 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow to pass the list settings through environment variables (like [], ["a", "b", "c"], ...) ([#10625](https://github.com/opensearch-project/OpenSearch/pull/10625))
- [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853))
- Add back half_float BKD based sort query optimization ([#11024](https://github.com/opensearch-project/OpenSearch/pull/11024))
- Make number of segment metadata files in remote segment store configurable ([#11329](https://github.com/opensearch-project/OpenSearch/pull/11329))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
@@ -34,6 +34,8 @@
import org.opensearch.test.transport.MockTransportService;
import org.hamcrest.MatcherAssert;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
@@ -42,10 +44,10 @@
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.index.shard.RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
@@ -167,7 +169,7 @@ public void testRemoteTranslogCleanup() throws Exception {
}

public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
internalCluster().startNode();
String dataNode = internalCluster().startNode();
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
@@ -177,17 +179,20 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");

IndexShard indexShard = getIndexShard(dataNode);
int lastNMetadataFilesToKeep = indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles();
// Delete is async.
assertBusy(() -> {
int actualFileCount = getFileCount(indexPath);
if (numberOfIterations <= LAST_N_METADATA_FILES_TO_KEEP) {
if (numberOfIterations <= lastNMetadataFilesToKeep) {
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1)));
} else {
// As delete is async, it's possible that the file gets created before or after the deletion.
MatcherAssert.assertThat(
actualFileCount,
is(oneOf(LAST_N_METADATA_FILES_TO_KEEP - 1, LAST_N_METADATA_FILES_TO_KEEP, LAST_N_METADATA_FILES_TO_KEEP + 1))
is(oneOf(lastNMetadataFilesToKeep - 1, lastNMetadataFilesToKeep, lastNMetadataFilesToKeep + 1))
);
}
}, 30, TimeUnit.SECONDS);
@@ -209,6 +214,44 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1)));
}

public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception {
Settings.Builder settings = Settings.builder()
.put(RecoverySettings.CLUSTER_REMOTE_INDEX_MIN_SEGMENT_METADATA_FILES_SETTING.getKey(), "3");
internalCluster().startNode(settings);

createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
int actualFileCount = getFileCount(indexPath);
// Stale deletion keeps the last 3 metadata files, and the refresh after the final flush uploads one more, so 4 files are expected.
MatcherAssert.assertThat(actualFileCount, is(oneOf(4)));
}

public void testStaleCommitDeletionWithMinSegmentFiles_0() throws Exception {
Settings.Builder settings = Settings.builder()
.put(RecoverySettings.CLUSTER_REMOTE_INDEX_MIN_SEGMENT_METADATA_FILES_SETTING.getKey(), "0");
internalCluster().startNode(settings);

createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(12, 18);
indexData(numberOfIterations, true, INDEX_NAME);
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
int actualFileCount = getFileCount(indexPath);
// With stale deletion disabled, one metadata file per iteration is retained; the +1 accounts for the refresh triggered by index creation.
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations + 1)));
}

/**
* Tests that when the index setting is not passed during index creation, the buffer interval picked up is the cluster
* default.
@@ -532,4 +575,50 @@ public void testNoSearchIdleForAnyReplicaCount() throws ExecutionException, Inte
indexShard = getIndexShard(replicaShardNode);
assertFalse(indexShard.isSearchIdleSupported());
}

public void testFallbackToNodeToNodeSegmentCopy() throws Exception {
internalCluster().startClusterManagerOnlyNode();
List<String> dataNodes = internalCluster().startDataOnlyNodes(2);

// 1. Create index with 0 replica
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1));
ensureGreen(INDEX_NAME);

// 2. Index docs
indexBulk(INDEX_NAME, 50);
flushAndRefresh(INDEX_NAME);

// 3. Delete data from remote segment store
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path segmentDataPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/data");

try (Stream<Path> files = Files.list(segmentDataPath)) {
files.forEach(p -> {
try {
Files.delete(p);
} catch (IOException e) {
// Ignore
}
});
}

// 4. Start recovery by changing number of replicas to 1
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);

// 5. Ensure green and verify number of docs
ensureGreen(INDEX_NAME);
assertBusy(() -> {
assertHitCount(client(dataNodes.get(0)).prepareSearch(INDEX_NAME).setSize(0).get(), 50);
assertHitCount(client(dataNodes.get(1)).prepareSearch(INDEX_NAME).setSize(0).get(), 50);
});
}
}
@@ -292,6 +292,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING,
RecoverySettings.CLUSTER_REMOTE_INDEX_MIN_SEGMENT_METADATA_FILES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
@@ -345,6 +345,7 @@ Runnable getGlobalCheckpointSyncer() {

private final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
private final RemoteStoreFileDownloader fileDownloader;
private final RecoverySettings recoverySettings;

public IndexShard(
final ShardRouting shardRouting,
@@ -469,6 +470,7 @@ public boolean shouldCache(Query query) {
? false
: mapperService.documentMapper().mappers().containsTimeStampField();
this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory;
this.recoverySettings = recoverySettings;
this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings);
}

@@ -567,6 +569,10 @@ public String getNodeId() {
return translogConfig.getNodeId();
}

public RecoverySettings getRecoverySettings() {
return recoverySettings;
}

public RemoteStoreFileDownloader getFileDownloader() {
return fileDownloader;
}
@@ -79,8 +79,6 @@ public final class RemoteStoreRefreshListener extends CloseableRetryableRefreshL
);

public static final Set<String> EXCLUDE_FILES = Set.of("write.lock");
// Visible for testing
public static final int LAST_N_METADATA_FILES_TO_KEEP = 10;

private final IndexShard indexShard;
private final Directory storeDirectory;
@@ -205,7 +203,7 @@ private boolean syncSegments() {
// is considered as a first refresh post commit. A cleanup of stale commit files is triggered.
// This is done to avoid delete post each refresh.
if (isRefreshAfterCommit()) {
remoteDirectory.deleteStaleSegmentsAsync(LAST_N_METADATA_FILES_TO_KEEP);
remoteDirectory.deleteStaleSegmentsAsync(indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles());
}

try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
@@ -727,6 +727,10 @@ public Map<String, UploadedSegmentMetadata> getSegmentsUploadedToRemoteStore() {
* @throws IOException in case of I/O error while reading from / writing to remote segment store
*/
public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException {
if (lastNMetadataFilesToKeep <= 0) {
logger.info("Stale segment deletion is disabled if cluster.remote_store.index.min.segment-metadata is set to < 1");
return;
}
List<String> sortedMetadataFileList = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
MetadataFilenameUtils.METADATA_PREFIX,
Integer.MAX_VALUE
@@ -247,7 +247,14 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
indexShard.prepareForIndexRecovery();
final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled();
if (hasRemoteSegmentStore) {
indexShard.syncSegmentsFromRemoteSegmentStore(false, recoveryTarget::setLastAccessTime);
try {
indexShard.syncSegmentsFromRemoteSegmentStore(false, recoveryTarget::setLastAccessTime);
} catch (Exception e) {
logger.error(
"Exception while downloading segment files from remote store, will continue with peer to peer segment copy",
e
);
}
}
final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot();
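
The try/catch above makes the remote download best-effort: a failure is only logged, and recovery carries on so segments can still be copied from a peer (this is what testFallbackToNodeToNodeSegmentCopy exercises). A minimal standalone sketch of that shape, using hypothetical names rather than the actual OpenSearch recovery API:

// Sketch only: the Runnables stand in for indexShard.syncSegmentsFromRemoteSegmentStore
// and the later peer-to-peer copy phases; they are not real OpenSearch APIs.
static void recoverSegments(Runnable remoteDownload, Runnable peerToPeerCopy, org.apache.logging.log4j.Logger logger) {
    try {
        remoteDownload.run(); // preferred path: hydrate the shard from the remote segment store
    } catch (Exception e) {
        // Non-fatal: log and fall through so the peer copy can supply the missing segments.
        logger.error("Exception while downloading segment files from remote store, will continue with peer to peer segment copy", e);
    }
    peerToPeerCopy.run(); // node-to-node copy fills in whatever the remote store could not provide
}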
@@ -157,6 +157,17 @@ public class RecoverySettings {
Property.NodeScope
);

/**
* Controls the minimum number of segment metadata files to keep in the remote segment store.
* {@code value < 1} will disable deletion of stale segment metadata files.
*/
public static final Setting<Integer> CLUSTER_REMOTE_INDEX_MIN_SEGMENT_METADATA_FILES_SETTING = Setting.intSetting(
"cluster.remote_store.index.min.segment-metadata",
10,
Property.NodeScope,
Property.Dynamic
);
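
Because the setting is registered with Property.NodeScope and Property.Dynamic, it can be changed on a running cluster. A minimal sketch of updating it from Java, assuming a transport-level Client; the helper class and method names are hypothetical, only the setting key comes from this diff:

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;

final class MinSegmentMetadataFilesExample {
    // Persistently raise the retention floor to `count`; a value below 1 disables
    // deletion of stale segment metadata files entirely.
    static void setMinSegmentMetadataFiles(Client client, int count) {
        ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
        request.persistentSettings(
            Settings.builder().put("cluster.remote_store.index.min.segment-metadata", count).build()
        );
        client.admin().cluster().updateSettings(request).actionGet();
    }
}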

// choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1.
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES);

@@ -171,6 +182,7 @@ public class RecoverySettings {
private volatile TimeValue internalActionTimeout;
private volatile TimeValue internalActionRetryTimeout;
private volatile TimeValue internalActionLongTimeout;
private volatile int minRemoteSegmentMetadataFiles;

private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;

@@ -212,6 +224,11 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
this::setInternalActionLongTimeout
);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setActivityTimeout);
minRemoteSegmentMetadataFiles = CLUSTER_REMOTE_INDEX_MIN_SEGMENT_METADATA_FILES_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(
CLUSTER_REMOTE_INDEX_MIN_SEGMENT_METADATA_FILES_SETTING,
this::setMinRemoteSegmentMetadataFiles
);
}

public RateLimiter rateLimiter() {
@@ -307,4 +324,12 @@ public int getMaxConcurrentRemoteStoreStreams() {
private void setMaxConcurrentRemoteStoreStreams(int maxConcurrentRemoteStoreStreams) {
this.maxConcurrentRemoteStoreStreams = maxConcurrentRemoteStoreStreams;
}

private void setMinRemoteSegmentMetadataFiles(int minRemoteSegmentMetadataFiles) {
this.minRemoteSegmentMetadataFiles = minRemoteSegmentMetadataFiles;
}

public int getMinRemoteSegmentMetadataFiles() {
return this.minRemoteSegmentMetadataFiles;
}
}
@@ -33,6 +33,7 @@
import org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.threadpool.ThreadPool;
@@ -237,7 +238,7 @@ public void testAfterMultipleCommits() throws IOException {
setup(true, 3);
assertDocs(indexShard, "1", "2", "3");

for (int i = 0; i < RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP + 3; i++) {
for (int i = 0; i < indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles() + 3; i++) {
indexDocs(4 * (i + 1), 4);
flushShard(indexShard);
}
@@ -550,6 +551,9 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory = indexShard.getRemoteStoreStatsTrackerFactory();
when(shard.indexSettings()).thenReturn(indexShard.indexSettings());
when(shard.shardId()).thenReturn(indexShard.shardId());
RecoverySettings recoverySettings = mock(RecoverySettings.class);
when(recoverySettings.getMinRemoteSegmentMetadataFiles()).thenReturn(10);
when(shard.getRecoverySettings()).thenReturn(recoverySettings);
RemoteSegmentTransferTracker tracker = remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId());
RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, emptyCheckpointPublisher, tracker);
refreshListener.afterRefresh(true);