Skip to content

Commit

Permalink
switch to using event store (#98)
Browse files Browse the repository at this point in the history
* switch to using event store

* now handles if graographical area updated fails
  • Loading branch information
runeanielsen authored Feb 10, 2023
1 parent 8e1777d commit 426b5c1
Show file tree
Hide file tree
Showing 12 changed files with 101 additions and 85 deletions.
7 changes: 5 additions & 2 deletions scripts/set-environment-minikube.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ export KAFKA__EVENTROUTENETWORKTOPICNAME="domain.route-network"
export KAFKA__EVENTGEOGRAPHICALAREAUPDATED="notification.geographical-area-updated"

# Notification server
export NOTIFICATIONSERVER__DOMAIN="notification-server"
export NOTIFICATIONSERVER__PORT="80"
export NOTIFICATIONSERVER__DOMAIN="localhost"
export NOTIFICATIONSERVER__PORT="6666"

# Event store
export EVENTSTORE__CONNECTIONSTRING="Host=event-store.openftth.local;Port=5432;Username=postgres;Password=postgres;Database=EVENT_STORE"

# Logging
export SERILOG__MINIMUMLEVEL="Information"
6 changes: 6 additions & 0 deletions src/OpenFTTH.GDBIntegrator.Config/EventStoreSetting.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace OpenFTTH.GDBIntegrator.Config;

public sealed record EventStoreSetting
{
public string ConnectionString { get; init; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@
<PackageReference Include="FluentMigrator" Version="3.2.8" />
<PackageReference Include="FluentMigrator.Extensions.Postgres" Version="3.2.8" />
<PackageReference Include="npgsql" Version="5.0.7" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="6.0.0" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ public class GeoDatabaseUpdated : IRequest

public class GeoDatabaseUpdatedHandler : IRequestHandler<GeoDatabaseUpdated, Unit>
{
// This is the global ID for the RouteNetwork event stream. Should not be changed.
private readonly Guid GLOBAL_STREAM_ID = Guid.Parse("70554b8a-a572-4ab6-b837-19681ed83d35");

private readonly ILogger<GeoDatabaseUpdatedHandler> _logger;
private readonly IMediator _mediator;
private readonly IRouteNodeCommandFactory _routeNodeEventFactory;
Expand Down Expand Up @@ -136,7 +139,7 @@ await MarkToBeDeleted((request.UpdateMessage as InvalidMessage).Message,

if (IsOperationEditEventValid(editOperationOccuredEvent))
{
await _producer.Produce(_kafkaSettings.EventRouteNetworkTopicName, editOperationOccuredEvent);
await _producer.Produce(GLOBAL_STREAM_ID, editOperationOccuredEvent);
await _geoDatabase.Commit();
}
else
Expand Down Expand Up @@ -165,11 +168,23 @@ await MarkToBeDeleted((request.UpdateMessage as InvalidMessage).Message,
{
if (_modifiedGeometriesStore.GetRouteNodes().Count > 0 || _modifiedGeometriesStore.GetRouteSegments().Count > 0)
{
await _mediator.Publish(new GeographicalAreaUpdated
try
{
RouteNodes = _modifiedGeometriesStore.GetRouteNodes(),
RouteSegment = _modifiedGeometriesStore.GetRouteSegments()
});
await _mediator.Publish(new GeographicalAreaUpdated
{
RouteNodes = _modifiedGeometriesStore.GetRouteNodes(),
RouteSegment = _modifiedGeometriesStore.GetRouteSegments()
});
}
catch (Exception ex)
{
// This is not good, but the application can still keep running even
// if there are issues with the notification server.
// If a notification is not broadcastet the worst thing that can happen
// is that a user has to refresh the UI themselves.
_logger.LogInformation(
$"Could not broadcast {nameof(GeographicalAreaUpdated)}.\n{ex}");
}
}
_eventStore.Clear();
_modifiedGeometriesStore.Clear();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using OpenFTTH.EventSourcing;
using System;
using System.Threading.Tasks;

namespace OpenFTTH.GDBIntegrator.Producer.EventStore
{
public class EventStoreProducer : IProducer
{
private readonly IEventStore _eventStore;

public EventStoreProducer(IEventStore eventStore)
{
_eventStore = eventStore;
}

public async Task Produce(Guid streamId, object toposMessage)
{
var currentVersion = 0L;

if (_eventStore.Aggregates.CheckIfAggregateIdHasBeenUsed(streamId))
{
// We retrieve the latest version each time,
// so we don't have to keep internal version state up to date.
currentVersion = await _eventStore
.CurrentStreamVersionAsync(streamId) ??
throw new InvalidOperationException(
"Could not get stream version. The method returned null.");
}

// Next expected version is current version plus one.
var nextExpectedVersion = currentVersion + 1;

await _eventStore
.AppendStreamAsync(
streamId,
nextExpectedVersion,
new[] { toposMessage });
}
}
}
5 changes: 2 additions & 3 deletions src/OpenFTTH.GDBIntegrator.Producer/IProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@

namespace OpenFTTH.GDBIntegrator.Producer
{
public interface IProducer : IDisposable
public interface IProducer
{
void Init();
Task Produce(string topicName, Object message);
Task Produce(Guid streamId, object message);
}
}
50 changes: 0 additions & 50 deletions src/OpenFTTH.GDBIntegrator.Producer/Kafka/Producer.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,10 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="MediatR" Version="8.1.0" />
<PackageReference Include="OpenFTTH.NotificationClient" Version="0.3.0" />
<PackageReference Include="Topos" Version="0.0.95" />
<PackageReference Include="Topos.Kafka" Version="0.0.95" />
<PackageReference Include="Topos.Serilog" Version="0.0.95" />
<PackageReference Include="Topos.NewtonsoftJson" Version="0.0.95" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="6.0.0" />
<PackageReference Include="OpenFTTH.EventSourcing" Version="3.0.5" />
</ItemGroup>

</Project>
20 changes: 19 additions & 1 deletion src/OpenFTTH.GDBIntegrator/Internal/HostConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using Newtonsoft.Json.Serialization;
using OpenFTTH.EventSourcing.Postgres;
using OpenFTTH.GDBIntegrator.Config;
using OpenFTTH.GDBIntegrator.GeoDatabase;
using OpenFTTH.GDBIntegrator.GeoDatabase.Postgres;
Expand All @@ -16,6 +18,7 @@
using OpenFTTH.GDBIntegrator.Integrator.Validate;
using OpenFTTH.GDBIntegrator.Integrator.WorkTask;
using OpenFTTH.GDBIntegrator.Producer;
using OpenFTTH.GDBIntegrator.Producer.EventStore;
using OpenFTTH.GDBIntegrator.Producer.NotificationServer;
using OpenFTTH.GDBIntegrator.RouteNetwork.Factories;
using OpenFTTH.GDBIntegrator.RouteNetwork.Mapping;
Expand Down Expand Up @@ -80,7 +83,7 @@ private static void ConfigureServices(IHostBuilder hostBuilder)

services.AddHostedService<Startup>();
services.AddSingleton<IRouteNetworkSubscriber, PostgresRouteNetworkSubscriber>();
services.AddSingleton<IProducer, Producer.Kafka.Producer>();
services.AddSingleton<IProducer, EventStoreProducer>();
services.AddSingleton<IGeoDatabase, Postgis>();
services.AddTransient<IRouteSegmentValidator, RouteSegmentValidator>();
services.AddTransient<IRouteSegmentFactory, RouteSegmentFactory>();
Expand All @@ -91,7 +94,11 @@ private static void ConfigureServices(IHostBuilder hostBuilder)
services.AddTransient<IRouteSegmentEventFactory, RouteSegmentEventFactory>();
services.AddTransient<IRouteNodeEventFactory, RouteNodeEventFactory>();
services.AddTransient<IInfoMapper, InfoMapper>();

// This is not the event store with database, this is a local implementation of a place
// to store events globally before being processed.
services.AddSingleton<IEventStore, EventStore>();

services.AddSingleton<IModifiedGeometriesStore, ModifiedGeometriesStore>();
services.AddTransient<IRouteNodeInfoCommandFactory, RouteNodeInfoCommandFactory>();
services.AddTransient<IRouteSegmentInfoCommandFactory, RouteSegmentInfoCommandFactory>();
Expand All @@ -101,6 +108,14 @@ private static void ConfigureServices(IHostBuilder hostBuilder)
services.AddTransient<IWorkTaskService, WorkTaskService>();
services.AddHttpClient<IWorkTaskService, WorkTaskService>();
services.AddSingleton<INotificationClient, NotificationServerClient>();
services.AddSingleton<OpenFTTH.EventSourcing.IEventStore>(
e =>
new PostgresEventStore(
serviceProvider: e.GetRequiredService<IServiceProvider>(),
connectionString: e.GetRequiredService<IOptions<EventStoreSetting>>().Value.ConnectionString,
databaseSchemaName: "events"
)
);

services.Configure<KafkaSetting>(
kafkaSettings => hostContext.Configuration.GetSection("kafka").Bind(kafkaSettings));
Expand All @@ -113,6 +128,9 @@ private static void ConfigureServices(IHostBuilder hostBuilder)

services.Configure<ApplicationSetting>(
applicationSettings => hostContext.Configuration.GetSection("application").Bind(applicationSettings));

services.Configure<EventStoreSetting>(
eventStoreSetting => hostContext.Configuration.GetSection("eventStore").Bind(eventStoreSetting));
});
}

Expand Down
1 change: 1 addition & 0 deletions src/OpenFTTH.GDBIntegrator/OpenFTTH.GDBIntegrator.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
<PackageReference Include="Serilog.Sinks.Debug" Version="1.0.1" />
<PackageReference Include="Serilog.Formatting.Compact" Version="1.1.0" />
<PackageReference Include="OpenFTTH.EventSourcing" Version="3.0.5" />
</ItemGroup>

<ItemGroup>
Expand Down
3 changes: 0 additions & 3 deletions src/OpenFTTH.GDBIntegrator/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,11 @@ private void OnStarted()
{
_logger.LogInformation($"Starting {nameof(IRouteNetworkSubscriber)}");
_routeNetworkSubscriber.Subscribe();
_logger.LogInformation("Init kafka producer");
_producer.Init();
}

private void OnStopped()
{
_routeNetworkSubscriber.Dispose();
_producer.Dispose();
_logger.LogInformation("Stopped service");
}
}
Expand Down
15 changes: 7 additions & 8 deletions test/OpenFTTH.GDBIntegrator.Tests/StartupTest.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
using OpenFTTH.GDBIntegrator;
using FakeItEasy;
using FluentAssertions;
using FluentMigrator.Runner;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using OpenFTTH.GDBIntegrator.Producer;
using OpenFTTH.GDBIntegrator.Subscriber;
using System;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using FakeItEasy;
using Xunit;
using OpenFTTH.GDBIntegrator.Subscriber;
using OpenFTTH.GDBIntegrator.Producer;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Hosting;
using FluentMigrator.Runner;

namespace OpenFTTH.GDBIntegrator.Tests
{
Expand Down

0 comments on commit 426b5c1

Please sign in to comment.