Skip to content

Commit

Permalink
set csharp_prefer_braces to true
Browse files Browse the repository at this point in the history
Signed-off-by: Jan Jansen <jan.jansen@gdata.de>
  • Loading branch information
farodin91 committed Dec 16, 2021
1 parent 939a4c6 commit e224812
Show file tree
Hide file tree
Showing 23 changed files with 177 additions and 32 deletions.
1 change: 1 addition & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ indent_size = 4
[*.cs]
indent_size = 4
csharp_style_namespace_declarations = file_scoped:warning
csharp_prefer_braces = true:warning

[*.json]
indent_size = 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ public class JsonNetDeserializer<T> : IMessageDeserializer<T> where T : notnull
public T Deserialize(byte[] message)
{
if (message == null || message.Length == 0)
{
throw new ArgumentNullException(nameof(message));
}

var json = Encoding.UTF8.GetString(message);
try
Expand Down
2 changes: 2 additions & 0 deletions src/Motor.Extensions.Conversion.JsonNet/JsonNetSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ public class JsonNetSerializer<T> : IMessageSerializer<T> where T : notnull
public byte[] Serialize(T message)
{
if (Equals(message, default(T)))
{
throw new ArgumentNullException(nameof(message));
}

return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ namespace Motor.Extensions.Conversion.Protobuf;

public T Deserialize(byte[] inMessage)
{
if (inMessage == null || inMessage.Length == 0) throw new ArgumentNullException(nameof(inMessage));
if (inMessage == null || inMessage.Length == 0)
{
throw new ArgumentNullException(nameof(inMessage));
}

T deserializedMsg;
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ public class SystemJsonDeserializer<T> : IMessageDeserializer<T> where T : notnu
public T Deserialize(byte[] message)
{
if (message == null || message.Length == 0)
{
throw new ArgumentNullException(nameof(message));
}

try
{
return JsonSerializer.Deserialize<T>(message) ?? throw new ArgumentNullException(nameof(message));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ public class SystemJsonSerializer<T> : IMessageSerializer<T> where T : notnull
public byte[] Serialize(T message)
{
if (Equals(message, default(T)))
{
throw new ArgumentNullException(nameof(message));
}

return JsonSerializer.SerializeToUtf8Bytes(message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ public override async Task<ProcessedMessageStatus> HandleMessageAsync(MotorCloud
{
var parentContext = dataCloudEvent.GetActivityContext();
using var activity = OpenTelemetryOptions.ActivitySource.StartActivity(nameof(HandleMessageAsync), ActivityKind.Server, parentContext);
if (activity is null) return await base.HandleMessageAsync(dataCloudEvent, token);
if (activity is null)
{
return await base.HandleMessageAsync(dataCloudEvent, token);
}

using (activity.Start())
using (_logger.BeginScope("TraceId: {traceid}, SpanId: {spanid}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ public interface IMessageConsumer<TInput> where TInput : notnull

async Task ExecuteAsync(CancellationToken token = default)
{
while (!token.IsCancellationRequested) await Task.Delay(TimeSpan.FromSeconds(100), token).ConfigureAwait(false);
while (!token.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromSeconds(100), token).ConfigureAwait(false);
}
}

Task StopAsync(CancellationToken token = default);
Expand Down
6 changes: 5 additions & 1 deletion src/Motor.Extensions.Hosting.CloudEvents/MotorCloudEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ public MotorCloudEvent(
params KeyValuePair<CloudEventAttribute, object>[] extensions)
{
BaseEvent = new CloudEvent(CloudEventsSpecVersion.Default);
foreach (var (key, value) in extensions) BaseEvent[key] = value;
foreach (var (key, value) in extensions)
{
BaseEvent[key] = value;
}

BaseEvent.Id = id ?? Guid.NewGuid().ToString();
BaseEvent.Type = type ?? typeof(TData).Name;
BaseEvent.Source = source;
Expand Down
32 changes: 27 additions & 5 deletions src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ public Func<MotorCloudEvent<byte[]>, CancellationToken, Task<ProcessedMessageSta

public Task StartAsync(CancellationToken token = default)
{
if (ConsumeCallbackAsync is null) throw new InvalidOperationException("ConsumeCallback is null");
if (ConsumeCallbackAsync is null)
{
throw new InvalidOperationException("ConsumeCallback is null");
}

var consumerBuilder = new ConsumerBuilder<string?, byte[]>(_options)
.SetLogHandler((_, logMessage) => WriteLog(logMessage))
Expand All @@ -66,13 +69,18 @@ public async Task ExecuteAsync(CancellationToken token = default)
await Task.Run(() =>
{
while (!token.IsCancellationRequested)
{
try
{
var msg = _consumer?.Consume(token);
if (msg != null && !msg.IsPartitionEOF)
{
SingleMessageHandling(token, msg);
}
else
{
_logger.LogDebug("No messages received");
}
}
catch (OperationCanceledException)
{
Expand All @@ -83,6 +91,7 @@ await Task.Run(() =>
{
_logger.LogError(e, "Failed to receive message.", e);
}
}
}, token).ConfigureAwait(false);
}

Expand Down Expand Up @@ -132,11 +141,18 @@ private void WriteStatistics(string json)
.Select(t => t.Value)
.SelectMany(t => t.Partitions ?? new Dictionary<string, KafkaStatisticsPartition>())
.Select(t => (Parition: t.Key.ToString(), t.Value.ConsumerLag));
if (partitionConsumerLags is null) return;
if (partitionConsumerLags is null)
{
return;
}

foreach (var (partition, consumerLag) in partitionConsumerLags)
{
var lag = consumerLag;
if (lag == -1) lag = 0;
if (lag == -1)
{
lag = 0;
}

_consumerLagSummary?.WithLabels(_options.Topic, partition)?.Observe(lag);
_consumerLagGauge?.WithLabels(_options.Topic, partition)?.Set(lag);
Expand Down Expand Up @@ -167,7 +183,10 @@ private void SingleMessageHandling(CancellationToken token, ConsumeResult<string
throw new ArgumentOutOfRangeException();
}
if (msg.Offset % _options.CommitPeriod != 0) return;
if (msg.Offset % _options.CommitPeriod != 0)
{
return;
}
try
{
Expand All @@ -187,7 +206,10 @@ public MotorCloudEvent<byte[]> KafkaMessageToCloudEvent(Message<string?, byte[]>

private void Dispose(bool disposing)
{
if (disposing) _consumer?.Dispose();
if (disposing)
{
_consumer?.Dispose();
}
}

public void Dispose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ public static void SetPriority<T>(this IBasicProperties self, MotorCloudEvent<by
{
var messagePriority = cloudEvent.GetRabbitMQPriority() ?? options.DefaultPriority;
if (messagePriority.HasValue)
{
self.Priority = messagePriority.Value;

}
}
public static void WriteCloudEventIntoHeader(this IBasicProperties self, MotorCloudEvent<byte[]> cloudEvent)
{
Expand Down
36 changes: 30 additions & 6 deletions src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQMessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ public Func<MotorCloudEvent<byte[]>, CancellationToken, Task<ProcessedMessageSta
public async Task ExecuteAsync(CancellationToken token = default)
{
_stoppingToken = token;
while (token.IsCancellationRequested) await Task.Delay(TimeSpan.FromSeconds(100), token);
while (token.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromSeconds(100), token);
}
}

public Task StartAsync(CancellationToken token = default)
Expand All @@ -71,14 +74,18 @@ public Task StopAsync(CancellationToken token = default)
private void ThrowIfNoCallbackConfigured()
{
if (ConsumeCallbackAsync is null)
{
throw new InvalidOperationException(
$"Cannot start consuming as no {nameof(ConsumeCallbackAsync)} was configured!");
}
}

private void ThrowIfConsumerAlreadyStarted()
{
if (_started)
{
throw new InvalidOperationException("Cannot start consuming as the consumer was already started!");
}
}

private void ConfigureChannel()
Expand All @@ -94,13 +101,25 @@ private void DeclareQueue()
}

var arguments = _options.Queue.Arguments.ToDictionary(t => t.Key, t => t.Value);
if (_options.Queue.MaxPriority is not null) arguments.Add("x-max-priority", _options.Queue.MaxPriority);
if (_options.Queue.MaxPriority is not null)
{
arguments.Add("x-max-priority", _options.Queue.MaxPriority);
}

if (_options.Queue.MaxLength is not null) arguments.Add("x-max-length", _options.Queue.MaxLength);
if (_options.Queue.MaxLength is not null)
{
arguments.Add("x-max-length", _options.Queue.MaxLength);
}

if (_options.Queue.MaxLengthBytes is not null) arguments.Add("x-max-length-bytes", _options.Queue.MaxLengthBytes);
if (_options.Queue.MaxLengthBytes is not null)
{
arguments.Add("x-max-length-bytes", _options.Queue.MaxLengthBytes);
}

if (_options.Queue.MessageTtl is not null) arguments.Add("x-message-ttl", _options.Queue.MessageTtl);
if (_options.Queue.MessageTtl is not null)
{
arguments.Add("x-message-ttl", _options.Queue.MessageTtl);
}

switch (_options.Queue.Mode)
{
Expand All @@ -121,11 +140,13 @@ private void DeclareQueue()
arguments
);
foreach (var routingKeyConfig in _options.Queue.Bindings)
{
_channel?.QueueBind(
_options.Queue.Name,
routingKeyConfig.Exchange,
routingKeyConfig.RoutingKey,
routingKeyConfig.Arguments);
}
}

private void StartConsumerOnChannel()
Expand All @@ -147,7 +168,10 @@ private void ConsumerCallback(BasicDeliverEventArgs args)
.GetAwaiter();
task?.OnCompleted(() =>
{
if (_stoppingToken.IsCancellationRequested) return;
if (_stoppingToken.IsCancellationRequested)
{
return;
}
var processedMessageStatus = task?.GetResult();
switch (processedMessageStatus)
Expand Down
14 changes: 11 additions & 3 deletions src/Motor.Extensions.Hosting.Timer/Timer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,19 @@ public Timer(IOptions<TimerOptions> config,

private async Task StartTimer(CancellationToken token)
{
if (_scheduler is not null) await _scheduler.Start(token);
if (_scheduler is not null)
{
await _scheduler.Start(token).ConfigureAwait(false);
}
_started = true;
}

private void ThrowIfTimerAlreadyStarted()
{
if (_started)
{
throw new InvalidOperationException("Cannot start timer as the timer was already started!");
}
}

private async Task ConfigureTimer()
Expand Down Expand Up @@ -69,11 +74,14 @@ public async Task StartAsync(CancellationToken token)
{
ThrowIfTimerAlreadyStarted();
await ConfigureTimer().ConfigureAwait(false);
await StartTimer(token);
await StartTimer(token).ConfigureAwait(false);
}

public async Task StopAsync(CancellationToken token)
{
if (_scheduler is not null) await _scheduler.Shutdown(token);
if (_scheduler is not null)
{
await _scheduler.Shutdown(token);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ public MessageProcessingHealthCheck(IOptions<MessageProcessingOptions> options,
public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context,
CancellationToken token = default)
{
if (_queue.ItemCount == 0) return Task.FromResult(HealthCheckResult.Healthy());
if (_queue.ItemCount == 0)
{
return Task.FromResult(HealthCheckResult.Healthy());
}

return Task.FromResult(DateTimeOffset.UtcNow - _queue.LastDequeuedAt > _maxTimeWithoutAcknowledgedMessage
? HealthCheckResult.Unhealthy()
Expand Down
5 changes: 4 additions & 1 deletion src/Motor.Extensions.Hosting/Internal/BackgroundTaskQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ public BackgroundTaskQueue(IMetricsFactory<BackgroundTaskQueue<T>>? metricsFacto

public Task<ProcessedMessageStatus> QueueBackgroundWorkItem(T item)
{
if (item is null) throw new ArgumentNullException(nameof(item));
if (item is null)
{
throw new ArgumentNullException(nameof(item));
}

var taskCompletionStatus = new TaskCompletionSource<ProcessedMessageStatus>();

Expand Down
6 changes: 5 additions & 1 deletion src/Motor.Extensions.Http/PrometheusDelegatingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ protected override async Task<HttpResponseMessage> SendAsync(
stopwatch.Start();
var response = await base.SendAsync(request, token);
var uri = request.RequestUri;
if (uri is null) return response;
if (uri is null)
{
return response;
}

_requestTotal.WithLabels(uri.Host, response.StatusCode.ToString()).Inc();
_requestLatency.WithLabels(uri.Host).Observe(stopwatch.ElapsedMilliseconds);
return response;
Expand Down
4 changes: 4 additions & 0 deletions src/Motor.Extensions.Utilities/MotorHostBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public IHostBuilder ConfigureContainer<TContainerBuilder>(
public IHost Build()
{
if (_enableConfigureWebDefaults)
{
_builder
.ConfigureWebHostDefaults(builder =>
{
Expand All @@ -101,14 +102,17 @@ public IHost Build()
.ConfigureHealthChecks(builder =>
{
foreach (var healthCheck in _healthChecks)
{
builder.Add(new HealthCheckRegistration(
healthCheck.Name,
s => (IHealthCheck)ActivatorUtilities.GetServiceOrCreateInstance(s, healthCheck.Type),
healthCheck.FailureStatus,
healthCheck.Tags,
healthCheck.Timeout)
);
}
});
}

return _builder.Build();
}
Expand Down
Loading

0 comments on commit e224812

Please sign in to comment.