Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BlobTrigger uses Event Grid subscription #2438

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,10 @@ public string BlobPath
{
get { return _blobPath; }
}

/// <summary>
/// Returns a bool value that indicates whether EventGrid is used.
/// </summary>
public bool UseEventGrid { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -15,32 +16,37 @@
using Microsoft.Azure.WebJobs.Host.Config;
using Microsoft.Azure.WebJobs.Host.Protocols;
using Microsoft.Azure.Storage.Blob;
using Microsoft.Azure.WebJobs.Host.Dispatch;

namespace Microsoft.Azure.WebJobs.Host.Blobs.Bindings
{
[Extension("AzureStorageBlobs", "Blobs")]
internal class BlobsExtensionConfigProvider : IExtensionConfigProvider,
IConverter<BlobAttribute, CloudBlobContainer>,
IConverter<BlobAttribute, CloudBlobDirectory>,
IConverter<BlobAttribute, BlobsExtensionConfigProvider.MultiBlobContext>
IConverter<BlobAttribute, BlobsExtensionConfigProvider.MultiBlobContext>,
IAsyncConverter<HttpRequestMessage, HttpResponseMessage>
{
private readonly BlobTriggerAttributeBindingProvider _triggerBinder;
private StorageAccountProvider _accountProvider;
private IContextGetter<IBlobWrittenWatcher> _blobWrittenWatcherGetter;
private readonly INameResolver _nameResolver;
private IConverterManager _converterManager;
private readonly IHttpEndpointManager _httpEndpointManager;

public BlobsExtensionConfigProvider(StorageAccountProvider accountProvider,
BlobTriggerAttributeBindingProvider triggerBinder,
IContextGetter<IBlobWrittenWatcher> contextAccessor,
INameResolver nameResolver,
IConverterManager converterManager)
IConverterManager converterManager,
IHttpEndpointManager httpEndpointManager)
{
_accountProvider = accountProvider;
_triggerBinder = triggerBinder;
_blobWrittenWatcherGetter = contextAccessor;
_nameResolver = nameResolver;
_converterManager = converterManager;
_httpEndpointManager = httpEndpointManager;
}

public void Initialize(ExtensionConfigContext context)
Expand Down Expand Up @@ -86,6 +92,8 @@ private void InitilizeBlobBindings(ExtensionConfigContext context)

private void InitializeBlobTriggerBindings(ExtensionConfigContext context)
{
_httpEndpointManager.RegisterHttpEnpoint(context);

var rule = context.AddBindingRule<BlobTriggerAttribute>();
rule.BindToTrigger<ICloudBlob>(_triggerBinder);

Expand All @@ -105,6 +113,11 @@ private void InitializeBlobTriggerBindings(ExtensionConfigContext context)
context.AddConverter(new StorageBlobConverter<CloudPageBlob>());
}

public async Task<HttpResponseMessage> ConvertAsync(HttpRequestMessage input, CancellationToken cancellationToken)
{
return await _httpEndpointManager.ProcessHttpRequestAsync(input, cancellationToken);
}

#region Container rules
CloudBlobContainer IConverter<BlobAttribute, CloudBlobContainer>.Convert(
BlobAttribute blobAttribute)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Dispatch;
using Microsoft.Azure.Storage.Blob;
using Newtonsoft.Json.Linq;

namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
internal class SharedQueueBlobListener : IListener
{
public SharedQueueBlobListener(ListenerFactoryContext context, StorageAccount dataAccount)
{
// register SharedQueueBlobHandler handler
context.GetDispatchQueue(new SharedQueueBlobHandler(context.Executor, dataAccount));
}

public Task StartAsync(CancellationToken cancellationToken)
{
return Task.FromResult(true);
}

public Task StopAsync(CancellationToken cancellationToken)
{
return Task.FromResult(true);
}

public void Dispose()
{
}

public void Cancel()
{
}

internal class SharedQueueBlobHandler : IMessageHandler
{
private ITriggeredFunctionExecutor _executor;
private CloudBlobClient _blobClient;

public SharedQueueBlobHandler(ITriggeredFunctionExecutor executor, StorageAccount dataAccount)
{
_executor = executor;
_blobClient = dataAccount.CreateCloudBlobClient();
}
public async Task<FunctionResult> TryExecuteAsync(JObject data, CancellationToken cancellationToken)
{
// Both Event Grid schema and Cloud Event schema define blob uri in ["data"]["url"]
ICloudBlob blob = await _blobClient.GetBlobReferenceFromServerAsync(new Uri(data["data"]["url"].ToString()));

TriggeredFunctionData input = new TriggeredFunctionData
{
TriggerValue = blob
};
return await _executor.TryExecuteAsync(input, cancellationToken);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext contex

ITriggerBinding binding = new BlobTriggerBinding(parameter, hostAccount, dataAccount, path,
_hostIdProvider, _queueOptions, _blobsOptions, _exceptionHandler, _blobWrittenWatcherSetter,
_messageEnqueuedWatcherSetter, _sharedContextProvider, _singletonManager, _loggerFactory);
_messageEnqueuedWatcherSetter, _sharedContextProvider, _singletonManager,
blobTriggerAttribute.UseEventGrid, _loggerFactory);

return Task.FromResult(binding);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ internal class BlobTriggerBinding : ITriggerBinding
private readonly IAsyncObjectToTypeConverter<ICloudBlob> _converter;
private readonly IReadOnlyDictionary<string, Type> _bindingDataContract;
private readonly IHostSingletonManager _singletonManager;
private readonly bool _useEventGrid;

public BlobTriggerBinding(ParameterInfo parameter,
StorageAccount hostAccount,
Expand All @@ -52,7 +53,9 @@ public BlobTriggerBinding(ParameterInfo parameter,
SharedQueueWatcher messageEnqueuedWatcherSetter,
ISharedContextProvider sharedContextProvider,
IHostSingletonManager singletonManager,
ILoggerFactory loggerFactory)
bool useEventGrid,
ILoggerFactory loggerFactory
)
{
_parameter = parameter ?? throw new ArgumentNullException(nameof(parameter));
_hostAccount = hostAccount ?? throw new ArgumentNullException(nameof(hostAccount));
Expand All @@ -73,6 +76,7 @@ public BlobTriggerBinding(ParameterInfo parameter,
_loggerFactory = loggerFactory;
_converter = CreateConverter(_blobClient);
_bindingDataContract = CreateBindingDataContract(path);
_useEventGrid = useEventGrid;
}

public Type TriggerValueType
Expand Down Expand Up @@ -176,11 +180,19 @@ public Task<IListener> CreateListenerAsync(ListenerFactoryContext context)

var container = _blobClient.GetContainerReference(_path.ContainerNamePattern);

var factory = new BlobListenerFactory(_hostIdProvider, _queueOptions, _blobsOptions, _exceptionHandler,
_blobWrittenWatcherSetter, _messageEnqueuedWatcherSetter, _sharedContextProvider, _loggerFactory,
context.Descriptor, _hostAccount, _dataAccount, container, _path, context.Executor, _singletonManager);
if (!_useEventGrid)
{
var factory = new BlobListenerFactory(_hostIdProvider, _queueOptions, _blobsOptions, _exceptionHandler,
_blobWrittenWatcherSetter, _messageEnqueuedWatcherSetter, _sharedContextProvider, _loggerFactory,
context.Descriptor, _hostAccount, _dataAccount, container, _path, context.Executor, _singletonManager);

return factory.CreateAsync(context.CancellationToken);
return factory.CreateAsync(context.CancellationToken);
}
else
{
var listener = new SharedQueueBlobListener(context, _dataAccount);
return Task.FromResult((IListener)listener);
}
}

public ParameterDescriptor ToParameterDescriptor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,7 @@ public QueueWriter(StorageLoadBalancerQueue parent, CloudQueue queue)

public async Task AddAsync(T item, CancellationToken cancellationToken = default(CancellationToken))
{
string contents = JsonConvert.SerializeObject(
item,
JsonSerialization.Settings);

var msg = new CloudQueueMessage(contents);
var msg = new CloudQueueMessage(item.ToString());
await _queue.AddMessageAndCreateIfNotExistsAsync(msg, cancellationToken);

_parent._sharedWatcher.Notify(_queue.Name);
Expand All @@ -87,7 +83,7 @@ public QueueWriter(StorageLoadBalancerQueue parent, CloudQueue queue)
private CloudQueue Convert(string queueMoniker)
{
// $$$ Review
var account = _storageAccountProvider.Get(ConnectionStringNames.Dashboard);
var account = _storageAccountProvider.Get(ConnectionStringNames.Storage);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this a bug?

Copy link
Member Author

@alrod alrod Mar 31, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Actually we did not use StorageLoadBalancerQueue class before but the class was checked in in the repository.

var queue = account.CreateCloudQueueClient().GetQueueReference(queueMoniker);
return queue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public static IWebJobsBuilder AddAzureStorage(this IWebJobsBuilder builder, Acti
};

// $$$ Move to Host.Storage?
builder.Services.TryAddSingleton<ILoadBalancerQueue, StorageLoadBalancerQueue>();
// Replace existing runtime InMemoryLoadBalancerQueue with storage-backed implementations.
builder.Services.AddSingleton<ILoadBalancerQueue, StorageLoadBalancerQueue>();

builder.Services.TryAddSingleton<SharedQueueWatcher>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,12 @@
<PackageReference Include="Microsoft.Azure.Cosmos.Table" Version="1.0.5" />
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.1.0" />
<PackageReference Include="Microsoft.Azure.Storage.Queue" Version="11.1.0" />
<PackageReference Include="Microsoft.Azure.WebJobs" Version="3.0.14" />
<PackageReference Include="StyleCop.Analyzers" Version="1.1.0-beta004">
<PrivateAssets>all</PrivateAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Microsoft.Azure.WebJobs.Host\WebJobs.Host.csproj" />
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this an intentional change, or something you were doing while developing? We should be able to continue depending on the package only here, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

</ItemGroup>
</Project>
121 changes: 121 additions & 0 deletions src/Microsoft.Azure.WebJobs.Host/Dispatch/HttpRequestProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Linq;
using System.Web;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using System.Net;
using System.Threading.Tasks;
using Newtonsoft.Json;
using System.Threading;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Timers;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Config;

namespace Microsoft.Azure.WebJobs.Host.Dispatch
{
public interface IHttpEndpointManager
{
void RegisterHttpEnpoint(ExtensionConfigContext context);

Task<HttpResponseMessage> ProcessHttpRequestAsync(HttpRequestMessage req, CancellationToken cancellationToken);
}

/// <summary>
/// Hadles HttpRequests that comes to extension http endpoint.
/// </summary>
internal class HttpEndpointManager : IHttpEndpointManager
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a new class? and interface?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we use HttpEndpointManager to register http enpoint for an extension and handle to http requests for the extension.

Interface is introduced for DI as implementation is used some internal stuff.

{
private readonly ILogger _logger;
private readonly SharedQueueHandler _sharedQueue;

public HttpEndpointManager(SharedQueueHandler sharedQueue, ILoggerFactory loggerFactory)
{
_sharedQueue = sharedQueue;
_logger = loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("HttpRequestProcessor"));
}

public void RegisterHttpEnpoint(ExtensionConfigContext context)
{
Uri url = context.GetWebhookHandler();
_logger.LogInformation($"registered http endpoint = {url?.GetLeftPart(UriPartial.Path)}");
}

public async Task<HttpResponseMessage> ProcessHttpRequestAsync(HttpRequestMessage req, CancellationToken cancellationToken)
{
var functionId = HttpUtility.ParseQueryString(req.RequestUri.Query)[SharedQueueHandler.FunctionId];
IEnumerable<string> eventTypeHeaders = null;
string eventTypeHeader = null;
if (req.Headers.TryGetValues("aeg-event-type", out eventTypeHeaders))
{
eventTypeHeader = eventTypeHeaders.First();
}

if (String.Equals(eventTypeHeader, "SubscriptionValidation", StringComparison.OrdinalIgnoreCase))
{
string jsonArray = await req.Content.ReadAsStringAsync();
SubscriptionValidationEvent validationEvent = null;
List<JObject> events = JsonConvert.DeserializeObject<List<JObject>>(jsonArray);
// TODO remove unnecessary serialization
validationEvent = ((JObject)events[0]["data"]).ToObject<SubscriptionValidationEvent>();
SubscriptionValidationResponse validationResponse = new SubscriptionValidationResponse { ValidationResponse = validationEvent.ValidationCode };
var returnMessage = new HttpResponseMessage(HttpStatusCode.OK);
returnMessage.Content = new StringContent(JsonConvert.SerializeObject(validationResponse));
_logger.LogInformation($"perform handshake with eventGrid for function: {functionId}");
return returnMessage;
}
else if (String.Equals(eventTypeHeader, "Notification", StringComparison.OrdinalIgnoreCase))
{
JArray events = null;
string requestContent = await req.Content.ReadAsStringAsync();
var token = JToken.Parse(requestContent);
if (token.Type == JTokenType.Array)
{
// eventgrid schema
events = (JArray)token;
}
else if (token.Type == JTokenType.Object)
{
// cloudevent schema
events = new JArray
{
token
};
}

foreach (JObject jo in events)
{
// TODO ALROD - Azure Queues do not suport batch sending. Sending in parallel? ServiceBus?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would indeed be good to send them in parallel.

What is the desired behavior if enqueuing one of the messages fails? Right now, we'd end up in a partial number of messages enqueued and an exception bubbling up, correct?

Copy link
Member Author

@alrod alrod Mar 31, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added sending in parallel.

Right now, we'd end up in a partial number of messages enqueued and an exception bubbling up, correct?

Correct.

await _sharedQueue.EnqueueAsync(jo, functionId, cancellationToken);
}

return new HttpResponseMessage(HttpStatusCode.Accepted);
}
else if (String.Equals(eventTypeHeader, "Unsubscribe", StringComparison.OrdinalIgnoreCase))
{
// TODO disable function?
return new HttpResponseMessage(HttpStatusCode.Accepted);
}

return new HttpResponseMessage(HttpStatusCode.BadRequest);
}
}

internal class SubscriptionValidationResponse
{
[JsonProperty(PropertyName = "validationResponse")]
public string ValidationResponse { get; set; }
}

internal class SubscriptionValidationEvent
{
[JsonProperty(PropertyName = "validationCode")]
public string ValidationCode { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace Microsoft.Azure.WebJobs.Host.Dispatch
/// these messages will be distributed to multiple worker instance
/// for later processing
/// </summary>
internal interface IDispatchQueueHandler
public interface IDispatchQueueHandler
{
/// <summary> Add a message to the shared queue.</summary>
/// <param name="message"> A JObject to be later processed by IMessageHandler </param>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Microsoft.Azure.WebJobs.Host.Dispatch
/// Defines the contract for executing a triggered function using a dispatch queue.
/// <see cref="ListenerFactoryContext.GetDispatchQueue(IMessageHandler)"/>
/// </summary>
internal interface IMessageHandler
public interface IMessageHandler
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the visibility of these interfaces changing for to allow the extension to consume them?

@mathewc can you please comment here as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we did not use the interface before (there was only E2E test). IMessageHandler must be public in order to use in an extension.

{
/// <summary>
/// Try to invoke the triggered function using the values specified.
Expand Down
Loading