Skip to content

Commit

Permalink
Graceful shutdown support
Browse files Browse the repository at this point in the history
  • Loading branch information
terencefan committed Oct 21, 2019
1 parent 69ca480 commit f4f4b66
Show file tree
Hide file tree
Showing 24 changed files with 266 additions and 46 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ TestResult.xml
[Rr]eleasePS/
dlldata.c

# VIM
*.swp

# .NET Core
project.lock.json
project.fragment.lock.json
Expand Down Expand Up @@ -290,4 +293,4 @@ __pycache__/
*.odx.cs
*.xsd.cs

.publish/
.publish/
1 change: 1 addition & 0 deletions samples/ChatSample/ChatSample/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
appsettings.json
3 changes: 2 additions & 1 deletion samples/ChatSample/ChatSample/ChatSample.csproj
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>netcoreapp2.1</TargetFramework>
<TargetFramework>netcoreapp3.0</TargetFramework>
<UserSecretsId>chatsample</UserSecretsId>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.App" />
<PackageReference Include="Microsoft.AspNetCore.SignalR.Protocols.MessagePack" Version="3.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down
7 changes: 7 additions & 0 deletions samples/ChatSample/ChatSample/Properties/launchSettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
},
"HubServer": {
"commandName": "Project",
"applicationUrl": "http://localhost:5050",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
}
}
}
13 changes: 7 additions & 6 deletions samples/ChatSample/ChatSample/Startup.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Security.Claims;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

using Microsoft.Extensions.Hosting;

namespace ChatSample
{
public class Startup
Expand All @@ -23,17 +22,19 @@ public void ConfigureServices(IServiceCollection services)
{
services.AddMvc();
services.AddSignalR()
.AddMessagePackProtocol()
.AddAzureSignalR();
}

public void Configure(IApplicationBuilder app, IHostingEnvironment env)
public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApplicationLifetime lifetime)
{
app.UseMvc();
app.UseFileServer();
app.UseAzureSignalR(routes =>
{
routes.MapHub<Chat>("/chat");
routes.MapHub<NotificationHub>("/notifications");
routes.MapHub<NotificationHub>("/notifications");

lifetime.ApplicationStopping.Register(routes.ShutdownAll);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ public Task StopAsync()
return Task.WhenAll(GetConnections().Select(s => s.StopAsync()));
}

public Task ShutdownAsync()
{
return Task.WhenAll(GetConnections().Select(s => s.ShutdownAsync()));
}

public IServiceConnectionContainer WithHub(string hubName)
{
if (_hubConnections == null ||!_hubConnections.TryGetValue(hubName, out var connection))
Expand Down
3 changes: 3 additions & 0 deletions src/Microsoft.Azure.SignalR.Common/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ internal static class Constants
public const string ConnectionStringDefaultKey = "Azure:SignalR:ConnectionString";
public const string ApplicationNameDefaultKey = "Azure:SignalR:ApplicationName";

public const string ConnectionFin = "fin";
public const string ConnectionFinAck = "finack";

public const string AsrsUserAgent = "Asrs-User-Agent";
public const string AsrsInstanceId = "Asrs-Instance-Id";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ internal interface IServiceConnection

Task StopAsync();

Task CloseAsync();

ServiceConnectionStatus Status { get; }

Task ConnectionInitializedTask { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ internal interface IServiceConnectionContainer

Task StopAsync();

Task ShutdownAsync();

Task WriteAsync(ServiceMessage serviceMessage);

Task<bool> WriteAckableMessageAsync(ServiceMessage serviceMessage, CancellationToken cancellationToken = default);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,23 @@ public Task StopAsync()
}));
}

public Task ShutdownAsync()
{
if (_inner != null)
{
return _inner.ShutdownAsync();
}
else
{
return Task.WhenAll(Connections.Select(s =>
{
// Closing connection.
Log.StoppingConnection(_logger, s.Key.Endpoint);
return s.Value.ShutdownAsync();
}));
}
}

public Task WriteAsync(ServiceMessage serviceMessage)
{
if (_inner != null)
Expand Down Expand Up @@ -242,6 +259,9 @@ private static class Log
private static readonly Action<ILogger, string, string, Exception> _failedWritingMessageToEndpoint =
LoggerMessage.Define<string, string>(LogLevel.Warning, new EventId(5, "FailedWritingMessageToEndpoint"), "Message {messageType} is not sent to endpoint {endpoint} because all connections to this endpoint are offline.");

private static readonly Action<ILogger, string, Exception> _closingConnection =
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(6, "ClosingConnection"), "Closing connections for endpoint {endpoint}.");

public static void StartingConnection(ILogger logger, string endpoint)
{
_startingConnection(logger, endpoint, null);
Expand All @@ -252,6 +272,11 @@ public static void StoppingConnection(ILogger logger, string endpoint)
_stoppingConnection(logger, endpoint, null);
}

public static void ClosingConnection(ILogger logger, string endpoint)
{
_closingConnection(logger, endpoint, null);
}

public static void EndpointNotExists(ILogger logger, string endpoint)
{
_endpointNotExists(logger, endpoint, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Diagnostics;
using System.IO;
using System.IO.Pipelines;
using System.Security.Authentication;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
Expand All @@ -30,7 +31,11 @@ internal abstract class ServiceConnectionBase : IServiceConnection
private readonly ReadOnlyMemory<byte> _cachedPingBytes;
private readonly HandshakeRequestMessage _handshakeRequest;

private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1);
private readonly SemaphoreSlim _closeConnectionLock = new SemaphoreSlim(1, 1);
protected readonly SemaphoreSlim _closeIncomingLock = new SemaphoreSlim(1, 1);
protected readonly SemaphoreSlim _closeOutgoingLock = new SemaphoreSlim(1, 1);

private readonly SemaphoreSlim _serverConnectionLock = new SemaphoreSlim(1, 1);

private readonly TaskCompletionSource<bool> _serviceConnectionStartTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

Expand All @@ -51,6 +56,9 @@ internal abstract class ServiceConnectionBase : IServiceConnection

private int _started;

// Shutdown flag
protected volatile bool _outgoingStopped = false;

protected HubServiceEndpoint HubEndpoint { get; }

protected string ConnectionId { get; }
Expand Down Expand Up @@ -147,15 +155,15 @@ public async Task StartAsync(string target = null)
// Don't allow write anymore when the connection is disconnected
Status = ServiceConnectionStatus.Disconnected;

await _writeLock.WaitAsync();
await _serverConnectionLock.WaitAsync();
try
{
// close the underlying connection
await DisposeConnection(connection);
}
finally
{
_writeLock.Release();
_serverConnectionLock.Release();
}
}
}
Expand All @@ -180,6 +188,43 @@ public Task StopAsync()
return Task.CompletedTask;
}

public async Task CloseAsync()
{
if (_closeIncomingLock.Wait(0))
{
// incoming task hasn't started yet so we don't need to wait.
return;
}

if (!_closeConnectionLock.Wait(0))
{
// only one close async process can exist at a time.
return;
}

Console.WriteLine($"{HubEndpoint}: sending fin request.");

// tell the runtime that the server connection is ready to close.
await WriteAsync(new CloseConnectionMessage(Constants.ConnectionFin));
// and wait the fin ack response from the runtime.
await _closeIncomingLock.WaitAsync();

Console.WriteLine($"{HubEndpoint}: fin ack received.");

// wait until outcoming pipe has been drained.
await _serverConnectionLock.WaitAsync();
_outgoingStopped = true;
_serverConnectionLock.Release();
await _closeOutgoingLock.WaitAsync();

Console.WriteLine($"{HubEndpoint} closed.");
}

protected virtual Task InnerCloseAsync()
{
return Task.CompletedTask;
}

public virtual async Task WriteAsync(ServiceMessage serviceMessage)
{
var errorMessage = _errorMessage;
Expand All @@ -195,12 +240,12 @@ public virtual async Task WriteAsync(ServiceMessage serviceMessage)
throw new ServiceConnectionNotActiveException(_errorMessage);
}

await _writeLock.WaitAsync();
await _serverConnectionLock.WaitAsync();

if (Status != ServiceConnectionStatus.Connected)
{
// Make sure not write messages to the connection when it is no longer connected
_writeLock.Release();
_serverConnectionLock.Release();
throw new ServiceConnectionNotActiveException(_errorMessage);
}

Expand All @@ -218,7 +263,7 @@ public virtual async Task WriteAsync(ServiceMessage serviceMessage)
}
finally
{
_writeLock.Release();
_serverConnectionLock.Release();
}
}

Expand Down Expand Up @@ -403,10 +448,14 @@ private async Task<bool> ReceiveHandshakeResponseAsync(PipeReader input, Cancell

private async Task ProcessIncomingAsync(ConnectionContext connection)
{
await _closeIncomingLock.WaitAsync();

var keepAliveTimer = StartKeepAliveTimer();
var inLoop = true;

try
{
while (true)
while (inLoop)
{
var result = await connection.Transport.Input.ReadAsync();
var buffer = result.Buffer;
Expand All @@ -430,7 +479,14 @@ private async Task ProcessIncomingAsync(ConnectionContext connection)

while (ServiceProtocol.TryParseMessage(ref buffer, out var message))
{
_ = DispatchMessageAsync(message);
if (message is CloseConnectionMessage closeConnectionMessage && closeConnectionMessage.ConnectionId == Constants.ConnectionFinAck)
{
inLoop = false;
break;
} else
{
_ = DispatchMessageAsync(message);
}
}
}

Expand All @@ -454,27 +510,35 @@ private async Task ProcessIncomingAsync(ConnectionContext connection)
}
}
finally
{
keepAliveTimer.Stop();
}
{
_closeIncomingLock.Release();
keepAliveTimer.Stop();
Status = ServiceConnectionStatus.Disconnected;
await DisposeConnection(connection);
}

// TODO: Never cleanup connections unless Service asks us to do that
// Current implementation is based on assumption that Service will drop clients
// if server connection fails.
await CleanupConnections();
}

private Task DispatchMessageAsync(ServiceMessage message)
{
switch (message)
{
case OpenConnectionMessage openConnectionMessage:
return OnConnectedAsync(openConnectionMessage);
case CloseConnectionMessage closeConnectionMessage:
return OnDisconnectedAsync(closeConnectionMessage);
case ConnectionDataMessage connectionDataMessage:
return OnMessageAsync(connectionDataMessage);
case ServiceErrorMessage serviceErrorMessage:
return OnServiceErrorAsync(serviceErrorMessage);
case PingMessage pingMessage:
return OnPingMessageAsync(pingMessage);
case AckMessage ackMessage:
return OnAckMessageAsync(ackMessage);
{
switch (message)
{
case OpenConnectionMessage openConnectionMessage:
return OnConnectedAsync(openConnectionMessage);
case CloseConnectionMessage closeConnectionMessage:
return OnDisconnectedAsync(closeConnectionMessage);
case ConnectionDataMessage connectionDataMessage:
return OnMessageAsync(connectionDataMessage);
case ServiceErrorMessage serviceErrorMessage:
return OnServiceErrorAsync(serviceErrorMessage);
case PingMessage pingMessage:
return OnPingMessageAsync(pingMessage);
case AckMessage ackMessage:
return OnAckMessageAsync(ackMessage);
}
return Task.CompletedTask;
}
Expand Down Expand Up @@ -517,7 +581,7 @@ private async Task KeepAliveAsync(TimerAwaitable timer)

private async ValueTask TrySendPingAsync()
{
if (!_writeLock.Wait(0))
if (!_serverConnectionLock.Wait(0))
{
// Skip sending PingMessage when failed getting lock
return;
Expand All @@ -539,7 +603,7 @@ private async ValueTask TrySendPingAsync()
}
finally
{
_writeLock.Release();
_serverConnectionLock.Release();
}
}

Expand Down
Loading

0 comments on commit f4f4b66

Please sign in to comment.