Skip to content

Commit

Permalink
clients resilient, connect and disconnect events, graceful cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
gencebay committed Aug 8, 2018
1 parent 9df15b5 commit c6ffb88
Show file tree
Hide file tree
Showing 25 changed files with 210 additions and 126 deletions.
46 changes: 30 additions & 16 deletions src/NetCoreStack.WebSockets.ProxyClient/ClientWebSocketConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public abstract class ClientWebSocketConnector : IWebSocketConnector
private readonly IServiceProvider _serviceProvider;
private readonly IStreamCompressor _compressor;
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger<ClientWebSocketConnector> _logger;

public string ConnectionId
{
Expand Down Expand Up @@ -43,18 +44,18 @@ public ClientWebSocketConnector(IServiceProvider serviceProvider,
_serviceProvider = serviceProvider;
_compressor = compressor;
_loggerFactory = loggerFactory;
_logger = _loggerFactory.CreateLogger<ClientWebSocketConnector>();
}

public abstract ClientInvocatorContext InvocatorContext { get; }

private async Task<ClientWebSocketReceiver> TryConnectAsync(CancellationTokenSource cancellationTokenSource = null)
private async Task<ClientWebSocketReceiver> TryConnectAsync(CancellationToken cancellationToken)
{
_webSocket = new ClientWebSocket();
_webSocket.Options.SetRequestHeader(NCSConstants.ConnectorName, InvocatorContext.ConnectorName);
try
{
CancellationToken token = cancellationTokenSource != null ? cancellationTokenSource.Token : CancellationToken.None;
await _webSocket.ConnectAsync(InvocatorContext.Uri, token);
await _webSocket.ConnectAsync(InvocatorContext.Uri, cancellationToken);
}
catch (Exception ex)
{
Expand All @@ -77,27 +78,42 @@ private async Task<ClientWebSocketReceiver> TryConnectAsync(CancellationTokenSou
return receiver;
}

public async Task ConnectAsync(CancellationTokenSource cancellationTokenSource = null)
public async Task ConnectAsync(CancellationToken cancellationToken)
{
if (cancellationTokenSource == null)
cancellationTokenSource = new CancellationTokenSource();

ClientWebSocketReceiver receiver = null;
while (!cancellationTokenSource.IsCancellationRequested)
while (!cancellationToken.IsCancellationRequested)
{
receiver = await TryConnectAsync(cancellationTokenSource);
_logger.LogInformation("===TryConnectAsync to: {0}", InvocatorContext.Uri.ToString());
receiver = await TryConnectAsync(cancellationToken);
if (receiver != null && WebSocketState == WebSocketState.Open)
{
break;
}

_logger.LogInformation("===Retry...");
await Task.Delay(1000);
}

await Task.WhenAll(receiver.ReceiveAsync());
_logger.LogInformation("===WebSocketConnected to: {0}", InvocatorContext.Uri.ToString());

if (InvocatorContext.OnConnectedAsync != null)
{
await InvocatorContext.OnConnectedAsync(_webSocket);
}

await Task.WhenAll(receiver.ReceiveAsync(cancellationToken));

// Handshake down try re-connect
if (_webSocket.CloseStatus.HasValue)
// Disconnected
if (_webSocket.CloseStatus.HasValue || _webSocket.State == WebSocketState.Aborted)
{
await ConnectAsync(cancellationTokenSource);
if (InvocatorContext.OnDisconnectedAsync != null)
{
await InvocatorContext.OnDisconnectedAsync(_webSocket);
}
else
{
await ConnectAsync(cancellationToken);
}
}
}

Expand Down Expand Up @@ -139,9 +155,7 @@ public async Task SendBinaryAsync(byte[] bytes)

internal void Close(ClientWebSocketReceiverContext context)
{
context.WebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure,
nameof(ClientWebSocketReceiverContext),
CancellationToken.None);
context.WebSocket.Abort();
}

internal void Close(string statusDescription)
Expand Down
27 changes: 19 additions & 8 deletions src/NetCoreStack.WebSockets.ProxyClient/ClientWebSocketReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ public ClientWebSocketReceiver(IServiceProvider serviceProvider,
_logger = context.LoggerFactory.CreateLogger<ClientWebSocketReceiver>();
}

public async Task ReceiveAsync()
public async Task ReceiveAsync(CancellationToken cancellationToken)
{
var buffer = new byte[NCSConstants.ChunkSize];
var result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
var result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), cancellationToken);
while (!result.CloseStatus.HasValue)
{
if (result.MessageType == WebSocketMessageType.Text)
Expand All @@ -42,7 +42,7 @@ public async Task ReceiveAsync()
while (!result.EndOfMessage)
{
await ms.WriteAsync(buffer, 0, result.Count);
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), cancellationToken);
}

await ms.WriteAsync(buffer, 0, result.Count);
Expand All @@ -66,7 +66,17 @@ public async Task ReceiveAsync()
{
_logger.LogWarning(ex, "{0} An error occurred for message type: {1}", NCSConstants.WarningSymbol, WebSocketMessageType.Text);
}
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);

try
{
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), cancellationToken);
}
catch (WebSocketException ex)
{
_logger.LogInformation("ClientWebSocketReceiver[Proxy] {0} has close status for connection: {1}", ex?.WebSocketErrorCode, _context.ConnectionId);
_closeCallback?.Invoke(_context);
return;
}
}

if (result.MessageType == WebSocketMessageType.Binary)
Expand All @@ -77,7 +87,7 @@ public async Task ReceiveAsync()
while (!result.EndOfMessage)
{
await ms.WriteAsync(buffer, 0, result.Count);
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), cancellationToken);
}

await ms.WriteAsync(buffer, 0, result.Count);
Expand All @@ -94,13 +104,14 @@ public async Task ReceiveAsync()
}
catch (Exception ex)
{
_logger.LogWarning(ex, "{0} Invocator error occurred for message type: {1}", NCSConstants.WarningSymbol, WebSocketMessageType.Binary);
_logger.LogWarning(ex, "ClientWebSocketReceiver {0} Invocator error occurred for message type: {1}", NCSConstants.WarningSymbol, WebSocketMessageType.Binary);
}
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), cancellationToken);
}
}

await _context.WebSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None);
await _context.WebSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, cancellationToken);
_logger.LogInformation("ClientWebSocketReceiver[Proxy] {0} has close status for connection: {1}", result.CloseStatus, _context.ConnectionId);
_closeCallback?.Invoke(_context);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,19 @@ public DefaultClientInvocatorContextFactory(IOptions<ProxyOptions<TInvocator>> o

public ClientInvocatorContext CreateInvocatorContext()
{
return new ClientInvocatorContext(_proxyOptions.Invocator, _proxyOptions.ConnectorName, _proxyOptions.WebSocketHostAddress);
var context = new ClientInvocatorContext(_proxyOptions.Invocator, _proxyOptions.ConnectorName, _proxyOptions.WebSocketHostAddress);

if (_proxyOptions.OnConnectedAsync != null)
{
context.OnConnectedAsync = _proxyOptions.OnConnectedAsync;
}

if (_proxyOptions.OnDisconnectedAsync != null)
{
context.OnDisconnectedAsync = _proxyOptions.OnDisconnectedAsync;
}

return context;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ private static void ThrowIfServiceNotRegistered(IServiceProvider applicationServ
throw new InvalidOperationException(string.Format("Required services are not registered - are you missing a call to AddProxyWebSockets?"));
}

public static IApplicationBuilder UseProxyWebSockets(this IApplicationBuilder app, CancellationTokenSource cancellationTokenSource = null)
public static IApplicationBuilder UseProxyWebSockets(this IApplicationBuilder app, CancellationToken cancellationToken = default(CancellationToken))
{
ThrowIfServiceNotRegistered(app.ApplicationServices);
var appLifeTime = app.ApplicationServices.GetService<IApplicationLifetime>();
Expand All @@ -26,7 +26,7 @@ public static IApplicationBuilder UseProxyWebSockets(this IApplicationBuilder ap
{
InvocatorsHelper.EnsureHostPair(connector.InvocatorContext);
appLifeTime.ApplicationStopping.Register(OnShutdown, connector);
Task.Factory.StartNew(async () => await connector.ConnectAsync(cancellationTokenSource), TaskCreationOptions.LongRunning);
Task.Factory.StartNew(async () => await connector.ConnectAsync(cancellationToken), TaskCreationOptions.LongRunning);
}

return app;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ namespace NetCoreStack.WebSockets.ProxyClient
{
public static class ConsoleApplicationBuilderExtensions
{
public static IServiceProvider UseProxyWebSocket(this IServiceProvider serviceProvider, CancellationTokenSource cancellationTokenSource = null)
public static IServiceProvider UseProxyWebSocket(this IServiceProvider serviceProvider, CancellationToken cancellationToken = default(CancellationToken))
{
IList<IWebSocketConnector> connectors = InvocatorFactory.GetConnectors(serviceProvider);
foreach (var connector in connectors)
{
Task.Run(async () => await connector.ConnectAsync(cancellationTokenSource));
Task.Run(async () => await connector.ConnectAsync(cancellationToken));
}

return serviceProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public interface IWebSocketConnector
{
string ConnectionId { get; }
WebSocketState WebSocketState { get; }
Task ConnectAsync(CancellationTokenSource cancellationTokenSource);
Task ConnectAsync(CancellationToken cancellationToken);
Task SendAsync(WebSocketMessageContext context);
Task SendBinaryAsync(byte[] bytes);
ClientInvocatorContext InvocatorContext { get; }
Expand Down
10 changes: 8 additions & 2 deletions src/NetCoreStack.WebSockets.ProxyClient/ProxyOptions.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
namespace NetCoreStack.WebSockets.ProxyClient
using System;
using System.Net.WebSockets;
using System.Threading.Tasks;

namespace NetCoreStack.WebSockets.ProxyClient
{
public class ProxyOptions<TInvocator> : SocketsOptions<TInvocator> where TInvocator : IClientWebSocketCommandInvocator
{
public string ConnectorName { get; set; }
public string WebSocketHostAddress { get; set; }
public Func<WebSocket, Task> OnConnectedAsync { get; set; }
public Func<WebSocket, Task> OnDisconnectedAsync { get; set; }

public ProxyOptions()
{
ConnectorName = "";
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Net.WebSockets;
using System.Threading.Tasks;

namespace NetCoreStack.WebSockets.ProxyClient
{
Expand All @@ -12,10 +14,16 @@ public class ClientInvocatorContext : InvocatorContext
public string ConnectorKey { get; }
public Uri Uri { get; }

public Func<WebSocket, Task> OnConnectedAsync { get; set; }

public Func<WebSocket, Task> OnDisconnectedAsync { get; set; }

public ClientInvocatorContext(Type invocator, string connectorName, string hostAddress,
WebSocketSupportedSchemes scheme = WebSocketSupportedSchemes.WS,
string uriPath = "",
string query = "")
string query = "",
Func<WebSocket, Task> onConnectedAsync = null,
Func<WebSocket, Task> onDisconnectedAsync = null)
:base(invocator)
{
ConnectorName = connectorName ?? throw new ArgumentNullException(nameof(connectorName));
Expand All @@ -24,6 +32,9 @@ public ClientInvocatorContext(Type invocator, string connectorName, string hostA
UriPath = uriPath;
Query = query;

OnConnectedAsync = onConnectedAsync;
OnDisconnectedAsync = onDisconnectedAsync;

var schemeStr = Scheme == WebSocketSupportedSchemes.WS ? "ws" : "wss";
var uriBuilder = new UriBuilder(new Uri($"{schemeStr}://{HostAddress}"));
if (!string.IsNullOrEmpty(UriPath))
Expand Down
28 changes: 24 additions & 4 deletions src/NetCoreStack.WebSockets/ConnectionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,19 @@ private async Task SendDataAsync(Stream stream,
}
}

public async Task ConnectAsync(WebSocket webSocket, string connectionId, string connectorName = "")
public async Task ConnectAsync(WebSocket webSocket,
string connectionId,
string connectorName = "",
CancellationToken cancellationToken = default(CancellationToken))
{
if (cancellationToken != CancellationToken.None)
{
cancellationToken.Register(() =>
{
CancellationGraceful();
});
}

var receiverContext = new WebSocketReceiverContext
{
Compressor = _compressor,
Expand All @@ -153,8 +164,8 @@ public async Task ConnectAsync(WebSocket webSocket, string connectionId, string
await SendAsync(connectionId, context);
}

var receiver = new WebSocketReceiver(_serviceProvider, receiverContext, CloseConnection);
await receiver.ReceiveAsync();
var receiver = new WebSocketReceiver(_serviceProvider, receiverContext, CloseConnection, _loggerFactory);
await receiver.ReceiveAsync(cancellationToken);
}

public async Task BroadcastAsync(WebSocketMessageContext context)
Expand Down Expand Up @@ -257,11 +268,20 @@ public async Task SendBinaryAsync(string connectionId, byte[] input, IDictionary
}
}

public void CancellationGraceful()
{
foreach (KeyValuePair<string, WebSocketTransport> entry in Connections)
{
var transport = entry.Value;
_logger.LogInformation("Graceful cancellation. Close the websocket transport for: {0}", transport.ConnectorName);
}
}

public void CloseConnection(string connectionId)
{
if (Connections.TryRemove(connectionId, out WebSocketTransport transport))
{
transport.Dispose();
transport.WebSocket.Abort();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
using Microsoft.AspNetCore.Builder;
using NetCoreStack.WebSockets.Internal;
using System;
using System.Threading;

namespace NetCoreStack.WebSockets
{
public static class SocketApplicationBuilderExtensions
{
public static IApplicationBuilder UseNativeWebSockets(this IApplicationBuilder app)
public static IApplicationBuilder UseNativeWebSockets(this IApplicationBuilder app, CancellationToken cancellationToken = default(CancellationToken))
{
if (app == null)
{
throw new ArgumentNullException(nameof(app));
}

app.UseWebSockets();
app.UseMiddleware<WebSocketMiddleware>();

app.UseMiddleware<WebSocketMiddleware>(cancellationToken);

return app;
}
Expand Down
3 changes: 2 additions & 1 deletion src/NetCoreStack.WebSockets/Interfaces/IConnectionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;

namespace NetCoreStack.WebSockets
Expand All @@ -10,7 +11,7 @@ public interface IConnectionManager
{
ConcurrentDictionary<string, WebSocketTransport> Connections { get; }

Task ConnectAsync(WebSocket webSocket, string connectionId, string connectorName = "");
Task ConnectAsync(WebSocket webSocket, string connectionId, string connectorName = "", CancellationToken cancellationToken = default(CancellationToken));

/// <summary>
/// Text message broadcaster
Expand Down
Loading

0 comments on commit c6ffb88

Please sign in to comment.