Skip to content

Commit

Permalink
fixing up new base class
Browse files Browse the repository at this point in the history
Signed-off-by: Neil South <neil.south@answerdigital.com>
  • Loading branch information
neildsouth committed Nov 9, 2023
1 parent baffd3c commit 62a7c55
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 55 deletions.
35 changes: 17 additions & 18 deletions src/InformaticsGateway/Services/Export/ExportServiceBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ public abstract class ExportServiceBase : IHostedService, IMonaiService, IDispos
private readonly ILogger _logger;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly InformaticsGatewayConfiguration _configuration;
protected readonly IMessageBrokerSubscriberService _messageSubscriber;
protected readonly IMessageBrokerPublisherService _messagePublisher;
protected readonly IMessageBrokerSubscriberService MessageSubscriber;
protected readonly IMessageBrokerPublisherService MessagePublisher;
private readonly IServiceScope _scope;
protected readonly Dictionary<string, ExportRequestEventDetails> _exportRequests;
protected readonly Dictionary<string, ExportRequestEventDetails> ExportRequests;
private readonly IStorageInfoProvider _storageInfoProvider;
private bool _disposedValue;
private ulong _activeWorkers = 0;
Expand All @@ -89,7 +89,7 @@ protected virtual async Task ProcessMessage(MessageReceivedEventArgs eventArgs)
lock (SyncRoot)
{
var exportRequest = eventArgs.Message.ConvertTo<ExportRequestEvent>();
if (_exportRequests.ContainsKey(exportRequest.ExportTaskId))
if (ExportRequests.ContainsKey(exportRequest.ExportTaskId))
{
_logger.ExportRequestAlreadyQueued(exportRequest.CorrelationId, exportRequest.ExportTaskId);
return;
Expand All @@ -100,11 +100,11 @@ protected virtual async Task ProcessMessage(MessageReceivedEventArgs eventArgs)

var exportRequestWithDetails = new ExportRequestEventDetails(exportRequest);

_exportRequests.Add(exportRequest.ExportTaskId, exportRequestWithDetails);
ExportRequests.Add(exportRequest.ExportTaskId, exportRequestWithDetails);
if (!exportFlow.Post(exportRequestWithDetails))
{
_logger.ErrorPostingExportJobToQueue(exportRequest.CorrelationId, exportRequest.ExportTaskId);
_messageSubscriber.Reject(eventArgs.Message);
MessageSubscriber.Reject(eventArgs.Message);
}
else
{
Expand Down Expand Up @@ -135,13 +135,13 @@ protected ExportServiceBase(

_configuration = configuration.Value;

_messageSubscriber = _scope.ServiceProvider.GetRequiredService<IMessageBrokerSubscriberService>();
_messagePublisher = _scope.ServiceProvider.GetRequiredService<IMessageBrokerPublisherService>();
MessageSubscriber = _scope.ServiceProvider.GetRequiredService<IMessageBrokerSubscriberService>();
MessagePublisher = _scope.ServiceProvider.GetRequiredService<IMessageBrokerPublisherService>();
_storageInfoProvider = _scope.ServiceProvider.GetRequiredService<IStorageInfoProvider>();

_exportRequests = new Dictionary<string, ExportRequestEventDetails>();
ExportRequests = new Dictionary<string, ExportRequestEventDetails>();

_messageSubscriber.OnConnectionError += (sender, args) =>
MessageSubscriber.OnConnectionError += (sender, args) =>
{
_logger.MessagingServiceErrorRecover(args.ErrorMessage);
SetupPolling();
Expand All @@ -167,7 +167,7 @@ public Task StopAsync(CancellationToken cancellationToken)

private void SetupPolling()
{
_messageSubscriber.SubscribeAsync(RoutingKey, RoutingKey, OnMessageReceivedCallback, prefetchCount: Concurrency);
MessageSubscriber.SubscribeAsync(RoutingKey, RoutingKey, OnMessageReceivedCallback, prefetchCount: Concurrency);
_logger.ExportEventSubscription(ServiceName, RoutingKey);
}

Expand All @@ -180,15 +180,15 @@ protected async Task OnMessageReceivedCallback(MessageReceivedEventArgs eventArg
if (!_storageInfoProvider.HasSpaceAvailableForExport)
{
_logger.ExportServiceStoppedDueToLowStorageSpace(_storageInfoProvider.AvailableFreeSpace);
_messageSubscriber.Reject(eventArgs.Message);
MessageSubscriber.Reject(eventArgs.Message);
return;
}

if (Interlocked.Read(ref _activeWorkers) >= Concurrency)
{
_logger.ExceededMaxmimumNumberOfWorkers(ServiceName, _activeWorkers);
await Task.Delay(200).ConfigureAwait(false); // small delay to stop instantly dead lettering the next message.
_messageSubscriber.Reject(eventArgs.Message);
MessageSubscriber.Reject(eventArgs.Message);
return;
}

Expand Down Expand Up @@ -393,13 +393,12 @@ private static void HandleStatus(ExportRequestDataMessage exportRequestData, Exp
{
exportRequest.AddErrorMessages(exportRequestData.Messages);
}

}
}

private void ReportingActionBlock(ExportRequestDataMessage exportRequestData)
{
var exportRequest = _exportRequests[exportRequestData.ExportTaskId];
var exportRequest = ExportRequests[exportRequestData.ExportTaskId];
HandleStatus(exportRequestData, exportRequest);
if (!exportRequest.IsCompleted)
{
Expand All @@ -417,7 +416,7 @@ private void ReportingActionBlock(ExportRequestDataMessage exportRequestData)

lock (SyncRoot)
{
_exportRequests.Remove(exportRequestData.ExportTaskId);
ExportRequests.Remove(exportRequestData.ExportTaskId);
}

if (ReportActionCompleted != null)
Expand All @@ -444,7 +443,7 @@ private void FinaliseMessage(JsonMessage<ExportCompleteEvent> jsonMessage)
.Execute(() =>
{
_logger.PublishingExportCompleteEvent();
_messagePublisher.Publish(_configuration.Messaging.Topics.ExportComplete, jsonMessage.ToMessage());
MessagePublisher.Publish(_configuration.Messaging.Topics.ExportComplete, jsonMessage.ToMessage());
});

Policy
Expand All @@ -458,7 +457,7 @@ private void FinaliseMessage(JsonMessage<ExportCompleteEvent> jsonMessage)
.Execute(() =>
{
_logger.SendingAcknowledgement();
_messageSubscriber.Acknowledge(jsonMessage);
MessageSubscriber.Acknowledge(jsonMessage);
});
}

Expand Down
12 changes: 7 additions & 5 deletions src/InformaticsGateway/Services/Export/ExtAppScuExportService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected override async Task ProcessMessage(MessageReceivedEventArgs eventArgs)
lock (SyncRoot)
{
var externalAppRequest = eventArgs.Message.ConvertTo<ExternalAppRequestEvent>();
if (_exportRequests.ContainsKey(externalAppRequest.ExportTaskId))
if (ExportRequests.ContainsKey(externalAppRequest.ExportTaskId))
{
_logger.ExportRequestAlreadyQueued(externalAppRequest.CorrelationId, externalAppRequest.ExportTaskId);
return;
Expand All @@ -79,11 +79,11 @@ protected override async Task ProcessMessage(MessageReceivedEventArgs eventArgs)

var exportRequestWithDetails = new ExportRequestEventDetails(externalAppRequest);

_exportRequests.Add(externalAppRequest.ExportTaskId, exportRequestWithDetails);
ExportRequests.Add(externalAppRequest.ExportTaskId, exportRequestWithDetails);
if (!exportFlow.Post(exportRequestWithDetails))
{
_logger.ErrorPostingExportJobToQueue(externalAppRequest.CorrelationId, externalAppRequest.ExportTaskId);
_messageSubscriber.Reject(eventArgs.Message);
MessageSubscriber.Reject(eventArgs.Message);
}
else
{
Expand All @@ -109,14 +109,16 @@ protected override async Task ExportCompleteCallback(ExportRequestDataMessage ex
}
catch (Exception ex)
{
HandleCStoreException(ex, exportRequestData);
var errorMessage = $"Error reading DICOM file: {ex.Message}";
_logger.ExportException(errorMessage, ex);
exportRequestData.SetFailed(FileExportStatus.UnsupportedDataType, errorMessage);
}

}

private async Task SaveInRepo(ExportRequestDataMessage externalAppRequest, string studyinstanceId)
{
var existing = _repository.GetAsync(studyinstanceId, new CancellationToken());
var existing = await _repository.GetAsync(studyinstanceId, new CancellationToken()).ConfigureAwait(false);
if (existing is null)
{
await _repository.AddAsync(new ExternalAppDetails
Expand Down
26 changes: 13 additions & 13 deletions src/InformaticsGateway/Services/Scp/ScpServiceBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ internal abstract class ScpServiceBase : IHostedService, IDisposable, IMonaiServ
private readonly IApplicationEntityManager _associationDataProvider;
private readonly ILogger<ScpServiceBase> _logger;
private readonly ILogger<ScpServiceInternalBase> _scpServiceInternalLogger;
protected readonly IHostApplicationLifetime _appLifetime;
protected readonly IHostApplicationLifetime AppLifetime;
private readonly IOptions<InformaticsGatewayConfiguration> _configuration;
protected FoDicomNetwork.IDicomServer? _server;
protected FoDicomNetwork.IDicomServer? Server;
public ServiceStatus Status { get; set; } = ServiceStatus.Unknown;
public virtual string ServiceName => "DICOM SCP Service";

Expand All @@ -67,15 +67,15 @@ public ScpServiceBase(IServiceScopeFactory serviceScopeFactory,
_logger = logginFactory!.CreateLogger<ScpServiceBase>();
_scpServiceInternalLogger = logginFactory!.CreateLogger<ScpServiceInternal>();
_associationDataProvider = applicationEntityManager;
_appLifetime = appLifetime;
AppLifetime = appLifetime;
_configuration = configuration;
_ = DicomDictionary.Default;
}

public void Dispose()
{
_serviceScope.Dispose();
_server?.Dispose();
Server?.Dispose();
GC.SuppressFinalize(this);
}

Expand All @@ -90,8 +90,8 @@ public Task StartAsync(CancellationToken cancellationToken)
public Task StopAsync(CancellationToken cancellationToken)
{
_logger.ServiceStopping(ServiceName);
_server?.Stop();
_server?.Dispose();
Server?.Stop();
Server?.Dispose();
Status = ServiceStatus.Stopped;
return Task.CompletedTask;
}
Expand All @@ -103,20 +103,20 @@ public void ServiceStartBase(int ScpPort)
try
{
_logger.ServiceStarting(ServiceName);
_server = DicomServerFactory.Create<ScpServiceInternalBase>(
Server = DicomServerFactory.Create<ScpServiceInternal>(
NetworkManager.IPv4Any,
ScpPort,
logger: _scpServiceInternalLogger,
userState: _associationDataProvider);

_server.Options.IgnoreUnsupportedTransferSyntaxChange = true;
_server.Options.LogDimseDatasets = _configuration.Value.Dicom.Scp.LogDimseDatasets;
_server.Options.MaxClientsAllowed = _configuration.Value.Dicom.Scp.MaximumNumberOfAssociations;
Server.Options.IgnoreUnsupportedTransferSyntaxChange = true;
Server.Options.LogDimseDatasets = _configuration.Value.Dicom.Scp.LogDimseDatasets;
Server.Options.MaxClientsAllowed = _configuration.Value.Dicom.Scp.MaximumNumberOfAssociations;

if (_server.Exception != null)
if (Server.Exception != null)
{
_logger.ScpListenerInitializationFailure();
throw _server.Exception;
throw Server.Exception;
}

Status = ServiceStatus.Running;
Expand All @@ -126,7 +126,7 @@ public void ServiceStartBase(int ScpPort)
{
Status = ServiceStatus.Cancelled;
_logger.ServiceFailedToStart(ServiceName, ex);
_appLifetime.StopApplication();
AppLifetime.StopApplication();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/InformaticsGateway/Services/Scp/ScpServiceInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public override async Task<DicomCStoreResponse> OnCStoreRequestAsync(DicomCStore
try
{
_logger?.TransferSyntaxUsed(request.TransferSyntax);
var payloadId = await _associationDataProvider!.HandleCStoreRequest(request, Association.CalledAE, Association.CallingAE, _associationId).ConfigureAwait(false);
var payloadId = await AssociationDataProvider!.HandleCStoreRequest(request, Association.CalledAE, Association.CallingAE, AssociationId).ConfigureAwait(false);
_associationInfo.FileReceived(payloadId);
return new DicomCStoreResponse(request, DicomStatus.Success);
}
Expand Down
30 changes: 15 additions & 15 deletions src/InformaticsGateway/Services/Scp/ScpServiceInternalBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ internal abstract class ScpServiceInternalBase :
{
private readonly DicomAssociationInfo _associationInfo;
private readonly ILogger _logger;
protected IApplicationEntityManager? _associationDataProvider;
protected IApplicationEntityManager? AssociationDataProvider;
private IDisposable? _loggerScope;
protected Guid _associationId;
protected Guid AssociationId;
private DateTimeOffset? _associationReceived;

public ScpServiceInternalBase(INetworkStream stream, Encoding fallbackEncoding, ILogger logger, DicomServiceDependencies dicomServiceDependencies)
Expand Down Expand Up @@ -72,14 +72,14 @@ public void OnConnectionClosed(Exception exception)

try
{
var repo = _associationDataProvider!.GetService<IDicomAssociationInfoRepository>();
var repo = AssociationDataProvider!.GetService<IDicomAssociationInfoRepository>();
_associationInfo.Disconnect();
repo?.AddAsync(_associationInfo).Wait();
_logger?.ConnectionClosed(_associationInfo.CorrelationId, _associationInfo.CallingAeTitle, _associationInfo.CalledAeTitle, _associationInfo.Duration.TotalSeconds);
}
catch (Exception ex)
{
_logger?.ErrorSavingDicomAssociationInfo(_associationId, ex);
_logger?.ErrorSavingDicomAssociationInfo(AssociationId, ex);
}
}

Expand Down Expand Up @@ -118,16 +118,16 @@ public async Task OnReceiveAssociationRequestAsync(DicomAssociation association)
{
Interlocked.Increment(ref ScpServiceBase.ActiveConnections);
_associationReceived = DateTimeOffset.UtcNow;
_associationDataProvider = (UserState as IApplicationEntityManager)!;
AssociationDataProvider = (UserState as IApplicationEntityManager)!;

if (_associationDataProvider is null)
if (AssociationDataProvider is null)
{
_associationInfo.Errors = $"Internal error: association data provider not found.";
throw new ServiceException($"{nameof(UserState)} must be an instance of IAssociationDataProvider");
}

_associationId = Guid.NewGuid();
var associationIdStr = $"#{_associationId} {association.RemoteHost}:{association.RemotePort}";
AssociationId = Guid.NewGuid();
var associationIdStr = $"#{AssociationId} {association.RemoteHost}:{association.RemotePort}";

_loggerScope = _logger!.BeginScope(new LoggingDataDictionary<string, object> { { "Association", associationIdStr } });
_logger.CStoreAssociationReceived(association.RemoteHost, association.RemotePort);
Expand All @@ -136,7 +136,7 @@ public async Task OnReceiveAssociationRequestAsync(DicomAssociation association)
_associationInfo.CalledAeTitle = association.CalledAE;
_associationInfo.RemoteHost = association.RemoteHost;
_associationInfo.RemotePort = association.RemotePort;
_associationInfo.CorrelationId = _associationId.ToString();
_associationInfo.CorrelationId = AssociationId.ToString();

if (!await IsValidSourceAeAsync(association.CallingAE, association.RemoteHost).ConfigureAwait(false))
{
Expand All @@ -162,7 +162,7 @@ await SendAssociationRejectAsync(
{
if (pc.AbstractSyntax == DicomUID.Verification)
{
if (!_associationDataProvider.Configuration.Value.Dicom.Scp.EnableVerification)
if (!AssociationDataProvider.Configuration.Value.Dicom.Scp.EnableVerification)
{
_associationInfo.Errors = "Verification service disabled. Called AE: {association.CalledAE}. Calling AE: {association.CallingAE}. IP: {association.RemoteHost}.";
_logger?.VerificationServiceDisabled();
Expand All @@ -172,11 +172,11 @@ await SendAssociationRejectAsync(
DicomRejectReason.ApplicationContextNotSupported
).ConfigureAwait(false);
}
pc.AcceptTransferSyntaxes(_associationDataProvider.Configuration.Value.Dicom.Scp.VerificationServiceTransferSyntaxes.ToDicomTransferSyntaxArray());
pc.AcceptTransferSyntaxes(AssociationDataProvider.Configuration.Value.Dicom.Scp.VerificationServiceTransferSyntaxes.ToDicomTransferSyntaxArray());
}
else if (pc.AbstractSyntax.StorageCategory != DicomStorageCategory.None)
{
if (!_associationDataProvider.CanStore)
if (!AssociationDataProvider.CanStore)
{
_associationInfo.Errors = "Disk pressure. Called AE: {association.CalledAE}. Calling AE: {association.CallingAE}. IP: {association.RemoteHost}.";
await SendAssociationRejectAsync(
Expand All @@ -194,14 +194,14 @@ await SendAssociationRejectAsync(

private async Task<bool> IsValidCalledAeAsync(string calledAe)
{
return await _associationDataProvider!.IsAeTitleConfiguredAsync(calledAe).ConfigureAwait(false);
return await AssociationDataProvider!.IsAeTitleConfiguredAsync(calledAe).ConfigureAwait(false);
}

private async Task<bool> IsValidSourceAeAsync(string callingAe, string host)
{
if (!_associationDataProvider!.Configuration.Value.Dicom.Scp.RejectUnknownSources) return true;
if (!AssociationDataProvider!.Configuration.Value.Dicom.Scp.RejectUnknownSources) return true;

return await _associationDataProvider.IsValidSourceAsync(callingAe, host).ConfigureAwait(false);
return await AssociationDataProvider.IsValidSourceAsync(callingAe, host).ConfigureAwait(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,11 @@ public ExtAppScuServiceTest(DicomScpFixture dicomScp)
_outputDataPlugInEngine.Setup(p => p.ExecutePlugInsAsync(It.IsAny<ExportRequestDataMessage>()))
.Returns<ExportRequestDataMessage>((ExportRequestDataMessage message) => Task.FromResult(message));

//var seriesInstanceUid = DicomUIDGenerator.GenerateDerivedFromUUID().UID;
//_testDicom = InstanceGenerator.GenerateDicomFile(seriesInstanceUid: seriesInstanceUid);
//_dicomToolkit.Setup(d => d.Load(It.IsAny<byte[]>())).Returns(_testDicom);
_externalAppRepository.Setup(r => r.GetAsync(It.IsAny<string>(), It.IsAny<CancellationToken>())).Returns(Task.FromResult<ExternalAppDetails>(null));

var seriesInstanceUid = DicomUIDGenerator.GenerateDerivedFromUUID().UID;
var testDicom = InstanceGenerator.GenerateDicomFile(seriesInstanceUid: seriesInstanceUid);
_dicomToolkit.Setup(d => d.Load(It.IsAny<byte[]>())).Returns(testDicom);
}

[RetryFact(5, 250, DisplayName = "Constructor - throws on null params")]
Expand Down

0 comments on commit 62a7c55

Please sign in to comment.