diff --git a/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/HibernateOrmStateStore.java b/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/HibernateOrmStateStore.java index dacadc16453878..9a8f75a79a7f3c 100644 --- a/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/HibernateOrmStateStore.java +++ b/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/HibernateOrmStateStore.java @@ -72,19 +72,31 @@ public Uni>> fetchProcessingState(Collect Object[] ids = partitions.stream() .map(tp -> new CheckpointEntityId(consumerGroupId, tp)) .toArray(Object[]::new); - return Vertx.currentContext().executeBlocking(Uni.createFrom().item(() -> { + return Vertx.currentContext().executeBlocking(Uni.createFrom().emitter(emitter -> { List fetched = new ArrayList<>(); + Transaction tx = null; try (Session session = sf.openSession()) { + tx = session.beginTransaction(); for (Object id : ids) { CheckpointEntity entity = session.find(stateType, id); if (entity != null) { fetched.add(entity); } } + Map> stateMap = fetched.stream() + .filter(e -> e != null && CheckpointEntity.topicPartition(e) != null) + .collect(Collectors.toMap(CheckpointEntity::topicPartition, + e -> new ProcessingState<>(e, e.offset))); + session.flush(); + tx.commit(); + emitter.complete(stateMap); + } catch (Throwable t) { + if (tx != null) { + tx.rollback(); + } + emitter.fail(t); } - return fetched.stream().filter(e -> e != null && CheckpointEntity.topicPartition(e) != null) - .collect(Collectors.toMap(CheckpointEntity::topicPartition, - e -> new ProcessingState<>(e, e.offset))); + })); }); } diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaEndpoint.java b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaEndpoint.java index 2e5c60d7410dff..362c837a90d7b3 100644 --- a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaEndpoint.java +++ b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaEndpoint.java @@ -4,6 +4,7 @@ import javax.inject.Inject; import javax.persistence.EntityManager; +import javax.transaction.Transactional; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; @@ -12,6 +13,9 @@ import org.apache.kafka.common.TopicPartition; import io.quarkus.hibernate.orm.PersistenceUnit; +import io.quarkus.it.kafka.fruit.Fruit; +import io.quarkus.it.kafka.people.PeopleState; +import io.quarkus.it.kafka.people.Person; import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntityId; @Path("/kafka") @@ -40,6 +44,7 @@ public List getPeople() { @GET @Path("/people-state") @Produces(MediaType.APPLICATION_JSON) + @Transactional public PeopleState getPeopleState() { return entityManager.find(PeopleState.class, new CheckpointEntityId("people-checkpoint", new TopicPartition("people", 0))); diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaReceivers.java b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaReceivers.java index 5bb200fe10450a..25ea7cffa9c99f 100644 --- a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaReceivers.java +++ b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/KafkaReceivers.java @@ -8,10 +8,14 @@ import javax.transaction.Transactional; import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.Message; -import io.smallrye.reactive.messaging.MutinyEmitter; +import io.quarkus.it.kafka.fruit.Fruit; +import io.quarkus.it.kafka.fruit.FruitDto; +import io.quarkus.it.kafka.people.PeopleState; +import io.quarkus.it.kafka.people.Person; import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata; @ApplicationScoped @@ -20,13 +24,13 @@ public class KafkaReceivers { private final List people = new CopyOnWriteArrayList<>(); @Channel("fruits-persisted") - MutinyEmitter emitter; + Emitter emitter; @Incoming("fruits-in") @Transactional - public CompletionStage persist(Message fruit) { - fruit.getPayload().persist(); - return emitter.sendMessage(fruit).subscribeAsCompletionStage(); + public CompletionStage persist(Fruit fruit) { + fruit.persist(); + return emitter.send(new FruitDto(fruit)); } @Incoming("people-in") @@ -34,10 +38,10 @@ public CompletionStage consume(Message msg) { CheckpointMetadata store = CheckpointMetadata.fromMessage(msg); Person person = msg.getPayload(); store.transform(new PeopleState(), c -> { - if (c.names == null) { - c.names = person.getName(); + if (c.getNames() == null) { + c.setNames(person.getName()); } else { - c.names = c.names + ";" + person.getName(); + c.setNames(c.getNames() + ";" + person.getName()); } return c; }); diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/Fruit.java b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/fruit/Fruit.java similarity index 89% rename from integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/Fruit.java rename to integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/fruit/Fruit.java index da5f7a87c48294..acfd1f577d564b 100644 --- a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/Fruit.java +++ b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/fruit/Fruit.java @@ -1,4 +1,4 @@ -package io.quarkus.it.kafka; +package io.quarkus.it.kafka.fruit; import javax.persistence.Entity; diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/fruit/FruitDto.java b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/fruit/FruitDto.java new file mode 100644 index 00000000000000..9622a305f8ae06 --- /dev/null +++ b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/fruit/FruitDto.java @@ -0,0 +1,20 @@ +package io.quarkus.it.kafka.fruit; + +public class FruitDto { + String name; + + public FruitDto(Fruit fruit) { + this.name = fruit.name; + } + + public FruitDto() { + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } +} diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/FruitProducer.java b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/fruit/FruitProducer.java similarity index 94% rename from integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/FruitProducer.java rename to integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/fruit/FruitProducer.java index 9921a1e2fd4694..0be96ac8cfe212 100644 --- a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/FruitProducer.java +++ b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/fruit/FruitProducer.java @@ -1,4 +1,4 @@ -package io.quarkus.it.kafka; +package io.quarkus.it.kafka.fruit; import javax.ws.rs.Consumes; import javax.ws.rs.POST; diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/package-info.java b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/package-info.java deleted file mode 100644 index a9574d0fd0b3de..00000000000000 --- a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -@PersistenceUnit("people") -package io.quarkus.it.kafka; - -import io.quarkus.hibernate.orm.PersistenceUnit; \ No newline at end of file diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/PeopleProducer.java b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/people/PeopleProducer.java similarity index 93% rename from integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/PeopleProducer.java rename to integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/people/PeopleProducer.java index def7dd4e0baff1..893dbbd947b0ba 100644 --- a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/PeopleProducer.java +++ b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/people/PeopleProducer.java @@ -1,4 +1,4 @@ -package io.quarkus.it.kafka; +package io.quarkus.it.kafka.people; import javax.enterprise.context.ApplicationScoped; diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/PeopleState.java b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/people/PeopleState.java similarity index 89% rename from integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/PeopleState.java rename to integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/people/PeopleState.java index 326a001f6aa695..7005b79c15fdb6 100644 --- a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/PeopleState.java +++ b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/people/PeopleState.java @@ -1,4 +1,4 @@ -package io.quarkus.it.kafka; +package io.quarkus.it.kafka.people; import javax.persistence.Entity; diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/Person.java b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/people/Person.java similarity index 92% rename from integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/Person.java rename to integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/people/Person.java index 079fe366ed16c9..05a673886c3f59 100644 --- a/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/Person.java +++ b/integration-tests/reactive-messaging-hibernate-orm/src/main/java/io/quarkus/it/kafka/people/Person.java @@ -1,4 +1,4 @@ -package io.quarkus.it.kafka; +package io.quarkus.it.kafka.people; import io.quarkus.runtime.annotations.RegisterForReflection; diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/main/resources/application.properties b/integration-tests/reactive-messaging-hibernate-orm/src/main/resources/application.properties index 119ec256202090..ccdbd05a7e5828 100644 --- a/integration-tests/reactive-messaging-hibernate-orm/src/main/resources/application.properties +++ b/integration-tests/reactive-messaging-hibernate-orm/src/main/resources/application.properties @@ -19,10 +19,13 @@ mp.messaging.outgoing.people-out.acks=all mp.messaging.incoming.people-in.topic=people mp.messaging.incoming.people-in.commit-strategy=checkpoint mp.messaging.incoming.people-in.checkpoint.state-store=quarkus-hibernate-orm -mp.messaging.incoming.people-in.checkpoint.state-type=io.quarkus.it.kafka.PeopleState +mp.messaging.incoming.people-in.checkpoint.state-type=io.quarkus.it.kafka.people.PeopleState mp.messaging.incoming.people-in.checkpoint.quarkus-hibernate-orm.persistence-unit=people mp.messaging.incoming.people-in.auto.commit.interval.ms=500 mp.messaging.incoming.people-in.group.id=people-checkpoint +quarkus.datasource.devservices.enabled=true +quarkus.hibernate-orm.packages=io.quarkus.it.kafka.fruit quarkus.datasource."people".devservices.enabled=true quarkus.hibernate-orm."people".datasource=people +quarkus.hibernate-orm."people".packages=io.quarkus.it.kafka.people diff --git a/integration-tests/reactive-messaging-hibernate-orm/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java b/integration-tests/reactive-messaging-hibernate-orm/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java index 860688a11c9aee..ae6f2071138f50 100644 --- a/integration-tests/reactive-messaging-hibernate-orm/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java +++ b/integration-tests/reactive-messaging-hibernate-orm/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java @@ -12,9 +12,11 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import io.quarkus.it.kafka.fruit.Fruit; +import io.quarkus.it.kafka.people.PeopleState; +import io.quarkus.it.kafka.people.Person; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.kafka.InjectKafkaCompanion; @@ -25,7 +27,6 @@ @QuarkusTest @QuarkusTestResource(KafkaCompanionResource.class) -@Disabled public class KafkaConnectorTest { protected static final TypeRef> TYPE_REF = new TypeRef>() { @@ -69,7 +70,7 @@ public void testPeople() { PeopleState result = get("/kafka/people-state").as(PeopleState.class); Assertions.assertNotNull(result); Assertions.assertTrue(result.offset >= 6); - Assertions.assertEquals("bob;alice;tom;jerry;anna;ken", result.names); + Assertions.assertEquals("bob;alice;tom;jerry;anna;ken", result.getNames()); }); }