Skip to content

Commit

Permalink
Merge pull request #90 from smallrye/feature/vertx-publishers
Browse files Browse the repository at this point in the history
  • Loading branch information
cescoffier authored Sep 24, 2022
2 parents b0919ad + 5a4697b commit bf7c797
Show file tree
Hide file tree
Showing 14 changed files with 603 additions and 0 deletions.
27 changes: 27 additions & 0 deletions docs/vertx-based-reactive-streams-publishers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Vert.x-based Reactive Streams Publishers

The `mutiny-zero-vertx-publishers` library (Maven coordinates `io.smallrye.reactive:mutiny-zero-vertx-publishers`) allows creating _Reactive Streams_ publishers from [Vert.x](https://vertx.io/) streams.

This library acts as a thin adapter between Vert.x `ReadStream` and `java.util.concurrent.Flow.Publisher` and uses _Mutiny Zero_ to expose _Reactive Streams_ compliant publishers.

## API overview

The entry point is the `mutiny.zero.vertxpublishers.VertxPublisher` interface that exposes 2 static factory methods.

- `Publisher<T> fromSupplier(Supplier<ReadStream<T>> streamSupplier)` is to be used when some Vert.x API returns a `ReadStream<T>`.
- `Publisher<T> fromFuture(Supplier<Future<? extends ReadStream<T>>> futureStreamSupplier)` is to be used when some Vert.x API asynchronously returns a `ReadStream<T>` through a `Future`.

The factory methods use suppliers so that the `ReadStream` instances to be adapted are on a per-subscriber basis.

## Sample usage

The following example makes HTTP requests to the [Newcastle University Urban Observatory API](https://api.usb.urbanobservatory.ac.uk/) using the Vert.x HTTP client:

```java linenums="1"
--8<-- "mutiny-zero-vertx-publishers/src/test/java/docsamples/UrbanObservatoryHttpClient.java"
```

A new HTTP connection is issued everytime the publisher is being subscribed.
In this example the subscriber controls demand by requesting a new Vert.x `Buffer` every 500ms.

If you run this program then you will see the JSON response being progressively printed to the standard console in chunks, every 500 milliseconds.
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ nav:
- 'quick-start.md'
- Javadoc: './apidocs/index.html'
- 'flow-adapters.md'
- 'vertx-based-reactive-streams-publishers.md'

theme:
name: material
Expand Down
70 changes: 70 additions & 0 deletions mutiny-zero-vertx-publishers/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny-zero-parent</artifactId>
<version>0.5.0-SNAPSHOT</version>
</parent>

<artifactId>mutiny-zero-vertx-publishers</artifactId>
<name>SmallRye Mutiny Zero Vert.x-based publishers</name>
<packaging>jar</packaging>
<description>Support making publishers from Vert.x streams</description>

<dependencies>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny-zero</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>${vertx.version}</version>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck-flow</artifactId>
<scope>test</scope>
</dependency>

<!-- TestNG running in JUnit5 -->
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.support</groupId>
<artifactId>testng-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
46 changes: 46 additions & 0 deletions mutiny-zero-vertx-publishers/revapi.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
[
{
"extension": "revapi.java",
"id": "java",
"configuration": {
"missing-classes": {
"behavior": "report",
"ignoreMissingAnnotations": false
}
}
},
{
"extension": "revapi.filter",
"configuration": {
"elements": {
"include": [
{
"matcher": "java-package",
"match": "mutiny.zero.vertxpublishers"
}
]
}
}
},
{
"extension": "revapi.differences",
"id": "breaking-changes",
"configuration": {
"criticality": "highlight",
"minSeverity": "POTENTIALLY_BREAKING",
"minCriticality": "documented",
"differences": []
}
},
{
"extension": "revapi.reporter.json",
"configuration": {
"minSeverity": "POTENTIALLY_BREAKING",
"minCriticality": "documented",
"output": "target/compatibility.json",
"indent": true,
"append": false,
"keepEmptyFile": true
}
}
]
5 changes: 5 additions & 0 deletions mutiny-zero-vertx-publishers/src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module mutiny.zero.vertxpublishers {
exports mutiny.zero.vertxpublishers;
requires mutiny.zero;
requires io.vertx.core;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package mutiny.zero.vertxpublishers;

import java.util.concurrent.Flow;

class NoopSubscription implements Flow.Subscription {
@Override
public void request(long n) {
// Nothing
}

@Override
public void cancel() {
// Nothing
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package mutiny.zero.vertxpublishers;

import java.util.concurrent.Flow;

import io.vertx.core.streams.ReadStream;
import mutiny.zero.BackpressureStrategy;
import mutiny.zero.TubeConfiguration;
import mutiny.zero.ZeroPublisher;

abstract class PublisherBase<T> implements Flow.Publisher<T> {

protected void adapt(Flow.Subscriber<? super T> subscriber, ReadStream<T> stream) {
TubeConfiguration conf = new TubeConfiguration().withBackpressureStrategy(BackpressureStrategy.ERROR);
Flow.Publisher<T> publisher = ZeroPublisher.create(conf, tube -> {
stream.pause();
stream.handler(tube::send);
stream.exceptionHandler(tube::fail);
stream.endHandler(v -> tube.complete());
tube.whenCancelled(() -> {
stream.pause();
stream.handler(null);
stream.exceptionHandler(null);
stream.endHandler(null);
});
tube.whenRequested(stream::fetch);
});
publisher.subscribe(subscriber);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package mutiny.zero.vertxpublishers;

import java.util.concurrent.Flow;
import java.util.function.Supplier;

import io.vertx.core.Future;
import io.vertx.core.streams.ReadStream;

class SuppliedFutureStreamPublisher<T> extends PublisherBase<T> {

private final Supplier<Future<? extends ReadStream<T>>> futureStreamSupplier;

SuppliedFutureStreamPublisher(Supplier<Future<? extends ReadStream<T>>> futureStreamSupplier) {
this.futureStreamSupplier = futureStreamSupplier;
}

@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
Future<? extends ReadStream<T>> future;
try {
future = futureStreamSupplier.get();
} catch (Throwable err) {
subscriber.onSubscribe(new NoopSubscription());
subscriber.onError(err);
return;
}
if (future == null) {
subscriber.onSubscribe(new NoopSubscription());
subscriber.onError(new NullPointerException("The future cannot be null"));
} else {
future.onSuccess(stream -> adapt(subscriber, stream));
future.onFailure(err -> {
subscriber.onSubscribe(new NoopSubscription());
subscriber.onError(err);
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package mutiny.zero.vertxpublishers;

import java.util.concurrent.Flow;
import java.util.function.Supplier;

import io.vertx.core.streams.ReadStream;

class SuppliedStreamPublisher<T> extends PublisherBase<T> {

private final Supplier<ReadStream<T>> streamSupplier;

SuppliedStreamPublisher(Supplier<ReadStream<T>> streamSupplier) {
this.streamSupplier = streamSupplier;
}

@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
ReadStream<T> stream;
try {
stream = streamSupplier.get();
} catch (Throwable err) {
subscriber.onSubscribe(new NoopSubscription());
subscriber.onError(err);
return;
}
if (stream == null) {
subscriber.onSubscribe(new NoopSubscription());
subscriber.onError(new NullPointerException("The stream cannot be null"));
} else {
adapt(subscriber, stream);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package mutiny.zero.vertxpublishers;

import static java.util.Objects.requireNonNull;

import java.util.concurrent.Flow.Publisher;
import java.util.function.Supplier;

import io.vertx.core.Future;
import io.vertx.core.streams.ReadStream;

/**
* Expose Vert.x streams as Reactive Streams compliant publishers.
*/
public interface VertxPublisher {

/**
* Create a publisher from a Vert.x stream supplier.
* The supplier is called for each new publisher subscription.
*
* @param streamSupplier the {@link ReadStream} supplier, cannot be {@code null}, cannot return {@code null}, must not throw
* an exception
* @param <T> the elements type
* @return the new {@link Publisher}
*/
static <T> Publisher<T> fromSupplier(Supplier<ReadStream<T>> streamSupplier) {
requireNonNull(streamSupplier, "The stream supplier cannot be null");
return new SuppliedStreamPublisher<>(streamSupplier);
}

/**
* Create a publisher from a Vert.x future stream supplier.
* The supplier is called for each new publisher subscription.
*
* @param futureStreamSupplier the {@link Future} {@link ReadStream} supplier, cannot be {@code null}, cannot return
* {@code null}, must not throw an exception
* @param <T> the elements type
* @return the new {@link Publisher}
*/
static <T> Publisher<T> fromFuture(Supplier<Future<? extends ReadStream<T>>> futureStreamSupplier) {
requireNonNull(futureStreamSupplier, "The future supplier cannot be null");
return new SuppliedFutureStreamPublisher<>(futureStreamSupplier);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package docsamples;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.RequestOptions;
import mutiny.zero.vertxpublishers.VertxPublisher;

public class UrbanObservatoryHttpClient {

public static void main(String[] args) {

Vertx vertx = Vertx.vertx();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

RequestOptions opts = new RequestOptions()
.setSsl(true)
.setHost("api.usb.urbanobservatory.ac.uk")
.setPort(443)
.setMethod(HttpMethod.GET)
.addHeader("Accept", "application/json")
.setURI("/api/v2.0a/sensors/entity");

HttpClient httpClient = vertx.createHttpClient();

Flow.Publisher<Buffer> publisher = VertxPublisher.fromFuture(() -> httpClient
.request(opts)
.compose(HttpClientRequest::send));

publisher.subscribe(new Flow.Subscriber<>() {

private Flow.Subscription subscription;

@Override
public void onSubscribe(Flow.Subscription s) {
System.out.println("======================================");
this.subscription = s;
s.request(1L);
}

@Override
public void onNext(Buffer buffer) {
System.out.print(buffer.toString(StandardCharsets.UTF_8));
executor.schedule(() -> subscription.request(1L), 500, TimeUnit.MILLISECONDS);
}

@Override
public void onError(Throwable t) {
System.out.println("======================================");
t.printStackTrace();
}

@Override
public void onComplete() {
System.out.println("======================================");
}
});
}
}
Loading

0 comments on commit bf7c797

Please sign in to comment.