Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2.1/bugfix/87 Memory is not being released #88

Merged
merged 1 commit into from
Mar 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Storage.App/Storage.App.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
<UserSecretsId>7e4fa6b7-add2-44ec-a908-b95747757b49</UserSecretsId>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
<RootNamespace>Buildersoft.Andy.X.Storage.App</RootNamespace>
<Version>2.0.0</Version>
<Version>2.1.0</Version>
<Company>Buildersoft</Company>
<Product>Buildersoft Andy</Product>
<Authors>Buildersoft</Authors>
<Description>Buildersoft Andy X is a distributed messaging system. This system will empower developers to move into Event Driven Systems. Andy X is a multi-tenant system.</Description>
<Copyright>Copyright © Buildersoft 2022</Copyright>
<DockerfileContext>..\..</DockerfileContext>
<SignAssembly>True</SignAssembly>

</PropertyGroup>

Expand Down
1 change: 1 addition & 0 deletions src/Storage.App/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

"Partition": {
"SizeInMemory": 3000,
"BatchSize": 3000,
"FlushInterval": 5000,
"PointerAcknowledgedMessageArchivationInterval": 3600000
//"PointerAcknowledgedMessageArchivationInterval": 180000
Expand Down
9 changes: 5 additions & 4 deletions src/Storage.Core/Service/System/SystemService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class SystemService
private readonly TenantIOService _tenantIOService;
private readonly ProducerIOService _producerIOService;
private readonly ConsumerIOService _consumerIOService;
private readonly MessageIOService _messageIOService2;
private readonly MessageIOService _messageIOService;
private readonly List<XNodeConfiguration> nodes;
private readonly DataStorageConfiguration dataStorage;
private readonly AgentConfiguration agent;
Expand All @@ -38,7 +38,7 @@ public SystemService(
TenantIOService tenantIOService,
ProducerIOService producerIOService,
ConsumerIOService consumerIOService,
MessageIOService messageIOService2)
MessageIOService messageIOService)
{
_logger = logger;
_serviceProvider = serviceProvider;
Expand All @@ -48,7 +48,7 @@ public SystemService(
_tenantIOService = tenantIOService;
_producerIOService = producerIOService;
_consumerIOService = consumerIOService;
_messageIOService2 = messageIOService2;
_messageIOService = messageIOService;
nodes = _serviceProvider.GetService(typeof(List<XNodeConfiguration>)) as List<XNodeConfiguration>;
dataStorage = _serviceProvider.GetService(typeof(DataStorageConfiguration)) as DataStorageConfiguration;
agent = _serviceProvider.GetService(typeof(AgentConfiguration)) as AgentConfiguration;
Expand Down Expand Up @@ -136,12 +136,13 @@ private void InitializeServices()
agentId,
xnode,
dataStorage,
partition,
agent,
_xNodeConnectionRepository,
_tenantIOService,
_producerIOService,
_consumerIOService,
_messageIOService2);
_messageIOService);
}
}
else
Expand Down
60 changes: 43 additions & 17 deletions src/Storage.Core/Service/XNodes/Handlers/ConsumerEventHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Buildersoft.Andy.X.Storage.Model.App.Consumers;
using Buildersoft.Andy.X.Storage.Model.App.Messages;
using Buildersoft.Andy.X.Storage.Model.Commands.Consumer;
using Buildersoft.Andy.X.Storage.Model.Configuration;
using Buildersoft.Andy.X.Storage.Model.Contexts;
using Buildersoft.Andy.X.Storage.Model.Events.Consumers;
using Buildersoft.Andy.X.Storage.Model.Files;
Expand All @@ -13,6 +14,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Threading;
Expand All @@ -26,18 +28,22 @@ public class ConsumerEventHandler
private readonly XNodeEventService _xNodeEventService;
private readonly ConsumerIOService _consumerIOService;
private readonly MessageIOService _messageIOService;
private readonly PartitionConfiguration _partitionConfiguration;
private readonly ConcurrentDictionary<string, Task> _unacknowledgedMessageProcesses;

public ConsumerEventHandler(
ILogger<SystemService> logger,
XNodeEventService xNodeEventService,
ConsumerIOService consumerIOService,
MessageIOService messageIOService)
MessageIOService messageIOService,
PartitionConfiguration partitionConfiguration)
{
_logger = logger;
_xNodeEventService = xNodeEventService;
_consumerIOService = consumerIOService;
_messageIOService = messageIOService;
_partitionConfiguration = partitionConfiguration;

_unacknowledgedMessageProcesses = new ConcurrentDictionary<string, Task>();

InitializeEvents();
Expand Down Expand Up @@ -106,16 +112,15 @@ await NotifyNodesForConsumerConnection(new NotifyConsumerConnection()
{
ConnectionType = ConnectionType.Disconnected,

Id = obj.Id,
SubscriptionType = obj.SubscriptionType,
Tenant = obj.Tenant,
Product = obj.Product,
Topic = obj.Topic,
Component = obj.Component,
Id = obj.Id,
ConsumerName = obj.ConsumerName,
SubscriptionType = obj.SubscriptionType,
InitialPosition = InitialPosition.Latest,
Product = obj.Product,
Tenant = obj.Tenant,
Topic = obj.Topic,
});

}

private void XNodeEventService_MessageAcknowledged(Model.Events.Messages.MessageAcknowledgedArgs obj)
Expand Down Expand Up @@ -202,10 +207,6 @@ private void ReleaseUnacknoledgedMessageTasks(string consumerKey)

_unacknowledgedMessageProcesses[consumerKey].Dispose();
_unacknowledgedMessageProcesses.TryRemove(consumerKey, out _);

// Cleanup memory.
GC.Collect();
GC.WaitForPendingFinalizers();
}

private void CheckPointerDbConnection(ConsumerPointerContext tenantContext, string consumerKey)
Expand Down Expand Up @@ -259,9 +260,10 @@ private async Task AnalyseFileRows(ConsumerConnectedArgs obj, List<Model.Entitie
if (isNewConsumer == true)
CachePointers(obj, rows, partitionDate);

var consumerMessages = new List<ConsumerMessage>();
foreach (var row in rows)
{
var consumerMessage = new ConsumerMessage()
consumerMessages.Add(new ConsumerMessage()
{
Consumer = obj.ConsumerName,
Message = new Message()
Expand All @@ -277,12 +279,35 @@ private async Task AnalyseFileRows(ConsumerConnectedArgs obj, List<Model.Entitie
MessageRaw = row.Payload.JsonToObject<object>(),
Headers = row.Headers.JsonToObject<Dictionary<string, object>>()
}
};
});
await SendToNodes(consumerMessages);
}
await SendToNodes(consumerMessages, true);
}

foreach (var xNode in _xNodeEventService.GetXNodeConnectionRepository().GetAllServices())
private async Task SendToNodes(List<ConsumerMessage> consumerMessages, bool sendTheRest = false)
{
if (sendTheRest == false)
{
if (consumerMessages.Count == _partitionConfiguration.BatchSize)
{
foreach (var xNode in _xNodeEventService.GetXNodeConnectionRepository().GetAllServices())
{
//Transmit messages to the other nodes.
await xNode.Value.Values.ToList()[0].GetHubConnection().SendAsync("TransmitMessagesToConsumer", consumerMessages);
}
consumerMessages.Clear();
}
}
else
{
if (consumerMessages.Count > 0)
{
//Transmit the message to the other nodes.
await xNode.Value.Values.ToList()[0].GetHubConnection().SendAsync("TransmitMessagesToConsumer", consumerMessage);
foreach (var xNode in _xNodeEventService.GetXNodeConnectionRepository().GetAllServices())
{
//Transmit messages to the other nodes.
await xNode.Value.Values.ToList()[0].GetHubConnection().SendAsync("TransmitMessagesToConsumer", consumerMessages);
}
}
}
}
Expand Down Expand Up @@ -318,7 +343,8 @@ private List<MessageFile> GetPartitionFiles(string tenant, string product, strin
string fileName = Path.GetFileNameWithoutExtension(partition);
string[] partitionNameSplited = fileName.Split("_");

var partitionDate = DateTime.Parse($"{partitionNameSplited[2]}-{partitionNameSplited[3]}-{partitionNameSplited[4]}");
var partitionDate = DateTime.ParseExact($"{partitionNameSplited[2]}-{partitionNameSplited[3]}-{partitionNameSplited[4]} {partitionNameSplited[5]}:00:00", "yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture);


messages.Add(new MessageFile() { Path = partition, PartitionDate = partitionDate });
});
Expand Down
6 changes: 4 additions & 2 deletions src/Storage.Core/Service/XNodes/XNodeEventService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,13 @@ public class XNodeEventService

private string agentId;
private readonly XNodeConfiguration nodeConfig;
private readonly PartitionConfiguration partitionConfiguration;

public XNodeEventService(ILogger<SystemService> logger,
string agentId,
XNodeConfiguration nodeConfig,
DataStorageConfiguration dataStorageConfig,
PartitionConfiguration partitionConfiguration,
AgentConfiguration agentConfiguration,
IXNodeConnectionRepository xNodeConnectionRepository,
TenantIOService tenantIOService,
Expand All @@ -90,7 +92,7 @@ public XNodeEventService(ILogger<SystemService> logger,
this.messageIOService = messageIOService;
this.agentId = agentId;
this.nodeConfig = nodeConfig;

this.partitionConfiguration = partitionConfiguration;
var provider = new XNodeConnectionProvider(nodeConfig, dataStorageConfig, agentConfiguration, agentId);
_connection = provider.GetHubConnection();

Expand Down Expand Up @@ -162,7 +164,7 @@ private void InitializeEventHandlers()
agentEventHandler = new AgentEventHandler(logger, this, tenantIOService);
tenantEventHandler = new TenantEventHandler(logger, this, tenantIOService);
producerEventHandler = new ProducerEventHandler(logger, this, producerIOService);
consumerEventHandler = new ConsumerEventHandler(logger, this, consumerIOService, messageIOService);
consumerEventHandler = new ConsumerEventHandler(logger, this, consumerIOService, messageIOService, partitionConfiguration);
messageEventHandler = new MessageEventHandler(logger, this, messageIOService);
}

Expand Down
2 changes: 1 addition & 1 deletion src/Storage.Core/Storage.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<Version>2.0.0</Version>
<Version>2.1.0</Version>
<Company>Buildersoft</Company>
<Product>Buildersoft Andy</Product>
<Authors>Buildersoft</Authors>
Expand Down
35 changes: 28 additions & 7 deletions src/Storage.IO/Connectors/ConsumerConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class ConsumerConnector
public Model.Threading.ThreadPool ThreadingPool { get; set; }
public ConcurrentQueue<Model.Entities.ConsumerMessage> MessagesBuffer { get; set; }

private bool isMemoryReleased;

public int Count { get; set; }
public ConcurrentDictionary<Guid, Model.Entities.ConsumerMessage> BatchAcknowledgedConsumerMessagesToMerge { get; set; }
Expand Down Expand Up @@ -65,14 +66,14 @@ public ConsumerConnector(ILogger<ConsumerIOService> logger,
BatchUnacknowledgedConsumerMessagesToMerge = new ConcurrentDictionary<Guid, Model.Entities.ConsumerMessage>();
ConsumerPointerContext = consumerPointer;
Count = 0;

isMemoryReleased = true;
try
{
consumerPointer.ChangeTracker.AutoDetectChangesEnabled = false;
consumerPointer.Database.EnsureCreated();

// database exists
// create new instance of Backend ConsumerArchiveBackgroundService
// Database exists
// Create new instance of Backend ConsumerArchiveBackgroundService
_consumerArchiveBackgroundService = new ConsumerArchiveBackgroundService(logger, tenant, product, component, topic, consumer, partitionConfiguration, consumerPointer);
_consumerArchiveBackgroundService.StartService();
}
Expand All @@ -95,9 +96,33 @@ private void FlushPointerTimer_Elapsed(object sender, ElapsedEventArgs e)
AutoFlushAcknowledgedBatchPointers();
AutoFlushUnacknowledgedBatchPointers();

ReleaseMemory();

_flushPointerTimer.Start();
}

private void ReleaseMemory()
{
if (isMemoryReleased == false)
{
if (MessagesBuffer.Count == 0 && BatchAcknowledgedConsumerMessagesToMerge.Count == 0 && BatchUnacknowledgedConsumerMessagesToMerge.Count == 0)
{
// ConsumerPointerContext.Dispose();
GC.Collect();
GC.SuppressFinalize(this);
GC.SuppressFinalize(ConsumerPointerContext);
GC.SuppressFinalize(MessagesBuffer);
GC.SuppressFinalize(BatchAcknowledgedConsumerMessagesToMerge);
GC.SuppressFinalize(BatchUnacknowledgedConsumerMessagesToMerge);

//GC.Collect();
//GC.WaitForPendingFinalizers();

isMemoryReleased = true;
}
}
}

private void AutoFlushAcknowledgedBatchPointers()
{
lock (BatchAcknowledgedConsumerMessagesToMerge)
Expand Down Expand Up @@ -187,10 +212,6 @@ public void StopAutoFlushPointer()
_flushPointerTimer.Stop();

_consumerArchiveBackgroundService.StopService();

// Cleanup memory.
GC.Collect();
GC.WaitForPendingFinalizers();
}
}
}
Loading