Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka message default ack npe fix #2252

Merged

Conversation

danielkec
Copy link
Contributor

As @tkote reported on Slack, when KafkaMessage is created with KafkaMessage.of(key, value), only 5 messages are sent and no more is ever requested.
Problem is caused by wrong initialization of the ack future supplier.

    static <K, V> KafkaMessage<K, V> of(K key, V payload) {
        Objects.requireNonNull(payload);
        return new KafkaProducerMessage<>(key, payload, null);
    }

So Kafka ack callback silently dies and nothing is ever requested again:

        CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]))
                .whenComplete((success, exception) -> {
                    if (exception == null) {
                        message.ack().whenComplete((a, b) -> {
                            // Atomically increment
                            // or reset backpressureCounter if incrementing would reach threshold
                            if (backpressureCounter.getAndUpdate(n -> ++n == backpressure ? 0 : n)
                                    >= backpressure - 1) {
                                // configured backpressure threshold reached
                                subscription.request(backpressure);
                            }
                        });
                    }
                });

Signed-off-by: Daniel Kec daniel.kec@oracle.com

Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
@danielkec danielkec self-assigned this Aug 12, 2020
@danielkec danielkec requested a review from jbescos August 12, 2020 10:14
@danielkec danielkec added the messaging Reactive Messaging label Aug 12, 2020
@danielkec danielkec linked an issue Aug 12, 2020 that may be closed by this pull request
@danielkec danielkec merged commit 4990636 into helidon-io:master Aug 21, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
messaging Reactive Messaging
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Backpressure doesn't work in Kafka Connector
2 participants