MicroProfile Rest Client Server Sent Event Support

The HTML 5 specification introduced Server Sent Events (SSE), allowing HTTP servers to push events to HTTP clients. MicroProfile Rest Client interfaces may consume events from servers that push SSEs by using the @Produces(MediaType.SERVER_SENT_EVENTS) annotation on the method or interface and by having the interface method return an org.reactivestreams.Publisher<?> type. The Publisher type is available from the Reactive Streams APIs used by the MicroProfile Reactive Streams Operators APIs.

A client interface’s Publisher return type can use javax.ws.rs.sse.InboundSseEvent as its type argument, allowing the client to obtain specific fields from the SSE, including the name of the event, its ID, its comment, and the actual data. The data can be returned as a plain String or deserialized into a Java object using an applicable MessageBodyReader registered with the client.
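
For orientation, the fields exposed by InboundSseEvent correspond to lines of the text/event-stream wire format. The following is a minimal sketch of that mapping in plain Java, not a description of how any Rest Client implementation parses events. The SseFieldSketch and ParsedEvent names are hypothetical, and the sketch ignores the retry field and the framing of the overall stream.

```java
import java.util.ArrayList;
import java.util.List;

public class SseFieldSketch {

    /** Hypothetical holder for the fields of one event; not part of any API. */
    static final class ParsedEvent {
        String name;                                      // from "event:" lines
        String id;                                        // from "id:" lines
        final List<String> comments = new ArrayList<>();  // from lines starting with ":"
        final List<String> dataLines = new ArrayList<>(); // from "data:" lines

        /** Multiple data lines are joined with a newline. */
        String data() {
            return String.join("\n", dataLines);
        }
    }

    /** Removes the single optional space that may follow the colon. */
    private static String stripLeadingSpace(String value) {
        return value.startsWith(" ") ? value.substring(1) : value;
    }

    /** Parses the lines of one event block (the text between two blank lines). */
    static ParsedEvent parse(String block) {
        ParsedEvent event = new ParsedEvent();
        for (String line : block.split("\r?\n")) {
            if (line.isEmpty()) {
                continue;
            }
            if (line.startsWith(":")) {
                event.comments.add(stripLeadingSpace(line.substring(1)));
                continue;
            }
            int colon = line.indexOf(':');
            String field = colon < 0 ? line : line.substring(0, colon);
            String value = colon < 0 ? "" : stripLeadingSpace(line.substring(colon + 1));
            switch (field) {
                case "event": event.name = value; break;
                case "id":    event.id = value; break;
                case "data":  event.dataLines.add(value); break;
                default:      break; // other fields are ignored in this sketch
            }
        }
        return event;
    }

    public static void main(String[] args) {
        ParsedEvent e = parse(": forecast feed\nevent: weather\nid: 42\n"
                + "data: {\"date\":\"2020-01-17\", \"description\":\"Blizzard\"}\n");
        System.out.println("Name: " + e.name);
        System.out.println("ID: " + e.id);
        System.out.println("Comments: " + e.comments);
        System.out.println("Data: " + e.data());
    }
}
```

With a Rest Client interface, this parsing is done by the implementation; the client only sees the resulting values through getName(), getId(), getComment(), and readData().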

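Here is an example of a client interface that returns Publisher<InboundSseEvent>, followed by a test method that subscribes to it: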

public interface SseClient {

    @GET
    @Path("ssePath")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    Publisher<InboundSseEvent> getEvents();
}

void testSseClient() {
    SseClient client = RestClientBuilder.newBuilder().baseUri(someUri).build(SseClient.class);
    Publisher<InboundSseEvent> publisher = client.getEvents();
    publisher.subscribe(new Subscriber<InboundSseEvent>(){
        int MAX_EVENTS = 3;
        int counter = 0;
        Subscription subscription;

        @Override
        public void onSubscribe(Subscription s) {
            subscription = s;
            s.request(MAX_EVENTS);
        }

        @Override
        public void onNext(InboundSseEvent event) {

            System.out.println("Received Event");
            System.out.println("  Name: " + event.getName());
            System.out.println("  ID: " + event.getId());
            System.out.println("  Comment: " + event.getComment());
            System.out.println("  Data: " + event.readData());
            if (++counter >= MAX_EVENTS) {
                subscription.cancel();
            }
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("Error occurred while reading SSEs: " + t);
        }

        @Override
        public void onComplete() {
            System.out.println("All done");
        }
    });
}

In this example, once the client instance is created and the getEvents method is called, the user has access to a Publisher instance. With that Publisher, the user can subscribe to the events it publishes. In this case, these events are instances of InboundSseEvent, which include all of the data from the SSEs. The subscriber requests three events and closes the connection to the server once it has received the third event by calling subscription.cancel().

It is also possible to receive type-safe objects from SSEs. If the server always returns the same type of object in the SSE’s data field, then the client can consume those events directly. For example, suppose the server sends weather data in JSON format, such as {"date":"2020-01-17", "description":"Blizzard"}. That data could be consumed directly into a WeatherEvent class like so:

public class WeatherEvent {
    private Date date;
    private String description;
    // ... getters and setters
}

public class WeatherEventProvider implements MessageBodyReader<WeatherEvent> {

    @Override
    public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
        return WeatherEvent.class.isAssignableFrom(type);
    }

    @Override
    public WeatherEvent readFrom(Class<WeatherEvent> type, Type genericType, Annotation[] annotations,
            MediaType mediaType, MultivaluedMap<String, String> httpHeaders, InputStream entityStream)
            throws IOException, WebApplicationException {
        JsonReaderFactory factory = Json.createReaderFactory(null);
        JsonReader reader = factory.createReader(entityStream);
        JsonObject jsonObject = reader.readObject();
        String dateString = jsonObject.getString("date");
        String description = jsonObject.getString("description");
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd");
        try {
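            // WeatherEvent only declares getters and setters, so populate it through its setters.
            WeatherEvent event = new WeatherEvent();
            event.setDate(df.parse(dateString));
            event.setDescription(description);
            return event;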
        }
        catch (ParseException ex) {
            throw new IOException(ex);
        }
    }
}

@RegisterProvider(WeatherEventProvider.class)
public interface WeatherEventClient {

    @GET
    @Path("ssePath")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    Publisher<WeatherEvent> getEvents();
}

This allows subscribers to consume the type-safe business objects (WeatherEvent in this example) directly without needing to manually deserialize them from the InboundSseEvent. Depending on the execution environment, the provider class may not be necessary, for example when the runtime already supplies a MessageBodyReader that can deserialize the JSON data.

Similar to JAX-RS, MicroProfile Rest Client implementations must use registered MessageBodyReader implementations to deserialize the data from the SSE into the business object. The SSE specification does not require a content type to be sent with each SSE, so it is not always possible for Rest Client implementations to choose the correct MessageBodyReader for the specified business object. MessageBodyReader selection is documented in the JAX-RS specification. Users are advised to either use Publisher<InboundSseEvent> or create and register their own MessageBodyReader when the correct reader is difficult to determine. Users are always advised to use Publisher<InboundSseEvent> when a server pushes different types of objects from the same endpoint.
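
When an endpoint pushes different types of objects, a subscriber to Publisher<InboundSseEvent> can decide how to interpret each event from its name. The following is a minimal sketch of that idea, assuming the server sets a distinct event name per payload type. The class name and the "temperature" and "alert-count" event names are hypothetical, and the method takes the values that a subscriber would obtain from event.getName() and event.readData().

```java
public class MixedEventDispatchSketch {

    /**
     * Converts the raw data of an event according to the event name.
     * Unnamed or unrecognized events are returned as the plain String.
     */
    static Object convert(String eventName, String data) {
        if (eventName == null) {
            return data; // the server sent no "event:" field
        }
        switch (eventName) {
            case "temperature":  // hypothetical event carrying a decimal number
                return Double.valueOf(data);
            case "alert-count":  // hypothetical event carrying an integer
                return Integer.valueOf(data);
            default:
                return data;
        }
    }

    public static void main(String[] args) {
        System.out.println(convert("temperature", "21.5"));
        System.out.println(convert("alert-count", "3"));
        System.out.println(convert(null, "plain text"));
    }
}
```

Inside onNext, such a method could be called as convert(event.getName(), event.readData()), keeping the choice of target type in user code rather than relying on MessageBodyReader selection.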

Note that Java 9 and above provide the java.util.concurrent.Flow API, with nested interfaces that exactly match the org.reactivestreams.* interfaces. MicroProfile Rest Client 2.0 only requires Java 8, but implementations may include support for Java 9 Flow APIs in addition to the org.reactivestreams.* APIs.
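
To illustrate how closely the two APIs correspond, here is a sketch of the request-then-cancel subscriber from the earlier example written against java.util.concurrent.Flow. It uses the JDK's SubmissionPublisher with String items as a stand-in event source, not a Rest Client interface; whether a given implementation accepts Flow.Publisher return types is implementation-specific. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class FlowSubscriberSketch {

    /** Requests maxEvents items up front and cancels after receiving them. */
    static final class LimitedSubscriber implements Flow.Subscriber<String> {
        private final int maxEvents;
        final List<String> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        LimitedSubscriber(int maxEvents) {
            this.maxEvents = maxEvents;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(maxEvents);
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            if (received.size() >= maxEvents) {
                subscription.cancel();
                done.countDown();
            }
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes five items and returns the ones the subscriber accepted. */
    static List<String> collectFirstThree() {
        LimitedSubscriber subscriber = new LimitedSubscriber(3);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 5; i++) {
                publisher.submit("event-" + i);
            }
            // Wait until the subscriber has cancelled (or the stream ended).
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(collectFirstThree());
    }
}
```

The method names (onSubscribe, onNext, onError, onComplete, request, cancel) are identical in both APIs; only the enclosing types differ.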

SSE processing is intended to be asynchronous. The Publisher instance returned by the client interface should fire events to any associated Subscriber instance using the ExecutorService specified when the client instance was built.
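
The effect of delivering events through an ExecutorService can be sketched with the JDK's SubmissionPublisher, which accepts an executor in its constructor. This is only an analogy for the behavior described above, not Rest Client code: the "sse-worker" thread name and the class name are hypothetical, and a real client would receive its executor through the builder.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class ExecutorDeliverySketch {

    /** Returns the name of the thread on which onNext was invoked. */
    static String deliveryThreadName() {
        // A single named worker thread makes the delivery thread easy to identify.
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sse-worker"));
        CompletableFuture<String> threadName = new CompletableFuture<>();
        try (SubmissionPublisher<String> publisher =
                new SubmissionPublisher<>(executor, Flow.defaultBufferSize())) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription s) {
                    s.request(1);
                }

                @Override
                public void onNext(String item) {
                    threadName.complete(Thread.currentThread().getName());
                }

                @Override
                public void onError(Throwable t) {
                    threadName.completeExceptionally(t);
                }

                @Override
                public void onComplete() {
                    // Nothing to do in this sketch.
                }
            });
            publisher.submit("event");
            return threadName.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("onNext ran on: " + deliveryThreadName());
    }
}
```

Because the subscriber callbacks run on the executor's thread rather than the caller's, the thread that calls subscribe is not blocked while events arrive.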