Skip to content

Commit

Permalink
Merge pull request #29433 from ozangunalp/fix_hibernate_orm_state_store
Browse files Browse the repository at this point in the history
Fix flaky test in reactive-messaging-hibernate-orm
  • Loading branch information
gsmet authored Jan 31, 2023
2 parents 63bfcba + 7851580 commit e6dc19d
Show file tree
Hide file tree
Showing 13 changed files with 69 additions and 27 deletions.
5 changes: 3 additions & 2 deletions docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2618,13 +2618,14 @@ public class ResourceSendingToKafka {
@Transactional // <1>
public CompletionStage<Void> storeAndSendToKafka(Fruit fruit) { // <2>
fruit.persist();
return emitter.send(fruit); // <3>
return emitter.send(new FruitDto(fruit)); // <3>
}
}
----
<1> As we are writing to the database, make sure we run inside a transaction
<2> The method receives the fruit instance to persist. It returns a `CompletionStage` which is used for the transaction demarcation. The transaction is committed when the return `CompletionStage` completes. In our case, it's when the message is written to Kafka.
<3> Send the managed instance to Kafka. Make sure we wait for the message to complete before closing the transaction.
<3> Wrap the managed entity inside a Data transfer object and send it to Kafka.
This makes sure that managed entity is not impacted by the Kafka serialization.

[#writing-entities-managed-by-hibernate-reactive-to-kafka]
=== Writing entities managed by Hibernate Reactive to Kafka
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,31 @@ public Uni<Map<TopicPartition, ProcessingState<?>>> fetchProcessingState(Collect
Object[] ids = partitions.stream()
.map(tp -> new CheckpointEntityId(consumerGroupId, tp))
.toArray(Object[]::new);
return Vertx.currentContext().executeBlocking(Uni.createFrom().item(() -> {
return Vertx.currentContext().executeBlocking(Uni.createFrom().emitter(emitter -> {
List<CheckpointEntity> fetched = new ArrayList<>();
Transaction tx = null;
try (Session session = sf.openSession()) {
tx = session.beginTransaction();
for (Object id : ids) {
CheckpointEntity entity = session.find(stateType, id);
if (entity != null) {
fetched.add(entity);
}
}
Map<TopicPartition, ProcessingState<?>> stateMap = fetched.stream()
.filter(e -> e != null && CheckpointEntity.topicPartition(e) != null)
.collect(Collectors.toMap(CheckpointEntity::topicPartition,
e -> new ProcessingState<>(e, e.offset)));
session.flush();
tx.commit();
emitter.complete(stateMap);
} catch (Throwable t) {
if (tx != null) {
tx.rollback();
}
emitter.fail(t);
}
return fetched.stream().filter(e -> e != null && CheckpointEntity.topicPartition(e) != null)
.collect(Collectors.toMap(CheckpointEntity::topicPartition,
e -> new ProcessingState<>(e, e.offset)));

}));
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import javax.inject.Inject;
import javax.persistence.EntityManager;
import javax.transaction.Transactional;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
Expand All @@ -12,6 +13,9 @@
import org.apache.kafka.common.TopicPartition;

import io.quarkus.hibernate.orm.PersistenceUnit;
import io.quarkus.it.kafka.fruit.Fruit;
import io.quarkus.it.kafka.people.PeopleState;
import io.quarkus.it.kafka.people.Person;
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntityId;

@Path("/kafka")
Expand Down Expand Up @@ -40,6 +44,7 @@ public List<Person> getPeople() {
@GET
@Path("/people-state")
@Produces(MediaType.APPLICATION_JSON)
@Transactional
public PeopleState getPeopleState() {
return entityManager.find(PeopleState.class,
new CheckpointEntityId("people-checkpoint", new TopicPartition("people", 0)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@
import javax.transaction.Transactional;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.MutinyEmitter;
import io.quarkus.it.kafka.fruit.Fruit;
import io.quarkus.it.kafka.fruit.FruitDto;
import io.quarkus.it.kafka.people.PeopleState;
import io.quarkus.it.kafka.people.Person;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;

@ApplicationScoped
Expand All @@ -20,24 +24,24 @@ public class KafkaReceivers {
private final List<Person> people = new CopyOnWriteArrayList<>();

@Channel("fruits-persisted")
MutinyEmitter<Fruit> emitter;
Emitter<FruitDto> emitter;

@Incoming("fruits-in")
@Transactional
public CompletionStage<Void> persist(Message<Fruit> fruit) {
fruit.getPayload().persist();
return emitter.sendMessage(fruit).subscribeAsCompletionStage();
public CompletionStage<Void> persist(Fruit fruit) {
fruit.persist();
return emitter.send(new FruitDto(fruit));
}

@Incoming("people-in")
public CompletionStage<Void> consume(Message<Person> msg) {
CheckpointMetadata<PeopleState> store = CheckpointMetadata.fromMessage(msg);
Person person = msg.getPayload();
store.transform(new PeopleState(), c -> {
if (c.names == null) {
c.names = person.getName();
if (c.getNames() == null) {
c.setNames(person.getName());
} else {
c.names = c.names + ";" + person.getName();
c.setNames(c.getNames() + ";" + person.getName());
}
return c;
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.quarkus.it.kafka;
package io.quarkus.it.kafka.fruit;

import javax.persistence.Entity;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.quarkus.it.kafka.fruit;

public class FruitDto {
String name;

public FruitDto(Fruit fruit) {
this.name = fruit.name;
}

public FruitDto() {
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.quarkus.it.kafka;
package io.quarkus.it.kafka.fruit;

import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.quarkus.it.kafka;
package io.quarkus.it.kafka.people;

import javax.enterprise.context.ApplicationScoped;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.quarkus.it.kafka;
package io.quarkus.it.kafka.people;

import javax.persistence.Entity;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.quarkus.it.kafka;
package io.quarkus.it.kafka.people;

import io.quarkus.runtime.annotations.RegisterForReflection;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ mp.messaging.outgoing.people-out.acks=all
mp.messaging.incoming.people-in.topic=people
mp.messaging.incoming.people-in.commit-strategy=checkpoint
mp.messaging.incoming.people-in.checkpoint.state-store=quarkus-hibernate-orm
mp.messaging.incoming.people-in.checkpoint.state-type=io.quarkus.it.kafka.PeopleState
mp.messaging.incoming.people-in.checkpoint.state-type=io.quarkus.it.kafka.people.PeopleState
mp.messaging.incoming.people-in.checkpoint.quarkus-hibernate-orm.persistence-unit=people
mp.messaging.incoming.people-in.auto.commit.interval.ms=500
mp.messaging.incoming.people-in.group.id=people-checkpoint

quarkus.datasource.devservices.enabled=true
quarkus.hibernate-orm.packages=io.quarkus.it.kafka.fruit
quarkus.datasource."people".devservices.enabled=true
quarkus.hibernate-orm."people".datasource=people
quarkus.hibernate-orm."people".packages=io.quarkus.it.kafka.people
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import io.quarkus.it.kafka.fruit.Fruit;
import io.quarkus.it.kafka.people.PeopleState;
import io.quarkus.it.kafka.people.Person;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.kafka.InjectKafkaCompanion;
Expand All @@ -25,7 +27,6 @@

@QuarkusTest
@QuarkusTestResource(KafkaCompanionResource.class)
@Disabled
public class KafkaConnectorTest {

protected static final TypeRef<List<Fruit>> TYPE_REF = new TypeRef<List<Fruit>>() {
Expand Down Expand Up @@ -69,7 +70,7 @@ public void testPeople() {
PeopleState result = get("/kafka/people-state").as(PeopleState.class);
Assertions.assertNotNull(result);
Assertions.assertTrue(result.offset >= 6);
Assertions.assertEquals("bob;alice;tom;jerry;anna;ken", result.names);
Assertions.assertEquals("bob;alice;tom;jerry;anna;ken", result.getNames());
});
}

Expand Down

0 comments on commit e6dc19d

Please sign in to comment.