Skip to content

Commit

Permalink
Fix PR1
Browse files Browse the repository at this point in the history
  • Loading branch information
alrod committed Dec 15, 2020
1 parent 5f48487 commit ab453f9
Show file tree
Hide file tree
Showing 16 changed files with 395 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public sealed class BlobTriggerAttribute : Attribute, IConnectionProvider
{
private readonly string _blobPath;

// AnalyticsScan is default kind as it does not require additional actions to set up a blob trigger
private BlobTriggerKind _blobTriggerKind = BlobTriggerKind.AnalyticsScan;

/// <summary>
/// Initializes a new instance of the <see cref="BlobTriggerAttribute"/> class.
/// </summary>
Expand Down Expand Up @@ -70,6 +73,10 @@ public string BlobPath
/// <summary>
/// Returns a bool value that indicates whether EventGrid is used.
/// </summary>
public bool UseEventGrid { get; set; }
public BlobTriggerKind Kind
{
get { return _blobTriggerKind; }
set { _blobTriggerKind = value; }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

namespace Microsoft.Azure.WebJobs
{
/// <summary>
/// Provides blob trigger kinds to detect changes.
/// </summary>
public enum BlobTriggerKind
{
/// <summary>
/// Changes detection is relied on Storage Analytics logs.
/// <see href="https://docs.microsoft.com/en-us/rest/api/storageservices/storage-analytics-log-format">Storage Analytics logs</see>
/// </summary>
AnalyticsScan,
/// <summary>
/// Changes detection is relied on EventGrid.
/// <see href="https://docs.microsoft.com/en-us/azure/event-grid/event-schema-blob-storage">Azure Blob Storage as an Event Grid source</see>
/// </summary>
EventGrid
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using System.Web;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
Expand All @@ -18,6 +21,8 @@
using Microsoft.Azure.WebJobs.Extensions.Storage.Common;
using Microsoft.Azure.WebJobs.Host.Bindings;
using Microsoft.Azure.WebJobs.Host.Config;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;

namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Config
{
Expand All @@ -32,22 +37,26 @@ internal class BlobsExtensionConfigProvider : IExtensionConfigProvider,
private IContextGetter<IBlobWrittenWatcher> _blobWrittenWatcherGetter;
private readonly INameResolver _nameResolver;
private IConverterManager _converterManager;
private readonly IHttpRequestProcessor _httpEndpointManager;
private readonly BlobTriggerQueueWriterFactory _blobTriggerQueueWriterFactory;
private readonly ILogger _logger;
private BlobTriggerQueueWriter _blobTriggerQueueWriter;

public BlobsExtensionConfigProvider(
BlobServiceClientProvider blobServiceClientProvider,
BlobTriggerAttributeBindingProvider triggerBinder,
IContextGetter<IBlobWrittenWatcher> contextAccessor,
INameResolver nameResolver,
IConverterManager converterManager,
IHttpRequestProcessor httpEndpointManager)
BlobTriggerQueueWriterFactory blobTriggerQueueWriterFactory,
ILoggerFactory loggerFactory)
{
_blobServiceClientProvider = blobServiceClientProvider;
_triggerBinder = triggerBinder;
_blobWrittenWatcherGetter = contextAccessor;
_nameResolver = nameResolver;
_converterManager = converterManager;
_httpEndpointManager = httpEndpointManager;
_blobTriggerQueueWriterFactory = blobTriggerQueueWriterFactory;
_logger = loggerFactory.CreateLogger<BlobsExtensionConfigProvider>();
}

public void Initialize(ExtensionConfigContext context)
Expand All @@ -58,7 +67,12 @@ public void Initialize(ExtensionConfigContext context)

private void InitilizeBlobBindings(ExtensionConfigContext context)
{
_httpEndpointManager.RegisterHttpEnpoint(context);
System.Diagnostics.Debugger.Break();

#pragma warning disable CS0618 // Type or member is obsolete
Uri url = context.GetWebhookHandler();
#pragma warning restore CS0618 // Type or member is obsolete
_logger.LogInformation($"registered http endpoint = {url?.GetLeftPart(UriPartial.Path)}");

var rule = context.AddBindingRule<BlobAttribute>();

Expand Down Expand Up @@ -120,7 +134,16 @@ BlobContainerClient IConverter<BlobAttribute, BlobContainerClient>.Convert(

public async Task<HttpResponseMessage> ConvertAsync(HttpRequestMessage input, CancellationToken cancellationToken)
{
return await _httpEndpointManager.ProcessHttpRequestAsync(input, cancellationToken).ConfigureAwait(false);
var functionName = HttpUtility.ParseQueryString(input.RequestUri.Query)["functionName"];
if (_blobTriggerQueueWriter == null)
{
_blobTriggerQueueWriter = await _blobTriggerQueueWriterFactory.CreateAsync(cancellationToken).ConfigureAwait(false);
}

string content = await input.Content.ReadAsStringAsync().ConfigureAwait(false);
var headers = input.Headers;

return await EventGrid.HttpRequestProcessor.ProcessAsync(input, functionName, _logger, ProcessEventsAsync, cancellationToken).ConfigureAwait(false);
}

#endregion
Expand Down Expand Up @@ -355,5 +378,32 @@ private async Task<BlobWithContainer<BlobBaseClient>> GetBlobAsync(

return new BlobWithContainer<BlobBaseClient>(container, blob);
}

private async Task<HttpResponseMessage> ProcessEventsAsync(JArray events, string functionName, CancellationToken cancellationToken)
{
foreach (JObject jo in events)
{
BlobTriggerMessage blobTriggerMessage = GetBlobTriggerMessage(jo, functionName);
await _blobTriggerQueueWriter.EnqueueAsync(blobTriggerMessage, cancellationToken).ConfigureAwait(false);
}

return new HttpResponseMessage(HttpStatusCode.Accepted);
}

private BlobTriggerMessage GetBlobTriggerMessage(JObject jo, string functionId)
{
JObject data = jo["data"] as JObject;

BlobUriBuilder blobUriBuilder = new BlobUriBuilder(new Uri(data["url"].ToString()));

return new BlobTriggerMessage()
{
ETag = $"\"{data["eTag"]}\"",
BlobType = (BlobType)Enum.Parse(typeof(BlobType), data["blobType"].ToString().Replace("Blob", "")),
ContainerName = blobUriBuilder.BlobContainerName,
BlobName = blobUriBuilder.BlobName,
FunctionId = functionId
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ internal class BlobListenerFactory : IListenerFactory
private readonly BlobsOptions _blobsOptions;
private readonly IWebJobsExceptionHandler _exceptionHandler;
private readonly IContextSetter<IBlobWrittenWatcher> _blobWrittenWatcherSetter;
private readonly SharedQueueWatcher _messageEnqueuedWatcherSetter;
//private readonly SharedQueueWatcher _messageEnqueuedWatcherSetter;
private readonly BlobTriggerQueueWriterFactory _blobTriggerQueueWriterFactory;
private readonly ISharedContextProvider _sharedContextProvider;
private readonly FunctionDescriptor _functionDescriptor;
private readonly ILoggerFactory _loggerFactory;
Expand All @@ -35,15 +36,15 @@ internal class BlobListenerFactory : IListenerFactory
private readonly QueueServiceClient _dataQueueServiceClient;
private readonly BlobContainerClient _container;
private readonly IBlobPathSource _input;
private readonly bool _useEventGrid;
private readonly BlobTriggerKind _blobTriggerKind;
private readonly ITriggeredFunctionExecutor _executor;
private readonly IHostSingletonManager _singletonManager;

public BlobListenerFactory(IHostIdProvider hostIdProvider,
BlobsOptions blobsOptions,
IWebJobsExceptionHandler exceptionHandler,
IContextSetter<IBlobWrittenWatcher> blobWrittenWatcherSetter,
SharedQueueWatcher messageEnqueuedWatcherSetter,
BlobTriggerQueueWriterFactory blobTriggerQueueWriterFactory,
ISharedContextProvider sharedContextProvider,
ILoggerFactory loggerFactory,
FunctionDescriptor functionDescriptor,
Expand All @@ -53,15 +54,15 @@ public BlobListenerFactory(IHostIdProvider hostIdProvider,
QueueServiceClient dataQueueServiceClient,
BlobContainerClient container,
IBlobPathSource input,
bool useEventGrid,
BlobTriggerKind triggerKind,
ITriggeredFunctionExecutor executor,
IHostSingletonManager singletonManager)
{
_hostIdProvider = hostIdProvider ?? throw new ArgumentNullException(nameof(hostIdProvider));
_blobTriggerQueueWriterFactory = blobTriggerQueueWriterFactory ?? throw new ArgumentNullException(nameof(blobTriggerQueueWriterFactory));
_blobsOptions = blobsOptions ?? throw new ArgumentNullException(nameof(blobsOptions));
_exceptionHandler = exceptionHandler ?? throw new ArgumentNullException(nameof(exceptionHandler));
_blobWrittenWatcherSetter = blobWrittenWatcherSetter ?? throw new ArgumentNullException(nameof(blobWrittenWatcherSetter));
_messageEnqueuedWatcherSetter = messageEnqueuedWatcherSetter ?? throw new ArgumentNullException(nameof(messageEnqueuedWatcherSetter));
_sharedContextProvider = sharedContextProvider ?? throw new ArgumentNullException(nameof(sharedContextProvider));
_functionDescriptor = functionDescriptor ?? throw new ArgumentNullException(nameof(functionDescriptor));
_loggerFactory = loggerFactory;
Expand All @@ -71,7 +72,7 @@ public BlobListenerFactory(IHostIdProvider hostIdProvider,
_dataQueueServiceClient = dataQueueServiceClient ?? throw new ArgumentNullException(nameof(dataQueueServiceClient));
_container = container ?? throw new ArgumentNullException(nameof(container));
_input = input ?? throw new ArgumentNullException(nameof(input));
_useEventGrid = useEventGrid;
_blobTriggerKind = triggerKind;
_executor = executor ?? throw new ArgumentNullException(nameof(executor));
_singletonManager = singletonManager ?? throw new ArgumentNullException(nameof(singletonManager));
}
Expand All @@ -80,7 +81,6 @@ public async Task<IListener> CreateAsync(CancellationToken cancellationToken)
{
// Note that these clients are intentionally for the storage account rather than for the dashboard account.
// We use the storage, not dashboard, account for the blob receipt container and blob trigger queues.
var primaryQueueClient = _hostQueueServiceClient;
var primaryBlobClient = _hostBlobServiceClient;

// Important: We're using the storage account of the function target here, which is the account that the
Expand All @@ -89,29 +89,27 @@ public async Task<IListener> CreateAsync(CancellationToken cancellationToken)
var targetBlobClient = _dataBlobServiceClient;
var targetQueueClient = _dataQueueServiceClient;

string hostId = await _hostIdProvider.GetHostIdAsync(cancellationToken).ConfigureAwait(false);
string hostBlobTriggerQueueName = HostQueueNames.GetHostBlobTriggerQueueName(hostId);
var hostBlobTriggerQueue = primaryQueueClient.GetQueueClient(hostBlobTriggerQueueName);
BlobTriggerQueueWriter blobTriggerQueueWriter = await _blobTriggerQueueWriterFactory.CreateAsync(cancellationToken).ConfigureAwait(false);

SharedQueueWatcher sharedQueueWatcher = _messageEnqueuedWatcherSetter;
string hostId = await _hostIdProvider.GetHostIdAsync(cancellationToken).ConfigureAwait(false);

SharedBlobListener sharedBlobListener = null;
// we do not need SharedBlobListener for EventGrid blob trigger
if (!_useEventGrid)
// we do not need to create SharedBlobListener for EventGrid blob trigger
if (_blobTriggerKind == BlobTriggerKind.AnalyticsScan)
{
sharedBlobListener = _sharedContextProvider.GetOrCreateInstance<SharedBlobListener>(
new SharedBlobListenerFactory(hostId, _hostBlobServiceClient, _exceptionHandler, _blobWrittenWatcherSetter, _loggerFactory.CreateLogger<BlobListener>()));

// Register the blob container we wish to monitor with the shared blob listener.
await RegisterWithSharedBlobListenerAsync(hostId, sharedBlobListener, primaryBlobClient,
hostBlobTriggerQueue, sharedQueueWatcher, cancellationToken).ConfigureAwait(false);
blobTriggerQueueWriter, cancellationToken).ConfigureAwait(false);
}

// Create a "bridge" listener that will monitor the blob
// notification queue and dispatch to the target job function.
SharedBlobQueueListener sharedBlobQueueListener = _sharedContextProvider.GetOrCreateInstance<SharedBlobQueueListener>(
new SharedBlobQueueListenerFactory(_hostQueueServiceClient, sharedQueueWatcher, hostBlobTriggerQueue,
_blobsOptions, _exceptionHandler, _loggerFactory, sharedBlobListener?.BlobWritterWatcher, _functionDescriptor));
new SharedBlobQueueListenerFactory(_hostQueueServiceClient, blobTriggerQueueWriter.SharedQueueWatcher, blobTriggerQueueWriter.QueueClient,
_blobsOptions, _exceptionHandler, _loggerFactory, sharedBlobListener?.BlobWritterWatcher, _functionDescriptor, _blobTriggerKind));
var queueListener = new BlobListener(sharedBlobQueueListener);

// the client to use for the poison queue
Expand All @@ -126,7 +124,8 @@ await RegisterWithSharedBlobListenerAsync(hostId, sharedBlobListener, primaryBlo
// shared blob listener in this host instance.
// We do not need SharedBlobListener for EventGrid blob trigger.
object singletonListenerCreated = false;
if (sharedBlobListener != null && !_sharedContextProvider.TryGetValue(SingletonBlobListenerScopeId, out singletonListenerCreated))
if (_blobTriggerKind == BlobTriggerKind.AnalyticsScan
&& !_sharedContextProvider.TryGetValue(SingletonBlobListenerScopeId, out singletonListenerCreated))
{
// Create a singleton shared blob listener, since we only
// want a single instance of the blob poll/scan logic to be running
Expand All @@ -151,12 +150,11 @@ private async Task RegisterWithSharedBlobListenerAsync(
string hostId,
SharedBlobListener sharedBlobListener,
BlobServiceClient blobClient,
QueueClient hostBlobTriggerQueue,
IMessageEnqueuedWatcher messageEnqueuedWatcher,
BlobTriggerQueueWriter blobTriggerQueueWriter,
CancellationToken cancellationToken)
{
BlobTriggerExecutor triggerExecutor = new BlobTriggerExecutor(hostId, _functionDescriptor, _input, new BlobReceiptManager(blobClient),
new BlobTriggerQueueWriter(hostBlobTriggerQueue, messageEnqueuedWatcher), _loggerFactory.CreateLogger<BlobListener>());
blobTriggerQueueWriter, _loggerFactory.CreateLogger<BlobListener>());

await sharedBlobListener.RegisterAsync(blobClient, _container, triggerExecutor, cancellationToken).ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,18 @@ internal partial class BlobQueueTriggerExecutor : ITriggerExecutor<QueueMessage>
private readonly IBlobCausalityReader _causalityReader;
private readonly IBlobWrittenWatcher _blobWrittenWatcher;
private readonly ConcurrentDictionary<string, BlobQueueRegistration> _registrations;
private readonly BlobTriggerKind _blobTriggerKind;
private readonly ILogger<BlobListener> _logger;

public BlobQueueTriggerExecutor(IBlobWrittenWatcher blobWrittenWatcher, ILogger<BlobListener> logger)
: this(BlobCausalityReader.Instance, blobWrittenWatcher, logger)
public BlobQueueTriggerExecutor(BlobTriggerKind kind, IBlobWrittenWatcher blobWrittenWatcher, ILogger<BlobListener> logger)
: this(BlobCausalityReader.Instance, kind, blobWrittenWatcher, logger)
{
}

public BlobQueueTriggerExecutor(IBlobCausalityReader causalityReader, IBlobWrittenWatcher blobWrittenWatcher, ILogger<BlobListener> logger)
public BlobQueueTriggerExecutor(IBlobCausalityReader causalityReader, BlobTriggerKind blobTriggerKind, IBlobWrittenWatcher blobWrittenWatcher, ILogger<BlobListener> logger)
{
_causalityReader = causalityReader;
_blobTriggerKind = blobTriggerKind;
_blobWrittenWatcher = blobWrittenWatcher;
_registrations = new ConcurrentDictionary<string, BlobQueueRegistration>();
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
Expand All @@ -57,6 +59,8 @@ public void Register(string functionId, BlobQueueRegistration registration)

public async Task<FunctionResult> ExecuteAsync(QueueMessage value, CancellationToken cancellationToken)
{
Debugger.Break();

BlobTriggerMessage message = JsonConvert.DeserializeObject<BlobTriggerMessage>(value.Body.ToValidUTF8String(), JsonSerialization.Settings);

if (message == null)
Expand Down Expand Up @@ -100,7 +104,7 @@ public async Task<FunctionResult> ExecuteAsync(QueueMessage value, CancellationT
string possibleETag = blobProperties.ETag.ToString();

// If the blob still exists but the ETag is different, delete the message but do a fast path notification.
if (!string.Equals(message.ETag, possibleETag, StringComparison.Ordinal) && _blobWrittenWatcher != null)
if (_blobTriggerKind == BlobTriggerKind.AnalyticsScan && !string.Equals(message.ETag, possibleETag, StringComparison.Ordinal))
{
_blobWrittenWatcher.Notify(new Extensions.Storage.Blobs.BlobWithContainer<BlobBaseClient>(container, blob));
return successResult;
Expand Down
Loading

0 comments on commit ab453f9

Please sign in to comment.