From ab453f945297f2487b8760b12e7e9957b8fe4961 Mon Sep 17 00:00:00 2001 From: Alexey Rodionov Date: Tue, 15 Dec 2020 15:54:44 -0800 Subject: [PATCH] Fix PR1 --- .../src/BlobTriggerAttribute.cs | 9 +- .../src/BlobTriggerKind.cs | 22 ++ .../Config/BlobsExtensionConfigProvider.cs | 60 +++++- .../src/Listeners/BlobListenerFactory.cs | 38 ++-- .../src/Listeners/BlobQueueTriggerExecutor.cs | 12 +- .../src/Listeners/BlobTriggerQueueWriter.cs | 23 +- .../BlobTriggerQueueWriterFactory.cs | 37 ++++ .../src/Listeners/HttpRequestProcessor.cs | 110 ---------- .../src/Listeners/IHttpRequestProcessor.cs | 17 -- .../SharedBlobQueueListenerFactory.cs | 7 +- .../StorageBlobsWebJobsBuilderExtensions.cs | 2 +- .../BlobTriggerAttributeBindingProvider.cs | 21 +- .../src/Triggers/BlobTriggerBinding.cs | 17 +- .../tests/Listeners/BlobListenerTests.cs | 2 +- .../BlobQueueTriggerExecutorTests.cs | 2 +- .../EventGridBlobTriggerEndToEndTests.cs | 201 ++++++++++++++++++ 16 files changed, 395 insertions(+), 185 deletions(-) create mode 100644 sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/BlobTriggerKind.cs create mode 100644 sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobTriggerQueueWriterFactory.cs delete mode 100644 sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/HttpRequestProcessor.cs delete mode 100644 sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/IHttpRequestProcessor.cs create mode 100644 sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests/tests/EventGridBlobTriggerEndToEndTests.cs diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/BlobTriggerAttribute.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/BlobTriggerAttribute.cs index 817e31088299a..6f0440687d2e3 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/BlobTriggerAttribute.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/BlobTriggerAttribute.cs @@ -39,6 +39,9 @@ public sealed class BlobTriggerAttribute : Attribute, IConnectionProvider { private readonly string _blobPath; + // AnalyticsScan is default kind as it does not require additional actions to set up a blob trigger + private BlobTriggerKind _blobTriggerKind = BlobTriggerKind.AnalyticsScan; + /// /// Initializes a new instance of the class. /// @@ -70,6 +73,10 @@ public string BlobPath /// /// Returns a bool value that indicates whether EventGrid is used. /// - public bool UseEventGrid { get; set; } + public BlobTriggerKind Kind + { + get { return _blobTriggerKind; } + set { _blobTriggerKind = value; } + } } } diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/BlobTriggerKind.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/BlobTriggerKind.cs new file mode 100644 index 0000000000000..20f5540cfa8e2 --- /dev/null +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/BlobTriggerKind.cs @@ -0,0 +1,22 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +namespace Microsoft.Azure.WebJobs +{ + /// + /// Provides blob trigger kinds to detect changes. + /// + public enum BlobTriggerKind + { + /// + /// Changes detection is relied on Storage Analytics logs. + /// Storage Analytics logs + /// + AnalyticsScan, + /// + /// Changes detection is relied on EventGrid. + /// Azure Blob Storage as an Event Grid source + /// + EventGrid + } +} diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Config/BlobsExtensionConfigProvider.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Config/BlobsExtensionConfigProvider.cs index 19ae2c0d6635a..fe2f0c3433b19 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Config/BlobsExtensionConfigProvider.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Config/BlobsExtensionConfigProvider.cs @@ -3,11 +3,14 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.IO; using System.Linq; +using System.Net; using System.Net.Http; using System.Threading; using System.Threading.Tasks; +using System.Web; using Azure.Storage.Blobs; using Azure.Storage.Blobs.Models; using Azure.Storage.Blobs.Specialized; @@ -18,6 +21,8 @@ using Microsoft.Azure.WebJobs.Extensions.Storage.Common; using Microsoft.Azure.WebJobs.Host.Bindings; using Microsoft.Azure.WebJobs.Host.Config; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json.Linq; namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Config { @@ -32,7 +37,9 @@ internal class BlobsExtensionConfigProvider : IExtensionConfigProvider, private IContextGetter _blobWrittenWatcherGetter; private readonly INameResolver _nameResolver; private IConverterManager _converterManager; - private readonly IHttpRequestProcessor _httpEndpointManager; + private readonly BlobTriggerQueueWriterFactory _blobTriggerQueueWriterFactory; + private readonly ILogger _logger; + private BlobTriggerQueueWriter _blobTriggerQueueWriter; public BlobsExtensionConfigProvider( BlobServiceClientProvider blobServiceClientProvider, @@ -40,14 +47,16 @@ public BlobsExtensionConfigProvider( IContextGetter contextAccessor, INameResolver nameResolver, IConverterManager converterManager, - IHttpRequestProcessor httpEndpointManager) + BlobTriggerQueueWriterFactory blobTriggerQueueWriterFactory, + ILoggerFactory loggerFactory) { _blobServiceClientProvider = blobServiceClientProvider; _triggerBinder = triggerBinder; _blobWrittenWatcherGetter = contextAccessor; _nameResolver = nameResolver; _converterManager = converterManager; - _httpEndpointManager = httpEndpointManager; + _blobTriggerQueueWriterFactory = blobTriggerQueueWriterFactory; + _logger = loggerFactory.CreateLogger(); } public void Initialize(ExtensionConfigContext context) @@ -58,7 +67,12 @@ public void Initialize(ExtensionConfigContext context) private void InitilizeBlobBindings(ExtensionConfigContext context) { - _httpEndpointManager.RegisterHttpEnpoint(context); + System.Diagnostics.Debugger.Break(); + +#pragma warning disable CS0618 // Type or member is obsolete + Uri url = context.GetWebhookHandler(); +#pragma warning restore CS0618 // Type or member is obsolete + _logger.LogInformation($"registered http endpoint = {url?.GetLeftPart(UriPartial.Path)}"); var rule = context.AddBindingRule(); @@ -120,7 +134,16 @@ BlobContainerClient IConverter.Convert( public async Task ConvertAsync(HttpRequestMessage input, CancellationToken cancellationToken) { - return await _httpEndpointManager.ProcessHttpRequestAsync(input, cancellationToken).ConfigureAwait(false); + var functionName = HttpUtility.ParseQueryString(input.RequestUri.Query)["functionName"]; + if (_blobTriggerQueueWriter == null) + { + _blobTriggerQueueWriter = await _blobTriggerQueueWriterFactory.CreateAsync(cancellationToken).ConfigureAwait(false); + } + + string content = await input.Content.ReadAsStringAsync().ConfigureAwait(false); + var headers = input.Headers; + + return await EventGrid.HttpRequestProcessor.ProcessAsync(input, functionName, _logger, ProcessEventsAsync, cancellationToken).ConfigureAwait(false); } #endregion @@ -355,5 +378,32 @@ private async Task> GetBlobAsync( return new BlobWithContainer(container, blob); } + + private async Task ProcessEventsAsync(JArray events, string functionName, CancellationToken cancellationToken) + { + foreach (JObject jo in events) + { + BlobTriggerMessage blobTriggerMessage = GetBlobTriggerMessage(jo, functionName); + await _blobTriggerQueueWriter.EnqueueAsync(blobTriggerMessage, cancellationToken).ConfigureAwait(false); + } + + return new HttpResponseMessage(HttpStatusCode.Accepted); + } + + private BlobTriggerMessage GetBlobTriggerMessage(JObject jo, string functionId) + { + JObject data = jo["data"] as JObject; + + BlobUriBuilder blobUriBuilder = new BlobUriBuilder(new Uri(data["url"].ToString())); + + return new BlobTriggerMessage() + { + ETag = $"\"{data["eTag"]}\"", + BlobType = (BlobType)Enum.Parse(typeof(BlobType), data["blobType"].ToString().Replace("Blob", "")), + ContainerName = blobUriBuilder.BlobContainerName, + BlobName = blobUriBuilder.BlobName, + FunctionId = functionId + }; + } } } diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobListenerFactory.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobListenerFactory.cs index bd309623d32cd..5816d8eecb099 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobListenerFactory.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobListenerFactory.cs @@ -25,7 +25,8 @@ internal class BlobListenerFactory : IListenerFactory private readonly BlobsOptions _blobsOptions; private readonly IWebJobsExceptionHandler _exceptionHandler; private readonly IContextSetter _blobWrittenWatcherSetter; - private readonly SharedQueueWatcher _messageEnqueuedWatcherSetter; + //private readonly SharedQueueWatcher _messageEnqueuedWatcherSetter; + private readonly BlobTriggerQueueWriterFactory _blobTriggerQueueWriterFactory; private readonly ISharedContextProvider _sharedContextProvider; private readonly FunctionDescriptor _functionDescriptor; private readonly ILoggerFactory _loggerFactory; @@ -35,7 +36,7 @@ internal class BlobListenerFactory : IListenerFactory private readonly QueueServiceClient _dataQueueServiceClient; private readonly BlobContainerClient _container; private readonly IBlobPathSource _input; - private readonly bool _useEventGrid; + private readonly BlobTriggerKind _blobTriggerKind; private readonly ITriggeredFunctionExecutor _executor; private readonly IHostSingletonManager _singletonManager; @@ -43,7 +44,7 @@ public BlobListenerFactory(IHostIdProvider hostIdProvider, BlobsOptions blobsOptions, IWebJobsExceptionHandler exceptionHandler, IContextSetter blobWrittenWatcherSetter, - SharedQueueWatcher messageEnqueuedWatcherSetter, + BlobTriggerQueueWriterFactory blobTriggerQueueWriterFactory, ISharedContextProvider sharedContextProvider, ILoggerFactory loggerFactory, FunctionDescriptor functionDescriptor, @@ -53,15 +54,15 @@ public BlobListenerFactory(IHostIdProvider hostIdProvider, QueueServiceClient dataQueueServiceClient, BlobContainerClient container, IBlobPathSource input, - bool useEventGrid, + BlobTriggerKind triggerKind, ITriggeredFunctionExecutor executor, IHostSingletonManager singletonManager) { _hostIdProvider = hostIdProvider ?? throw new ArgumentNullException(nameof(hostIdProvider)); + _blobTriggerQueueWriterFactory = blobTriggerQueueWriterFactory ?? throw new ArgumentNullException(nameof(blobTriggerQueueWriterFactory)); _blobsOptions = blobsOptions ?? throw new ArgumentNullException(nameof(blobsOptions)); _exceptionHandler = exceptionHandler ?? throw new ArgumentNullException(nameof(exceptionHandler)); _blobWrittenWatcherSetter = blobWrittenWatcherSetter ?? throw new ArgumentNullException(nameof(blobWrittenWatcherSetter)); - _messageEnqueuedWatcherSetter = messageEnqueuedWatcherSetter ?? throw new ArgumentNullException(nameof(messageEnqueuedWatcherSetter)); _sharedContextProvider = sharedContextProvider ?? throw new ArgumentNullException(nameof(sharedContextProvider)); _functionDescriptor = functionDescriptor ?? throw new ArgumentNullException(nameof(functionDescriptor)); _loggerFactory = loggerFactory; @@ -71,7 +72,7 @@ public BlobListenerFactory(IHostIdProvider hostIdProvider, _dataQueueServiceClient = dataQueueServiceClient ?? throw new ArgumentNullException(nameof(dataQueueServiceClient)); _container = container ?? throw new ArgumentNullException(nameof(container)); _input = input ?? throw new ArgumentNullException(nameof(input)); - _useEventGrid = useEventGrid; + _blobTriggerKind = triggerKind; _executor = executor ?? throw new ArgumentNullException(nameof(executor)); _singletonManager = singletonManager ?? throw new ArgumentNullException(nameof(singletonManager)); } @@ -80,7 +81,6 @@ public async Task CreateAsync(CancellationToken cancellationToken) { // Note that these clients are intentionally for the storage account rather than for the dashboard account. // We use the storage, not dashboard, account for the blob receipt container and blob trigger queues. - var primaryQueueClient = _hostQueueServiceClient; var primaryBlobClient = _hostBlobServiceClient; // Important: We're using the storage account of the function target here, which is the account that the @@ -89,29 +89,27 @@ public async Task CreateAsync(CancellationToken cancellationToken) var targetBlobClient = _dataBlobServiceClient; var targetQueueClient = _dataQueueServiceClient; - string hostId = await _hostIdProvider.GetHostIdAsync(cancellationToken).ConfigureAwait(false); - string hostBlobTriggerQueueName = HostQueueNames.GetHostBlobTriggerQueueName(hostId); - var hostBlobTriggerQueue = primaryQueueClient.GetQueueClient(hostBlobTriggerQueueName); + BlobTriggerQueueWriter blobTriggerQueueWriter = await _blobTriggerQueueWriterFactory.CreateAsync(cancellationToken).ConfigureAwait(false); - SharedQueueWatcher sharedQueueWatcher = _messageEnqueuedWatcherSetter; + string hostId = await _hostIdProvider.GetHostIdAsync(cancellationToken).ConfigureAwait(false); SharedBlobListener sharedBlobListener = null; - // we do not need SharedBlobListener for EventGrid blob trigger - if (!_useEventGrid) + // we do not need to create SharedBlobListener for EventGrid blob trigger + if (_blobTriggerKind == BlobTriggerKind.AnalyticsScan) { sharedBlobListener = _sharedContextProvider.GetOrCreateInstance( new SharedBlobListenerFactory(hostId, _hostBlobServiceClient, _exceptionHandler, _blobWrittenWatcherSetter, _loggerFactory.CreateLogger())); // Register the blob container we wish to monitor with the shared blob listener. await RegisterWithSharedBlobListenerAsync(hostId, sharedBlobListener, primaryBlobClient, - hostBlobTriggerQueue, sharedQueueWatcher, cancellationToken).ConfigureAwait(false); + blobTriggerQueueWriter, cancellationToken).ConfigureAwait(false); } // Create a "bridge" listener that will monitor the blob // notification queue and dispatch to the target job function. SharedBlobQueueListener sharedBlobQueueListener = _sharedContextProvider.GetOrCreateInstance( - new SharedBlobQueueListenerFactory(_hostQueueServiceClient, sharedQueueWatcher, hostBlobTriggerQueue, - _blobsOptions, _exceptionHandler, _loggerFactory, sharedBlobListener?.BlobWritterWatcher, _functionDescriptor)); + new SharedBlobQueueListenerFactory(_hostQueueServiceClient, blobTriggerQueueWriter.SharedQueueWatcher, blobTriggerQueueWriter.QueueClient, + _blobsOptions, _exceptionHandler, _loggerFactory, sharedBlobListener?.BlobWritterWatcher, _functionDescriptor, _blobTriggerKind)); var queueListener = new BlobListener(sharedBlobQueueListener); // the client to use for the poison queue @@ -126,7 +124,8 @@ await RegisterWithSharedBlobListenerAsync(hostId, sharedBlobListener, primaryBlo // shared blob listener in this host instance. // We do not need SharedBlobListener for EventGrid blob trigger. object singletonListenerCreated = false; - if (sharedBlobListener != null && !_sharedContextProvider.TryGetValue(SingletonBlobListenerScopeId, out singletonListenerCreated)) + if (_blobTriggerKind == BlobTriggerKind.AnalyticsScan + && !_sharedContextProvider.TryGetValue(SingletonBlobListenerScopeId, out singletonListenerCreated)) { // Create a singleton shared blob listener, since we only // want a single instance of the blob poll/scan logic to be running @@ -151,12 +150,11 @@ private async Task RegisterWithSharedBlobListenerAsync( string hostId, SharedBlobListener sharedBlobListener, BlobServiceClient blobClient, - QueueClient hostBlobTriggerQueue, - IMessageEnqueuedWatcher messageEnqueuedWatcher, + BlobTriggerQueueWriter blobTriggerQueueWriter, CancellationToken cancellationToken) { BlobTriggerExecutor triggerExecutor = new BlobTriggerExecutor(hostId, _functionDescriptor, _input, new BlobReceiptManager(blobClient), - new BlobTriggerQueueWriter(hostBlobTriggerQueue, messageEnqueuedWatcher), _loggerFactory.CreateLogger()); + blobTriggerQueueWriter, _loggerFactory.CreateLogger()); await sharedBlobListener.RegisterAsync(blobClient, _container, triggerExecutor, cancellationToken).ConfigureAwait(false); } diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobQueueTriggerExecutor.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobQueueTriggerExecutor.cs index cce2ec114a9b7..a1abc47fd333f 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobQueueTriggerExecutor.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobQueueTriggerExecutor.cs @@ -30,16 +30,18 @@ internal partial class BlobQueueTriggerExecutor : ITriggerExecutor private readonly IBlobCausalityReader _causalityReader; private readonly IBlobWrittenWatcher _blobWrittenWatcher; private readonly ConcurrentDictionary _registrations; + private readonly BlobTriggerKind _blobTriggerKind; private readonly ILogger _logger; - public BlobQueueTriggerExecutor(IBlobWrittenWatcher blobWrittenWatcher, ILogger logger) - : this(BlobCausalityReader.Instance, blobWrittenWatcher, logger) + public BlobQueueTriggerExecutor(BlobTriggerKind kind, IBlobWrittenWatcher blobWrittenWatcher, ILogger logger) + : this(BlobCausalityReader.Instance, kind, blobWrittenWatcher, logger) { } - public BlobQueueTriggerExecutor(IBlobCausalityReader causalityReader, IBlobWrittenWatcher blobWrittenWatcher, ILogger logger) + public BlobQueueTriggerExecutor(IBlobCausalityReader causalityReader, BlobTriggerKind blobTriggerKind, IBlobWrittenWatcher blobWrittenWatcher, ILogger logger) { _causalityReader = causalityReader; + _blobTriggerKind = blobTriggerKind; _blobWrittenWatcher = blobWrittenWatcher; _registrations = new ConcurrentDictionary(); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); @@ -57,6 +59,8 @@ public void Register(string functionId, BlobQueueRegistration registration) public async Task ExecuteAsync(QueueMessage value, CancellationToken cancellationToken) { + Debugger.Break(); + BlobTriggerMessage message = JsonConvert.DeserializeObject(value.Body.ToValidUTF8String(), JsonSerialization.Settings); if (message == null) @@ -100,7 +104,7 @@ public async Task ExecuteAsync(QueueMessage value, CancellationT string possibleETag = blobProperties.ETag.ToString(); // If the blob still exists but the ETag is different, delete the message but do a fast path notification. - if (!string.Equals(message.ETag, possibleETag, StringComparison.Ordinal) && _blobWrittenWatcher != null) + if (_blobTriggerKind == BlobTriggerKind.AnalyticsScan && !string.Equals(message.ETag, possibleETag, StringComparison.Ordinal)) { _blobWrittenWatcher.Notify(new Extensions.Storage.Blobs.BlobWithContainer(container, blob)); return successResult; diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobTriggerQueueWriter.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobTriggerQueueWriter.cs index 974eb92ba02a4..75dd19692e8e0 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobTriggerQueueWriter.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobTriggerQueueWriter.cs @@ -6,6 +6,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.WebJobs.Extensions.Storage.Common; +using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners; using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Protocols; using Newtonsoft.Json; using QueueClient = Azure.Storage.Queues.QueueClient; @@ -14,22 +15,24 @@ namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Listeners { internal class BlobTriggerQueueWriter : IBlobTriggerQueueWriter { - private readonly QueueClient _queue; - private readonly IMessageEnqueuedWatcher _watcher; - - public BlobTriggerQueueWriter(QueueClient queue, IMessageEnqueuedWatcher watcher) + public BlobTriggerQueueWriter(QueueClient queueClient, SharedQueueWatcher wsharedQueueWatcher) { - _queue = queue; - Debug.Assert(watcher != null); - _watcher = watcher; + QueueClient = queueClient; + Debug.Assert(wsharedQueueWatcher != null); + SharedQueueWatcher = wsharedQueueWatcher; } + public QueueClient QueueClient { get; } + + public SharedQueueWatcher SharedQueueWatcher { get; } + + public async Task<(string QueueName, string MessageId)> EnqueueAsync(BlobTriggerMessage message, CancellationToken cancellationToken) { string contents = JsonConvert.SerializeObject(message, JsonSerialization.Settings); - var receipt = await _queue.AddMessageAndCreateIfNotExistsAsync(BinaryData.FromString(contents), cancellationToken).ConfigureAwait(false); - _watcher.Notify(_queue.Name); - return (QueueName: _queue.Name, MessageId: receipt.MessageId); + var receipt = await QueueClient.AddMessageAndCreateIfNotExistsAsync(BinaryData.FromString(contents), cancellationToken).ConfigureAwait(false); + SharedQueueWatcher.Notify(QueueClient.Name); + return (QueueName: QueueClient.Name, MessageId: receipt.MessageId); } } } diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobTriggerQueueWriterFactory.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobTriggerQueueWriterFactory.cs new file mode 100644 index 0000000000000..a0db4f527fe92 --- /dev/null +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobTriggerQueueWriterFactory.cs @@ -0,0 +1,37 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Extensions.Storage.Common; +using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners; +using Microsoft.Azure.WebJobs.Host.Executors; + +namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Listeners +{ + internal class BlobTriggerQueueWriterFactory + { + private readonly IHostIdProvider _hostIdProvider; + private readonly QueueServiceClientProvider _queueServiceClientProvider; + private readonly SharedQueueWatcher _sharedQueueWatcher; + + public BlobTriggerQueueWriterFactory(IHostIdProvider hostIdProvider, QueueServiceClientProvider queueServiceClientProvider, SharedQueueWatcher sharedQueueWatcher) + { + _hostIdProvider = hostIdProvider ?? throw new ArgumentNullException(nameof(hostIdProvider)); + _queueServiceClientProvider = queueServiceClientProvider ?? throw new ArgumentNullException(nameof(queueServiceClientProvider)); + _sharedQueueWatcher = sharedQueueWatcher ?? throw new ArgumentNullException(nameof(sharedQueueWatcher)); + } + + public async Task CreateAsync(CancellationToken cancellationToken) + { + string hostId = await _hostIdProvider.GetHostIdAsync(cancellationToken).ConfigureAwait(false); + string hostBlobTriggerQueueName = HostQueueNames.GetHostBlobTriggerQueueName(hostId); + var hostBlobTriggerQueue = _queueServiceClientProvider.GetHost().GetQueueClient(hostBlobTriggerQueueName); + + return new BlobTriggerQueueWriter(hostBlobTriggerQueue, _sharedQueueWatcher); + } + } +} diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/HttpRequestProcessor.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/HttpRequestProcessor.cs deleted file mode 100644 index b288617ab2b7f..0000000000000 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/HttpRequestProcessor.cs +++ /dev/null @@ -1,110 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -using System; -using System.Collections.Generic; -using System.Linq; -using System.Net; -using System.Net.Http; -using System.Threading; -using System.Threading.Tasks; -using System.Web; -using Azure.Storage.Blobs; -using Azure.Storage.Blobs.Models; -using Microsoft.Azure.WebJobs.Extensions.Storage.Common; -using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners; -using Microsoft.Azure.WebJobs.Host.Config; -using Microsoft.Azure.WebJobs.Host.Executors; -using Microsoft.Extensions.Logging; -using Newtonsoft.Json; -using Newtonsoft.Json.Linq; - -namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Listeners -{ - internal class HttpRequestProcessor : IHttpRequestProcessor - { - private readonly SharedQueueWatcher _sharedQueueWatcher; - private readonly IHostIdProvider _hostIdProvider; - private readonly QueueServiceClientProvider _queueServiceClientProvider; - private readonly ILogger _logger; - private BlobTriggerQueueWriter _blobTriggerQueueWriter; - - public HttpRequestProcessor( - SharedQueueWatcher sharedQueueWatcher, - IHostIdProvider hostIdProvider, - QueueServiceClientProvider queueServiceClientProvider, - ILoggerFactory loggerFactory) - { - _sharedQueueWatcher = sharedQueueWatcher; - _hostIdProvider = hostIdProvider; - _queueServiceClientProvider = queueServiceClientProvider; - _logger = loggerFactory.CreateLogger(); - } - - public void RegisterHttpEnpoint(ExtensionConfigContext context) - { - //Debugger.Break(); -#pragma warning disable CS0618 // Type or member is obsolete - Uri url = context.GetWebhookHandler(); -#pragma warning restore CS0618 // Type or member is obsolete - _logger.LogInformation($"registered http endpoint = {url?.GetLeftPart(UriPartial.Path)}"); - } - - public async Task ProcessHttpRequestAsync(HttpRequestMessage req, CancellationToken cancellationToken) - { - //Debugger.Break(); - var functionName = HttpUtility.ParseQueryString(req.RequestUri.Query)["functionName"]; - - IEnumerable eventTypeHeaders = null; - string eventTypeHeader = null; - if (req.Headers.TryGetValues("aeg-event-type", out eventTypeHeaders)) - { - eventTypeHeader = eventTypeHeaders.First(); - } - - return await EventGrid.HttpRequestProcessor.ProcessAsync(req, functionName, _logger, ProcessEventsAsync, CancellationToken.None).ConfigureAwait(false); - } - - private async Task ProcessEventsAsync(JArray events, string functionName, CancellationToken cancellationToken) - { - if (_blobTriggerQueueWriter == null) - { - await InitializeWriterAsync(cancellationToken).ConfigureAwait(false); - } - - foreach (JObject jo in events) - { - BlobTriggerMessage blobTriggerMessage = GetBlobTriggerMessage(jo, functionName); - await _blobTriggerQueueWriter.EnqueueAsync(blobTriggerMessage, cancellationToken).ConfigureAwait(false); - } - - return new HttpResponseMessage(HttpStatusCode.Accepted); - } - - - private async Task InitializeWriterAsync(CancellationToken cancellationToken) - { - string hostId = await _hostIdProvider.GetHostIdAsync(cancellationToken).ConfigureAwait(false); - string hostBlobTriggerQueueName = HostQueueNames.GetHostBlobTriggerQueueName(hostId); - var hostBlobTriggerQueue = _queueServiceClientProvider.GetHost().GetQueueClient(hostBlobTriggerQueueName); - - _blobTriggerQueueWriter = new BlobTriggerQueueWriter(hostBlobTriggerQueue, _sharedQueueWatcher); - } - - private BlobTriggerMessage GetBlobTriggerMessage(JObject jo, string functionId) - { - JObject data = jo["data"] as JObject; - - BlobUriBuilder blobUriBuilder = new BlobUriBuilder(new Uri(data["url"].ToString())); - - return new BlobTriggerMessage() - { - ETag = $"\"{data["eTag"]}\"", - BlobType = (BlobType)Enum.Parse(typeof(BlobType), data["blobType"].ToString().Replace("Blob", "")), - ContainerName = blobUriBuilder.BlobContainerName, - BlobName = blobUriBuilder.BlobName, - FunctionId = functionId - }; - } - } -} \ No newline at end of file diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/IHttpRequestProcessor.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/IHttpRequestProcessor.cs deleted file mode 100644 index 3895ea150c163..0000000000000 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/IHttpRequestProcessor.cs +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -using System.Net.Http; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.Azure.WebJobs.Host.Config; - -namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Listeners -{ - internal interface IHttpRequestProcessor - { - void RegisterHttpEnpoint(ExtensionConfigContext context); - - Task ProcessHttpRequestAsync(HttpRequestMessage req, CancellationToken cancellationToken); - } -} diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/SharedBlobQueueListenerFactory.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/SharedBlobQueueListenerFactory.cs index 29218e5336d67..bf06c297eb28f 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/SharedBlobQueueListenerFactory.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/SharedBlobQueueListenerFactory.cs @@ -32,6 +32,7 @@ internal class SharedBlobQueueListenerFactory : IFactory()); + BlobQueueTriggerExecutor triggerExecutor = new BlobQueueTriggerExecutor(_blobTriggerKind, _blobWrittenWatcher, _loggerFactory.CreateLogger()); // The poison queue to use for a given poison blob lives in the same // storage account as the triggering blob by default. In multi-storage account scenarios diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/StorageBlobsWebJobsBuilderExtensions.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/StorageBlobsWebJobsBuilderExtensions.cs index 96a7d4227c553..a4451fa0074fa 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/StorageBlobsWebJobsBuilderExtensions.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/StorageBlobsWebJobsBuilderExtensions.cs @@ -42,7 +42,7 @@ public static IWebJobsBuilder AddAzureStorageBlobs(this IWebJobsBuilder builder, builder.Services.TryAddSingleton(); builder.Services.TryAddSingleton(); - builder.Services.TryAddSingleton(); + builder.Services.TryAddSingleton(); builder.Services.TryAddSingleton>((p) => new ContextAccessor()); builder.Services.TryAddSingleton((p) => p.GetService>() as IContextGetter); diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Triggers/BlobTriggerAttributeBindingProvider.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Triggers/BlobTriggerAttributeBindingProvider.cs index 7abd521aa052e..e6923b46d9af6 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Triggers/BlobTriggerAttributeBindingProvider.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Triggers/BlobTriggerAttributeBindingProvider.cs @@ -4,6 +4,8 @@ using System; using System.Reflection; using System.Threading.Tasks; +using Azure.Storage.Blobs.Specialized; +using Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Listeners; using Microsoft.Azure.WebJobs.Extensions.Storage.Common; using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners; using Microsoft.Azure.WebJobs.Host; @@ -24,10 +26,11 @@ internal class BlobTriggerAttributeBindingProvider : ITriggerBindingProvider private readonly BlobsOptions _blobsOptions; private readonly IWebJobsExceptionHandler _exceptionHandler; private readonly IContextSetter _blobWrittenWatcherSetter; - private readonly SharedQueueWatcher _messageEnqueuedWatcherSetter; + private readonly BlobTriggerQueueWriterFactory _blobTriggerQueueWriterFactory; private readonly ISharedContextProvider _sharedContextProvider; private readonly IHostSingletonManager _singletonManager; private readonly ILoggerFactory _loggerFactory; + private readonly ILogger _logger; public BlobTriggerAttributeBindingProvider( INameResolver nameResolver, @@ -37,7 +40,8 @@ public BlobTriggerAttributeBindingProvider( IOptions blobsOptions, IWebJobsExceptionHandler exceptionHandler, IContextSetter blobWrittenWatcherSetter, - SharedQueueWatcher messageEnqueuedWatcherSetter, + //SharedQueueWatcher messageEnqueuedWatcherSetter, + BlobTriggerQueueWriterFactory blobTriggerQueueWriterFactory, ISharedContextProvider sharedContextProvider, IHostSingletonManager singletonManager, ILoggerFactory loggerFactory) @@ -48,12 +52,13 @@ public BlobTriggerAttributeBindingProvider( _blobsOptions = (blobsOptions ?? throw new ArgumentNullException(nameof(blobsOptions))).Value; _exceptionHandler = exceptionHandler ?? throw new ArgumentNullException(nameof(exceptionHandler)); _blobWrittenWatcherSetter = blobWrittenWatcherSetter ?? throw new ArgumentNullException(nameof(blobWrittenWatcherSetter)); - _messageEnqueuedWatcherSetter = messageEnqueuedWatcherSetter ?? throw new ArgumentNullException(nameof(messageEnqueuedWatcherSetter)); + _blobTriggerQueueWriterFactory = blobTriggerQueueWriterFactory ?? throw new ArgumentNullException(nameof(blobTriggerQueueWriterFactory)); _sharedContextProvider = sharedContextProvider ?? throw new ArgumentNullException(nameof(sharedContextProvider)); _singletonManager = singletonManager ?? throw new ArgumentNullException(nameof(singletonManager)); _nameResolver = nameResolver; _loggerFactory = loggerFactory; + _logger = loggerFactory.CreateLogger(); } public Task TryCreateAsync(TriggerBindingProviderContext context) @@ -61,6 +66,12 @@ public Task TryCreateAsync(TriggerBindingProviderContext contex ParameterInfo parameter = context.Parameter; var blobTriggerAttribute = TypeUtility.GetResolvedAttribute(context.Parameter); + if (parameter.ParameterType == typeof(PageBlobClient) && blobTriggerAttribute.Kind == BlobTriggerKind.EventGrid) + { + _logger.LogError("PageBlobClient is not supported with BlobTriggerKind.EventGrid"); + return Task.FromResult(null); + } + if (blobTriggerAttribute == null) { return Task.FromResult(null); @@ -79,9 +90,9 @@ public Task TryCreateAsync(TriggerBindingProviderContext contex // dataAccount.AssertTypeOneOf(StorageAccountType.GeneralPurpose, StorageAccountType.BlobOnly); ITriggerBinding binding = new BlobTriggerBinding(parameter, hostBlobServiceClient, hostQueueServiceClient, - dataBlobServiceClient, dataQueueServiceClient, path, blobTriggerAttribute.UseEventGrid, + dataBlobServiceClient, dataQueueServiceClient, path, blobTriggerAttribute.Kind, _hostIdProvider, _blobsOptions, _exceptionHandler, _blobWrittenWatcherSetter, - _messageEnqueuedWatcherSetter, _sharedContextProvider, _singletonManager, _loggerFactory); + _blobTriggerQueueWriterFactory, _sharedContextProvider, _singletonManager, _loggerFactory); return Task.FromResult(binding); } diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Triggers/BlobTriggerBinding.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Triggers/BlobTriggerBinding.cs index 77cc0e1372c68..4e6d08870f9fc 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Triggers/BlobTriggerBinding.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Triggers/BlobTriggerBinding.cs @@ -38,13 +38,14 @@ internal class BlobTriggerBinding : ITriggerBinding private readonly BlobsOptions _blobsOptions; private readonly IWebJobsExceptionHandler _exceptionHandler; private readonly IContextSetter _blobWrittenWatcherSetter; - private readonly SharedQueueWatcher _messageEnqueuedWatcherSetter; + //private readonly SharedQueueWatcher _messageEnqueuedWatcherSetter; + private readonly BlobTriggerQueueWriterFactory _blobTriggerQueueWriterFactory; private readonly ISharedContextProvider _sharedContextProvider; private readonly ILoggerFactory _loggerFactory; private readonly IAsyncObjectToTypeConverter _converter; private readonly IReadOnlyDictionary _bindingDataContract; private readonly IHostSingletonManager _singletonManager; - private readonly bool _useEventGrid; + private readonly BlobTriggerKind _blobTriggerKind; public BlobTriggerBinding(ParameterInfo parameter, BlobServiceClient hostBlobServiceClient, @@ -52,12 +53,12 @@ public BlobTriggerBinding(ParameterInfo parameter, BlobServiceClient dataBlobServiceClient, QueueServiceClient dataQueueServiceClient, IBlobPathSource path, - bool useEventGrid, + BlobTriggerKind blobTriggerKind, IHostIdProvider hostIdProvider, BlobsOptions blobsOptions, IWebJobsExceptionHandler exceptionHandler, IContextSetter blobWrittenWatcherSetter, - SharedQueueWatcher messageEnqueuedWatcherSetter, + BlobTriggerQueueWriterFactory blobTriggerQueueWriterFactory, ISharedContextProvider sharedContextProvider, IHostSingletonManager singletonManager, ILoggerFactory loggerFactory) @@ -70,12 +71,12 @@ public BlobTriggerBinding(ParameterInfo parameter, _accountName = _dataBlobServiceClient.AccountName; _path = path ?? throw new ArgumentNullException(nameof(path)); - _useEventGrid = useEventGrid; + _blobTriggerKind = blobTriggerKind; _hostIdProvider = hostIdProvider ?? throw new ArgumentNullException(nameof(hostIdProvider)); _blobsOptions = blobsOptions ?? throw new ArgumentNullException(nameof(blobsOptions)); _exceptionHandler = exceptionHandler ?? throw new ArgumentNullException(nameof(exceptionHandler)); _blobWrittenWatcherSetter = blobWrittenWatcherSetter ?? throw new ArgumentNullException(nameof(blobWrittenWatcherSetter)); - _messageEnqueuedWatcherSetter = messageEnqueuedWatcherSetter ?? throw new ArgumentNullException(nameof(messageEnqueuedWatcherSetter)); + _blobTriggerQueueWriterFactory = blobTriggerQueueWriterFactory ?? throw new ArgumentNullException(nameof(blobTriggerQueueWriterFactory)); _sharedContextProvider = sharedContextProvider ?? throw new ArgumentNullException(nameof(sharedContextProvider)); _singletonManager = singletonManager ?? throw new ArgumentNullException(nameof(singletonManager)); _loggerFactory = loggerFactory; @@ -187,9 +188,9 @@ public Task CreateListenerAsync(ListenerFactoryContext context) var container = _dataBlobServiceClient.GetBlobContainerClient(_path.ContainerNamePattern); var factory = new BlobListenerFactory(_hostIdProvider, _blobsOptions, _exceptionHandler, - _blobWrittenWatcherSetter, _messageEnqueuedWatcherSetter, _sharedContextProvider, _loggerFactory, + _blobWrittenWatcherSetter, _blobTriggerQueueWriterFactory, _sharedContextProvider, _loggerFactory, context.Descriptor, _hostBlobServiceClient, _hostQueueServiceClient, _dataBlobServiceClient, _dataQueueServiceClient, - container, _path, _useEventGrid, context.Executor, _singletonManager); + container, _path, _blobTriggerKind, context.Executor, _singletonManager); return factory.CreateAsync(context.CancellationToken); } diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/BlobListenerTests.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/BlobListenerTests.cs index 63d0ea9cc0bd6..3719619a79551 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/BlobListenerTests.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/BlobListenerTests.cs @@ -15,7 +15,7 @@ public void GetMonitor_ReturnsSharedMonitor() { var queueListener = new QueueListener(); var watcherMock = new Mock(MockBehavior.Strict); - var executor = new BlobQueueTriggerExecutor(watcherMock.Object, NullLogger.Instance); + var executor = new BlobQueueTriggerExecutor(BlobTriggerKind.AnalyticsScan, watcherMock.Object, NullLogger.Instance); var sharedBlobQueueListener = new SharedBlobQueueListener(queueListener, executor); var sharedListenerMock = new Mock(MockBehavior.Strict); var blobListener1 = new BlobListener(sharedBlobQueueListener); diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/BlobQueueTriggerExecutorTests.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/BlobQueueTriggerExecutorTests.cs index bad6c4e338155..042cdae8f1e1e 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/BlobQueueTriggerExecutorTests.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/BlobQueueTriggerExecutorTests.cs @@ -351,7 +351,7 @@ private BlobQueueTriggerExecutor CreateProductUnderTest( private BlobQueueTriggerExecutor CreateProductUnderTest( IBlobCausalityReader causalityReader, IBlobWrittenWatcher blobWrittenWatcher) { - return new BlobQueueTriggerExecutor(causalityReader, blobWrittenWatcher, _logger); + return new BlobQueueTriggerExecutor(causalityReader, BlobTriggerKind.AnalyticsScan, blobWrittenWatcher, _logger); } private static IBlobCausalityReader CreateStubCausalityReader() diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests/tests/EventGridBlobTriggerEndToEndTests.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests/tests/EventGridBlobTriggerEndToEndTests.cs new file mode 100644 index 0000000000000..a48ff2f5ceb4f --- /dev/null +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests/tests/EventGridBlobTriggerEndToEndTests.cs @@ -0,0 +1,201 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Azure.Core.TestFramework; +using Azure.Storage.Blobs; +using Azure.Storage.Blobs.Specialized; +using Microsoft.AspNetCore.Http; +using Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Triggers; +using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Tests; +using Microsoft.Azure.WebJobs.Host; +using Microsoft.Azure.WebJobs.Host.Config; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using NUnit.Framework; + +namespace Microsoft.Azure.WebJobs.Extensions.Storage.ScenarioTests +{ + public class EventGridBlobTriggerEndToEndTests : LiveTestBase + { + private const string TestArtifactPrefix = "e2etests"; + private const string EventGridContainerName = TestArtifactPrefix + "eventgrid-%rnd%"; + private const string TestBlobName = "test"; + private readonly string _resolvedContainerName; + + private readonly BlobContainerClient _testContainer; + private readonly BlobServiceClient _blobServiceClient; + private readonly RandomNameResolver _nameResolver; + + private const string RegistrationRequest = + @"[{ + ""id"": ""09473e51-90aa-4a7b-88f1-039ea0d7ee64"", + ""topic"": ""/subscriptions/18318512-69ef-49db-9cbb-4a13fe0fa72c/resourceGroups/EventGrid/providers/Microsoft.Storage/StorageAccounts/alrodegtest"", + ""subject"": """", + ""data"": { + ""validationCode"": ""F83B17BA-898A-4309-8F89-8BB2B3A06D02"", + ""validationUrl"": ""https://rp-eastus.eventgrid.azure.net:553/eventsubscriptions/eg1/validate?id=F83B17BA-898A-4309-8F89-8BB2B3A06D02&t=2020-12-08T02:28:30.6463986Z&apiVersion=2020-04-01-preview&token=AEvNcDi9Gonj83RQEK4owr6zXA31QPzppkc7BwlvBeI%3d"" + }, + ""eventType"": ""Microsoft.EventGrid.SubscriptionValidationEvent"", + ""eventTime"": ""2020-12-08T02:28:30.6463986Z"", + ""metadataVersion"": ""1"", + ""dataVersion"": ""2"" +}]"; + + private const string NotificationRequest = +@"[{ + ""topic"":""/subscriptions/18318512-69ef-49db-9cbb-4a13fe0fa72c/resourceGroups/EventGrid/providers/Microsoft.Storage/storageAccounts/alrodegtest"", + ""subject"":""/blobServices/default/containers/sample-workitems/blobs/blob.txt"", + ""eventType"":""Microsoft.Storage.BlobCreated"", + ""id"":""e5c50ef5-f01e-0017-048b-d20b04066601"", + ""data"":{ + ""api"":""PutBlob"", + ""clientRequestId"":""8dd38cbd-67e6-473e-a64c-e4d715ed0a52"", + ""requestId"":""e5c50ef5-f01e-0017-048b-d20b04000000"", + ""eTag"":""0x8D8A0A2FA6E70FF"", + ""contentType"":""application/octet-stream"", + ""contentLength"":1, + ""blobType"":""BlockBlob"", + ""url"":""https://alrodegtest.blob.core.windows.net/[blobPathPlaceHolder]"", + ""sequencer"":""000000000000000000000000000089AE00000000018dd658"", + ""storageDiagnostics"":{ + ""batchId"":""aea96df5-b006-0006-008b-d291b0000000"" + } + }, + ""dataVersion"":"""", + ""metadataVersion"":""1"", + ""eventTime"":""2020-12-15T02:41:51.9623179Z"" + } +]"; + + public EventGridBlobTriggerEndToEndTests() + { + _nameResolver = new RandomNameResolver(); + + // pull from a default host + var host = new HostBuilder() + .ConfigureDefaultTestHost(b => + { + b.AddAzureStorageBlobs().AddAzureStorageQueues(); + }) + .Build(); + _blobServiceClient = new BlobServiceClient(TestEnvironment.PrimaryStorageAccountConnectionString); + _resolvedContainerName = _nameResolver.ResolveInString(EventGridContainerName); + _testContainer = _blobServiceClient.GetBlobContainerClient(_resolvedContainerName); + Assert.False(_testContainer.ExistsAsync().Result); + _testContainer.CreateAsync().Wait(); + } + + public IHostBuilder NewBuilder(TProgram program, Action configure = null) + { + var activator = new FakeActivator(); + activator.Add(program); + + return new HostBuilder() + .ConfigureDefaultTestHost(b => + { + IWebJobsBuilder builder = b.AddAzureStorageBlobs().AddAzureStorageQueues(); + var ss = builder.Services.BuildServiceProvider(); + }) + .ConfigureServices(services => + { + services.AddSingleton(activator); + services.AddSingleton(_nameResolver); + }); + } + + [Test] + public async Task EventGridRequest_Subscription_Succeeded() + { + var prog = new EventGrid_Program(); + var host = NewBuilder(prog).Build(); + + using (host) + { + host.Start(); + HttpResponseMessage response = await SendEventGridRequest(host, RegistrationRequest, "SubscriptionValidation"); + Assert.True(response.StatusCode == HttpStatusCode.OK); + } + } + + [Test] + public async Task EventGridRequest_Notification_Succeeded() + { + var blob = _testContainer.GetBlockBlobClient(TestBlobName); + await blob.UploadTextAsync("0"); + + var prog = new EventGrid_Program(); + var host = NewBuilder(prog).Build(); + + using (prog._completedEvent = new ManualResetEvent(initialState: false)) + using (host) + { + host.Start(); + HttpResponseMessage response = await SendEventGridRequest(host, NotificationRequest.Replace("[blobPathPlaceHolder]", _resolvedContainerName + "/" + TestBlobName), "Notification"); + Assert.True(response.StatusCode == HttpStatusCode.Accepted); + Assert.True(prog._completedEvent.WaitOne(TimeSpan.FromSeconds(60))); + } + } + + [Test] + public async Task PageBlob_NotSupported() + { + var prog = new EventGrid_PageBlob(); + var host = NewBuilder(prog).Build(); + + using (host) + { + host.Start(); + await Task.Delay(5000); // Wait util all logs a re populated + var log = host.GetTestLoggerProvider().GetAllLogMessages() + .FirstOrDefault(x => x.Level == Microsoft.Extensions.Logging.LogLevel.Error && x.FormattedMessage == "PageBlobClient is not supported with BlobTriggerKind.EventGrid"); + Assert.IsNotNull(log); + } + } + + private async Task SendEventGridRequest(IHost host, string content, string eventType) + { + var configProvidersEnumerator = host.Services.GetServices(typeof(IExtensionConfigProvider)).GetEnumerator(); + while (configProvidersEnumerator.MoveNext()) + { + if (configProvidersEnumerator.Current is IAsyncConverter convertor) + { + HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Post, "https://test?functionName=EventGridBlobTrigger"); + request.Content = new StringContent(content); + request.Headers.Add("Aeg-Event-Type", eventType); + return await convertor.ConvertAsync(request, CancellationToken.None); + } + } + + throw new Exception("IAsyncConverter was not found"); + } + + public class EventGrid_Program + { + public ManualResetEvent _completedEvent; + + [FunctionName("EventGridBlobTrigger")] + public void EventGridBlobTrigger( + [BlobTrigger(EventGridContainerName + "/{name}", Kind = BlobTriggerKind.EventGrid)] string input) + { + _completedEvent.Set(); + } + } + + public class EventGrid_PageBlob + { + [FunctionName("EventGridBlobTrigger")] + public void EventGridBlobTrigger( + [BlobTrigger(EventGridContainerName + "/{name}", Kind = BlobTriggerKind.EventGrid)] PageBlobClient input) + { + } + } + } +}