-
Notifications
You must be signed in to change notification settings - Fork 48
/
Copy pathFailedMessageFactory.cs
76 lines (64 loc) · 3 KB
/
FailedMessageFactory.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
namespace ServiceControl.Operations
{
using System;
using System.Collections.Generic;
using Contracts.Operations;
using Infrastructure;
using NServiceBus;
using NServiceBus.Faults;
using Recoverability;
using FailedMessage = MessageFailures.FailedMessage;
/// <summary>
/// Creates the building blocks of a stored failed message from the headers of a message that
/// arrived on the error queue: the parsed <see cref="FailureDetails"/>, a
/// <see cref="FailedMessage.ProcessingAttempt"/> and the list of <see cref="FailedMessage.FailureGroup"/>s.
/// </summary>
class FailedMessageFactory
{
    /// <summary>
    /// Initializes the factory with the enrichers used by <see cref="GetGroups"/>.
    /// </summary>
    /// <param name="failedEnrichers">
    /// Enrichers that are invoked, in array order, to produce failure groups.
    /// The array is stored as-is (no copy, no null check).
    /// </param>
    public FailedMessageFactory(IFailedMessageEnricher[] failedEnrichers)
    {
        this.failedEnrichers = failedEnrichers;
    }

    /// <summary>
    /// Collects the failure groups produced by every registered enricher.
    /// </summary>
    /// <param name="messageType">Message type passed through to each enricher.</param>
    /// <param name="failureDetails">Failure details passed through to each enricher.</param>
    /// <param name="processingAttempt">Processing attempt passed through to each enricher.</param>
    /// <returns>
    /// A new list containing the results of all enrichers, concatenated in enricher order.
    /// The list is empty when there are no enrichers.
    /// </returns>
    public List<FailedMessage.FailureGroup> GetGroups(string messageType, FailureDetails failureDetails, FailedMessage.ProcessingAttempt processingAttempt)
    {
        var groups = new List<FailedMessage.FailureGroup>();

        foreach (var enricher in failedEnrichers)
        {
            // NOTE(review): AddRange throws ArgumentNullException for a null sequence, so this
            // assumes Enrich never returns null - confirm against the enricher implementations.
            groups.AddRange(enricher.Enrich(messageType, failureDetails, processingAttempt));
        }

        return groups;
    }

    /// <summary>
    /// Builds a <see cref="FailureDetails"/> instance from the fault headers of a failed message.
    /// </summary>
    /// <param name="headers">Headers of the failed message.</param>
    /// <returns>
    /// Failure details with the exception information and the address of the failing endpoint
    /// (the value of the <c>FaultsHeaderKeys.FailedQ</c> header) populated. The time of failure is
    /// set through <c>DictionaryExtensions.CheckIfKeyExists</c>, presumably only when the
    /// "NServiceBus.TimeOfFailure" header is present - verify against that helper.
    /// </returns>
    /// <exception cref="Exception">The <c>FaultsHeaderKeys.FailedQ</c> header is missing.</exception>
    public FailureDetails ParseFailureDetails(IReadOnlyDictionary<string, string> headers)
    {
        var result = new FailureDetails();

        DictionaryExtensions.CheckIfKeyExists("NServiceBus.TimeOfFailure", headers, s => result.TimeOfFailure = DateTimeOffsetHelper.ToDateTimeOffset(s).UtcDateTime);

        result.Exception = GetException(headers);

        // Single lookup: fetch the failed-queue address and detect its absence in one call
        // instead of ContainsKey followed by the indexer.
        string failedQueue;
        if (!headers.TryGetValue(FaultsHeaderKeys.FailedQ, out failedQueue))
        {
            throw new Exception($"Missing '{FaultsHeaderKeys.FailedQ}' header. Message is poison message or incorrectly send to (error) queue.");
        }

        result.AddressOfFailingEndpoint = failedQueue;

        return result;
    }

    /// <summary>
    /// Builds an <see cref="ExceptionDetails"/> instance from the "NServiceBus.ExceptionInfo.*" headers
    /// (ExceptionType, Message, Source and StackTrace).
    /// </summary>
    /// <param name="headers">Headers of the failed message.</param>
    /// <returns>
    /// A new <see cref="ExceptionDetails"/>. Each property is assigned via
    /// <c>DictionaryExtensions.CheckIfKeyExists</c>; presumably a property is left at its default
    /// when the corresponding header is absent - verify against that helper.
    /// </returns>
    static ExceptionDetails GetException(IReadOnlyDictionary<string, string> headers)
    {
        var exceptionDetails = new ExceptionDetails();

        DictionaryExtensions.CheckIfKeyExists("NServiceBus.ExceptionInfo.ExceptionType", headers,
            s => exceptionDetails.ExceptionType = s);
        DictionaryExtensions.CheckIfKeyExists("NServiceBus.ExceptionInfo.Message", headers,
            s => exceptionDetails.Message = s);
        DictionaryExtensions.CheckIfKeyExists("NServiceBus.ExceptionInfo.Source", headers,
            s => exceptionDetails.Source = s);
        DictionaryExtensions.CheckIfKeyExists("NServiceBus.ExceptionInfo.StackTrace", headers,
            s => exceptionDetails.StackTrace = s);

        return exceptionDetails;
    }

    /// <summary>
    /// Creates a processing attempt describing a single failure of a message.
    /// </summary>
    /// <param name="headers">Headers of the failed message; stored on the attempt by reference (not copied).</param>
    /// <param name="metadata">Message metadata; stored on the attempt by reference (not copied).</param>
    /// <param name="failureDetails">Failure details; its <c>TimeOfFailure</c> becomes the attempt time.</param>
    /// <returns>A new <see cref="FailedMessage.ProcessingAttempt"/> populated from the arguments.</returns>
    /// <exception cref="KeyNotFoundException">
    /// <paramref name="headers"/> does not contain the <c>Headers.MessageId</c> key.
    /// </exception>
    public FailedMessage.ProcessingAttempt CreateProcessingAttempt(Dictionary<string, string> headers, Dictionary<string, object> metadata, FailureDetails failureDetails)
    {
        return new FailedMessage.ProcessingAttempt
        {
            AttemptedAt = failureDetails.TimeOfFailure,
            FailureDetails = failureDetails,
            MessageMetadata = metadata,
            MessageId = headers[Headers.MessageId],
            Headers = headers
        };
    }

    // Assigned once in the constructor and never replaced afterwards.
    readonly IFailedMessageEnricher[] failedEnrichers;
}
}