Skip to content

Commit

Permalink
Fix HibernateOrmStateStore
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Nov 23, 2022
1 parent d16119e commit 896e85f
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,29 @@ public Uni<Map<TopicPartition, ProcessingState<?>>> fetchProcessingState(Collect
Object[] ids = partitions.stream()
.map(tp -> new CheckpointEntityId(consumerGroupId, tp))
.toArray(Object[]::new);
return Vertx.currentContext().executeBlocking(Uni.createFrom().item(() -> {
return Vertx.currentContext().executeBlocking(Uni.createFrom().emitter(emitter -> {
List<CheckpointEntity> fetched = new ArrayList<>();
Transaction tx = null;
try (Session session = sf.openSession()) {
tx = session.beginTransaction();
for (Object id : ids) {
CheckpointEntity entity = session.find(stateType, id);
if (entity != null) {
fetched.add(entity);
}
}
session.flush();
tx.commit();
emitter.complete(fetched.stream().filter(e -> e != null && CheckpointEntity.topicPartition(e) != null)
.collect(Collectors.toMap(CheckpointEntity::topicPartition,
e -> new ProcessingState<>(e, e.offset))));
} catch (Throwable t) {
if (tx != null) {
tx.rollback();
}
emitter.fail(t);
}
return fetched.stream().filter(e -> e != null && CheckpointEntity.topicPartition(e) != null)
.collect(Collectors.toMap(CheckpointEntity::topicPartition,
e -> new ProcessingState<>(e, e.offset)));

}));
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import javax.inject.Inject;
import javax.persistence.EntityManager;
import javax.transaction.Transactional;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
Expand Down Expand Up @@ -40,6 +41,7 @@ public List<Person> getPeople() {
@GET
@Path("/people-state")
@Produces(MediaType.APPLICATION_JSON)
@Transactional
public PeopleState getPeopleState() {
return entityManager.find(PeopleState.class,
new CheckpointEntityId("people-checkpoint", new TopicPartition("people", 0)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ public class KafkaReceivers {

@Incoming("fruits-in")
@Transactional
public CompletionStage<Void> persist(Message<Fruit> fruit) {
fruit.getPayload().persist();
return emitter.sendMessage(fruit).subscribeAsCompletionStage();
public void persist(Fruit fruit) {
fruit.persist();
emitter.sendAndAwait(fruit);
}

@Incoming("people-in")
Expand Down

0 comments on commit 896e85f

Please sign in to comment.