The HTTP 5 specification introduced Server Sent Events (SSE),
allowing HTTP servers to push events to HTTP clients. MicroProfile Rest Client interfaces may consume events from
servers that push SSEs by using the @Produces(MediaType.SERVER_SENT_EVENTS)
annotation on the method or interface
and by the interface method returning a org.reactivestreams.Publisher<?>
type. The Publisher type is available from
the Reactive Streams APIs used by the
MicroProfile Reactive Streams Operators APIs.
A client interface’s Publisher
return type can include a type argument for javax.ws.rs.sse.InboundSseEvent
allowing the client to obtain specific fields from the SSE including the name of the event, it’s ID, comments, and
the actual data. The data can be returned as a plain String
or deserialized into a Java object using an applicable
MessageBodyReader
registered with the client.
Here is an example:
public interface SseClient {
@GET
@Path("ssePath")
@Produces(MediaType.SERVER_SENT_EVENTS)
Publisher<InboundSseEvent> getEvents();
}
void testSseClient() {
SseClient client = RestClientBuilder.newBuilder().baseUri(someUri).build(SseClient.class);
Publisher<InboundSseEvent> publisher = client.getEvents();
publisher.subscribe(new Subscriber<InboundSseEvent>(){
int MAX_EVENTS = 3;
int counter = 0;
Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
s.request(MAX_EVENTS);
}
@Override
public void onNext(InboundSseEvent event) {
System.out.println("Received Event");
System.out.println(" Name: " + event.getName());
System.out.println(" ID: " + event.getId());
System.out.println(" Comment: " + event.getComment());
System.out.println(" Data: " + event.readData());
if (++counter >= MAX_EVENTS) {
subscription.cancel();
}
}
@Override
public void onError(Throwable t) {
System.out.println("Error occurred while reading SSEs" + t);
}
@Override
public void onComplete() {
System.out.println("All done");
}
});
}
In this example, once the client instance is created and the getEvents
method is called, the user has access to a
Publisher
instance. With that Publisher, the user can subscribe to events published by the Publisher - in this case,
these events are instances of InboundSseEvent
, which include all of the data from the SSEs. In this example, the
subscriber has requested three events, and will close the connection to the server once it has received the third
event. This is done with the subscription.cancel()
call.
It is also possible to receive type-safe objects from SSEs. If the server always returns the same type of object in
the SSE’s data field, then the client can consume those events directly. For example, suppose the server sends
weather data in JSON format such as: {"date":"2020-01-17", "description":"Blizzard"}
That data could be consumed
into a WeatherEvent
class directly like so:
public class WeatherEvent {
private Date date;
private String description;
// ... getters and setters
}
public class WeatherEventProvider implements MessageBodyReader<WeatherEvent> {
@Override
public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
return WeatherEvent.class.isAssignableFrom(type);
}
@Override
public WeatherEvent readFrom(Class<WeatherEvent> type, Type genericType, Annotation[] annotations,
MediaType mediaType, MultivaluedMap<String, String> httpHeaders, InputStream entityStream)
throws IOException, WebApplicationException {
JsonReaderFactory factory = Json.createReaderFactory(null);
JsonReader reader = factory.createReader(entityStream);
JsonObject jsonObject = reader.readObject();
String dateString = jsonObject.getString("date");
String description = jsonObject.getString("description");
DateFormat df = new SimpleDateFormat("yyyy-MM-dd");
try {
WeatherEvent event = new WeatherEvent(df.parse(dateString), description);
return event;
}
catch (ParseException ex) {
throw new IOException(ex);
}
}
}
@RegisterProvider(WeatherEventProvider.class)
public interface WeatherEventClient {
@GET
@Path("ssePath")
@Produces(MediaType.SERVER_SENT_EVENTS)
Publisher<WeatherEvent> getEvents();
}
This allows subscribers to consume the type-safe business objects (WeatherEvent
in this example) directly without
needing to manually deserialize them from the InboundSseEvent
. Depending on the execution environment, the provider
class may not be necessary.
Similar to JAX-RS, MicroProfile Rest Client implementations must use registered MessageBodyReader
implementations to
deserialize the data from the SSE into the business object. The SSE specification does not specify that a content type
be sent with each SSE, so it is not always possible for Rest Client implementations to choose the correct
MessageBodyReader
for the specified business object. MessageBodyReader selection is documented in the JAX-RS
specification. Users are advised to either use Publisher<InboundSseEvent>
or create and register their own
MessageBodyReader
when type selection is difficult to determine. Users are always advised to use
Publisher<InboundSseEvent>
when a server pushes different types of objects from the endpoint.
Note that Java 9 and above provides the java.util.concurrent.Flow
API, with enclosed interfaces that exactly match the
org.reactivestreams.
interfaces. MicroProfile Rest Client 2.0 only requires Java 8, but implementations may include
support for Java 9 Flow APIs in addition to the org.reactivestreams.
APIs.
SSE processing is intended to be asynchronous. The Publisher
instance returned by the client interface should fire
events to any associated Subscription
instance using the ExecutorService
specified when the client instance was
built.