Skip to content

Commit

Permalink
Merge pull request #42067 from onobc
Browse files Browse the repository at this point in the history
* gh-42067:
  Add subscription name to Pulsar mapped config props

Closes gh-42067
  • Loading branch information
wilkinsona committed Sep 5, 2024
2 parents f024c19 + 62ef81b commit 0bc2761
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ private void customizePulsarContainerConsumerSubscriptionProperties(PulsarContai
PulsarProperties.Consumer.Subscription properties = this.properties.getConsumer().getSubscription();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties::getType).to(containerProperties::setSubscriptionType);
map.from(properties::getName).to(containerProperties::setSubscriptionName);
}

private void customizePulsarContainerListenerProperties(PulsarContainerProperties containerProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ private void customizePulsarContainerConsumerSubscriptionProperties(
PulsarProperties.Consumer.Subscription properties = this.properties.getConsumer().getSubscription();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties::getType).to(containerProperties::setSubscriptionType);
map.from(properties::getName).to(containerProperties::setSubscriptionName);
}

private void customizePulsarContainerListenerProperties(ReactivePulsarContainerProperties<?> containerProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,14 @@ void customizeConsumerBuilder() {
void customizeContainerProperties() {
PulsarProperties properties = new PulsarProperties();
properties.getConsumer().getSubscription().setType(SubscriptionType.Shared);
properties.getConsumer().getSubscription().setName("my-subscription");
properties.getListener().setSchemaType(SchemaType.AVRO);
properties.getListener().setObservationEnabled(true);
properties.getTransaction().setEnabled(true);
PulsarContainerProperties containerProperties = new PulsarContainerProperties("my-topic-pattern");
new PulsarPropertiesMapper(properties).customizeContainerProperties(containerProperties);
assertThat(containerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
assertThat(containerProperties.getSubscriptionName()).isEqualTo("my-subscription");
assertThat(containerProperties.getSchemaType()).isEqualTo(SchemaType.AVRO);
assertThat(containerProperties.isObservationEnabled()).isTrue();
assertThat(containerProperties.transactions().isEnabled()).isTrue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,13 @@ void customizeMessageConsumerBuilder() {
void customizeContainerProperties() {
PulsarProperties properties = new PulsarProperties();
properties.getConsumer().getSubscription().setType(SubscriptionType.Shared);
properties.getConsumer().getSubscription().setName("my-subscription");
properties.getListener().setSchemaType(SchemaType.AVRO);
properties.getListener().setConcurrency(10);
ReactivePulsarContainerProperties<Object> containerProperties = new ReactivePulsarContainerProperties<>();
new PulsarReactivePropertiesMapper(properties).customizeContainerProperties(containerProperties);
assertThat(containerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
assertThat(containerProperties.getSubscriptionName()).isEqualTo("my-subscription");
assertThat(containerProperties.getSchemaType()).isEqualTo(SchemaType.AVRO);
assertThat(containerProperties.getConcurrency()).isEqualTo(10);
}
Expand Down

0 comments on commit 0bc2761

Please sign in to comment.