From ad48af0e0c5b0a72cf4b0f212ca66400a01236fe Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Mon, 19 Nov 2018 10:50:09 +0100 Subject: [PATCH 1/2] Add test to verify that body objects are not deleted --- .../Sending_messages_larger_than_256kb.cs | 14 ++++++++++---- src/AcceptanceTests/SqsTransportExtensions.cs | 9 ++++++--- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/AcceptanceTests/Sending_messages_larger_than_256kb.cs b/src/AcceptanceTests/Sending_messages_larger_than_256kb.cs index c48ae7ca1..d310a1134 100644 --- a/src/AcceptanceTests/Sending_messages_larger_than_256kb.cs +++ b/src/AcceptanceTests/Sending_messages_larger_than_256kb.cs @@ -6,6 +6,7 @@ using System.Threading.Tasks; using AcceptanceTesting; using AcceptanceTesting.Customization; + using AmazonSQS.AcceptanceTests; using EndpointTemplates; using Configuration.AdvancedExtensibility; @@ -25,7 +26,9 @@ public async Task Should_receive_messages_with_large_payload_correctly() .Done(c => c.ReceivedPayload != null) .Run(); + var s3Client = SqsTransportExtensions.CreateS3Client(); Assert.AreEqual(payloadToSend, context.ReceivedPayload, "The large payload should be handled correctly using S3"); + Assert.DoesNotThrowAsync(async () => await s3Client.GetObjectAsync(SqsTransportExtensions.S3BucketName, $"{SqsTransportExtensions.S3Prefix}/{context.MessageId}")); } [Test] @@ -39,8 +42,8 @@ public async Task Should_fail_when_no_s3_bucket_is_configured() // Don't configure an S3 bucket for this endpoint b.CustomConfig(x => { - x.GetSettings().Set("NServiceBus.AmazonSQS.S3BucketForLargeMessages", string.Empty); - x.GetSettings().Set("NServiceBus.AmazonSQS.S3Key", string.Empty); + x.GetSettings().Set(SettingsKeys.S3BucketForLargeMessages, string.Empty); + x.GetSettings().Set(SettingsKeys.S3KeyPrefix, string.Empty); }); b.When(async (session, c) => @@ -64,8 +67,8 @@ await session.Send(new MyMessageWithLargePayload // Don't configure an S3 bucket for this endpoint b.CustomConfig(x => { - x.GetSettings().Set("NServiceBus.AmazonSQS.S3BucketForLargeMessages", string.Empty); - x.GetSettings().Set("NServiceBus.AmazonSQS.S3Key", string.Empty); + x.GetSettings().Set(SettingsKeys.S3BucketForLargeMessages, string.Empty); + x.GetSettings().Set(SettingsKeys.S3KeyPrefix, string.Empty); }); }) .Done(c => c.GotTheException) @@ -80,6 +83,8 @@ public class Context : ScenarioContext { public byte[] ReceivedPayload { get; set; } + public string MessageId { get; set; } + public bool GotTheException { get; set; } public Exception Exception { get; set; } @@ -110,6 +115,7 @@ public class MyMessageHandler : IHandleMessages public Task Handle(MyMessageWithLargePayload messageWithLargePayload, IMessageHandlerContext context) { Context.ReceivedPayload = messageWithLargePayload.Payload; + Context.MessageId = context.MessageId; return Task.FromResult(0); } diff --git a/src/AcceptanceTests/SqsTransportExtensions.cs b/src/AcceptanceTests/SqsTransportExtensions.cs index f46c29e21..94ca26671 100644 --- a/src/AcceptanceTests/SqsTransportExtensions.cs +++ b/src/AcceptanceTests/SqsTransportExtensions.cs @@ -15,17 +15,20 @@ public static TransportExtensions ConfigureSqsTransport(this Trans .QueueNamePrefix(queueNamePrefix) .PreTruncateQueueNamesForAcceptanceTests(); - var s3BucketName = EnvironmentHelper.GetEnvironmentVariable(S3BucketEnvironmentVariableName); + S3BucketName = EnvironmentHelper.GetEnvironmentVariable(S3BucketEnvironmentVariableName); - if (!string.IsNullOrEmpty(s3BucketName)) + if (!string.IsNullOrEmpty(S3BucketName)) { - var s3Configuration = transportConfiguration.S3(s3BucketName, "test"); + var s3Configuration = transportConfiguration.S3(S3BucketName, S3Prefix); s3Configuration.ClientFactory(CreateS3Client); } return transportConfiguration; } + public const string S3Prefix = "test"; + public static string S3BucketName; + public static IAmazonSQS CreateSQSClient() => new AmazonSQSClient(); public static IAmazonS3 CreateS3Client() => new AmazonS3Client(); From 512b1ff403faa82622d4fcdb864d49ad400f5017 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Mon, 19 Nov 2018 10:54:24 +0100 Subject: [PATCH 2/2] Remove message body deletion --- src/NServiceBus.AmazonSQS/MessagePump.cs | 31 ++---------------------- 1 file changed, 2 insertions(+), 29 deletions(-) diff --git a/src/NServiceBus.AmazonSQS/MessagePump.cs b/src/NServiceBus.AmazonSQS/MessagePump.cs index 29b156748..4d59b31f9 100644 --- a/src/NServiceBus.AmazonSQS/MessagePump.cs +++ b/src/NServiceBus.AmazonSQS/MessagePump.cs @@ -5,7 +5,6 @@ using System.Threading; using System.Threading.Tasks; using Amazon.S3; - using Amazon.S3.Model; using Amazon.SQS; using Amazon.SQS.Model; using AmazonSQS; @@ -438,35 +437,9 @@ async Task DeleteMessageAndBodyIfRequired(Message message, TransportMessage tran return; // if another receiver fetches the data from S3 } - if (transportMessage != null) + if (!string.IsNullOrEmpty(transportMessage?.S3BodyKey)) { - if (!string.IsNullOrEmpty(transportMessage.S3BodyKey)) - { - try - { - await s3Client.DeleteObjectAsync( - new DeleteObjectRequest - { - BucketName = configuration.S3BucketForLargeMessages, - Key = transportMessage.S3BodyKey - }, - token).ConfigureAwait(false); - } - catch (Exception ex) - { - // If deleting the message body from S3 fails, we don't - // want the exception to make its way through to the _endProcessMessage below, - // as the message has been successfully processed and deleted from the SQS queue - // and effectively doesn't exist anymore. - // It doesn't really matter, as S3 is configured to delete message body data - // automatically after a certain period of time. - Logger.Warn("Couldn't delete message body from S3. Message body data will be aged out by the S3 lifecycle policy when the TTL expires.", ex); - } - } - } - else - { - Logger.Warn("Couldn't delete message body from S3 because the TransportMessage was null. Message body data will be aged out by the S3 lifecycle policy when the TTL expires."); + Logger.Info($"Message body data with key '{transportMessage.S3BodyKey}' will be aged out by the S3 lifecycle policy when the TTL expires."); } }