Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding cache eviction and listener for invalidating index field type … #142

Merged
merged 7 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ public List<Setting<?>> getSettings() {
QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N,
QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME,
QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE,
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING,
QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.service.categorizer;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.RamUsageEstimator;
import org.opensearch.common.cache.Cache;
import org.opensearch.common.cache.CacheBuilder;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.Index;
import org.opensearch.plugin.insights.settings.QueryCategorizationSettings;

jainankitk marked this conversation as resolved.
Show resolved Hide resolved
/**
* Cache implementation specifically for maintaining the field name type mappings
* for indices that are part of successful search requests
*/
public class IndicesFieldTypeCache {

private static final Logger logger = LogManager.getLogger(IndicesFieldTypeCache.class);
private final Cache<Index, IndexFieldMap> cache;

public IndicesFieldTypeCache(Settings settings) {
final long sizeInBytes = QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY.get(settings).getBytes();
deshsidd marked this conversation as resolved.
Show resolved Hide resolved
CacheBuilder<Index, IndexFieldMap> cacheBuilder = CacheBuilder.<Index, IndexFieldMap>builder();
if (sizeInBytes > 0) {
cacheBuilder.setMaximumWeight(sizeInBytes).weigher((k, v) -> RamUsageEstimator.sizeOfObject(k) + v.weight());
jainankitk marked this conversation as resolved.
Show resolved Hide resolved
}
cache = cacheBuilder.build();
}

public IndexFieldMap getOrInitialize(Index index) {
try {
return cache.computeIfAbsent(index, k -> new IndexFieldMap());
} catch (ExecutionException ex) {
logger.error("Unexpected execution exception while initializing for index " + index);
}

// Should never return null as the ExecutionException is only thrown
// if loader throws an exception or returns a null value, which cannot
// be the case in this scenario
return null;
}

public void invalidate(Index index) {
cache.invalidate(index);
}

public Iterable<Index> keySet() {
return cache.keys();
}

static class IndexFieldMap {
private ConcurrentHashMap<String, String> fieldTypeMap;
private CounterMetric weight;
jainankitk marked this conversation as resolved.
Show resolved Hide resolved

IndexFieldMap() {
fieldTypeMap = new ConcurrentHashMap<>();
weight = new CounterMetric();
}

public String get(String fieldName) {
return fieldTypeMap.get(fieldName);
}

public void putIfAbsent(String key, String value) {
// Increment the weight only if the key value pair added to the Map
if (fieldTypeMap.putIfAbsent(key, value) == null) {
weight.inc(RamUsageEstimator.sizeOf(key) + RamUsageEstimator.sizeOf(value));
jainankitk marked this conversation as resolved.
Show resolved Hide resolved
}
}

public long weight() {
return weight.count();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@

package org.opensearch.plugin.insights.core.service.categorizer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.util.BytesRef;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.hash.MurmurHash3;
import org.opensearch.core.common.io.stream.NamedWriteable;
Expand All @@ -33,16 +34,36 @@
/**
* Class to generate query shape
*/
public class QueryShapeGenerator {
public class QueryShapeGenerator implements ClusterStateListener {
static final String EMPTY_STRING = "";
static final String ONE_SPACE_INDENT = " ";
private final ClusterService clusterService;
private final String NO_FIELD_TYPE_VALUE = "";
private final ConcurrentHashMap<Index, ConcurrentHashMap<String, String>> fieldTypeMap;
private final IndicesFieldTypeCache indicesFieldTypeCache;

public QueryShapeGenerator(ClusterService clusterService) {
this.clusterService = clusterService;
this.fieldTypeMap = new ConcurrentHashMap<>();
clusterService.addListener(this);
this.indicesFieldTypeCache = new IndicesFieldTypeCache(clusterService.getSettings());
}

public void clusterChanged(ClusterChangedEvent event) {
final List<Index> indicesDeleted = event.indicesDeleted();
jainankitk marked this conversation as resolved.
Show resolved Hide resolved
for (Index index : indicesDeleted) {
// remove the deleted index mapping from field type cache
indicesFieldTypeCache.invalidate(index);
}

if (event.metadataChanged()) {
final Metadata previousMetadata = event.previousState().metadata();
final Metadata currentMetadata = event.state().metadata();
for (Index index : indicesFieldTypeCache.keySet()) {
jainankitk marked this conversation as resolved.
Show resolved Hide resolved
if (previousMetadata.index(index) != currentMetadata.index(index)) {
// remove the updated index mapping from field type cache
indicesFieldTypeCache.invalidate(index);
}
}
}
}

/**
Expand Down Expand Up @@ -127,20 +148,12 @@ public String buildShape(
}

private Map<String, Object> getPropertiesMapForIndex(Index index) {
Map<String, MappingMetadata> indexMapping;
try {
indexMapping = clusterService.state().metadata().findMappings(new String[] { index.getName() }, input -> str -> true);
} catch (IOException e) {
// If an error occurs while retrieving mappings, return an empty map
return Collections.emptyMap();
}

MappingMetadata mappingMetadata = indexMapping.get(index.getName());
if (mappingMetadata == null) {
IndexMetadata indexMetadata = clusterService.state().metadata().index(index);
jainankitk marked this conversation as resolved.
Show resolved Hide resolved
if (indexMetadata == null) {
return Collections.emptyMap();
}

Map<String, Object> propertiesMap = (Map<String, Object>) mappingMetadata.getSourceAsMap().get("properties");
Map<String, Object> propertiesMap = (Map<String, Object>) indexMetadata.mapping().getSourceAsMap().get("properties");
if (propertiesMap == null) {
return Collections.emptyMap();
}
Expand Down Expand Up @@ -363,8 +376,7 @@ String getFieldType(String fieldName, Map<String, Object> propertiesAsMap, Index
fieldType = getFieldTypeFromProperties(fieldName, propertiesAsMap);

// Cache field type or NO_FIELD_TYPE_VALUE if not found
fieldTypeMap.computeIfAbsent(index, k -> new ConcurrentHashMap<>())
.putIfAbsent(fieldName, fieldType != null ? fieldType : NO_FIELD_TYPE_VALUE);
indicesFieldTypeCache.getOrInitialize(index).putIfAbsent(fieldName, fieldType != null ? fieldType : NO_FIELD_TYPE_VALUE);

return fieldType;
}
Expand Down Expand Up @@ -406,6 +418,6 @@ else if (currentMap.containsKey("type")) {
}

String getFieldTypeFromCache(String fieldName, Index index) {
return fieldTypeMap.getOrDefault(index, new ConcurrentHashMap<>()).get(fieldName);
return indicesFieldTypeCache.getOrInitialize(index).get(fieldName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.plugin.insights.settings;

import org.opensearch.common.settings.Setting;
import org.opensearch.core.common.unit.ByteSizeValue;

/**
* Settings for Query Categorization
Expand All @@ -24,6 +25,12 @@ public class QueryCategorizationSettings {
Setting.Property.Dynamic
);

public static final Setting<ByteSizeValue> SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY = Setting.memorySizeSetting(
"search.query.fieldtype.cache.size",
"0.1%",
Setting.Property.NodeScope
);

/**
* Default constructor
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ public void testGetSettings() {
QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N,
QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME,
QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE,
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING,
QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY
),
queryInsightsPlugin.getSettings()
);
Expand Down
Loading
Loading