Skip to content

Commit

Permalink
Add subscription name to Pulsar mapped config props
Browse files Browse the repository at this point in the history
The subscription name config prop was not being set on the Pulsar
listener container properties. This commit adds the subscription
name to the Pulsar property mappers.

See gh-42067
  • Loading branch information
onobc authored and wilkinsona committed Sep 5, 2024
1 parent f024c19 commit 62ef81b
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ private void customizePulsarContainerConsumerSubscriptionProperties(PulsarContai
PulsarProperties.Consumer.Subscription properties = this.properties.getConsumer().getSubscription();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties::getType).to(containerProperties::setSubscriptionType);
map.from(properties::getName).to(containerProperties::setSubscriptionName);
}

private void customizePulsarContainerListenerProperties(PulsarContainerProperties containerProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ private void customizePulsarContainerConsumerSubscriptionProperties(
PulsarProperties.Consumer.Subscription properties = this.properties.getConsumer().getSubscription();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties::getType).to(containerProperties::setSubscriptionType);
map.from(properties::getName).to(containerProperties::setSubscriptionName);
}

private void customizePulsarContainerListenerProperties(ReactivePulsarContainerProperties<?> containerProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,14 @@ void customizeConsumerBuilder() {
void customizeContainerProperties() {
PulsarProperties properties = new PulsarProperties();
properties.getConsumer().getSubscription().setType(SubscriptionType.Shared);
properties.getConsumer().getSubscription().setName("my-subscription");
properties.getListener().setSchemaType(SchemaType.AVRO);
properties.getListener().setObservationEnabled(true);
properties.getTransaction().setEnabled(true);
PulsarContainerProperties containerProperties = new PulsarContainerProperties("my-topic-pattern");
new PulsarPropertiesMapper(properties).customizeContainerProperties(containerProperties);
assertThat(containerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
assertThat(containerProperties.getSubscriptionName()).isEqualTo("my-subscription");
assertThat(containerProperties.getSchemaType()).isEqualTo(SchemaType.AVRO);
assertThat(containerProperties.isObservationEnabled()).isTrue();
assertThat(containerProperties.transactions().isEnabled()).isTrue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,13 @@ void customizeMessageConsumerBuilder() {
void customizeContainerProperties() {
PulsarProperties properties = new PulsarProperties();
properties.getConsumer().getSubscription().setType(SubscriptionType.Shared);
properties.getConsumer().getSubscription().setName("my-subscription");
properties.getListener().setSchemaType(SchemaType.AVRO);
properties.getListener().setConcurrency(10);
ReactivePulsarContainerProperties<Object> containerProperties = new ReactivePulsarContainerProperties<>();
new PulsarReactivePropertiesMapper(properties).customizeContainerProperties(containerProperties);
assertThat(containerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
assertThat(containerProperties.getSubscriptionName()).isEqualTo("my-subscription");
assertThat(containerProperties.getSchemaType()).isEqualTo(SchemaType.AVRO);
assertThat(containerProperties.getConcurrency()).isEqualTo(10);
}
Expand Down

0 comments on commit 62ef81b

Please sign in to comment.