Remote Store refresh/commit fixes
vikasvb90 committed Oct 9, 2024
1 parent 621f6c2 commit acdabb8
Showing 25 changed files with 129 additions and 61 deletions.

@@ -311,7 +311,7 @@ public Map<String, BlobMetadata> listBlobsByPrefix(String keyPath, String prefix

final BlobItemProperties properties = blobItem.getProperties();
logger.trace(() -> new ParameterizedMessage("blob name [{}], size [{}]", name, properties.getContentLength()));
blobsBuilder.put(name, new PlainBlobMetadata(name, properties.getContentLength()));
blobsBuilder.put(name, new PlainBlobMetadata(name, properties.getContentLength(), properties.getLastModified().toInstant().toEpochMilli()));
}
});


@@ -172,7 +172,7 @@ Map<String, BlobMetadata> listBlobsByPrefix(String path, String prefix) throws I
assert blob.getName().startsWith(path);
if (blob.isDirectory() == false) {
final String suffixName = blob.getName().substring(path.length());
mapBuilder.put(suffixName, new PlainBlobMetadata(suffixName, blob.getSize()));
mapBuilder.put(suffixName, new PlainBlobMetadata(suffixName, blob.getSize(), blob.getUpdateTime()));
}
})
);

@@ -203,7 +203,7 @@ public Map<String, BlobMetadata> listBlobsByPrefix(@Nullable final String prefix
Map<String, BlobMetadata> map = new LinkedHashMap<>();
for (FileStatus file : files) {
if (file.isFile()) {
map.put(file.getPath().getName(), new PlainBlobMetadata(file.getPath().getName(), file.getLen()));
map.put(file.getPath().getName(), new PlainBlobMetadata(file.getPath().getName(), file.getLen(), file.getModificationTime()));
}
}
return Collections.unmodifiableMap(map);

@@ -523,7 +523,7 @@ public List<BlobMetadata> listBlobsByPrefixInSortedOrder(String blobNamePrefix,
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
List<BlobMetadata> blobs = executeListing(clientReference, listObjectsRequest(prefix, limit), limit).stream()
.flatMap(listing -> listing.contents().stream())
.map(s3Object -> new PlainBlobMetadata(s3Object.key().substring(keyPath.length()), s3Object.size()))
.map(s3Object -> new PlainBlobMetadata(s3Object.key().substring(keyPath.length()), s3Object.size(), s3Object.lastModified().toEpochMilli()))
.collect(Collectors.toList());
return blobs.subList(0, Math.min(limit, blobs.size()));
} catch (final Exception e) {
@@ -538,7 +538,7 @@ public Map<String, BlobMetadata> listBlobsByPrefix(@Nullable String blobNamePref
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
return executeListing(clientReference, listObjectsRequest(prefix, blobStore)).stream()
.flatMap(listing -> listing.contents().stream())
.map(s3Object -> new PlainBlobMetadata(s3Object.key().substring(keyPath.length()), s3Object.size()))
.map(s3Object -> new PlainBlobMetadata(s3Object.key().substring(keyPath.length()), s3Object.size(), s3Object.lastModified().toEpochMilli()))
.collect(Collectors.toMap(PlainBlobMetadata::name, Function.identity()));
} catch (final SdkException e) {
throw new IOException("Exception when listing blobs by prefix [" + prefix + "]", e);

@@ -8,16 +8,21 @@

package org.opensearch.shardsplit;

import org.opensearch.OpenSearchParseException;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.indices.split.InPlaceShardSplitRequest;
import org.opensearch.action.admin.indices.split.InPlaceShardSplitResponse;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.cat.RestClusterManagerAction;
import org.opensearch.search.SearchHits;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.OpenSearchIntegTestCase;
@@ -26,7 +31,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.*;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
import static org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
@@ -62,6 +67,11 @@ private void waitForSplit(int numberOfSplits, Set<Integer> childShardIds, int pa
assertEquals(numberOfSplits, startedChildShards);
}, maxWaitTimeMs, TimeUnit.MILLISECONDS);

assertClusterHealth();
logger.info("Shard split completed");
}

private void assertClusterHealth() {
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
@@ -70,12 +80,14 @@ private void waitForSplit(int numberOfSplits, Set<Integer> childShardIds, int pa
.setTimeout(ACCEPTABLE_RELOCATION_TIME)
.execute()
.actionGet();
assertThat(clusterHealthResponse, notNullValue());
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
System.out.println("Shard split completed");
assertThat(clusterHealthResponse.status(), equalTo(RestStatus.OK));
assertThat(clusterHealthResponse.getStatus(), equalTo(ClusterHealthStatus.GREEN));
}

private void verifyAfterSplit(long totalIndexedDocs, Set<String> ids, int parentShardId, Set<Integer> childShardIds) throws InterruptedException {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
ClusterState clusterState = internalCluster().clusterManagerClient().admin().cluster().prepareState().get().getState();
IndexMetadata indexMetadata = clusterState.metadata().index("test");
assertTrue(indexMetadata.isParentShard(parentShardId));
assertEquals(childShardIds, new HashSet<>(indexMetadata.getChildShardIds(parentShardId)));
@@ -115,7 +127,7 @@ public void testShardSplit() throws Exception {
.put("index.number_of_replicas", 0)).get();
ensureGreen();
int numDocs = scaledRandomIntBetween(200, 2500);
try (BackgroundIndexer indexer = new BackgroundIndexer("test", MapperService.SINGLE_MAPPING_NAME, client(), numDocs)) {
try (BackgroundIndexer indexer = new BackgroundIndexer("test", MapperService.SINGLE_MAPPING_NAME, client(), numDocs, 4)) {
logger.info("--> waiting for {} docs to be indexed ...", numDocs);
waitForDocs(numDocs, indexer);
logger.info("--> {} docs indexed", numDocs);
@@ -141,7 +153,7 @@ public void testSplittingShardHavingNonEmptyCommit() throws Exception {
.put("index.number_of_replicas", 0)).get();
ensureGreen();
int numDocs = scaledRandomIntBetween(200, 2500);
try (BackgroundIndexer indexer = new BackgroundIndexer("test", MapperService.SINGLE_MAPPING_NAME, client(), numDocs)) {
try (BackgroundIndexer indexer = new BackgroundIndexer("test", MapperService.SINGLE_MAPPING_NAME, client(), numDocs, 4)) {
indexer.setIgnoreIndexingFailures(false);
logger.info("--> waiting for {} docs to be indexed ...", numDocs);
waitForDocs(numDocs, indexer);
@@ -173,7 +185,7 @@ public void testSplittingShardWithNoTranslogReplay() throws Exception {
.put("index.number_of_replicas", 0)).get();
ensureGreen();
int numDocs = scaledRandomIntBetween(200, 2500);
try (BackgroundIndexer indexer = new BackgroundIndexer("test", MapperService.SINGLE_MAPPING_NAME, client(), numDocs)) {
try (BackgroundIndexer indexer = new BackgroundIndexer("test", MapperService.SINGLE_MAPPING_NAME, client(), numDocs, 4)) {
indexer.setIgnoreIndexingFailures(false);
logger.info("--> waiting for {} docs to be indexed ...", numDocs);
waitForDocs(numDocs, indexer);
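Each of the three test hunks above changes only the BackgroundIndexer constructor call, adding a fifth argument. A minimal sketch of the resulting indexing pattern, under the assumption that this fifth argument is the number of concurrent writer threads; the index name and document count are illustrative:

int numDocs = scaledRandomIntBetween(200, 2500);
// Assumption: the fifth argument (4) sets the number of concurrent writer threads,
// so indexing keeps running from several threads while the shard split is in progress.
try (BackgroundIndexer indexer = new BackgroundIndexer("test", MapperService.SINGLE_MAPPING_NAME, client(), numDocs, 4)) {
    indexer.setIgnoreIndexingFailures(false); // surface indexing errors instead of swallowing them
    logger.info("--> waiting for {} docs to be indexed ...", numDocs);
    waitForDocs(numDocs, indexer);            // block until all documents are acknowledged
}                                             // closing the indexer stops the writer threads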

@@ -95,6 +95,11 @@ public interface RoutingChangesObserver {
*/
void splitFailed(ShardRouting splitSource, IndexMetadata indexMetadata);

/**
* Called to determine if the shard split has failed in the current cluster update.
*/
boolean isSplitOfShardFailed(ShardRouting parentShard);

/**
* Called when started replica is promoted to primary.
*/
@@ -142,6 +147,11 @@ public void shardFailed(ShardRouting activeShard, UnassignedInfo unassignedInfo)

}

@Override
public boolean isSplitOfShardFailed(ShardRouting parentShard) {
return false;
}

@Override
public void relocationCompleted(ShardRouting removedRelocationSource) {

@@ -214,6 +224,17 @@ public void splitStarted(ShardRouting startedShard, List<ShardRouting> childSpli
}
}

@Override
public boolean isSplitOfShardFailed(ShardRouting parentShard) {
for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) {
if (routingChangesObserver.isSplitOfShardFailed(parentShard)) {
return true;
}
}

return false;
}

@Override
public void unassignedInfoUpdated(ShardRouting unassignedShard, UnassignedInfo newUnassignedInfo) {
for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) {

@@ -118,14 +118,8 @@ public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo)

@Override
public boolean isSplitOfShardFailed(ShardRouting parentShard) {
if (failedShard.active() && failedShard.primary() && failedShard.isSplitTarget() == false) {
Updates updates = changes(failedShard.shardId());
if (updates.firstFailedPrimary == null) {
// more than one primary can be failed (because of batching, primary can be failed, replica promoted and then failed...)
updates.firstFailedPrimary = failedShard;
}
increasePrimaryTerm(failedShard.shardId());
}
Updates updates = changes(parentShard.shardId());
return updates.splitFailed;
}

@Override

@@ -120,6 +120,12 @@ public void splitFailed(ShardRouting splitSource, IndexMetadata indexMetadata) {
setChanged();
}

@Override
public boolean isSplitOfShardFailed(ShardRouting parentShard) {
// Nothing to do here since splitFailed would have already marked changes in case of split failure.
return false;
}

@Override
public void replicaPromoted(ShardRouting replicaShard) {
assert replicaShard.started() && replicaShard.primary() == false : "expected started replica shard " + replicaShard;

@@ -25,7 +25,7 @@ public static Decision canRemainDecision(ShardRouting shardRouting, RoutingNode
// If shardRouting is a started parent shard, the fact that it exists is sufficient to conclude
// that it needs to be split.
if (allocation.metadata().getIndexSafe(shardRouting.index()).isParentShard(shardRouting.shardId().id())
&& shardRouting.started()) {
&& shardRouting.started() && allocation.changes().isSplitOfShardFailed(shardRouting) == false) {
return Decision.SPLIT;
}
return Decision.ALWAYS;

@@ -207,7 +207,8 @@ default long readBlobPreferredLength() {
*/
enum BlobNameSortOrder {

LEXICOGRAPHIC(Comparator.comparing(BlobMetadata::name));
LEXICOGRAPHIC(Comparator.comparing(BlobMetadata::name)),
CHRONOLOGICAL(Comparator.comparing(BlobMetadata::lastModified)),;

final Comparator<BlobMetadata> comparator;

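The new CHRONOLOGICAL order builds on the lastModified() accessor that the next hunks add to BlobMetadata and its implementations. A minimal sketch, not taken from the commit, of what that comparator does; blob names, sizes, and timestamps are made up, and the OpenSearch import paths are assumptions:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
// Assumed imports; package paths may differ by version:
// import org.opensearch.common.blobstore.BlobMetadata;
// import org.opensearch.common.blobstore.support.PlainBlobMetadata;

public class ChronologicalOrderSketch {
    public static void main(String[] args) {
        // The third constructor argument is the lastModified epoch-millis value introduced below.
        List<BlobMetadata> blobs = new ArrayList<>(List.of(
            new PlainBlobMetadata("blob-b", 512L, 1728460805000L),
            new PlainBlobMetadata("blob-a", 640L, 1728460800000L)
        ));
        // CHRONOLOGICAL wraps exactly this comparator: ascending lastModified, i.e. oldest blob first.
        blobs.sort(Comparator.comparing(BlobMetadata::lastModified));
        blobs.forEach(b -> System.out.println(b.name() + " @ " + b.lastModified()));
    }
}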

@@ -48,4 +48,9 @@ public interface BlobMetadata {
* Gets the size of the blob in bytes.
*/
long length();

/**
* Gets the last modified time of the blob in epoch millis.
*/
long lastModified();
}

@@ -36,6 +36,11 @@ public String name() {
return delegate.name();
}

@Override
public long lastModified() {
return delegate.lastModified();
}

@Override
public long length() {
U cryptoContext;
Expand Down

@@ -123,7 +123,7 @@ public Map<String, BlobMetadata> listBlobsByPrefix(String blobNamePrefix) throws
continue;
}
if (attrs.isRegularFile()) {
builder.put(file.getFileName().toString(), new PlainBlobMetadata(file.getFileName().toString(), attrs.size()));
builder.put(file.getFileName().toString(), new PlainBlobMetadata(file.getFileName().toString(), attrs.size(), attrs.lastModifiedTime().toMillis()));
}
}
}

@@ -45,9 +45,12 @@ public class PlainBlobMetadata implements BlobMetadata {

private final long length;

public PlainBlobMetadata(String name, long length) {
private final long lastModified;

public PlainBlobMetadata(String name, long length, long lastModified) {
this.name = name;
this.length = length;
this.lastModified = lastModified;
}

@Override
@@ -60,6 +63,11 @@ public long length() {
return this.length;
}

@Override
public long lastModified() {
return this.lastModified;
}

@Override
public String toString() {
return "name [" + name + "], length [" + length + "]";

@@ -1899,8 +1899,6 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
shouldPeriodicallyFlush
);

logger.info("Refresing in flush");
Thread.sleep(2000);
// we need to refresh in order to clear older version values
refresh("version_table_flush", SearcherScope.INTERNAL, true);


@@ -48,6 +48,7 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
@@ -89,6 +90,7 @@ public final class RemoteStoreRefreshListener extends ReleasableRetryableRefresh
private final RemoteSegmentTransferTracker segmentTracker;
private final Map<String, String> localSegmentChecksumMap;
private volatile long primaryTerm;
private volatile long commitGen = -1;
private volatile Iterator<TimeValue> backoffDelayIterator;
private final SegmentReplicationCheckpointPublisher checkpointPublisher;
private final AtomicBoolean staleCommitDeletionDelayed = new AtomicBoolean();
@@ -153,7 +155,7 @@ protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
boolean successful;
if (shouldSync(didRefresh, false)) {
successful = syncSegments();
successful = syncSegments(false);
} else {
successful = true;
}
@@ -198,7 +200,7 @@ private boolean shouldSync(boolean didRefresh, boolean skipPrimaryTermCheck) {
/*
@return false if retry is needed
*/
private boolean syncSegments() {
private boolean syncSegments(boolean firstSyncAfterCommit) {
if (isReadyForUpload() == false) {
// Following check is required to enable retry and make sure that we do not lose this refresh event
// When primary shard is restored from remote store, the recovery happens first followed by changing
@@ -223,8 +225,19 @@ private boolean syncSegments() {
remoteDirectory.deleteStaleSegmentsAsync(indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles());
}

try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = (
firstSyncAfterCommit ?
new GatedCloseable<>(indexShard.store().readLastCommittedSegmentsInfo(), ()->{}) :
indexShard.getSegmentInfosSnapshot()
)
) {
indexShard.store().readLastCommittedSegmentsInfo();
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();

if (firstSyncAfterCommit == false && segmentInfos.getGeneration() != commitGen) {
syncSegments(true);
commitGen = segmentInfos.getGeneration();
}
final ReplicationCheckpoint checkpoint = indexShard.computeReplicationCheckpoint(segmentInfos);
if (checkpoint.getPrimaryTerm() != indexShard.getOperationPrimaryTerm()) {
throw new IllegalStateException(
@@ -240,7 +253,6 @@ private boolean syncSegments() {
// move.
long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
Collection<String> localSegmentsPostRefresh = segmentInfos.files(true);

// Create a map of file name to size and update the refresh segment tracker
updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
CountDownLatch latch = new CountDownLatch(1);
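The hunk above carries the core refresh/commit fix: syncSegments now takes a firstSyncAfterCommit flag, remembers the last commit generation it has seen, and when a refresh-triggered sync observes a generation bump it first re-runs itself against the last committed SegmentInfos before uploading the latest in-memory snapshot. A simplified, self-contained sketch of that control flow; every type and method below is a hypothetical stand-in, not the OpenSearch API:

import java.io.IOException;

// Stand-in for SegmentInfos: only the commit generation matters for this sketch.
final class SegmentSnapshot {
    final long generation;
    SegmentSnapshot(long generation) { this.generation = generation; }
}

abstract class CommitAwareSyncSketch {
    private volatile long commitGen = -1; // generation of the last commit that was synced

    /** @return false if the upload failed and the refresh listener should retry */
    boolean syncSegments(boolean firstSyncAfterCommit) throws IOException {
        // The first sync after a commit uploads the committed snapshot; a normal
        // refresh-triggered sync uploads the latest in-memory (refreshed) snapshot.
        SegmentSnapshot snapshot = firstSyncAfterCommit ? lastCommittedSnapshot() : latestRefreshedSnapshot();

        // A refresh sync that sees a new generation means a commit happened since the
        // previous sync: recurse once to push the committed state, then record the generation.
        if (firstSyncAfterCommit == false && snapshot.generation != commitGen) {
            syncSegments(true);
            commitGen = snapshot.generation;
        }
        return upload(snapshot);
    }

    abstract SegmentSnapshot lastCommittedSnapshot() throws IOException;
    abstract SegmentSnapshot latestRefreshedSnapshot();
    abstract boolean upload(SegmentSnapshot snapshot) throws IOException;
}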

@@ -111,6 +111,10 @@ public Collection<String> listFilesByPrefix(String filenamePrefix) throws IOExce
}

public List<String> listFilesByPrefixInLexicographicOrder(String filenamePrefix, int limit) throws IOException {
return listFilesByPrefixInOrder(filenamePrefix, limit, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC);
}

public List<String> listFilesByPrefixInOrder(String filenamePrefix, int limit, BlobContainer.BlobNameSortOrder order) throws IOException {
List<String> sortedBlobList = new ArrayList<>();
AtomicReference<Exception> exception = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
@@ -130,7 +134,7 @@ public void onFailure(Exception e) {
blobContainer.listBlobsByPrefixInSortedOrder(
filenamePrefix,
limit,
BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC,
order,
actionListener
);
latch.await();

@@ -25,6 +25,7 @@
import org.apache.lucene.util.Version;
import org.opensearch.common.UUIDs;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.VersionedCodecStreamWrapper;
import org.opensearch.common.logging.Loggers;
@@ -562,9 +563,10 @@ Boolean isLockAcquired(String metadataFile) throws IOException {

// Visible for testing
public String getMetadataFileForCommit(long primaryTerm, long generation) throws IOException {
List<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
List<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefixInOrder(
MetadataFilenameUtils.getMetadataFilePrefixForCommit(primaryTerm, generation),
1
1,
BlobContainer.BlobNameSortOrder.CHRONOLOGICAL
);

if (metadataFiles.isEmpty()) {