Skip to content

Commit

Permalink
Migration to Mutiny 2 and JDK Flow APIs on top of Jakarta migration
Browse files Browse the repository at this point in the history
Co-authored-by: Julien Ponge <jponge@redhat.com>
Co-authored-by: Ozan Gunalp <ogunalp@redhat.com>
(cherry picked from commit 328f53d)
  • Loading branch information
jponge authored and gsmet committed Jan 31, 2023
1 parent f763645 commit a7d6c59
Show file tree
Hide file tree
Showing 57 changed files with 263 additions and 133 deletions.
18 changes: 12 additions & 6 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@
<smallrye-health.version>4.0.1</smallrye-health.version>
<smallrye-metrics.version>4.0.0</smallrye-metrics.version>
<smallrye-open-api.version>3.1.1</smallrye-open-api.version>
<smallrye-graphql.version>2.0.1</smallrye-graphql.version>
<smallrye-graphql.version>2.1.0.Beta1</smallrye-graphql.version>
<smallrye-opentracing.version>3.0.0</smallrye-opentracing.version>
<smallrye-fault-tolerance.version>6.1.0</smallrye-fault-tolerance.version>
<smallrye-jwt.version>4.0.0</smallrye-jwt.version>
<smallrye-context-propagation.version>2.0.0</smallrye-context-propagation.version>
<smallrye-reactive-streams-operators.version>1.0.13</smallrye-reactive-streams-operators.version>
<smallrye-reactive-types-converter.version>2.7.0</smallrye-reactive-types-converter.version>
<smallrye-mutiny-vertx-binding.version>2.30.1</smallrye-mutiny-vertx-binding.version>
<smallrye-reactive-messaging.version>4.1.1</smallrye-reactive-messaging.version>
<smallrye-reactive-types-converter.version>3.0.0</smallrye-reactive-types-converter.version>
<smallrye-mutiny-vertx-binding.version>3.2.0</smallrye-mutiny-vertx-binding.version>
<smallrye-reactive-messaging.version>4.1.1.RC1</smallrye-reactive-messaging.version>
<smallrye-stork.version>1.4.1</smallrye-stork.version>
<jakarta.activation.version>2.1.0</jakarta.activation.version>
<jakarta.annotation-api.version>2.1.1</jakarta.annotation-api.version>
Expand Down Expand Up @@ -144,9 +144,9 @@
<caffeine.version>3.1.1</caffeine.version>
<netty.version>4.1.86.Final</netty.version>
<brotli4j.version>1.8.0</brotli4j.version>
<reactive-streams.version>1.0.3</reactive-streams.version>
<reactive-streams.version>1.0.4</reactive-streams.version>
<jboss-logging.version>3.5.0.Final</jboss-logging.version>
<mutiny.version>1.9.0</mutiny.version>
<mutiny.version>2.1.0</mutiny.version>
<kafka3.version>3.3.2</kafka3.version>
<lz4.version>1.8.0</lz4.version> <!-- dependency of the kafka-clients that could be overridden by other imported BOMs in the platform -->
<snappy.version>1.1.8.4</snappy.version>
Expand Down Expand Up @@ -214,6 +214,7 @@
<java-buildpack-client.version>0.0.6</java-buildpack-client.version>
<org-crac.version>0.1.3</org-crac.version>
<sshd-common.version>2.9.2</sshd-common.version>
<mutiny-zero.version>1.0.0</mutiny-zero.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -5439,6 +5440,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny-zero-flow-adapters</artifactId>
<version>${mutiny-zero.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.microprofile.reactive-streams-operators</groupId>
<artifactId>microprofile-reactive-streams-operators-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Flow;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import com.google.protobuf.ByteString;

Expand Down Expand Up @@ -233,15 +232,15 @@ public void testRetrievingAllExtensionNumbersOfType() {
assertThat(list).containsExactlyInAnyOrderElementsOf(expected);
}

private static class ResettableSubscriber<T> implements Subscriber<T> {
private static class ResettableSubscriber<T> implements Flow.Subscriber<T> {

private Subscription subscription;
private Flow.Subscription subscription;
private volatile T last;
private boolean completed;
private Throwable failure;

@Override
public void onSubscribe(Subscription subscription) {
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
import java.io.File;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.reactivestreams.Publisher;
import java.util.concurrent.Flow.Publisher;

import io.smallrye.mutiny.Multi;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

import java.io.File;
import java.util.*;

import org.reactivestreams.Publisher;
import java.util.concurrent.Flow.Publisher;

/**
* Represents an e-mail.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow.Publisher;
import java.util.function.Function;
import java.util.stream.Collectors;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.jboss.logging.Logger;
import org.reactivestreams.Publisher;

import io.quarkus.mailer.Attachment;
import io.quarkus.mailer.Mail;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.Flow;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;

import io.quarkus.mailer.runtime.MutinyMailerImpl;
import io.smallrye.mutiny.Multi;
Expand Down Expand Up @@ -70,7 +70,7 @@ void testInlineAttachmentCreationFromFile() {

@Test
void testAttachmentCreationFromStream() {
Publisher<Byte> publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true))
Flow.Publisher<Byte> publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true))
.onItem().transformToMulti(af -> af.toMulti()
.onItem().transformToIterable(this::getBytes));

Expand Down Expand Up @@ -109,7 +109,7 @@ public Byte next() {

@Test
void testInlineAttachmentCreationFromStream() {
Publisher<Byte> publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true))
Flow.Publisher<Byte> publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true))
.onItem().transformToMulti(af -> af.toMulti()
.onItem().transformToIterable(this::getBytes));

Expand All @@ -128,7 +128,7 @@ void testInlineAttachmentCreationFromStream() {

@Test
void testAttachmentCreationWithDescription() {
Publisher<Byte> publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true))
Flow.Publisher<Byte> publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true))
.onItem().transformToMulti(af -> af.toMulti()
.onItem().transformToIterable(this::getBytes));

Expand All @@ -154,7 +154,7 @@ private String getContent(Attachment attachment) {

@Test
void testInlineAttachmentCreationWithDescription() {
Publisher<Byte> publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true))
Flow.Publisher<Byte> publisher = vertx.fileSystem().open(LOREM.getAbsolutePath(), new OpenOptions().setRead(true))
.onItem().transformToMulti(af -> af.toMulti()
.onItem().transformToIterable(this::getBytes));

Expand Down
4 changes: 4 additions & 0 deletions extensions/mongodb-client/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-mutiny</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny-zero-flow-adapters</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkus.mongodb.impl;

import java.util.List;
import java.util.concurrent.Flow;

import org.bson.Document;
import org.bson.codecs.configuration.CodecRegistry;
Expand Down Expand Up @@ -47,6 +48,7 @@
import io.quarkus.mongodb.reactive.ReactiveMongoCollection;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import mutiny.zero.flow.adapters.AdaptersToFlow;

public class ReactiveMongoCollectionImpl<T> implements ReactiveMongoCollection<T> {

Expand Down Expand Up @@ -262,11 +264,11 @@ public <D> Multi<D> aggregate(ClientSession clientSession, List<? extends Bson>
return Wrappers.toMulti(collection.aggregate(clientSession, pipeline, clazz));
}

private <D> AggregatePublisher<D> apply(AggregateOptions options, AggregatePublisher<D> publisher) {
private <D> Flow.Publisher<D> apply(AggregateOptions options, AggregatePublisher<D> publisher) {
if (options == null) {
return publisher;
return AdaptersToFlow.publisher(publisher);
}
return options.apply(publisher);
return AdaptersToFlow.publisher(options.apply(publisher));
}

@Override
Expand Down Expand Up @@ -409,11 +411,11 @@ public Multi<T> mapReduce(String mapFunction, String reduceFunction, MapReduceOp
return Multi.createFrom().publisher(apply(options, collection.mapReduce(mapFunction, reduceFunction)));
}

private <D> MapReducePublisher<D> apply(MapReduceOptions options, MapReducePublisher<D> mapReduce) {
private <D> Flow.Publisher<D> apply(MapReduceOptions options, MapReducePublisher<D> mapReduce) {
if (options == null) {
return mapReduce;
return AdaptersToFlow.publisher(mapReduce);
}
return options.apply(mapReduce);
return AdaptersToFlow.publisher(options.apply(mapReduce));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkus.mongodb.impl;

import java.util.List;
import java.util.concurrent.Flow;

import org.bson.Document;
import org.bson.conversions.Bson;
Expand All @@ -22,6 +23,7 @@
import io.quarkus.mongodb.reactive.ReactiveMongoDatabase;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import mutiny.zero.flow.adapters.AdaptersToFlow;

public class ReactiveMongoDatabaseImpl implements ReactiveMongoDatabase {

Expand Down Expand Up @@ -127,12 +129,11 @@ public <T> Multi<T> listCollections(Class<T> clazz, CollectionListOptions option
return Multi.createFrom().publisher(apply(options, database.listCollections(clazz)));
}

private <T> ListCollectionsPublisher<T> apply(CollectionListOptions options,
ListCollectionsPublisher<T> collections) {
private <T> Flow.Publisher<T> apply(CollectionListOptions options, ListCollectionsPublisher<T> collections) {
if (options == null) {
return collections;
return AdaptersToFlow.publisher(collections);
} else {
return options.apply(collections);
return AdaptersToFlow.publisher(options.apply(collections));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import mutiny.zero.flow.adapters.AdaptersToFlow;

class Wrappers {

Expand All @@ -17,7 +18,7 @@ private Wrappers() {

static <T> Uni<T> toUni(Publisher<T> publisher) {
Context context = Vertx.currentContext();
Uni<T> uni = Uni.createFrom().publisher(publisher);
Uni<T> uni = Uni.createFrom().publisher(AdaptersToFlow.publisher(publisher));
if (context != null) {
return uni.emitOn(command -> context.runOnContext(x -> command.run()));
}
Expand All @@ -27,15 +28,16 @@ static <T> Uni<T> toUni(Publisher<T> publisher) {
static <T> Multi<T> toMulti(Publisher<T> publisher) {
Context context = Vertx.currentContext();
if (context != null) {
return Multi.createFrom().publisher(publisher).emitOn(command -> context.runOnContext(x -> command.run()));
return Multi.createFrom().publisher(AdaptersToFlow.publisher(publisher))
.emitOn(command -> context.runOnContext(x -> command.run()));
} else {
return Multi.createFrom().publisher(publisher);
return Multi.createFrom().publisher(AdaptersToFlow.publisher(publisher));
}
}

static <T> Uni<List<T>> toUniOfList(Publisher<T> publisher) {
Context context = Vertx.currentContext();
Uni<List<T>> uni = Multi.createFrom().publisher(publisher)
Uni<List<T>> uni = Multi.createFrom().publisher(AdaptersToFlow.publisher(publisher))
.collect().asList();

if (context != null) {
Expand Down
4 changes: 4 additions & 0 deletions extensions/narayana-jta/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-converter-mutiny</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny-zero-flow-adapters</artifactId>
</dependency>
<dependency>
<groupId>org.graalvm.nativeimage</groupId>
<artifactId>svm</artifactId>
Expand Down
Loading

0 comments on commit a7d6c59

Please sign in to comment.