diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java index 961173243002..e7cbd0340e09 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java @@ -44,6 +44,7 @@ * @author Chris Bono * @author Phillip Webb * @author Swamy Mavuri + * @author Vedran Pavic * @since 3.2.0 */ @ConfigurationProperties("spring.pulsar") @@ -136,6 +137,11 @@ public static class Client { */ private final Authentication authentication = new Authentication(); + /** + * Thread related configuration. + */ + private final Threads threads = new Threads(); + /** * Failover settings. */ @@ -177,6 +183,10 @@ public Authentication getAuthentication() { return this.authentication; } + public Threads getThreads() { + return this.threads; + } + public Failover getFailover() { return this.failover; } @@ -959,6 +969,36 @@ public void setParam(Map param) { } + public static class Threads { + + /** + * Number of threads to be used for handling connections to brokers. + */ + private Integer io; + + /** + * Number of threads to be used for message listeners. + */ + private Integer listener; + + public Integer getIo() { + return this.io; + } + + public void setIo(Integer io) { + this.io = io; + } + + public Integer getListener() { + return this.listener; + } + + public void setListener(Integer listener) { + this.listener = listener; + } + + } + public static class Failover { /** diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java index aa9f505b4cb2..9665c4cdb942 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java @@ -50,6 +50,7 @@ * @author Chris Bono * @author Phillip Webb * @author Swamy Mavuri + * @author Vedran Pavic */ final class PulsarPropertiesMapper { @@ -68,6 +69,8 @@ void customizeClientBuilder(ClientBuilder clientBuilder, PulsarConnectionDetails map.from(properties::getConnectionTimeout).to(timeoutProperty(clientBuilder::connectionTimeout)); map.from(properties::getOperationTimeout).to(timeoutProperty(clientBuilder::operationTimeout)); map.from(properties::getLookupTimeout).to(timeoutProperty(clientBuilder::lookupTimeout)); + map.from(properties.getThreads()::getIo).to(clientBuilder::ioThreads); + map.from(properties.getThreads()::getListener).to(clientBuilder::listenerThreads); map.from(this.properties.getTransaction()::isEnabled).whenTrue().to(clientBuilder::enableTransaction); customizeAuthentication(properties.getAuthentication(), clientBuilder::authentication); customizeServiceUrlProviderBuilder(clientBuilder::serviceUrl, clientBuilder::serviceUrlProvider, properties, diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java index f23584aab0ab..dbacef33f9c3 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java @@ -59,6 +59,7 @@ * @author Chris Bono * @author Phillip Webb * @author Swamy Mavuri + * @author Vedran Pavic */ class PulsarPropertiesMapperTests { @@ -69,6 +70,8 @@ void customizeClientBuilderWhenHasNoAuthentication() { properties.getClient().setConnectionTimeout(Duration.ofSeconds(1)); properties.getClient().setOperationTimeout(Duration.ofSeconds(2)); properties.getClient().setLookupTimeout(Duration.ofSeconds(3)); + properties.getClient().getThreads().setIo(3); + properties.getClient().getThreads().setListener(10); ClientBuilder builder = mock(ClientBuilder.class); new PulsarPropertiesMapper(properties).customizeClientBuilder(builder, new PropertiesPulsarConnectionDetails(properties)); @@ -76,6 +79,8 @@ void customizeClientBuilderWhenHasNoAuthentication() { then(builder).should().connectionTimeout(1000, TimeUnit.MILLISECONDS); then(builder).should().operationTimeout(2000, TimeUnit.MILLISECONDS); then(builder).should().lookupTimeout(3000, TimeUnit.MILLISECONDS); + then(builder).should().ioThreads(3); + then(builder).should().listenerThreads(10); } @Test diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java index 0c173b3567cb..6ef42ef83452 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java @@ -54,6 +54,7 @@ * @author Soby Chacko * @author Phillip Webb * @author Swamy Mavuri + * @author Vedran Pavic */ class PulsarPropertiesTests { @@ -88,6 +89,16 @@ void bindAuthentication() { assertThat(properties.getAuthentication().getParam()).containsEntry("token", "1234"); } + @Test + void bindThread() { + Map map = new HashMap<>(); + map.put("spring.pulsar.client.threads.io", "3"); + map.put("spring.pulsar.client.threads.listener", "10"); + PulsarProperties.Client properties = bindProperties(map).getClient(); + assertThat(properties.getThreads().getIo()).isEqualTo(3); + assertThat(properties.getThreads().getListener()).isEqualTo(10); + } + @Test void bindFailover() { Map map = new HashMap<>();