Skip to content

Commit

Permalink
Merge pull request #35483 from geoand/#34998
Browse files Browse the repository at this point in the history
Properly report Vertx worker pool size
  • Loading branch information
geoand authored Aug 24, 2023
2 parents 21b1a73 + 8b06077 commit 416a731
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

import org.jboss.logging.Logger;
import org.jboss.threads.ContextHandler;
Expand Down Expand Up @@ -151,10 +152,9 @@ private static EnhancedQueueExecutor createExecutor(ThreadPoolConfig threadPoolC
.setRegisterMBean(false)
.setHandoffExecutor(JBossExecutors.rejectingExecutor())
.setThreadFactory(JBossExecutors.resettingThreadFactory(threadFactory));
final int cpus = ProcessorInfo.availableProcessors();
// run time config variables
builder.setCorePoolSize(threadPoolConfig.coreThreads);
builder.setMaximumPoolSize(threadPoolConfig.maxThreads.orElse(Math.max(8 * cpus, 200)));
builder.setMaximumPoolSize(getMaxSize(threadPoolConfig));
if (threadPoolConfig.queueSize.isPresent()) {
if (threadPoolConfig.queueSize.getAsInt() < 0) {
builder.setMaximumQueueSize(Integer.MAX_VALUE);
Expand All @@ -172,6 +172,35 @@ private static EnhancedQueueExecutor createExecutor(ThreadPoolConfig threadPoolC
return builder.build();
}

public static int getMaxSize(ThreadPoolConfig threadPoolConfig) {
return threadPoolConfig.maxThreads.orElseGet(MaxThreadsCalculator.INSTANCE);
}

public static int calculateMaxThreads() {
return MaxThreadsCalculator.INSTANCE.getAsInt();
}

/**
* NOTE: This is not folded at native image build time, so it works as expected
*/
private static final class MaxThreadsCalculator implements IntSupplier {

private static final MaxThreadsCalculator INSTANCE = new MaxThreadsCalculator();

private MaxThreadsCalculator() {
}

@Override
public int getAsInt() {
return Holder.CALCULATION;
}

private static class Holder {
private static final int DEFAULT_MAX_THREADS = 200;
private static final int CALCULATION = Math.max(8 * ProcessorInfo.availableProcessors(), DEFAULT_MAX_THREADS);
}
}

public static Executor getCurrent() {
return current;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,12 @@ public class ThreadPoolConfig {
@ConfigItem(defaultValue = "30")
public Duration keepAliveTime;

public static ThreadPoolConfig empty() {
var config = new ThreadPoolConfig();
config.maxThreads = OptionalInt.empty();
config.queueSize = OptionalInt.empty();
config.shutdownCheckInterval = Optional.empty();
return config;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.quarkus.runtime.QuarkusBindException;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.ThreadPoolConfig;
import io.quarkus.runtime.annotations.Recorder;
import io.quarkus.runtime.configuration.ConfigInstantiator;
import io.quarkus.runtime.configuration.ConfigUtils;
Expand Down Expand Up @@ -250,7 +251,11 @@ public static void startServerAfterFailedStart() {
.addDiscoveredSources()
.withMapping(VertxConfiguration.class)
.build().getConfigMapping(VertxConfiguration.class);
vertx = VertxCoreRecorder.recoverFailedStart(vertxConfiguration).get();

ThreadPoolConfig threadPoolConfig = new ThreadPoolConfig();
ConfigInstantiator.handleObject(threadPoolConfig);

vertx = VertxCoreRecorder.recoverFailedStart(vertxConfiguration, threadPoolConfig).get();
} else {
vertx = supplier.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import io.quarkus.deployment.logging.LogCleanupFilterBuildItem;
import io.quarkus.gizmo.Gizmo;
import io.quarkus.netty.deployment.EventLoopSupplierBuildItem;
import io.quarkus.runtime.ThreadPoolConfig;
import io.quarkus.vertx.VertxOptionsCustomizer;
import io.quarkus.vertx.core.runtime.VertxCoreRecorder;
import io.quarkus.vertx.core.runtime.VertxLocalsHelper;
Expand Down Expand Up @@ -215,6 +216,7 @@ IOThreadDetectorBuildItem ioThreadDetector(VertxCoreRecorder recorder) {
CoreVertxBuildItem build(VertxCoreRecorder recorder,
LaunchModeBuildItem launchMode, ShutdownContextBuildItem shutdown, VertxConfiguration config,
List<VertxOptionsConsumerBuildItem> vertxOptionsConsumers,
ThreadPoolConfig threadPoolConfig,
BuildProducer<SyntheticBeanBuildItem> syntheticBeans,
BuildProducer<EventLoopSupplierBuildItem> eventLoops,
ExecutorBuildItem executorBuildItem) {
Expand All @@ -225,7 +227,7 @@ CoreVertxBuildItem build(VertxCoreRecorder recorder,
consumers.add(x.getConsumer());
}

Supplier<Vertx> vertx = recorder.configureVertx(config,
Supplier<Vertx> vertx = recorder.configureVertx(config, threadPoolConfig,
launchMode.getLaunchMode(), shutdown, consumers, executorBuildItem.getExecutorProxy());
syntheticBeans.produce(SyntheticBeanBuildItem.configure(Vertx.class)
.types(Vertx.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import org.jboss.logging.Logger;
import org.jboss.threads.EnhancedQueueExecutor;
import org.jboss.threads.JBossExecutors;
import org.wildfly.common.cpu.ProcessorInfo;

import io.quarkus.runtime.ExecutorRecorder;
import io.quarkus.runtime.LaunchMode;
import io.quarkus.vertx.core.runtime.config.VertxConfiguration;
import io.vertx.core.spi.ExecutorServiceFactory;
Expand Down Expand Up @@ -47,10 +47,9 @@ private ExecutorService internalCreateExecutor(ThreadFactory threadFactory, Inte
.setRegisterMBean(false)
.setHandoffExecutor(JBossExecutors.rejectingExecutor())
.setThreadFactory(JBossExecutors.resettingThreadFactory(threadFactory));
final int cpus = ProcessorInfo.availableProcessors();
// run time config variables
builder.setCorePoolSize(concurrency);
builder.setMaximumPoolSize(maxConcurrency != null ? maxConcurrency : Math.max(8 * cpus, 200));
builder.setMaximumPoolSize(maxConcurrency != null ? maxConcurrency : ExecutorRecorder.calculateMaxThreads());

if (conf != null) {
if (conf.queueSize().isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@
import io.netty.util.concurrent.FastThreadLocal;
import io.quarkus.arc.Arc;
import io.quarkus.arc.InstanceHandle;
import io.quarkus.runtime.ExecutorRecorder;
import io.quarkus.runtime.IOThreadDetector;
import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.ThreadPoolConfig;
import io.quarkus.runtime.annotations.Recorder;
import io.quarkus.vertx.core.runtime.config.AddressResolverConfiguration;
import io.quarkus.vertx.core.runtime.config.ClusterConfiguration;
Expand Down Expand Up @@ -95,12 +97,12 @@ public class VertxCoreRecorder {
*/
private static volatile ClassLoader currentDevModeNewThreadCreationClassLoader;

public Supplier<Vertx> configureVertx(VertxConfiguration config,
public Supplier<Vertx> configureVertx(VertxConfiguration config, ThreadPoolConfig threadPoolConfig,
LaunchMode launchMode, ShutdownContext shutdown, List<Consumer<VertxOptions>> customizers,
ExecutorService executorProxy) {
QuarkusExecutorFactory.sharedExecutor = executorProxy;
if (launchMode != LaunchMode.DEVELOPMENT) {
vertx = new VertxSupplier(launchMode, config, customizers, shutdown);
vertx = new VertxSupplier(launchMode, config, customizers, threadPoolConfig, shutdown);
// we need this to be part of the last shutdown tasks because closing it early (basically before Arc)
// could cause problem to beans that rely on Vert.x and contain shutdown tasks
shutdown.addLastShutdownTask(new Runnable() {
Expand All @@ -113,7 +115,7 @@ public void run() {
});
} else {
if (vertx == null) {
vertx = new VertxSupplier(launchMode, config, customizers, shutdown);
vertx = new VertxSupplier(launchMode, config, customizers, threadPoolConfig, shutdown);
} else if (vertx.v != null) {
tryCleanTccl();
}
Expand Down Expand Up @@ -195,13 +197,14 @@ public static Supplier<Vertx> getVertx() {
return vertx;
}

public static Vertx initialize(VertxConfiguration conf, VertxOptionsCustomizer customizer, ShutdownContext shutdown,
public static Vertx initialize(VertxConfiguration conf, VertxOptionsCustomizer customizer,
ThreadPoolConfig threadPoolConfig, ShutdownContext shutdown,
LaunchMode launchMode) {

VertxOptions options = new VertxOptions();

if (conf != null) {
convertToVertxOptions(conf, options, true, shutdown);
convertToVertxOptions(conf, options, threadPoolConfig, true, shutdown);
}

// Allow extension customizers to do their thing
Expand Down Expand Up @@ -293,7 +296,8 @@ private static Vertx logVertxInitialization(Vertx vertx) {
return vertx;
}

private static VertxOptions convertToVertxOptions(VertxConfiguration conf, VertxOptions options, boolean allowClustering,
private static VertxOptions convertToVertxOptions(VertxConfiguration conf, VertxOptions options,
ThreadPoolConfig threadPoolConfig, boolean allowClustering,
ShutdownContext shutdown) {

if (!conf.useAsyncDNS()) {
Expand Down Expand Up @@ -349,7 +353,7 @@ public void run() {
}

options.setFileSystemOptions(fileSystemOptions);
options.setWorkerPoolSize(conf.workerPoolSize());
options.setWorkerPoolSize(ExecutorRecorder.getMaxSize(threadPoolConfig));
options.setInternalBlockingPoolSize(conf.internalBlockingPoolSize());
blockingThreadPoolSize = conf.internalBlockingPoolSize();

Expand Down Expand Up @@ -586,30 +590,33 @@ public void runWith(Runnable task, Object context) {
};
}

public static Supplier<Vertx> recoverFailedStart(VertxConfiguration config) {
return vertx = new VertxSupplier(LaunchMode.DEVELOPMENT, config, Collections.emptyList(), null);
public static Supplier<Vertx> recoverFailedStart(VertxConfiguration config, ThreadPoolConfig threadPoolConfig) {
return vertx = new VertxSupplier(LaunchMode.DEVELOPMENT, config, Collections.emptyList(), threadPoolConfig, null);

}

static class VertxSupplier implements Supplier<Vertx> {
final LaunchMode launchMode;
final VertxConfiguration config;
final VertxOptionsCustomizer customizer;
final ThreadPoolConfig threadPoolConfig;
final ShutdownContext shutdown;
Vertx v;

VertxSupplier(LaunchMode launchMode, VertxConfiguration config, List<Consumer<VertxOptions>> customizers,
ThreadPoolConfig threadPoolConfig,
ShutdownContext shutdown) {
this.launchMode = launchMode;
this.config = config;
this.customizer = new VertxOptionsCustomizer(customizers);
this.threadPoolConfig = threadPoolConfig;
this.shutdown = shutdown;
}

@Override
public synchronized Vertx get() {
if (v == null) {
v = initialize(config, customizer, shutdown, launchMode);
v = initialize(config, customizer, threadPoolConfig, shutdown, launchMode);
}
return v;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ public interface VertxConfiguration {
Duration warningExceptionTime();

/**
* The size of the worker thread pool.
* @deprecated use {@code quarkus.thread-pool.max-threads} instead
*/
@WithDefault("20")
@WithDefault("${quarkus.thread-pool.max-threads:20}")
@Deprecated
int workerPoolSize();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.junit.jupiter.api.Test;

import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.ThreadPoolConfig;
import io.quarkus.vertx.core.runtime.VertxCoreRecorder.VertxOptionsCustomizer;
import io.quarkus.vertx.core.runtime.config.AddressResolverConfiguration;
import io.quarkus.vertx.core.runtime.config.ClusterConfiguration;
Expand Down Expand Up @@ -98,7 +99,8 @@ public Duration warningExceptionTime() {
};

try {
VertxCoreRecorder.initialize(configuration, null, null, LaunchMode.TEST);

VertxCoreRecorder.initialize(configuration, null, ThreadPoolConfig.empty(), null, LaunchMode.TEST);
Assertions.fail("It should not have a cluster manager on the classpath, and so fail the creation");
} catch (IllegalStateException e) {
Assertions.assertTrue(e.getMessage().contains("No ClusterManagerFactory"),
Expand Down Expand Up @@ -155,7 +157,7 @@ public void accept(VertxOptions vertxOptions) {
}
}));

VertxCoreRecorder.initialize(configuration, customizers, null, LaunchMode.TEST);
VertxCoreRecorder.initialize(configuration, customizers, ThreadPoolConfig.empty(), null, LaunchMode.TEST);
}

@Test
Expand All @@ -168,7 +170,9 @@ public void accept(VertxOptions vertxOptions) {
called.set(true);
}
}));
Vertx v = VertxCoreRecorder.initialize(new DefaultVertxConfiguration(), customizers, null, LaunchMode.TEST);
Vertx v = VertxCoreRecorder.initialize(new DefaultVertxConfiguration(), customizers, ThreadPoolConfig.empty(),
null,
LaunchMode.TEST);
Assertions.assertTrue(called.get(), "Customizer should get called during initialization");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void tearDown() {

@Test
public void shouldNotFailWithoutConfig() {
verifyProducer(VertxCoreRecorder.initialize(null, null, null, LaunchMode.TEST));
verifyProducer(VertxCoreRecorder.initialize(null, null, null, null, LaunchMode.TEST));
}

private void verifyProducer(Vertx v) {
Expand Down

0 comments on commit 416a731

Please sign in to comment.