From bc08aced21d655809d4fc2fa95f914f930d27159 Mon Sep 17 00:00:00 2001
From: Jorge Bescos Gascon
Date: Mon, 1 Jun 2020 17:53:56 +0200
Subject: [PATCH 1/2] Moving 2 tests from KafkaMP to KafkaSE

Signed-off-by: Jorge Bescos Gascon
---
 .../connectors/kafka/AbstractKafkaTest.java  |  94 ++++++++++
 .../connectors/kafka/AbstractSampleBean.java |  14 +-
 .../connectors/kafka/KafkaMpTest.java        | 161 +-----------------
 .../connectors/kafka/KafkaSeTest.java        | 128 +++++++++++++-
 4 files changed, 229 insertions(+), 168 deletions(-)

diff --git a/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/AbstractKafkaTest.java b/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/AbstractKafkaTest.java
index d575d173fe2..7a305714f7a 100644
--- a/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/AbstractKafkaTest.java
+++ b/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/AbstractKafkaTest.java
@@ -17,12 +17,43 @@
 package io.helidon.messaging.connectors.kafka;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
 import com.salesforce.kafka.test.junit5.SharedKafkaTestResource;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 public abstract class AbstractKafkaTest {
+    private static final Logger LOGGER = Logger.getLogger(AbstractKafkaTest.class.getName());
+
     static String KAFKA_SERVER;
 
     @RegisterExtension
@@ -36,4 +67,67 @@ public abstract class AbstractKafkaTest {
     static void prepareTopics() {
         KAFKA_SERVER = kafkaResource.getKafkaConnectString();
     }
+
+    static void produceSync(String topic, Map<String, Object> config, List<String> testData) {
+        try (Producer<Long, String> producer = new KafkaProducer<>(config)) {
+            LOGGER.fine(() -> "Producing " + testData.size() + " events");
+            //Send all test messages(async send means order is not guaranteed) and in parallel
+            List<Future<RecordMetadata>> sent = testData.parallelStream()
+                    .map(s -> producer.send(new ProducerRecord<>(topic, s))).collect(Collectors.toList());
+            sent.stream().forEach(future -> {
+                try {
+                    future.get(30, TimeUnit.SECONDS);
+                } catch (InterruptedException | ExecutionException | TimeoutException e) {
+                    fail("Some of next messages were not sent in time: " + testData, e);
+                }
+            });
+        }
+    }
+
+    static void produceAndCheck(AbstractSampleBean kafkaConsumingBean, List<String> testData, String topic,
+            List<String> expected) {
+        produceAndCheck(kafkaConsumingBean, testData, topic, expected, expected.size());
+    }
+
+    static void produceAndCheck(AbstractSampleBean kafkaConsumingBean, List<String> testData, String topic,
+            List<String> expected, long requested) {
+        kafkaConsumingBean.expectedRequests(requested);
+        Map<String, Object> config = new HashMap<>();
+        config.put("bootstrap.servers", KAFKA_SERVER);
+        config.put("key.serializer", LongSerializer.class.getName());
+        config.put("value.serializer", StringSerializer.class.getName());
+
+        produceSync(topic, config, testData);
+        if (requested > 0) {
+            // Wait till records are delivered
+            boolean done = kafkaConsumingBean.await();
+            assertTrue(done, String.format("Timeout waiting for results.\nExpected: %s \nBut was: %s",
+                    expected.toString(), kafkaConsumingBean.consumed().toString()));
+        }
+        Collections.sort(kafkaConsumingBean.consumed());
+        Collections.sort(expected);
+        if (!expected.isEmpty()) {
+            assertEquals(expected, kafkaConsumingBean.consumed());
+        }
+    }
+
+    static List<String> readTopic(String topic, int expected, String group){
+        final long timeout = 30000;
+        List<String> events = new LinkedList<>();
+        Map<String, Object> config = new HashMap<>();
+        config.put("enable.auto.commit", Boolean.toString(true));
+        config.put("auto.offset.reset", "earliest");
+        config.put("bootstrap.servers", KAFKA_SERVER);
+        config.put("group.id", group);
+        config.put("key.deserializer", LongDeserializer.class.getName());
+        config.put("value.deserializer", StringDeserializer.class.getName());
+        try (Consumer<Long, String> consumer = new KafkaConsumer<>(config)) {
+            consumer.subscribe(Arrays.asList(topic));
+            long current = System.currentTimeMillis();
+            while (events.size() < expected && System.currentTimeMillis() - current < timeout) {
+                consumer.poll(Duration.ofSeconds(5)).forEach(c -> events.add(c.value()));
+            }
+        }
+        return events;
+    }
 }
diff --git a/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/AbstractSampleBean.java b/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/AbstractSampleBean.java
index 1109f44cb1b..dfd358e958a 100644
--- a/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/AbstractSampleBean.java
+++ b/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/AbstractSampleBean.java
@@ -205,13 +205,10 @@ public void onComplete() {
         }
     }
 
-    @ApplicationScoped
     public static class Channel6 extends AbstractSampleBean {
         static final String NO_ACK = "noAck";
 
-        @Incoming("test-channel-6")
-        @Acknowledgment(Acknowledgment.Strategy.MANUAL)
-        public CompletionStage channel6(KafkaMessage msg) {
+        public CompletionStage onMsg(KafkaMessage msg) {
             LOGGER.fine(() -> String.format("Received %s", msg.getPayload()));
             consumed().add(msg.getPayload());
             // Certain messages are not ACK. We can check later that they will be sent again.
@@ -221,12 +218,11 @@ public CompletionStage channel6(KafkaMessage msg) {
             } else {
                 LOGGER.fine(() -> "ACK is not sent");
             }
-            countDown("channel6()");
+            countDown("channel6()");
             return CompletableFuture.completedFuture(null);
         }
     }
-
-    @ApplicationScoped
+
     public static class Channel8 extends AbstractSampleBean {
         static final String NO_ACK = "noAck";
         static final int LIMIT = 10;
@@ -236,9 +232,7 @@ public static class Channel8 extends AbstractSampleBean {
         // Limit is for one scenario that Kafka rebalances and sends again same data in different partition
         private final AtomicInteger limit = new AtomicInteger();
 
-        @Incoming("test-channel-8")
-        @Acknowledgment(Acknowledgment.Strategy.MANUAL)
-        public CompletionStage channel6(KafkaMessage msg) {
+        public CompletionStage onMsg(KafkaMessage msg) {
             ConsumerRecord record = msg.unwrap(ConsumerRecord.class);
             consumed().add(record.value());
             // Certain messages are not ACK. We can check later that they will be sent again.
diff --git a/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaMpTest.java b/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaMpTest.java
index a9966983f76..82f14a7d468 100644
--- a/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaMpTest.java
+++ b/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaMpTest.java
@@ -17,28 +17,31 @@
 package io.helidon.messaging.connectors.kafka;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import com.salesforce.kafka.test.junit5.SharedKafkaTestResource;
+
 import java.lang.annotation.Annotation;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import java.util.stream.LongStream;
 
 import javax.enterprise.inject.Instance;
 import javax.enterprise.inject.se.SeContainer;
@@ -50,13 +53,6 @@
 import io.helidon.config.mp.MpConfigSources;
 import io.helidon.microprofile.messaging.MessagingCdiExtension;
 
-import com.salesforce.kafka.test.junit5.SharedKafkaTestResource;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -65,16 +61,9 @@
 import org.eclipse.microprofile.reactive.messaging.spi.Connector;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-
 class KafkaMpTest extends AbstractKafkaTest{
 
     private static final Logger LOGGER = Logger.getLogger(KafkaMpTest.class.getName());
@@ -99,14 +88,10 @@ public String value() {
     private static final String TEST_TOPIC_3 = "graph-done-3";
     private static final String TEST_TOPIC_4 = "graph-done-4";
     private static final String TEST_TOPIC_5 = "graph-done-5";
-    private static final String TEST_TOPIC_6 = "graph-done-6";
     private static final String TEST_TOPIC_7 = "graph-done-7";
-    private static final String TEST_TOPIC_8 = "graph-done-8";
     private static final String TEST_TOPIC_10 = "graph-done-10";
     private static final String TEST_TOPIC_13 = "graph-done-13";
     private static final String UNEXISTING_TOPIC = "unexistingTopic2";
-    private static final String GROUP_1 = "group1";
-    private static final String GROUP_2 = "group2";
 
     private static SeContainer cdiContainer;
 
@@ -166,15 +151,6 @@ private static Map cdiConfig() {
                 "mp.messaging.incoming.test-channel-5.group.id", UUID.randomUUID().toString(),
                 "mp.messaging.incoming.test-channel-5.key.deserializer", LongDeserializer.class.getName(),
                 "mp.messaging.incoming.test-channel-5.value.deserializer", StringDeserializer.class.getName()));
-        p.putAll(Map.of(
-                "mp.messaging.incoming.test-channel-6.enable.auto.commit", Boolean.toString(false),
-                "mp.messaging.incoming.test-channel-6.auto.offset.reset", "earliest",
-                "mp.messaging.incoming.test-channel-6.connector", KafkaConnector.CONNECTOR_NAME,
-                "mp.messaging.incoming.test-channel-6.bootstrap.servers", KAFKA_SERVER,
-                "mp.messaging.incoming.test-channel-6.topic", TEST_TOPIC_6,
-                "mp.messaging.incoming.test-channel-6.group.id", GROUP_1,
-                "mp.messaging.incoming.test-channel-6.key.deserializer", LongDeserializer.class.getName(),
-                "mp.messaging.incoming.test-channel-6.value.deserializer", StringDeserializer.class.getName()));
         p.putAll(Map.of(
                 "mp.messaging.incoming.test-channel-7.enable.auto.commit", Boolean.toString(false),
                 "mp.messaging.incoming.test-channel-7.connector", KafkaConnector.CONNECTOR_NAME,
                 "mp.messaging.incoming.test-channel-7.bootstrap.servers", KAFKA_SERVER,
                 "mp.messaging.incoming.test-channel-7.topic", TEST_TOPIC_7,
                 "mp.messaging.incoming.test-channel-7.group.id", UUID.randomUUID().toString(),
                 "mp.messaging.incoming.test-channel-7.key.deserializer", LongDeserializer.class.getName(),
                 "mp.messaging.incoming.test-channel-7.value.deserializer", StringDeserializer.class.getName()));
-        p.putAll(Map.of(
-                "mp.messaging.incoming.test-channel-8.enable.auto.commit", Boolean.toString(false),
-                "mp.messaging.incoming.test-channel-8.auto.offset.reset", "earliest",
-                "mp.messaging.incoming.test-channel-8.connector", KafkaConnector.CONNECTOR_NAME,
-                "mp.messaging.incoming.test-channel-8.bootstrap.servers", KAFKA_SERVER,
-                "mp.messaging.incoming.test-channel-8.topic", TEST_TOPIC_8,
-                "mp.messaging.incoming.test-channel-8.group.id", GROUP_2,
-                "mp.messaging.incoming.test-channel-8.key.deserializer", LongDeserializer.class.getName(),
-                "mp.messaging.incoming.test-channel-8.value.deserializer", StringDeserializer.class.getName()));
         p.putAll(Map.of(
                 "mp.messaging.outgoing.test-channel-9.connector", KafkaConnector.CONNECTOR_NAME,
                 "mp.messaging.outgoing.test-channel-9.bootstrap.servers", "unexsitingserver:7777",
@@ -249,9 +216,7 @@ static void prepareTopics() {
         kafkaResource.getKafkaTestUtils().createTopic(TEST_TOPIC_3, 4, (short) 1);
         kafkaResource.getKafkaTestUtils().createTopic(TEST_TOPIC_4, 4, (short) 1);
         kafkaResource.getKafkaTestUtils().createTopic(TEST_TOPIC_5, 4, (short) 1);
-        kafkaResource.getKafkaTestUtils().createTopic(TEST_TOPIC_6, 1, (short) 1);
         kafkaResource.getKafkaTestUtils().createTopic(TEST_TOPIC_7, 4, (short) 1);
-        kafkaResource.getKafkaTestUtils().createTopic(TEST_TOPIC_8, 2, (short) 1);
         kafkaResource.getKafkaTestUtils().createTopic(TEST_TOPIC_10, 1, (short) 1);
         kafkaResource.getKafkaTestUtils().createTopic(TEST_TOPIC_13, 1, (short) 1);
         KAFKA_SERVER = kafkaResource.getKafkaConnectString();
@@ -274,8 +239,6 @@ private static void cdiContainerUp() {
         classes.add(AbstractSampleBean.Channel1.class);
         classes.add(AbstractSampleBean.Channel4.class);
         classes.add(AbstractSampleBean.Channel5.class);
-        classes.add(AbstractSampleBean.Channel6.class);
-        classes.add(AbstractSampleBean.Channel8.class);
         classes.add(AbstractSampleBean.ChannelError.class);
         classes.add(AbstractSampleBean.ChannelProcessor.class);
         classes.add(AbstractSampleBean.Channel9.class);
@@ -368,55 +331,6 @@ void withBackPressureAndError() {
         kafkaResource.getKafkaTestUtils().consumeAllRecordsFromTopic(TEST_TOPIC_5);
     }
 
-    @Test
-    @Disabled("It fails sometimes. Needs to be solved in otherway.")
-    void someEventsNoAckWithOnePartition() {
-        LOGGER.fine(() -> "==========> test someEventsNoAckWithOnePartition()");
-        List<String> uncommit = new ArrayList<>();
-        // Push some messages that will ACK
-        List<String> testData = IntStream.range(0, 20).mapToObj(i -> Integer.toString(i)).collect(Collectors.toList());
-        AbstractSampleBean.Channel6 kafkaConsumingBean = cdiContainer.select(AbstractSampleBean.Channel6.class).get();
-        produceAndCheck(kafkaConsumingBean, testData, TEST_TOPIC_6, testData);
-        // Next message will not ACK
-        kafkaConsumingBean.restart();
-        testData = Arrays.asList(AbstractSampleBean.Channel6.NO_ACK);
-        uncommit.addAll(testData);
-        produceAndCheck(kafkaConsumingBean, testData, TEST_TOPIC_6, testData);
-        // As this topic only have one partition, next messages will not ACK because previous message wasn't
-        kafkaConsumingBean.restart();
-        testData = IntStream.range(100, 120).mapToObj(i -> Integer.toString(i)).collect(Collectors.toList());
-        uncommit.addAll(testData);
-        produceAndCheck(kafkaConsumingBean, testData, TEST_TOPIC_6, testData);
-        // We receive uncommitted messages again
-        List<String> events = readTopic(TEST_TOPIC_6, uncommit.size(), GROUP_1);
-        Collections.sort(events);
-        Collections.sort(uncommit);
-        assertEquals(uncommit, events);
-    }
-
-    @Test
-    @Disabled("It fails sometimes. Needs to be solved in otherway.")
-    void someEventsNoAckWithDifferentPartitions() {
-        LOGGER.fine(() -> "==========> test someEventsNoAckWithDifferentPartitions()");
-        final long FROM = 2000;
-        final long TO = FROM + AbstractSampleBean.Channel8.LIMIT;
-        // Send the message that will not ACK. This will make in one partition to not commit any new message
-        List<String> testData = Arrays.asList(AbstractSampleBean.Channel8.NO_ACK);
-        AbstractSampleBean.Channel8 kafkaConsumingBean = cdiContainer.select(AbstractSampleBean.Channel8.class).get();
-        produceAndCheck(kafkaConsumingBean, testData, TEST_TOPIC_8, testData);
-        kafkaConsumingBean.restart();
-        // Now sends new messages. Some of them will be lucky and will not go to the partition with no ACK
-        testData = LongStream.range(FROM, TO)
-                .mapToObj(i -> Long.toString(i)).collect(Collectors.toList());
-        produceAndCheck(kafkaConsumingBean, testData, TEST_TOPIC_8, testData);
-        int uncommited = kafkaConsumingBean.uncommitted();
-        // At least one message was not committed
-        assertTrue(uncommited > 0);
-        LOGGER.fine(() -> "Uncommitted messages : " + uncommited);
-        List<String> messages = readTopic(TEST_TOPIC_8, uncommited, GROUP_2);
-        assertEquals(uncommited, messages.size(), "Received messages are " + messages);
-    }
-
     @Test
     void wakeupCanBeInvokedWhenKafkaConsumerClosed() {
         LOGGER.fine(() -> "==========> test wakeupCanBeInvokedWhenKafkaConsumerClosed()");
@@ -457,65 +371,6 @@ void kafkaSubscriberSendError() throws InterruptedException {
         kafkaResource.getKafkaTestUtils().consumeAllRecordsFromTopic(TEST_TOPIC_13);
     }
 
-    private void produceAndCheck(AbstractSampleBean kafkaConsumingBean, List<String> testData, String topic,
-            List<String> expected) {
-        produceAndCheck(kafkaConsumingBean, testData, topic, expected, expected.size());
-    }
-
-    private void produceAndCheck(AbstractSampleBean kafkaConsumingBean, List<String> testData, String topic,
-            List<String> expected, long requested) {
-        kafkaConsumingBean.expectedRequests(requested);
-        Map<String, Object> config = new HashMap<>();
-        config.put("bootstrap.servers", KAFKA_SERVER);
-        config.put("key.serializer", LongSerializer.class.getName());
-        config.put("value.serializer", StringSerializer.class.getName());
-
-        try (Producer<Long, String> producer = new KafkaProducer<>(config)) {
-            LOGGER.fine(() -> "Producing " + testData.size() + " events");
-            //Send all test messages(async send means order is not guaranteed) and in parallel
-            List<Future<RecordMetadata>> sent = testData.parallelStream()
-                    .map(s -> producer.send(new ProducerRecord<>(topic, s))).collect(Collectors.toList());
-            sent.stream().forEach(future -> {
-                try {
-                    future.get(30, TimeUnit.SECONDS);
-                } catch (InterruptedException | ExecutionException | TimeoutException e) {
-                    fail("Some of next messages were not sent in time: " + testData, e);
-                }
-            });
-        }
-        if (requested > 0) {
-            // Wait till records are delivered
-            boolean done = kafkaConsumingBean.await();
-            assertTrue(done, String.format("Timeout waiting for results.\nExpected: %s \nBut was: %s",
-                    expected.toString(), kafkaConsumingBean.consumed().toString()));
-        }
-        Collections.sort(kafkaConsumingBean.consumed());
-        Collections.sort(expected);
-        if (!expected.isEmpty()) {
-            assertEquals(expected, kafkaConsumingBean.consumed());
-        }
-    }
-
-    private List<String> readTopic(String topic, int expected, String group){
-        final long timeout = 30000;
-        List<String> events = new LinkedList<>();
-        Map<String, Object> config = new HashMap<>();
-        config.put("enable.auto.commit", Boolean.toString(true));
-        config.put("auto.offset.reset", "earliest");
-        config.put("bootstrap.servers", KAFKA_SERVER);
-        config.put("group.id", group);
-        config.put("key.deserializer", LongDeserializer.class.getName());
-        config.put("value.deserializer", StringDeserializer.class.getName());
-        try (Consumer<Long, String> consumer = new KafkaConsumer<>(config)) {
-            consumer.subscribe(Arrays.asList(topic));
-            long current = System.currentTimeMillis();
-            while (events.size() < expected && System.currentTimeMillis() - current < timeout) {
-                consumer.poll(Duration.ofSeconds(5)).forEach(c -> events.add(c.value()));
-            }
-        }
-        return events;
-    }
-
     private static <T> Instance<T> getInstance(Class<T> beanType, Annotation annotation){
         return cdiContainer.select(beanType, annotation);
     }
diff --git a/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaSeTest.java b/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaSeTest.java
index 0a2cc6d0a76..3d1276ca0bf 100644
--- a/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaSeTest.java
+++ b/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaSeTest.java
@@ -17,8 +17,20 @@
 package io.helidon.messaging.connectors.kafka;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -27,22 +39,21 @@
 import java.util.logging.Logger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.LongStream;
 
 import io.helidon.common.reactive.Multi;
 import io.helidon.config.Config;
 import io.helidon.messaging.Channel;
 import io.helidon.messaging.Messaging;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
+import io.helidon.messaging.connectors.kafka.AbstractSampleBean.Channel6;
+import io.helidon.messaging.connectors.kafka.AbstractSampleBean.Channel8;
 
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.eclipse.microprofile.reactive.messaging.Message;
 import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
 import org.junit.jupiter.api.BeforeAll;
@@ -55,6 +66,8 @@ public class KafkaSeTest extends AbstractKafkaTest {
     private static final String TEST_SE_TOPIC_3 = "special-se-topic-3";
     private static final String TEST_SE_TOPIC_4 = "special-se-topic-4";
     private static final String TEST_SE_TOPIC_5 = "special-se-topic-4";
+    private static final String TEST_SE_TOPIC_6 = "special-se-topic-6";
+    private static final String TEST_SE_TOPIC_7 = "special-se-topic-7";
 
     @BeforeAll
@@ -64,6 +77,8 @@ static void prepareTopics() {
         kafkaResource.getKafkaTestUtils().createTopic(TEST_SE_TOPIC_3, 4, (short) 2);
         kafkaResource.getKafkaTestUtils().createTopic(TEST_SE_TOPIC_4, 4, (short) 2);
         kafkaResource.getKafkaTestUtils().createTopic(TEST_SE_TOPIC_5, 4, (short) 2);
+        kafkaResource.getKafkaTestUtils().createTopic(TEST_SE_TOPIC_6, 1, (short) 2);
+        kafkaResource.getKafkaTestUtils().createTopic(TEST_SE_TOPIC_7, 2, (short) 2);
         KAFKA_SERVER = kafkaResource.getKafkaConnectString();
     }
 
@@ -334,4 +349,107 @@ void kafkaHeaderTest() throws InterruptedException {
             messaging.stop();
         }
     }
+
+    @Test
+    void someEventsNoAckWithOnePartition() {
+        LOGGER.fine(() -> "==========> test someEventsNoAckWithOnePartition()");
+        final String GROUP = "group_1";
+        final String TOPIC = TEST_SE_TOPIC_6;
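+        // The channel below reads TOPIC from the earliest offset with auto-commit disabled,
+        // so records that are never acked stay uncommitted and can be read back later in this test.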
+        Channel<String> fromKafka = Channel.<String>builder()
+                .name("from-kafka")
+                .publisherConfig(KafkaConnector.configBuilder()
+                        .bootstrapServers(KAFKA_SERVER)
+                        .groupId(GROUP)
+                        .topic(TOPIC)
+                        .autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.EARLIEST)
+                        .enableAutoCommit(false)
+                        .keyDeserializer(LongDeserializer.class)
+                        .valueDeserializer(StringDeserializer.class)
+                        .build()
+                )
+                .build();
+        List<String> uncommit = new ArrayList<>();
+        Channel6 kafkaConsumingBean = new Channel6();
+        Map<String, Object> config = new HashMap<>();
+        config.put("bootstrap.servers", KAFKA_SERVER);
+        config.put("key.serializer", LongSerializer.class.getName());
+        config.put("value.serializer", StringSerializer.class.getName());
+        Messaging messaging = Messaging.builder().connector(KafkaConnector.create())
+                .subscriber(fromKafka, ReactiveStreams.<KafkaMessage<Long, String>>builder()
+                        .forEach(msg -> kafkaConsumingBean.onMsg(msg)))
+                .build();
+        try {
+            messaging.start();
+            // Push some messages that will ACK
+            List<String> testData = IntStream.range(0, 20).mapToObj(i -> Integer.toString(i)).collect(Collectors.toList());
+            produceAndCheck(kafkaConsumingBean, testData, TOPIC, testData);
+            kafkaConsumingBean.restart();
+            // Next message will not ACK
+            testData = Arrays.asList(Channel6.NO_ACK);
+            uncommit.addAll(testData);
+            produceAndCheck(kafkaConsumingBean, testData, TOPIC, testData);
+            kafkaConsumingBean.restart();
+            // As this topic only have one partition, next messages will not ACK because previous message wasn't
+            testData = IntStream.range(100, 120).mapToObj(i -> Integer.toString(i)).collect(Collectors.toList());
+            uncommit.addAll(testData);
+            produceAndCheck(kafkaConsumingBean, testData, TOPIC, testData);
+        } finally {
+            messaging.stop();
+        }
+        // We receive uncommitted messages again
+        List<String> events = readTopic(TOPIC, uncommit.size(), GROUP);
+        Collections.sort(events);
+        Collections.sort(uncommit);
+        assertEquals(uncommit, events);
+    }
+
+    @Test
+    void someEventsNoAckWithDifferentPartitions() {
+        LOGGER.fine(() -> "==========> test someEventsNoAckWithDifferentPartitions()");
+        final long FROM = 2000;
+        final long TO = FROM + Channel8.LIMIT;
+        final String GROUP = "group_2";
+        final String TOPIC = TEST_SE_TOPIC_7;
+        Channel<String> fromKafka = Channel.<String>builder()
+                .name("from-kafka")
+                .publisherConfig(KafkaConnector.configBuilder()
+                        .bootstrapServers(KAFKA_SERVER)
+                        .groupId(GROUP)
+                        .topic(TOPIC)
+                        .autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.EARLIEST)
+                        .enableAutoCommit(false)
+                        .keyDeserializer(LongDeserializer.class)
+                        .valueDeserializer(StringDeserializer.class)
+                        .build()
+                )
+                .build();
+        // Send the message that will not ACK. This will make in one partition to not commit any new message
+        Channel8 kafkaConsumingBean = new Channel8();
+        Map<String, Object> config = new HashMap<>();
+        config.put("bootstrap.servers", KAFKA_SERVER);
+        config.put("key.serializer", LongSerializer.class.getName());
+        config.put("value.serializer", StringSerializer.class.getName());
+        Messaging messaging = Messaging.builder().connector(KafkaConnector.create())
+                .subscriber(fromKafka, ReactiveStreams.<KafkaMessage<Long, String>>builder()
+                        .forEach(msg -> kafkaConsumingBean.onMsg(msg)))
+                .build();
+        try {
+            messaging.start();
+            List<String> testData = Arrays.asList(Channel8.NO_ACK);
+            produceAndCheck(kafkaConsumingBean, testData, TOPIC, testData);
+            kafkaConsumingBean.restart();
+            // Now sends new messages. Some of them will be lucky and will not go to the partition with no ACK
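+            // Only the partition still holding the un-acked NO_ACK record stops committing offsets,
+            // so at least one (but not necessarily all) of these records remains uncommitted.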
+            testData = LongStream.range(FROM, TO)
+                    .mapToObj(i -> Long.toString(i)).collect(Collectors.toList());
+            produceAndCheck(kafkaConsumingBean, testData, TOPIC, testData);
+        } finally {
+            messaging.stop();
+        }
+        int uncommited = kafkaConsumingBean.uncommitted();
+        // At least one message was not committed
+        assertTrue(uncommited > 0);
+        LOGGER.fine(() -> "Uncommitted messages : " + uncommited);
+        List<String> messages = readTopic(TOPIC, uncommited, GROUP);
+        assertEquals(uncommited, messages.size(), "Received messages are " + messages);
+    }
 }

From 13a481e3ee88cbe091868eae00113b989adad54f Mon Sep 17 00:00:00 2001
From: Jorge Bescos Gascon
Date: Fri, 12 Jun 2020 14:18:01 +0200
Subject: [PATCH 2/2] Removed unused fields

Signed-off-by: Jorge Bescos Gascon
---
 .../messaging/connectors/kafka/KafkaSeTest.java | 10 ----------
 1 file changed, 10 deletions(-)

diff --git a/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaSeTest.java b/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaSeTest.java
index 3d1276ca0bf..7b56d6a72b3 100644
--- a/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaSeTest.java
+++ b/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaSeTest.java
@@ -28,7 +28,6 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -53,7 +52,6 @@
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.eclipse.microprofile.reactive.messaging.Message;
 import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
 import org.junit.jupiter.api.BeforeAll;
@@ -370,10 +368,6 @@ void someEventsNoAckWithOnePartition() {
                 .build();
         List<String> uncommit = new ArrayList<>();
         Channel6 kafkaConsumingBean = new Channel6();
-        Map<String, Object> config = new HashMap<>();
-        config.put("bootstrap.servers", KAFKA_SERVER);
-        config.put("key.serializer", LongSerializer.class.getName());
-        config.put("value.serializer", StringSerializer.class.getName());
         Messaging messaging = Messaging.builder().connector(KafkaConnector.create())
                 .subscriber(fromKafka, ReactiveStreams.<KafkaMessage<Long, String>>builder()
                         .forEach(msg -> kafkaConsumingBean.onMsg(msg)))
                 .build();
@@ -425,10 +419,6 @@ void someEventsNoAckWithDifferentPartitions() {
                 .build();
         // Send the message that will not ACK. This will make in one partition to not commit any new message
         Channel8 kafkaConsumingBean = new Channel8();
-        Map<String, Object> config = new HashMap<>();
-        config.put("bootstrap.servers", KAFKA_SERVER);
-        config.put("key.serializer", LongSerializer.class.getName());
-        config.put("value.serializer", StringSerializer.class.getName());
         Messaging messaging = Messaging.builder().connector(KafkaConnector.create())
                 .subscriber(fromKafka, ReactiveStreams.<KafkaMessage<Long, String>>builder()
                         .forEach(msg -> kafkaConsumingBean.onMsg(msg)))