Skip to content

Commit

Permalink
GH-2815: StreamBridge partitioning fixes
Browse files Browse the repository at this point in the history
 - When native encoding is used, StreamBridge is not setting
   partitioning interceptor which calls the PartitionHandler.
   This is a regression issue and adding the interceptor explicitly
   when native encoding is used.

Resolves #2815
Resolves #2831
  • Loading branch information
sobychacko authored and olegz committed Oct 18, 2023
1 parent 7f89964 commit 5a0183f
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,25 @@ void test_2249() throws Exception {
}
}

@Test
void ensurePartitioningWorksWhenNativeEncodingEnabled() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
EmptyConfiguration.class)).web(WebApplicationType.NONE).run(
"--spring.cloud.stream.source=outputA;outputB",
"--spring.cloud.stream.bindings.outputA-out-0.producer.partition-count=3",
"--spring.cloud.stream.bindings.outputA-out-0.producer.use-native-encoding=true",
"--spring.cloud.stream.bindings.outputA-out-0.producer.partition-key-expression=headers['partitionKey']")) {
StreamBridge streamBridge = context.getBean(StreamBridge.class);
streamBridge.send("outputA-out-0", MessageBuilder.withPayload("A").setHeader("partitionKey", "A").build());
streamBridge.send("outputA-out-0", MessageBuilder.withPayload("C").setHeader("partitionKey", "C").build());

OutputDestination output = context.getBean(OutputDestination.class);
assertThat(output.receive(1000, "outputA-out-0").getHeaders().containsKey("scst_partition")).isTrue();
assertThat(output.receive(1000, "outputA-out-0").getHeaders().containsKey("scst_partition")).isTrue();
}
}

@SuppressWarnings("rawtypes")
@Test
void test_2785() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.binding.DefaultPartitioningInterceptor;
import org.springframework.cloud.stream.binding.NewDestinationBindingCallback;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel;
import org.springframework.context.ConfigurableApplicationContext;
Expand Down Expand Up @@ -260,14 +262,13 @@ synchronized MessageChannel resolveDestination(String destinationName, ProducerP
BinderFactory binderFactory = this.applicationContext.getBean(BinderFactory.class);
binder = binderFactory.getBinder(binderName, messageChannel.getClass());
}
// Commenting out the following block due to this issue: https://github.com/spring-cloud/spring-cloud-stream/issues/2759
// Once we confirm that there is no unknown regression, we will remove this block from here completely,
// since we already perform the partition finding algorithm once via StreamBridge#send.
// if (producerProperties != null && producerProperties.isPartitioned()) {
// BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(destinationName);
// ((AbstractMessageChannel) messageChannel)
// .addInterceptor(new DefaultPartitioningInterceptor(bindingProperties, this.applicationContext.getBeanFactory()));
// }
// since we already perform the partition finding algorithm once via StreamBridge#send we don't need to
// do the following, unless the conversion is handled natively on the middleware.
if (producerProperties != null && producerProperties.isPartitioned() && producerProperties.isUseNativeEncoding()) {
BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(destinationName);
((AbstractMessageChannel) messageChannel)
.addInterceptor(new DefaultPartitioningInterceptor(bindingProperties, this.applicationContext.getBeanFactory()));
}
this.addInterceptors((AbstractMessageChannel) messageChannel, destinationName);

this.bindingService.bindProducer(messageChannel, destinationName, true, binder);
Expand Down

0 comments on commit 5a0183f

Please sign in to comment.