diff --git a/src/main/java/com/github/rawsanj/handler/ChatWebSocketHandler.java b/src/main/java/com/github/rawsanj/handler/ChatWebSocketHandler.java index ebdc694..c2ab28e 100644 --- a/src/main/java/com/github/rawsanj/handler/ChatWebSocketHandler.java +++ b/src/main/java/com/github/rawsanj/handler/ChatWebSocketHandler.java @@ -11,6 +11,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import reactor.core.scheduler.Schedulers; @Slf4j public class ChatWebSocketHandler implements WebSocketHandler { @@ -40,15 +41,19 @@ public Mono handle(WebSocketSession webSocketSession) { Mono inputMessage = webSocketSession.receive() .flatMap(webSocketMessage -> redisChatMessagePublisher.publishChatMessage(webSocketMessage.getPayloadAsText())) .doOnSubscribe(subscription -> { - long activeUserCount = activeUserCounter.incrementAndGet(); - log.info("User '{}' Connected. Total Active Users: {}", webSocketSession.getId(), activeUserCount); - chatMessageSink.tryEmitNext(new ChatMessage(0, "CONNECTED", "CONNECTED", activeUserCount)); + Mono.fromRunnable(() -> { + long activeUserCount = activeUserCounter.incrementAndGet(); + log.info("User '{}' Connected. Total Active Users: {}", webSocketSession.getId(), activeUserCount); + chatMessageSink.tryEmitNext(new ChatMessage(0, "CONNECTED", "CONNECTED", activeUserCount)); + }).subscribeOn(Schedulers.boundedElastic()).subscribe(); }) .doOnError(throwable -> log.error("Error Occurred while sending message to Redis.", throwable)) .doFinally(signalType -> { - long activeUserCount = activeUserCounter.decrementAndGet(); - log.info("User '{}' Disconnected. Total Active Users: {}", webSocketSession.getId(), activeUserCount); - chatMessageSink.tryEmitNext(new ChatMessage(0, "DISCONNECTED", "DISCONNECTED", activeUserCount)); + Mono.fromRunnable(() -> { + long activeUserCount = activeUserCounter.decrementAndGet(); + log.info("User '{}' Disconnected. Total Active Users: {}", webSocketSession.getId(), activeUserCount); + chatMessageSink.tryEmitNext(new ChatMessage(0, "DISCONNECTED", "DISCONNECTED", activeUserCount)); + }).subscribeOn(Schedulers.boundedElastic()).subscribe(); }) .then();