diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorController.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorController.java index 4942e348bac..7299d1d9311 100644 --- a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorController.java +++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorController.java @@ -42,6 +42,7 @@ import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.bookkeeper.stats.ThreadRegistry; import org.mockito.stubbing.Answer; /** @@ -172,6 +173,10 @@ private static Answer<ScheduledFuture<?>> answerDelay(MockExecutorController exe private static Answer<Future<?>> answerNow() { return invocationOnMock -> { + // this method executes everything in the caller thread + // this messes up assertions that verify + // that a thread is part of only a threadpool + ThreadRegistry.forceClearRegistrationForTests(Thread.currentThread().getId()); Runnable task = invocationOnMock.getArgument(0); task.run(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index 1514a74ad80..a69df4a1768 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -660,7 +660,7 @@ public synchronized void start() { bookieThread = new BookieCriticalThread(() -> run(), "Bookie-" + conf.getBookiePort()); bookieThread.setDaemon(true); - ThreadRegistry.register("BookieThread", 0); + ThreadRegistry.register("BookieThread", true); if (LOG.isDebugEnabled()) { LOG.debug("I'm starting a bookie with journal directories {}", 
journalDirectories.stream().map(File::getName).collect(Collectors.joining(", "))); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index de22bf416a3..2d68b8d2da8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -482,7 +482,7 @@ public ForceWriteThread(Consumer<Void> threadToNotifyOnEx, @Override public void run() { LOG.info("ForceWrite Thread started"); - ThreadRegistry.register(super.getName(), 0); + ThreadRegistry.register(super.getName()); if (conf.isBusyWaitEnabled()) { try { @@ -955,7 +955,7 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(), */ public void run() { LOG.info("Starting journal on {}", journalDirectory); - ThreadRegistry.register(journalThreadName, 0); + ThreadRegistry.register(journalThreadName); if (conf.isBusyWaitEnabled()) { try { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java index c3a4c8fc711..3b77cf45f10 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java @@ -82,12 +82,16 @@ public SyncThread(ServerConfiguration conf, this.checkpointSource = checkpointSource; this.executor = newExecutor(); this.syncExecutorTime = statsLogger.getThreadScopedCounter("sync-thread-time"); - this.executor.submit(() -> ThreadRegistry.register(executorName, 0)); } @VisibleForTesting static ScheduledExecutorService newExecutor() { - return Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory(executorName)); + return Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory(executorName) { + @Override + protected Thread newThread(Runnable r, String name) { + 
return super.newThread(ThreadRegistry.registerThread(r, executorName), name); + } + }); } @Override diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index 405ffe6f11a..35d4c8caf2c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -117,7 +117,12 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage private static String dbStoragerExecutorName = "db-storage"; private final ExecutorService executor = Executors.newSingleThreadExecutor( - new DefaultThreadFactory(dbStoragerExecutorName)); + new DefaultThreadFactory(dbStoragerExecutorName) { + @Override + protected Thread newThread(Runnable r, String name) { + return super.newThread(ThreadRegistry.registerThread(r, dbStoragerExecutorName), name); + } + }); // Executor used to for db index cleanup private final ScheduledExecutorService cleanupExecutor = Executors @@ -218,7 +223,6 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le flushExecutorTime = ledgerIndexDirStatsLogger.getThreadScopedCounter("db-storage-thread-time"); executor.submit(() -> { - ThreadRegistry.register(dbStoragerExecutorName, 0); // ensure the metric gets registered on start-up as this thread only executes // when the write cache is full which may not happen or not for a long time flushExecutorTime.addLatency(0, TimeUnit.NANOSECONDS); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java index a5c4b162935..bfad643d1e2 100644 --- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java @@ -79,6 +79,7 @@ import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.processor.RequestProcessor; +import org.apache.bookkeeper.stats.ThreadRegistry; import org.apache.bookkeeper.util.ByteBufList; import org.apache.bookkeeper.util.EventLoopUtil; import org.apache.zookeeper.KeeperException; @@ -122,7 +123,12 @@ class BookieNettyServer { if (!conf.isDisableServerSocketBind()) { this.eventLoopGroup = EventLoopUtil.getServerEventLoopGroup(conf, - new DefaultThreadFactory("bookie-io")); + new DefaultThreadFactory("bookie-io") { + @Override + protected Thread newThread(Runnable r, String name) { + return super.newThread(ThreadRegistry.registerThread(r, "bookie-io"), name); + } + }); this.acceptorGroup = EventLoopUtil.getServerAcceptorGroup(conf, new DefaultThreadFactory("bookie-acceptor")); allChannels = new CleanupChannelGroup(eventLoopGroup); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java index c468d2c2dc6..510b7ab975d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java @@ -56,6 +56,7 @@ import org.apache.bookkeeper.conf.TestBKConfiguration; import org.apache.bookkeeper.proto.BookieServer; import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.ThreadRegistry; import org.apache.bookkeeper.test.ZooKeeperUtil; import org.apache.bookkeeper.util.IOUtils; import org.apache.bookkeeper.util.PortManager; @@ -97,6 +98,7 @@ public class LedgerStorageCheckpointTest { @Before public void setUp() 
throws Exception { + ThreadRegistry.clear(); LOG.info("Setting up test {}", getClass()); try { @@ -128,6 +130,7 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { + ThreadRegistry.clear(); LOG.info("TearDown"); sortedLedgerStorageMockedStatic.close(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java index 35c1ea2aeb8..83893922d69 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java @@ -76,6 +76,7 @@ import org.apache.bookkeeper.replication.ReplicationWorker; import org.apache.bookkeeper.server.Main; import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.stats.ThreadRegistry; import org.apache.bookkeeper.util.DiskChecker; import org.apache.bookkeeper.util.PortManager; import org.apache.zookeeper.KeeperException; @@ -242,6 +243,11 @@ public void tearDown() throws Exception { } } + @After + public void clearMetricsThreadRegistry() throws Exception { + ThreadRegistry.clear(); + } + /** * Start zookeeper cluster. * diff --git a/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/ThreadRegistry.java b/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/ThreadRegistry.java index e890660ebb1..5bc7259a09f 100644 --- a/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/ThreadRegistry.java +++ b/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/ThreadRegistry.java @@ -18,6 +18,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * For mapping thread ids to thread pools and threads within those pools * or just for lone named threads. 
Thread scoped metrics add labels to @@ -25,7 +28,42 @@ * For flexibility, this registry is not based on TLS. */ public class ThreadRegistry { + private static Logger logger = LoggerFactory.getLogger(ThreadRegistry.class); private static ConcurrentMap<Long, ThreadPoolThread> threadPoolMap = new ConcurrentHashMap<>(); + private static ConcurrentMap<String, Integer> threadPoolThreadMap = new ConcurrentHashMap<>(); + + /* + Threads can register themselves as their first act before carrying out + any work. By calling this method, the ThreadPoolThread is incremented + for the given thread pool. + */ + public static void register(String threadPool) { + register(threadPool, false); + } + + public static void register(String threadPool, boolean force) { + Integer threadPoolThread = threadPoolThreadMap.compute(threadPool, (k, v) -> v == null ? 0 : v + 1); + if (force) { + threadPoolMap.remove(Thread.currentThread().getId()); + } + register(threadPool, threadPoolThread, Thread.currentThread().getId()); + } + + /** + * In some tests we run in the same thread activities that should + * run in different threads from different thread-pools + * this would trigger assertions to fail. + * This is a convenience method to work around such cases. + * This method shouldn't be used in production code. + */ + public static void forceClearRegistrationForTests(long threadId) { + threadPoolMap.compute(threadId, (id, value) -> { + if (value != null) { + logger.info("Forcibly clearing registry entry {} for thread id {}", value, id); + } + return null; + }); + } /* Threads can register themselves as their first act before carrying out @@ -37,10 +75,22 @@ public static void register(String threadPool, int threadPoolThread) { /* Thread factories can register a thread by its id. + The assumption is that one thread belongs only to one threadpool. 
+ This doesn't hold in tests, in which we use mock Executors that + run the code in the same thread as the caller */ public static void register(String threadPool, int threadPoolThread, long threadId) { ThreadPoolThread tpt = new ThreadPoolThread(threadPool, threadPoolThread, threadId); - threadPoolMap.put(threadId, tpt); + ThreadPoolThread previous = threadPoolMap.putIfAbsent(threadId, tpt); + if (previous != null) { + throw new IllegalStateException("Thread " + threadId + " was already registered in thread pool " + + previous.threadPool + " as thread " + previous.ordinal + " with threadId " + previous.threadId + + " trying to overwrite with " + threadPool + " and ordinal " + threadPoolThread); + } + } + + public static Runnable registerThread(Runnable runnable, String threadPool) { + return new RegisteredRunnable(threadPool, runnable); } /* @@ -48,6 +98,7 @@ public static void register(String threadPool, int threadPoolThread, long thread */ public static void clear() { threadPoolMap.clear(); + threadPoolThreadMap.clear(); } /* @@ -79,4 +130,20 @@ public int getOrdinal() { return ordinal; } } + + private static class RegisteredRunnable implements Runnable { + private final String threadPool; + private final Runnable runnable; + + public RegisteredRunnable(String threadPool, Runnable runnable) { + this.threadPool = threadPool; + this.runnable = runnable; + } + + @Override + public void run() { + register(threadPool); + runnable.run(); + } + } } diff --git a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/DataSketchesOpStatsLogger.java b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/DataSketchesOpStatsLogger.java index 015c3d3b247..057fed65c18 100644 --- a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/DataSketchesOpStatsLogger.java +++ 
b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/DataSketchesOpStatsLogger.java @@ -212,4 +212,9 @@ protected void onRemoval(LocalData value) throws Exception { } }; } + + @Override + public String toString() { + return "DataSketchesOpStatsLogger{labels=" + labels + ", id=" + System.identityHashCode(this) + "}"; + } } diff --git a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedDataSketchesStatsLogger.java b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedDataSketchesStatsLogger.java index 53aca527928..cee895b0c54 100644 --- a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedDataSketchesStatsLogger.java +++ b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedDataSketchesStatsLogger.java @@ -22,6 +22,8 @@ import org.apache.bookkeeper.stats.OpStatsData; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.ThreadRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * OpStatsLogger implementation that lazily registers OpStatsLoggers per thread @@ -29,6 +31,8 @@ */ public class ThreadScopedDataSketchesStatsLogger implements OpStatsLogger { + private static Logger logger = LoggerFactory.getLogger(ThreadScopedDataSketchesStatsLogger.class); + private ThreadLocal<DataSketchesOpStatsLogger> statsLoggers; private DataSketchesOpStatsLogger defaultStatsLogger; private Map<String, String> originalLabels; @@ -95,8 +99,18 @@ private DataSketchesOpStatsLogger getStatsLogger() { if (!statsLogger.isThreadInitialized()) { ThreadRegistry.ThreadPoolThread tpt = ThreadRegistry.get(); if (tpt == null) { + logger.warn("Thread {} was not registered in the thread 
registry. Using default stats logger {}.", + Thread.currentThread(), defaultStatsLogger); statsLoggers.set(defaultStatsLogger); - provider.opStats.put(new ScopeContext(scopeContext.getScope(), originalLabels), defaultStatsLogger); + DataSketchesOpStatsLogger previous = provider.opStats + .put(new ScopeContext(scopeContext.getScope(), originalLabels), defaultStatsLogger); + // If we overwrite a logger, metrics will not be collected correctly + if (previous != null && previous != defaultStatsLogger) { + logger.error("Invalid state for thread " + Thread.currentThread() + ". Overwrote a stats logger. " + + "New is {}, previous was {}", + defaultStatsLogger, previous); + throw new IllegalStateException("Invalid state. Overwrote a stats logger."); + } return defaultStatsLogger; } else { Map<String, String> threadScopedlabels = new HashMap<>(originalLabels); @@ -104,7 +118,15 @@ private DataSketchesOpStatsLogger getStatsLogger() { threadScopedlabels.put("thread", String.valueOf(tpt.getOrdinal())); statsLogger.initializeThread(threadScopedlabels); - provider.opStats.put(new ScopeContext(scopeContext.getScope(), threadScopedlabels), statsLogger); + DataSketchesOpStatsLogger previous = provider.opStats + .put(new ScopeContext(scopeContext.getScope(), threadScopedlabels), statsLogger); + // If we overwrite a logger, metrics will not be collected correctly + if (previous != null && previous != statsLogger) { + logger.error("Invalid state for thread " + Thread.currentThread() + ". Overwrote a stats logger. " + + "New is {}, previous was {}", + statsLogger, previous); + throw new IllegalStateException("Invalid state. Overwrote a stats logger."); + } } }