Skip to content

Commit

Permalink
add index execution time tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
jdconrad committed Aug 14, 2024
1 parent d3fdb36 commit ec9387c
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ protected void appendThreadPoolExecutorDetails(StringBuilder sb) {
.append(", ");
}

public boolean trackOngoingTasks() {
return trackOngoingTasks;
}

/**
* Returns the set of currently running tasks and their start timestamp.
* <p>
Expand Down
48 changes: 38 additions & 10 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
Expand Down Expand Up @@ -141,13 +142,15 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

Expand Down Expand Up @@ -314,6 +317,9 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv

private final Tracer tracer;

private final boolean trackIndexExecutionTime;
private final ConcurrentHashMap<String, LongAdder> indexExecutionTimes = new ConcurrentHashMap<>();

public SearchService(
ClusterService clusterService,
IndicesService indicesService,
Expand Down Expand Up @@ -382,6 +388,10 @@ public SearchService(
enableQueryPhaseParallelCollection = QUERY_PHASE_PARALLEL_COLLECTION_ENABLED.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(QUERY_PHASE_PARALLEL_COLLECTION_ENABLED, this::setEnableQueryPhaseParallelCollection);

Executor searchExecutor = threadPool.executor(Names.SEARCH);
trackIndexExecutionTime = searchExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor
&& ((TaskExecutionTimeTrackingEsThreadPoolExecutor) searchExecutor).trackOngoingTasks();
}

private void setEnableSearchWorkerThreads(boolean enableSearchWorkerThreads) {
Expand Down Expand Up @@ -570,7 +580,7 @@ private <T extends RefCounted> void ensureAfterSeqNoRefreshed(
final Executor executor = getExecutor(shard);
try {
if (waitForCheckpoint <= UNASSIGNED_SEQ_NO) {
runAsync(executor, executable, listener);
runAsync(executor, executable, listener, shard);
return;
}
if (shard.indexSettings().getRefreshInterval().getMillis() <= 0) {
Expand Down Expand Up @@ -642,7 +652,7 @@ private void searchReady() {
if (timeoutTask != null) {
timeoutTask.cancel();
}
runAsync(executor, executable, listener);
runAsync(executor, executable, listener, shard);
}
}
});
Expand All @@ -664,12 +674,30 @@ private IndexShard getShard(ShardSearchRequest request) {
return indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
}

private static <T extends RefCounted> void runAsync(
public Map<String, LongAdder> getIndexExecutionTimes() {
return indexExecutionTimes;
}

private <T extends RefCounted> void runAsync(
Executor executor,
CheckedSupplier<T, Exception> executable,
ActionListener<T> listener
ActionListener<T> listener,
IndexShard indexShard
) {
executor.execute(ActionRunnable.supplyAndDecRef(listener, executable));
if (trackIndexExecutionTime) {
String indexName = indexShard.shardId().getIndexName();
indexExecutionTimes.putIfAbsent(indexName, new LongAdder());
LongAdder indexExecutionTime = indexExecutionTimes.get(indexName);
executor.execute(ActionRunnable.supplyAndDecRef(listener, () -> {
long start = System.nanoTime();
var result = executable.get();
long total = System.nanoTime() - start;
indexExecutionTime.add(total);
return result;
}));
} else {
executor.execute(ActionRunnable.supplyAndDecRef(listener, executable));
}
}

/**
Expand Down Expand Up @@ -745,7 +773,7 @@ public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShard
// we handle the failure in the failure listener below
throw e;
}
}, wrapFailureListener(listener, readerContext, markAsUsed));
}, wrapFailureListener(listener, readerContext, markAsUsed), readerContext.indexShard());
}

private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchContext context, long afterQueryTime) {
Expand Down Expand Up @@ -795,7 +823,7 @@ public void executeQueryPhase(
// we handle the failure in the failure listener below
throw e;
}
}, wrapFailureListener(listener, readerContext, markAsUsed));
}, wrapFailureListener(listener, readerContext, markAsUsed), readerContext.indexShard());
}

/**
Expand Down Expand Up @@ -838,7 +866,7 @@ public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task,
// we handle the failure in the failure listener below
throw e;
}
}, wrapFailureListener(l, readerContext, markAsUsed));
}, wrapFailureListener(l, readerContext, markAsUsed), readerContext.indexShard());
}));
}

Expand Down Expand Up @@ -889,7 +917,7 @@ public void executeFetchPhase(
// we handle the failure in the failure listener below
throw e;
}
}, wrapFailureListener(listener, readerContext, markAsUsed));
}, wrapFailureListener(listener, readerContext, markAsUsed), readerContext.indexShard());
}

public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener<FetchSearchResult> listener) {
Expand Down Expand Up @@ -921,7 +949,7 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A
// we handle the failure in the failure listener below
throw e;
}
}, wrapFailureListener(listener, readerContext, markAsUsed));
}, wrapFailureListener(listener, readerContext, markAsUsed), readerContext.indexShard());
}

protected void checkCancelled(SearchShardTask task) {
Expand Down

0 comments on commit ec9387c

Please sign in to comment.