From 8cd642205fe95155d9872742decc461f299f98c5 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Fri, 11 Aug 2023 16:37:06 +0300 Subject: [PATCH] Transform JMS thread pool to fixed one without rejection. move from CompletableFuture API to Mutiny --- .../messaging/jms/IncomingJmsMessage.java | 19 +-- .../reactive/messaging/jms/JmsConnector.java | 6 +- .../reactive/messaging/jms/JmsSink.java | 128 +++++++++--------- 3 files changed, 76 insertions(+), 77 deletions(-) diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java index 1d4d23e8d1..d116bb9f2b 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java @@ -2,7 +2,6 @@ import static io.smallrye.reactive.messaging.jms.i18n.JmsExceptions.ex; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import java.util.function.Supplier; @@ -12,6 +11,7 @@ import org.eclipse.microprofile.reactive.messaging.Metadata; +import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.json.JsonMapping; public class IncomingJmsMessage implements org.eclipse.microprofile.reactive.messaging.Message { @@ -110,13 +110,16 @@ public Supplier> getAck() { @Override public CompletionStage ack() { - return CompletableFuture.runAsync(() -> { - try { - delegate.acknowledge(); - } catch (JMSException e) { - throw new IllegalArgumentException("Unable to acknowledge message", e); - } - }, executor); + return Uni.createFrom().voidItem() + .onItem().invoke(m -> { + try { + delegate.acknowledge(); + } catch (JMSException e) { + throw new IllegalArgumentException("Unable to acknowledge message", e); + } + }) + .runSubscriptionOn(executor) + .subscribeAsCompletionStage(); } @Override diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java index d59b4b57a4..46755d6795 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java @@ -8,10 +8,8 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Flow; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; @@ -102,7 +100,7 @@ public class JmsConnector implements InboundConnector, OutboundConnector { @PostConstruct public void init() { - this.executor = new ThreadPoolExecutor(0, maxPoolSize, ttl, TimeUnit.SECONDS, new SynchronousQueue<>()); + this.executor = Executors.newFixedThreadPool(maxPoolSize); if (jsonMapper.isUnsatisfied()) { log.warn( "Please add one of the additional mapping modules (-jsonb or -jackson) to be able to (de)serialize JSON messages."); diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java index 895c616fc2..98645a1e2d 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java @@ -3,8 +3,6 @@ import static io.smallrye.reactive.messaging.jms.i18n.JmsExceptions.ex; import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import java.util.concurrent.Flow; @@ -67,17 +65,12 @@ class JmsSink { producer.setJMSReplyTo(replyToDestination); }); - sink = MultiUtils.via(m -> m.onItem().transformToUniAndConcatenate(message -> Uni.createFrom().completionStage(() -> { - try { - return send(message); - } catch (JMSException e) { - return CompletableFuture.failedStage(new IllegalStateException(e)); - } - })).onFailure().invoke(log::unableToSend)); + sink = MultiUtils.via(m -> m.onItem().transformToUniAndConcatenate(this::send) + .onFailure().invoke(log::unableToSend)); } - private CompletionStage> send(Message message) throws JMSException { + private Uni> send(Message message) { Object payload = message.getPayload(); // If the payload is a JMS Message, send it as it is, ignoring metadata. @@ -85,62 +78,66 @@ private CompletionStage> send(Message message) throws JMSException return dispatch(message, () -> producer.send(destination, (jakarta.jms.Message) payload)); } - jakarta.jms.Message outgoing; - if (payload instanceof String || payload.getClass().isPrimitive() || isPrimitiveBoxed(payload.getClass())) { - outgoing = context.createTextMessage(payload.toString()); - outgoing.setStringProperty("_classname", payload.getClass().getName()); - outgoing.setJMSType(payload.getClass().getName()); - } else if (payload.getClass().isArray() && payload.getClass().getComponentType().equals(Byte.TYPE)) { - BytesMessage o = context.createBytesMessage(); - o.writeBytes((byte[]) payload); - outgoing = o; - } else { - outgoing = context.createTextMessage(jsonMapping.toJson(payload)); - outgoing.setJMSType(payload.getClass().getName()); - outgoing.setStringProperty("_classname", payload.getClass().getName()); - } - - OutgoingJmsMessageMetadata metadata = message.getMetadata(OutgoingJmsMessageMetadata.class).orElse(null); - Destination actualDestination; - if (metadata != null) { - String correlationId = metadata.getCorrelationId(); - Destination replyTo = metadata.getReplyTo(); - Destination dest = metadata.getDestination(); - int deliveryMode = metadata.getDeliveryMode(); - String type = metadata.getType(); - JmsProperties properties = metadata.getProperties(); - if (correlationId != null) { - outgoing.setJMSCorrelationID(correlationId); - } - if (replyTo != null) { - outgoing.setJMSReplyTo(replyTo); - } - if (dest != null) { - outgoing.setJMSDestination(dest); - } - if (deliveryMode != -1) { - outgoing.setJMSDeliveryMode(deliveryMode); - } - if (type != null) { - outgoing.setJMSType(type); - } - if (type != null) { - outgoing.setJMSType(type); + try { + jakarta.jms.Message outgoing; + if (payload instanceof String || payload.getClass().isPrimitive() || isPrimitiveBoxed(payload.getClass())) { + outgoing = context.createTextMessage(payload.toString()); + outgoing.setStringProperty("_classname", payload.getClass().getName()); + outgoing.setJMSType(payload.getClass().getName()); + } else if (payload.getClass().isArray() && payload.getClass().getComponentType().equals(Byte.TYPE)) { + BytesMessage o = context.createBytesMessage(); + o.writeBytes((byte[]) payload); + outgoing = o; + } else { + outgoing = context.createTextMessage(jsonMapping.toJson(payload)); + outgoing.setJMSType(payload.getClass().getName()); + outgoing.setStringProperty("_classname", payload.getClass().getName()); } - if (properties != null) { - if (!(properties instanceof JmsPropertiesBuilder.OutgoingJmsProperties)) { - throw ex.illegalStateUnableToMapProperties(properties.getClass().getName()); + OutgoingJmsMessageMetadata metadata = message.getMetadata(OutgoingJmsMessageMetadata.class).orElse(null); + Destination actualDestination; + if (metadata != null) { + String correlationId = metadata.getCorrelationId(); + Destination replyTo = metadata.getReplyTo(); + Destination dest = metadata.getDestination(); + int deliveryMode = metadata.getDeliveryMode(); + String type = metadata.getType(); + JmsProperties properties = metadata.getProperties(); + if (correlationId != null) { + outgoing.setJMSCorrelationID(correlationId); + } + if (replyTo != null) { + outgoing.setJMSReplyTo(replyTo); } - JmsPropertiesBuilder.OutgoingJmsProperties op = ((JmsPropertiesBuilder.OutgoingJmsProperties) properties); - op.getProperties().forEach(p -> p.apply(outgoing)); + if (dest != null) { + outgoing.setJMSDestination(dest); + } + if (deliveryMode != -1) { + outgoing.setJMSDeliveryMode(deliveryMode); + } + if (type != null) { + outgoing.setJMSType(type); + } + if (type != null) { + outgoing.setJMSType(type); + } + + if (properties != null) { + if (!(properties instanceof JmsPropertiesBuilder.OutgoingJmsProperties)) { + throw ex.illegalStateUnableToMapProperties(properties.getClass().getName()); + } + JmsPropertiesBuilder.OutgoingJmsProperties op = ((JmsPropertiesBuilder.OutgoingJmsProperties) properties); + op.getProperties().forEach(p -> p.apply(outgoing)); + } + actualDestination = dest != null ? dest : this.destination; + } else { + actualDestination = this.destination; } - actualDestination = dest != null ? dest : this.destination; - } else { - actualDestination = this.destination; - } - return dispatch(message, () -> producer.send(actualDestination, outgoing)); + return dispatch(message, () -> producer.send(actualDestination, outgoing)); + } catch (JMSException e) { + return Uni.createFrom().failure(new IllegalStateException(e)); + } } private boolean isPrimitiveBoxed(Class c) { @@ -154,10 +151,11 @@ private boolean isPrimitiveBoxed(Class c) { || c.equals(Long.class); } - private CompletionStage> dispatch(Message incoming, Runnable action) { - return CompletableFuture.runAsync(action, executor) - .thenCompose(x -> incoming.ack()) - .thenApply(x -> incoming); + private Uni> dispatch(Message incoming, Runnable action) { + return Uni.createFrom().item(incoming) + .invoke(action) + .call(message -> Uni.createFrom().completionStage(incoming::ack)) + .runSubscriptionOn(executor); } private Destination getDestination(JMSContext context, String name, String type) {