From b8571a98798c38e695dbb89228fe7f6786d6e020 Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Thu, 19 Sep 2024 10:38:12 +0200 Subject: [PATCH] make ThreadTimer not singleton again This is for multi-deployment environments, like WildFly, where multiple applications may exist and each will have its own timer. --- doc/modules/ROOT/pages/reference/metrics.adoc | 3 +- .../core/timer/ThreadTimer.java | 71 ++++++++----------- .../faulttolerance/core/timer/Timer.java | 7 ++ .../core/timer/TimerLogger.java | 8 +-- .../RealWorldCompletionStageTimeoutTest.java | 2 +- .../core/timeout/TestTimer.java | 5 ++ .../faulttolerance/core/timer/TestTimer.java | 5 ++ .../core/timer/ThreadTimerStressTest.java | 2 +- .../core/timer/ThreadTimerTest.java | 2 +- .../faulttolerance/ExecutorHolder.java | 2 +- .../metrics/MicroProfileMetricsProvider.java | 4 +- .../metrics/MicrometerProvider.java | 6 +- .../standalone/LazyDependencies.java | 2 +- .../standalone/MicrometerAdapter.java | 5 +- .../programmatic/CdiMetricsTimerTest.java | 13 +++- 15 files changed, 80 insertions(+), 57 deletions(-) diff --git a/doc/modules/ROOT/pages/reference/metrics.adoc b/doc/modules/ROOT/pages/reference/metrics.adoc index af490452..3888cfdc 100644 --- a/doc/modules/ROOT/pages/reference/metrics.adoc +++ b/doc/modules/ROOT/pages/reference/metrics.adoc @@ -45,7 +45,8 @@ The behavior of the timer thread can be observed through the following metrics: | Type | `Gauge` | Unit | None | Description | The number of tasks that are currently scheduled (for future execution) on the timer. -| Tags | None +| Tags +a| * `id` - the ID of the timer, to distinguish multiple timers in a multi-application environment |=== == Micrometer Support diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timer/ThreadTimer.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timer/ThreadTimer.java index dda2028f..7d2a1524 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timer/ThreadTimer.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timer/ThreadTimer.java @@ -10,6 +10,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; import io.smallrye.faulttolerance.core.util.RunnableWrapper; @@ -18,10 +19,10 @@ * Starts one thread that processes submitted tasks in a loop and when it's time for a task to run, * it gets submitted to the executor. The default executor is provided by a caller, so the caller * must shut down this timer before shutting down the executor. - *

- * At most one timer may exist. */ public final class ThreadTimer implements Timer { + private static final AtomicInteger COUNTER = new AtomicInteger(0); + private static final Comparator TASK_COMPARATOR = (o1, o2) -> { // two different instances are never equal if (o1 == o2) { @@ -40,7 +41,7 @@ public final class ThreadTimer implements Timer { return System.identityHashCode(o1) < System.identityHashCode(o2) ? -1 : 1; }; - private static volatile ThreadTimer INSTANCE; + private final int id; private final SortedSet tasks = new ConcurrentSkipListSet<>(TASK_COMPARATOR); @@ -57,17 +58,9 @@ public final class ThreadTimer implements Timer { * @param defaultExecutor default {@link Executor} used for running scheduled tasks, unless an executor * is provided when {@linkplain #schedule(long, Runnable, Executor) scheduling} a task */ - public static synchronized ThreadTimer create(Executor defaultExecutor) { - ThreadTimer instance = INSTANCE; - if (instance == null) { - instance = new ThreadTimer(defaultExecutor); - INSTANCE = instance; - return instance; - } - throw new IllegalStateException("Timer already exists"); - } + public ThreadTimer(Executor defaultExecutor) { + this.id = COUNTER.incrementAndGet(); - private ThreadTimer(Executor defaultExecutor) { this.defaultExecutor = checkNotNull(defaultExecutor, "Executor must be set"); this.thread = new Thread(() -> { @@ -112,10 +105,15 @@ private ThreadTimer(Executor defaultExecutor) { LOG.unexpectedExceptionInTimerLoop(e); } } - }, "SmallRye Fault Tolerance Timer"); + }, "SmallRye Fault Tolerance Timer " + id); thread.start(); - LOG.createdTimer(); + LOG.createdTimer(id); + } + + @Override + public int getId() { + return id; } @Override @@ -144,17 +142,13 @@ public int countScheduledTasks() { @Override public void shutdown() throws InterruptedException { if (running.compareAndSet(true, false)) { - try { - LOG.shutdownTimer(); - thread.interrupt(); - thread.join(); - } finally { - INSTANCE = null; - } + LOG.shutdownTimer(id); + thread.interrupt(); + thread.join(); } } - private static class Task implements TimerTask, Runnable { + private class Task implements TimerTask, Runnable { // scheduled: present in the `tasks` queue // running: not present in the `tasks` queue && `runnable != null` // finished or cancelled: not present in the `tasks` queue && `runnable == null` @@ -169,29 +163,22 @@ private static class Task implements TimerTask, Runnable { @Override public boolean isDone() { - ThreadTimer timer = INSTANCE; - if (timer != null) { - boolean queued = timer.tasks.contains(this); - if (queued) { - return false; - } else { - return runnable == null; - } + boolean queued = tasks.contains(this); + if (queued) { + return false; + } else { + return runnable == null; } - return true; // ? } @Override public boolean cancel() { - ThreadTimer timer = INSTANCE; - if (timer != null) { - // can't cancel if it's already running - boolean removed = timer.tasks.remove(this); - if (removed) { - runnable = null; - LOG.cancelledTimerTask(this); - return true; - } + // can't cancel if it's already running + boolean removed = tasks.remove(this); + if (removed) { + runnable = null; + LOG.cancelledTimerTask(this); + return true; } return false; } @@ -211,7 +198,7 @@ public void run() { } } - private static final class TaskWithExecutor extends Task { + private final class TaskWithExecutor extends Task { private final Executor executor; TaskWithExecutor(long startTime, Runnable runnable, Executor executor) { diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timer/Timer.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timer/Timer.java index e1d3fa6c..977ad205 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timer/Timer.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timer/Timer.java @@ -9,6 +9,13 @@ * {@link #schedule(long, Runnable, Executor)} are executed on the given executor. */ public interface Timer { + /** + * Returns the ID of this timer. Timer IDs are guaranteed to be unique. + * + * @return the ID of this timer + */ + int getId(); + /** * Schedules the {@code task} to be executed in {@code delayInMillis} on this timer's * default {@link Executor}. diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timer/TimerLogger.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timer/TimerLogger.java index b75d756e..3cd4b62d 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timer/TimerLogger.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timer/TimerLogger.java @@ -14,13 +14,13 @@ interface TimerLogger extends BasicLogger { TimerLogger LOG = Logger.getMessageLogger(TimerLogger.class, TimerLogger.class.getPackage().getName()); - @Message(id = NONE, value = "Timer created") + @Message(id = NONE, value = "Timer %s created") @LogMessage(level = Logger.Level.TRACE) - void createdTimer(); + void createdTimer(int id); - @Message(id = NONE, value = "Timer shut down") + @Message(id = NONE, value = "Timer %s shut down") @LogMessage(level = Logger.Level.TRACE) - void shutdownTimer(); + void shutdownTimer(int id); @Message(id = NONE, value = "Scheduled timer task %s to run in %s millis") @LogMessage(level = Logger.Level.TRACE) diff --git a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/RealWorldCompletionStageTimeoutTest.java b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/RealWorldCompletionStageTimeoutTest.java index 9296fcfa..32fc8c2c 100644 --- a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/RealWorldCompletionStageTimeoutTest.java +++ b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/RealWorldCompletionStageTimeoutTest.java @@ -54,7 +54,7 @@ public void setUp() { executor = Executors.newSingleThreadExecutor(); timerExecutor = Executors.newSingleThreadExecutor(); - timer = ThreadTimer.create(timerExecutor); + timer = new ThreadTimer(timerExecutor); } @AfterEach diff --git a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TestTimer.java b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TestTimer.java index e55e5664..1a7a14c4 100644 --- a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TestTimer.java +++ b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TestTimer.java @@ -29,6 +29,11 @@ boolean timerTaskCancelled() { return timerTaskCancelled.get(); } + @Override + public int getId() { + return 0; + } + @Override public TimerTask schedule(long delayInMillis, Runnable task) { if (alreadyUsed.compareAndSet(false, true)) { diff --git a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timer/TestTimer.java b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timer/TestTimer.java index 9dbe5d52..e5984cdb 100644 --- a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timer/TestTimer.java +++ b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timer/TestTimer.java @@ -7,6 +7,11 @@ public class TestTimer implements Timer { private final Queue tasks = new ConcurrentLinkedQueue<>(); + @Override + public int getId() { + return 0; + } + @Override public TimerTask schedule(long delayInMillis, Runnable runnable) { Task task = new Task(runnable); diff --git a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timer/ThreadTimerStressTest.java b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timer/ThreadTimerStressTest.java index d1e15c71..f57095c0 100644 --- a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timer/ThreadTimerStressTest.java +++ b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timer/ThreadTimerStressTest.java @@ -35,7 +35,7 @@ public class ThreadTimerStressTest { @BeforeEach public void setUp() throws InterruptedException { executor = Executors.newFixedThreadPool(POOL_SIZE); - timer = ThreadTimer.create(executor); + timer = new ThreadTimer(executor); // precreate all threads in the pool // if we didn't do this, the first few iterations would be dominated diff --git a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timer/ThreadTimerTest.java b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timer/ThreadTimerTest.java index 3c017878..9d25eebf 100644 --- a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timer/ThreadTimerTest.java +++ b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timer/ThreadTimerTest.java @@ -22,7 +22,7 @@ public class ThreadTimerTest { @BeforeEach public void setUp() { executor = Executors.newSingleThreadExecutor(); - timer = ThreadTimer.create(executor); + timer = new ThreadTimer(executor); } @AfterEach diff --git a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/ExecutorHolder.java b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/ExecutorHolder.java index c9ce6b0c..97df5784 100644 --- a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/ExecutorHolder.java +++ b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/ExecutorHolder.java @@ -25,7 +25,7 @@ public class ExecutorHolder { public ExecutorHolder(AsyncExecutorProvider asyncExecutorProvider) { this.asyncExecutor = asyncExecutorProvider.get(); this.eventLoop = EventLoop.get(); - this.timer = ThreadTimer.create(asyncExecutor); + this.timer = new ThreadTimer(asyncExecutor); this.shouldShutdownAsyncExecutor = asyncExecutorProvider instanceof DefaultAsyncExecutorProvider; } diff --git a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicroProfileMetricsProvider.java b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicroProfileMetricsProvider.java index f55c8df5..450e20e5 100644 --- a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicroProfileMetricsProvider.java +++ b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicroProfileMetricsProvider.java @@ -11,6 +11,7 @@ import org.eclipse.microprofile.metrics.Metadata; import org.eclipse.microprofile.metrics.MetricRegistry; import org.eclipse.microprofile.metrics.MetricUnits; +import org.eclipse.microprofile.metrics.Tag; import org.eclipse.microprofile.metrics.annotation.RegistryType; import io.smallrye.faulttolerance.ExecutorHolder; @@ -42,7 +43,8 @@ void init() { .withName(MetricsConstants.TIMER_SCHEDULED) .withUnit(MetricUnits.NONE) .build(); - registry.gauge(metadata, executorHolder.getTimer(), Timer::countScheduledTasks); + Timer timer = executorHolder.getTimer(); + registry.gauge(metadata, timer, Timer::countScheduledTasks, new Tag("id", "" + timer.getId())); } @Override diff --git a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicrometerProvider.java b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicrometerProvider.java index f81e35d7..4371f10f 100644 --- a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicrometerProvider.java +++ b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicrometerProvider.java @@ -1,5 +1,6 @@ package io.smallrye.faulttolerance.metrics; +import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -10,6 +11,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; import io.smallrye.faulttolerance.ExecutorHolder; import io.smallrye.faulttolerance.core.metrics.MeteredOperation; import io.smallrye.faulttolerance.core.metrics.MetricsConstants; @@ -34,7 +36,9 @@ public class MicrometerProvider implements MetricsProvider { @PostConstruct void init() { - registry.gauge(MetricsConstants.TIMER_SCHEDULED, executorHolder.getTimer(), Timer::countScheduledTasks); + Timer timer = executorHolder.getTimer(); + registry.gauge(MetricsConstants.TIMER_SCHEDULED, Collections.singletonList(Tag.of("id", "" + timer.getId())), + timer, Timer::countScheduledTasks); } @Override diff --git a/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/LazyDependencies.java b/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/LazyDependencies.java index fc84ec48..21c85489 100644 --- a/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/LazyDependencies.java +++ b/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/LazyDependencies.java @@ -22,7 +22,7 @@ final class LazyDependencies implements BuilderLazyDependencies { this.executor = config.executor(); this.metricsAdapter = config.metricsAdapter(); this.eventLoop = EventLoop.get(); - this.timer = ThreadTimer.create(executor); + this.timer = new ThreadTimer(executor); } @Override diff --git a/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/MicrometerAdapter.java b/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/MicrometerAdapter.java index 401e6725..887d7afe 100644 --- a/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/MicrometerAdapter.java +++ b/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/MicrometerAdapter.java @@ -1,9 +1,11 @@ package io.smallrye.faulttolerance.standalone; +import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; import io.smallrye.faulttolerance.core.metrics.MeteredOperation; import io.smallrye.faulttolerance.core.metrics.MetricsConstants; import io.smallrye.faulttolerance.core.metrics.MetricsProvider; @@ -19,7 +21,8 @@ public MicrometerAdapter(MeterRegistry registry) { } MetricsProvider createMetricsProvider(Timer timer) { - registry.gauge(MetricsConstants.TIMER_SCHEDULED, timer, Timer::countScheduledTasks); + registry.gauge(MetricsConstants.TIMER_SCHEDULED, Collections.singletonList(Tag.of("id", "" + timer.getId())), + timer, Timer::countScheduledTasks); return new MetricsProvider() { private final Map cache = new ConcurrentHashMap<>(); diff --git a/testsuite/basic/src/test/java/io/smallrye/faulttolerance/programmatic/CdiMetricsTimerTest.java b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/programmatic/CdiMetricsTimerTest.java index 771a908e..76b955ca 100644 --- a/testsuite/basic/src/test/java/io/smallrye/faulttolerance/programmatic/CdiMetricsTimerTest.java +++ b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/programmatic/CdiMetricsTimerTest.java @@ -4,11 +4,13 @@ import static org.awaitility.Awaitility.await; import java.time.temporal.ChronoUnit; +import java.util.SortedMap; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import org.eclipse.microprofile.metrics.Gauge; import org.eclipse.microprofile.metrics.MetricID; import org.eclipse.microprofile.metrics.MetricRegistry; import org.eclipse.microprofile.metrics.annotation.RegistryType; @@ -43,7 +45,7 @@ public void test(@RegistryType(type = MetricRegistry.Type.BASE) MetricRegistry m assertThat(future).isNotCompleted(); await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { - assertThat(metrics.getGauge(new MetricID(MetricsConstants.TIMER_SCHEDULED)).getValue()).isEqualTo(1); + assertThat(findTimerGauge(metrics).getValue()).isEqualTo(1); }); barrier.open(); @@ -51,7 +53,7 @@ public void test(@RegistryType(type = MetricRegistry.Type.BASE) MetricRegistry m assertThat(future).succeedsWithin(2, TimeUnit.SECONDS) .isEqualTo("hello"); - assertThat(metrics.getGauge(new MetricID(MetricsConstants.TIMER_SCHEDULED)).getValue()).isEqualTo(0); + assertThat(findTimerGauge(metrics).getValue()).isEqualTo(0); } public CompletionStage action() throws InterruptedException { @@ -62,4 +64,11 @@ public CompletionStage action() throws InterruptedException { public CompletionStage fallback() { return CompletableFuture.completedFuture("fallback"); } + + private static Gauge findTimerGauge(MetricRegistry metrics) { + SortedMap timers = metrics.getGauges( + (id, metric) -> id.getName().equals(MetricsConstants.TIMER_SCHEDULED)); + assertThat(timers).hasSize(1); + return timers.values().iterator().next(); + } }