Skip to content

Commit

Permalink
Improve Pulsar listener container concurrency configuration
Browse files Browse the repository at this point in the history
This is a follow-up to gh-42062 that utilizes newly introduced
`concurrency` property in `PulsarContainerProperties` to simplify
auto-configuration support for Pulsar listener container concurrency.

See: spring-projects/spring-pulsar#820

See gh-42120
  • Loading branch information
vpavic authored and wilkinsona committed Sep 5, 2024
1 parent 0bc2761 commit 4eba42f
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
* @author Alexander Preuß
* @author Phillip Webb
* @author Jonas Geiregat
* @author Vedran Pavic
* @since 3.2.0
*/
@AutoConfiguration
Expand Down Expand Up @@ -188,10 +187,7 @@ ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
}
pulsarTransactionManager.ifUnique(containerProperties.transactions()::setTransactionManager);
this.propertiesMapper.customizeContainerProperties(containerProperties);
ConcurrentPulsarListenerContainerFactory<Object> listenerContainerFactory = new ConcurrentPulsarListenerContainerFactory<>(
pulsarConsumerFactory, containerProperties);
this.propertiesMapper.customizeConcurrentPulsarListenerContainerFactory(listenerContainerFactory);
return listenerContainerFactory;
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@

import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.json.JsonWriter;
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
Expand Down Expand Up @@ -197,17 +196,10 @@ private void customizePulsarContainerListenerProperties(PulsarContainerPropertie
PulsarProperties.Listener properties = this.properties.getListener();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties::getSchemaType).to(containerProperties::setSchemaType);
map.from(properties::getConcurrency).to(containerProperties::setConcurrency);
map.from(properties::isObservationEnabled).to(containerProperties::setObservationEnabled);
}

@SuppressWarnings("removal")
<T> void customizeConcurrentPulsarListenerContainerFactory(
ConcurrentPulsarListenerContainerFactory<T> listenerContainerFactory) {
PulsarProperties.Listener properties = this.properties.getListener();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties::getConcurrency).to(listenerContainerFactory::setConcurrency);
}

<T> void customizeReaderBuilder(ReaderBuilder<T> readerBuilder) {
PulsarProperties.Reader properties = this.properties.getReader();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@

import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer;
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Failover.BackupCluster;
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
import org.springframework.pulsar.core.PulsarProducerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.listener.PulsarContainerProperties;
Expand Down Expand Up @@ -264,29 +263,19 @@ void customizeContainerProperties() {
properties.getConsumer().getSubscription().setType(SubscriptionType.Shared);
properties.getConsumer().getSubscription().setName("my-subscription");
properties.getListener().setSchemaType(SchemaType.AVRO);
properties.getListener().setConcurrency(10);
properties.getListener().setObservationEnabled(true);
properties.getTransaction().setEnabled(true);
PulsarContainerProperties containerProperties = new PulsarContainerProperties("my-topic-pattern");
new PulsarPropertiesMapper(properties).customizeContainerProperties(containerProperties);
assertThat(containerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
assertThat(containerProperties.getSubscriptionName()).isEqualTo("my-subscription");
assertThat(containerProperties.getSchemaType()).isEqualTo(SchemaType.AVRO);
assertThat(containerProperties.getConcurrency()).isEqualTo(10);
assertThat(containerProperties.isObservationEnabled()).isTrue();
assertThat(containerProperties.transactions().isEnabled()).isTrue();
}

@Test
@SuppressWarnings("removal")
void customizeConcurrentPulsarListenerContainerFactory() {
PulsarProperties properties = new PulsarProperties();
properties.getListener().setConcurrency(10);
ConcurrentPulsarListenerContainerFactory<?> listenerContainerFactory = mock(
ConcurrentPulsarListenerContainerFactory.class);
new PulsarPropertiesMapper(properties)
.customizeConcurrentPulsarListenerContainerFactory(listenerContainerFactory);
then(listenerContainerFactory).should().setConcurrency(10);
}

@Test
@SuppressWarnings("unchecked")
void customizeReaderBuilder() {
Expand Down

0 comments on commit 4eba42f

Please sign in to comment.