Skip to content

Commit

Permalink
server connection migration implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
terencefan committed Nov 18, 2019
1 parent 36228ea commit fbeef9d
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 12 deletions.
2 changes: 1 addition & 1 deletion samples/ChatSample/ChatSample/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public void ConfigureServices(IServiceCollection services)
}

public void Configure(IApplicationBuilder app)
{
{
app.UseFileServer();
app.UseAzureSignalR(routes =>
{
Expand Down
8 changes: 0 additions & 8 deletions samples/ChatSample/ChatSample/package.json

This file was deleted.

1 change: 1 addition & 0 deletions src/Microsoft.Azure.SignalR.Common/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ internal static class Constants

public const int DefaultShutdownTimeoutInSeconds = 30;

public const string AsrsMigratedFrom = "Asrs-Migrated-From";
public const string AsrsUserAgent = "Asrs-User-Agent";
public const string AsrsInstanceId = "Asrs-Instance-Id";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ private static class Log
private static readonly Action<ILogger, string, Exception> _connectedStarting =
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(11, "ConnectedStarting"), "Connection {TransportConnectionId} started.");

private static readonly Action<ILogger, string, Exception> _migrationStarting =
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(22, "MigrationStarting"), "Connection {TransportConnectionId} migrated from another server.");

private static readonly Action<ILogger, string, Exception> _connectedEnding =
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(12, "ConnectedEnding"), "Connection {TransportConnectionId} ended.");

Expand Down Expand Up @@ -82,6 +85,11 @@ public static void ConnectedStarting(ILogger logger, string connectionId)
_connectedStarting(logger, connectionId, null);
}

public static void MigrationStarting(ILogger logger, string connectionId)
{
_migrationStarting(logger, connectionId, null);
}

public static void ConnectedEnding(ILogger logger, string connectionId)
{
_connectedEnding(logger, connectionId, null);
Expand Down
28 changes: 25 additions & 3 deletions src/Microsoft.Azure.SignalR/ServerConnections/ServiceConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using SignalRProtocol = Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.SignalR.Protocol;
Expand Down Expand Up @@ -113,6 +114,17 @@ private async Task ProcessOutgoingMessagesAsync(ServiceConnectionContext connect
var buffer = result.Buffer;
if (!buffer.IsEmpty)
{
// We assume the first response message would be a HandshakeResponse.
if (connection.IsMigrated)
{
if (SignalRProtocol.HandshakeProtocol.TryParseResponseMessage(ref buffer, out var message))
{
connection.IsMigrated = false;
connection.Application.Input.AdvanceTo(buffer.End);
}
continue;
}

try
{
// Forward the message to the service
Expand Down Expand Up @@ -149,17 +161,27 @@ private async Task ProcessOutgoingMessagesAsync(ServiceConnectionContext connect
}
}

private void AddClientConnection(ServiceConnectionContext connection, string instanceId)
private void AddClientConnection(ServiceConnectionContext connection, OpenConnectionMessage message)
{
var instanceId = GetInstanceId(message.Headers);

_clientConnectionManager.AddClientConnection(connection);
_connectionIds.TryAdd(connection.ConnectionId, instanceId);
}

protected override Task OnConnectedAsync(OpenConnectionMessage message)
{
var connection = _clientConnectionFactory.CreateConnection(message, ConfigureContext);
AddClientConnection(connection, GetInstanceId(message.Headers));
Log.ConnectedStarting(Logger, connection.ConnectionId);
AddClientConnection(connection, message);

if (connection.IsMigrated)
{
Log.MigrationStarting(Logger, connection.ConnectionId);
}
else
{
Log.ConnectedStarting(Logger, connection.ConnectionId);
}

// Execute the application code
connection.ApplicationTask = _connectionDelegate(connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Linq;
using System.Net;
using System.Security.Claims;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
Expand Down Expand Up @@ -38,6 +39,8 @@ internal class ServiceConnectionContext : ConnectionContext,
private readonly TaskCompletionSource<object> _connectionEndTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);

public Task CompleteTask => _connectionEndTcs.Task;

public bool IsMigrated { get; set; }

private readonly object _heartbeatLock = new object();
private List<(Action<object> handler, object state)> _heartbeatHandlers;
Expand All @@ -47,6 +50,11 @@ public ServiceConnectionContext(OpenConnectionMessage serviceMessage, Action<Htt
ConnectionId = serviceMessage.ConnectionId;
User = serviceMessage.GetUserPrincipal();

if (serviceMessage.Headers.TryGetValue(Constants.AsrsMigratedFrom, out _))
{
IsMigrated = true;
}

// Create the Duplix Pipeline for the virtual connection
transportPipeOptions = transportPipeOptions ?? DefaultPipeOptions;
appPipeOptions = appPipeOptions ?? DefaultPipeOptions;
Expand Down

0 comments on commit fbeef9d

Please sign in to comment.