diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/jaxrs/SseBroadcasterImpl.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/jaxrs/SseBroadcasterImpl.java index 1a5b77c30badc..1700f89b43a93 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/jaxrs/SseBroadcasterImpl.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/jaxrs/SseBroadcasterImpl.java @@ -1,5 +1,6 @@ package org.jboss.resteasy.reactive.server.jaxrs; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -37,8 +38,9 @@ public synchronized void onClose(Consumer onClose) { public synchronized void register(SseEventSink sseEventSink) { Objects.requireNonNull(sseEventSink); checkClosed(); - if (sseEventSink instanceof SseEventSinkImpl == false) + if (sseEventSink instanceof SseEventSinkImpl == false) { throw new IllegalArgumentException("Can only work with Quarkus-REST instances: " + sseEventSink); + } ((SseEventSinkImpl) sseEventSink).register(this); sinks.add(sseEventSink); } @@ -50,20 +52,62 @@ public synchronized CompletionStage broadcast(OutboundSseEvent event) { CompletableFuture[] cfs = new CompletableFuture[sinks.size()]; for (int i = 0; i < sinks.size(); i++) { SseEventSink sseEventSink = sinks.get(i); - cfs[i] = sseEventSink.send(event).toCompletableFuture(); + CompletionStage cs; + try { + cs = sseEventSink.send(event).exceptionally((t) -> { + // do not propagate the exception to the returned CF + // apparently, the goal is to close this sink and not report the error + // of the broadcast operation + notifyOnErrorListeners(sseEventSink, t); + return null; + }); + } catch (Exception e) { + // do not propagate the exception to the returned CF + // apparently, the goal is to close this sink and not report the error + // of the broadcast operation + notifyOnErrorListeners(sseEventSink, e); + cs = CompletableFuture.completedFuture(null); + } + cfs[i] = cs.toCompletableFuture(); } return CompletableFuture.allOf(cfs); } + private void notifyOnErrorListeners(SseEventSink eventSink, Throwable throwable) { + // We have to notify close listeners if the SSE event output has been + // closed (either by client closing the connection (IOException) or by + // calling SseEventSink.close() (IllegalStateException) on the server + // side). + if (throwable instanceof IOException || throwable instanceof IllegalStateException) { + notifyOnCloseListeners(eventSink); + } + onErrorListeners.forEach(consumer -> { + consumer.accept(eventSink, throwable); + }); + } + + private void notifyOnCloseListeners(SseEventSink eventSink) { + // First remove the eventSink from the outputQueue to ensure that + // concurrent calls to this method will notify listeners only once for a + // given eventSink instance. + if (sinks.remove(eventSink)) { + onCloseListeners.forEach(consumer -> { + consumer.accept(eventSink); + }); + } + } + private void checkClosed() { - if (isClosed) + if (isClosed) { throw new IllegalStateException("Broadcaster has been closed"); + } } @Override public synchronized void close() { - if (isClosed) + if (isClosed) { return; + } isClosed = true; for (SseEventSink sink : sinks) { // this will in turn fire close events to our listeners