Skip to content

Commit

Permalink
[Storage] Badly encoded handler in Webjobs (Azure#18326)
Browse files Browse the repository at this point in the history
* wip

* export api.

* that works.

* handle peeked messages

* api

* post merge.

* propagate queueclient.

* fire and forget callback.

* tweaks.

* re-record.

* hack core temporarily.

* use event hander from core.

* revert test change.

* remove direct core reference from test package.

* that won't be necessary.

* more tests.

* readme.

* whitespace.

* change options creation.

* some pr feedback.

* api

* get parent queue service.

* can handle bad message.

* blob trigger.

* post-merge.

* post-merge.
  • Loading branch information
kasobol-msft authored and jongio committed Feb 9, 2021
1 parent a2652e1 commit 1dc5dd7
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,64 @@
// Licensed under the MIT License.

using System;
using System.Threading.Tasks;
using Azure.Core;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;
using Microsoft.Azure.WebJobs.Extensions.Storage.Common;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs
{
internal class QueueServiceClientProvider : StorageClientProvider<QueueServiceClient, QueueClientOptions>
{
private readonly QueuesOptions _queuesOptions;
private readonly ILogger<QueueServiceClientProvider> _logger;

public QueueServiceClientProvider(
IConfiguration configuration,
AzureComponentFactory componentFactory,
AzureEventSourceLogForwarder logForwarder,
IOptions<QueuesOptions> queueOptions)
IOptions<QueuesOptions> queueOptions,
ILogger<QueueServiceClientProvider> logger)
: base(configuration, componentFactory, logForwarder)
{
_queuesOptions = queueOptions?.Value;
_logger = logger;
}

protected override QueueServiceClient CreateClientFromConnectionString(string connectionString, QueueClientOptions options)
protected override QueueClientOptions CreateClientOptions(IConfiguration configuration)
{
var options = base.CreateClientOptions(configuration);
options.MessageEncoding = _queuesOptions.MessageEncoding;
options.MessageDecodingFailed += HandleMessageDecodingFailed;
return options;
}

protected override QueueServiceClient CreateClientFromConnectionString(string connectionString, QueueClientOptions options)
{
return new QueueServiceClient(connectionString, options);
}

protected override QueueServiceClient CreateClientFromTokenCredential(Uri endpointUri, TokenCredential tokenCredential, QueueClientOptions options)
{
options.MessageEncoding = _queuesOptions.MessageEncoding;
return new QueueServiceClient(endpointUri, tokenCredential, options);
}

private async Task HandleMessageDecodingFailed(QueueMessageDecodingFailedEventArgs args)
{
// SharedBlobQueueProcessor moves to poison queue only if message is parsable and has corresponding registration.
// Therefore, we log and discard garbage here.
if (args.ReceivedMessage != null)
{
_logger.LogWarning("Invalid message in blob trigger queue {QueueName}, messageId={messageId}, body={body}",
args.Queue.Name, args.ReceivedMessage.MessageId, args.ReceivedMessage.Body.ToString());
await args.Queue.DeleteMessageAsync(args.ReceivedMessage.MessageId, args.ReceivedMessage.PopReceipt).ConfigureAwait(false);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ internal class FakeQueueServiceClientProvider : QueueServiceClientProvider
private readonly QueueServiceClient _queueServiceClient;

public FakeQueueServiceClientProvider(QueueServiceClient queueServiceClient)
: base(null, null, null, null)
: base(null, null, null, null, null)
{
_queueServiceClient = queueServiceClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ internal protected virtual async Task CompleteProcessingMessageAsync(QueueMessag
}
}

private async Task HandlePoisonMessageAsync(QueueMessage message, CancellationToken cancellationToken)
internal async Task HandlePoisonMessageAsync(QueueMessage message, CancellationToken cancellationToken)
{
if (_poisonQueue != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public virtual TClient GetHost()
return this.Get(null);
}

private TClientOptions CreateClientOptions(IConfiguration configuration)
protected virtual TClientOptions CreateClientOptions(IConfiguration configuration)
{
var clientOptions = (TClientOptions) _componentFactory.CreateClientOptions(typeof(TClientOptions), null, configuration);
clientOptions.Diagnostics.ApplicationId ??= "AzureWebJobs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ internal static QueueProcessor CreateQueueProcessor(QueueClient queue, QueueClie
return queueProcessor;
}

private static QueueClient CreatePoisonQueueReference(QueueServiceClient client, string name)
internal static QueueClient CreatePoisonQueueReference(QueueServiceClient client, string name)
{
Debug.Assert(client != null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,82 @@
using System;
using Azure.Core;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;
using Microsoft.Azure.WebJobs.Extensions.Storage.Common;
using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners;
using Microsoft.Azure.WebJobs.Extensions.Storage.Queues.Listeners;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Queues;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Microsoft.Azure.WebJobs.Extensions.Storage.Queues
{
internal class QueueServiceClientProvider : StorageClientProvider<QueueServiceClient, QueueClientOptions>
{
private readonly QueuesOptions _queuesOptions;
private readonly ILoggerFactory _loggerFactory;
private readonly IQueueProcessorFactory _queueProcessorFactory;
private readonly SharedQueueWatcher _messageEnqueuedWatcher;

public QueueServiceClientProvider(
IConfiguration configuration,
AzureComponentFactory componentFactory,
AzureEventSourceLogForwarder logForwarder,
IOptions<QueuesOptions> queueOptions)
IOptions<QueuesOptions> queueOptions,
ILoggerFactory loggerFactory,
IQueueProcessorFactory queueProcessorFactory,
SharedQueueWatcher messageEnqueuedWatcher)
: base(configuration, componentFactory, logForwarder)
{
_queuesOptions = queueOptions?.Value;
_loggerFactory = loggerFactory;
_queueProcessorFactory = queueProcessorFactory;
_messageEnqueuedWatcher = messageEnqueuedWatcher;
}

protected override QueueServiceClient CreateClientFromConnectionString(string connectionString, QueueClientOptions options)
protected override QueueClientOptions CreateClientOptions(IConfiguration configuration)
{
var options = base.CreateClientOptions(configuration);
options.MessageEncoding = _queuesOptions.MessageEncoding;
return options;
}

protected override QueueServiceClient CreateClientFromConnectionString(string connectionString, QueueClientOptions options)
{
var originalEncoding = options.MessageEncoding;
options.MessageEncoding = QueueMessageEncoding.None;
var nonEncodingClient = new QueueServiceClient(connectionString, options);
options.MessageDecodingFailed += CreateMessageDecodingFailedHandler(nonEncodingClient);
options.MessageEncoding = originalEncoding;
return new QueueServiceClient(connectionString, options);
}

protected override QueueServiceClient CreateClientFromTokenCredential(Uri endpointUri, TokenCredential tokenCredential, QueueClientOptions options)
{
options.MessageEncoding = _queuesOptions.MessageEncoding;
var originalEncoding = options.MessageEncoding;
options.MessageEncoding = QueueMessageEncoding.None;
var nonEncodingClient = new QueueServiceClient(endpointUri, tokenCredential, options);
options.MessageDecodingFailed += CreateMessageDecodingFailedHandler(nonEncodingClient);
options.MessageEncoding = originalEncoding;
return new QueueServiceClient(endpointUri, tokenCredential, options);
}

private SyncAsyncEventHandler<QueueMessageDecodingFailedEventArgs> CreateMessageDecodingFailedHandler(QueueServiceClient nonEncodingQueueServiceClient)
{
return async (QueueMessageDecodingFailedEventArgs args) =>
{
// This event is raised only in async paths hence args.RunSynchronously is ignored.
if (args.ReceivedMessage != null)
{
var queueClient = args.Queue;
var poisonQueueClient = QueueListenerFactory.CreatePoisonQueueReference(nonEncodingQueueServiceClient, queueClient.Name);
var queueProcessor = QueueListenerFactory.CreateQueueProcessor(queueClient, poisonQueueClient, _loggerFactory, _queueProcessorFactory, _queuesOptions, _messageEnqueuedWatcher);
await queueProcessor.HandlePoisonMessageAsync(args.ReceivedMessage, args.CancellationToken).ConfigureAwait(false);
}
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ internal class FakeQueueServiceClientProvider : QueueServiceClientProvider
private readonly QueueServiceClient _queueServiceClient;

public FakeQueueServiceClientProvider(QueueServiceClient queueServiceClient)
: base(null, null, null, null)
: base(null, null, null, null, null, null, null)
{
_queueServiceClient = queueServiceClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,14 @@ public class AzureStorageEndToEndTests : LiveTestBase<WebJobsTestEnvironment>
private const string TestQueueNameEtag = TestArtifactsPrefix + "etag2equeue%rnd%";
private const string DoneQueueName = TestArtifactsPrefix + "donequeue%rnd%";

private const string BadMessageQueue1 = TestArtifactsPrefix + "-badmessage1-%rnd%";
private const string BadMessageQueue2 = TestArtifactsPrefix + "-badmessage2-%rnd%";
private const string BadMessageQueue = TestArtifactsPrefix + "-badmessage-%rnd%";

private static int _badMessage1Calls;
private static int _badMessage2Calls;
private static int _badMessageCalls;

private static EventWaitHandle _startWaitHandle;
private static EventWaitHandle _functionChainWaitHandle;
private QueueServiceClient _queueServiceClient;
private QueueServiceClient _queueServiceClientWithoutEncoding;
private BlobServiceClient _blobServiceClient;
private RandomNameResolver _resolver;

Expand All @@ -59,6 +58,7 @@ public void SetUp()
{
_fixture = new AzureStorageEndToEndTests.TestFixture(TestEnvironment);
_queueServiceClient = _fixture.QueueServiceClient;
_queueServiceClientWithoutEncoding = _fixture.QueueServiceClientWithoutEncoding;
_blobServiceClient = _fixture.BlobServiceClient;
}

Expand Down Expand Up @@ -138,28 +138,13 @@ public static void NotifyCompletion(
_functionChainWaitHandle.Set();
}

/// <summary>
/// We'll insert a bad message. It should get here okay. It will
/// then pass it on to the next trigger.
/// </summary>
public static void BadMessage_CloudQueueMessage(
[QueueTrigger(BadMessageQueue1)] QueueMessage badMessageIn,
[Queue(BadMessageQueue2)] out string badMessageOut,
#pragma warning disable CS0618 // Type or member is obsolete
TraceWriter log)
#pragma warning restore CS0618 // Type or member is obsolete
{
_badMessage1Calls++;
badMessageOut = badMessageIn.MessageText;
}

public static void BadMessage_String(
[QueueTrigger(BadMessageQueue2)] string message,
[QueueTrigger(BadMessageQueue)] string message,
#pragma warning disable CS0618 // Type or member is obsolete
TraceWriter log)
#pragma warning restore CS0618 // Type or member is obsolete
{
_badMessage2Calls++;
_badMessageCalls++;
}

// Uncomment the Fact attribute to run
Expand Down Expand Up @@ -220,14 +205,12 @@ private async Task EndToEndTest(bool uploadBlobBeforeHostStart)
}

[Test]
[Ignore("TODO (kasobol-msft) revisit this test when base64/BinaryData is supported in SDK")]
public async Task BadQueueMessageE2ETests()
{
// This test ensures that the host does not crash on a bad message (it previously did)
// Insert a bad message into a queue that should:
// - trigger BadMessage_CloudQueueMessage, which will put it into a second queue that will
// - trigger BadMessage_String, which should fail
// - BadMessage_String should fail repeatedly until it is moved to the poison queue
// - BadMessage_String should be transfered to poison queue.
// The test will watch that poison queue to know when to complete

// Reinitialize the name resolver to avoid conflicts
Expand All @@ -254,14 +237,12 @@ public async Task BadQueueMessageE2ETests()
// - use a GUID as the content, which is not a valid base64 string
// - pass 'true', to indicate that it is a base64 string
string messageContent = Guid.NewGuid().ToString();
// var message = new CloudQueueMessage(messageContent, true); // TODO (kasobol-msft) check this base64 thing

var queue = _queueServiceClient.GetQueueClient(_resolver.ResolveInString(BadMessageQueue1));
var queue = _queueServiceClientWithoutEncoding.GetQueueClient(_resolver.ResolveInString(BadMessageQueue));
await queue.CreateIfNotExistsAsync();
await queue.ClearMessagesAsync();

// the poison queue will end up off of the second queue
var poisonQueue = _queueServiceClient.GetQueueClient(_resolver.ResolveInString(BadMessageQueue2) + "-poison");
var poisonQueue = _queueServiceClientWithoutEncoding.GetQueueClient(_resolver.ResolveInString(BadMessageQueue) + "-poison");
await poisonQueue.DeleteIfExistsAsync();

await queue.SendMessageAsync(messageContent);
Expand All @@ -272,22 +253,8 @@ await TestHelpers.Await(async () =>
bool done = false;
if (await poisonQueue.ExistsAsync())
{
poisonMessage = (await poisonQueue.ReceiveMessagesAsync(1)).Value.FirstOrDefault();
poisonMessage = await poisonQueue.ReceiveMessageAsync();
done = poisonMessage != null;
if (done)
{
// Sleep briefly, then make sure the other message has been deleted.
// If so, trying to delete it again will throw an error.
Thread.Sleep(1000);
// The message is in the second queue
var queue2 = _queueServiceClient.GetQueueClient(_resolver.ResolveInString(BadMessageQueue2));
RequestFailedException ex = Assert.ThrowsAsync<RequestFailedException>(
() => queue2.DeleteMessageAsync(_lastMessageId, _lastMessagePopReceipt));
Assert.AreEqual("MessageNotFound", ex.ErrorCode);
}
}
var logs = loggerProvider.GetAllLogMessages();
return done;
Expand All @@ -300,8 +267,7 @@ await TestHelpers.Await(async () =>
Assert.AreEqual(messageContent, poisonMessage.MessageText);

// Make sure the functions were called correctly
Assert.AreEqual(1, _badMessage1Calls);
Assert.AreEqual(0, _badMessage2Calls);
Assert.AreEqual(0, _badMessageCalls);

// Validate Logger
var loggerErrors = loggerProvider.GetAllLogMessages().Where(l => l.Level == Microsoft.Extensions.Logging.LogLevel.Error);
Expand Down Expand Up @@ -376,6 +342,7 @@ public TestFixture(WebJobsTestEnvironment testEnvironment)

var queueOptions = new QueueClientOptions() { MessageEncoding = QueueMessageEncoding.Base64 };
this.QueueServiceClient = new QueueServiceClient(testEnvironment.PrimaryStorageAccountConnectionString, queueOptions);
this.QueueServiceClientWithoutEncoding = new QueueServiceClient(testEnvironment.PrimaryStorageAccountConnectionString);
this.BlobServiceClient = new BlobServiceClient(testEnvironment.PrimaryStorageAccountConnectionString);
}

Expand All @@ -385,6 +352,12 @@ public QueueServiceClient QueueServiceClient
private set;
}

public QueueServiceClient QueueServiceClientWithoutEncoding
{
get;
private set;
}

public BlobServiceClient BlobServiceClient
{
get;
Expand Down
Loading

0 comments on commit 1dc5dd7

Please sign in to comment.