Skip to content

Commit

Permalink
Start working on RabbitMQ Trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
olahallvall committed Apr 15, 2024
1 parent 72750f1 commit ff5a742
Show file tree
Hide file tree
Showing 15 changed files with 145 additions and 26 deletions.
7 changes: 7 additions & 0 deletions FlowDance.AzureFunctions/FlowDance.AzureFunctions.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@
<PackageReference Include="Microsoft.ApplicationInsights.WorkerService" Version="2.22.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.ApplicationInsights" Version="1.2.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.UserSecrets" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="8.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\FlowDance.Common\FlowDance.Common.csproj" />
</ItemGroup>
<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="host.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
Expand All @@ -34,4 +38,7 @@
<ItemGroup>
<Using Include="System.Threading.ExecutionContext" Alias="ExecutionContext" />
</ItemGroup>
<ItemGroup>
<Folder Include="Triggers\Http\" />
</ItemGroup>
</Project>
11 changes: 11 additions & 0 deletions FlowDance.AzureFunctions/Program.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
using FlowDance.AzureFunctions.Services;
using FlowDance.Common.RabbitMQUtils;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

var host = new HostBuilder()
.ConfigureFunctionsWebApplication()
.ConfigureServices(services =>
{
services.AddApplicationInsightsTelemetryWorkerService();
services.ConfigureFunctionsApplicationInsights();
services.AddLogging();
services.AddTransient<IDetermineCompensation>((s) => {
return new DetermineCompensationService(null);

Check warning on line 17 in FlowDance.AzureFunctions/Program.cs

View workflow job for this annotation

GitHub Actions / build

Cannot convert null literal to non-nullable reference type.
});
services.AddTransient<IStorage>((s) => {
return new Storage(null);

Check warning on line 20 in FlowDance.AzureFunctions/Program.cs

View workflow job for this annotation

GitHub Actions / build

Cannot convert null literal to non-nullable reference type.
});
})
.Build();

Expand Down
2 changes: 1 addition & 1 deletion FlowDance.AzureFunctions/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"profiles": {
"TransactGuard.AzureFunctions": {
"FlowDance.AzureFunctions": {
"commandName": "Project",
"commandLineArgs": "--port 7013",
"launchBrowser": false
Expand Down
19 changes: 19 additions & 0 deletions FlowDance.AzureFunctions/Services/DetermineCompensationService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using Microsoft.Extensions.Logging;

namespace FlowDance.AzureFunctions.Services
{
public class DetermineCompensationService : IDetermineCompensation
{
private readonly ILogger _logger;

public DetermineCompensationService(ILoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger<DetermineCompensationService>();
}

public void DetermineCompensation(string streamName)
{
throw new NotImplementedException();
}
}
}
7 changes: 7 additions & 0 deletions FlowDance.AzureFunctions/Services/IDetermineCompensation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace FlowDance.AzureFunctions.Services
{
public interface IDetermineCompensation
{
public void DetermineCompensation(string streamName);
}
}
Original file line number Diff line number Diff line change
@@ -1,28 +1,29 @@
using System;
using System.Globalization;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using FlowDance.Common;
using FlowDance.Common.Commands;
using System.Security.Principal;
using FlowDance.AzureFunctions.Services;

namespace FlowDance.AzureFunctions
namespace FlowDance.AzureFunctions.Triggers.RabbitMQ
{
public class DetermineCompensationMessagehandler
{
private readonly ILogger _logger;
private readonly IDetermineCompensation _determineCompensationService;

public DetermineCompensationMessagehandler(ILoggerFactory loggerFactory)
public DetermineCompensationMessagehandler(ILoggerFactory loggerFactory, IDetermineCompensation determineCompensationService)
{
_logger = loggerFactory.CreateLogger<DetermineCompensationMessagehandler>();
_determineCompensationService = determineCompensationService;
}

[Function("DetermineCompensationMessagehandler")]
public void Run([RabbitMQTrigger("FlowDance.DetermineCompensation", ConnectionStringSetting = "FlowDanceRabbitMqConnection")] string queueItem)
{
var determineCompensationCommand = JsonConvert.DeserializeObject<DetermineCompensation>(queueItem);

_determineCompensationService.DetermineCompensation(determineCompensationCommand.TraceId.ToString());

Check warning on line 25 in FlowDance.AzureFunctions/Triggers/RabbitMQ/DetermineCompensationMessagehandler.cs

View workflow job for this annotation

GitHub Actions / build

Dereference of a possibly null reference.

Check warning on line 25 in FlowDance.AzureFunctions/Triggers/RabbitMQ/DetermineCompensationMessagehandler.cs

View workflow job for this annotation

GitHub Actions / build

Dereference of a possibly null reference.

_logger.LogInformation($"C# Queue trigger function processed: {queueItem}");
}
}
Expand Down
8 changes: 8 additions & 0 deletions FlowDance.AzureFunctions/appsettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"RabbitMqConnection": {
"HostName": "localhost",
"Username": "guest",
"Password": "guest",
"VirtualHost": "/"
}
}
24 changes: 15 additions & 9 deletions FlowDance.AzureFunctions/host.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
{
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"excludedTypes": "Request"
},
"enableLiveMetricsFilters": true
}
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"excludedTypes": "Request"
},
"enableLiveMetricsFilters": true,
"logLevel": {
"default": "Warning",
"Host.Aggregator": "Trace",
"Host.Results": "Information",
"Function": "Information"
}
}
}
}
1 change: 0 additions & 1 deletion FlowDance.Client/FlowDance.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
<PackageReference Include="Microsoft.Extensions.Configuration" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="8.0.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="8.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion FlowDance.Common/FlowDance.Common.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="8.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
<PackageReference Include="RabbitMQ.Stream.Client" Version="1.8.2" />
<PackageReference Include="RabbitMQ.Stream.Client" Version="1.8.3" />
</ItemGroup>

</Project>
13 changes: 13 additions & 0 deletions FlowDance.Common/RabbitMQUtils/IStorage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace FlowDance.Common.RabbitMQUtils
{
public interface IStorage
{

}
}
7 changes: 7 additions & 0 deletions FlowDance.Common/RabbitMQUtils/SingletonConnection.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.Extensions.Configuration;
using RabbitMQ.Client;
using System.Diagnostics;

namespace FlowDance.Common.RabbitMQUtils
{
Expand All @@ -9,11 +10,17 @@ internal class SingletonConnection
private IConnection _connection;
private SingletonConnection()
{
var sw = new Stopwatch();
sw.Start();

var config = new ConfigurationBuilder().AddJsonFile($"appsettings.json").Build();
var connectionFactory = new ConnectionFactory();
config.GetSection("RabbitMqConnection").Bind(connectionFactory);

_connection = connectionFactory.CreateConnection();

sw.Stop();
Console.WriteLine("The constructor in SingletonConnection created a new (RabbitMQ) IConnection in {0} ms.", sw.Elapsed.TotalMilliseconds);
}

public static SingletonConnection GetInstance()
Expand Down
15 changes: 12 additions & 3 deletions FlowDance.Common/RabbitMQUtils/SingletonStreamSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
using System.Net;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Configuration;
using System.Diagnostics;

namespace FlowDance.Common.RabbitMQUtils
{
internal class SingletonStreamSystem
{
private static ILogger<StreamSystem>? _logger;
private static SingletonStreamSystem _instance = new SingletonStreamSystem();
private static readonly SingletonStreamSystem Instance = new();
private StreamSystem? _streamSystem;

private SingletonStreamSystem()
Expand All @@ -18,22 +19,30 @@ private SingletonStreamSystem()
public static SingletonStreamSystem GetInstance(ILogger<StreamSystem> logger)
{
_logger = logger;
return _instance;
return Instance;
}

public StreamSystem GetStreamSystem()
{
if(_streamSystem == null)
{
var sw = new Stopwatch();
sw.Start();

var config = new ConfigurationBuilder().AddJsonFile($"appsettings.json").Build();

_streamSystem = StreamSystem.Create(new StreamSystemConfig()
{
UserName = config.GetSection("RabbitMqConnection").GetSection("Username").Value,
Password = config.GetSection("RabbitMqConnection").GetSection("Password").Value,
Endpoints = new List<EndPoint>() { new IPEndPoint(IPAddress.Loopback, 5552) }
}, _logger).Result;
}, _logger).GetAwaiter().GetResult();

sw.Stop();
_logger.LogInformation("A call to GetStreamSystem created a new StreamSystem in {0} ms.", sw.Elapsed.TotalMilliseconds);

Check warning on line 42 in FlowDance.Common/RabbitMQUtils/SingletonStreamSystem.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference argument for parameter 'logger' in 'void LoggerExtensions.LogInformation(ILogger logger, string? message, params object?[] args)'.

Check warning on line 42 in FlowDance.Common/RabbitMQUtils/SingletonStreamSystem.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference argument for parameter 'logger' in 'void LoggerExtensions.LogInformation(ILogger logger, string? message, params object?[] args)'.
}

_logger.LogInformation("A call to GetStreamSystem returns an existing StreamSystem.");

Check warning on line 45 in FlowDance.Common/RabbitMQUtils/SingletonStreamSystem.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference argument for parameter 'logger' in 'void LoggerExtensions.LogInformation(ILogger logger, string? message, params object?[] args)'.

Check warning on line 45 in FlowDance.Common/RabbitMQUtils/SingletonStreamSystem.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference argument for parameter 'logger' in 'void LoggerExtensions.LogInformation(ILogger logger, string? message, params object?[] args)'.
return _streamSystem;
}
}
Expand Down
38 changes: 35 additions & 3 deletions FlowDance.Common/RabbitMQUtils/Storage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Text;
using Newtonsoft.Json;
using RabbitMQ.Client;
using System.Diagnostics;

namespace FlowDance.Common.RabbitMQUtils;

Expand All @@ -14,7 +15,7 @@ namespace FlowDance.Common.RabbitMQUtils;
///
/// Based on code from this site - https://rabbitmq.github.io/rabbitmq-stream-dotnet-client/stable/htmlsingle/index.html
/// </summary>
public class Storage
public class Storage : IStorage
{
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger<Producer> _producerLogger;
Expand Down Expand Up @@ -49,7 +50,7 @@ public void StoreEvent(Span span)
ValidateStoredSpans(ReadAllSpansFromStream(span.TraceId.ToString()));

// Create producer
Producer producer = CreateProducer(streamName, streamSystem, confirmationTaskCompletionSource, _producerLogger);
var producer = CreateProducer(streamName, streamSystem, confirmationTaskCompletionSource, _producerLogger);

// Send a messages
var message = new Message(Encoding.Default.GetBytes(JsonConvert.SerializeObject(span, new JsonSerializerSettings() { TypeNameHandling = TypeNameHandling.All })));
Expand Down Expand Up @@ -121,8 +122,12 @@ public List<Span> ReadAllSpansFromStream(string streamName)

private void ValidateStoredSpans(List<Span> spanList)
{

if (spanList.Any())
{
var sw = new Stopwatch();
sw.Start();

// Rule #1 - Can´t add Span after the root Span has been closed.
var spanOpened = spanList[0];
var spanClosed = from s in spanList
Expand All @@ -131,6 +136,9 @@ private void ValidateStoredSpans(List<Span> spanList)

if (spanClosed.Any())
throw new Exception("Spans can´t be add after the root Span has been closed");

sw.Stop();
_streamLogger.LogInformation("A call to ValidateStoredSpans runs for {0} ms.", sw.Elapsed.TotalMilliseconds);
}

}
Expand Down Expand Up @@ -181,6 +189,9 @@ private ulong GetLastOffset(string streamName, ILogger<Consumer> consumerLogger)
/// <exception cref="Exception"></exception>
private List<Span> ReadAllSpansFromStream(string streamName, ILogger<Consumer> consumerLogger)
{
var sw = new Stopwatch();
sw.Start();

var numberOfMessages = GetLastOffset(streamName, consumerLogger) + 1;
var spanList = new List<Span>();

Expand Down Expand Up @@ -220,6 +231,9 @@ private List<Span> ReadAllSpansFromStream(string streamName, ILogger<Consumer> c
consumerTaskCompletionSource.Task.Wait();

consumer.Close();

sw.Stop();
_streamLogger.LogInformation("A call to ReadAllSpansFromStream runs for {0} ms.", sw.Elapsed.TotalMilliseconds);
}

return spanList;
Expand All @@ -235,9 +249,15 @@ public bool StreamExistOrQueue(string name)
{
try
{
var sw = new Stopwatch();
sw.Start();

var channel = SingletonConnection.GetInstance().GetConnection().CreateModel();
QueueDeclareOk ok = channel.QueueDeclarePassive(name);
channel.Close();

sw.Stop();
_streamLogger.LogInformation("A call to StreamExistOrQueue runs for {0} ms.", sw.Elapsed.TotalMilliseconds);
}
catch (RabbitMQ.Client.Exceptions.OperationInterruptedException ex)
{
Expand All @@ -260,9 +280,15 @@ public bool StreamExistOrQueue(string name)
/// <param name="streamName"></param>
public void CreateStream(string streamName)
{
var sw = new Stopwatch();
sw.Start();

var streamSystem = SingletonStreamSystem.GetInstance(_streamLogger).GetStreamSystem();
streamSystem.CreateStream(
new StreamSpec(streamName) { });

sw.Stop();
_streamLogger.LogInformation("A call to StreamExistOrQueue runs for {0} ms.", sw.Elapsed.TotalMilliseconds);
}

/// <summary>
Expand All @@ -284,6 +310,9 @@ public void DeleteStream(string streamName)
/// <exception cref="ArgumentOutOfRangeException"></exception>
private Producer CreateProducer(string streamName, StreamSystem streamSystem, TaskCompletionSource<int> confirmationTaskCompletionSource, ILogger<Producer> procuderLogger)
{
var sw = new Stopwatch();
sw.Start();

var producer = Producer.Create(
new ProducerConfig(
streamSystem,
Expand Down Expand Up @@ -314,7 +343,10 @@ private Producer CreateProducer(string streamName, StreamSystem streamSystem, Ta
await Task.CompletedTask;
}
}, procuderLogger).Result;
}, procuderLogger).GetAwaiter().GetResult();

sw.Stop();
_streamLogger.LogInformation("A call to CreateProducer runs for {0} ms.", sw.Elapsed.TotalMilliseconds);

return producer;
}
Expand Down
Loading

0 comments on commit ff5a742

Please sign in to comment.