Default partitioning not respected with StreamBridge #2815

Closed
dejank1986 opened this issue Sep 21, 2023 · 1 comment

Comments

@dejank1986

Describe the issue
After upgrading to version 4.0.4, we noticed that default partitioning (key.hashCode() % partitionCount) no longer works as expected when using StreamBridge. It may be related to the block that was commented out with this note:

// Commenting out the following block due to this issue: https://github.com/spring-cloud/spring-cloud-stream/issues/2759

Previously, when the condition in that block was met, DefaultPartitionInterceptor was added and partitionHandler.determinePartition(message) was called. That is no longer the case in version 4.0.4.

To Reproduce
We are using Spring Cloud Stream functions. The message key is a very simple record schema.

Steps to reproduce the behavior:

  1. Set the configuration properties
spring.cloud.stream:
  default:
    producer:
      useNativeEncoding: true
      partition-key-expression: headers['kafka_messageKey'].id
    consumer.useNativeDecoding: true
  function.autodetect: false
  bindings:
    testEventSource-out-0:
      destination: test-event
  kafka:
    binder:
      auto-create-topics: false
      producer-properties:
        key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        key.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
        value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
  2. Run the following integration test
void sendEvent_whenUsingStreamBridge_shouldRespectPartitioningScheme() {
        final int numberOfPartitions = 6;
        // The id carried by the message key; it is also the partition key (see partition-key-expression).
        final long eventId = 1001L;

        final Message<KafkaNull> outgoingMessage = MessageBuilder.withPayload(KafkaNull.INSTANCE)
                .setHeader(KafkaHeaders.KEY, TestEventKey.newBuilder().setId(eventId).build())
                .build();

        streamBridge.send("testEventSource-out-0", outgoingMessage);

        final ConsumerRecords<Object, Object> records = testConsumer.poll(Duration.ofSeconds(5L));
        // Cast to int so the expected value has the same type as partition().
        assertThat(records.iterator().next().partition()).isEqualTo((int) (eventId % numberOfPartitions));
    }

We used KafkaNull.INSTANCE as the payload, but the behaviour is the same with a regular message.
  3. See the failed assertion

Version of the framework
SpringBoot 3.1.2
SpringCloudStream 4.0.4

Expected behavior
When using the defaults, the partition should be selected based on the following function: (key.hashCode() % partitionCount).
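To make the expected value concrete, here is a minimal plain-Java sketch of the formula above. It is not the Spring Cloud Stream source: the class name `PartitionSketch` and the method `selectPartition` are hypothetical, and using `Math.floorMod` to keep the result non-negative for negative hash codes is an assumption, since the formula as written does not say how those are handled.

```java
import java.util.Objects;

// Illustrative sketch of key.hashCode() % partitionCount; not the library implementation.
final class PartitionSketch {

    // Maps a partition key to an index in the range [0, partitionCount).
    static int selectPartition(Object key, int partitionCount) {
        Objects.requireNonNull(key, "key must not be null");
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        // Assumption: floorMod is used so that negative hash codes still yield a valid index.
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        // Same values as the integration test: key id 1001, six partitions.
        long eventId = 1001L;
        int partition = selectPartition(eventId, 6);
        System.out.println(partition); // prints 5
    }
}
```

With the key id 1001 and six partitions, the hash code of the boxed Long is 1001, so the record is expected on partition 5.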

@olegz
Contributor

olegz commented Sep 26, 2023

I literally just cut and pasted your code and I get:

Invalid value io.confluent.kafka.serializers.KafkaAvroSerializer for configuration key.serializer: Class io.confluent.kafka.serializers.KafkaAvroSerializer could not be found.
	at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:744) ~[kafka-clients-3.5.1.jar:na]
	at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:490) ~[kafka-clients-3.5.1.jar:na]
. . . .

If you want us to look at the issue, please provide a way to reproduce it. The best thing you can do is create a small project with reproducible code and push it to GitHub so we can take a look.

@sobychacko sobychacko added this to the 4.1.0-RC1 milestone Sep 29, 2023
sobychacko added a commit to sobychacko/spring-cloud-stream that referenced this issue Oct 17, 2023
 - When native encoding is used, StreamBridge is not setting
   partitioning interceptor which calls the PartitionHandler.
   This is a regression issue and adding the interceptor explicitly
   when native encoding is used.

Resolves spring-cloud#2815
@olegz olegz closed this as completed in 5a0183f Oct 18, 2023
olegz pushed a commit that referenced this issue Oct 18, 2023
 - When native encoding is used, StreamBridge is not setting
   partitioning interceptor which calls the PartitionHandler.
   This is a regression issue and adding the interceptor explicitly
   when native encoding is used.

Resolves #2815
Resolves #2831