Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add field type cache data to health stats API #193

Merged
merged 1 commit into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public QueryInsightsListener(
this.clusterService = clusterService;
this.queryInsightsService = queryInsightsService;
this.queryShapeGenerator = new QueryShapeGenerator(clusterService);
queryInsightsService.setQueryShapeGenerator(queryShapeGenerator);

// Setting endpoints set up for top n queries, including enabling top n queries, window size, and top n size
// Expected metricTypes are Latency, CPU, and Memory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand All @@ -36,6 +38,7 @@
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
import org.opensearch.plugin.insights.core.reader.QueryInsightsReaderFactory;
import org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator;
import org.opensearch.plugin.insights.core.service.categorizer.SearchQueryCategorizer;
import org.opensearch.plugin.insights.rules.model.GroupingType;
import org.opensearch.plugin.insights.rules.model.MetricType;
Expand Down Expand Up @@ -100,9 +103,14 @@ public class QueryInsightsService extends AbstractLifecycleComponent {

private volatile boolean searchQueryMetricsEnabled;

private SearchQueryCategorizer searchQueryCategorizer;
private final SearchQueryCategorizer searchQueryCategorizer;

private NamedXContentRegistry namedXContentRegistry;
private final NamedXContentRegistry namedXContentRegistry;

/**
* Query shape generator instance
*/
private QueryShapeGenerator queryShapeGenerator;

/**
* Constructor of the QueryInsightsService
Expand Down Expand Up @@ -496,10 +504,14 @@ public QueryInsightsHealthStats getHealthStats() {
Map<MetricType, TopQueriesHealthStats> topQueriesHealthStatsMap = topQueriesServices.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getHealthStats()));
Map<String, Long> fieldTypeCacheStats = Optional.ofNullable(queryShapeGenerator)
.map(QueryShapeGenerator::getFieldTypeCacheStats)
.orElse(Collections.emptyMap());
return new QueryInsightsHealthStats(
threadPool.info(QUERY_INSIGHTS_EXECUTOR),
this.queryRecordsQueue.size(),
topQueriesHealthStatsMap
topQueriesHealthStatsMap,
fieldTypeCacheStats
);
}

Expand All @@ -511,4 +523,11 @@ private void deleteExpiredTopNIndices() {
topQueriesServices.get(metricType).deleteExpiredTopNIndices(clusterService.state().metadata().indices());
}
}

/**
* Set query shape generator
*/
public void setQueryShapeGenerator(final QueryShapeGenerator queryShapeGenerator) {
this.queryShapeGenerator = queryShapeGenerator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ public class IndicesFieldTypeCache {

private static final Logger logger = LogManager.getLogger(IndicesFieldTypeCache.class);
private final Cache<Index, IndexFieldMap> cache;
/**
* Count of cache evictions
*/
private final CounterMetric evictionCount;
/**
* Count of items in cache
*/
private final CounterMetric entryCount;
/**
* Weight of cache in bytes
*/
private final CounterMetric weight;

public IndicesFieldTypeCache(Settings settings) {
final long sizeInBytes = QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY.get(settings).getBytes();
Expand All @@ -36,9 +48,12 @@ public IndicesFieldTypeCache(Settings settings) {
cacheBuilder.setMaximumWeight(sizeInBytes).weigher((k, v) -> RamUsageEstimator.sizeOfObject(k) + v.weight());
}
cache = cacheBuilder.build();
evictionCount = new CounterMetric();
entryCount = new CounterMetric();
weight = new CounterMetric();
}

public IndexFieldMap getOrInitialize(Index index) {
IndexFieldMap getOrInitialize(Index index) {
try {
return cache.computeIfAbsent(index, k -> new IndexFieldMap());
} catch (ExecutionException ex) {
Expand All @@ -52,16 +67,50 @@ public IndexFieldMap getOrInitialize(Index index) {
}

public void invalidate(Index index) {
IndexFieldMap indexFieldMap = cache.get(index);
evictionCount.inc(indexFieldMap.fieldTypeMap.size());
entryCount.dec(indexFieldMap.fieldTypeMap.size());
weight.dec(indexFieldMap.weight());
cache.invalidate(index);
}

public Iterable<Index> keySet() {
return cache.keys();
}

public void incrementCountAndWeight(String key, String value) {
entryCount.inc();
weight.inc(RamUsageEstimator.sizeOf(key) + RamUsageEstimator.sizeOf(value));
}

/**
* Get eviction count
*/
public Long getEvictionCount() {
return evictionCount.count();
}

/**
* Get entry count
*/
public Long getEntryCount() {
return entryCount.count();
}

/**
* Get cache weight in bytes
*/
public Long getWeight() {
return weight.count();
}

static class IndexFieldMap {
private ConcurrentHashMap<String, String> fieldTypeMap;
private CounterMetric weight;
private final ConcurrentHashMap<String, String> fieldTypeMap;

/**
* Estimated memory consumption of fieldTypeMap in bytes
*/
private final CounterMetric weight;

IndexFieldMap() {
fieldTypeMap = new ConcurrentHashMap<>();
Expand All @@ -72,11 +121,18 @@ public String get(String fieldName) {
return fieldTypeMap.get(fieldName);
}

public void putIfAbsent(String key, String value) {
/**
* Stores key, value if absent
*
* @return {@code true} if key was absent, else {@code false}
*/
public boolean putIfAbsent(String key, String value) {
// Increment the weight only if the key value pair added to the Map
if (fieldTypeMap.putIfAbsent(key, value) == null) {
weight.inc(RamUsageEstimator.sizeOf(key) + RamUsageEstimator.sizeOf(value));
return true;
}
return false;
}

public long weight() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,23 @@ public class QueryShapeGenerator implements ClusterStateListener {
static final String EMPTY_STRING = "";
static final String ONE_SPACE_INDENT = " ";
private final ClusterService clusterService;
private final String NO_FIELD_TYPE_VALUE = "";
private final IndicesFieldTypeCache indicesFieldTypeCache;
private long cacheHitCount;
private long cacheMissCount;

private final String NO_FIELD_TYPE_VALUE = "";
public static final String HIT_COUNT = "hit_count";
public static final String MISS_COUNT = "miss_count";
public static final String EVICTIONS = "evictions";
public static final String ENTRY_COUNT = "entry_count";
public static final String SIZE_IN_BYTES = "size_in_bytes";

public QueryShapeGenerator(ClusterService clusterService) {
this.clusterService = clusterService;
clusterService.addListener(this);
this.indicesFieldTypeCache = new IndicesFieldTypeCache(clusterService.getSettings());
this.cacheHitCount = 0;
this.cacheMissCount = 0;
}

public void clusterChanged(ClusterChangedEvent event) {
Expand Down Expand Up @@ -369,14 +379,20 @@ String getFieldType(String fieldName, Map<String, Object> propertiesAsMap, Index
String fieldType = getFieldTypeFromCache(fieldName, index);

if (fieldType != null) {
cacheHitCount += 1;
return fieldType;
} else {
cacheMissCount += 1;
}

// Retrieve field type from mapping and cache it if found
fieldType = getFieldTypeFromProperties(fieldName, propertiesAsMap);

// Cache field type or NO_FIELD_TYPE_VALUE if not found
indicesFieldTypeCache.getOrInitialize(index).putIfAbsent(fieldName, fieldType != null ? fieldType : NO_FIELD_TYPE_VALUE);
fieldType = fieldType != null ? fieldType : NO_FIELD_TYPE_VALUE;
if (indicesFieldTypeCache.getOrInitialize(index).putIfAbsent(fieldName, fieldType)) {
indicesFieldTypeCache.incrementCountAndWeight(fieldName, fieldType);
}

return fieldType;
}
Expand Down Expand Up @@ -420,4 +436,24 @@ else if (currentMap.containsKey("type")) {
String getFieldTypeFromCache(String fieldName, Index index) {
return indicesFieldTypeCache.getOrInitialize(index).get(fieldName);
}

/**
* Get field type cache stats
*
* @return Map containing cache hit count, miss count, and byte stats
*/
public Map<String, Long> getFieldTypeCacheStats() {
return Map.of(
SIZE_IN_BYTES,
indicesFieldTypeCache.getWeight(),
ENTRY_COUNT,
indicesFieldTypeCache.getEntryCount(),
EVICTIONS,
indicesFieldTypeCache.getEvictionCount(),
HIT_COUNT,
cacheHitCount,
MISS_COUNT,
cacheMissCount
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,17 @@

package org.opensearch.plugin.insights.rules.model.healthStats;

import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.ENTRY_COUNT;
import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.EVICTIONS;
import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.HIT_COUNT;
import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.MISS_COUNT;
import static org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator.SIZE_IN_BYTES;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.opensearch.Version;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
Expand All @@ -26,10 +35,12 @@ public class QueryInsightsHealthStats implements ToXContentFragment, Writeable {
private final ThreadPool.Info threadPoolInfo;
private final int queryRecordsQueueSize;
private final Map<MetricType, TopQueriesHealthStats> topQueriesHealthStats;
private Map<String, Long> fieldTypeCacheStats;

private static final String THREAD_POOL_INFO = "ThreadPoolInfo";
private static final String QUERY_RECORDS_QUEUE_SIZE = "QueryRecordsQueueSize";
private static final String TOP_QUERIES_HEALTH_STATS = "TopQueriesHealthStats";
private static final String FIELD_TYPE_CACHE_STATS = "FieldTypeCacheStats";

/**
* Constructor to read QueryInsightsHealthStats from a StreamInput.
Expand All @@ -41,6 +52,9 @@ public QueryInsightsHealthStats(final StreamInput in) throws IOException {
this.threadPoolInfo = new ThreadPool.Info(in);
this.queryRecordsQueueSize = in.readInt();
this.topQueriesHealthStats = in.readMap(MetricType::readFromStream, TopQueriesHealthStats::new);
if (in.getVersion().onOrAfter(Version.V_2_19_0)) {
this.fieldTypeCacheStats = in.readMap(StreamInput::readString, StreamInput::readLong);
}
}

/**
Expand All @@ -53,14 +67,16 @@ public QueryInsightsHealthStats(final StreamInput in) throws IOException {
public QueryInsightsHealthStats(
final ThreadPool.Info threadPoolInfo,
final int queryRecordsQueueSize,
final Map<MetricType, TopQueriesHealthStats> topQueriesHealthStats
final Map<MetricType, TopQueriesHealthStats> topQueriesHealthStats,
final Map<String, Long> fieldTypeCacheStats
) {
if (threadPoolInfo == null || topQueriesHealthStats == null) {
throw new IllegalArgumentException("Parameters cannot be null");
}
this.threadPoolInfo = threadPoolInfo;
this.queryRecordsQueueSize = queryRecordsQueueSize;
this.topQueriesHealthStats = topQueriesHealthStats;
this.fieldTypeCacheStats = Objects.requireNonNull(fieldTypeCacheStats, "fieldTypeCacheStats cannot be null");
}

/**
Expand All @@ -87,6 +103,12 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
builder.endObject();
}
builder.endObject();
// Write field type cache stats
builder.startObject(FIELD_TYPE_CACHE_STATS);
for (String key : List.of(SIZE_IN_BYTES, ENTRY_COUNT, EVICTIONS, HIT_COUNT, MISS_COUNT)) {
builder.field(key, fieldTypeCacheStats.getOrDefault(key, 0L));
}
builder.endObject();
return builder;
}

Expand All @@ -105,6 +127,9 @@ public void writeTo(final StreamOutput out) throws IOException {
MetricType::writeTo,
(streamOutput, topQueriesHealthStats) -> topQueriesHealthStats.writeTo(out)
);
if (out.getVersion().onOrAfter(Version.V_2_19_0)) {
out.writeMap(fieldTypeCacheStats, StreamOutput::writeString, StreamOutput::writeLong);
}
}

/**
Expand Down Expand Up @@ -133,4 +158,13 @@ public int getQueryRecordsQueueSize() {
public Map<MetricType, TopQueriesHealthStats> getTopQueriesHealthStats() {
return topQueriesHealthStats;
}

/**
* Get the field type cache stats.
*
* @return the field type cache stats
*/
public Map<String, Long> getFieldTypeCacheStats() {
return fieldTypeCacheStats;
}
}
Loading
Loading