Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Part3.4][Live-scale] Implement add endpoint. #841

Merged
merged 11 commits into from
Mar 17, 2020
12 changes: 12 additions & 0 deletions src/Microsoft.Azure.SignalR.Common/Endpoints/ScaleOperation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.Azure.SignalR
{
    /// <summary>
    /// Identifies the kind of change a scale operation applies to the service endpoints.
    /// </summary>
    internal enum ScaleOperation
    {
        /// <summary>Endpoints are added.</summary>
        Add = 0,

        /// <summary>Endpoints are removed.</summary>
        Remove = 1,

        /// <summary>Endpoints are renamed.</summary>
        Rename = 2,
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ protected async Task AddServiceEndpointsAsync(IReadOnlyList<ServiceEndpoint> end

await Task.WhenAll(hubEndpoints.Select(e => AddHubServiceEndpointAsync(e, cancellationToken)));

// TODO: update local store for negotiation
UpdateNegotiationEndpointsStore(hubEndpoints, ScaleOperation.Add);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -214,6 +214,26 @@ private Task RenameHubServiceEndpoint(HubServiceEndpoint endpoint)
return Task.CompletedTask;
}

/// <summary>
/// Applies a scale operation to the per-hub endpoint lists held in <c>_endpointsPerHub</c>.
/// </summary>
/// <param name="endpoints">
/// Hub endpoints affected by the operation. Each one is matched to a store entry by comparing
/// its <c>Hub</c> value with the entry key.
/// </param>
/// <param name="scaleOperation">
/// The operation to apply. Only <see cref="ScaleOperation.Add"/> modifies the store; any other
/// value leaves every stored list unchanged.
/// </param>
private void UpdateNegotiationEndpointsStore(IReadOnlyList<HubServiceEndpoint> endpoints, ScaleOperation scaleOperation)
{
    foreach (var hubEndpoint in _endpointsPerHub)
    {
        var oldEndpoints = hubEndpoint.Value;

        // Only the endpoints that belong to this hub may be merged into this hub's list.
        var updatedEndpoints = endpoints.Where(e => e.Hub == hubEndpoint.Key).ToList();

        switch (scaleOperation)
        {
            case ScaleOperation.Add:
                var newEndpoints = oldEndpoints.ToList();
                newEndpoints.AddRange(updatedEndpoints);

                // The previously read list is passed as the comparison value for the update.
                _endpointsPerHub.TryUpdate(hubEndpoint.Key, newEndpoints, oldEndpoints);
                break;
            default:
                // Not handled here: keep the stored list instead of replacing it with an empty one.
                break;
        }
    }
}

private static class Log
{
private static readonly Action<ILogger, int, string, string, Exception> _duplicateEndpointFound =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@ internal class MultiEndpointServiceConnectionContainer : IMultiEndpointServiceCo
private readonly IMessageRouter _router;
private readonly ILogger _logger;
private readonly IServiceEndpointManager _serviceEndpointManager;
private readonly TimeSpan _scaleTimeout;
private readonly Func<HubServiceEndpoint, IServiceConnectionContainer> _generator;
private readonly int _scaleWaitIntervalInSeconds = 5;
private readonly object _lock = new object();

// <needRouter, endpoints>
private (bool needRouter, IReadOnlyList<HubServiceEndpoint> endpoints) _routerEndpoints;

internal MultiEndpointServiceConnectionContainer(
string hub,
Func<HubServiceEndpoint, IServiceConnectionContainer> generator,
IServiceEndpointManager endpointManager,
IMessageRouter router,
ILoggerFactory loggerFactory)
ILoggerFactory loggerFactory,
TimeSpan? scaleTimeout = null)
{
if (generator == null)
{
Expand All @@ -38,8 +42,12 @@ internal MultiEndpointServiceConnectionContainer(
_router = router ?? throw new ArgumentNullException(nameof(router));
_logger = loggerFactory?.CreateLogger<MultiEndpointServiceConnectionContainer>() ?? throw new ArgumentNullException(nameof(loggerFactory));
_serviceEndpointManager = endpointManager;

// provides a copy to the endpoint per container
_scaleTimeout = scaleTimeout ?? TimeSpan.FromSeconds(Constants.DefaultScaleTimeoutInSeconds);

// Reserve generator for potential scale use.
_generator = generator;

// provides a copy to the endpoint per container
var endpoints = endpointManager.GetEndpoints(hub);
// router will be used when there's customized MessageRouter or multiple endpoints
var needRouter = endpoints.Count > 1 || !(_router is DefaultMessageRouter);
Expand All @@ -62,18 +70,19 @@ public MultiEndpointServiceConnectionContainer(
int count,
IServiceEndpointManager endpointManager,
IMessageRouter router,
ILoggerFactory loggerFactory
ILoggerFactory loggerFactory,
TimeSpan? scaleTimeout = null
) : this(
hub,
endpoint => CreateContainer(serviceConnectionFactory, endpoint, count, loggerFactory),
endpointManager,
router,
loggerFactory
)
loggerFactory,
scaleTimeout)
{
}

public IEnumerable<ServiceEndpoint> GetOnlineEndpoints()
public IEnumerable<HubServiceEndpoint> GetOnlineEndpoints()
{
return _routerEndpoints.endpoints.Where(s => s.Online);
}
Expand Down Expand Up @@ -250,13 +259,31 @@ private void OnAdd(HubServiceEndpoint endpoint)
_ = AddHubServiceEndpointAsync(endpoint);
}

private Task AddHubServiceEndpointAsync(HubServiceEndpoint endpoint)
private async Task AddHubServiceEndpointAsync(HubServiceEndpoint endpoint)
{
// TODO: create container and trigger server ping.
var container = _generator(endpoint);
endpoint.ConnectionContainer = container;

// do tasks when !endpoint.ScaleTask.IsCanceled or local timeout check not finish
endpoint.CompleteScale();
return Task.CompletedTask;
try
{
_ = container.StartAsync();

// Update local store directly after start connection
// to get a uniformed action on trigger servers ping
UpdateEndpointsStore(endpoint, ScaleOperation.Add);

await StartGetServersPing();
await WaitForServerStable(container, endpoint);
}
catch (Exception ex)
{
Log.FailedStartingConnectionForNewEndpoint(_logger, endpoint.ToString(), ex);
}
finally
{
_ = StopGetServersPing();
endpoint.CompleteScale();
}
}

private void OnRemove(HubServiceEndpoint endpoint)
Expand Down Expand Up @@ -286,6 +313,64 @@ private void OnRename(HubServiceEndpoint endpoint)
// TODO: update local store names
}

/// <summary>
/// Applies a scale operation to the container's local endpoint store (<c>_routerEndpoints</c>).
/// </summary>
/// <param name="newEndpoint">The endpoint the operation refers to.</param>
/// <param name="operation">
/// The operation to apply. Only <see cref="ScaleOperation.Add"/> changes the store; other values are ignored.
/// </param>
private void UpdateEndpointsStore(HubServiceEndpoint newEndpoint, ScaleOperation operation)
{
    // Use lock to ensure store update safety as parallel changes are triggered on the container side.
    lock (_lock)
    {
        switch (operation)
        {
            case ScaleOperation.Add:
                // Build a new list rather than mutating the one currently published in the store.
                var newEndpoints = _routerEndpoints.endpoints.ToList();
                newEndpoints.Add(newEndpoint);

                // Same rule the constructor applies: the router is needed for multiple endpoints
                // or whenever a customized message router is configured.
                var needRouter = newEndpoints.Count > 1 || !(_router is DefaultMessageRouter);
                _routerEndpoints = (needRouter, newEndpoints);
                break;
            default:
                break;
        }
    }
}

/// <summary>
/// Polls <c>IsServerReady</c> for the given container until it returns true or
/// <c>_scaleTimeout</c> has elapsed. On timeout an error is logged; no exception is thrown.
/// </summary>
/// <param name="container">The connection container whose readiness is polled.</param>
/// <param name="endpoint">The endpoint being added; used only for the timeout log entry.</param>
private async Task WaitForServerStable(IServiceConnectionContainer container, HubServiceEndpoint endpoint)
{
    var startTime = DateTime.UtcNow;
    while (DateTime.UtcNow - startTime < _scaleTimeout)
    {
        if (IsServerReady(container))
        {
            return;
        }

        // status ping interval is 5 seconds, delay before doing the next check
        await Task.Delay(TimeSpan.FromSeconds(_scaleWaitIntervalInSeconds));
    }

    // TotalSeconds reports the whole duration; Seconds is only the 0-59 component
    // and would log 0 for a timeout of exactly N minutes.
    Log.TimeoutWaitingForAddingEndpoint(_logger, endpoint.ToString(), (int)_scaleTimeout.TotalSeconds);
}

/// <summary>
/// Checks whether the given container reports a non-empty set of server ids that is
/// set-equal to the server ids reported by every endpoint in the local store.
/// </summary>
/// <param name="container">The container whose <c>GlobalServerIds</c> is compared.</param>
/// <returns>
/// <c>false</c> when the container has no server ids yet, or when any stored endpoint has a
/// null or different server id set; otherwise <c>true</c>.
/// </returns>
private bool IsServerReady(IServiceConnectionContainer container)
{
    var newServerIds = container.GlobalServerIds;
    if (newServerIds == null || newServerIds.Count <= 0)
    {
        // return directly if local server list is not set yet.
        return false;
    }

    // Every endpoint in the store must report exactly the same server ids as the new one.
    return _routerEndpoints.endpoints.All(e =>
        e.ConnectionContainer.GlobalServerIds != null
        && newServerIds.SetEquals(e.ConnectionContainer.GlobalServerIds));
}

private static class Log
{
private static readonly Action<ILogger, string, Exception> _startingConnection =
Expand All @@ -303,6 +388,12 @@ private static class Log
private static readonly Action<ILogger, string, string, Exception> _failedWritingMessageToEndpoint =
LoggerMessage.Define<string, string>(LogLevel.Warning, new EventId(5, "FailedWritingMessageToEndpoint"), "Message {messageType} is not sent to endpoint {endpoint} because all connections to this endpoint are offline.");

// Error-level log delegate (event id 7) taking the endpoint text and the exception;
// invoked through Log.FailedStartingConnectionForNewEndpoint.
private static readonly Action<ILogger, string, Exception> _failedStartingConnectionForNewEndpoint =
LoggerMessage.Define<string>(LogLevel.Error, new EventId(7, "FailedStartingConnectionForNewEndpoint"), "Fail to create and start server connection for new endpoint {endpoint}.");

// Error-level log delegate (event id 8) taking the endpoint text and the timeout in seconds;
// invoked through Log.TimeoutWaitingForAddingEndpoint.
private static readonly Action<ILogger, string, int, Exception> _timeoutWaitingForAddingEndpoint =
    LoggerMessage.Define<string, int>(LogLevel.Error, new EventId(8, "TimeoutWaitingForAddingEndpoint"), "Timeout waiting for add a new endpoint {endpoint} in {timeoutSecond} seconds. Check if app configurations are consistent and restart app server.");

public static void StartingConnection(ILogger logger, string endpoint)
{
_startingConnection(logger, endpoint, null);
Expand All @@ -327,6 +418,16 @@ public static void FailedWritingMessageToEndpoint(ILogger logger, string message
{
_failedWritingMessageToEndpoint(logger, messageType, endpoint, null);
}

/// <summary>
/// Writes the "FailedStartingConnectionForNewEndpoint" log entry for the given endpoint.
/// </summary>
/// <param name="logger">The logger to write to.</param>
/// <param name="endpoint">Text identifying the endpoint.</param>
/// <param name="ex">The exception attached to the log entry.</param>
public static void FailedStartingConnectionForNewEndpoint(ILogger logger, string endpoint, Exception ex) =>
    _failedStartingConnectionForNewEndpoint(logger, endpoint, ex);

/// <summary>
/// Writes the "TimeoutWaitingForAddingEndpoint" log entry; no exception is attached.
/// </summary>
/// <param name="logger">The logger to write to.</param>
/// <param name="endpoint">Text identifying the endpoint.</param>
/// <param name="timeoutSecond">The timeout value, in seconds, placed in the message.</param>
public static void TimeoutWaitingForAddingEndpoint(ILogger logger, string endpoint, int timeoutSecond) =>
    _timeoutWaitingForAddingEndpoint(logger, endpoint, timeoutSecond, null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ internal abstract class ServiceConnectionContainerBase : IServiceConnectionConta
private volatile ServiceConnectionStatus _status;

// <serverIds, lastServerIdsTimestamp>
private volatile Tuple<HashSet<string>, long> _serverIdContext;
private volatile Tuple<HashSet<string>, long> _serverIdContext = DefaultServerIdContext;
private volatile bool _hasClients;
private volatile bool _terminated = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,27 @@ internal class ServiceConnectionContainerFactory : IServiceConnectionContainerFa
private readonly IServiceEndpointManager _serviceEndpointManager;
private readonly IMessageRouter _router;
private readonly IServiceConnectionFactory _serviceConnectionFactory;
private readonly TimeSpan? _serviceScaleTimeout;

public ServiceConnectionContainerFactory(
IServiceConnectionFactory serviceConnectionFactory,
IServiceEndpointManager serviceEndpointManager,
IMessageRouter router,
IServiceEndpointOptions options,
ILoggerFactory loggerFactory)
ILoggerFactory loggerFactory,
TimeSpan? serviceScaleTimeout = null)
{
_serviceConnectionFactory = serviceConnectionFactory;
_serviceEndpointManager = serviceEndpointManager ?? throw new ArgumentNullException(nameof(serviceEndpointManager));
_router = router ?? throw new ArgumentNullException(nameof(router));
_options = options;
_loggerFactory = loggerFactory;
_serviceScaleTimeout = serviceScaleTimeout;
}

public IMultiEndpointServiceConnectionContainer Create(string hub)
{
return new MultiEndpointServiceConnectionContainer(_serviceConnectionFactory, hub, _options.ConnectionCount, _serviceEndpointManager, _router, _loggerFactory);
return new MultiEndpointServiceConnectionContainer(_serviceConnectionFactory, hub, _options.ConnectionCount, _serviceEndpointManager, _router, _loggerFactory, _serviceScaleTimeout);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,32 +48,39 @@ public override IServiceEndpointProvider GetEndpointProvider(ServiceEndpoint end
return new ServiceEndpointProvider(endpoint, _options);
}

private async void OnChange(ServiceOptions options)
private void OnChange(ServiceOptions options)
{
Log.DetectConfigurationChanges(_logger);

// synchronize scale for quick and clean status sync
ReloadServiceEndpointsAsync(options.Endpoints).Wait();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't be blocking here...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to finish reloading the endpoints before accepting the next change, because most of these operations cannot easily be rolled back. Should I add a task to avoid Wait()?

}

// TODO: make public for non hot-reload plans
private async Task ReloadServiceEndpointsAsync(ServiceEndpoint[] serviceEndpoints)
{
// Reset local cache and validate result
var endpoints = GetValuableEndpoints(GetEndpoints(options));
var endpoints = GetValuableEndpoints(serviceEndpoints);
if (endpoints.Length == 0)
{
Log.EndpointNotFound(_logger);
return;
return;
}
Endpoints = endpoints;

var updatedEndpoints = GetChangedEndpoints(Endpoints);

await RenameSerivceEndpoints(updatedEndpoints.RenamedEndpoints);

using (var addCts = new CancellationTokenSource(options.ServiceScaleTimeout))
using (var addCts = new CancellationTokenSource(_scaleTimeout))
{
if (!await WaitTaskOrTimeout(AddServiceEndpointsAsync(updatedEndpoints.AddedEndpoints, addCts.Token), addCts))
{
Log.TimeoutAddEndpoints(_logger);
}
}

using (var removeCts = new CancellationTokenSource(options.ServiceScaleTimeout))
using (var removeCts = new CancellationTokenSource(_scaleTimeout))
{
if (!await WaitTaskOrTimeout(RemoveServiceEndpointsAsync(updatedEndpoints.RemovedEndpoints, removeCts.Token), removeCts))
{
Expand All @@ -82,6 +89,8 @@ private async void OnChange(ServiceOptions options)
}

_endpointsStore = Endpoints;

Log.CompleteUpdateEndpoints(_logger);
}

private (IReadOnlyList<ServiceEndpoint> AddedEndpoints,
Expand Down Expand Up @@ -141,6 +150,10 @@ private static class Log
private static readonly Action<ILogger, Exception> _timeoutRemoveEndpoints =
LoggerMessage.Define(LogLevel.Error, new EventId(5, "TimeoutRemoveEndpoints"), "Timeout waiting for removing endpoints.");

// Debug-level log delegate (event id 6) with no message arguments;
// invoked through Log.CompleteUpdateEndpoints.
private static readonly Action<ILogger, Exception> _completeUpdateEndpoints =
LoggerMessage.Define(LogLevel.Debug, new EventId(6, "CompleteUpdateEndpoints"), "Complete updating endpoints.");


public static void DetectConfigurationChanges(ILogger logger)
{
_detectEndpointChanges(logger, null);
Expand All @@ -165,6 +178,11 @@ public static void TimeoutRemoveEndpoints(ILogger logger)
{
_timeoutRemoveEndpoints(logger, null);
}

/// <summary>
/// Writes the "CompleteUpdateEndpoints" log entry; no exception is attached.
/// </summary>
/// <param name="logger">The logger to write to.</param>
public static void CompleteUpdateEndpoints(ILogger logger) =>
    _completeUpdateEndpoints(logger, null);
}
}
}
3 changes: 2 additions & 1 deletion src/Microsoft.Azure.SignalR/HubHost/ServiceHubDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ private IMultiEndpointServiceConnectionContainer GetMultiEndpointServiceConnecti
_serviceEndpointManager,
_router,
_options,
_loggerFactory
_loggerFactory,
_options.ServiceScaleTimeout
);
return factory.Create(hub);
}
Expand Down
Loading