Skip to content

Commit

Permalink
remove kafka and listen to change tabel instead (#104)
Browse files Browse the repository at this point in the history
  • Loading branch information
runeanielsen authored Mar 25, 2023
1 parent 36e3833 commit 060999c
Show file tree
Hide file tree
Showing 39 changed files with 1,532 additions and 2,474 deletions.
7 changes: 7 additions & 0 deletions src/OpenFTTH.GDBIntegrator.Config/PostgisSetting.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,12 @@ public class PostgisSetting
public string Database { get; set; }
public string Username { get; set; }
public string Password { get; set; }

public string ConnectionString => CreateConnectionString();

private string CreateConnectionString()
{
return $"Host={Host};Port={Port};Username={Username};Password={Password};Database={Database}";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<ItemGroup>
<None Update="Postgres/SchemaMigration/Scripts/create_route_network_schema.sql" CopyToOutputDirectory="PreserveNewest" />
<None Update="Postgres/SchemaMigration/Scripts/ddl_survey_import.sql" CopyToOutputDirectory="PreserveNewest" />
<None Update="Postgres/SchemaMigration/Scripts/update_and_create_trigger_changes.sql" CopyToOutputDirectory="PreserveNewest" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using FluentMigrator;
using System.IO;

namespace OpenFTTH.GDBIntegrator.GeoDatabase.Postgres.SchemaMigration
{
[Migration(1678195057)]
public class UpdateAndCreateTriggerChanges : Migration
{
public override void Up()
{
Execute.Script(Path.GetDirectoryName(System.Reflection.Assembly.GetExecutingAssembly().Location)
+ "/Postgres/SchemaMigration/Scripts/update_and_create_trigger_changes.sql");
}

public override void Down()
{
// This is a hard change to do and needs to be resolved manually if a down is needed to be done.
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
-- Edit operation table
CREATE TABLE route_network.route_network_edit_operation (
seq_no BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY,
event_id UUID NULL,
"before" VARCHAR NULL,
"after" VARCHAR NULL,
"type" VARCHAR NOT NULL,
event_timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT route_network_edit_operation_pkey PRIMARY KEY (seq_no)
);

-- Trigger create route node
CREATE OR REPLACE FUNCTION route_network.route_node_create()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
BEGIN
insert into route_network.route_network_edit_operation (event_id, before, after, type)
values (
uuid_generate_v4(),
null,
to_json(NEW),
'RouteNode'
);
RETURN NEW;
END $function$
;

-- This is a new trigger, so we have to set it.
CREATE TRIGGER create_route_node BEFORE INSERT ON route_network.route_node
FOR EACH ROW EXECUTE PROCEDURE route_network.route_node_create();

-- Trigger update route node
CREATE OR REPLACE FUNCTION route_network.route_node_update()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
BEGIN
IF NEW.delete_me = true
THEN
DELETE FROM route_network.route_node where mrid = OLD.mrid;
RETURN null;
END IF;

insert into route_network.route_network_edit_operation (event_id, before, after, type)
values (
uuid_generate_v4(),
to_json(OLD),
to_json(NEW),
'RouteNode'
);

RETURN NEW;
END $function$
;

-- Trigger create route segment
CREATE OR REPLACE FUNCTION route_network.route_segment_create()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
BEGIN
insert into route_network.route_network_edit_operation (event_id, before, after, type)
values (
uuid_generate_v4(),
null,
to_json(NEW),
'RouteSegment'
);
RETURN NEW;
END $function$
;

-- This is a new trigger, so we have to set it.
CREATE TRIGGER create_route_segment BEFORE INSERT ON route_network.route_segment
FOR EACH ROW EXECUTE PROCEDURE route_network.route_segment_create();

-- Trigger update route segment
CREATE OR REPLACE FUNCTION route_network.route_segment_update()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
BEGIN
IF NEW.delete_me = true
THEN
DELETE FROM route_network.route_segment where mrid = OLD.mrid;
RETURN null;
END IF;

insert into route_network.route_network_edit_operation (event_id, before, after, type)
values (
uuid_generate_v4(),
to_json(OLD),
to_json(NEW),
'RouteSegment'
);

RETURN NEW;
END $function$
;
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class GeoDatabaseUpdatedHandler : IRequestHandler<GeoDatabaseUpdated, Uni
private readonly IRouteSegmentInfoCommandFactory _routeSegmentInfoCommandFactory;
private readonly IValidationService _validationService;
private readonly IWorkTaskService _workTaskService;
private readonly IEventIdStore _eventIdStore;

public GeoDatabaseUpdatedHandler(
ILogger<GeoDatabaseUpdatedHandler> logger,
Expand All @@ -59,7 +60,8 @@ public GeoDatabaseUpdatedHandler(
IRouteNodeInfoCommandFactory routeNodeInfoCommandFactory,
IRouteSegmentInfoCommandFactory routeSegmentInfoCommandFactory,
IValidationService validationService,
IWorkTaskService workTaskService)
IWorkTaskService workTaskService,
IEventIdStore eventIdStore)
{
_logger = logger;
_mediator = mediator;
Expand All @@ -75,10 +77,27 @@ public GeoDatabaseUpdatedHandler(
_routeSegmentInfoCommandFactory = routeSegmentInfoCommandFactory;
_validationService = validationService;
_workTaskService = workTaskService;
_eventIdStore = eventIdStore;
}

public async Task<Unit> Handle(GeoDatabaseUpdated request, CancellationToken token)
{

var eventId = request.UpdateMessage switch
{
RouteNodeMessage msg => msg.EventId,
RouteSegmentMessage msg => msg.EventId,
InvalidMessage msg => msg.EventId,
_ => throw new ArgumentException(
"Could not handle type of '{typeof(request.UpdateMessage)}'.")
};

if (_eventIdStore.GetEventIds().Contains(eventId))
{
_logger.LogWarning("{EventId} has already been processed.", eventId);
return await Task.FromResult(new Unit());
}

try
{
_eventStore.Clear();
Expand All @@ -98,8 +117,9 @@ public async Task<Unit> Handle(GeoDatabaseUpdated request, CancellationToken tok
await _geoDatabase.BeginTransaction();

// We only do this in very special cases where we cannot rollback
await MarkToBeDeleted((request.UpdateMessage as InvalidMessage).Message,
"Message is invalid and we cannot rollback so we mark it to be deleted.");
await MarkToBeDeleted(
(request.UpdateMessage as InvalidMessage).Message,
"Message is invalid and we cannot rollback so we mark it to be deleted.");
await _geoDatabase.Commit();

// We send an updated event out, to notify that something has been rolled back to refresh GIS.
Expand Down Expand Up @@ -135,11 +155,16 @@ await MarkToBeDeleted((request.UpdateMessage as InvalidMessage).Message,
// We update the work task ids on the newly digitized network elements.
await UpdateWorkTaskIdOnNewlyDigitized(workTaskMrId);

var editOperationOccuredEvent = CreateEditOperationOccuredEvent(workTaskMrId, username);
var editOperationOccuredEvent = CreateEditOperationOccuredEvent(
workTaskMrId,
username,
eventId
);

if (IsOperationEditEventValid(editOperationOccuredEvent))
{
await _producer.Produce(GLOBAL_STREAM_ID, editOperationOccuredEvent);
_eventIdStore.AppendEventId(eventId);
await _geoDatabase.Commit();
}
else
Expand Down Expand Up @@ -539,11 +564,14 @@ private async Task UpdateWorkTaskIdOnNewlyDigitized(Guid workTaskMrId)
}
}

private RouteNetworkEditOperationOccuredEvent CreateEditOperationOccuredEvent(Guid workTaskMrid, string username)
private RouteNetworkEditOperationOccuredEvent CreateEditOperationOccuredEvent(
Guid workTaskMrid,
string username,
Guid eventId)
{
return new RouteNetworkEditOperationOccuredEvent(
nameof(RouteNetworkEditOperationOccuredEvent),
Guid.NewGuid(),
eventId,
DateTime.UtcNow,
workTaskMrid,
username,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
using System;

namespace OpenFTTH.GDBIntegrator.Integrator.ConsumerMessages
{
public class InvalidMessage
public record InvalidMessage
{
public object Message { get; }
public bool Delete { get; }
public Guid EventId { get; init; }
public object Message { get; init; }
public bool Delete { get; init; }

public InvalidMessage(object message, Guid eventId)
{
Message = message;
EventId = eventId;
}

public InvalidMessage(object message, bool delete = false)
public InvalidMessage(object message, Guid eventId, bool delete)
{
Message = message;
EventId = eventId;
Delete = delete;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
using OpenFTTH.GDBIntegrator.RouteNetwork;
using System;

namespace OpenFTTH.GDBIntegrator.Integrator.ConsumerMessages
{
public class RouteNodeMessage
public record RouteNodeMessage
{
public RouteNode Before { get; }
public RouteNode After { get; }
public Guid EventId { get; init; }
public RouteNode Before { get; init; }
public RouteNode After { get; init; }

public RouteNodeMessage() {}

public RouteNodeMessage(RouteNode before, RouteNode after)
public RouteNodeMessage(Guid eventId, RouteNode before, RouteNode after)
{
if (eventId == Guid.Empty)
{
throw new ArgumentException(
"Cannot be default guid.",
nameof(eventId));
}

EventId = eventId;
Before = before;
After = after;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
using OpenFTTH.GDBIntegrator.RouteNetwork;
using System;

namespace OpenFTTH.GDBIntegrator.Integrator.ConsumerMessages
{
public class RouteSegmentMessage
public record RouteSegmentMessage
{
public RouteSegment Before { get; }
public RouteSegment After { get; }
public Guid EventId { get; init; }
public RouteSegment Before { get; init; }
public RouteSegment After { get; init; }

public RouteSegmentMessage() {}

public RouteSegmentMessage(RouteSegment before, RouteSegment after)
public RouteSegmentMessage(Guid eventId, RouteSegment before, RouteSegment after)
{
if (eventId == Guid.Empty)
{
throw new ArgumentException(
"Cannot be default guid.",
nameof(eventId));
}

EventId = eventId;
Before = before;
After = after;
}
Expand Down
63 changes: 63 additions & 0 deletions src/OpenFTTH.GDBIntegrator.Integrator/Store/EventIdStore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using Npgsql;
using OpenFTTH.GDBIntegrator.Config;

namespace OpenFTTH.GDBIntegrator.Integrator.Store
{
public class EventIdStore : IEventIdStore
{
private HashSet<Guid> _eventIds;
private readonly EventStoreSetting _eventStoreSetting;

public EventIdStore(IOptions<EventStoreSetting> eventStoreSetting)
{
_eventStoreSetting = eventStoreSetting.Value;
}

public void AppendEventId(Guid eventId)
{
_eventIds.Add(eventId);
}

public HashSet<Guid> GetEventIds()
{
return _eventIds ?? throw new InvalidOperationException("EventIds has not been loaded yet.");
}

public async Task<long> LoadEventIds(CancellationToken token = default)
{
_eventIds = new();

await foreach (var eId in GetAllEventIds(token).ConfigureAwait(false))
{
_eventIds.Add(eId);
}

return _eventIds.Count;
}

private async IAsyncEnumerable<Guid> GetAllEventIds(
[EnumeratorCancellation] CancellationToken token = default)
{
const string SQL = @"SELECT data->'EventId' AS event_id
FROM events.mt_events
WHERE type = 'route_network_edit_operation_occured_event'";

using var conn = new NpgsqlConnection(_eventStoreSetting.ConnectionString);
using var cmd = new NpgsqlCommand(SQL, conn);

await conn.OpenAsync(token).ConfigureAwait(false);
var reader = await cmd.ExecuteReaderAsync(token).ConfigureAwait(false);

while (await reader.ReadAsync(token).ConfigureAwait(false))
{
yield return reader.GetGuid(reader.GetOrdinal("event_id"));
}
}
}
}
Loading

0 comments on commit 060999c

Please sign in to comment.