Skip to content

Commit

Permalink
[Part3.2][Live-scale] add interfaces for scale ServiceEndpoint (#832)
Browse files Browse the repository at this point in the history
* add interfaces for Scale ServiceEndpoint

* add timeout check in ServiceEndpointManager when scale

* improve timeout

* better naming.
  • Loading branch information
JialinXin authored Mar 2, 2020
1 parent 7929771 commit 5f1f13f
Show file tree
Hide file tree
Showing 7 changed files with 331 additions and 11 deletions.
1 change: 1 addition & 0 deletions src/Microsoft.Azure.SignalR.Common/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ internal static class Constants
public const string ApplicationNameDefaultKey = "Azure:SignalR:ApplicationName";

public const int DefaultShutdownTimeoutInSeconds = 30;
public const int DefaultScaleTimeoutInSeconds = 300;

public const string AsrsMigrateFrom = "Asrs-Migrate-From";
public const string AsrsMigrateTo = "Asrs-Migrate-To";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,40 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Threading.Tasks;

namespace Microsoft.Azure.SignalR
{
internal class HubServiceEndpoint : ServiceEndpoint
{
public HubServiceEndpoint(string hub, IServiceEndpointProvider provider, ServiceEndpoint endpoint) : base(endpoint)
private readonly TaskCompletionSource<bool> _scaleTcs;

public HubServiceEndpoint(
string hub,
IServiceEndpointProvider provider,
ServiceEndpoint endpoint,
bool needScaleTcs = false
) : base(endpoint)
{
Hub = hub;
Provider = provider;
_scaleTcs = needScaleTcs ? new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously) : null;
}

internal HubServiceEndpoint() : base() { }

public string Hub { get; }

public IServiceEndpointProvider Provider { get; }

/// <summary>
/// Task waiting for HubServiceEndpoint turn ready when live add/remove endpoint
/// </summary>
public Task ScaleTask => _scaleTcs?.Task ?? Task.CompletedTask;

public void CompleteScale()
{
_scaleTcs?.TrySetResult(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using Microsoft.Azure.SignalR.Common;
using Microsoft.Extensions.Logging;

Expand All @@ -20,6 +23,10 @@ internal abstract class ServiceEndpointManagerBase : IServiceEndpointManager
// Filtered valuable endpoints from ServiceOptions
public ServiceEndpoint[] Endpoints { get; protected set; }

public event EndpointEventHandler OnAdd;
public event EndpointEventHandler OnRemove;
public event EndpointEventHandler OnRename;

protected ServiceEndpointManagerBase(IServiceEndpointOptions options, ILogger logger)
: this(GetEndpoints(options), logger)
{
Expand All @@ -43,11 +50,7 @@ internal ServiceEndpointManagerBase(IEnumerable<ServiceEndpoint> endpoints, ILog

public IReadOnlyList<HubServiceEndpoint> GetEndpoints(string hub)
{
return _endpointsPerHub.GetOrAdd(hub, s => Endpoints.Select(e =>
{
var provider = GetEndpointProvider(e);
return new HubServiceEndpoint(hub, provider, e);
}).ToArray());
return _endpointsPerHub.GetOrAdd(hub, s => Endpoints.Select(e => CreateHubServiceEndpoint(hub, e)).ToArray());
}

protected static IEnumerable<ServiceEndpoint> GetEndpoints(IServiceEndpointOptions options)
Expand Down Expand Up @@ -96,15 +99,186 @@ protected ServiceEndpoint[] GetValuableEndpoints(IEnumerable<ServiceEndpoint> en
return groupedEndpoints.ToArray();
}

protected async Task AddServiceEndpointsAsync(IReadOnlyList<ServiceEndpoint> endpoints, CancellationToken cancellationToken)
{
if (endpoints.Count > 0)
{
try
{
var hubEndpoints = CreateHubServiceEndpoints(endpoints, true);

await Task.WhenAll(hubEndpoints.Select(e => AddHubServiceEndpointAsync(e, cancellationToken)));

// TODO: update local store for negotiation
}
catch (Exception ex)
{
Log.FailedAddingEndpoints(_logger, ex);
}
}
}

protected async Task RemoveServiceEndpointsAsync(IReadOnlyList<ServiceEndpoint> endpoints, CancellationToken cancellationToken)
{
if (endpoints.Count > 0)
{
try
{
var hubEndpoints = CreateHubServiceEndpoints(endpoints, true);

// TODO: update local store for negotiation

await Task.WhenAll(hubEndpoints.Select(e => RemoveHubServiceEndpointAsync(e, cancellationToken)));
}
catch (Exception ex)
{
Log.FailedRemovingEndpoints(_logger, ex);
}
}
}

protected Task RenameSerivceEndpoints(IReadOnlyList<ServiceEndpoint> endpoints)
{
if (endpoints.Count > 0)
{
try
{
var hubEndpoints = CreateHubServiceEndpoints(endpoints, false);

// TODO: update local store for negotiation

return Task.WhenAll(hubEndpoints.Select(e => RenameHubServiceEndpoint(e)));
}
catch (Exception ex)
{
Log.FailedRenamingEndpoint(_logger, ex);
}
}
return Task.CompletedTask;
}

private HubServiceEndpoint CreateHubServiceEndpoint(string hub, ServiceEndpoint endpoint, bool needScaleTcs = false)
{
var provider = GetEndpointProvider(endpoint);

return new HubServiceEndpoint(hub, provider, endpoint, needScaleTcs);
}

private IReadOnlyList<HubServiceEndpoint> CreateHubServiceEndpoints(string hub, IEnumerable<ServiceEndpoint> endpoints, bool needScaleTcs)
{
return endpoints.Select(e => CreateHubServiceEndpoint(hub, e, needScaleTcs)).ToList();
}

private IReadOnlyList<HubServiceEndpoint> CreateHubServiceEndpoints(IEnumerable<ServiceEndpoint> endpoints, bool needScaleTcs)
{
var hubEndpoints = new List<HubServiceEndpoint>();
var hubs = _endpointsPerHub.Keys;
foreach (var hub in hubs)
{
hubEndpoints.AddRange(CreateHubServiceEndpoints(hub, endpoints, needScaleTcs));
}
return hubEndpoints;
}

private async Task AddHubServiceEndpointAsync(HubServiceEndpoint endpoint, CancellationToken cancellationToken)
{
Log.StartAddingEndpoint(_logger, endpoint.Endpoint, endpoint.Name);

OnAdd?.Invoke(endpoint);

// Wait for new endpoint turn Ready or timeout getting cancelled
await Task.WhenAny(endpoint.ScaleTask, cancellationToken.AsTask());

// Set complete
endpoint.CompleteScale();
}

private async Task RemoveHubServiceEndpointAsync(HubServiceEndpoint endpoint, CancellationToken cancellationToken)
{
Log.StartRemovingEndpoint(_logger, endpoint.Endpoint, endpoint.Name);

OnRemove?.Invoke(endpoint);

// Wait for endpoint turn offline or timeout getting cancelled
await Task.WhenAny(endpoint.ScaleTask, cancellationToken.AsTask());

// Set complete
endpoint.CompleteScale();
}

private Task RenameHubServiceEndpoint(HubServiceEndpoint endpoint)
{
Log.StartRenamingEndpoint(_logger, endpoint.Endpoint, endpoint.Name);

OnRename?.Invoke(endpoint);
return Task.CompletedTask;
}

private static class Log
{
private static readonly Action<ILogger, int, string, string, Exception> _duplicateEndpointFound =
LoggerMessage.Define<int, string, string>(LogLevel.Warning, new EventId(1, "DuplicateEndpointFound"), "{count} endpoint configurations to '{endpoint}' found, use '{name}'.");

private static readonly Action<ILogger, string, string, Exception> _startAddingEndpoint =
LoggerMessage.Define<string, string>(LogLevel.Debug, new EventId(2, "StartAddingEndpoint"), "Start adding endpoint: '{endpoint}', name: '{name}'.");

private static readonly Action<ILogger, string, string, Exception> _startRemovingEndpoint =
LoggerMessage.Define<string, string>(LogLevel.Debug, new EventId(3, "StartRemovingEndpoint"), "Start removing endpoint: '{endpoint}', name: '{name}'");

private static readonly Action<ILogger, string, string, Exception> _startRenamingEndpoint =
LoggerMessage.Define<string, string>(LogLevel.Debug, new EventId(4, "StartRenamingEndpoint"), "Start renaming endpoint: '{endpoint}', name: '{name}'");

private static readonly Action<ILogger, Exception> _failedAddingEndpoints =
LoggerMessage.Define(LogLevel.Error, new EventId(5, "FailedAddingEndpoints"), "Failed adding endpoints.");

private static readonly Action<ILogger, Exception> _failedRemovingEndpoints =
LoggerMessage.Define(LogLevel.Error, new EventId(6, "FailedRemovingEndpoints"), "Failed removing endpoints.");

private static readonly Action<ILogger, Exception> _failedRenamingEndpoints =
LoggerMessage.Define(LogLevel.Error, new EventId(7, "StartRenamingEndpoints"), "Failed renaming endpoints.");

private static readonly Action<ILogger, int, Exception> _timeoutWaitingForScale =
LoggerMessage.Define<int>(LogLevel.Error, new EventId(8, "TimeoutWaitingForScale"), "Timeout waiting '{timeout}' seconds for connection operations when scale endpoint.");

public static void DuplicateEndpointFound(ILogger logger, int count, string endpoint, string name)
{
_duplicateEndpointFound(logger, count, endpoint, name, null);
}

public static void StartAddingEndpoint(ILogger logger, string endpoint, string name)
{
_startAddingEndpoint(logger, endpoint, name, null);
}

public static void StartRemovingEndpoint(ILogger logger, string endpoint, string name)
{
_startRemovingEndpoint(logger, endpoint, name, null);
}

public static void StartRenamingEndpoint(ILogger logger, string endpoint, string name)
{
_startRenamingEndpoint(logger, endpoint, name, null);
}

public static void FailedAddingEndpoints(ILogger logger, Exception ex)
{
_failedAddingEndpoints(logger, ex);
}

public static void FailedRemovingEndpoints(ILogger logger, Exception ex)
{
_failedRemovingEndpoints(logger, ex);
}

public static void FailedRenamingEndpoint(ILogger logger, Exception ex)
{
_failedRenamingEndpoints(logger, ex);
}

public static void TimeoutWaitingForScale(ILogger logger, int timeout)
{
_timeoutWaitingForScale(logger, timeout, null);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,20 @@

namespace Microsoft.Azure.SignalR
{
internal delegate void EndpointEventHandler(HubServiceEndpoint endpoint);

internal interface IServiceEndpointManager
{
IServiceEndpointProvider GetEndpointProvider(ServiceEndpoint endpoint);

ServiceEndpoint[] Endpoints { get; }

IReadOnlyList<HubServiceEndpoint> GetEndpoints(string hub);

event EndpointEventHandler OnAdd;

event EndpointEventHandler OnRemove;

event EndpointEventHandler OnRename;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ internal class MultiEndpointServiceConnectionContainer : IMultiEndpointServiceCo
private readonly ConcurrentDictionary<ServiceEndpoint, IServiceConnectionContainer> _connectionContainers =
new ConcurrentDictionary<ServiceEndpoint, IServiceConnectionContainer>();

private readonly string _hubName;
private readonly IMessageRouter _router;
private readonly ILogger _logger;
private readonly IServiceEndpointManager _serviceEndpointManager;

// <needRouter, endpoints>
private (bool needRouter, IReadOnlyList<HubServiceEndpoint> endpoints) _routerEndpoints;
Expand All @@ -40,9 +42,11 @@ internal MultiEndpointServiceConnectionContainer(
throw new ArgumentNullException(nameof(generator));
}

_hubName = hub;
_router = router ?? throw new ArgumentNullException(nameof(router));
_logger = loggerFactory?.CreateLogger<MultiEndpointServiceConnectionContainer>() ?? throw new ArgumentNullException(nameof(loggerFactory));

_serviceEndpointManager = endpointManager;

// provides a copy to the endpoint per container
var endpoints = endpointManager.GetEndpoints(hub);
// router will be used when there's customized MessageRouter or multiple endpoints
Expand All @@ -53,7 +57,11 @@ internal MultiEndpointServiceConnectionContainer(
foreach (var endpoint in endpoints)
{
_connectionContainers[endpoint] = generator(endpoint);
}
}

_serviceEndpointManager.OnAdd += OnAdd;
_serviceEndpointManager.OnRemove += OnRemove;
_serviceEndpointManager.OnRename += OnRename;
}

public MultiEndpointServiceConnectionContainer(
Expand Down Expand Up @@ -242,6 +250,51 @@ private Task WriteMultiEndpointMessageAsync(ServiceMessage serviceMessage, Func<
return Task.WhenAll(routed);
}

private void OnAdd(HubServiceEndpoint endpoint)
{
if (!endpoint.Hub.Equals(_hubName, StringComparison.OrdinalIgnoreCase))
{
return;
}
_ = AddHubServiceEndpointAsync(endpoint);
}

private Task AddHubServiceEndpointAsync(HubServiceEndpoint endpoint)
{
// TODO: create container and trigger server ping.

// do tasks when !endpoint.ScaleTask.IsCanceled or local timeout check not finish
endpoint.CompleteScale();
return Task.CompletedTask;
}

private void OnRemove(HubServiceEndpoint endpoint)
{
if (!endpoint.Hub.Equals(_hubName, StringComparison.OrdinalIgnoreCase))
{
return;
}
_ = RemoveHubServiceEndpointAsync(endpoint);
}

private Task RemoveHubServiceEndpointAsync(HubServiceEndpoint endpoint)
{
// TODO: trigger offline ping and wait to remove container.

// finally set task complete when timeout
endpoint.CompleteScale();
return Task.CompletedTask;
}

private void OnRename(HubServiceEndpoint endpoint)
{
if (!endpoint.Hub.Equals(_hubName, StringComparison.OrdinalIgnoreCase))
{
return;
}
// TODO: update local store names
}

private static class Log
{
private static readonly Action<ILogger, string, Exception> _startingConnection =
Expand Down
Loading

0 comments on commit 5f1f13f

Please sign in to comment.