Add a counter to node stat (and _cat/shards) api to track shard going from idle to non-idle (opensearch-project#12768)

Signed-off-by: Ruirui Zhang <mariazrr@amazon.com>
ruai0511 committed Mar 28, 2024
1 parent 48881de commit f639259
Showing 10 changed files with 196 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697))
- Convert ingest processor supports ip type ([#12818](https://github.com/opensearch-project/OpenSearch/pull/12818))
- Allow setting KEYSTORE_PASSWORD through env variable ([#12865](https://github.com/opensearch-project/OpenSearch/pull/12865))
- Add a counter to node stat (and _cat/shards) api to track shard going from idle to non-idle ([#12768](https://github.com/opensearch-project/OpenSearch/pull/12768))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
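
The new counter announced in the commit title and the changelog entry above is exposed in two places: in the node stats API inside the `search` section as `search_idle_reactivate_count_total`, and as an opt-in `_cat/shards` column named `search.search_idle_reactivate_count_total` (alias `ssirct`). A minimal client-side sketch of reading both endpoints, assuming a cluster listening on http://localhost:9200; the class name is illustrative:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Minimal sketch: fetch the new counter from a locally running cluster.
// Host, port, and the class name are assumptions for illustration only.
public class SearchIdleReactivateStats {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // Node stats: the counter is reported inside the "search" section
        // as "search_idle_reactivate_count_total".
        print(client, "http://localhost:9200/_nodes/stats/indices/search");

        // _cat/shards: the column is registered with default:false, so request it
        // explicitly by name (or by its alias "ssirct").
        print(client, "http://localhost:9200/_cat/shards?v&h=index,shard,prirep,search.search_idle_reactivate_count_total");
    }

    private static void print(HttpClient client, String url) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        System.out.println(client.send(request, HttpResponse.BodyHandlers.ofString()).body());
    }
}
```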
@@ -1,13 +1,108 @@
"Help":
- skip:
version: " - 2.11.99"
version: " - 2.99.99"
reason: search idle reactivate count total is only added in 3.0.0
features: node_selector
- do:
cat.shards:
help: true
node_selector:
version: "3.0.0 - "

- match:
$body: |
/^ index .+ \n
shard .+ \n
prirep .+ \n
state .+ \n
docs .+ \n
store .+ \n
ip .+ \n
id .+ \n
node .+ \n
sync_id .+ \n
unassigned.reason .+ \n
unassigned.at .+ \n
unassigned.for .+ \n
unassigned.details .+ \n
recoverysource.type .+ \n
completion.size .+ \n
fielddata.memory_size .+ \n
fielddata.evictions .+ \n
query_cache.memory_size .+ \n
query_cache.evictions .+ \n
flush.total .+ \n
flush.total_time .+ \n
get.current .+ \n
get.time .+ \n
get.total .+ \n
get.exists_time .+ \n
get.exists_total .+ \n
get.missing_time .+ \n
get.missing_total .+ \n
indexing.delete_current .+ \n
indexing.delete_time .+ \n
indexing.delete_total .+ \n
indexing.index_current .+ \n
indexing.index_time .+ \n
indexing.index_total .+ \n
indexing.index_failed .+ \n
merges.current .+ \n
merges.current_docs .+ \n
merges.current_size .+ \n
merges.total .+ \n
merges.total_docs .+ \n
merges.total_size .+ \n
merges.total_time .+ \n
refresh.total .+ \n
refresh.time .+ \n
refresh.external_total .+ \n
refresh.external_time .+ \n
refresh.listeners .+ \n
search.fetch_current .+ \n
search.fetch_time .+ \n
search.fetch_total .+ \n
search.open_contexts .+ \n
search.query_current .+ \n
search.query_time .+ \n
search.query_total .+ \n
search.concurrent_query_current .+ \n
search.concurrent_query_time .+ \n
search.concurrent_query_total .+ \n
search.concurrent_avg_slice_count .+ \n
search.scroll_current .+ \n
search.scroll_time .+ \n
search.scroll_total .+ \n
search.point_in_time_current .+ \n
search.point_in_time_time .+ \n
search.point_in_time_total .+ \n
search.search_idle_reactivate_count_total .+ \n
segments.count .+ \n
segments.memory .+ \n
segments.index_writer_memory .+ \n
segments.version_map_memory .+ \n
segments.fixed_bitset_memory .+ \n
seq_no.max .+ \n
seq_no.local_checkpoint .+ \n
seq_no.global_checkpoint .+ \n
warmer.current .+ \n
warmer.total .+ \n
warmer.total_time .+ \n
path.data .+ \n
path.state .+ \n
docs.deleted .+ \n
$/
---
"Help from 2.12.0 to 2.99.99":
- skip:
version: " - 2.11.99 , 3.0.0 - "
reason: deleted docs and concurrent search are added in 2.12.0
features: node_selector
- do:
cat.shards:
help: true
node_selector:
version: "2.12.0 - "
version: "2.12.0 - 2.99.99"

- match:
$body: |
@@ -163,6 +163,8 @@ public static class Stats implements Writeable, ToXContentFragment {
private long pitTimeInMillis;
private long pitCurrent;

private long searchIdleReactivateCount;

@Nullable
private RequestStatsLongHolder requestStatsLongHolder;

@@ -193,7 +195,8 @@ public Stats(
long pitCurrent,
long suggestCount,
long suggestTimeInMillis,
-        long suggestCurrent
+        long suggestCurrent,
+        long searchIdleReactivateCount
) {
this.requestStatsLongHolder = new RequestStatsLongHolder();
this.queryCount = queryCount;
@@ -220,6 +223,8 @@ public Stats(
this.pitCount = pitCount;
this.pitTimeInMillis = pitTimeInMillis;
this.pitCurrent = pitCurrent;

this.searchIdleReactivateCount = searchIdleReactivateCount;
}

private Stats(StreamInput in) throws IOException {
@@ -255,6 +260,10 @@ private Stats(StreamInput in) throws IOException {
concurrentQueryCurrent = in.readVLong();
queryConcurrency = in.readVLong();
}

if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
searchIdleReactivateCount = in.readVLong();
}
}

public void add(Stats stats) {
@@ -282,6 +291,8 @@ public void add(Stats stats) {
pitCount += stats.pitCount;
pitTimeInMillis += stats.pitTimeInMillis;
pitCurrent += stats.pitCurrent;

searchIdleReactivateCount += stats.searchIdleReactivateCount;
}

public void addForClosingShard(Stats stats) {
@@ -306,6 +317,8 @@ public void addForClosingShard(Stats stats) {
pitTimeInMillis += stats.pitTimeInMillis;
pitCurrent += stats.pitCurrent;
queryConcurrency += stats.queryConcurrency;

searchIdleReactivateCount += stats.searchIdleReactivateCount;
}

public long getQueryCount() {
@@ -412,6 +425,10 @@ public long getSuggestCurrent() {
return suggestCurrent;
}

public long getSearchIdleReactivateCount() {
return searchIdleReactivateCount;
}

public static Stats readStats(StreamInput in) throws IOException {
return new Stats(in);
}
@@ -457,6 +474,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(concurrentQueryCurrent);
out.writeVLong(queryConcurrency);
}

if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeVLong(searchIdleReactivateCount);
}
}

@Override
@@ -486,6 +507,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.humanReadableField(Fields.SUGGEST_TIME_IN_MILLIS, Fields.SUGGEST_TIME, getSuggestTime());
builder.field(Fields.SUGGEST_CURRENT, suggestCurrent);

builder.field(Fields.SEARCH_IDLE_REACTIVATE_COUNT_TOTAL, searchIdleReactivateCount);

if (requestStatsLongHolder != null) {
builder.startObject(Fields.REQUEST);

@@ -654,6 +677,7 @@ static final class Fields {
static final String TIME = "time";
static final String CURRENT = "current";
static final String TOTAL = "total";
static final String SEARCH_IDLE_REACTIVATE_COUNT_TOTAL = "search_idle_reactivate_count_total";

}
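
The hunks above thread the new searchIdleReactivateCount through SearchStats.Stats: a field, a constructor argument, summation in add() and addForClosingShard(), a getter, an XContent field, and stream serialization gated on Version.V_3_0_0 so that nodes in a mixed-version cluster never read or write a long the other side does not expect. A minimal, self-contained sketch of that version-gating pattern; DataInput/DataOutput and the integer version id are simplified stand-ins, not OpenSearch's StreamInput, StreamOutput, or Version:

```java
import java.io.*;

// Sketch of the version-gating pattern used above: the new counter is only read or
// written when both ends of the connection are on a version that knows about it,
// so a mixed-version cluster keeps a compatible wire format.
public class VersionGatedFieldSketch {
    static final int V_3_0_0 = 3_00_00_99;  // illustrative version id, not the real constant

    long queryCount;                  // pre-existing field, always on the wire
    long searchIdleReactivateCount;   // new field, gated behind V_3_0_0

    void writeTo(DataOutput out, int remoteVersion) throws IOException {
        out.writeLong(queryCount);
        if (remoteVersion >= V_3_0_0) {  // only newer nodes understand the extra long
            out.writeLong(searchIdleReactivateCount);
        }
    }

    void readFrom(DataInput in, int remoteVersion) throws IOException {
        queryCount = in.readLong();
        if (remoteVersion >= V_3_0_0) {
            searchIdleReactivateCount = in.readLong();
        }  // otherwise the counter simply stays 0 for old-format input
    }

    public static void main(String[] args) throws IOException {
        VersionGatedFieldSketch stats = new VersionGatedFieldSketch();
        stats.queryCount = 42;
        stats.searchIdleReactivateCount = 7;

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        stats.writeTo(new DataOutputStream(bytes), V_3_0_0);

        VersionGatedFieldSketch roundTripped = new VersionGatedFieldSketch();
        roundTripped.readFrom(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())), V_3_0_0);
        System.out.println(roundTripped.searchIdleReactivateCount); // 7
    }
}
```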

@@ -213,6 +213,11 @@ public void onFreePitContext(ReaderContext readerContext) {
totalStats.pitMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - readerContext.getStartTimeInNano()));
}

@Override
public void onSearchIdleReactivation() {
totalStats.searchIdleMetric.inc();
}

/**
* Holder of statistics values
*
@@ -239,6 +244,7 @@ static final class StatsHolder {
final CounterMetric scrollCurrent = new CounterMetric();
final CounterMetric pitCurrent = new CounterMetric();
final CounterMetric suggestCurrent = new CounterMetric();
final CounterMetric searchIdleMetric = new CounterMetric();

SearchStats.Stats stats() {
return new SearchStats.Stats(
@@ -260,7 +266,8 @@ SearchStats.Stats stats() {
pitCurrent.count(),
suggestMetric.count(),
TimeUnit.NANOSECONDS.toMillis(suggestMetric.sum()),
-            suggestCurrent.count()
+            suggestCurrent.count(),
+            searchIdleMetric.count()
);
}
}
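
In the listener implementation above, onSearchIdleReactivation() simply increments a live counter (searchIdleMetric), and stats() later copies the running total into the immutable SearchStats.Stats snapshot that gets summed across shards. A standalone sketch of that count-then-snapshot pattern, using java.util.concurrent.atomic.LongAdder in place of OpenSearch's CounterMetric; the class and record names are illustrative:

```java
import java.util.concurrent.atomic.LongAdder;

// Sketch of the count-then-snapshot pattern: listener callbacks increment a live,
// thread-safe counter, and stats() copies its current value into an immutable snapshot.
public class ShardSearchCounters {
    private final LongAdder searchIdleReactivations = new LongAdder();

    // Called whenever a search wakes the shard from search-idle (cheap, lock-free).
    public void onSearchIdleReactivation() {
        searchIdleReactivations.increment();
    }

    // Snapshot taken when node stats / _cat/shards are served.
    public IdleStatsSnapshot stats() {
        return new IdleStatsSnapshot(searchIdleReactivations.sum());
    }

    public record IdleStatsSnapshot(long searchIdleReactivateCount) {
        // Totals from many shards are summed when stats are rolled up per node or per index.
        public IdleStatsSnapshot add(IdleStatsSnapshot other) {
            return new IdleStatsSnapshot(searchIdleReactivateCount + other.searchIdleReactivateCount);
        }
    }

    public static void main(String[] args) {
        ShardSearchCounters shard = new ShardSearchCounters();
        shard.onSearchIdleReactivation();
        shard.onSearchIdleReactivation();
        IdleStatsSnapshot nodeTotal = shard.stats().add(new IdleStatsSnapshot(3)); // e.g. another shard's total
        System.out.println(nodeTotal.searchIdleReactivateCount()); // 5
    }
}
```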
@@ -4698,9 +4698,14 @@ public void afterRefresh(boolean didRefresh) {
* <code>true</code> if the listener was registered to wait for a refresh.
*/
public final void awaitShardSearchActive(Consumer<Boolean> listener) {
boolean isSearchIdle = isSearchIdle();
markSearcherAccessed(); // move the shard into non-search idle
final Translog.Location location = pendingRefreshLocation.get();
if (location != null) {
if (isSearchIdle) {
SearchOperationListener searchOperationListener = getSearchOperationListener();
searchOperationListener.onSearchIdleReactivation();
}
addRefreshListener(location, (b) -> {
pendingRefreshLocation.compareAndSet(location, null);
listener.accept(true);
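
The awaitShardSearchActive change above records whether the shard was search-idle before markSearcherAccessed() wakes it, and fires onSearchIdleReactivation only on that idle-to-active transition (and only when there is a pending refresh location to wait on). A simplified, self-contained sketch of the transition logic; IdleTracker, its listener interface, and the timing values are illustrative and omit the pending-refresh condition:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the idle-to-active transition: a search against an idle shard first records
// that the shard *was* idle, then marks it accessed, and only then notifies listeners,
// so repeated searches against an already-active shard do not inflate the counter.
public class IdleTracker {
    interface ReactivationListener { void onSearchIdleReactivation(); }

    private final long idleAfterNanos;
    private final AtomicLong lastSearcherAccess = new AtomicLong(System.nanoTime());
    private final List<ReactivationListener> listeners = new CopyOnWriteArrayList<>();

    IdleTracker(long idleAfterNanos) { this.idleAfterNanos = idleAfterNanos; }

    void addListener(ReactivationListener listener) { listeners.add(listener); }

    boolean isSearchIdle() {
        return System.nanoTime() - lastSearcherAccess.get() >= idleAfterNanos;
    }

    // Mirrors the shape of awaitShardSearchActive: capture idleness, wake the shard, then notify.
    void onIncomingSearch() {
        boolean wasIdle = isSearchIdle();
        lastSearcherAccess.set(System.nanoTime()); // move the shard out of search-idle
        if (wasIdle) {
            listeners.forEach(ReactivationListener::onSearchIdleReactivation);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        IdleTracker tracker = new IdleTracker(TimeUnit.MILLISECONDS.toNanos(50));
        tracker.addListener(() -> System.out.println("idle -> active"));
        tracker.onIncomingSearch();  // freshly accessed: not idle, no callback
        Thread.sleep(100);           // let the shard go search-idle
        tracker.onIncomingSearch();  // wakes the shard: callback fires once
        tracker.onIncomingSearch();  // already active: no callback
    }
}
```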
@@ -145,6 +145,11 @@ default void onNewPitContext(ReaderContext readerContext) {}
*/
default void onFreePitContext(ReaderContext readerContext) {}

/**
* Executed when a shard goes from idle to non-idle state
*/
default void onSearchIdleReactivation() {}

/**
* A Composite listener that multiplexes calls to each of the listeners methods.
*/
@@ -310,5 +315,16 @@ public void onFreePitContext(ReaderContext readerContext) {
}
}
}

@Override
public void onSearchIdleReactivation() {
for (SearchOperationListener listener : listeners) {
try {
listener.onSearchIdleReactivation();
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onNewSearchIdleReactivation listener [{}] failed", listener), e);
}
}
}
}
}
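
Because onSearchIdleReactivation() is added as a no-op default method, existing SearchOperationListener implementations keep compiling unchanged, and the composite listener above logs and swallows a failure from any single listener so the remaining listeners still run. A self-contained sketch of that multiplexing pattern; SearchListener and Composite are stand-in names, not OpenSearch types:

```java
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch of the composite-listener pattern: the new callback gets a no-op default so
// existing implementations are unaffected, and the composite catches per-listener
// exceptions so one failing listener cannot suppress the others.
public class CompositeListenerSketch {
    interface SearchListener {
        default void onSearchIdleReactivation() {} // no-op default keeps old implementations source-compatible
    }

    static final class Composite implements SearchListener {
        private static final Logger logger = Logger.getLogger(Composite.class.getName());
        private final List<SearchListener> listeners;

        Composite(List<SearchListener> listeners) {
            this.listeners = listeners;
        }

        @Override
        public void onSearchIdleReactivation() {
            for (SearchListener listener : listeners) {
                try {
                    listener.onSearchIdleReactivation();
                } catch (Exception e) {
                    logger.log(Level.WARNING, "onSearchIdleReactivation listener failed", e);
                }
            }
        }
    }

    public static void main(String[] args) {
        SearchListener noisy = new SearchListener() {
            @Override
            public void onSearchIdleReactivation() {
                throw new IllegalStateException("boom");
            }
        };
        SearchListener counting = new SearchListener() {
            @Override
            public void onSearchIdleReactivation() {
                System.out.println("shard reactivated");
            }
        };
        new Composite(List.of(noisy, counting)).onSearchIdleReactivation(); // still prints despite the failure
    }
}
```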
@@ -253,6 +253,10 @@ protected Table getTableWithHeader(final RestRequest request) {
"search.point_in_time_total",
"alias:spto,searchPointInTimeTotal;default:false;text-align:right;desc:completed point in time contexts"
);
table.addCell(
"search.search_idle_reactivate_count_total",
"alias:ssirct,searchSearchIdleReactivateCountTotal;default:false;text-align:right;desc:number of times a shard reactivated"
);

table.addCell("segments.count", "alias:sc,segmentsCount;default:false;text-align:right;desc:number of segments");
table.addCell("segments.memory", "alias:sm,segmentsMemory;default:false;text-align:right;desc:memory used by segments");
@@ -427,6 +431,7 @@ Table buildTable(RestRequest request, ClusterStateResponse state, IndicesStatsRe
table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getPitCurrent()));
table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getPitTime()));
table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getPitCount()));
table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getSearchIdleReactivateCount()));

table.addCell(getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getCount));
table.addCell(getOrNull(commonStats, CommonStats::getSegments, SegmentsStats::getZeroMemory));
@@ -57,9 +57,9 @@ public void testShardLevelSearchGroupStats() throws Exception {
// let's create two dummy search stats with groups
Map<String, Stats> groupStats1 = new HashMap<>();
Map<String, Stats> groupStats2 = new HashMap<>();
groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1));
SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats1);
SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats2);
groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1));
SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats1);
SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats2);

// adding these two search stats and checking group stats are correct
searchStats1.add(searchStats2);
@@ -128,6 +128,7 @@ private static void assertStats(Stats stats, long equalTo) {
assertEquals(equalTo, stats.getSuggestCount());
assertEquals(equalTo, stats.getSuggestTimeInMillis());
assertEquals(equalTo, stats.getSuggestCurrent());
assertEquals(equalTo, stats.getSearchIdleReactivateCount());
// avg_concurrency is not summed up across stats
assertEquals(1, stats.getConcurrentAvgSliceCount(), 0);
}