diff --git a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java index 2c8a6fb31..b408fd8f9 100644 --- a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java +++ b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java @@ -179,6 +179,25 @@ void test_2249() throws Exception { } } + @Test + void ensurePartitioningWorksWhenNativeEncodingEnabled() { + try (ConfigurableApplicationContext context = new SpringApplicationBuilder( + TestChannelBinderConfiguration.getCompleteConfiguration( + EmptyConfiguration.class)).web(WebApplicationType.NONE).run( + "--spring.cloud.stream.source=outputA;outputB", + "--spring.cloud.stream.bindings.outputA-out-0.producer.partition-count=3", + "--spring.cloud.stream.bindings.outputA-out-0.producer.use-native-encoding=true", + "--spring.cloud.stream.bindings.outputA-out-0.producer.partition-key-expression=headers['partitionKey']")) { + StreamBridge streamBridge = context.getBean(StreamBridge.class); + streamBridge.send("outputA-out-0", MessageBuilder.withPayload("A").setHeader("partitionKey", "A").build()); + streamBridge.send("outputA-out-0", MessageBuilder.withPayload("C").setHeader("partitionKey", "C").build()); + + OutputDestination output = context.getBean(OutputDestination.class); + assertThat(output.receive(1000, "outputA-out-0").getHeaders().containsKey("scst_partition")).isTrue(); + assertThat(output.receive(1000, "outputA-out-0").getHeaders().containsKey("scst_partition")).isTrue(); + } + } + @SuppressWarnings("rawtypes") @Test void test_2785() throws Exception { diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java index af76d8f15..c27f68849 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java @@ -38,7 +38,9 @@ import org.springframework.cloud.stream.binder.BinderFactory; import org.springframework.cloud.stream.binder.ProducerProperties; import org.springframework.cloud.stream.binding.BindingService; +import org.springframework.cloud.stream.binding.DefaultPartitioningInterceptor; import org.springframework.cloud.stream.binding.NewDestinationBindingCallback; +import org.springframework.cloud.stream.config.BindingProperties; import org.springframework.cloud.stream.config.BindingServiceProperties; import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel; import org.springframework.context.ConfigurableApplicationContext; @@ -260,14 +262,13 @@ synchronized MessageChannel resolveDestination(String destinationName, ProducerP BinderFactory binderFactory = this.applicationContext.getBean(BinderFactory.class); binder = binderFactory.getBinder(binderName, messageChannel.getClass()); } - // Commenting out the following block due to this issue: https://github.com/spring-cloud/spring-cloud-stream/issues/2759 - // Once we confirm that there is no unknown regression, we will remove this block from here completely, - // since we already perform the partition finding algorithm once via StreamBridge#send. -// if (producerProperties != null && producerProperties.isPartitioned()) { -// BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(destinationName); -// ((AbstractMessageChannel) messageChannel) -// .addInterceptor(new DefaultPartitioningInterceptor(bindingProperties, this.applicationContext.getBeanFactory())); -// } + // since we already perform the partition finding algorithm once via StreamBridge#send we don't need to + // do the following, unless the conversion is handled natively on the middleware. + if (producerProperties != null && producerProperties.isPartitioned() && producerProperties.isUseNativeEncoding()) { + BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(destinationName); + ((AbstractMessageChannel) messageChannel) + .addInterceptor(new DefaultPartitioningInterceptor(bindingProperties, this.applicationContext.getBeanFactory())); + } this.addInterceptors((AbstractMessageChannel) messageChannel, destinationName); this.bindingService.bindProducer(messageChannel, destinationName, true, binder);