Skip to content

Commit

Permalink
Merge branch '2.x' into backport
Browse files Browse the repository at this point in the history
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
  • Loading branch information
reta authored Jun 19, 2024
2 parents 6f3c88f + 8148e4d commit f773405
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Changed
- Make the class CommunityIdProcessor final ([#14448](https://github.com/opensearch-project/OpenSearch/pull/14448))
- Updated the `indices.query.bool.max_clause_count` setting from being static to dynamically updateable ([#13568](https://github.com/opensearch-project/OpenSearch/pull/13568))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,7 @@ public void testCacheCleanupAfterIndexDeletion() throws Exception {
}, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS);
}

// when staleness threshold is lower than staleness, it should clean the cache from all indices having stale keys
// when staleness threshold is lower than staleness, it should clean cache from all indices having stale keys
public void testStaleKeysCleanupWithMultipleIndices() throws Exception {
int cacheCleanIntervalInMillis = 10;
String node = internalCluster().startNode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.opensearch.index.query.QueryStringQueryBuilder;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.SearchModule;
import org.opensearch.search.SearchService;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
import org.junit.Before;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -101,7 +101,7 @@ public void setup() throws Exception {
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING.getKey(), CLUSTER_MAX_CLAUSE_COUNT)
.put(SearchService.INDICES_MAX_CLAUSE_COUNT_SETTING.getKey(), CLUSTER_MAX_CLAUSE_COUNT)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import org.opensearch.plugins.Plugin;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.SearchModule;
import org.opensearch.search.SearchService;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
import org.junit.BeforeClass;
Expand All @@ -79,6 +79,7 @@
import static org.opensearch.index.query.QueryBuilders.simpleQueryStringQuery;
import static org.opensearch.index.query.QueryBuilders.termQuery;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.SearchService.INDICES_MAX_CLAUSE_COUNT_SETTING;
import static org.opensearch.test.StreamsUtils.copyToStringFromClasspath;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertFailures;
Expand Down Expand Up @@ -122,7 +123,7 @@ public static void createRandomClusterSetting() {
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING.getKey(), CLUSTER_MAX_CLAUSE_COUNT)
.put(SearchService.INDICES_MAX_CLAUSE_COUNT_SETTING.getKey(), CLUSTER_MAX_CLAUSE_COUNT)
.build();
}

Expand Down Expand Up @@ -720,6 +721,52 @@ public void testFieldAliasOnDisallowedFieldType() throws Exception {
assertHits(response.getHits(), "1");
}

public void testDynamicClauseCountUpdate() throws Exception {
client().prepareIndex("testdynamic").setId("1").setSource("field", "foo bar baz").get();
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(INDICES_MAX_CLAUSE_COUNT_SETTING.getKey(), CLUSTER_MAX_CLAUSE_COUNT - 1))
);
refresh();
StringBuilder sb = new StringBuilder("foo");

// create clause_count + 1 clauses to hit error
for (int i = 0; i <= CLUSTER_MAX_CLAUSE_COUNT; i++) {
sb.append(" OR foo" + i);
}

QueryStringQueryBuilder qb = queryStringQuery(sb.toString()).field("field");

SearchPhaseExecutionException e = expectThrows(SearchPhaseExecutionException.class, () -> {
client().prepareSearch("testdynamic").setQuery(qb).get();
});

assert (e.getDetailedMessage().contains("maxClauseCount is set to " + (CLUSTER_MAX_CLAUSE_COUNT - 1)));

// increase clause count by 2
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(INDICES_MAX_CLAUSE_COUNT_SETTING.getKey(), CLUSTER_MAX_CLAUSE_COUNT + 2))
);

Thread.sleep(1);

SearchResponse response = client().prepareSearch("testdynamic").setQuery(qb).get();
assertHitCount(response, 1);
assertHits(response.getHits(), "1");

assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().putNull(INDICES_MAX_CLAUSE_COUNT_SETTING.getKey()))
);
}

private void assertHits(SearchHits hits, String... ids) {
assertThat(hits.getTotalHits().value, equalTo((long) ids.length));
Set<String> hitIds = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.search.SearchModule;
import org.opensearch.search.SearchService;
import org.opensearch.search.aggregations.MultiBucketConsumerService;
import org.opensearch.search.backpressure.settings.NodeDuressSettings;
Expand Down Expand Up @@ -541,6 +540,7 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.MAX_OPEN_PIT_CONTEXT,
SearchService.MAX_PIT_KEEPALIVE_SETTING,
SearchService.MAX_AGGREGATION_REWRITE_FILTERS,
SearchService.INDICES_MAX_CLAUSE_COUNT_SETTING,
SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD,
CreatePitController.PIT_INIT_KEEP_ALIVE,
Node.WRITE_PORTS_FILE_SETTING,
Expand Down Expand Up @@ -591,7 +591,6 @@ public void apply(Settings value, Settings current, Settings previous) {
ResourceWatcherService.RELOAD_INTERVAL_HIGH,
ResourceWatcherService.RELOAD_INTERVAL_MEDIUM,
ResourceWatcherService.RELOAD_INTERVAL_LOW,
SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING,
ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING,
FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE,
Node.BREAKER_TYPE_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.query.QueryShardException;
import org.opensearch.search.SearchModule;
import org.opensearch.search.SearchService;

import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -180,7 +180,7 @@ static Map<String, Float> resolveMappingField(
}

static void checkForTooManyFields(int numberOfFields, QueryShardContext context, @Nullable String inputPattern) {
Integer limit = SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING.get(context.getIndexSettings().getSettings());
int limit = SearchService.INDICES_MAX_CLAUSE_COUNT_SETTING.get(context.getIndexSettings().getSettings());
if (numberOfFields > limit) {
StringBuilder errorMsg = new StringBuilder("field expansion ");
if (inputPattern != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -507,15 +508,15 @@ public int hashCode() {
* */
class IndicesRequestCacheCleanupManager implements Closeable {
private final Set<CleanupKey> keysToClean;
private final ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap;
private final ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap;
private final AtomicInteger staleKeysCount;
private volatile double stalenessThreshold;
private final IndicesRequestCacheCleaner cacheCleaner;

IndicesRequestCacheCleanupManager(ThreadPool threadpool, TimeValue cleanInterval, double stalenessThreshold) {
this.stalenessThreshold = stalenessThreshold;
this.keysToClean = ConcurrentCollections.newConcurrentSet();
this.cleanupKeyToCountMap = ConcurrentCollections.newConcurrentMap();
this.cleanupKeyToCountMap = new ConcurrentHashMap<>();
this.staleKeysCount = new AtomicInteger(0);
this.cacheCleaner = new IndicesRequestCacheCleaner(this, threadpool, cleanInterval);
threadpool.schedule(cacheCleaner, cleanInterval, ThreadPool.Names.SAME);
Expand Down Expand Up @@ -573,8 +574,7 @@ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) {

// pkg-private for testing
void addToCleanupKeyToCountMap(ShardId shardId, String readerCacheKeyId) {
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> ConcurrentCollections.newConcurrentMap())
.merge(readerCacheKeyId, 1, Integer::sum);
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new ConcurrentHashMap<>()).merge(readerCacheKeyId, 1, Integer::sum);
}

/**
Expand Down Expand Up @@ -832,7 +832,7 @@ public void close() {
}

// for testing
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> getCleanupKeyToCountMap() {
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> getCleanupKeyToCountMap() {
return cleanupKeyToCountMap;
}

Expand Down
9 changes: 0 additions & 9 deletions server/src/main/java/org/opensearch/search/SearchModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.geo.GeoShapeType;
import org.opensearch.common.geo.ShapesAvailability;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.ParseFieldRegistry;
import org.opensearch.core.ParseField;
Expand Down Expand Up @@ -318,13 +317,6 @@
* @opensearch.internal
*/
public class SearchModule {
public static final Setting<Integer> INDICES_MAX_CLAUSE_COUNT_SETTING = Setting.intSetting(
"indices.query.bool.max_clause_count",
1024,
1,
Integer.MAX_VALUE,
Setting.Property.NodeScope
);

private final Map<String, Highlighter> highlighters;
private final ParseFieldRegistry<MovAvgModel.AbstractModelParser> movingAverageModelParserRegistry = new ParseFieldRegistry<>(
Expand Down Expand Up @@ -1130,7 +1122,6 @@ private void registerQueryParsers(List<SearchPlugin> plugins) {
registerQuery(new QuerySpec<>(MatchAllQueryBuilder.NAME, MatchAllQueryBuilder::new, MatchAllQueryBuilder::fromXContent));
registerQuery(new QuerySpec<>(QueryStringQueryBuilder.NAME, QueryStringQueryBuilder::new, QueryStringQueryBuilder::fromXContent));
registerQuery(new QuerySpec<>(BoostingQueryBuilder.NAME, BoostingQueryBuilder::new, BoostingQueryBuilder::fromXContent));
BooleanQuery.setMaxClauseCount(INDICES_MAX_CLAUSE_COUNT_SETTING.get(settings));
registerQuery(new QuerySpec<>(BoolQueryBuilder.NAME, BoolQueryBuilder::new, BoolQueryBuilder::fromXContent));
registerQuery(new QuerySpec<>(TermQueryBuilder.NAME, TermQueryBuilder::new, TermQueryBuilder::fromXContent));
registerQuery(new QuerySpec<>(TermsQueryBuilder.NAME, TermsQueryBuilder::new, TermsQueryBuilder::fromXContent));
Expand Down
13 changes: 13 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TopDocs;
import org.opensearch.LegacyESVersion;
import org.opensearch.OpenSearchException;
Expand Down Expand Up @@ -282,6 +283,15 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.NodeScope
);

public static final Setting<Integer> INDICES_MAX_CLAUSE_COUNT_SETTING = Setting.intSetting(
"indices.query.bool.max_clause_count",
1024,
1,
Integer.MAX_VALUE,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<Boolean> CLUSTER_ALLOW_DERIVED_FIELD_SETTING = Setting.boolSetting(
"search.derived_field.enabled",
true,
Expand Down Expand Up @@ -412,6 +422,9 @@ public SearchService(
lowLevelCancellation = LOW_LEVEL_CANCELLATION_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation);

IndexSearcher.setMaxClauseCount(INDICES_MAX_CLAUSE_COUNT_SETTING.get(settings));
clusterService.getClusterSettings().addSettingsUpdateConsumer(INDICES_MAX_CLAUSE_COUNT_SETTING, IndexSearcher::setMaxClauseCount);

allowDerivedField = CLUSTER_ALLOW_DERIVED_FIELD_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_ALLOW_DERIVED_FIELD_SETTING, this::setAllowDerivedField);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -491,7 +490,8 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount(
indexShard.hashCode()
);
// test the mapping
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
.getCleanupKeyToCountMap();
// shard id should exist
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
// reader CacheKeyId should NOT exist
Expand Down Expand Up @@ -554,7 +554,8 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS
);

// test the mapping
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
.getCleanupKeyToCountMap();
// shard id should exist
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
// reader CacheKeyId should NOT exist
Expand Down Expand Up @@ -722,7 +723,8 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes());
assertEquals(1, cache.count());
// test the mappings
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
.getCleanupKeyToCountMap();
assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(reader)));

cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes());
Expand Down Expand Up @@ -796,15 +798,15 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
}

// test adding to cleanupKeyToCountMap with multiple threads
public void testAddToCleanupKeyToCountMap() throws Exception {
public void testAddingToCleanupKeyToCountMapWorksAppropriatelyWithMultipleThreads() throws Exception {
threadPool = getThreadPool();
Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build();
cache = getIndicesRequestCache(settings);

int numberOfThreads = 10;
int numberOfIterations = 1000;
Phaser phaser = new Phaser(numberOfThreads + 1); // +1 for the main thread
AtomicBoolean exceptionDetected = new AtomicBoolean(false);
AtomicBoolean concurrentModificationExceptionDetected = new AtomicBoolean(false);

ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);

Expand All @@ -817,7 +819,7 @@ public void testAddToCleanupKeyToCountMap() throws Exception {
}
} catch (ConcurrentModificationException e) {
logger.error("ConcurrentModificationException detected in thread : " + e.getMessage());
exceptionDetected.set(true); // Set flag if exception is detected
concurrentModificationExceptionDetected.set(true); // Set flag if exception is detected
}
});
}
Expand All @@ -836,13 +838,17 @@ public void testAddToCleanupKeyToCountMap() throws Exception {
}
} catch (ConcurrentModificationException e) {
logger.error("ConcurrentModificationException detected in main thread : " + e.getMessage());
exceptionDetected.set(true); // Set flag if exception is detected
concurrentModificationExceptionDetected.set(true); // Set flag if exception is detected
}
});

executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS);
assertFalse(exceptionDetected.get());
assertTrue(executorService.awaitTermination(60, TimeUnit.SECONDS));
assertEquals(
numberOfThreads * numberOfIterations,
cache.cacheCleanupManager.getCleanupKeyToCountMap().get(indexShard.shardId()).size()
);
assertFalse(concurrentModificationExceptionDetected.get());
}

private IndicesRequestCache getIndicesRequestCache(Settings settings) {
Expand Down

0 comments on commit f773405

Please sign in to comment.