From 8e2a3f78dba7894bd96681f99a9aa93eefbd783d Mon Sep 17 00:00:00 2001
From: Timon Borter
Date: Mon, 16 Dec 2024 19:07:38 +0100
Subject: [PATCH] feat(#1281): introduce separate consumers per subscription

The `org.apache.kafka.clients.consumer.KafkaConsumer` is **not thread-safe**.
When using selective message consumption, it is recommended to enable
`useThreadSafeConsumer` (Java DSL) or `thread-safe-consumer` (XML) on the
Kafka endpoint. Otherwise, you will experience errors when executing tests
in parallel.
---
 .../messaging/AbstractMessageConsumer.java    |   2 +-
 .../AbstractSelectiveMessageConsumer.java     |   2 +-
 .../kafka/config/xml/KafkaEndpointParser.java |   2 +
 .../kafka/endpoint/KafkaConsumer.java         |  39 ++-
 .../kafka/endpoint/KafkaEndpoint.java         | 103 +++++---
 .../endpoint/KafkaEndpointConfiguration.java  |  10 +
 .../KafkaMessageFilteringConsumer.java        |   6 +-
 .../endpoint/KafkaMessageSingleConsumer.java  |   7 +-
 .../schema/citrus-kafka-config.xsd            |   2 +-
 .../config/xml/KafkaEndpointParserTest.java   |   4 +
 .../kafka/endpoint/KafkaConsumerTest.java     | 147 +++++++----
 .../kafka/endpoint/KafkaEndpointTest.java     | 240 ++++++++++++------
 .../integration/KafkaEndpointFactoryIT.java   |   1 -
 .../integration/KafkaEndpointJavaIT.java      |  39 +--
 .../xml/KafkaEndpointParserTest-context.xml   |   3 +-
 .../KafkaEndpointIT_selectiveMessage.xml      |   4 -
 .../KafkaEndpointIT_singleMessage.xml         |   4 -
 src/manual/endpoint-kafka.adoc                |  18 +-
 18 files changed, 417 insertions(+), 216 deletions(-)

diff --git a/core/citrus-api/src/main/java/org/citrusframework/messaging/AbstractMessageConsumer.java b/core/citrus-api/src/main/java/org/citrusframework/messaging/AbstractMessageConsumer.java index 7cc460c2cb..96193047ca 100644 --- a/core/citrus-api/src/main/java/org/citrusframework/messaging/AbstractMessageConsumer.java +++ b/core/citrus-api/src/main/java/org/citrusframework/messaging/AbstractMessageConsumer.java @@ -34,7 +34,7 @@ public abstract class AbstractMessageConsumer implements Consumer { /** * Default constructor using receive timeout setting.
*/ - public AbstractMessageConsumer(String name, EndpointConfiguration endpointConfiguration) { + protected AbstractMessageConsumer(String name, EndpointConfiguration endpointConfiguration) { this.name = name; this.endpointConfiguration = endpointConfiguration; } diff --git a/core/citrus-api/src/main/java/org/citrusframework/messaging/AbstractSelectiveMessageConsumer.java b/core/citrus-api/src/main/java/org/citrusframework/messaging/AbstractSelectiveMessageConsumer.java index 2c556672c6..f848ff0f5d 100644 --- a/core/citrus-api/src/main/java/org/citrusframework/messaging/AbstractSelectiveMessageConsumer.java +++ b/core/citrus-api/src/main/java/org/citrusframework/messaging/AbstractSelectiveMessageConsumer.java @@ -34,7 +34,7 @@ public abstract class AbstractSelectiveMessageConsumer extends AbstractMessageCo * @param name * @param endpointConfiguration */ - public AbstractSelectiveMessageConsumer(String name, EndpointConfiguration endpointConfiguration) { + protected AbstractSelectiveMessageConsumer(String name, EndpointConfiguration endpointConfiguration) { super(name, endpointConfiguration); this.endpointConfiguration = endpointConfiguration; } diff --git a/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/config/xml/KafkaEndpointParser.java b/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/config/xml/KafkaEndpointParser.java index d3128a88eb..ddcc1f1b54 100644 --- a/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/config/xml/KafkaEndpointParser.java +++ b/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/config/xml/KafkaEndpointParser.java @@ -66,6 +66,8 @@ protected void parseEndpointConfiguration(BeanDefinitionBuilder endpointConfigur setPropertyValue(endpointConfiguration, element.getAttribute("key-deserializer"), "keyDeserializer"); setPropertyValue(endpointConfiguration, element.getAttribute("value-serializer"), "valueSerializer"); setPropertyValue(endpointConfiguration, element.getAttribute("value-deserializer"), "valueDeserializer"); + + setPropertyValue(endpointConfiguration, element.getAttribute("thread-safe-consumer"), "useThreadSafeConsumer"); } @Override diff --git a/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaConsumer.java b/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaConsumer.java index b84ddd456e..677165c752 100644 --- a/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaConsumer.java +++ b/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaConsumer.java @@ -16,17 +16,6 @@ package org.citrusframework.kafka.endpoint; -import org.citrusframework.context.TestContext; -import org.citrusframework.message.Message; -import org.citrusframework.messaging.AbstractSelectiveMessageConsumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.Duration; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - import static java.util.UUID.randomUUID; import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; @@ -39,6 +28,16 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.citrusframework.kafka.message.KafkaMessageHeaders.KAFKA_PREFIX; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import 
org.citrusframework.context.TestContext; +import org.citrusframework.message.Message; +import org.citrusframework.messaging.AbstractSelectiveMessageConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class KafkaConsumer extends AbstractSelectiveMessageConsumer { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); @@ -65,20 +64,20 @@ public void setConsumer(org.apache.kafka.clients.consumer.KafkaConsumer threadLocalKafkaConsumer; + public static SimpleKafkaEndpointBuilder builder() { return new SimpleKafkaEndpointBuilder(); } @@ -54,7 +56,7 @@ public static SimpleKafkaEndpointBuilder builder() { * Default constructor initializing endpoint configuration. */ public KafkaEndpoint() { - super(new KafkaEndpointConfiguration()); + this(new KafkaEndpointConfiguration()); } /** @@ -62,15 +64,16 @@ public KafkaEndpoint() { */ public KafkaEndpoint(KafkaEndpointConfiguration endpointConfiguration) { super(endpointConfiguration); + + threadLocalKafkaConsumer = ThreadLocal.withInitial(() -> new KafkaConsumer(getConsumerName(), getEndpointConfiguration())); } static KafkaEndpoint newKafkaEndpoint( - @Nullable org.apache.kafka.clients.consumer.KafkaConsumer kafkaConsumer, - @Nullable org.apache.kafka.clients.producer.KafkaProducer kafkaProducer, - @Nullable Boolean randomConsumerGroup, - @Nullable String server, - @Nullable Long timeout, - @Nullable String topic + @Nullable Boolean randomConsumerGroup, + @Nullable String server, + @Nullable Long timeout, + @Nullable String topic, + boolean useThreadSafeConsumer ) { var kafkaEndpoint = new KafkaEndpoint(); @@ -88,6 +91,29 @@ static KafkaEndpoint newKafkaEndpoint( kafkaEndpoint.getEndpointConfiguration().setTopic(topic); } + kafkaEndpoint.getEndpointConfiguration().setUseThreadSafeConsumer(useThreadSafeConsumer); + + return kafkaEndpoint; + } + + /** + * @deprecated {@link org.apache.kafka.clients.consumer.KafkaConsumer} is not thread-safe + * and manual consumer management is error-prone. Use + * {@link #newKafkaEndpoint(Boolean, String, Long, String, boolean)} instead to obtain properly + * managed consumer instances. This method will be removed in a future release. 
+ */ + @Deprecated(forRemoval = true) + static KafkaEndpoint newKafkaEndpoint( + @Nullable org.apache.kafka.clients.consumer.KafkaConsumer kafkaConsumer, + @Nullable org.apache.kafka.clients.producer.KafkaProducer kafkaProducer, + @Nullable Boolean randomConsumerGroup, + @Nullable String server, + @Nullable Long timeout, + @Nullable String topic, + boolean useThreadSafeConsumer + ) { + var kafkaEndpoint = newKafkaEndpoint(randomConsumerGroup, server, timeout, topic, useThreadSafeConsumer); + // Make sure these come at the end, so endpoint configuration is already initialized if (nonNull(kafkaConsumer)) { kafkaEndpoint.createConsumer().setConsumer(kafkaConsumer); @@ -99,19 +125,11 @@ static KafkaEndpoint newKafkaEndpoint( return kafkaEndpoint; } - @Nullable - KafkaProducer getKafkaProducer() { - return kafkaProducer; - } - - @Nullable - KafkaConsumer getKafkaConsumer() { - return kafkaConsumer; - } - @Override public KafkaConsumer createConsumer() { - if (kafkaConsumer == null) { + if (getEndpointConfiguration().useThreadSafeConsumer()) { + return threadLocalKafkaConsumer.get(); + } else if (isNull(kafkaConsumer)) { kafkaConsumer = new KafkaConsumer(getConsumerName(), getEndpointConfiguration()); } @@ -134,20 +152,24 @@ public KafkaEndpointConfiguration getEndpointConfiguration() { @Override public void destroy() { - if (kafkaConsumer != null) { + if (getEndpointConfiguration().useThreadSafeConsumer()) { + threadLocalKafkaConsumer.get() + .stop(); + threadLocalKafkaConsumer.remove(); + } else if (nonNull(kafkaConsumer)) { kafkaConsumer.stop(); } } public ReceiveMessageAction.ReceiveMessageActionBuilderSupport findKafkaEventHeaderEquals(Duration lookbackWindow, String key, String value) { return receive(this) - .selector( - KafkaMessageFilter.kafkaMessageFilter() - .eventLookbackWindow(lookbackWindow) - .kafkaMessageSelector(kafkaHeaderEquals(key, value)) - .build() - ) - .getMessageBuilderSupport(); + .selector( + KafkaMessageFilter.kafkaMessageFilter() + .eventLookbackWindow(lookbackWindow) + .kafkaMessageSelector(kafkaHeaderEquals(key, value)) + .build() + ) + .getMessageBuilderSupport(); } public static class SimpleKafkaEndpointBuilder { @@ -158,7 +180,13 @@ public static class SimpleKafkaEndpointBuilder { private String server; private Long timeout; private String topic; + private boolean useThreadSafeConsumer = false; + /** + * @deprecated {@link org.apache.kafka.clients.consumer.KafkaConsumer} is not thread-safe and manual consumer management is error-prone. + * This method will be removed in a future release. 
+ */ + @Deprecated(forRemoval = true) public SimpleKafkaEndpointBuilder kafkaConsumer(org.apache.kafka.clients.consumer.KafkaConsumer kafkaConsumer) { this.kafkaConsumer = kafkaConsumer; return this; @@ -189,8 +217,13 @@ public SimpleKafkaEndpointBuilder topic(String topic) { return this; } + public SimpleKafkaEndpointBuilder useThreadSafeConsumer() { + this.useThreadSafeConsumer = true; + return this; + } + public KafkaEndpoint build() { - return KafkaEndpoint.newKafkaEndpoint(kafkaConsumer, kafkaProducer, randomConsumerGroup, server, timeout, topic); + return newKafkaEndpoint(kafkaConsumer, kafkaProducer, randomConsumerGroup, server, timeout, topic, useThreadSafeConsumer); } } } diff --git a/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaEndpointConfiguration.java b/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaEndpointConfiguration.java index 7645198741..ded118c0f1 100644 --- a/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaEndpointConfiguration.java +++ b/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaEndpointConfiguration.java @@ -94,6 +94,8 @@ public class KafkaEndpointConfiguration extends AbstractPollableEndpointConfigur */ private int partition = 0; + private boolean useThreadSafeConsumer; + public String getClientId() { return clientId; } @@ -221,4 +223,12 @@ public int getPartition() { public void setPartition(int partition) { this.partition = partition; } + + public boolean useThreadSafeConsumer() { + return useThreadSafeConsumer; + } + + public void setUseThreadSafeConsumer(boolean useThreadSafeConsumer) { + this.useThreadSafeConsumer = useThreadSafeConsumer; + } } diff --git a/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaMessageFilteringConsumer.java b/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaMessageFilteringConsumer.java index 0200dfa1fe..22a041ba56 100644 --- a/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaMessageFilteringConsumer.java +++ b/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaMessageFilteringConsumer.java @@ -132,7 +132,11 @@ public Message receive(String selector, TestContext testContext, long timeout) { getEndpointConfiguration(), testContext); - logger.info("Received Kafka message on topic: '{}", topic); + if (logger.isDebugEnabled()) { + logger.info("Received Kafka message on topic '{}': {}", topic, received); + } else { + logger.info("Received Kafka message on topic '{}'", topic); + } return received; } diff --git a/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaMessageSingleConsumer.java b/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaMessageSingleConsumer.java index 0f430e879e..2cca04953a 100644 --- a/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaMessageSingleConsumer.java +++ b/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaMessageSingleConsumer.java @@ -72,7 +72,12 @@ public Message receive(TestContext testContext, long timeout) { consumer.commitSync(Duration.ofMillis(getEndpointConfiguration().getTimeout())); - logger.info("Received Kafka message on topic: '{}", topic); + if (logger.isDebugEnabled()) { + logger.info("Received Kafka message on topic '{}': {}", topic, received); + } else { + logger.info("Received Kafka message on topic '{}'", topic); + } + return received; } diff --git 
a/endpoints/citrus-kafka/src/main/resources/org/citrusframework/schema/citrus-kafka-config.xsd b/endpoints/citrus-kafka/src/main/resources/org/citrusframework/schema/citrus-kafka-config.xsd index 4be57b9629..41c13e9b0d 100644 --- a/endpoints/citrus-kafka/src/main/resources/org/citrusframework/schema/citrus-kafka-config.xsd +++ b/endpoints/citrus-kafka/src/main/resources/org/citrusframework/schema/citrus-kafka-config.xsd @@ -62,7 +62,7 @@ + - diff --git a/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/config/xml/KafkaEndpointParserTest.java b/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/config/xml/KafkaEndpointParserTest.java index 29cea46628..bf1368ef57 100644 --- a/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/config/xml/KafkaEndpointParserTest.java +++ b/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/config/xml/KafkaEndpointParserTest.java @@ -70,6 +70,7 @@ public void testKafkaEndpointParser() { assertEquals(kafkaEndpoint.getEndpointConfiguration().getValueSerializer(), StringSerializer.class); assertEquals(kafkaEndpoint.getEndpointConfiguration().getKeyDeserializer(), StringDeserializer.class); assertEquals(kafkaEndpoint.getEndpointConfiguration().getValueDeserializer(), StringDeserializer.class); + assertEquals(kafkaEndpoint.getEndpointConfiguration().useThreadSafeConsumer(), false); // 2nd message receiver kafkaEndpoint = endpoints.get("kafkaEndpoint2"); @@ -90,6 +91,7 @@ public void testKafkaEndpointParser() { assertEquals(kafkaEndpoint.getEndpointConfiguration().getValueSerializer(), ByteArraySerializer.class); assertEquals(kafkaEndpoint.getEndpointConfiguration().getKeyDeserializer(), IntegerDeserializer.class); assertEquals(kafkaEndpoint.getEndpointConfiguration().getValueDeserializer(), ByteArrayDeserializer.class); + assertEquals(kafkaEndpoint.getEndpointConfiguration().useThreadSafeConsumer(), false); // 3rd message receiver kafkaEndpoint = endpoints.get("kafkaEndpoint3"); @@ -102,6 +104,7 @@ public void testKafkaEndpointParser() { assertEquals(kafkaEndpoint.getEndpointConfiguration().getConsumerProperties().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), true); assertEquals(kafkaEndpoint.getEndpointConfiguration().getProducerProperties().size(), 1); assertEquals(kafkaEndpoint.getEndpointConfiguration().getProducerProperties().get(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), 1024); + assertEquals(kafkaEndpoint.getEndpointConfiguration().useThreadSafeConsumer(), false); // 4th message receiver kafkaEndpoint = endpoints.get("kafkaEndpoint4"); @@ -109,5 +112,6 @@ public void testKafkaEndpointParser() { .startsWith(KAFKA_PREFIX) .hasSize(23) .containsPattern(".*[a-z]{10}$"); + assertEquals(kafkaEndpoint.getEndpointConfiguration().useThreadSafeConsumer(), true); } } diff --git a/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/endpoint/KafkaConsumerTest.java b/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/endpoint/KafkaConsumerTest.java index c1efe6414a..5dc2edd6cf 100644 --- a/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/endpoint/KafkaConsumerTest.java +++ b/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/endpoint/KafkaConsumerTest.java @@ -16,50 +16,55 @@ package org.citrusframework.kafka.endpoint; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; -import 
org.apache.kafka.common.header.internals.RecordHeader; -import org.citrusframework.exceptions.ActionTimeoutException; -import org.citrusframework.message.DefaultMessage; -import org.citrusframework.message.Message; -import org.citrusframework.testng.AbstractTestNGUnitTest; -import org.testng.annotations.Test; - -import java.time.Duration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import static java.util.Collections.emptySet; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.citrusframework.exceptions.ActionTimeoutException; +import org.citrusframework.message.DefaultMessage; +import org.citrusframework.message.Message; +import org.citrusframework.testng.AbstractTestNGUnitTest; +import org.testng.annotations.Test; + public class KafkaConsumerTest extends AbstractTestNGUnitTest { private final org.apache.kafka.clients.consumer.KafkaConsumer kafkaConsumerMock = mock(KafkaConsumer.class); @Test - public void testReceiveMessage() { + public void receiveMessage() { String topic = "default"; KafkaEndpoint endpoint = KafkaEndpoint.builder() - .kafkaConsumer(kafkaConsumerMock) - .topic(topic) - .build(); + .kafkaConsumer(kafkaConsumerMock) + .topic(topic) + .build(); - TopicPartition partition = new TopicPartition(topic, 0); + var partition = new TopicPartition(topic, 0); reset(kafkaConsumerMock); @@ -83,15 +88,15 @@ public void testReceiveMessage() { } @Test - public void testReceiveMessage_inRandomConsumerGroup() { + public void receiveMessage_inRandomConsumerGroup() { String topic = "default"; KafkaEndpoint endpoint = KafkaEndpoint.builder() - .kafkaConsumer(kafkaConsumerMock) - .topic(topic) - .build(); + .kafkaConsumer(kafkaConsumerMock) + .topic(topic) + .build(); - TopicPartition partition = new TopicPartition(topic, 0); + var partition = new TopicPartition(topic, 0); reset(kafkaConsumerMock); @@ -115,14 +120,14 @@ public void testReceiveMessage_inRandomConsumerGroup() { } @Test - public void testReceiveMessageTimeout() { + public void receiveMessage_runIntoTimeout() { String topic = "test"; KafkaEndpoint endpoint = KafkaEndpoint.builder() - .kafkaConsumer(kafkaConsumerMock) - .server("localhost:9092") - .topic(topic) - .build(); + .kafkaConsumer(kafkaConsumerMock) + .server("localhost:9092") + .topic(topic) + .build(); reset(kafkaConsumerMock); 
when(kafkaConsumerMock.subscription()).thenReturn(singleton(topic)); @@ -140,16 +145,16 @@ public void testReceiveMessageTimeout() { } @Test - public void testWithCustomTimeout() { + public void receiveMessage_customTimeout_runIntoTimeout() { String topic = "timeout"; KafkaEndpoint endpoint = KafkaEndpoint.builder() - .kafkaConsumer(kafkaConsumerMock) - .timeout(10_000L) - .topic(topic) - .build(); + .kafkaConsumer(kafkaConsumerMock) + .timeout(10_000L) + .topic(topic) + .build(); - TopicPartition partition = new TopicPartition(topic, 0); + var partition = new TopicPartition(topic, 0); reset(kafkaConsumerMock); when(kafkaConsumerMock.subscription()).thenReturn(singleton(topic)); @@ -165,16 +170,16 @@ public void testWithCustomTimeout() { } @Test - public void testWithMessageHeaders() { + public void receiveMessage_withMessageHeaders() { String topic = "headers"; KafkaEndpoint endpoint = KafkaEndpoint.builder() - .kafkaConsumer(kafkaConsumerMock) - .server("localhost:9092") - .topic(topic) - .build(); + .kafkaConsumer(kafkaConsumerMock) + .server("localhost:9092") + .topic(topic) + .build(); - TopicPartition partition = new TopicPartition(topic, 0); + var partition = new TopicPartition(topic, 0); reset(kafkaConsumerMock); when(kafkaConsumerMock.subscription()).thenReturn(singleton(topic)); @@ -193,4 +198,60 @@ public void testWithMessageHeaders() { assertNotNull(receivedMessage.getHeader("Operation")); assertEquals(receivedMessage.getHeader("Operation"), "sayHello"); } + + @Test + public void getConsumer_returnsSetConsumer() { + var kafkaConsumerMock = mock(KafkaConsumer.class); + KafkaEndpoint endpoint = KafkaEndpoint.builder() + .kafkaConsumer(kafkaConsumerMock) + .build(); + + var result = endpoint.createConsumer().getConsumer(); + assertThat(result) + .isEqualTo(kafkaConsumerMock); + } + + @Test + public void getConsumer_createsConsumerIfNonSet() { + KafkaEndpoint endpoint = KafkaEndpoint.builder() + .kafkaConsumer(null) // null for explicity + .build(); + + var result = endpoint.createConsumer().getConsumer(); + assertThat(result) + .isNotNull(); + } + + @Test + @SuppressWarnings({"unchecked"}) + public void stop_unsubscribesAndClosesConsumer() { + var kafkaConsumerMock = mock(KafkaConsumer.class); + doReturn(Set.of("subscription")).when(kafkaConsumerMock).subscription(); + + KafkaEndpoint endpoint = KafkaEndpoint.builder() + .kafkaConsumer(kafkaConsumerMock) + .build(); + + endpoint.createConsumer().stop(); + verify(kafkaConsumerMock).unsubscribe(); + verify(kafkaConsumerMock).close(Duration.ofSeconds(10)); + } + + @Test + @SuppressWarnings({"unchecked"}) + public void stop_closesConsumerEvenAfterUnsubscriptionError() { + var kafkaConsumerMock = mock(KafkaConsumer.class); + var unsubscribeException = new RuntimeException(); + doReturn(Set.of("subscription")).when(kafkaConsumerMock).subscription(); + doThrow(unsubscribeException).when(kafkaConsumerMock).unsubscribe(); + + KafkaEndpoint endpoint = KafkaEndpoint.builder() + .kafkaConsumer(kafkaConsumerMock) + .build(); + + assertThatThrownBy(() -> endpoint.createConsumer().stop()) + .isEqualTo(unsubscribeException); + + verify(kafkaConsumerMock).close(Duration.ofSeconds(10)); + } } diff --git a/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/endpoint/KafkaEndpointTest.java b/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/endpoint/KafkaEndpointTest.java index dfbbd304a0..40955f4843 100644 --- a/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/endpoint/KafkaEndpointTest.java +++ 
b/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/endpoint/KafkaEndpointTest.java @@ -16,25 +16,26 @@ package org.citrusframework.kafka.endpoint; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - import static java.lang.Boolean.FALSE; import static java.lang.Boolean.TRUE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.InstanceOfAssertFactories.OPTIONAL; import static org.assertj.core.api.InstanceOfAssertFactories.STRING; import static org.citrusframework.kafka.message.KafkaMessageHeaders.KAFKA_PREFIX; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.springframework.test.util.ReflectionTestUtils.setField; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + public class KafkaEndpointTest { @Test public void classHasBuilder() { assertThat(KafkaEndpoint.builder().build()) - .isInstanceOf(KafkaEndpoint.class); + .isInstanceOf(KafkaEndpoint.class); } @Test @@ -42,8 +43,8 @@ public void defaultConstructor_initializesKafkaEndpointConfiguration() { var fixture = new KafkaEndpoint(); assertThat(fixture) - .extracting(KafkaEndpoint::getEndpointConfiguration) - .isNotNull(); + .extracting(KafkaEndpoint::getEndpointConfiguration) + .isNotNull(); } @Test @@ -51,16 +52,16 @@ public void newKafkaEndpoint_acceptsKafkaConsumer() { var kafkaConsumerMock = mock(org.apache.kafka.clients.consumer.KafkaConsumer.class); var fixture = KafkaEndpoint.newKafkaEndpoint( - kafkaConsumerMock, - null, null, null, null, null + kafkaConsumerMock, + null, null, null, null, null, false ); assertThat(fixture) - .isNotNull() - .extracting(KafkaEndpoint::getKafkaConsumer) - .isNotNull() - .extracting(org.citrusframework.kafka.endpoint.KafkaConsumer::getConsumer) - .isEqualTo(kafkaConsumerMock); + .isNotNull() + .extracting(KafkaEndpoint::createConsumer) + .isNotNull() + .extracting(org.citrusframework.kafka.endpoint.KafkaConsumer::getConsumer) + .isEqualTo(kafkaConsumerMock); } @Test @@ -68,17 +69,17 @@ public void newKafkaEndpoint_acceptsKafkaProducer() { var kafkaProducerMock = mock(org.apache.kafka.clients.producer.KafkaProducer.class); var fixture = KafkaEndpoint.newKafkaEndpoint( - null, - kafkaProducerMock, - null, null, null, null + null, + kafkaProducerMock, + null, null, null, null, false ); assertThat(fixture) - .isNotNull() - .extracting(KafkaEndpoint::getKafkaProducer) - .isNotNull() - .extracting(org.citrusframework.kafka.endpoint.KafkaProducer::getProducer) - .isEqualTo(kafkaProducerMock); + .isNotNull() + .extracting(KafkaEndpoint::createProducer) + .isNotNull() + .extracting(org.citrusframework.kafka.endpoint.KafkaProducer::getProducer) + .isEqualTo(kafkaProducerMock); } @DataProvider @@ -89,47 +90,45 @@ public static Object[][] defaultConsumerGroups() { @Test(dataProvider = "defaultConsumerGroups") public void newKafkaEndpoint_usesDefaultConsumerGroup(Boolean useRandomConsumerGroup) { var fixture = KafkaEndpoint.newKafkaEndpoint( - null, null, - useRandomConsumerGroup, - null, null, null + useRandomConsumerGroup, + null, null, null, false ); assertThat(fixture) - .isNotNull() - .extracting(KafkaEndpoint::getEndpointConfiguration) - .isNotNull() - .extracting(KafkaEndpointConfiguration::getConsumerGroup) - .asInstanceOf(STRING) - .isNotEmpty() - .isEqualTo("citrus_kafka_group"); + .isNotNull() + .extracting(KafkaEndpoint::getEndpointConfiguration) + .isNotNull() + 
.extracting(KafkaEndpointConfiguration::getConsumerGroup) + .asInstanceOf(STRING) + .isNotEmpty() + .isEqualTo("citrus_kafka_group"); } @Test public void newKafkaEndpoint_isAbleToCreateRandomConsumerGroup() { var fixture = KafkaEndpoint.newKafkaEndpoint( - null, null, - TRUE, - null, null, null + TRUE, + null, null, null, false ); assertThat(fixture) - .isNotNull() - .extracting(KafkaEndpoint::getEndpointConfiguration) - .isNotNull() - .extracting(KafkaEndpointConfiguration::getConsumerGroup) - .asInstanceOf(STRING) - .isNotEmpty() - .startsWith(KAFKA_PREFIX) - .hasSize(23) - .containsPattern(".*[a-z]{10}$") - .satisfies( - // Additionally make sure that gets passed downstream - groupId -> assertThat(fixture.createConsumer().getConsumer()) - .extracting("delegate") - .extracting("groupId") - .asInstanceOf(OPTIONAL) - .hasValue(groupId) - ); + .isNotNull() + .extracting(KafkaEndpoint::getEndpointConfiguration) + .isNotNull() + .extracting(KafkaEndpointConfiguration::getConsumerGroup) + .asInstanceOf(STRING) + .isNotEmpty() + .startsWith(KAFKA_PREFIX) + .hasSize(23) + .containsPattern(".*[a-z]{10}$") + // Make sure the random group id is propagated to new consumers + .satisfies( + groupId -> assertThat(fixture.createConsumer().getConsumer()) + .extracting("delegate") + .extracting("groupId") + .asInstanceOf(OPTIONAL) + .hasValue(groupId) + ); } @Test @@ -137,17 +136,17 @@ public void newKafkaEndpoint_acceptsServer() { var server = "localhost"; var fixture = KafkaEndpoint.newKafkaEndpoint( - null, null, null, - server, - null, null + null, + server, + null, null, false ); assertThat(fixture) - .isNotNull() - .extracting(KafkaEndpoint::getEndpointConfiguration) - .isNotNull() - .extracting(KafkaEndpointConfiguration::getServer) - .isEqualTo(server); + .isNotNull() + .extracting(KafkaEndpoint::getEndpointConfiguration) + .isNotNull() + .extracting(KafkaEndpointConfiguration::getServer) + .isEqualTo(server); } @Test @@ -155,17 +154,17 @@ public void newKafkaEndpoint_acceptsTimeout() { var timeout = 1234L; var fixture = KafkaEndpoint.newKafkaEndpoint( - null, null, null, null, - timeout, - null + null, null, + timeout, + null, false ); assertThat(fixture) - .isNotNull() - .extracting(KafkaEndpoint::getEndpointConfiguration) - .isNotNull() - .extracting(KafkaEndpointConfiguration::getTimeout) - .isEqualTo(timeout); + .isNotNull() + .extracting(KafkaEndpoint::getEndpointConfiguration) + .isNotNull() + .extracting(KafkaEndpointConfiguration::getTimeout) + .isEqualTo(timeout); } @Test @@ -173,16 +172,35 @@ public void newKafkaEndpoint_acceptsTopic() { var topic = "citrus"; var fixture = KafkaEndpoint.newKafkaEndpoint( - null, null, null, null, null, - topic + null, null, null, + topic, + false + ); + + assertThat(fixture) + .isNotNull() + .extracting(KafkaEndpoint::getEndpointConfiguration) + .isNotNull() + .extracting(KafkaEndpointConfiguration::getTopic) + .isEqualTo(topic); + } + + @Test + public void newKafkaEndpoint_acceptsThreadSafetyConfiguration() { + var topic = "citrus"; + + var fixture = KafkaEndpoint.newKafkaEndpoint( + null, null, null, + topic, + true ); assertThat(fixture) - .isNotNull() - .extracting(KafkaEndpoint::getEndpointConfiguration) - .isNotNull() - .extracting(KafkaEndpointConfiguration::getTopic) - .isEqualTo(topic); + .isNotNull() + .extracting(KafkaEndpoint::getEndpointConfiguration) + .isNotNull() + .extracting(KafkaEndpointConfiguration::useThreadSafeConsumer) + .isEqualTo(true); } @Test @@ -192,21 +210,58 @@ public void createConsumer_isMultipleInvocationAware() { var 
firstConsumer = fixture.createConsumer(); var secondConsumer = fixture.createConsumer(); - assertThat(firstConsumer).isNotNull(); - assertThat(secondConsumer).isNotNull(); - assertThat(firstConsumer).isSameAs(secondConsumer); + assertThat(firstConsumer) + .isNotNull(); + assertThat(secondConsumer) + .isNotNull() + .isSameAs(firstConsumer); + } + + @Test + public void createConsumer_returnsNewConsumers_inThreadSafeMode() { + ThreadLocal threadLocalKafkaConsumerMock = mock(); + + var kafkaConsumerMock = mock(KafkaConsumer.class); + doReturn(kafkaConsumerMock).when(threadLocalKafkaConsumerMock).get(); + + var fixture = new KafkaEndpoint(); + fixture.getEndpointConfiguration().setUseThreadSafeConsumer(true); + setField(fixture, "threadLocalKafkaConsumer", threadLocalKafkaConsumerMock, ThreadLocal.class); + + var consumer = fixture.createConsumer(); + + assertThat(consumer) + .isEqualTo(kafkaConsumerMock); + verify(threadLocalKafkaConsumerMock).get(); + } + + @Test + public void createConsumer_createsNonExistingConsumer_inThreadSafeMode() { + var fixture = new KafkaEndpoint(); + fixture.getEndpointConfiguration().setUseThreadSafeConsumer(true); + + var firstConsumer = fixture.createConsumer(); + var secondConsumer = fixture.createConsumer(); + + assertThat(firstConsumer) + .isNotNull(); + assertThat(secondConsumer) + .isNotNull() + .isSameAs(firstConsumer); } @Test public void createProducer_isMultipleInvocationAware() { var fixture = new KafkaEndpoint(); - var firstConsumer = fixture.createProducer(); - var secondConsumer = fixture.createProducer(); + var firstProducer = fixture.createProducer(); + var secondProducer = fixture.createProducer(); - assertThat(firstConsumer).isNotNull(); - assertThat(secondConsumer).isNotNull(); - assertThat(firstConsumer).isSameAs(secondConsumer); + assertThat(firstProducer) + .isNotNull(); + assertThat(secondProducer) + .isNotNull() + .isSameAs(firstProducer); } @Test @@ -216,8 +271,8 @@ public void getEndpointConfiguration_returnsInitialEndpointConfiguration() { var fixture = new KafkaEndpoint(kafkaEndpointConfigurationMock); assertThat(fixture) - .extracting(KafkaEndpoint::getEndpointConfiguration) - .isEqualTo(kafkaEndpointConfigurationMock); + .extracting(KafkaEndpoint::getEndpointConfiguration) + .isEqualTo(kafkaEndpointConfigurationMock); } @Test @@ -231,4 +286,21 @@ public void destroy_stopsKafkaConsumer() { verify(kafkaConsumerMock).stop(); } + + @Test + public void destroy_stopsKafkaConsumer_inThreadSafeMode() { + ThreadLocal threadLocalKafkaConsumerMock = mock(); + + var kafkaConsumerMock = mock(KafkaConsumer.class); + doReturn(kafkaConsumerMock).when(threadLocalKafkaConsumerMock).get(); + + var fixture = new KafkaEndpoint(); + fixture.getEndpointConfiguration().setUseThreadSafeConsumer(true); + setField(fixture, "threadLocalKafkaConsumer", threadLocalKafkaConsumerMock, ThreadLocal.class); + + fixture.destroy(); + + verify(kafkaConsumerMock).stop(); + verify(threadLocalKafkaConsumerMock).remove(); + } } diff --git a/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/integration/KafkaEndpointFactoryIT.java b/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/integration/KafkaEndpointFactoryIT.java index 9cbf45ddc3..0a55dada77 100644 --- a/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/integration/KafkaEndpointFactoryIT.java +++ b/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/integration/KafkaEndpointFactoryIT.java @@ -27,4 +27,3 @@ public class KafkaEndpointFactoryIT extends 
TestNGCitrusSpringSupport { @CitrusTestSource(type = TestLoader.SPRING, name = "KafkaEndpointFactoryIT") public void testKafkaEndpointFactory() {} } - diff --git a/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/integration/KafkaEndpointJavaIT.java b/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/integration/KafkaEndpointJavaIT.java index 3b87f04284..27677abf70 100644 --- a/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/integration/KafkaEndpointJavaIT.java +++ b/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/integration/KafkaEndpointJavaIT.java @@ -16,6 +16,18 @@ package org.citrusframework.kafka.integration; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.citrusframework.actions.ReceiveMessageAction.Builder.receive; +import static org.citrusframework.actions.SendMessageAction.Builder.send; +import static org.citrusframework.actions.SleepAction.Builder.sleep; +import static org.citrusframework.container.Parallel.Builder.parallel; +import static org.citrusframework.kafka.endpoint.KafkaMessageFilter.kafkaMessageFilter; +import static org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector.ValueMatchingStrategy.ENDS_WITH; +import static org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector.ValueMatchingStrategy.STARTS_WITH; +import static org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector.kafkaHeaderContains; +import static org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector.kafkaHeaderEquals; + +import java.time.Duration; import org.assertj.core.api.ThrowableAssert; import org.citrusframework.annotations.CitrusTest; import org.citrusframework.exceptions.CitrusRuntimeException; @@ -28,28 +40,17 @@ import org.citrusframework.testng.spring.TestNGCitrusSpringSupport; import org.testng.annotations.Test; -import java.time.Duration; - -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.citrusframework.actions.ReceiveMessageAction.Builder.receive; -import static org.citrusframework.actions.SendMessageAction.Builder.send; -import static org.citrusframework.actions.SleepAction.Builder.sleep; -import static org.citrusframework.container.Parallel.Builder.parallel; -import static org.citrusframework.kafka.endpoint.KafkaMessageFilter.kafkaMessageFilter; -import static org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector.ValueMatchingStrategy.ENDS_WITH; -import static org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector.ValueMatchingStrategy.STARTS_WITH; -import static org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector.kafkaHeaderContains; -import static org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector.kafkaHeaderEquals; - @Test(singleThreaded = true) public class KafkaEndpointJavaIT extends TestNGCitrusSpringSupport { @BindToRegistry private final KafkaEndpoint kafkaWithRandomConsumerGroupEndpoint = KafkaEndpoint.builder() .randomConsumerGroup(true) - .topic("hello") + .topic(getClass().getSimpleName()) + .useThreadSafeConsumer() .build(); + @Test @CitrusTest public void findKafkaEvent_headerEquals_citrus_DSL() { var body = "findKafkaEvent_headerEquals_citrus_DSL"; @@ -75,6 +76,7 @@ public void findKafkaEvent_headerEquals_citrus_DSL() { ); } + @Test @CitrusTest public void findKafkaEvent_headerContains_citrus_DSL() { var body = "findKafkaEvent_headerContains_citrus_DSL"; @@ -100,6 +102,7 @@ public void 
findKafkaEvent_headerContains_citrus_DSL() { ); } + @Test @CitrusTest public void findKafkaEvent_headerStartsWith_citrus_DSL() { var body = "findKafkaEvent_headerStartsWith_citrus_DSL"; @@ -131,6 +134,7 @@ public void findKafkaEvent_headerStartsWith_citrus_DSL() { ); } + @Test @CitrusTest public void findKafkaEvent_headerEndsWith_citrus_DSL() { var body = "findKafkaEvent_headerEndsWith_citrus_DSL"; @@ -162,6 +166,7 @@ public void findKafkaEvent_headerEndsWith_citrus_DSL() { ); } + @Test @CitrusTest public void findKafkaEvent_nothingFound_noMatch_citrus_DSL() { var body = "findKafkaEvent_nothingFound_noMatch_citrus_DSL"; @@ -192,6 +197,7 @@ public void findKafkaEvent_nothingFound_noMatch_citrus_DSL() { .hasMessageContaining("Failed to resolve Kafka message using selector"); } + @Test @CitrusTest public void findKafkaEvent_nothingFound_outsideLookbackWindow_citrus_DSL() { var body = "findKafkaEvent_nothingFound_outsideLookbackWindow_citrus_DSL"; @@ -224,6 +230,7 @@ public void findKafkaEvent_nothingFound_outsideLookbackWindow_citrus_DSL() { .hasMessageContaining("Failed to resolve Kafka message using selector"); } + @Test @CitrusTest public void findKafkaEvent_duplicateEntriesFound_citrus_DSL() { var body = "findKafkaEvent_duplicateEntriesFound_citrus_DSL"; @@ -258,6 +265,7 @@ public void findKafkaEvent_duplicateEntriesFound_citrus_DSL() { .hasMessageContaining("More than one matching record found in topic"); } + @Test @CitrusTest public void findKafkaEvent_headerEquals_java_DSL() { var body = "findKafkaEvent_headerEquals_java_DSL"; @@ -276,9 +284,10 @@ public void findKafkaEvent_headerEquals_java_DSL() { ); } + @Test @CitrusTest @GitHubIssue(1281) - public void parallel_access_thread_safety() { + public void threadSafetyOfKafkaConsumer_onParallelAccess() { var body = "parallel_access_thread_safety"; var key = "Name"; diff --git a/endpoints/citrus-kafka/src/test/resources/org/citrusframework/kafka/config/xml/KafkaEndpointParserTest-context.xml b/endpoints/citrus-kafka/src/test/resources/org/citrusframework/kafka/config/xml/KafkaEndpointParserTest-context.xml index 9ace182680..f7a929263b 100644 --- a/endpoints/citrus-kafka/src/test/resources/org/citrusframework/kafka/config/xml/KafkaEndpointParserTest-context.xml +++ b/endpoints/citrus-kafka/src/test/resources/org/citrusframework/kafka/config/xml/KafkaEndpointParserTest-context.xml @@ -37,7 +37,8 @@ actor="testActor"/> + random-consumer-group="true" + thread-safe-consumer="true"/> diff --git a/endpoints/citrus-kafka/src/test/resources/org/citrusframework/kafka/integration/KafkaEndpointIT_selectiveMessage.xml b/endpoints/citrus-kafka/src/test/resources/org/citrusframework/kafka/integration/KafkaEndpointIT_selectiveMessage.xml index 222ab71015..7c4218ad05 100644 --- a/endpoints/citrus-kafka/src/test/resources/org/citrusframework/kafka/integration/KafkaEndpointIT_selectiveMessage.xml +++ b/endpoints/citrus-kafka/src/test/resources/org/citrusframework/kafka/integration/KafkaEndpointIT_selectiveMessage.xml @@ -31,10 +31,6 @@ Test 1: Send Kafka message and receive that message based on header equality - - - - Send Kafka request: Citrus -> Kafka broker diff --git a/endpoints/citrus-kafka/src/test/resources/org/citrusframework/kafka/integration/KafkaEndpointIT_singleMessage.xml b/endpoints/citrus-kafka/src/test/resources/org/citrusframework/kafka/integration/KafkaEndpointIT_singleMessage.xml index 5e09b2ec70..cfec964c2d 100644 --- a/endpoints/citrus-kafka/src/test/resources/org/citrusframework/kafka/integration/KafkaEndpointIT_singleMessage.xml +++ 
b/endpoints/citrus-kafka/src/test/resources/org/citrusframework/kafka/integration/KafkaEndpointIT_singleMessage.xml @@ -29,10 +29,6 @@ Test 1: Send Kafka message and receive that message on the same topic (inline CDATA payload) - - - - Send Kafka request: Citrus -> Kafka broker diff --git a/src/manual/endpoint-kafka.adoc b/src/manual/endpoint-kafka.adoc index f174eed471..e38820a6a4 100644 --- a/src/manual/endpoint-kafka.adoc +++ b/src/manual/endpoint-kafka.adoc @@ -103,12 +103,16 @@ public KafkaEndpoint helloKafkaEndpoint() { consumer-group="citrus_group"/> ---- -Note that for effective message consumption it is advisable to use random consumer groups. -Both the Java and XML DSL support random consumer groups, when enabled: `randomConsumerGroup(true)` or `random-consumer-group="true"`. - The endpoint is now ready to be used inside a test case. The test simply references the endpoint by its name when sending or receiving. +IMPORTANT: The `org.apache.kafka.clients.consumer.KafkaConsumer` is **not thread-safe**. +When using <>, it is recommended to configure `useThreadSafeConsumer` respectively `thread-safe-consumer` for the Kafka endpoint. +Otherwise, you cannot execute tests in parallel. + +NOTE: For effective message consumption, it is additionally advisable to use random consumer groups. +Both the Java and XML DSL support random consumer groups, when enabled: `randomConsumerGroup(true)` or `random-consumer-group="true"`. + In case of a send operation the endpoint creates a Kafka producer and will simply publish the records to the defined Kafka topic. As the communication is asynchronous by default, the producer does not wait for a response. @@ -198,7 +202,7 @@ So you might run into message timeouts when using multiple Kafka endpoints with | `random-consumer-group` | No | `false` -| Whether to use random consumer gorup names. +| Whether to use random consumer group names. Note that these will all be prefixed by `citrus_kafka_` and end with a random 10 characters alphabetic suffix. | `key-serializer` @@ -243,6 +247,12 @@ So you might run into message timeouts when using multiple Kafka endpoints with | Map of producer property settings to apply to the Kafka producer configuration. This enables you to overwrite any producer setting with respective property key value pairs. +| `thread-safe-consumer` +| No +| `false` +| Whether to use separate ``org.apache.kafka.clients.consumer.KafkaConsumer``'s per thread. + Required for parallel test execution, because the consumer is **not thread-safe**. + |=== [[kafka-endpoint-properties]]
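
For reviewers who want to see the new option in context, the following sketch (not part of this patch) shows a thread-safe endpoint built with the Java DSL introduced above. The configuration class, the bootstrap server address, and the topic name are placeholder assumptions; only the builder calls themselves come from this change set.

[source,java]
----
import org.citrusframework.kafka.endpoint.KafkaEndpoint;

public class ParallelSafeKafkaConfig {

    // Placeholder configuration class; only the builder calls are taken from this patch.
    public KafkaEndpoint threadSafeKafkaEndpoint() {
        return KafkaEndpoint.builder()
                .server("localhost:9092")   // placeholder bootstrap server
                .topic("hello")             // placeholder topic
                .randomConsumerGroup(true)  // random citrus_kafka_* consumer group
                .useThreadSafeConsumer()    // separate KafkaConsumer per thread, safe for parallel tests
                .build();
    }
}
----

In XML configuration, the same behaviour corresponds to the new `thread-safe-consumer="true"` attribute documented in the manual section above.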