Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Part3.2][Live-scale] add interfaces for scale ServiceEndpoint #832

Merged
merged 5 commits into from
Mar 2, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Microsoft.Azure.SignalR.Common/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ internal static class Constants
public const string ApplicationNameDefaultKey = "Azure:SignalR:ApplicationName";

public const int DefaultShutdownTimeoutInSeconds = 30;
public const int DefaultScaleTimeoutInSeconds = 300;

public const string AsrsMigrateFrom = "Asrs-Migrate-From";
public const string AsrsMigrateTo = "Asrs-Migrate-To";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,40 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Threading.Tasks;

namespace Microsoft.Azure.SignalR
{
internal class HubServiceEndpoint : ServiceEndpoint
{
public HubServiceEndpoint(string hub, IServiceEndpointProvider provider, ServiceEndpoint endpoint) : base(endpoint)
private readonly TaskCompletionSource<bool> _scaleTcs;

public HubServiceEndpoint(
string hub,
IServiceEndpointProvider provider,
ServiceEndpoint endpoint,
bool needScaleTcs = false
) : base(endpoint)
{
Hub = hub;
Provider = provider;
_scaleTcs = needScaleTcs ? new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously) : null;
}

internal HubServiceEndpoint() : base() { }

public string Hub { get; }

public IServiceEndpointProvider Provider { get; }

/// <summary>
/// Task waiting for HubServiceEndpoint turn ready when live add/remove endpoint
/// </summary>
public Task ScaleTask => _scaleTcs?.Task ?? Task.CompletedTask;

public void SetScaleTaskComplete()
{
_scaleTcs?.TrySetResult(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using Microsoft.Azure.SignalR.Common;
using Microsoft.Extensions.Logging;

Expand All @@ -20,6 +23,10 @@ internal abstract class ServiceEndpointManagerBase : IServiceEndpointManager
// Filtered valuable endpoints from ServiceOptions
public ServiceEndpoint[] Endpoints { get; protected set; }

public event EndpointEventHandler OnAdd;
public event EndpointEventHandler OnRemove;
public event EndpointEventHandler OnRename;

protected ServiceEndpointManagerBase(IServiceEndpointOptions options, ILogger logger)
: this(GetEndpoints(options), logger)
{
Expand All @@ -43,11 +50,7 @@ internal ServiceEndpointManagerBase(IEnumerable<ServiceEndpoint> endpoints, ILog

public IReadOnlyList<HubServiceEndpoint> GetEndpoints(string hub)
{
return _endpointsPerHub.GetOrAdd(hub, s => Endpoints.Select(e =>
{
var provider = GetEndpointProvider(e);
return new HubServiceEndpoint(hub, provider, e);
}).ToArray());
return _endpointsPerHub.GetOrAdd(hub, s => Endpoints.Select(e => CreateHubServiceEndpoint(hub, e)).ToArray());
}

protected static IEnumerable<ServiceEndpoint> GetEndpoints(IServiceEndpointOptions options)
Expand Down Expand Up @@ -96,15 +99,186 @@ protected ServiceEndpoint[] GetValuableEndpoints(IEnumerable<ServiceEndpoint> en
return groupedEndpoints.ToArray();
}

protected async Task AddServiceEndpointsAsync(IReadOnlyList<ServiceEndpoint> endpoints, CancellationToken cancellationToken)
{
if (endpoints.Count > 0)
{
try
{
var hubEndpoints = CreateHubServiceEndpoints(endpoints, true);

await Task.WhenAll(hubEndpoints.Select(e => AddHubServiceEndpointAsync(e, cancellationToken)));

// TODO: update local store for negotiation
}
catch (Exception ex)
{
Log.FailedAddingEndpoints(_logger, ex);
}
}
}

protected async Task RemoveServiceEndpointsAsync(IReadOnlyList<ServiceEndpoint> endpoints, CancellationToken cancellationToken)
{
if (endpoints.Count > 0)
{
try
{
var hubEndpoints = CreateHubServiceEndpoints(endpoints, true);

// TODO: update local store for negotiation

await Task.WhenAll(hubEndpoints.Select(e => RemoveHubServiceEndpointAsync(e, cancellationToken)));
}
catch (Exception ex)
{
Log.FailedRemovingEndpoints(_logger, ex);
}
}
}

protected Task RenameSerivceEndpoints(IReadOnlyList<ServiceEndpoint> endpoints)
{
if (endpoints.Count > 0)
{
try
{
var hubEndpoints = CreateHubServiceEndpoints(endpoints, false);

// TODO: update local store for negotiation

return Task.WhenAll(hubEndpoints.Select(e => RenameHubServiceEndpoint(e)));
}
catch (Exception ex)
{
Log.FailedRenamingEndpoint(_logger, ex);
}
}
return Task.CompletedTask;
}

private HubServiceEndpoint CreateHubServiceEndpoint(string hub, ServiceEndpoint endpoint, bool needScaleTcs = false)
{
var provider = GetEndpointProvider(endpoint);

return new HubServiceEndpoint(hub, provider, endpoint, needScaleTcs);
}

private IReadOnlyList<HubServiceEndpoint> CreateHubServiceEndpoints(string hub, IEnumerable<ServiceEndpoint> endpoints, bool needScaleTcs)
{
return endpoints.Select(e => CreateHubServiceEndpoint(hub, e, needScaleTcs)).ToList();
}

private IReadOnlyList<HubServiceEndpoint> CreateHubServiceEndpoints(IEnumerable<ServiceEndpoint> endpoints, bool needScaleTcs)
{
var hubEndpoints = new List<HubServiceEndpoint>();
var hubs = _endpointsPerHub.Keys;
foreach (var hub in hubs)
{
hubEndpoints.AddRange(CreateHubServiceEndpoints(hub, endpoints, needScaleTcs));
}
return hubEndpoints;
}

private async Task AddHubServiceEndpointAsync(HubServiceEndpoint endpoint, CancellationToken cancellationToken)
{
Log.StartAddingEndpoint(_logger, endpoint.Endpoint, endpoint.Name);

OnAdd?.Invoke(endpoint);

// Wait for new endpoint turn Ready or timeout getting cancelled
await Task.WhenAny(endpoint.ScaleTask, cancellationToken.AsTask());

// Set complete
endpoint.SetScaleTaskComplete();
}

private async Task RemoveHubServiceEndpointAsync(HubServiceEndpoint endpoint, CancellationToken cancellationToken)
{
Log.StartRemovingEndpoint(_logger, endpoint.Endpoint, endpoint.Name);

OnRemove?.Invoke(endpoint);

// Wait for endpoint turn offline or timeout getting cancelled
await Task.WhenAny(endpoint.ScaleTask, cancellationToken.AsTask());

// Set complete
endpoint.SetScaleTaskComplete();
}

private Task RenameHubServiceEndpoint(HubServiceEndpoint endpoint)
{
Log.StartRenamingEndpoint(_logger, endpoint.Endpoint, endpoint.Name);

OnRename?.Invoke(endpoint);
return Task.CompletedTask;
}

private static class Log
{
private static readonly Action<ILogger, int, string, string, Exception> _duplicateEndpointFound =
LoggerMessage.Define<int, string, string>(LogLevel.Warning, new EventId(1, "DuplicateEndpointFound"), "{count} endpoint configurations to '{endpoint}' found, use '{name}'.");

private static readonly Action<ILogger, string, string, Exception> _startAddingEndpoint =
LoggerMessage.Define<string, string>(LogLevel.Debug, new EventId(2, "StartAddingEndpoint"), "Start adding endpoint: '{endpoint}', name: '{name}'.");

private static readonly Action<ILogger, string, string, Exception> _startRemovingEndpoint =
LoggerMessage.Define<string, string>(LogLevel.Debug, new EventId(3, "StartRemovingEndpoint"), "Start removing endpoint: '{endpoint}', name: '{name}'");

private static readonly Action<ILogger, string, string, Exception> _startRenamingEndpoint =
LoggerMessage.Define<string, string>(LogLevel.Debug, new EventId(4, "StartRenamingEndpoint"), "Start renaming endpoint: '{endpoint}', name: '{name}'");

private static readonly Action<ILogger, Exception> _failedAddingEndpoints =
LoggerMessage.Define(LogLevel.Error, new EventId(5, "FailedAddingEndpoints"), "Failed adding endpoints.");

private static readonly Action<ILogger, Exception> _failedRemovingEndpoints =
LoggerMessage.Define(LogLevel.Error, new EventId(6, "FailedRemovingEndpoints"), "Failed removing endpoints.");

private static readonly Action<ILogger, Exception> _failedRenamingEndpoints =
LoggerMessage.Define(LogLevel.Error, new EventId(7, "StartRenamingEndpoints"), "Failed renaming endpoints.");

private static readonly Action<ILogger, int, Exception> _timeoutWaitingForScale =
LoggerMessage.Define<int>(LogLevel.Error, new EventId(8, "TimeoutWaitingForScale"), "Timeout waiting '{timeout}' seconds for connection operations when scale endpoint.");

public static void DuplicateEndpointFound(ILogger logger, int count, string endpoint, string name)
{
_duplicateEndpointFound(logger, count, endpoint, name, null);
}

public static void StartAddingEndpoint(ILogger logger, string endpoint, string name)
{
_startAddingEndpoint(logger, endpoint, name, null);
}

public static void StartRemovingEndpoint(ILogger logger, string endpoint, string name)
{
_startRemovingEndpoint(logger, endpoint, name, null);
}

public static void StartRenamingEndpoint(ILogger logger, string endpoint, string name)
{
_startRenamingEndpoint(logger, endpoint, name, null);
}

public static void FailedAddingEndpoints(ILogger logger, Exception ex)
{
_failedAddingEndpoints(logger, ex);
}

public static void FailedRemovingEndpoints(ILogger logger, Exception ex)
{
_failedRemovingEndpoints(logger, ex);
}

public static void FailedRenamingEndpoint(ILogger logger, Exception ex)
{
_failedRenamingEndpoints(logger, ex);
}

public static void TimeoutWaitingForScale(ILogger logger, int timeout)
{
_timeoutWaitingForScale(logger, timeout, null);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,20 @@

namespace Microsoft.Azure.SignalR
{
internal delegate void EndpointEventHandler(HubServiceEndpoint endpoint);

internal interface IServiceEndpointManager
{
IServiceEndpointProvider GetEndpointProvider(ServiceEndpoint endpoint);

ServiceEndpoint[] Endpoints { get; }

IReadOnlyList<HubServiceEndpoint> GetEndpoints(string hub);

event EndpointEventHandler OnAdd;

event EndpointEventHandler OnRemove;

event EndpointEventHandler OnRename;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ internal class MultiEndpointServiceConnectionContainer : IMultiEndpointServiceCo
private readonly ConcurrentDictionary<ServiceEndpoint, IServiceConnectionContainer> _connectionContainers =
new ConcurrentDictionary<ServiceEndpoint, IServiceConnectionContainer>();

private readonly string _hubName;
private readonly IMessageRouter _router;
private readonly ILogger _logger;
private readonly IServiceEndpointManager _serviceEndpointManager;

// <needRouter, endpoints>
private (bool needRouter, IReadOnlyList<HubServiceEndpoint> endpoints) _routerEndpoints;
Expand All @@ -40,9 +42,11 @@ internal MultiEndpointServiceConnectionContainer(
throw new ArgumentNullException(nameof(generator));
}

_hubName = hub;
_router = router ?? throw new ArgumentNullException(nameof(router));
_logger = loggerFactory?.CreateLogger<MultiEndpointServiceConnectionContainer>() ?? throw new ArgumentNullException(nameof(loggerFactory));

_serviceEndpointManager = endpointManager;

// provides a copy to the endpoint per container
var endpoints = endpointManager.GetEndpoints(hub);
// router will be used when there's customized MessageRouter or multiple endpoints
Expand All @@ -53,7 +57,11 @@ internal MultiEndpointServiceConnectionContainer(
foreach (var endpoint in endpoints)
{
_connectionContainers[endpoint] = generator(endpoint);
}
}

_serviceEndpointManager.OnAdd += OnAdd;
_serviceEndpointManager.OnRemove += OnRemove;
_serviceEndpointManager.OnRename += OnRename;
}

public MultiEndpointServiceConnectionContainer(
Expand Down Expand Up @@ -242,6 +250,51 @@ private Task WriteMultiEndpointMessageAsync(ServiceMessage serviceMessage, Func<
return Task.WhenAll(routed);
}

private void OnAdd(HubServiceEndpoint endpoint)
{
if (!endpoint.Hub.Equals(_hubName, StringComparison.OrdinalIgnoreCase))
{
return;
}
_ = AddHubServiceEndpointAsync(endpoint);
}

private Task AddHubServiceEndpointAsync(HubServiceEndpoint endpoint)
{
// TODO: create container and trigger server ping.

// do tasks when !endpoint.ScaleTask.IsCanceled or local timeout check not finish
endpoint.SetScaleTaskComplete();
return Task.CompletedTask;
}

private void OnRemove(HubServiceEndpoint endpoint)
{
if (!endpoint.Hub.Equals(_hubName, StringComparison.OrdinalIgnoreCase))
{
return;
}
_ = RemoveHubServiceEndpointAsync(endpoint);
}

private Task RemoveHubServiceEndpointAsync(HubServiceEndpoint endpoint)
{
// TODO: trigger offline ping and wait to remove container.

// finally set task complete when timeout
endpoint.SetScaleTaskComplete();
return Task.CompletedTask;
}

private void OnRename(HubServiceEndpoint endpoint)
{
if (!endpoint.Hub.Equals(_hubName, StringComparison.OrdinalIgnoreCase))
{
return;
}
// TODO: update local store names
}

private static class Log
{
private static readonly Action<ILogger, string, Exception> _startingConnection =
Expand Down
Loading