diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/jvm/ExecutorServiceMetrics.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/jvm/ExecutorServiceMetrics.java index e810544bd0..c3ad14f829 100644 --- a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/jvm/ExecutorServiceMetrics.java +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/jvm/ExecutorServiceMetrics.java @@ -406,9 +406,11 @@ private void monitor(MeterRegistry registry, ForkJoinPool fj) { + "underestimates the actual total number of steals when the pool " + "is not quiescent") .register(registry), - Gauge.builder(metricPrefix + "executor.queued", fj, ForkJoinPool::getQueuedTaskCount) + Gauge + .builder(metricPrefix + "executor.queued", fj, + pool -> pool.getQueuedTaskCount() + pool.getQueuedSubmissionCount()) .tags(tags) - .description("An estimate of the total number of tasks currently held in queues by worker threads") + .description("The approximate number of tasks that are queued for execution") .register(registry), Gauge.builder(metricPrefix + "executor.active", fj, ForkJoinPool::getActiveThreadCount) diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/jvm/ExecutorServiceMetricsTest.java b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/jvm/ExecutorServiceMetricsTest.java index c8af1a4d8b..03ecb3b831 100644 --- a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/jvm/ExecutorServiceMetricsTest.java +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/jvm/ExecutorServiceMetricsTest.java @@ -31,6 +31,7 @@ import org.junit.jupiter.params.provider.CsvSource; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import static org.assertj.core.api.AssertionsForClassTypes.*; import static org.awaitility.Awaitility.await; @@ -302,6 +303,32 @@ void monitorScheduledExecutorServiceWithRepetitiveTasks(String metricPrefix, Str assertThat(registry.get(expectedMetricPrefix + "executor.idle").tags(userTags).timer().count()).isEqualTo(0L); } + @Test + @Issue("#5650") + void queuedSubmissionsAreIncludedInExecutorQueuedMetric() { + ForkJoinPool pool = new ForkJoinPool(1, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, false, 1, 1, 1, + a -> true, 555, TimeUnit.MILLISECONDS); + ExecutorServiceMetrics.monitor(registry, pool, "myForkJoinPool"); + AtomicBoolean busy = new AtomicBoolean(true); + + // will be an active task + pool.execute(() -> { + while (busy.get()) { + } + }); + + // will be queued for submission + pool.execute(() -> { + }); + pool.execute(() -> { + }); + + double queued = registry.get("executor.queued").tag("name", "myForkJoinPool").gauge().value(); + busy.set(false); + + assertThat(queued).isEqualTo(2.0); + } + @SuppressWarnings("unchecked") private T monitorExecutorService(String executorName, String metricPrefix, T exec) { if (metricPrefix == null) {