Skip to content

Commit

Permalink
Merge branch 'main' into issue-4269
Browse files Browse the repository at this point in the history
  • Loading branch information
dreamer-89 committed Sep 13, 2022
2 parents 43d181d + 763a89f commit eecf977
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Plugin ZIP publication groupId value is configurable ([#4156](https://github.com/opensearch-project/OpenSearch/pull/4156))
- Add index specific setting for remote repository ([#4253](https://github.com/opensearch-project/OpenSearch/pull/4253))
- [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402))
- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948))

### Deprecated

Expand All @@ -59,6 +60,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- [Bug]: gradle check failing with java heap OutOfMemoryError (([#4328](https://github.com/opensearch-project/OpenSearch/
- `opensearch.bat` fails to execute when install path includes spaces ([#4362](https://github.com/opensearch-project/OpenSearch/pull/4362))
- Getting security exception due to access denied 'java.lang.RuntimePermission' 'accessDeclaredMembers' when trying to get snapshot with S3 IRSA ([#4469](https://github.com/opensearch-project/OpenSearch/pull/4469))
- Fixed flaky test `ResourceAwareTasksTests.testTaskIdPersistsInThreadContext` ([#4484](https://github.com/opensearch-project/OpenSearch/pull/4484))

### Security
- CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341))
Expand Down
11 changes: 11 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2357,6 +2357,17 @@ public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo,
return getEngine().newChangesSnapshot(reason, startingSeqNo, endSeqNo, true, accurateCount);
}

/**
* Creates a new history snapshot from the translog instead of the lucene index. Required for cross cluster replication.
* Use the recommended {@link #getHistoryOperations(String, long, long, boolean)} method for other cases.
* This method should only be invoked if Segment Replication or Remote Store is not enabled.
*/
public Translog.Snapshot getHistoryOperationsFromTranslog(long startingSeqNo, long endSeqNo) throws IOException {
assert (indexSettings.isSegRepEnabled() || indexSettings.isRemoteStoreEnabled()) == false
: "unsupported operation for segment replication enabled indices or remote store backed indices";
return getEngine().translogManager().newChangesSnapshot(startingSeqNo, endSeqNo, true);
}

/**
* Checks if we have a completed history of operations since the given starting seqno (inclusive).
* This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ public void rollTranslogGeneration() throws TranslogException {
}
}

@Override
public Translog.Snapshot newChangesSnapshot(long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
return translog.newSnapshot(fromSeqNo, toSeqNo, requiredFullRange);
}

/**
* Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive).
* This operation will close the engine if the recovery fails.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,9 @@ public Translog.Operation readOperation(Translog.Location location) throws IOExc
public Translog.Location add(Translog.Operation operation) throws IOException {
return new Translog.Location(0, 0, 0);
}

@Override
public Translog.Snapshot newChangesSnapshot(long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
throw new UnsupportedOperationException("Translog snapshot unsupported with no-op translogs");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public interface TranslogManager {
*/
int recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long localCheckpoint, long recoverUpToSeqNo) throws IOException;

/**
* Creates a new history snapshot from the translog file instead of the lucene index.
*/
Translog.Snapshot newChangesSnapshot(long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException;

/**
* Checks if the underlying storage sync is required.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,9 @@ public int recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, lo
public void skipTranslogRecovery() {
// Do nothing.
}

@Override
public Translog.Snapshot newChangesSnapshot(long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
throw new UnsupportedOperationException("Translog snapshot unsupported with no-op translogs");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -616,6 +617,10 @@ public void onFailure(Exception e) {

taskTestContext.requestCompleteLatch.await();

// It is possible for the MockTaskManagerListener to be called after the response is sent already.
// Wait enough time for taskId to be added to taskIdsRemovedFromThreadContext before performing validations.
waitUntil(() -> taskIdsAddedToThreadContext.size() == taskIdsRemovedFromThreadContext.size(), 5, TimeUnit.SECONDS);

assertEquals(expectedTaskIdInThreadContext.get(), actualTaskIdInThreadContext.get());
assertThat(taskIdsAddedToThreadContext, containsInAnyOrder(taskIdsRemovedFromThreadContext.toArray()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1196,6 +1196,66 @@ public void testAcquireReplicaPermitAdvanceMaxSeqNoOfUpdates() throws Exception
closeShards(replica);
}

public void testGetChangesSnapshotThrowsAssertForSegRep() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 0);
final ShardRouting shardRouting = TestShardRouting.newShardRouting(
shardId,
randomAlphaOfLength(8),
true,
ShardRoutingState.INITIALIZING,
RecoverySource.EmptyStoreRecoverySource.INSTANCE
);
final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT.toString())
.build();
final IndexMetadata.Builder indexMetadata = IndexMetadata.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1);
final AtomicBoolean synced = new AtomicBoolean();
final IndexShard primaryShard = newShard(
shardRouting,
indexMetadata.build(),
null,
new InternalEngineFactory(),
() -> synced.set(true),
RetentionLeaseSyncer.EMPTY,
null
);
expectThrows(AssertionError.class, () -> primaryShard.getHistoryOperationsFromTranslog(0, 1));
closeShard(primaryShard, false);
}

public void testGetChangesSnapshotThrowsAssertForRemoteStore() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 0);
final ShardRouting shardRouting = TestShardRouting.newShardRouting(
shardId,
randomAlphaOfLength(8),
true,
ShardRoutingState.INITIALIZING,
RecoverySource.EmptyStoreRecoverySource.INSTANCE
);
final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.build();
final IndexMetadata.Builder indexMetadata = IndexMetadata.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1);
final AtomicBoolean synced = new AtomicBoolean();
final IndexShard primaryShard = newShard(
shardRouting,
indexMetadata.build(),
null,
new InternalEngineFactory(),
() -> synced.set(true),
RetentionLeaseSyncer.EMPTY,
null
);
expectThrows(AssertionError.class, () -> primaryShard.getHistoryOperationsFromTranslog(0, 1));
closeShard(primaryShard, false);
}

public void testGlobalCheckpointSync() throws IOException {
// create the primary shard with a callback that sets a boolean when the global checkpoint sync is invoked
final ShardId shardId = new ShardId("index", "_na_", 0);
Expand Down

0 comments on commit eecf977

Please sign in to comment.