From 78515802c4e723f8f8bdd0f6a462dadd987899a3 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Tue, 22 Nov 2022 11:05:10 +0000 Subject: [PATCH] Keep test IT for HibernateOrmStateStore. Uses DTO for sending Kafka records Separate PUs for fruit and people entities --- docs/src/main/asciidoc/kafka.adoc | 5 +++-- .../kafka/HibernateOrmStateStore.java | 20 +++++++++++++++---- .../io/quarkus/it/kafka/KafkaEndpoint.java | 5 +++++ .../io/quarkus/it/kafka/KafkaReceivers.java | 20 +++++++++++-------- .../quarkus/it/kafka/{ => fruit}/Fruit.java | 2 +- .../io/quarkus/it/kafka/fruit/FruitDto.java | 20 +++++++++++++++++++ .../it/kafka/{ => fruit}/FruitProducer.java | 2 +- .../io/quarkus/it/kafka/package-info.java | 4 ---- .../it/kafka/{ => people}/PeopleProducer.java | 2 +- .../it/kafka/{ => people}/PeopleState.java | 2 +- .../quarkus/it/kafka/{ => people}/Person.java | 2 +- .../src/main/resources/application.properties | 5 ++++- .../quarkus/it/kafka/KafkaConnectorTest.java | 7 ++++--- 13 files changed, 69 insertions(+), 27 deletions(-) rename integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/{ => fruit}/Fruit.java (89%) create mode 100644 integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/fruit/FruitDto.java rename integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/{ => fruit}/FruitProducer.java (94%) delete mode 100644 integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/package-info.java rename integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/{ => people}/PeopleProducer.java (93%) rename integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/{ => people}/PeopleState.java (89%) rename integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/{ => people}/Person.java (92%) diff --git a/docs/src/main/asciidoc/kafka.adoc b/docs/src/main/asciidoc/kafka.adoc index 946e61fcacbda..0937bad429616 100644 --- a/docs/src/main/asciidoc/kafka.adoc +++ b/docs/src/main/asciidoc/kafka.adoc @@ -2618,13 +2618,14 @@ public class ResourceSendingToKafka { @Transactional // <1> public CompletionStage storeAndSendToKafka(Fruit fruit) { // <2> fruit.persist(); - return emitter.send(fruit); // <3> + return emitter.send(new FruitDto(fruit)); // <3> } } ---- <1> As we are writing to the database, make sure we run inside a transaction <2> The method receives the fruit instance to persist. It returns a `CompletionStage` which is used for the transaction demarcation. The transaction is committed when the return `CompletionStage` completes. In our case, it's when the message is written to Kafka. -<3> Send the managed instance to Kafka. Make sure we wait for the message to complete before closing the transaction. +<3> Wrap the managed entity inside a Data transfer object and send it to Kafka. +This makes sure that managed entity is not impacted by the Kafka serialization. [#writing-entities-managed-by-hibernate-reactive-to-kafka] === Writing entities managed by Hibernate Reactive to Kafka diff --git a/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/HibernateOrmStateStore.java b/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/HibernateOrmStateStore.java index dacadc1645387..9a8f75a79a7f3 100644 --- a/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/HibernateOrmStateStore.java +++ b/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/HibernateOrmStateStore.java @@ -72,19 +72,31 @@ public Uni>> fetchProcessingState(Collect Object[] ids = partitions.stream() .map(tp -> new CheckpointEntityId(consumerGroupId, tp)) .toArray(Object[]::new); - return Vertx.currentContext().executeBlocking(Uni.createFrom().item(() -> { + return Vertx.currentContext().executeBlocking(Uni.createFrom().emitter(emitter -> { List fetched = new ArrayList<>(); + Transaction tx = null; try (Session session = sf.openSession()) { + tx = session.beginTransaction(); for (Object id : ids) { CheckpointEntity entity = session.find(stateType, id); if (entity != null) { fetched.add(entity); } } + Map> stateMap = fetched.stream() + .filter(e -> e != null && CheckpointEntity.topicPartition(e) != null) + .collect(Collectors.toMap(CheckpointEntity::topicPartition, + e -> new ProcessingState<>(e, e.offset))); + session.flush(); + tx.commit(); + emitter.complete(stateMap); + } catch (Throwable t) { + if (tx != null) { + tx.rollback(); + } + emitter.fail(t); } - return fetched.stream().filter(e -> e != null && CheckpointEntity.topicPartition(e) != null) - .collect(Collectors.toMap(CheckpointEntity::topicPartition, - e -> new ProcessingState<>(e, e.offset))); + })); }); } diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaEndpoint.java b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaEndpoint.java index 2e5c60d7410df..362c837a90d7b 100644 --- a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaEndpoint.java +++ b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaEndpoint.java @@ -4,6 +4,7 @@ import javax.inject.Inject; import javax.persistence.EntityManager; +import javax.transaction.Transactional; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; @@ -12,6 +13,9 @@ import org.apache.kafka.common.TopicPartition; import io.quarkus.hibernate.orm.PersistenceUnit; +import io.quarkus.it.kafka.fruit.Fruit; +import io.quarkus.it.kafka.people.PeopleState; +import io.quarkus.it.kafka.people.Person; import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntityId; @Path("/kafka") @@ -40,6 +44,7 @@ public List getPeople() { @GET @Path("/people-state") @Produces(MediaType.APPLICATION_JSON) + @Transactional public PeopleState getPeopleState() { return entityManager.find(PeopleState.class, new CheckpointEntityId("people-checkpoint", new TopicPartition("people", 0))); diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaReceivers.java b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaReceivers.java index 5bb200fe10450..25ea7cffa9c99 100644 --- a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaReceivers.java +++ b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaReceivers.java @@ -8,10 +8,14 @@ import javax.transaction.Transactional; import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.Message; -import io.smallrye.reactive.messaging.MutinyEmitter; +import io.quarkus.it.kafka.fruit.Fruit; +import io.quarkus.it.kafka.fruit.FruitDto; +import io.quarkus.it.kafka.people.PeopleState; +import io.quarkus.it.kafka.people.Person; import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata; @ApplicationScoped @@ -20,13 +24,13 @@ public class KafkaReceivers { private final List people = new CopyOnWriteArrayList<>(); @Channel("fruits-persisted") - MutinyEmitter emitter; + Emitter emitter; @Incoming("fruits-in") @Transactional - public CompletionStage persist(Message fruit) { - fruit.getPayload().persist(); - return emitter.sendMessage(fruit).subscribeAsCompletionStage(); + public CompletionStage persist(Fruit fruit) { + fruit.persist(); + return emitter.send(new FruitDto(fruit)); } @Incoming("people-in") @@ -34,10 +38,10 @@ public CompletionStage consume(Message msg) { CheckpointMetadata store = CheckpointMetadata.fromMessage(msg); Person person = msg.getPayload(); store.transform(new PeopleState(), c -> { - if (c.names == null) { - c.names = person.getName(); + if (c.getNames() == null) { + c.setNames(person.getName()); } else { - c.names = c.names + ";" + person.getName(); + c.setNames(c.getNames() + ";" + person.getName()); } return c; }); diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/Fruit.java b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/fruit/Fruit.java similarity index 89% rename from integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/Fruit.java rename to integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/fruit/Fruit.java index da5f7a87c4829..acfd1f577d564 100644 --- a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/Fruit.java +++ b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/fruit/Fruit.java @@ -1,4 +1,4 @@ -package io.quarkus.it.kafka; +package io.quarkus.it.kafka.fruit; import javax.persistence.Entity; diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/fruit/FruitDto.java b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/fruit/FruitDto.java new file mode 100644 index 0000000000000..9622a305f8ae0 --- /dev/null +++ b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/fruit/FruitDto.java @@ -0,0 +1,20 @@ +package io.quarkus.it.kafka.fruit; + +public class FruitDto { + String name; + + public FruitDto(Fruit fruit) { + this.name = fruit.name; + } + + public FruitDto() { + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } +} diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/FruitProducer.java b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/fruit/FruitProducer.java similarity index 94% rename from integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/FruitProducer.java rename to integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/fruit/FruitProducer.java index 9921a1e2fd469..0be96ac8cfe21 100644 --- a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/FruitProducer.java +++ b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/fruit/FruitProducer.java @@ -1,4 +1,4 @@ -package io.quarkus.it.kafka; +package io.quarkus.it.kafka.fruit; import javax.ws.rs.Consumes; import javax.ws.rs.POST; diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/package-info.java b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/package-info.java deleted file mode 100644 index a9574d0fd0b3d..0000000000000 --- a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -@PersistenceUnit("people") -package io.quarkus.it.kafka; - -import io.quarkus.hibernate.orm.PersistenceUnit; \ No newline at end of file diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/PeopleProducer.java b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/people/PeopleProducer.java similarity index 93% rename from integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/PeopleProducer.java rename to integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/people/PeopleProducer.java index def7dd4e0baff..893dbbd947b0b 100644 --- a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/PeopleProducer.java +++ b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/people/PeopleProducer.java @@ -1,4 +1,4 @@ -package io.quarkus.it.kafka; +package io.quarkus.it.kafka.people; import javax.enterprise.context.ApplicationScoped; diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/PeopleState.java b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/people/PeopleState.java similarity index 89% rename from integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/PeopleState.java rename to integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/people/PeopleState.java index 326a001f6aa69..7005b79c15fdb 100644 --- a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/PeopleState.java +++ b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/people/PeopleState.java @@ -1,4 +1,4 @@ -package io.quarkus.it.kafka; +package io.quarkus.it.kafka.people; import javax.persistence.Entity; diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/Person.java b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/people/Person.java similarity index 92% rename from integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/Person.java rename to integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/people/Person.java index 079fe366ed16c..05a673886c3f5 100644 --- a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/Person.java +++ b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/people/Person.java @@ -1,4 +1,4 @@ -package io.quarkus.it.kafka; +package io.quarkus.it.kafka.people; import io.quarkus.runtime.annotations.RegisterForReflection; diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/resources/application.properties b/integration-tests/reactive-messaging-hibernate-orm/src/main/resources/application.properties index 119ec25620209..ccdbd05a7e582 100644 --- a/integration-tests/reactive-messaging-hibernate-orm/src/main/resources/application.properties +++ b/integration-tests/reactive-messaging-hibernate-orm/src/main/resources/application.properties @@ -19,10 +19,13 @@ mp.messaging.outgoing.people-out.acks=all mp.messaging.incoming.people-in.topic=people mp.messaging.incoming.people-in.commit-strategy=checkpoint mp.messaging.incoming.people-in.checkpoint.state-store=quarkus-hibernate-orm -mp.messaging.incoming.people-in.checkpoint.state-type=io.quarkus.it.kafka.PeopleState +mp.messaging.incoming.people-in.checkpoint.state-type=io.quarkus.it.kafka.people.PeopleState mp.messaging.incoming.people-in.checkpoint.quarkus-hibernate-orm.persistence-unit=people mp.messaging.incoming.people-in.auto.commit.interval.ms=500 mp.messaging.incoming.people-in.group.id=people-checkpoint +quarkus.datasource.devservices.enabled=true +quarkus.hibernate-orm.packages=io.quarkus.it.kafka.fruit quarkus.datasource."people".devservices.enabled=true quarkus.hibernate-orm."people".datasource=people +quarkus.hibernate-orm."people".packages=io.quarkus.it.kafka.people diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java b/integration-tests/reactive-messaging-hibernate-orm/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java index 860688a11c9ae..ae6f2071138f5 100644 --- a/integration-tests/reactive-messaging-hibernate-orm/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java +++ b/integration-tests/reactive-messaging-hibernate-orm/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java @@ -12,9 +12,11 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import io.quarkus.it.kafka.fruit.Fruit; +import io.quarkus.it.kafka.people.PeopleState; +import io.quarkus.it.kafka.people.Person; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.kafka.InjectKafkaCompanion; @@ -25,7 +27,6 @@ @QuarkusTest @QuarkusTestResource(KafkaCompanionResource.class) -@Disabled public class KafkaConnectorTest { protected static final TypeRef> TYPE_REF = new TypeRef>() { @@ -69,7 +70,7 @@ public void testPeople() { PeopleState result = get("/kafka/people-state").as(PeopleState.class); Assertions.assertNotNull(result); Assertions.assertTrue(result.offset >= 6); - Assertions.assertEquals("bob;alice;tom;jerry;anna;ken", result.names); + Assertions.assertEquals("bob;alice;tom;jerry;anna;ken", result.getNames()); }); }