Skip to content

Commit

Permalink
implements semaphore queue to handle events sequentually
Browse files Browse the repository at this point in the history
  • Loading branch information
runeanielsen committed Aug 20, 2020
1 parent c4609d4 commit 0482892
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 28 deletions.
6 changes: 2 additions & 4 deletions scripts/set-environment-minikube.fish
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@ export POSTGIS__PASSWORD="postgres"

# Kafka
export KAFKA__SERVER=(minikube ip):(kubectl describe service openftth-kafka-cluster-kafka-external-bootstrap -n openftth | grep NodePort | grep -o '[0-9]\+')
export KAFKA__POSTGRESROUTESEGMENTTOPIC="route-network-all-tables"
export KAFKA__POSTGRESROUTENODETOPIC="route-network-all-tables"
export KAFKA__POSTGRESROUTESEGMENTCONSUMER="postgres-routenode-consumer"
export KAFKA__POSTGRESROUTENODECONSUMER="postgres-routenode-consumer"
export KAFKA__POSTGISROUTENETWORKTOPIC="postgis.route-network"
export KAFKA__POSTGISROUTENETWORKCONSUMER="postgis-route-network-consumer"
export KAFKA__POSITIONFILEPATH="/tmp/"
export KAFKA__EVENTROUTENETWORKTOPICNAME="event.route-network"

Expand Down
6 changes: 2 additions & 4 deletions src/OpenFTTH.GDBIntegrator.Config/KafkaSetting.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ namespace OpenFTTH.GDBIntegrator.Config
public class KafkaSetting
{
public string Server { get; set; }
public string PostgresRouteSegmentTopic { get; set; }
public string PostgresRouteSegmentConsumer { get; set; }
public string PostgresRouteNodeTopic { get; set; }
public string PostgresRouteNodeConsumer { get; set; }
public string PostgisRouteNetworkTopic { get; set; }
public string PostgisRouteNetworkConsumer { get; set; }
public string PositionFilePath { get; set; }
public string EventRouteNetworkTopicName { get; set; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using OpenFTTH.GDBIntegrator.Integrator.Notifications;
using OpenFTTH.GDBIntegrator.Integrator.Factories;
using OpenFTTH.GDBIntegrator.Integrator.ConsumerMessages;
using OpenFTTH.GDBIntegrator.Integrator.Queue;
using MediatR;
using System;
using System.Threading;
Expand All @@ -17,7 +18,7 @@ public class GeoDatabaseUpdated : IRequest

public class GeoDatabaseUpdatedHandler : IRequestHandler<GeoDatabaseUpdated, Unit>
{
private static Semaphore _pool = new Semaphore(1, 1);
private static SemaphoreQueue _pool = new SemaphoreQueue(1, 1);
private readonly ILogger<RouteNodeAddedHandler> _logger;
private readonly IMediator _mediator;
private readonly IRouteNodeEventFactory _routeNodeEventFactory;
Expand All @@ -39,16 +40,12 @@ public async Task<Unit> Handle(GeoDatabaseUpdated request, CancellationToken tok
{
try
{
_pool.WaitOne();
await _pool.WaitAsync();

if (request.UpdateMessage is RouteNodeMessage)
{
await HandleRouteNode((RouteNodeMessage)request.UpdateMessage);
}
else if (request.UpdateMessage is RouteSegmentMessage)
{
await HandleRouteSegment((RouteSegmentMessage)request.UpdateMessage);
}

_pool.Release();
}
Expand Down
41 changes: 41 additions & 0 deletions src/OpenFTTH.GDBIntegrator.Integrator/Queue/SemaphoreQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace OpenFTTH.GDBIntegrator.Integrator.Queue
{
public class SemaphoreQueue
{
private SemaphoreSlim semaphore;
private ConcurrentQueue<TaskCompletionSource<bool>> queue =
new ConcurrentQueue<TaskCompletionSource<bool>>();
public SemaphoreQueue(int initialCount)
{
semaphore = new SemaphoreSlim(initialCount);
}
public SemaphoreQueue(int initialCount, int maxCount)
{
semaphore = new SemaphoreSlim(initialCount, maxCount);
}
public void Wait()
{
WaitAsync().Wait();
}
public Task WaitAsync()
{
var tcs = new TaskCompletionSource<bool>();
queue.Enqueue(tcs);
semaphore.WaitAsync().ContinueWith(t =>
{
TaskCompletionSource<bool> popped;
if (queue.TryDequeue(out popped))
popped.SetResult(true);
});
return tcs.Task;
}
public void Release()
{
semaphore.Release();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ ILogger<PostgresRouteNetworkSubscriber> logger
public void Subscribe()
{
_consumer = Configure
.Consumer(_kafkaSetting.PostgresRouteSegmentConsumer, c => c.UseKafka(_kafkaSetting.Server))
.Consumer(_kafkaSetting.PostgisRouteNetworkConsumer, c => c.UseKafka(_kafkaSetting.Server))
.Serialization(s => s.RouteNetwork())
.Topics(t => t.Subscribe(_kafkaSetting.PostgresRouteNodeTopic))
.Topics(t => t.Subscribe(_kafkaSetting.PostgisRouteNetworkTopic))
.Positions(p => p.StoreInFileSystem(_kafkaSetting.PositionFilePath))
.Handle(async (messages, context, token) =>
{
Expand Down
19 changes: 7 additions & 12 deletions test/OpenFTTH.GDBIntegrator.Config.Tests/KafkaSettingsTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,27 @@ public void KafkaSettings_ShouldInitalizeValues_OnConstruction()
{
var server = "192.13.2.1";
var positionFilePath = "/tmp/";
var postgresRouteSegmentTopic = "event.route-network_route_segment";
var postgresRouteSegmentConsumer = "postgis-consumer_route_segment";
var eventRouteNetwork = "event.route-network";
var postgresRouteNodeTopic = "event.route-network_route_node";
var postgresRouteNodeConsumer = "postgis-consumer_route_node";
var postgisRouteNetworkConsumer = "postgis-route-network-consumer";
var postgisRouteNetworkTopic = "postgis.route-network";


var kafkaSettings = new KafkaSetting
{
Server = server,
PositionFilePath = positionFilePath,
PostgresRouteSegmentTopic = postgresRouteSegmentTopic,
PostgresRouteSegmentConsumer = postgresRouteSegmentConsumer,
EventRouteNetworkTopicName = eventRouteNetwork,
PostgresRouteNodeTopic = postgresRouteNodeTopic,
PostgresRouteNodeConsumer = postgresRouteNodeConsumer
PostgisRouteNetworkConsumer = postgisRouteNetworkConsumer,
PostgisRouteNetworkTopic = postgisRouteNetworkTopic
};

using (new AssertionScope())
{
kafkaSettings.Server.Should().BeEquivalentTo(server);
kafkaSettings.PositionFilePath.Should().BeEquivalentTo(positionFilePath);
kafkaSettings.PostgresRouteSegmentTopic.Should().BeEquivalentTo(postgresRouteSegmentTopic);
kafkaSettings.PostgresRouteSegmentConsumer.Should().BeEquivalentTo(postgresRouteSegmentConsumer);
kafkaSettings.EventRouteNetworkTopicName.Should().BeEquivalentTo(eventRouteNetwork);
kafkaSettings.PostgresRouteNodeTopic.Should().BeEquivalentTo(postgresRouteNodeTopic);
kafkaSettings.PostgresRouteNodeConsumer.Should().BeEquivalentTo(postgresRouteNodeConsumer);
kafkaSettings.PostgisRouteNetworkConsumer.Should().BeEquivalentTo(postgisRouteNetworkConsumer);
kafkaSettings.PostgisRouteNetworkTopic.Should().BeEquivalentTo(postgisRouteNetworkTopic);
}
}
}
Expand Down

0 comments on commit 0482892

Please sign in to comment.