Skip to content

Commit

Permalink
Handle post-processing for payload consuming stream returning process…
Browse files Browse the repository at this point in the history
…or methods

Closes smallrye#2732 and smallrye#2733
  • Loading branch information
ozangunalp committed Aug 29, 2024
1 parent 233e114 commit d69033a
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 50 deletions.
16 changes: 8 additions & 8 deletions documentation/src/main/docs/concepts/signatures.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ and available acknowledgement strategies (when applicable).
| `@Outgoing @Incoming Flow.Processor<I, O> method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported |
| `@Outgoing @Incoming ProcessorBuilder<Message<I>, Message<O>> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming ProcessorBuilder<I, O> method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported |
| `@Outgoing @Incoming Publisher<Message<O>> method(Message<I> msg)` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Publisher<O> method(I payload)` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | automatic |
| `@Outgoing @Incoming Multi<Message<O>> method(Message<I> msg)` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Multi<O> method(I payload)` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | automatic |
| `@Outgoing @Incoming Flow.Publisher<Message<O>> method(Message<I> msg)` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Flow.Publisher<O> method(I payload)` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | automatic |
| `@Outgoing @Incoming PublisherBuilder<Message<O>> method(Message<I> msg)` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming PublisherBuilder<O> method(I payload)` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | automatic |
| `@Outgoing @Incoming Publisher<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Publisher<O> method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic |
| `@Outgoing @Incoming Multi<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Multi<O> method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic |
| `@Outgoing @Incoming Flow.Publisher<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Flow.Publisher<O> method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic |
| `@Outgoing @Incoming PublisherBuilder<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming PublisherBuilder<O> method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic |

## Method signatures to manipulate streams

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,19 @@ private void processMethodReturningAPublisherBuilderOfPayloadsAndConsumingPayloa
this.mapper = upstream -> {
Multi<? extends Message<?>> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration);
return multi.onItem().transformToMultiAndConcatenate(message -> {
PublisherBuilder<?> pb = invoke(getArguments(message));
if (configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING) {
// POST_PROCESSING must not be used when returning an infinite stream
AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message);
return MultiUtils.publisher(AdaptersToFlow.publisher(pb.buildRs()))
.onItem().transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata())));
if (isPostAck()) {
try {
PublisherBuilder<?> pb = invoke(getArguments(message));
// POST_PROCESSING must not be used when returning an infinite stream
AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message);
return MultiUtils.publisher(AdaptersToFlow.publisher(pb.buildRs()))
.onItem()
.transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata())));
} catch (Throwable t) {
return handlePostInvocation(message, t);
}
} else {
PublisherBuilder<?> pb = invoke(getArguments(message));
return MultiUtils.publisher(AdaptersToFlow.publisher(pb.buildRs()))
.onItem().transform(payload -> payloadToMessage(payload, message.getMetadata()));
}
Expand All @@ -276,13 +282,19 @@ private void processMethodReturningAReactiveStreamsPublisherOfPayloadsAndConsumi
this.mapper = upstream -> {
Multi<? extends Message<?>> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration);
return multi.onItem().transformToMultiAndConcatenate(message -> {
Publisher<?> pub = invoke(getArguments(message));
if (configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING) {
// POST_PROCESSING must not be used when returning an infinite stream
AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message);
return MultiUtils.publisher(AdaptersToFlow.publisher(pub))
.onItem().transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata())));
if (isPostAck()) {
try {
Publisher<?> pub = invoke(getArguments(message));
// POST_PROCESSING must not be used when returning an infinite stream
AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message);
return MultiUtils.publisher(AdaptersToFlow.publisher(pub))
.onItem()
.transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata())));
} catch (Throwable t) {
return handlePostInvocation(message, t);
}
} else {
Publisher<?> pub = invoke(getArguments(message));
return MultiUtils.publisher(AdaptersToFlow.publisher(pub))
.onItem().transform(payload -> payloadToMessage(payload, message.getMetadata()));
}
Expand All @@ -294,13 +306,20 @@ private void processMethodReturningAPublisherOfPayloadsAndConsumingPayloads() {
this.mapper = upstream -> {
Multi<? extends Message<?>> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration);
return multi.onItem().transformToMultiAndConcatenate(message -> {
Flow.Publisher<?> pub = invoke(getArguments(message));
if (configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING) {
// POST_PROCESSING must not be used when returning an infinite stream
AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message);
return MultiUtils.publisher(pub)
.onItem().transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata())));
if (isPostAck()) {
try {
Flow.Publisher<?> pub = invoke(getArguments(message));
// POST_PROCESSING must not be used when returning an infinite stream
AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message);
return MultiUtils.publisher(pub)
.onItem()
.transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata())));

} catch (Throwable t) {
return handlePostInvocation(message, t);
}
} else {
Flow.Publisher<?> pub = invoke(getArguments(message));
return MultiUtils.publisher(pub)
.onItem().transform(payload -> payloadToMessage(payload, message.getMetadata()));
}
Expand Down Expand Up @@ -384,6 +403,10 @@ private Flow.Publisher<? extends Message<Object>> handleSkip(Message<Object> m)
}
}

private Multi<? extends Message<?>> handlePostInvocation(Message<?> message, Throwable fail) {
return Uni.createFrom().completionStage(() -> message.nack(fail).thenApply(x -> (Message<?>) null)).toMulti();
}

private Uni<? extends Message<Object>> handlePostInvocation(Message<?> message, Object res, Throwable fail) {
if (fail != null) {
if (isPostAck()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.junit.jupiter.api.Test;

import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherBuilderOfMessagesAndConsumingIndividualMessage;
import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayload;
import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayloadWithProcessingError;
import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherOfMessagesAndConsumingIndividualMessage;
import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayload;
import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayloadWithProcessingError;

public class ProcessorShapeReturningPublisherTest extends WeldTestBase {

Expand All @@ -27,6 +32,17 @@ public void testBeanProducingAPublisherOfPayloadsAndConsumingIndividualPayload()
assertThat(collector.payloads()).isEqualTo(EXPECTED);
}

@Test
public void BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayloadWithProcessingError() {
addBeanClass(BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayloadWithProcessingError.class);
initialize();
MyCollector collector = container.select(MyCollector.class).get();
assertThat(collector.payloads()).isEqualTo(IntStream.rangeClosed(1, 5)
.map(i -> i * 2)
.flatMap(i -> IntStream.of(i, i)).boxed()
.map(Object::toString).collect(Collectors.toList()));
}

@Test
public void testBeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayload() {
addBeanClass(BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayload.class);
Expand All @@ -35,6 +51,15 @@ public void testBeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPa
assertThat(collector.payloads()).isEqualTo(EXPECTED);
}

@Test
public void testBeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayloadWithProcessingError() {
addBeanClass(BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayloadWithProcessingError.class);
initialize();
MyCollector collector = container.select(MyCollector.class).get();
assertThat(collector.payloads()).isEqualTo(IntStream.rangeClosed(1, 6).flatMap(i -> IntStream.of(i, i)).boxed()
.map(Object::toString).collect(Collectors.toList()));
}

@Test
public void testBeanProducingAPublisherBuilderOfMessagesAndConsumingIndividualMessage() {
addBeanClass(BeanProducingAPublisherBuilderOfMessagesAndConsumingIndividualMessage.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.smallrye.reactive.messaging.beans;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;

import io.reactivex.Flowable;

@ApplicationScoped
public class BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayloadWithProcessingError {

@Incoming("count")
@Outgoing("sink")
@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
public PublisherBuilder<String> process(Integer payload) {
if (payload > 5) {
throw new IllegalArgumentException("boom");
}
return ReactiveStreams.of(payload)
.map(i -> i + 1)
.flatMapRsPublisher(i -> Flowable.just(i, i))
.map(i -> Integer.toString(i));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.smallrye.reactive.messaging.beans;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Publisher;

import io.reactivex.Flowable;

@ApplicationScoped
public class BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayloadWithProcessingError {

@Incoming("count")
@Outgoing("sink")
@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
public Publisher<String> process(Integer payload) {
if (payload % 2 == 0) {
throw new IllegalArgumentException("boom");
}
return ReactiveStreams.of(payload)
.map(i -> i + 1)
.flatMapRsPublisher(i -> Flowable.just(i, i))
.map(i -> Integer.toString(i))
.buildRs();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import io.smallrye.common.annotation.CheckReturnValue;
import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.config.ConfigProvider;
Expand Down Expand Up @@ -476,15 +475,15 @@ static class DeliveryTagInterceptor implements OutgoingInterceptor {

List<Long> deliveryTagsNack = new CopyOnWriteArrayList<>();


@Override
public void onMessageAck(Message<?> message) {
message.getMetadata(OutgoingMessageMetadata.class).ifPresent(m -> deliveryTags.add((long) m.getResult()));
}

@Override
public void onMessageNack(Message<?> message, Throwable failure) {
message.getMetadata(OutgoingMessageMetadata.class).ifPresent(m -> deliveryTagsNack.add(((Integer) message.getPayload()).longValue()));
message.getMetadata(OutgoingMessageMetadata.class)
.ifPresent(m -> deliveryTagsNack.add(((Integer) message.getPayload()).longValue()));
}

public List<Long> getDeliveryTags() {
Expand All @@ -495,7 +494,7 @@ public List<Long> getDeliveryTagsNack() {
return deliveryTagsNack;
}

public int numberOfProcessedMessage(){
public int numberOfProcessedMessage() {
return deliveryTags.size() + deliveryTagsNack.size();
}
}
Expand All @@ -511,10 +510,12 @@ void testSendingMessagesToRabbitMQPublishConfirmsWithNack() throws InterruptedEx

List<Long> receivedTags = new CopyOnWriteArrayList<>();
CountDownLatch latch = new CountDownLatch(10);
usage.prepareNackQueue(exchangeName, routingKey);/*, v -> {
receivedTags.add(v.envelope().getDeliveryTag());
latch.countDown();
});*/
usage.prepareNackQueue(exchangeName, routingKey);/*
* , v -> {
* receivedTags.add(v.envelope().getDeliveryTag());
* latch.countDown();
* });
*/

weld.addBeanClasses(ProducingBean.class, DeliveryTagInterceptor.class);

Expand All @@ -536,21 +537,15 @@ void testSendingMessagesToRabbitMQPublishConfirmsWithNack() throws InterruptedEx
await().until(() -> isRabbitMQConnectorAvailable(container));

DeliveryTagInterceptor interceptor = get(container, DeliveryTagInterceptor.class);
await().until(() ->
interceptor.numberOfProcessedMessage() == 10);


await().until(() -> interceptor.numberOfProcessedMessage() == 10);

assertThat(interceptor.getDeliveryTags())
.hasSizeBetween(1,2);
.hasSizeBetween(1, 2);

assertThat(interceptor.getDeliveryTagsNack())
.hasSizeBetween(8,9);
.hasSizeBetween(8, 9);
}




/**
* Verifies that messages can be sent to RabbitMQ.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@
import java.util.function.Consumer;
import java.util.function.Supplier;

import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Future;
import org.jboss.logging.Logger;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BasicProperties;

import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.Uni;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Vertx;
Expand Down Expand Up @@ -180,15 +179,17 @@ public void prepareNackQueue(String exchange, String routingKey) {

client.queueBindAndAwait(queue, exchange, routingKey);


}

public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclareAndAwait(String queue, boolean durable, boolean exclusive, boolean autoDelete, JsonObject config) {
return (com.rabbitmq.client.AMQP.Queue.DeclareOk) queueDeclare(queue, durable, exclusive, autoDelete, config).await().indefinitely();
public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclareAndAwait(String queue, boolean durable, boolean exclusive,
boolean autoDelete, JsonObject config) {
return (com.rabbitmq.client.AMQP.Queue.DeclareOk) queueDeclare(queue, durable, exclusive, autoDelete, config).await()
.indefinitely();
}

@CheckReturnValue
public io.smallrye.mutiny.Uni<com.rabbitmq.client.AMQP.Queue.DeclareOk> queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,JsonObject config) {
public io.smallrye.mutiny.Uni<com.rabbitmq.client.AMQP.Queue.DeclareOk> queueDeclare(String queue, boolean durable,
boolean exclusive, boolean autoDelete, JsonObject config) {
return io.smallrye.mutiny.vertx.AsyncResultUni.toUni(resultHandler -> {
client.getDelegate().queueDeclare(queue, durable, exclusive, autoDelete, config, resultHandler);
});
Expand Down

0 comments on commit d69033a

Please sign in to comment.