diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 1ae60850..67d33e3f 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -33,10 +33,19 @@ jobs:
server-username: MAVEN_USERNAME
server-password: MAVEN_PASSWORD
- - name: Regular build
+ - name: Run regular build
run: |
./mvnw -B -U clean verify
+ - name: Run integration tests
+ run: | # no clean
+ ./mvnw -B -U -Pintegration-test -DskipExamples
+
+ - name: Build coverage report
+ if: matrix.sonar-enabled
+ run: | # no clean
+ ./mvnw -B -U -Pcoverage-aggregate -DskipExamples
+
- name: Sonar analysis
if: matrix.sonar-enabled
run: | # no clean
@@ -48,10 +57,6 @@ jobs:
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- - name: Run integration tests
- run: | # no clean
- ./mvnw -B -Pitest
-
- name: Deploy to Sonatype
if: success()
run: | # no clean, no tests, no examples
diff --git a/.github/workflows/pullrequest.yml b/.github/workflows/pullrequest.yml
index 5618598e..957dafcc 100644
--- a/.github/workflows/pullrequest.yml
+++ b/.github/workflows/pullrequest.yml
@@ -11,7 +11,9 @@ jobs:
matrix:
include:
- java-version: 8
+ sonar-enabled: false
- java-version: 11
+ sonar-enabled: true
fail-fast: false # run both to the end
steps:
@@ -34,4 +36,20 @@ jobs:
- name: Run integration tests
run: | # no clean
- ./mvnw -B -Pitest
+ ./mvnw -B -U -Pintegration-test -DskipExamples
+
+ - name: Build coverage report
+ if: matrix.sonar-enabled
+ run: | # no clean
+ ./mvnw -B -U -Pcoverage-aggregate -DskipExamples
+
+ - name: Sonar Analysis
+ if: ${{ success() && matrix.sonar-enabled && github.event.pull_request.head.repo.full_name == github.repository }}
+ run: |
+ ./mvnw -B sonar:sonar \
+ -Dsonar.projectKey=AxonFramework_extension-kafka \
+ -Dsonar.organization=axonframework \
+ -Dsonar.host.url=https://sonarcloud.io \
+ -Dsonar.login=${{ secrets.SONAR_TOKEN }}
+ env:
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 1c77bbbe..e06af3df 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -39,22 +39,21 @@ immediately.
### Project Build
-The project is built with Apache Maven, supplied by the Maven Wrapper `mvnw`. For separate aspects of the build Maven
+The project is built with Apache Maven, supplied by the Maven Wrapper `mvnw`. For separate aspects of the build Maven
profiles are used.
-For a **regular** build, execute from your command line: `./mvnw`. This operation will run the build and execute JUnit tests
-of all modules and package the resulting artifacts.
+For a **regular** build, execute from your command line: `./mvnw`. This operation will run the build and execute JUnit
+tests of all modules and package the resulting artifacts.
-This repository contains an example project.
-You can skip its build by adding `-DskipExamples` to your build command.
+This repository contains an example project. You can skip its build by adding `-DskipExamples` to your build command.
-There are long-running integration tests present (starting Spring Boot Application and/or running Kafka in a TestContainer), which **ARE NOT** executed by default.
-A unique `itest` build is needed to run those long-running tests.
-If you want to run them, please call `./mvnw -Pitest` from your command line.
-When introducing additional integration tests, make sure the class name ends with `IntegrationTest`.
+There are long-running integration tests present (starting Spring Boot Application and/or running Kafka in a
+TestContainer), which **ARE NOT** executed by default. A unique `integration-test` build is needed to run those
+long-running tests. If you want to run them, please call `./mvnw -Pintegration-test` from your command line. When
+introducing additional integration tests, make sure the class name ends with `IntegrationTest`.
The project uses JaCoCo to measure test coverage of the code and automatically generate coverage reports on regular
-and `itest` builds. If you are interested in the overall test coverage, please run `./mvnw -Pcoverage-aggregate`
-(without calling `clean`) after you run the **regular** and `itest` builds and check the resulting aggregated report
-in `./coverage-report-generator/target/site/jacoco-aggregate/index.html`
+and `integration-test` builds. If you are interested in the overall test coverage, please run `./mvnw -Pcoverage` after
+running both without clean. and check the resulting aggregated report
+in `./coverage-report/target/site/jacoco-aggregate/index.html`
diff --git a/coverage-report-generator/pom.xml b/coverage-report/pom.xml
similarity index 95%
rename from coverage-report-generator/pom.xml
rename to coverage-report/pom.xml
index ac0480da..194c1c20 100644
--- a/coverage-report-generator/pom.xml
+++ b/coverage-report/pom.xml
@@ -24,7 +24,7 @@
4.6.0-SNAPSHOT
- axon-kafka-coverage-report-generator
+ axon-kafka-coverage-report4.6.0-SNAPSHOTAxon Framework Kafka Extension - Coverage Report Generator
@@ -40,11 +40,13 @@
org.axonframework.extensions.kafkaaxon-kafka${project.version}
+ runtimeorg.axonframework.extensions.kafkaaxon-kafka-spring-boot-autoconfigure${project.version}
+ runtime
diff --git a/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java b/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java
index c9ae6b18..b68bd633 100644
--- a/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java
+++ b/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2010-2021. Axon Framework
+ * Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -57,6 +57,7 @@
import java.lang.invoke.MethodHandles;
import java.util.Collections;
+import java.util.Optional;
import static org.axonframework.extensions.kafka.eventhandling.producer.KafkaEventPublisher.DEFAULT_PROCESSING_GROUP;
@@ -141,7 +142,7 @@ public KafkaPublisher kafkaPublisher(
.producerFactory(axonKafkaProducerFactory)
.messageConverter(kafkaMessageConverter)
.messageMonitor(configuration.messageMonitor(KafkaPublisher.class, "kafkaPublisher"))
- .topic(properties.getDefaultTopic())
+ .topicResolver(m -> Optional.of(properties.getDefaultTopic()))
.build();
}
diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/producer/KafkaPublisher.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/producer/KafkaPublisher.java
index 229492ce..727f09a6 100644
--- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/producer/KafkaPublisher.java
+++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/producer/KafkaPublisher.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2010-2021. Axon Framework
+ * Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -36,6 +36,7 @@
import org.slf4j.LoggerFactory;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -70,7 +71,7 @@ public class KafkaPublisher {
private final ProducerFactory producerFactory;
private final KafkaMessageConverter messageConverter;
private final MessageMonitor super EventMessage>> messageMonitor;
- private final String topic;
+ private final TopicResolver topicResolver;
private final long publisherAckTimeout;
/**
@@ -86,7 +87,7 @@ protected KafkaPublisher(Builder builder) {
this.producerFactory = builder.producerFactory;
this.messageConverter = builder.messageConverter;
this.messageMonitor = builder.messageMonitor;
- this.topic = builder.topic;
+ this.topicResolver = builder.topicResolver;
this.publisherAckTimeout = builder.publisherAckTimeout;
}
@@ -123,37 +124,42 @@ public static Builder builder() {
* @param event the events to publish on the Kafka broker.
* @param the implementation of {@link EventMessage} send through this method
*/
+ @SuppressWarnings("squid:S2095") //producer needs to be closed async, not within this method
public > void send(T event) {
logger.debug("Starting event producing process for [{}].", event.getPayloadType());
+ Optional topic = topicResolver.resolve(event);
+ if (!topic.isPresent()) {
+ logger.debug("Skip publishing event for [{}] since topicFunction returned empty.", event.getPayloadType());
+ return;
+ }
UnitOfWork> uow = CurrentUnitOfWork.get();
MonitorCallback monitorCallback = messageMonitor.onMessageIngested(event);
- try (Producer producer = producerFactory.createProducer()) {
- ConfirmationMode confirmationMode = producerFactory.confirmationMode();
+ Producer producer = producerFactory.createProducer();
+ ConfirmationMode confirmationMode = producerFactory.confirmationMode();
+
+ if (confirmationMode.isTransactional()) {
+ tryBeginTxn(producer);
+ }
+
+ // Sends event messages to Kafka and receive a future indicating the status.
+ Future publishStatus = producer.send(messageConverter.createKafkaMessage(event, topic.get()));
+ uow.onPrepareCommit(u -> {
if (confirmationMode.isTransactional()) {
- tryBeginTxn(producer);
+ tryCommit(producer, monitorCallback);
+ } else if (confirmationMode.isWaitForAck()) {
+ waitForPublishAck(publishStatus, monitorCallback);
}
+ tryClose(producer);
+ });
- // Sends event messages to Kafka and receive a future indicating the status.
- Future publishStatus = producer.send(messageConverter.createKafkaMessage(event, topic));
-
- uow.onPrepareCommit(u -> {
- if (confirmationMode.isTransactional()) {
- tryCommit(producer, monitorCallback);
- } else if (confirmationMode.isWaitForAck()) {
- waitForPublishAck(publishStatus, monitorCallback);
- }
- tryClose(producer);
- });
-
- uow.onRollback(u -> {
- if (confirmationMode.isTransactional()) {
- tryRollback(producer);
- }
- tryClose(producer);
- });
- }
+ uow.onRollback(u -> {
+ if (confirmationMode.isTransactional()) {
+ tryRollback(producer);
+ }
+ tryClose(producer);
+ });
}
private void tryBeginTxn(Producer, ?> producer) {
@@ -248,7 +254,7 @@ public static class Builder {
.serializer(XStreamSerializer.defaultSerializer())
.build();
private MessageMonitor super EventMessage>> messageMonitor = NoOpMessageMonitor.instance();
- private String topic = DEFAULT_TOPIC;
+ private TopicResolver topicResolver = m -> Optional.of(DEFAULT_TOPIC);
private long publisherAckTimeout = 1_000;
/**
@@ -300,10 +306,29 @@ public Builder messageMonitor(MessageMonitor super EventMessage>> mess
*
* @param topic the Kafka {@code topic} to publish {@link EventMessage}s on
* @return the current Builder instance, for fluent interfacing
+ * @deprecated in through use of topic resolver
*/
+ @Deprecated
+ @SuppressWarnings("squid:S1133") //needs a major release to remove, since part of public API
public Builder topic(String topic) {
assertThat(topic, name -> Objects.nonNull(name) && !"".equals(name), "The topic may not be null or empty");
- this.topic = topic;
+ this.topicResolver = m -> Optional.of(topic);
+ return this;
+ }
+
+ /**
+ * Set the resolver to determine the Kafka {@code topic} to publish a certain {@link EventMessage} to.The {@code
+ * EventMessage} is not published if the resolver returns an {@code Optional.empty()}. Defaults to always return
+ * the set topic, or always return {@code Axon.Events}.
+ *
+ * @param topicResolver the Kafka {@code topic} to publish {@link EventMessage}s on
+ * @return the current Builder instance, for fluent interfacing
+ * @author Gerard Klijs
+ * @since 4.6.0
+ */
+ public Builder topicResolver(TopicResolver topicResolver) {
+ assertNonNull(topicResolver, "The TopicResolver may not be null");
+ this.topicResolver = topicResolver;
return this;
}
diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/producer/TopicResolver.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/producer/TopicResolver.java
new file mode 100644
index 00000000..ed0900f5
--- /dev/null
+++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/producer/TopicResolver.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2010-2022. Axon Framework
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.axonframework.extensions.kafka.eventhandling.producer;
+
+import org.axonframework.eventhandling.EventMessage;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * Interface to determine if an {@code EventMessage} should be published to Kafka, and if so to which topic. If the
+ * result from the call is {@code Optional.empty()} is will not be published, else the result will be used for the
+ * topic.
+ *
+ * @author Gerard Klijs
+ * @since 4.6.0
+ */
+@FunctionalInterface
+public interface TopicResolver extends Function, Optional> {
+
+ /**
+ * resolve an {@code EventMessage} to an optional topic to publish the event to
+ *
+ * @param event an {@code EventMessage}
+ * @return the optional topic, when empty the event message will not be published
+ */
+ default Optional resolve(EventMessage> event) {
+ return this.apply(event);
+ }
+}
diff --git a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/KafkaIntegrationTest.java b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/KafkaIntegrationTest.java
index 1f04db48..42101267 100644
--- a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/KafkaIntegrationTest.java
+++ b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/KafkaIntegrationTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2010-2021. Axon Framework
+ * Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,6 +38,7 @@
import org.junit.jupiter.api.*;
import java.util.Collections;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import static org.axonframework.eventhandling.GenericEventMessage.asEventMessage;
@@ -78,7 +79,13 @@ void setUp() {
producerFactory = ProducerConfigUtil.ackProducerFactory(getBootstrapServers(), ByteArraySerializer.class);
publisher = KafkaPublisher.builder()
.producerFactory(producerFactory)
- .topic(TEST_TOPIC)
+ .topicResolver(m -> {
+ if (m.getPayloadType().isAssignableFrom(String.class)) {
+ return Optional.of(TEST_TOPIC);
+ } else {
+ return Optional.empty();
+ }
+ })
.build();
KafkaEventPublisher sender =
KafkaEventPublisher.builder().kafkaPublisher(publisher).build();
@@ -124,6 +131,34 @@ void testPublishAndReadMessages() throws Exception {
assertTrue(stream2.hasNextAvailable(25, TimeUnit.SECONDS));
TrackedEventMessage> actual = stream2.nextAvailable();
assertNotNull(actual);
+ assertEquals("test", actual.getPayload());
+
+ stream2.close();
+ }
+
+ @Test
+ void testSkipPublishForLongPayload() throws Exception {
+ StreamableKafkaMessageSource streamableMessageSource =
+ StreamableKafkaMessageSource.builder()
+ .topics(Collections.singletonList(TEST_TOPIC))
+ .consumerFactory(consumerFactory)
+ .fetcher(fetcher)
+ .build();
+
+ BlockingStream> stream1 = streamableMessageSource.openStream(null);
+ stream1.close();
+ BlockingStream> stream2 = streamableMessageSource.openStream(null);
+
+ //This one will not be received
+ eventBus.publish(asEventMessage(42L));
+ //Added so we don't have to wait longer than necessary, to know the other one did not publish
+ eventBus.publish(asEventMessage("test"));
+
+ // The consumer may need some time to start
+ assertTrue(stream2.hasNextAvailable(25, TimeUnit.SECONDS));
+ TrackedEventMessage> actual = stream2.nextAvailable();
+ assertNotNull(actual);
+ assertInstanceOf(String.class, actual.getPayload(), "Long is not skipped");
stream2.close();
}
diff --git a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/producer/DefaultProducerFactoryIntegrationTest.java b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/producer/DefaultProducerFactoryIntegrationTest.java
index 581682d5..925e2f1d 100644
--- a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/producer/DefaultProducerFactoryIntegrationTest.java
+++ b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/producer/DefaultProducerFactoryIntegrationTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2010-2021. Axon Framework
+ * Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,13 +16,11 @@
package org.axonframework.extensions.kafka.eventhandling.producer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.extensions.kafka.eventhandling.util.KafkaAdminUtils;
import org.axonframework.extensions.kafka.eventhandling.util.KafkaContainerTest;
@@ -33,7 +31,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.IntStream;
@@ -104,36 +101,33 @@ void testDefaultConfirmationModeForTransactionalProducer() {
@Test
void testConfiguringInvalidCacheSize() {
- DefaultProducerFactory.Builder