Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-2815: StreamBridge partitioning fixes #2831

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,25 @@ void test_2249() throws Exception {
}
}

@Test
void ensurePartitioningWorksWhenNativeEncodingEnabled() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
EmptyConfiguration.class)).web(WebApplicationType.NONE).run(
"--spring.cloud.stream.source=outputA;outputB",
"--spring.cloud.stream.bindings.outputA-out-0.producer.partition-count=3",
"--spring.cloud.stream.bindings.outputA-out-0.producer.use-native-encoding=true",
"--spring.cloud.stream.bindings.outputA-out-0.producer.partition-key-expression=headers['partitionKey']")) {
StreamBridge streamBridge = context.getBean(StreamBridge.class);
streamBridge.send("outputA-out-0", MessageBuilder.withPayload("A").setHeader("partitionKey", "A").build());
streamBridge.send("outputA-out-0", MessageBuilder.withPayload("C").setHeader("partitionKey", "C").build());

OutputDestination output = context.getBean(OutputDestination.class);
assertThat(output.receive(1000, "outputA-out-0").getHeaders().containsKey("scst_partition")).isTrue();
assertThat(output.receive(1000, "outputA-out-0").getHeaders().containsKey("scst_partition")).isTrue();
}
}

@SuppressWarnings("rawtypes")
@Test
void test_2785() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.binding.DefaultPartitioningInterceptor;
import org.springframework.cloud.stream.binding.NewDestinationBindingCallback;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel;
import org.springframework.context.ConfigurableApplicationContext;
Expand Down Expand Up @@ -260,14 +262,13 @@ synchronized MessageChannel resolveDestination(String destinationName, ProducerP
BinderFactory binderFactory = this.applicationContext.getBean(BinderFactory.class);
binder = binderFactory.getBinder(binderName, messageChannel.getClass());
}
// Commenting out the following block due to this issue: https://github.com/spring-cloud/spring-cloud-stream/issues/2759
// Once we confirm that there is no unknown regression, we will remove this block from here completely,
// since we already perform the partition finding algorithm once via StreamBridge#send.
// if (producerProperties != null && producerProperties.isPartitioned()) {
// BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(destinationName);
// ((AbstractMessageChannel) messageChannel)
// .addInterceptor(new DefaultPartitioningInterceptor(bindingProperties, this.applicationContext.getBeanFactory()));
// }
// since we already perform the partition finding algorithm once via StreamBridge#send we don't need to
// do the following, unless the conversion is handled natively on the middleware.
if (producerProperties != null && producerProperties.isPartitioned() && producerProperties.isUseNativeEncoding()) {
BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(destinationName);
((AbstractMessageChannel) messageChannel)
.addInterceptor(new DefaultPartitioningInterceptor(bindingProperties, this.applicationContext.getBeanFactory()));
}
this.addInterceptors((AbstractMessageChannel) messageChannel, destinationName);

this.bindingService.bindProducer(messageChannel, destinationName, true, binder);
Expand Down
Loading