Skip to content

Commit

Permalink
Update ChatWebSocketHandler to move blocking operations to boundedEla…
Browse files Browse the repository at this point in the history
…stic threads
  • Loading branch information
Arooba-git committed Nov 29, 2023
1 parent 93894bf commit 371c6a9
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions src/main/java/com/github/rawsanj/handler/ChatWebSocketHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

@Slf4j
public class ChatWebSocketHandler implements WebSocketHandler {
Expand Down Expand Up @@ -40,15 +41,19 @@ public Mono<Void> handle(WebSocketSession webSocketSession) {
Mono<Void> inputMessage = webSocketSession.receive()
.flatMap(webSocketMessage -> redisChatMessagePublisher.publishChatMessage(webSocketMessage.getPayloadAsText()))
.doOnSubscribe(subscription -> {
long activeUserCount = activeUserCounter.incrementAndGet();
log.info("User '{}' Connected. Total Active Users: {}", webSocketSession.getId(), activeUserCount);
chatMessageSink.tryEmitNext(new ChatMessage(0, "CONNECTED", "CONNECTED", activeUserCount));
Mono.fromRunnable(() -> {
long activeUserCount = activeUserCounter.incrementAndGet();
log.info("User '{}' Connected. Total Active Users: {}", webSocketSession.getId(), activeUserCount);
chatMessageSink.tryEmitNext(new ChatMessage(0, "CONNECTED", "CONNECTED", activeUserCount));
}).subscribeOn(Schedulers.boundedElastic()).subscribe();
})
.doOnError(throwable -> log.error("Error Occurred while sending message to Redis.", throwable))
.doFinally(signalType -> {
long activeUserCount = activeUserCounter.decrementAndGet();
log.info("User '{}' Disconnected. Total Active Users: {}", webSocketSession.getId(), activeUserCount);
chatMessageSink.tryEmitNext(new ChatMessage(0, "DISCONNECTED", "DISCONNECTED", activeUserCount));
Mono.fromRunnable(() -> {
long activeUserCount = activeUserCounter.decrementAndGet();
log.info("User '{}' Disconnected. Total Active Users: {}", webSocketSession.getId(), activeUserCount);
chatMessageSink.tryEmitNext(new ChatMessage(0, "DISCONNECTED", "DISCONNECTED", activeUserCount));
}).subscribeOn(Schedulers.boundedElastic()).subscribe();
})
.then();

Expand Down

0 comments on commit 371c6a9

Please sign in to comment.