Skip to content

Commit

Permalink
Fix ThreadRegistry#register behavior to ensure correct Prom metrics (a…
Browse files Browse the repository at this point in the history
…pache#4300)

* Make tests fail

* Fix ThreadRegistry#register to ensure correct Prom metrics

* Change style to match BK standards

* Fix tests

---------

Co-authored-by: Nicolò Boschi <boschi1997@gmail.com>
  • Loading branch information
michaeljmarshall and nicoloboschi authored May 22, 2024
1 parent 4e00c8f commit 1d82b92
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.stats.ThreadRegistry;
import org.mockito.stubbing.Answer;

/**
Expand Down Expand Up @@ -172,6 +173,10 @@ private static Answer<ScheduledFuture<?>> answerDelay(MockExecutorController exe

private static Answer<Future<?>> answerNow() {
return invocationOnMock -> {
// This mock executes everything in the caller thread, so the same thread
// can end up registered under several thread pools. That breaks the
// assertions verifying that a thread belongs to only one thread pool.
ThreadRegistry.forceClearRegistrationForTests(Thread.currentThread().getId());

Runnable task = invocationOnMock.getArgument(0);
task.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ public synchronized void start() {
bookieThread = new BookieCriticalThread(() -> run(), "Bookie-" + conf.getBookiePort());
bookieThread.setDaemon(true);

ThreadRegistry.register("BookieThread", 0);
ThreadRegistry.register("BookieThread", true);
if (LOG.isDebugEnabled()) {
LOG.debug("I'm starting a bookie with journal directories {}",
journalDirectories.stream().map(File::getName).collect(Collectors.joining(", ")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ public ForceWriteThread(Consumer<Void> threadToNotifyOnEx,
@Override
public void run() {
LOG.info("ForceWrite Thread started");
ThreadRegistry.register(super.getName(), 0);
ThreadRegistry.register(super.getName());

if (conf.isBusyWaitEnabled()) {
try {
Expand Down Expand Up @@ -955,7 +955,7 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(),
*/
public void run() {
LOG.info("Starting journal on {}", journalDirectory);
ThreadRegistry.register(journalThreadName, 0);
ThreadRegistry.register(journalThreadName);

if (conf.isBusyWaitEnabled()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,16 @@ public SyncThread(ServerConfiguration conf,
this.checkpointSource = checkpointSource;
this.executor = newExecutor();
this.syncExecutorTime = statsLogger.getThreadScopedCounter("sync-thread-time");
this.executor.submit(() -> ThreadRegistry.register(executorName, 0));
}

/**
 * Creates the single-threaded scheduled executor used by the sync thread.
 *
 * <p>The thread factory wraps every task handed to a new thread with
 * {@code ThreadRegistry.registerThread}, passing {@code executorName} as the
 * thread pool name.
 *
 * @return a new single-threaded scheduled executor
 */
@VisibleForTesting
static ScheduledExecutorService newExecutor() {
    return Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory(executorName) {
        @Override
        protected Thread newThread(Runnable r, String name) {
            // Registration is attached to the runnable itself rather than submitted
            // as a separate task after the executor has been created.
            return super.newThread(ThreadRegistry.registerThread(r, executorName), name);
        }
    });
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,12 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage

private static String dbStoragerExecutorName = "db-storage";
private final ExecutorService executor = Executors.newSingleThreadExecutor(
new DefaultThreadFactory(dbStoragerExecutorName));
new DefaultThreadFactory(dbStoragerExecutorName) {
@Override
protected Thread newThread(Runnable r, String name) {
return super.newThread(ThreadRegistry.registerThread(r, dbStoragerExecutorName), name);
}
});

// Executor used for db index cleanup
private final ScheduledExecutorService cleanupExecutor = Executors
Expand Down Expand Up @@ -218,7 +223,6 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le
flushExecutorTime = ledgerIndexDirStatsLogger.getThreadScopedCounter("db-storage-thread-time");

executor.submit(() -> {
ThreadRegistry.register(dbStoragerExecutorName, 0);
// ensure the metric gets registered on start-up as this thread only executes
// when the write cache is full which may not happen or not for a long time
flushExecutorTime.addLatency(0, TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.bookkeeper.stats.ThreadRegistry;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.EventLoopUtil;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -122,7 +123,12 @@ class BookieNettyServer {

if (!conf.isDisableServerSocketBind()) {
this.eventLoopGroup = EventLoopUtil.getServerEventLoopGroup(conf,
new DefaultThreadFactory("bookie-io"));
new DefaultThreadFactory("bookie-io") {
@Override
protected Thread newThread(Runnable r, String name) {
return super.newThread(ThreadRegistry.registerThread(r, "bookie-id"), name);
}
});
this.acceptorGroup = EventLoopUtil.getServerAcceptorGroup(conf,
new DefaultThreadFactory("bookie-acceptor"));
allChannels = new CleanupChannelGroup(eventLoopGroup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.ThreadRegistry;
import org.apache.bookkeeper.test.ZooKeeperUtil;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.PortManager;
Expand Down Expand Up @@ -97,6 +98,7 @@ public class LedgerStorageCheckpointTest {

@Before
public void setUp() throws Exception {
ThreadRegistry.clear();
LOG.info("Setting up test {}", getClass());

try {
Expand Down Expand Up @@ -128,6 +130,7 @@ public void setUp() throws Exception {

@After
public void tearDown() throws Exception {
ThreadRegistry.clear();
LOG.info("TearDown");

sortedLedgerStorageMockedStatic.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.bookkeeper.replication.ReplicationWorker;
import org.apache.bookkeeper.server.Main;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.ThreadRegistry;
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.bookkeeper.util.PortManager;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -242,6 +243,11 @@ public void tearDown() throws Exception {
}
}

/**
 * Wipes the static {@link ThreadRegistry} state once a test has finished.
 *
 * @throws Exception declared for signature compatibility; the body only calls
 *         {@code ThreadRegistry.clear()}
 */
@After
public void clearMetricsThreadRegistry() throws Exception {
    ThreadRegistry.clear();
}

/**
* Start zookeeper cluster.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,52 @@

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* For mapping thread ids to thread pools and threads within those pools
* or just for lone named threads. Thread scoped metrics add labels to
* metrics by retrieving the ThreadPoolThread object from this registry.
* For flexibility, this registry is not based on TLS.
*/
public class ThreadRegistry {
// Logger used when a registry entry is forcibly cleared.
private static Logger logger = LoggerFactory.getLogger(ThreadRegistry.class);
// Thread id -> registration (thread pool name, ordinal, thread id) of that thread.
private static ConcurrentMap<Long, ThreadPoolThread> threadPoolMap = new ConcurrentHashMap<>();
// Thread pool name -> last ordinal handed out for that pool (first registration gets 0).
private static ConcurrentMap<String, Integer> threadPoolThreadMap = new ConcurrentHashMap<>();

/**
 * Registers the calling thread as the next thread of the given thread pool.
 *
 * <p>Threads can call this as their first act before carrying out any work.
 * This is the non-forcing variant: it delegates to
 * {@link #register(String, boolean)} with {@code force} set to {@code false}.
 *
 * @param threadPool name of the thread pool the calling thread belongs to
 */
public static void register(String threadPool) {
    final boolean force = false;
    register(threadPool, force);
}

/**
 * Registers the calling thread in the given thread pool under the pool's next
 * ordinal: 0 for the first registration, then one more on each later call.
 *
 * @param threadPool name of the thread pool the calling thread belongs to
 * @param force when {@code true}, any registry entry stored for the calling
 *        thread is removed before the new registration is made
 */
public static void register(String threadPool, boolean force) {
    // Reserve the ordinal first, exactly as before, so the counter advances
    // even if the registration below is rejected.
    final int ordinal = threadPoolThreadMap.compute(threadPool, (pool, last) -> {
        if (last == null) {
            return 0;
        }
        return last + 1;
    });
    final long callerId = Thread.currentThread().getId();
    if (force) {
        threadPoolMap.remove(callerId);
    }
    register(threadPool, ordinal, callerId);
}

/**
 * Drops the registry entry, if any, stored for the given thread id.
 *
 * <p>Some tests run, in a single thread, activities that should run in
 * different threads from different thread pools; that would make the
 * registration assertions fail. This convenience method works around such
 * cases and must not be used in production code.
 *
 * @param threadId id of the thread whose registration is discarded
 */
public static void forceClearRegistrationForTests(long threadId) {
    ThreadPoolThread removed = threadPoolMap.remove(threadId);
    if (removed == null) {
        // Nothing was registered for this thread: stay silent.
        return;
    }
    logger.info("Forcibly clearing registry entry {} for thread id {}", removed, threadId);
}

/*
Threads can register themselves as their first act before carrying out
Expand All @@ -37,17 +75,30 @@ public static void register(String threadPool, int threadPoolThread) {

/**
 * Registers a thread, identified by its id, as a member of a thread pool.
 * Thread factories can use this to register a thread by its id.
 *
 * <p>The assumption is that one thread belongs to only one thread pool.
 * This doesn't hold in tests that use mock executors running the code in
 * the same thread as the caller.
 *
 * @param threadPool name of the thread pool
 * @param threadPoolThread ordinal of the thread within that pool
 * @param threadId id of the thread being registered
 * @throws IllegalStateException if an entry was already stored for
 *         {@code threadId}; the new entry has replaced it by the time the
 *         exception is thrown
 */
public static void register(String threadPool, int threadPoolThread, long threadId) {
    ThreadPoolThread tpt = new ThreadPoolThread(threadPool, threadPoolThread, threadId);
    // Store the entry exactly once: a second put would observe the value written
    // by the first one and report every registration as a duplicate.
    ThreadPoolThread previous = threadPoolMap.put(threadId, tpt);
    if (previous != null) {
        throw new IllegalStateException("Thread " + threadId + " was already registered in thread pool "
                + previous.threadPool + " as thread " + previous.ordinal + " with threadId " + previous.threadId
                + " trying to overwrite with " + threadPool + " and ordinal " + threadPoolThread);
    }
}

/**
 * Wraps a task so that the thread executing it first registers itself in the
 * given thread pool and then runs the task.
 *
 * @param runnable task to run after the registration
 * @param threadPool name of the thread pool to register the executing thread in
 * @return the wrapping runnable
 */
public static Runnable registerThread(Runnable runnable, String threadPool) {
    final Runnable registering = new RegisteredRunnable(threadPool, runnable);
    return registering;
}

/**
 * Clears all stored thread state: the per-thread registrations as well as
 * the per-pool ordinal counters.
 */
public static void clear() {
    threadPoolMap.clear();
    threadPoolThreadMap.clear();
}

/*
Expand Down Expand Up @@ -79,4 +130,20 @@ public int getOrdinal() {
return ordinal;
}
}

/**
 * Runnable decorator that registers the executing thread in a thread pool
 * before handing control to the wrapped task.
 */
private static class RegisteredRunnable implements Runnable {
    private final Runnable delegate;
    private final String poolName;

    public RegisteredRunnable(String threadPool, Runnable runnable) {
        this.poolName = threadPool;
        this.delegate = runnable;
    }

    @Override
    public void run() {
        // Registration comes first; if it throws, the wrapped task is never started.
        register(poolName);
        delegate.run();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,4 +212,9 @@ protected void onRemoval(LocalData value) throws Exception {
}
};
}

/**
 * Describes this logger by its labels and identity hash code, which lets
 * distinct instances be told apart in log output.
 */
@Override
public String toString() {
    final StringBuilder text = new StringBuilder("DataSketchesOpStatsLogger{labels=");
    text.append(labels);
    text.append(", id=");
    text.append(System.identityHashCode(this));
    text.append("}");
    return text.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@
import org.apache.bookkeeper.stats.OpStatsData;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.ThreadRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* OpStatsLogger implementation that lazily registers OpStatsLoggers per thread
* with added labels for the threadpool/thread name and thread no.
*/
public class ThreadScopedDataSketchesStatsLogger implements OpStatsLogger {

private static Logger logger = LoggerFactory.getLogger(ThreadScopedDataSketchesStatsLogger.class);

private ThreadLocal<DataSketchesOpStatsLogger> statsLoggers;
private DataSketchesOpStatsLogger defaultStatsLogger;
private Map<String, String> originalLabels;
Expand Down Expand Up @@ -95,16 +99,34 @@ private DataSketchesOpStatsLogger getStatsLogger() {
if (!statsLogger.isThreadInitialized()) {
ThreadRegistry.ThreadPoolThread tpt = ThreadRegistry.get();
if (tpt == null) {
logger.warn("Thread {} was not registered in the thread registry. Using default stats logger {}.",
Thread.currentThread(), defaultStatsLogger);
statsLoggers.set(defaultStatsLogger);
provider.opStats.put(new ScopeContext(scopeContext.getScope(), originalLabels), defaultStatsLogger);
DataSketchesOpStatsLogger previous = provider.opStats
.put(new ScopeContext(scopeContext.getScope(), originalLabels), defaultStatsLogger);
// If we overwrite a logger, metrics will not be collected correctly
if (previous != null && previous != defaultStatsLogger) {
logger.error("Invalid state for thead " + Thread.currentThread() + ". Overwrote a stats logger."
+ "New is {}, previous was {}",
defaultStatsLogger, previous);
throw new IllegalStateException("Invalid state. Overwrote a stats logger.");
}
return defaultStatsLogger;
} else {
Map<String, String> threadScopedlabels = new HashMap<>(originalLabels);
threadScopedlabels.put("threadPool", tpt.getThreadPool());
threadScopedlabels.put("thread", String.valueOf(tpt.getOrdinal()));

statsLogger.initializeThread(threadScopedlabels);
provider.opStats.put(new ScopeContext(scopeContext.getScope(), threadScopedlabels), statsLogger);
DataSketchesOpStatsLogger previous = provider.opStats
.put(new ScopeContext(scopeContext.getScope(), threadScopedlabels), statsLogger);
// If we overwrite a logger, metrics will not be collected correctly
if (previous != null && previous != statsLogger) {
logger.error("Invalid state for thead " + Thread.currentThread() + ". Overwrote a stats logger."
+ "New is {}, previous was {}",
defaultStatsLogger, previous);
throw new IllegalStateException("Invalid state. Overwrote a stats logger.");
}
}
}

Expand Down

0 comments on commit 1d82b92

Please sign in to comment.