Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky SearchCancellationIT tests to avoid race condition #5656

Merged
merged 4 commits into from
Dec 29, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
Expand All @@ -88,6 +87,10 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE)
public class SearchCancellationIT extends OpenSearchIntegTestCase {

private TimeValue requestCancellationTimeout = TimeValue.timeValueSeconds(1);
private TimeValue clusterCancellationTimeout = TimeValue.timeValueMillis(1500);
private TimeValue keepAlive = TimeValue.timeValueSeconds(5);

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singleton(ScriptedBlockPlugin.class);
Expand Down Expand Up @@ -233,15 +236,13 @@ public void testCancellationDuringQueryPhaseUsingRequestParameter() throws Excep
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();

TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS);
ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
.setCancelAfterTimeInterval(cancellationTimeout)
.setCancelAfterTimeInterval(requestCancellationTimeout)
.setAllowPartialSearchResults(randomBoolean())
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())))
.execute();
awaitForBlock(plugins);
// sleep for cancellation timeout to ensure scheduled cancellation task is actually executed
Thread.sleep(cancellationTimeout.getMillis());
sleepForAtLeast(requestCancellationTimeout.getMillis());
// unblock the search thread
disableBlocks(plugins);
ensureSearchWasCancelled(searchResponse);
Expand All @@ -251,19 +252,19 @@ public void testCancellationDuringQueryPhaseUsingClusterSetting() throws Excepti
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();

TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS);
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, cancellationTimeout).build())
.setPersistentSettings(
Settings.builder().put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, clusterCancellationTimeout).build()
)
.get();
ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
.setAllowPartialSearchResults(randomBoolean())
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())))
.execute();
awaitForBlock(plugins);
// sleep for cluster cancellation timeout to ensure scheduled cancellation task is actually executed
Thread.sleep(cancellationTimeout.getMillis());
sleepForAtLeast(clusterCancellationTimeout.getMillis());
// unblock the search thread
disableBlocks(plugins);
ensureSearchWasCancelled(searchResponse);
Expand All @@ -288,14 +289,12 @@ public void testCancellationDuringFetchPhase() throws Exception {
public void testCancellationDuringFetchPhaseUsingRequestParameter() throws Exception {
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();
TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS);
ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
.setCancelAfterTimeInterval(cancellationTimeout)
.setCancelAfterTimeInterval(requestCancellationTimeout)
.addScriptField("test_field", new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))
.execute();
awaitForBlock(plugins);
// sleep for request cancellation timeout to ensure scheduled cancellation task is actually executed
Thread.sleep(cancellationTimeout.getMillis());
sleepForAtLeast(requestCancellationTimeout.getMillis());
// unblock the search thread
disableBlocks(plugins);
ensureSearchWasCancelled(searchResponse);
Expand All @@ -307,7 +306,7 @@ public void testCancellationOfScrollSearches() throws Exception {

logger.info("Executing search");
ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
.setScroll(TimeValue.timeValueSeconds(10))
.setScroll(keepAlive)
.setSize(5)
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())))
.execute();
Expand All @@ -326,16 +325,16 @@ public void testCancellationOfScrollSearches() throws Exception {
public void testCancellationOfFirstScrollSearchRequestUsingRequestParameter() throws Exception {
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();
TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS);
ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
.setScroll(TimeValue.timeValueSeconds(10))
.setCancelAfterTimeInterval(cancellationTimeout)
.setScroll(keepAlive)
.setCancelAfterTimeInterval(requestCancellationTimeout)
.setSize(5)
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())))
.execute();

awaitForBlock(plugins);
Thread.sleep(cancellationTimeout.getMillis());
sleepForAtLeast(requestCancellationTimeout.getMillis());
// unblock the search thread
disableBlocks(plugins);
SearchResponse response = ensureSearchWasCancelled(searchResponse);
if (response != null) {
Expand All @@ -354,7 +353,6 @@ public void testCancellationOfScrollSearchesOnFollowupRequests() throws Exceptio
disableBlocks(plugins);

logger.info("Executing search");
TimeValue keepAlive = TimeValue.timeValueSeconds(5);
SearchResponse searchResponse = client().prepareSearch("test")
.setScroll(keepAlive)
.setSize(2)
Expand Down Expand Up @@ -394,11 +392,9 @@ public void testNoCancellationOfScrollSearchOnFollowUpRequest() throws Exception

// Disable block so the first request would pass
disableBlocks(plugins);
TimeValue keepAlive = TimeValue.timeValueSeconds(5);
TimeValue cancellationTimeout = TimeValue.timeValueSeconds(2);
SearchResponse searchResponse = client().prepareSearch("test")
.setScroll(keepAlive)
.setCancelAfterTimeInterval(cancellationTimeout)
.setCancelAfterTimeInterval(requestCancellationTimeout)
.setSize(2)
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())))
.get();
Expand All @@ -418,8 +414,8 @@ public void testNoCancellationOfScrollSearchOnFollowUpRequest() throws Exception
.execute();

awaitForBlock(plugins);
// sleep for cancellation timeout to ensure there is no scheduled task for cancellation
Thread.sleep(cancellationTimeout.getMillis());
sleepForAtLeast(requestCancellationTimeout.getMillis());
// unblock the search thread
disableBlocks(plugins);

// wait for response and ensure there is no failure
Expand All @@ -432,20 +428,20 @@ public void testNoCancellationOfScrollSearchOnFollowUpRequest() throws Exception
public void testDisableCancellationAtRequestLevel() throws Exception {
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();
TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS);
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, cancellationTimeout).build())
.setPersistentSettings(
Settings.builder().put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, clusterCancellationTimeout).build()
)
.get();
ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
.setAllowPartialSearchResults(randomBoolean())
.setCancelAfterTimeInterval(NO_TIMEOUT)
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())))
.execute();
awaitForBlock(plugins);
// sleep for cancellation timeout to ensure there is no scheduled task for cancellation
Thread.sleep(cancellationTimeout.getMillis());
sleepForAtLeast(clusterCancellationTimeout.getMillis());
// unblock the search thread
disableBlocks(plugins);
// ensure search was successful since cancellation was disabled at request level
Expand All @@ -455,7 +451,6 @@ public void testDisableCancellationAtRequestLevel() throws Exception {
public void testDisableCancellationAtClusterLevel() throws Exception {
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();
TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS);
client().admin()
.cluster()
.prepareUpdateSettings()
Expand All @@ -466,8 +461,7 @@ public void testDisableCancellationAtClusterLevel() throws Exception {
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())))
.execute();
awaitForBlock(plugins);
// sleep for cancellation timeout to ensure there is no scheduled task for cancellation
Thread.sleep(cancellationTimeout.getMillis());
sleepForAtLeast(clusterCancellationTimeout.getMillis());
// unblock the search thread
disableBlocks(plugins);
// ensure search was successful since cancellation was disabled at request level
Expand Down Expand Up @@ -501,11 +495,12 @@ public void testCancelMultiSearch() throws Exception {
public void testMSearchChildRequestCancellationWithClusterLevelTimeout() throws Exception {
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();
TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS);
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, cancellationTimeout).build())
.setPersistentSettings(
Settings.builder().put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, clusterCancellationTimeout).build()
)
.get();
ActionFuture<MultiSearchResponse> mSearchResponse = client().prepareMultiSearch()
.setMaxConcurrentSearchRequests(2)
Expand All @@ -526,8 +521,7 @@ public void testMSearchChildRequestCancellationWithClusterLevelTimeout() throws
)
.execute();
awaitForBlock(plugins);
// sleep for cluster cancellation timeout to ensure scheduled cancellation task is actually executed
Thread.sleep(cancellationTimeout.getMillis());
sleepForAtLeast(clusterCancellationTimeout.getMillis());
// unblock the search thread
disableBlocks(plugins);
// both child requests are expected to fail
Expand All @@ -544,8 +538,6 @@ public void testMSearchChildRequestCancellationWithClusterLevelTimeout() throws
public void testMSearchChildReqCancellationWithHybridTimeout() throws Exception {
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();
TimeValue reqCancellationTimeout = new TimeValue(2, TimeUnit.SECONDS);
TimeValue clusterCancellationTimeout = new TimeValue(3, TimeUnit.SECONDS);
client().admin()
.cluster()
.prepareUpdateSettings()
Expand All @@ -558,7 +550,7 @@ public void testMSearchChildReqCancellationWithHybridTimeout() throws Exception
.add(
client().prepareSearch("test")
.setAllowPartialSearchResults(randomBoolean())
.setCancelAfterTimeInterval(reqCancellationTimeout)
.setCancelAfterTimeInterval(requestCancellationTimeout)
.setQuery(
scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))
)
Expand All @@ -581,8 +573,7 @@ public void testMSearchChildReqCancellationWithHybridTimeout() throws Exception
)
.execute();
awaitForBlock(plugins);
// sleep for cluster cancellation timeout to ensure scheduled cancellation task is actually executed
Thread.sleep(Math.max(reqCancellationTimeout.getMillis(), clusterCancellationTimeout.getMillis()));
sleepForAtLeast(Math.max(requestCancellationTimeout.getMillis(), clusterCancellationTimeout.getMillis()));
// unblock the search thread
disableBlocks(plugins);
// only first and last child request are expected to fail
Expand All @@ -592,6 +583,16 @@ public void testMSearchChildReqCancellationWithHybridTimeout() throws Exception
ensureMSearchWasCancelled(mSearchResponse, expectedFailedRequests);
}

/**
* Sleeps for the specified number of milliseconds plus a 100ms buffer to account for system timer/scheduler inaccuracies.
*
* @param milliseconds The minimum time to sleep
* @throws InterruptedException if interrupted during sleep
*/
private static void sleepForAtLeast(long milliseconds) throws InterruptedException {
Thread.sleep(milliseconds + 100L);
}

public static class ScriptedBlockPlugin extends MockScriptPlugin {
static final String SCRIPT_NAME = "search_block";

Expand Down