From 95320630ba9697a2f5514c8627376c50c2018db4 Mon Sep 17 00:00:00 2001 From: Noah Wood Date: Sat, 2 Mar 2024 10:44:49 -0500 Subject: [PATCH 1/3] Gracefully handle remote WebSocket disconnections. Adds a boolean property to the `TimelineStreaming` class to specify reconnection of a stream if disconnected. This is used with a new try/catch block to gracefully handle a `WebSocketException` and either stop the stream or attempt a reconnect. --- Mastonet/TimelineStreaming.cs | 2 ++ Mastonet/TimelineWebSocketStreaming.cs | 33 +++++++++++++++++--------- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/Mastonet/TimelineStreaming.cs b/Mastonet/TimelineStreaming.cs index b08e3ce..6c99d89 100644 --- a/Mastonet/TimelineStreaming.cs +++ b/Mastonet/TimelineStreaming.cs @@ -27,6 +27,8 @@ protected TimelineStreaming(StreamingType type, string? param, string? accessTok public abstract Task Start(); public abstract void Stop(); + public bool ReconnectStreamOnDisconnect { get; set; } + protected void SendEvent(string eventName, string data) { switch (eventName) diff --git a/Mastonet/TimelineWebSocketStreaming.cs b/Mastonet/TimelineWebSocketStreaming.cs index b2853f5..bd0e7c6 100644 --- a/Mastonet/TimelineWebSocketStreaming.cs +++ b/Mastonet/TimelineWebSocketStreaming.cs @@ -75,26 +75,37 @@ public override async Task Start() MemoryStream ms = new MemoryStream(); while (socket != null) { - var result = await socket.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None); + try + { + var result = await socket.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None); - ms.Write(buffer, 0, result.Count); + ms.Write(buffer, 0, result.Count); - if (result.EndOfMessage) - { - var messageStr = Encoding.UTF8.GetString(ms.ToArray()); + if (result.EndOfMessage) + { + var messageStr = Encoding.UTF8.GetString(ms.ToArray()); #if NET6_0_OR_GREATER - var message = JsonSerializer.Deserialize(messageStr, TimelineMessageContext.Default.TimelineMessage); + var message = JsonSerializer.Deserialize(messageStr, TimelineMessageContext.Default.TimelineMessage); #else var message = JsonSerializer.Deserialize(messageStr); #endif - if (message != null) + if (message != null) + { + SendEvent(message.Event, message.Payload); + } + + ms.Dispose(); + ms = new MemoryStream(); + } + } + catch (WebSocketException ex) + { + this.Stop(); + if (ReconnectStreamOnDisconnect) { - SendEvent(message.Event, message.Payload); + await this.Start(); } - - ms.Dispose(); - ms = new MemoryStream(); } } ms.Dispose(); From 0be938ade5f65c4fb0825bb2e75c523277915d86 Mon Sep 17 00:00:00 2001 From: Noah Wood Date: Sun, 10 Mar 2024 16:18:13 -0400 Subject: [PATCH 2/3] Only close & dispose stream if open. --- Mastonet/TimelineWebSocketStreaming.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Mastonet/TimelineWebSocketStreaming.cs b/Mastonet/TimelineWebSocketStreaming.cs index bd0e7c6..d62a8b0 100644 --- a/Mastonet/TimelineWebSocketStreaming.cs +++ b/Mastonet/TimelineWebSocketStreaming.cs @@ -124,7 +124,7 @@ internal class TimelineMessage public override void Stop() { - if (socket != null) + if (socket?.State == WebSocketState.Open) { socket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); socket.Dispose(); From dc0618a468b7b9bfd28cb14767e38b3de70e1278 Mon Sep 17 00:00:00 2001 From: Noah Wood Date: Sun, 10 Mar 2024 16:45:46 -0400 Subject: [PATCH 3/3] Adds event handler for when streaming is stopped. --- Mastonet/TimelineHttpStreaming.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Mastonet/TimelineHttpStreaming.cs b/Mastonet/TimelineHttpStreaming.cs index 8c41c6e..ec456fa 100644 --- a/Mastonet/TimelineHttpStreaming.cs +++ b/Mastonet/TimelineHttpStreaming.cs @@ -15,6 +15,8 @@ public class TimelineHttpStreaming : TimelineStreaming private HttpClient client; private CancellationTokenSource? cts; + public event EventHandler StreamingStopped; + public TimelineHttpStreaming(StreamingType type, string? param, string instance, string? accessToken) : this(type, param, instance, accessToken, DefaultHttpClient.Instance) { } public TimelineHttpStreaming(StreamingType type, string? param, string instance, string? accessToken, HttpClient client) @@ -100,6 +102,7 @@ public override void Stop() { cts.Cancel(); cts = null; + StreamingStopped?.Invoke(this, EventArgs.Empty); } } }