Skip to content

Commit

Permalink
Add ServiceConnectionMigrationLevel option
Browse files Browse the repository at this point in the history
  • Loading branch information
terencefan committed Nov 28, 2019
1 parent 506b832 commit 0a9a648
Show file tree
Hide file tree
Showing 20 changed files with 169 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,15 @@ public ServiceConnection(
IClientConnectionManager clientConnectionManager,
ILoggerFactory loggerFactory,
IServiceMessageHandler serviceMessageHandler,
ServerConnectionType connectionType = ServerConnectionType.Default)
: base(serviceProtocol, connectionId, endpoint, serviceMessageHandler, connectionType,
loggerFactory?.CreateLogger<ServiceConnection>())
ServiceConnectionOptions options = null
)
: base(
serviceProtocol,
connectionId,
endpoint,
serviceMessageHandler,
options,
loggerFactory?.CreateLogger<ServiceConnection>())
{
_connectionFactory = connectionFactory;
_clientConnectionManager = clientConnectionManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class ServiceConnectionFactory : IServiceConnectionFactory
private readonly IClientConnectionManager _clientConnectionManager;
private readonly IConnectionFactory _connectionFactory;
private readonly ILoggerFactory _logger;
private readonly ServiceConnectionOptions _options = ServiceConnectionOptions.Default;

public ServiceConnectionFactory(IServiceProtocol serviceProtocol,
IClientConnectionManager clientConnectionManager,
Expand All @@ -22,9 +23,15 @@ public ServiceConnectionFactory(IServiceProtocol serviceProtocol,
_logger = logger;
}

public IServiceConnection Create(HubServiceEndpoint endpoint, IServiceMessageHandler serviceMessageHandler, ServerConnectionType type)
public IServiceConnection Create(HubServiceEndpoint endpoint, IServiceMessageHandler serviceMessageHandler, ServiceConnectionType type)
{
return new ServiceConnection(Guid.NewGuid().ToString(), endpoint, _serviceProtocol, _connectionFactory, _clientConnectionManager, _logger, serviceMessageHandler, type);
ServiceConnectionOptions options = _options;
if (type != _options.ConnectionType)
{
options = _options.Clone();
options.ConnectionType = type;
}
return new ServiceConnection(Guid.NewGuid().ToString(), endpoint, _serviceProtocol, _connectionFactory, _clientConnectionManager, _logger, serviceMessageHandler, options);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
{
interface IServiceConnectionFactory
{
IServiceConnection Create(HubServiceEndpoint endpoint, IServiceMessageHandler serviceMessageHandler, ServerConnectionType type);
IServiceConnection Create(HubServiceEndpoint endpoint, IServiceMessageHandler serviceMessageHandler, ServiceConnectionType type);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ internal abstract class ServiceConnectionBase : IServiceConnection
private readonly TaskCompletionSource<bool> _serviceConnectionStartTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly TaskCompletionSource<object> _serviceConnectionOfflineTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);

private readonly ServerConnectionType _connectionType;
protected readonly ServiceConnectionOptions Options;

private ServiceConnectionType _connectionType { get => Options.ConnectionType; }

private readonly IServiceMessageHandler _serviceMessageHandler;
private readonly object _statusLock = new object();
Expand Down Expand Up @@ -87,19 +89,26 @@ protected set

public Task ConnectionOfflineTask => _serviceConnectionOfflineTcs.Task;

protected ServiceConnectionBase(IServiceProtocol serviceProtocol, string connectionId,
HubServiceEndpoint endpoint, IServiceMessageHandler serviceMessageHandler, ServerConnectionType connectionType, ILogger logger)
protected ServiceConnectionBase(
IServiceProtocol serviceProtocol,
string connectionId,
HubServiceEndpoint endpoint,
IServiceMessageHandler serviceMessageHandler,
ServiceConnectionOptions options,
ILogger logger
)
{
ServiceProtocol = serviceProtocol;
ConnectionId = connectionId;

_connectionType = connectionType;
Options = options ?? ServiceConnectionOptions.Default;

HubEndpoint = endpoint;

if (serviceProtocol != null)
{
_cachedPingBytes = serviceProtocol.GetMessageBytes(PingMessage.Instance);
_handshakeRequest = new HandshakeRequestMessage(serviceProtocol.Version, (int)connectionType);
_handshakeRequest = new HandshakeRequestMessage(serviceProtocol.Version, (int)options.ConnectionType);
}

Logger = logger ?? throw new ArgumentNullException(nameof(logger));
Expand Down Expand Up @@ -385,7 +394,7 @@ private async Task<bool> ReceiveHandshakeResponseAsync(PipeReader input, Cancell
}

// Handshake error. Will stop reconnect.
if (_connectionType == ServerConnectionType.OnDemand)
if (_connectionType == ServiceConnectionType.OnDemand)
{
// Handshake errors on on-demand connections are acceptable.
Log.OnDemandConnectionHandshakeResponse(Logger, handshakeResponse.ErrorMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ protected List<IServiceConnection> FixedServiceConnections

protected int FixedConnectionCount { get; }

protected virtual ServerConnectionType InitialConnectionType { get; } = ServerConnectionType.Default;
protected virtual ServiceConnectionType InitialConnectionType { get; } = ServiceConnectionType.Default;

public HubServiceEndpoint Endpoint { get; }

Expand All @@ -78,9 +78,13 @@ private set
}
}

protected ServiceConnectionContainerBase(IServiceConnectionFactory serviceConnectionFactory,
int minConnectionCount, HubServiceEndpoint endpoint,
IReadOnlyList<IServiceConnection> initialConnections = null, ILogger logger = null, AckHandler ackHandler = null)
protected ServiceConnectionContainerBase(
IServiceConnectionFactory serviceConnectionFactory,
int minConnectionCount,
HubServiceEndpoint endpoint,
IReadOnlyList<IServiceConnection> initialConnections = null,
ILogger logger = null,
AckHandler ackHandler = null)
{
Logger = logger ?? throw new ArgumentNullException(nameof(logger));
ServiceConnectionFactory = serviceConnectionFactory;
Expand Down Expand Up @@ -155,7 +159,7 @@ public void HandleAck(AckMessage ackMessage)
/// <summary>
/// Create a connection for a specific service connection type
/// </summary>
protected IServiceConnection CreateServiceConnectionCore(ServerConnectionType type)
protected IServiceConnection CreateServiceConnectionCore(ServiceConnectionType type)
{
var connection = ServiceConnectionFactory.Create(Endpoint, this, type);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.Azure.SignalR
{
internal enum ServiceConnectionMigrationLevel
{
/// <summary>
/// 0, Default, client-connection will not be migrated at any time.
/// </summary>
Off = 0,
/// <summary>
/// 1, ShutdownOnly, client-connection will be migrated to another available server if a graceful shutdown had performed.
/// </summary>
ShutdownOnly = 1,
/// <summary>
/// 2, All, migration will happen even if one or all service-connection had been dropped accidentally. (pending messages will be lost)
/// </summary>
All = 2,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
namespace Microsoft.Azure.SignalR
{
class ServiceConnectionOptions
{
public ServiceConnectionType ConnectionType = ServiceConnectionType.Default;

public ServiceConnectionMigrationLevel MigrationLevel = ServiceConnectionMigrationLevel.Off;

public static ServiceConnectionOptions Default { get => new ServiceConnectionOptions(); }

internal ServiceConnectionOptions()
{

}

internal ServiceConnectionOptions Clone()
{
return new ServiceConnectionOptions
{
ConnectionType = ConnectionType,
MigrationLevel = MigrationLevel
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace Microsoft.Azure.SignalR
{
internal enum ServerConnectionType
internal enum ServiceConnectionType
{
/// <summary>
/// 0, Default, it can carry clients, service runtime should always accept this kind of connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private IServiceConnection CreateOnDemandServiceConnection()

lock (_lock)
{
newConnection = CreateServiceConnectionCore(ServerConnectionType.OnDemand);
newConnection = CreateServiceConnectionCore(ServiceConnectionType.OnDemand);
_onDemandServiceConnections.Add(newConnection);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ internal class WeakServiceConnectionContainer : ServiceConnectionContainerBase

private readonly TimerAwaitable _timer;

protected override ServerConnectionType InitialConnectionType => ServerConnectionType.Weak;
protected override ServiceConnectionType InitialConnectionType => ServiceConnectionType.Weak;

public WeakServiceConnectionContainer(IServiceConnectionFactory serviceConnectionFactory,
int fixedConnectionCount, HubServiceEndpoint endpoint, ILogger logger)
Expand Down
15 changes: 13 additions & 2 deletions src/Microsoft.Azure.SignalR.Management/ServiceManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,23 @@ public async Task<IServiceHubContext> CreateHubContextAsync(string hubName, ILog
var clientConnectionManager = new ClientConnectionManager();
var clientConnectionFactory = new ClientConnectionFactory();
ConnectionDelegate connectionDelegate = connectionContext => Task.CompletedTask;
var serviceConnectionFactory = new ServiceConnectionFactory(serviceProtocol, clientConnectionManager, connectionFactory, loggerFactory, connectionDelegate, clientConnectionFactory);

var serviceConnectionFactory = new ServiceConnectionFactory(
serviceProtocol,
clientConnectionManager,
connectionFactory,
loggerFactory,
connectionDelegate,
clientConnectionFactory,
ServiceConnectionOptions.Default
);

var weakConnectionContainer = new WeakServiceConnectionContainer(
serviceConnectionFactory,
_serviceManagerOptions.ConnectionCount,
new HubServiceEndpoint(hubName, _endpointProvider, _endpoint),
loggerFactory?.CreateLogger(nameof(WeakServiceConnectionContainer)) ?? NullLogger.Instance);
loggerFactory?.CreateLogger(nameof(WeakServiceConnectionContainer)) ?? NullLogger.Instance
);

var serviceCollection = new ServiceCollection();
serviceCollection.AddSignalRCore();
Expand Down
17 changes: 15 additions & 2 deletions src/Microsoft.Azure.SignalR/HubHost/ServiceHubDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,21 @@ private async Task OfflineAndWaitForCompletedAsync()
private IServiceConnectionContainer GetMultiEndpointServiceConnectionContainer(string hub, ConnectionDelegate connectionDelegate, Action<HttpContext> contextConfig = null)
{
var connectionFactory = new ConnectionFactory(_nameProvider, _loggerFactory);
var serviceConnectionFactory = new ServiceConnectionFactory(_serviceProtocol, _clientConnectionManager, connectionFactory, _loggerFactory, connectionDelegate, _clientConnectionFactory);
serviceConnectionFactory.ConfigureContext = contextConfig;
var connectionOptions = ServiceConnectionOptions.Default;

if (_options.ServerConnectionMigration == 1)
{
connectionOptions.MigrationLevel = ServiceConnectionMigrationLevel.ShutdownOnly;
}
else if (_options.ServerConnectionMigration == 2)
{
connectionOptions.MigrationLevel = ServiceConnectionMigrationLevel.All;
}

var serviceConnectionFactory = new ServiceConnectionFactory(_serviceProtocol, _clientConnectionManager, connectionFactory, _loggerFactory, connectionDelegate, _clientConnectionFactory, connectionOptions)
{
ConfigureContext = contextConfig
};

var factory = new ServiceConnectionContainerFactory(
serviceConnectionFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ public ServiceConnection(IServiceProtocol serviceProtocol,
string connectionId,
HubServiceEndpoint endpoint,
IServiceMessageHandler serviceMessageHandler,
ServerConnectionType connectionType = ServerConnectionType.Default) :
base(serviceProtocol, connectionId, endpoint, serviceMessageHandler, connectionType, loggerFactory?.CreateLogger<ServiceConnection>())
ServiceConnectionOptions options = null
) :
base(serviceProtocol, connectionId, endpoint, serviceMessageHandler, options, loggerFactory?.CreateLogger<ServiceConnection>())
{
_clientConnectionManager = clientConnectionManager;
_connectionFactory = connectionFactory;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using System;
using System.Collections.Generic;
using System.Text;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.SignalR.Protocol;
Expand All @@ -17,30 +15,55 @@ internal class ServiceConnectionFactory : IServiceConnectionFactory
private readonly ConnectionDelegate _connectionDelegate;
private readonly IClientConnectionFactory _clientConnectionFactory;

private readonly ServiceConnectionOptions _options;

public Action<HttpContext> ConfigureContext { get; set; }

public ServiceConnectionFactory(IServiceProtocol serviceProtocol,
IClientConnectionManager clientConnectionManager,
IConnectionFactory connectionFactory,
ILoggerFactory loggerFactory,
ConnectionDelegate connectionDelegate,
IClientConnectionFactory clientConnectionFactory)
IClientConnectionFactory clientConnectionFactory,
ServiceConnectionOptions options
)
{
_serviceProtocol = serviceProtocol;
_clientConnectionManager = clientConnectionManager;
_connectionFactory = connectionFactory;
_loggerFactory = loggerFactory;
_connectionDelegate = connectionDelegate;
_clientConnectionFactory = clientConnectionFactory;
_options = options;
}

public IServiceConnection Create(HubServiceEndpoint endpoint, IServiceMessageHandler serviceMessageHandler, ServerConnectionType type)
public IServiceConnection Create(
HubServiceEndpoint endpoint,
IServiceMessageHandler serviceMessageHandler,
ServiceConnectionType type
)
{
var serviceConnection = new ServiceConnection(_serviceProtocol, _clientConnectionManager, _connectionFactory,
_loggerFactory, _connectionDelegate, _clientConnectionFactory,
Guid.NewGuid().ToString(), endpoint, serviceMessageHandler, type);
serviceConnection.ConfigureContext = ConfigureContext;
return serviceConnection;
ServiceConnectionOptions options = _options;
if (type != _options.ConnectionType)
{
options = _options.Clone();
options.ConnectionType = type;
}
return new ServiceConnection(
_serviceProtocol,
_clientConnectionManager,
_connectionFactory,
_loggerFactory,
_connectionDelegate,
_clientConnectionFactory,
Guid.NewGuid().ToString(),
endpoint,
serviceMessageHandler,
options
)
{
ConfigureContext = ConfigureContext
};
}
}
}
8 changes: 8 additions & 0 deletions src/Microsoft.Azure.SignalR/ServiceOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ public class ServiceOptions : IServiceEndpointOptions
/// </summary>
internal bool EnableGracefulShutdown { get; set; } = false;

/// <summary>
/// Specifies if the client-connection assigned to this server can be migrated to another server.
/// Default value is 0.
/// 1: Only migrate client-connection if server was shutdown gracefully.
/// 2: Migrate client-connection even if server-connection was accidentally dropped. (Potential data losses)
/// </summary>
internal int ServerConnectionMigration { get; set; } = 0;

/// <summary>
/// Specifies the timeout of a graceful shutdown process (in seconds).
/// Default value is 30 seconds.
Expand Down
5 changes: 0 additions & 5 deletions src/Microsoft.Azure.SignalR/ServiceOptionsSetup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ internal class ServiceOptionsSetup : IConfigureOptions<ServiceOptions>
private readonly string _connectionString;
private readonly ServiceEndpoint[] _endpoints;

private readonly bool _gracefulShutdownEnabled = false;
private readonly TimeSpan _shutdownTimeout = TimeSpan.FromSeconds(Constants.DefaultShutdownTimeoutInSeconds);

public ServiceOptionsSetup(IConfiguration configuration)
{
_appName = configuration[Constants.ApplicationNameDefaultKeyPrefix];
Expand Down Expand Up @@ -47,8 +44,6 @@ public void Configure(ServiceOptions options)
options.Endpoints = _endpoints;
options.ApplicationName = _appName;
options.ServerStickyMode = _serverStickyMode;
options.EnableGracefulShutdown = _gracefulShutdownEnabled;
options.ServerShutdownTimeout = _shutdownTimeout;
}

private static (string, ServiceEndpoint[]) GetEndpoint(IConfiguration configuration, string key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public TestServiceConnection(ServiceConnectionStatus status = ServiceConnectionS
Guid.NewGuid().ToString(),
new HubServiceEndpoint(),
null, // TODO replace it with a NullMessageHandler
ServerConnectionType.Default,
ServiceConnectionOptions.Default,
NullLogger.Instance
)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public TestServiceConnectionFactory(Func<ServiceEndpoint, IServiceConnection> ge
_generator = generator;
}

public IServiceConnection Create(HubServiceEndpoint endpoint, IServiceMessageHandler serviceMessageHandler, ServerConnectionType type)
public IServiceConnection Create(HubServiceEndpoint endpoint, IServiceMessageHandler serviceMessageHandler, ServiceConnectionType type)
{
return _generator?.Invoke(endpoint) ?? new TestServiceConnection();
}
Expand Down
Loading

0 comments on commit 0a9a648

Please sign in to comment.