diff --git a/src/Surging.Core/Surging.Core.CPlatform/AppConfig.cs b/src/Surging.Core/Surging.Core.CPlatform/AppConfig.cs index 2bfeedff2..c0833cf86 100644 --- a/src/Surging.Core/Surging.Core.CPlatform/AppConfig.cs +++ b/src/Surging.Core/Surging.Core.CPlatform/AppConfig.cs @@ -21,6 +21,9 @@ public class AppConfig public static IConfigurationRoot Configuration { get; internal set; } + /// + /// 负载均衡模式 + /// public static AddressSelectorMode LoadBalanceMode { get diff --git a/src/Surging.Core/Surging.Core.CPlatform/ContainerBuilderExtensions.cs b/src/Surging.Core/Surging.Core.CPlatform/ContainerBuilderExtensions.cs index fd28b5898..021374120 100644 --- a/src/Surging.Core/Surging.Core.CPlatform/ContainerBuilderExtensions.cs +++ b/src/Surging.Core/Surging.Core.CPlatform/ContainerBuilderExtensions.cs @@ -552,7 +552,7 @@ public static IServiceBuilder RegisterServiceBus services.RegisterAssemblyTypes(assembly) .Where(t => typeof(IIntegrationEventHandler).GetTypeInfo().IsAssignableFrom(t)).AsImplementedInterfaces().SingleInstance(); services.RegisterAssemblyTypes(assembly) - .Where(t => typeof(IIntegrationEventHandler).IsAssignableFrom(t)).SingleInstance(); + .Where(t => typeof(IIntegrationEventHandler).IsAssignableFrom(t)).SingleInstance(); } return builder; } @@ -562,8 +562,7 @@ public static IServiceBuilder RegisterServiceBus /// /// IOC容器 /// 返回注册模块信息 - public static IServiceBuilder RegisterRepositories - (this IServiceBuilder builder, params string[] virtualPaths) + public static IServiceBuilder RegisterRepositories(this IServiceBuilder builder, params string[] virtualPaths) { var services = builder.Services; var referenceAssemblies = GetAssemblies(virtualPaths); @@ -576,8 +575,7 @@ public static IServiceBuilder RegisterRepositories return builder; } - public static IServiceBuilder RegisterModules( - this IServiceBuilder builder, params string[] virtualPaths) + public static IServiceBuilder RegisterModules(this IServiceBuilder builder, params string[] virtualPaths) { var services = builder.Services; var referenceAssemblies = GetAssemblies(virtualPaths); @@ -600,7 +598,7 @@ public static IServiceBuilder RegisterModules( }); } builder.Services.Register(provider => new ModuleProvider( - _modules,virtualPaths, provider.Resolve>(), provider.Resolve() + _modules, virtualPaths, provider.Resolve>(), provider.Resolve() )).As().SingleInstance(); return builder; } @@ -721,7 +719,7 @@ private static List GetAbstractModules(Assembly assembly) private static string[] GetFilterAssemblies(string[] assemblyNames) { - var notRelatedFile = AppConfig.ServerOptions.NotRelatedAssemblyFiles; + var notRelatedFile = AppConfig.ServerOptions.NotRelatedAssemblyFiles; var relatedFile = AppConfig.ServerOptions.RelatedAssemblyFiles; var pattern = string.Format("^Microsoft.\\w*|^System.\\w*|^DotNetty.\\w*|^runtime.\\w*|^ZooKeeperNetEx\\w*|^StackExchange.Redis\\w*|^Consul\\w*|^Newtonsoft.Json.\\w*|^Autofac.\\w*{0}", string.IsNullOrEmpty(notRelatedFile) ? "" : $"|{notRelatedFile}"); @@ -738,7 +736,7 @@ private static string[] GetFilterAssemblies(string[] assemblyNames) return assemblyNames.Where( name => !notRelatedRegex.IsMatch(name)).ToArray(); -} + } } private static List GetAllAssemblyFiles(string parentDir) diff --git a/src/Surging.Core/Surging.Core.CPlatform/Runtime/Client/Address/Resolvers/Implementation/Selectors/Implementation/AddressSelectorMode.cs b/src/Surging.Core/Surging.Core.CPlatform/Runtime/Client/Address/Resolvers/Implementation/Selectors/Implementation/AddressSelectorMode.cs index 21d0dfa68..95862c9de 100644 --- a/src/Surging.Core/Surging.Core.CPlatform/Runtime/Client/Address/Resolvers/Implementation/Selectors/Implementation/AddressSelectorMode.cs +++ b/src/Surging.Core/Surging.Core.CPlatform/Runtime/Client/Address/Resolvers/Implementation/Selectors/Implementation/AddressSelectorMode.cs @@ -4,11 +4,26 @@ namespace Surging.Core.CPlatform.Runtime.Client.Address.Resolvers.Implementation.Selectors.Implementation { + /// + /// 负载均衡模式 + /// public enum AddressSelectorMode { + /// + /// Hash算法 + /// HashAlgorithm, + /// + /// 轮训 + /// Polling, + /// + /// 随机 + /// Random, + /// + /// 压力最小优先 + /// FairPolling, } } diff --git a/src/Surging.Core/Surging.Core.CPlatform/ServiceDescriptor.cs b/src/Surging.Core/Surging.Core.CPlatform/ServiceDescriptor.cs index 680e20c5e..3d62aba1f 100644 --- a/src/Surging.Core/Surging.Core.CPlatform/ServiceDescriptor.cs +++ b/src/Surging.Core/Surging.Core.CPlatform/ServiceDescriptor.cs @@ -31,7 +31,6 @@ public static string GroupName(this ServiceDescriptor descriptor) public static ServiceDescriptor GroupName(this ServiceDescriptor descriptor, string groupName) { descriptor.Metadatas["GroupName"] = groupName; - return descriptor; } @@ -198,7 +197,6 @@ public ServiceDescriptor() /// public string RoutePath { get; set; } - /// /// 元数据。 /// diff --git a/src/Surging.Core/Surging.Core.CPlatform/ServiceHostBuilderExtensions.cs b/src/Surging.Core/Surging.Core.CPlatform/ServiceHostBuilderExtensions.cs index 49749bc0f..4d99e80c9 100644 --- a/src/Surging.Core/Surging.Core.CPlatform/ServiceHostBuilderExtensions.cs +++ b/src/Surging.Core/Surging.Core.CPlatform/ServiceHostBuilderExtensions.cs @@ -1,5 +1,4 @@ - -using Autofac; +using Autofac; using Surging.Core.CPlatform.Support; using System.Linq; using Surging.Core.CPlatform.Routing; @@ -33,10 +32,10 @@ public static IServiceHostBuilder UseServer(this IServiceHostBuilder hostBuilder BuildServiceEngine(mapper); await mapper.Resolve().SetServiceCommandsAsync(); string serviceToken = mapper.Resolve().GeneratorToken(token); - int _port = AppConfig.ServerOptions.Port= AppConfig.ServerOptions.Port == 0 ? port : AppConfig.ServerOptions.Port; + int _port = AppConfig.ServerOptions.Port = AppConfig.ServerOptions.Port == 0 ? port : AppConfig.ServerOptions.Port; string _ip = AppConfig.ServerOptions.Ip = AppConfig.ServerOptions.Ip ?? ip; _port = AppConfig.ServerOptions.Port = AppConfig.ServerOptions.IpEndpoint?.Port ?? _port; - _ip = AppConfig.ServerOptions.Ip =AppConfig.ServerOptions.IpEndpoint?.Address.ToString() ?? _ip; + _ip = AppConfig.ServerOptions.Ip = AppConfig.ServerOptions.IpEndpoint?.Address.ToString() ?? _ip; _ip = NetUtils.GetHostAddress(_ip); await ConfigureRoute(mapper, serviceToken); @@ -56,7 +55,7 @@ public static IServiceHostBuilder UseServer(this IServiceHostBuilder hostBuilder var serverOptions = new SurgingServerOptions(); options.Invoke(serverOptions); AppConfig.ServerOptions = serverOptions; - return hostBuilder.UseServer(serverOptions.Ip,serverOptions.Port,serverOptions.Token); + return hostBuilder.UseServer(serverOptions.Ip, serverOptions.Port, serverOptions.Token); } public static IServiceHostBuilder UseClient(this IServiceHostBuilder hostBuilder) @@ -70,9 +69,9 @@ public static IServiceHostBuilder UseClient(this IServiceHostBuilder hostBuilder return new ServiceSubscriber { Address = new[] { new IpAddressModel { - Ip = Dns.GetHostEntry(Dns.GetHostName()) - .AddressList.FirstOrDefault - (a => a.AddressFamily.ToString().Equals("InterNetwork")).ToString() } }, + Ip = Dns.GetHostEntry(Dns.GetHostName()) + .AddressList.FirstOrDefault + (a => a.AddressFamily.ToString().Equals("InterNetwork")).ToString() } }, ServiceDescriptor = i.Descriptor }; }).ToList(); @@ -88,14 +87,14 @@ public static void BuildServiceEngine(IContainer container) var builder = new ContainerBuilder(); container.Resolve().Build(builder); - var configBuilder= container.Resolve(); + var configBuilder = container.Resolve(); var appSettingPath = Path.Combine(AppConfig.ServerOptions.RootPath, "appsettings.json"); - configBuilder.AddCPlatformFile("${appsettingspath}|"+ appSettingPath, optional: false, reloadOnChange: true); + configBuilder.AddCPlatformFile("${appsettingspath}|" + appSettingPath, optional: false, reloadOnChange: true); builder.Update(container); } } - public static async Task ConfigureRoute(IContainer mapper,string serviceToken) + public static async Task ConfigureRoute(IContainer mapper, string serviceToken) { if (AppConfig.ServerOptions.Protocol == CommunicationProtocol.Tcp || AppConfig.ServerOptions.Protocol == CommunicationProtocol.None) @@ -106,9 +105,9 @@ public static async Task ConfigureRoute(IContainer mapper,string serviceToken) async () => await routeProvider.RegisterRoutes( Math.Round(Convert.ToDecimal(Process.GetCurrentProcess().TotalProcessorTime.TotalSeconds), 2, MidpointRounding.AwayFromZero))); else - await routeProvider.RegisterRoutes(0); + await routeProvider.RegisterRoutes(0); } } - + } } diff --git a/src/Surging.Core/Surging.Core.CPlatform/Support/Implementation/BreakeRemoteInvokeService.cs b/src/Surging.Core/Surging.Core.CPlatform/Support/Implementation/BreakeRemoteInvokeService.cs index c65ca2693..5d9bcac97 100644 --- a/src/Surging.Core/Surging.Core.CPlatform/Support/Implementation/BreakeRemoteInvokeService.cs +++ b/src/Surging.Core/Surging.Core.CPlatform/Support/Implementation/BreakeRemoteInvokeService.cs @@ -24,7 +24,7 @@ public class BreakeRemoteInvokeService : IBreakeRemoteInvokeService private readonly ILogger _logger; private readonly ConcurrentDictionary _serviceInvokeListenInfo = new ConcurrentDictionary(); private readonly IHashAlgorithm _hashAlgorithm; - private readonly IEnumerable exceptionFilters=new List(); + private readonly IEnumerable exceptionFilters = new List(); public BreakeRemoteInvokeService(IHashAlgorithm hashAlgorithm, IServiceCommandProvider commandProvider, ILogger logger, IRemoteInvokeService remoteInvokeService, @@ -35,15 +35,18 @@ public BreakeRemoteInvokeService(IHashAlgorithm hashAlgorithm, IServiceCommandPr _logger = logger; _hashAlgorithm = hashAlgorithm; if (serviceProvider.Current.IsRegistered()) - exceptionFilters= serviceProvider.GetInstances>(); + exceptionFilters = serviceProvider.GetInstances>(); } public async Task InvokeAsync(IDictionary parameters, string serviceId, string serviceKey, bool decodeJOject) { var serviceInvokeInfos = _serviceInvokeListenInfo.GetOrAdd(serviceId, - new ServiceInvokeListenInfo() { FirstInvokeTime=DateTime.Now, - FinalRemoteInvokeTime =DateTime.Now }); - var vt = _commandProvider.GetCommand(serviceId); + new ServiceInvokeListenInfo() + { + FirstInvokeTime = DateTime.Now, + FinalRemoteInvokeTime = DateTime.Now + }); + var vt = _commandProvider.GetCommand(serviceId); var command = vt.IsCompletedSuccessfully ? vt.Result : await vt; var intervalSeconds = (DateTime.Now - serviceInvokeInfos.FinalRemoteInvokeTime).TotalSeconds; bool reachConcurrentRequest() => serviceInvokeInfos.ConcurrentRequests > command.MaxConcurrentRequests; @@ -51,7 +54,7 @@ bool reachRequestVolumeThreshold() => intervalSeconds <= 10 && serviceInvokeInfos.SinceFaultRemoteServiceRequests > command.BreakerRequestVolumeThreshold; bool reachErrorThresholdPercentage() => (double)serviceInvokeInfos.FaultRemoteServiceRequests / (double)(serviceInvokeInfos.RemoteServiceRequests ?? 1) * 100 > command.BreakeErrorThresholdPercentage; - var item = GetHashItem(command,parameters); + var item = GetHashItem(command, parameters); if (command.BreakerForceClosed) { _serviceInvokeListenInfo.AddOrUpdate(serviceId, new ServiceInvokeListenInfo(), (k, v) => { v.LocalServiceRequests++; return v; }); @@ -78,7 +81,7 @@ bool reachErrorThresholdPercentage() => } } - private async Task MonitorRemoteInvokeAsync(IDictionary parameters, string serviceId, string serviceKey, bool decodeJOject, int requestTimeout,string item) + private async Task MonitorRemoteInvokeAsync(IDictionary parameters, string serviceId, string serviceKey, bool decodeJOject, int requestTimeout, string item) { CancellationTokenSource source = new CancellationTokenSource(); var token = source.Token; @@ -101,7 +104,7 @@ private async Task MonitorRemoteInvokeAsync(IDictiona }); var message = await _remoteInvokeService.InvokeAsync(new RemoteInvokeContext { - Item=item , + Item = item, InvokeMessage = invokeMessage }, requestTimeout); _serviceInvokeListenInfo.AddOrUpdate(serviceId, new ServiceInvokeListenInfo(), (k, v) => @@ -111,7 +114,7 @@ private async Task MonitorRemoteInvokeAsync(IDictiona }); return message; } - catch(Exception ex) + catch (Exception ex) { _serviceInvokeListenInfo.AddOrUpdate(serviceId, new ServiceInvokeListenInfo(), (k, v) => { @@ -141,10 +144,10 @@ await filter.ExecuteExceptionFilterAsync(new RpcActionExecutedContext private string GetHashItem(ServiceCommand command, IDictionary parameters) { string result = ""; - if(command.ShuntStrategy==AddressSelectorMode.HashAlgorithm) + if (command.ShuntStrategy == AddressSelectorMode.HashAlgorithm) { var parameter = parameters.Values.FirstOrDefault(); - result= parameter?.ToString(); + result = parameter?.ToString(); } return result; } diff --git a/src/Surging.Core/Surging.Core.CPlatform/Support/ServiceCommand.cs b/src/Surging.Core/Surging.Core.CPlatform/Support/ServiceCommand.cs index 23a68ed13..8f4a680d6 100644 --- a/src/Surging.Core/Surging.Core.CPlatform/Support/ServiceCommand.cs +++ b/src/Surging.Core/Surging.Core.CPlatform/Support/ServiceCommand.cs @@ -25,7 +25,13 @@ public ServiceCommand() FallBackName = AppConfig.ServerOptions.FallBackName; } } + /// + /// 故障转移次数 + /// public int FailoverCluster { get; set; } = 3; + /// + /// 是否强制开启熔断 + /// public bool CircuitBreakerForceOpen { get; set; } /// /// 容错策略 @@ -45,7 +51,6 @@ public ServiceCommand() /// public string Injection { get; set; } = "return null"; - /// /// IFallbackInvoker 实例名称 /// @@ -55,9 +60,10 @@ public ServiceCommand() /// [JsonConverter(typeof(StringEnumConverter))] public AddressSelectorMode ShuntStrategy { get; set; } = AddressSelectorMode.Polling; - + /// + /// 注入命名空间 + /// public string[] InjectionNamespaces { get; set; } - /// /// 错误率达到多少开启熔断保护 /// @@ -67,12 +73,12 @@ public ServiceCommand() /// public int BreakeSleepWindowInMilliseconds { get; set; } = 60000; /// - /// 是否强制关闭熔断 + /// 是否强制关闭熔断 /// public bool BreakerForceClosed { get; set; } /// - /// 10秒钟内至少多少请求失败,熔断器才发挥起作用 + /// 10秒钟内至少多少请求失败,熔断器才发挥起作用 /// public int BreakerRequestVolumeThreshold { get; set; } = 20; diff --git a/src/Surging.Core/Surging.Core.CPlatform/Support/StrategyType.cs b/src/Surging.Core/Surging.Core.CPlatform/Support/StrategyType.cs index ff9a8f85d..183ecfc93 100644 --- a/src/Surging.Core/Surging.Core.CPlatform/Support/StrategyType.cs +++ b/src/Surging.Core/Surging.Core.CPlatform/Support/StrategyType.cs @@ -1,9 +1,21 @@ namespace Surging.Core.CPlatform.Support { + /// + /// 容错策略 + /// public enum StrategyType { - Failover=0, - Injection=1, - FallBack=2, + /// + /// 故障转移策略、失败切换远程服务机制 + /// + Failover = 0, + /// + /// 脚本注入策略、失败执行注入脚本 + /// + Injection = 1, + /// + /// 回退策略、失败时调用通过FallBackName指定的接口 + /// + FallBack = 2, } } diff --git a/src/Surging.Core/Surging.Core.CPlatform/Utilities/NetUtils.cs b/src/Surging.Core/Surging.Core.CPlatform/Utilities/NetUtils.cs index 26a41e5bb..1195c7025 100644 --- a/src/Surging.Core/Surging.Core.CPlatform/Utilities/NetUtils.cs +++ b/src/Surging.Core/Surging.Core.CPlatform/Utilities/NetUtils.cs @@ -6,7 +6,7 @@ namespace Surging.Core.CPlatform.Utilities { - public class NetUtils + public class NetUtils { public const string LOCALHOST = "127.0.0.1"; public const string ANYHOST = "0.0.0.0"; @@ -14,7 +14,7 @@ public class NetUtils private const int MAX_PORT = 65535; private const string LOCAL_IP_PATTERN = "127(\\.\\d{1,3}){3}$"; private const string IP_PATTERN = "\\d{1,3}(\\.\\d{1,3}){3,5}$"; - private static AddressModel _host=null; + private static AddressModel _host = null; public static bool IsInvalidPort(int port) { @@ -36,7 +36,7 @@ public static bool IsAnyHost(String host) private static bool IsValidAddress(string address) { return (address != null - && !ANYHOST.Equals(address) + && !ANYHOST.Equals(address) && address.IsMatch(IP_PATTERN)); } @@ -74,7 +74,7 @@ public static string GetAnyHostAddress() public static string GetHostAddress(string hostAddress) { var result = hostAddress; - if((!IsValidAddress(hostAddress) && !IsLocalHost(hostAddress)) || IsAnyHost(hostAddress)) + if ((!IsValidAddress(hostAddress) && !IsLocalHost(hostAddress)) || IsAnyHost(hostAddress)) { result = GetAnyHostAddress(); } @@ -88,11 +88,11 @@ public static AddressModel GetHostAddress() var ports = AppConfig.ServerOptions.Ports; string address = GetHostAddress(AppConfig.ServerOptions.Ip); int port = AppConfig.ServerOptions.Port; - var mappingIp = AppConfig.ServerOptions.MappingIP ?? address; + var mappingIp = AppConfig.ServerOptions.MappingIP ?? address; var mappingPort = AppConfig.ServerOptions.MappingPort; if (mappingPort == 0) mappingPort = port; - _host= new IpAddressModel + _host = new IpAddressModel { HttpPort = ports.HttpPort, Ip = mappingIp, diff --git a/src/Surging.Core/Surging.Core.Consul/ConsulServiceRouteManager.cs b/src/Surging.Core/Surging.Core.Consul/ConsulServiceRouteManager.cs index 135151a2d..7e973325b 100644 --- a/src/Surging.Core/Surging.Core.Consul/ConsulServiceRouteManager.cs +++ b/src/Surging.Core/Surging.Core.Consul/ConsulServiceRouteManager.cs @@ -45,7 +45,7 @@ public ConsulServiceRouteManager(ConfigInfo configInfo, ISerializer seri _consulClientProvider = consulClientProvider; _manager = manager; _serviceHeartbeatManager = serviceHeartbeatManager; - EnterRoutes().Wait(); + EnterRoutes().Wait(); } public override async Task ClearAsync() @@ -66,7 +66,7 @@ public override async Task ClearAsync() } public void Dispose() - { + { } /// @@ -81,7 +81,7 @@ public override async Task> GetRoutesAsync() public override async Task SetRoutesAsync(IEnumerable routes) { - var locks= await CreateLock(); + var locks = await CreateLock(); try { await _consulClientProvider.Check(); @@ -123,13 +123,13 @@ public override async Task RemveAddressAsync(IEnumerable Address) route.Address = route.Address.Except(Address); } } - catch(Exception ex) + catch (Exception ex) { throw ex; } await base.SetRoutesAsync(routes); } - + protected override async Task SetRoutesAsync(IEnumerable routes) { var clients = await _consulClientProvider.GetClients(); @@ -173,8 +173,8 @@ private async Task> CreateLock() var clients = await _consulClientProvider.GetClients(); foreach (var client in clients) { - var distributedLock = await client.AcquireLock($"lock_{_configInfo.RoutePath}", _configInfo.LockDelay==0? - default: + var distributedLock = await client.AcquireLock($"lock_{_configInfo.RoutePath}", _configInfo.LockDelay == 0 ? + default : new CancellationTokenSource(TimeSpan.FromSeconds(_configInfo.LockDelay)).Token); result.Add(distributedLock); } @@ -235,13 +235,14 @@ private async Task GetRoutes(IEnumerable childrens) private async Task GetRoute(string path) { ServiceRoute result = null; - var client =await GetConsulClient(); + var client = await GetConsulClient(); var watcher = new NodeMonitorWatcher(GetConsulClient, _manager, path, - async (oldData, newData) => await NodeChange(oldData, newData),tmpPath=> { + async (oldData, newData) => await NodeChange(oldData, newData), tmpPath => + { var index = tmpPath.LastIndexOf("/"); return _serviceHeartbeatManager.ExistsWhitelist(tmpPath.Substring(index + 1)); - }); - + }); + var queryResult = await client.KV.Keys(path); if (queryResult.Response != null) { @@ -257,7 +258,7 @@ private async Task GetRoute(string path) private async ValueTask GetConsulClient() { - var client=await _consulClientProvider.GetClient(); + var client = await _consulClientProvider.GetClient(); return client; } @@ -265,8 +266,8 @@ private async Task EnterRoutes() { if (_routes != null && _routes.Length > 0) return; - Action action = null ; - var client =await GetConsulClient(); + Action action = null; + var client = await GetConsulClient(); if (_configInfo.EnableChildrenMonitor) { var watcher = new ChildrenMonitorWatcher(GetConsulClient, _manager, _configInfo.RoutePath, @@ -339,7 +340,7 @@ private async Task NodeChange(byte[] oldData, byte[] newData) .Where(i => i.ServiceDescriptor.Id != newRoute.ServiceDescriptor.Id) .Concat(new[] { newRoute }).ToArray(); } - + //触发路由变更事件。 OnChanged(new ServiceRouteChangedEventArgs(newRoute, oldRoute)); } diff --git a/src/Surging.Core/Surging.Core.Consul/ConsulServiceSubscribeManager.cs b/src/Surging.Core/Surging.Core.Consul/ConsulServiceSubscribeManager.cs index dc02ab437..270ed0f16 100644 --- a/src/Surging.Core/Surging.Core.Consul/ConsulServiceSubscribeManager.cs +++ b/src/Surging.Core/Surging.Core.Consul/ConsulServiceSubscribeManager.cs @@ -15,8 +15,8 @@ namespace Surging.Core.Consul { - public class ConsulServiceSubscribeManager: ServiceSubscribeManagerBase, IDisposable -{ + public class ConsulServiceSubscribeManager : ServiceSubscribeManagerBase, IDisposable + { private readonly ConfigInfo _configInfo; private readonly ISerializer _serializer; private readonly IServiceSubscriberFactory _serviceSubscriberFactory; @@ -28,7 +28,7 @@ public class ConsulServiceSubscribeManager: ServiceSubscribeManagerBase, IDispo public ConsulServiceSubscribeManager(ConfigInfo configInfo, ISerializer serializer, ISerializer stringSerializer, IClientWatchManager manager, IServiceSubscriberFactory serviceSubscriberFactory, - ILogger logger, IConsulClientProvider consulClientFactory) :base(stringSerializer) + ILogger logger, IConsulClientProvider consulClientFactory) : base(stringSerializer) { _configInfo = configInfo; _serializer = serializer; @@ -66,7 +66,7 @@ public override async Task ClearAsync() } } } - + public void Dispose() { } @@ -98,7 +98,7 @@ protected override async Task SetSubscribersAsync(IEnumerable subscribers) { - var serviceSubscribers = await GetSubscribers(subscribers.Select(p =>$"{ _configInfo.SubscriberPath }{ p.ServiceDescriptor.Id}")); + var serviceSubscribers = await GetSubscribers(subscribers.Select(p => $"{ _configInfo.SubscriberPath }{ p.ServiceDescriptor.Id}")); if (serviceSubscribers.Count() > 0) { foreach (var subscriber in subscribers) @@ -127,10 +127,10 @@ private async Task GetSubscriber(byte[] data) } private async Task EnterSubscribers() - { + { if (_subscribers != null) return; - var client=await _consulClientFactory.GetClient(); + var client = await _consulClientFactory.GetClient(); if (client.KV.Keys(_configInfo.SubscriberPath).Result.Response?.Count() > 0) { var result = await client.GetChildrenAsync(_configInfo.SubscriberPath); @@ -169,7 +169,7 @@ private async Task GetSubscribers(IEnumerable child { if (_logger.IsEnabled(Microsoft.Extensions.Logging.LogLevel.Debug)) _logger.LogDebug($"准备从节点:{children}中获取订阅者信息。"); - + var subscriber = await GetSubscriber(children); if (subscriber != null) subscribers.Add(subscriber); diff --git a/src/Surging.Core/Surging.Core.Consul/ContainerBuilderExtensions.cs b/src/Surging.Core/Surging.Core.Consul/ContainerBuilderExtensions.cs index 71f75493f..4de155127 100644 --- a/src/Surging.Core/Surging.Core.Consul/ContainerBuilderExtensions.cs +++ b/src/Surging.Core/Surging.Core.Consul/ContainerBuilderExtensions.cs @@ -30,55 +30,55 @@ public static class ContainerBuilderExtensions /// 设置服务路由管理者。 /// /// Rpc服务构建者。 - /// ZooKeeper设置信息。 + /// Consul设置信息。 /// 服务构建者。 public static IServiceBuilder UseConsulRouteManager(this IServiceBuilder builder, ConfigInfo configInfo) { return builder.UseRouteManager(provider => new ConsulServiceRouteManager( - GetConfigInfo(configInfo), - provider.GetRequiredService>(), + GetConfigInfo(configInfo), + provider.GetRequiredService>(), provider.GetRequiredService>(), provider.GetRequiredService(), provider.GetRequiredService(), provider.GetRequiredService>(), - provider.GetRequiredService(), - provider.GetRequiredService())); + provider.GetRequiredService(), + provider.GetRequiredService())); } public static IServiceBuilder UseConsulCacheManager(this IServiceBuilder builder, ConfigInfo configInfo) { return builder.UseCacheManager(provider => new ConsulServiceCacheManager( - GetConfigInfo(configInfo), - provider.GetRequiredService>(), + GetConfigInfo(configInfo), + provider.GetRequiredService>(), provider.GetRequiredService>(), provider.GetRequiredService(), provider.GetRequiredService(), provider.GetRequiredService>(), - provider.GetRequiredService())); + provider.GetRequiredService())); } /// /// 设置服务命令管理者。 /// /// Rpc服务构建者。 - /// ZooKeeper设置信息。 + /// Consul设置信息。 /// 服务构建者。 public static IServiceBuilder UseConsulCommandManager(this IServiceBuilder builder, ConfigInfo configInfo) { return builder.UseCommandManager(provider => { var result = new ConsulServiceCommandManager( - GetConfigInfo(configInfo), - provider.GetRequiredService>(), + GetConfigInfo(configInfo), + provider.GetRequiredService>(), provider.GetRequiredService>(), - provider.GetRequiredService (), + provider.GetRequiredService(), provider.GetRequiredService(), provider.GetRequiredService(), provider.GetRequiredService>(), - provider.GetRequiredService(), - provider.GetRequiredService()); + provider.GetRequiredService(), + provider.GetRequiredService()); return result; }); } @@ -88,13 +88,13 @@ public static IServiceBuilder UseConsulMqttRouteManager(this IServiceBuilder bui return builder.UseMqttRouteManager(provider => new ConsulMqttServiceRouteManager( GetConfigInfo(configInfo), - provider.GetRequiredService>(), + provider.GetRequiredService>(), provider.GetRequiredService>(), provider.GetRequiredService(), provider.GetRequiredService(), provider.GetRequiredService>(), - provider.GetRequiredService(), - provider.GetRequiredService())); + provider.GetRequiredService(), + provider.GetRequiredService())); } public static IServiceBuilder UseConsulServiceSubscribeManager(this IServiceBuilder builder, ConfigInfo configInfo) @@ -107,7 +107,7 @@ public static IServiceBuilder UseConsulServiceSubscribeManager(this IServiceBuil provider.GetRequiredService>(), provider.GetRequiredService(), provider.GetRequiredService(), - provider.GetRequiredService>(), + provider.GetRequiredService>(), provider.GetRequiredService()); return result; }); @@ -142,12 +142,12 @@ public static IServiceBuilder UseHealthCheck(this IServiceBuilder builder) public static IServiceBuilder UseCounlClientProvider(this IServiceBuilder builder, ConfigInfo configInfo) { - builder.Services.Register( provider => - new DefaultConsulClientProvider( - GetConfigInfo(configInfo), - provider.Resolve(), - provider.Resolve(), - provider.Resolve>())).As().SingleInstance(); + builder.Services.Register(provider => + new DefaultConsulClientProvider( + GetConfigInfo(configInfo), + provider.Resolve(), + provider.Resolve(), + provider.Resolve>())).As().SingleInstance(); return builder; } @@ -157,12 +157,12 @@ public static IServiceBuilder UseConsulManager(this IServiceBuilder builder, Con return builder.UseConsulRouteManager(configInfo) .UseHealthCheck() .UseConsulServiceSubscribeManager(configInfo) - .UseConsulCommandManager(configInfo) - .UseConsulCacheManager(configInfo) - .UseCounlClientProvider(configInfo) - .UseConsulAddressSelector() - .UseConsulWatch(configInfo) - .UseConsulMqttRouteManager(configInfo); + .UseConsulCommandManager(configInfo) + .UseConsulCacheManager(configInfo) + .UseCounlClientProvider(configInfo) + .UseConsulAddressSelector() + .UseConsulWatch(configInfo) + .UseConsulMqttRouteManager(configInfo); } [Obsolete] @@ -172,11 +172,11 @@ public static IServiceBuilder UseConsulManager(this IServiceBuilder builder) return builder.UseConsulRouteManager(configInfo) .UseHealthCheck() .UseConsulServiceSubscribeManager(configInfo) - .UseConsulCommandManager(configInfo) + .UseConsulCommandManager(configInfo) .UseCounlClientProvider(configInfo) - .UseConsulAddressSelector() - .UseConsulCacheManager(configInfo).UseConsulWatch(configInfo) - .UseConsulMqttRouteManager(configInfo); + .UseConsulAddressSelector() + .UseConsulCacheManager(configInfo).UseConsulWatch(configInfo) + .UseConsulMqttRouteManager(configInfo); } @@ -186,7 +186,7 @@ private static ConfigInfo GetConfigInfo(ConfigInfo config) var section = CPlatform.AppConfig.GetSection("Consul"); if (section.Exists()) option = section.Get(); - else if(AppConfig.Configuration!=null) + else if (AppConfig.Configuration != null) option = AppConfig.Configuration.Get(); if (option != null) @@ -194,7 +194,7 @@ private static ConfigInfo GetConfigInfo(ConfigInfo config) var sessionTimeout = config.SessionTimeout.TotalSeconds; Double.TryParse(option.SessionTimeout, out sessionTimeout); config = new ConfigInfo( - option.ConnectionString, + option.ConnectionString, TimeSpan.FromSeconds(sessionTimeout), option.LockDelay ?? config.LockDelay, option.RoutePath ?? config.RoutePath, @@ -202,7 +202,7 @@ private static ConfigInfo GetConfigInfo(ConfigInfo config) option.CommandPath ?? config.CommandPath, option.CachePath ?? config.CachePath, option.MqttRoutePath ?? config.MqttRoutePath, - option.ReloadOnChange != null ? bool.Parse(option.ReloadOnChange) : + option.ReloadOnChange != null ? bool.Parse(option.ReloadOnChange) : config.ReloadOnChange, option.EnableChildrenMonitor != null ? bool.Parse(option.EnableChildrenMonitor) : config.EnableChildrenMonitor diff --git a/src/Surging.Core/Surging.Core.Consul/Internal/Implementation/DefaultConsulClientProvider.cs b/src/Surging.Core/Surging.Core.Consul/Internal/Implementation/DefaultConsulClientProvider.cs index 0064aef21..d2bd7eb71 100644 --- a/src/Surging.Core/Surging.Core.Consul/Internal/Implementation/DefaultConsulClientProvider.cs +++ b/src/Surging.Core/Surging.Core.Consul/Internal/Implementation/DefaultConsulClientProvider.cs @@ -23,11 +23,11 @@ public class DefaultConsulClientProvider : IConsulClientProvider private ConfigInfo _config; private readonly IHealthCheckService _healthCheckService; private readonly IConsulAddressSelector _consulAddressSelector; - private readonly ILogger _logger; + private readonly ILogger _logger; private readonly ConcurrentDictionary _addressSelectors = new ConcurrentDictionary(); private readonly ConcurrentDictionary _consulClients = new - ConcurrentDictionary(); + ConcurrentDictionary(); public DefaultConsulClientProvider(ConfigInfo config, IHealthCheckService healthCheckService, IConsulAddressSelector consulAddressSelector, ILogger logger) @@ -76,14 +76,14 @@ public async ValueTask GetClient() return result; } - public async ValueTask> GetClients() + public async ValueTask> GetClients() { var result = new List(); foreach (var address in _config.Addresses) { - var ipAddress=address as IpAddressModel; + var ipAddress = address as IpAddressModel; if (await _healthCheckService.IsHealth(address)) - { + { result.Add(_consulClients.GetOrAdd(ipAddress, new ConsulClient(config => { config.Address = new Uri($"http://{ipAddress.Ip}:{ipAddress.Port}"); diff --git a/src/Surging.Core/Surging.Core.ServiceHosting/Internal/Implementation/ConsoleLifetime.cs b/src/Surging.Core/Surging.Core.ServiceHosting/Internal/Implementation/ConsoleLifetime.cs index 2360fbb96..030dca744 100644 --- a/src/Surging.Core/Surging.Core.ServiceHosting/Internal/Implementation/ConsoleLifetime.cs +++ b/src/Surging.Core/Surging.Core.ServiceHosting/Internal/Implementation/ConsoleLifetime.cs @@ -6,11 +6,14 @@ namespace Surging.Core.ServiceHosting.Internal.Implementation { - public class ConsoleLifetime: IHostLifetime + /// + /// 服务端用来阻止Host主线程退出,直到按下Ctrl+C + /// + public class ConsoleLifetime : IHostLifetime { private readonly ManualResetEvent _shutdownBlock = new ManualResetEvent(false); public ConsoleLifetime(IApplicationLifetime applicationLifetime) - { + { ApplicationLifetime = applicationLifetime ?? throw new ArgumentNullException(nameof(applicationLifetime)); } @@ -18,14 +21,16 @@ public Task WaitForStartAsync(CancellationToken cancellationToken) { ApplicationLifetime.ApplicationStarted.Register(() => { - Console.WriteLine("服务已启动。 按下Ctrl + C关闭。"); + Console.WriteLine("服务已启动。 按下Ctrl + C关闭。"); }); AppDomain.CurrentDomain.ProcessExit += (sender, eventArgs) => { ApplicationLifetime.StopApplication(); + //阻止程序主线程自动退出,等待退出信号 _shutdownBlock.WaitOne(); }; + //按下Ctrl+C退出程序 Console.CancelKeyPress += (sender, e) => { e.Cancel = true;