From 7601951ff63241ef0c7ceade60d479997dd1ae8e Mon Sep 17 00:00:00 2001 From: Vedran Pavic Date: Wed, 28 Aug 2024 23:31:58 +0200 Subject: [PATCH] Add support for configuring Pulsar listener container concurrency This commit adds configuration property that allows users to configure Pulsar message listener container concurrency. --- .../pulsar/PulsarAutoConfiguration.java | 6 +++++- .../boot/autoconfigure/pulsar/PulsarProperties.java | 13 +++++++++++++ .../pulsar/PulsarPropertiesMapper.java | 8 ++++++++ .../pulsar/PulsarReactivePropertiesMapper.java | 4 +++- .../pulsar/PulsarPropertiesMapperTests.java | 12 ++++++++++++ .../autoconfigure/pulsar/PulsarPropertiesTests.java | 2 ++ .../pulsar/PulsarReactivePropertiesMapperTests.java | 5 ++++- 7 files changed, 47 insertions(+), 3 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java index 5b5f9dc41235..9d2f4d88d319 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java @@ -69,6 +69,7 @@ * @author Alexander Preuß * @author Phillip Webb * @author Jonas Geiregat + * @author Vedran Pavic * @since 3.2.0 */ @AutoConfiguration @@ -187,7 +188,10 @@ ConcurrentPulsarListenerContainerFactory pulsarListenerContainerFactory( } pulsarTransactionManager.ifUnique(containerProperties.transactions()::setTransactionManager); this.propertiesMapper.customizeContainerProperties(containerProperties); - return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties); + ConcurrentPulsarListenerContainerFactory listenerContainerFactory = new ConcurrentPulsarListenerContainerFactory<>( + pulsarConsumerFactory, containerProperties); + this.propertiesMapper.customizeConcurrentPulsarListenerContainerFactory(listenerContainerFactory); + return listenerContainerFactory; } @Bean diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java index e7cbd0340e09..3972eec5029f 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java @@ -811,6 +811,11 @@ public static class Listener { */ private SchemaType schemaType; + /** + * Number of threads used by listener container. + */ + private Integer concurrency; + /** * Whether to record observations for when the Observations API is available and * the client supports it. @@ -825,6 +830,14 @@ public void setSchemaType(SchemaType schemaType) { this.schemaType = schemaType; } + public Integer getConcurrency() { + return this.concurrency; + } + + public void setConcurrency(Integer concurrency) { + this.concurrency = concurrency; + } + public boolean isObservationEnabled() { return this.observationEnabled; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java index 9665c4cdb942..d02d04f457d9 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java @@ -39,6 +39,7 @@ import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.boot.json.JsonWriter; +import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory; import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.pulsar.listener.PulsarContainerProperties; import org.springframework.pulsar.reader.PulsarReaderContainerProperties; @@ -198,6 +199,13 @@ private void customizePulsarContainerListenerProperties(PulsarContainerPropertie map.from(properties::isObservationEnabled).to(containerProperties::setObservationEnabled); } + void customizeConcurrentPulsarListenerContainerFactory( + ConcurrentPulsarListenerContainerFactory listenerContainerFactory) { + PulsarProperties.Listener properties = this.properties.getListener(); + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + map.from(properties::getConcurrency).to(listenerContainerFactory::setConcurrency); + } + void customizeReaderBuilder(ReaderBuilder readerBuilder) { PulsarProperties.Reader properties = this.properties.getReader(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactivePropertiesMapper.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactivePropertiesMapper.java index 2f79bbae615f..9abf379a5198 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactivePropertiesMapper.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactivePropertiesMapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2023 the original author or authors. + * Copyright 2012-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,6 +31,7 @@ * * @author Chris Bono * @author Phillip Webb + * @author Vedran Pavic */ final class PulsarReactivePropertiesMapper { @@ -93,6 +94,7 @@ private void customizePulsarContainerListenerProperties(ReactivePulsarContainerP PulsarProperties.Listener properties = this.properties.getListener(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); map.from(properties::getSchemaType).to(containerProperties::setSchemaType); + map.from(properties::getConcurrency).to(containerProperties::setConcurrency); } void customizeMessageReaderBuilder(ReactiveMessageReaderBuilder builder) { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java index dbacef33f9c3..79e9818685a0 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java @@ -41,6 +41,7 @@ import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer; import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Failover.BackupCluster; +import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory; import org.springframework.pulsar.core.PulsarProducerFactory; import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.pulsar.listener.PulsarContainerProperties; @@ -272,6 +273,17 @@ void customizeContainerProperties() { assertThat(containerProperties.transactions().isEnabled()).isTrue(); } + @Test + void customizeConcurrentPulsarListenerContainerFactory() { + PulsarProperties properties = new PulsarProperties(); + properties.getListener().setConcurrency(10); + ConcurrentPulsarListenerContainerFactory listenerContainerFactory = mock( + ConcurrentPulsarListenerContainerFactory.class); + new PulsarPropertiesMapper(properties) + .customizeConcurrentPulsarListenerContainerFactory(listenerContainerFactory); + then(listenerContainerFactory).should().setConcurrency(10); + } + @Test @SuppressWarnings("unchecked") void customizeReaderBuilder() { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java index 6ef42ef83452..72fbdd0e73f4 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java @@ -393,9 +393,11 @@ class ListenerProperties { void bind() { Map map = new HashMap<>(); map.put("spring.pulsar.listener.schema-type", "avro"); + map.put("spring.pulsar.listener.concurrency", "10"); map.put("spring.pulsar.listener.observation-enabled", "true"); PulsarProperties.Listener properties = bindProperties(map).getListener(); assertThat(properties.getSchemaType()).isEqualTo(SchemaType.AVRO); + assertThat(properties.getConcurrency()).isEqualTo(10); assertThat(properties.isObservationEnabled()).isTrue(); } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactivePropertiesMapperTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactivePropertiesMapperTests.java index df078b21a354..b31de0290322 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactivePropertiesMapperTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactivePropertiesMapperTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2023 the original author or authors. + * Copyright 2012-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,6 +48,7 @@ * * @author Chris Bono * @author Phillip Webb + * @author Vedran Pavic */ class PulsarReactivePropertiesMapperTests { @@ -120,10 +121,12 @@ void customizeContainerProperties() { PulsarProperties properties = new PulsarProperties(); properties.getConsumer().getSubscription().setType(SubscriptionType.Shared); properties.getListener().setSchemaType(SchemaType.AVRO); + properties.getListener().setConcurrency(10); ReactivePulsarContainerProperties containerProperties = new ReactivePulsarContainerProperties<>(); new PulsarReactivePropertiesMapper(properties).customizeContainerProperties(containerProperties); assertThat(containerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Shared); assertThat(containerProperties.getSchemaType()).isEqualTo(SchemaType.AVRO); + assertThat(containerProperties.getConcurrency()).isEqualTo(10); } @Test