Skip to content

Commit

Permalink
Fix flaky test testDropPrimaryDuringReplication. (opensearch-project#…
Browse files Browse the repository at this point in the history
…8715)

This change fixes testDropPrimaryDuringReplication and an edge case where after an updateSegments call on
NRTReplicationReaderManager the reader is not actually refreshed because of another concurrent refresh call.
Fixes by using a blocking refresh and removes unnecessary synchronization around the updatesegments method to avoid
a deadlock case where the concurrent refresh picks up the new segments but is unable to acquire the object monitor to refresh internally in ReferenceManager.swapReference.

Signed-off-by: Marc Handalian <handalm@amazon.com>
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
mch2 authored and shiv0408 committed Apr 25, 2024
1 parent 8f6459b commit dc9aa72
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
* @param infos {@link SegmentInfos} infos
* @throws IOException - When Refresh fails with an IOException.
*/
public synchronized void updateSegments(SegmentInfos infos) throws IOException {
public void updateSegments(SegmentInfos infos) throws IOException {
// roll over the currentInfo's generation, this ensures the on-disk gen
// is always increased.
infos.updateGeneration(currentInfos);
currentInfos = infos;
maybeRefresh();
maybeRefreshBlocking();
}

public SegmentInfos getSegmentInfos() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.engine;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.util.Version;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.index.store.Store;

import java.io.IOException;

public class NRTReplicationReaderManagerTests extends EngineTestCase {

public void testCreateNRTreaderManager() throws IOException {
try (final Store store = createStore()) {
store.createEmpty(Version.LATEST);
final DirectoryReader reader = DirectoryReader.open(store.directory());
final SegmentInfos initialInfos = ((StandardDirectoryReader) reader).getSegmentInfos();
NRTReplicationReaderManager readerManager = new NRTReplicationReaderManager(
OpenSearchDirectoryReader.wrap(reader, shardId),
(files) -> {},
(files) -> {}
);
assertEquals(initialInfos, readerManager.getSegmentInfos());
try (final OpenSearchDirectoryReader acquire = readerManager.acquire()) {
assertNull(readerManager.refreshIfNeeded(acquire));
}

// create an updated infos
final SegmentInfos infos_2 = readerManager.getSegmentInfos().clone();
infos_2.changed();

readerManager.updateSegments(infos_2);
assertEquals(infos_2, readerManager.getSegmentInfos());
try (final OpenSearchDirectoryReader acquire = readerManager.acquire()) {
final StandardDirectoryReader standardReader = NRTReplicationReaderManager.unwrapStandardReader(acquire);
assertEquals(infos_2, standardReader.getSegmentInfos());
}
}
}

public void testUpdateSegmentsWhileRefreshing() throws IOException, InterruptedException {
try (final Store store = createStore()) {
store.createEmpty(Version.LATEST);
final DirectoryReader reader = DirectoryReader.open(store.directory());
NRTReplicationReaderManager readerManager = new NRTReplicationReaderManager(
OpenSearchDirectoryReader.wrap(reader, shardId),
(files) -> {},
(files) -> {}
);

final SegmentInfos infos_2 = readerManager.getSegmentInfos().clone();
infos_2.changed();

Thread refreshThread = new Thread(() -> {
try {
readerManager.maybeRefresh();
} catch (IOException e) {
throw new RuntimeException(e);
}
});
Thread updateThread = new Thread(() -> {
try {
readerManager.updateSegments(infos_2);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
refreshThread.start();
updateThread.start();
refreshThread.join();
updateThread.join();
try (final OpenSearchDirectoryReader acquire = readerManager.acquire()) {
final StandardDirectoryReader standardReader = NRTReplicationReaderManager.unwrapStandardReader(acquire);
assertEquals(infos_2.version, standardReader.getSegmentInfos().version);
}
assertEquals(infos_2, readerManager.getSegmentInfos());
}
}
}

0 comments on commit dc9aa72

Please sign in to comment.