EventGrid blob trigger support #17137
/// <summary>
/// Returns a bool value that indicates whether EventGrid is used.
/// </summary>
public bool UseEventGrid { get; set; }
Are we going to have different trigger kinds?
Is it worth making this an enum?
+1.
Changefeed would be another strategy once it is GA.
If we create an enum, then we should also name the existing strategy/strategies and put them there to give the user a list of possibilities.
enum is added.
public async Task<HttpResponseMessage> ProcessHttpRequestAsync(HttpRequestMessage req, CancellationToken cancellationToken)
{
    //Debugger.Break();
nit: remove here and elsewhere.
Removed.
namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Listeners
{
    internal interface IHttpRequestProcessor
Do we need the interface?
The interface was removed.
@@ -17,6 +17,12 @@
<Compile Include="$(MicrosoftAzureWebJobsExtensionsClientsSources)\**\*.cs" Link="Shared\%(RecursiveDir)\%(Filename)%(Extension)" />
</ItemGroup>

<ItemGroup>
<Compile Include="..\..\..\eventgrid\Microsoft.Azure.WebJobs.Extensions.EventGrid\src\TriggerBinding\HttpRequestProcessor.cs" Link="EventGrid\HttpRequestProcessor.cs" />
This might become a problem. We don't run storage tests when something in the eventgrid directory changes, so it's easy to unintentionally break something.
What other options do we have here?
I see that this approach is used in other places:
https://github.com/Azure/azure-sdk-for-net/blob/7705656e645e5bc1e9afb07fe33f65a5a565da63/sdk/storage/Azure.Storage.Blobs/src/Azure.Storage.Blobs.csproj#L2
Eventually we will run the storage tests before a release.
Can we add a functional test that simulates an Event Grid notification?
public async Task<HttpResponseMessage> ProcessHttpRequestAsync(HttpRequestMessage req, CancellationToken cancellationToken)
{
    var functionId = HttpUtility.ParseQueryString(req.RequestUri.Query)["functionName"];
This seems to be a partial(?) copy of the event grid implementation.
Do we need to include sources in this case?
I wonder if this should be an extension for an extension. I.e. someone would need to pull an extra package like Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.EventGrid in order to add extra functionality that spans across blobs and eventgrid. Some kind of plugin architecture.
This code is different for blob and event grid. For event grid we check if a function with the name exists; for blob we will check this later.
}

return new HttpResponseMessage(HttpStatusCode.BadRequest);
// FIXME without internal queuing, we are going to process all events in parallel
Should we file an issue?
This will require some kind of internal queue (like for the blob storage trigger) and a redesign of the EventGrid extension. Added an issue: #17756
Event Grid for blob changes has a limitation: it does not cover all blob types. It can cover either (block blobs and append blobs) or (block blobs and page blobs). The documentation around this is somewhat confusing, so we might need to check experimentally which pair is supported.
Anyway, it would be great to add some assertions to fail fast if a customer tries to bind to blobs we know are not supported, i.e. they try to bind to an SDK client that's not supported.
At the least, we should be verbose in the docs and call out this limitation (probably around the flag/enum).
public BlobsExtensionConfigProvider(
    BlobServiceClientProvider blobServiceClientProvider,
    BlobTriggerAttributeBindingProvider triggerBinder,
    IContextGetter<IBlobWrittenWatcher> contextAccessor,
    INameResolver nameResolver,
-   IConverterManager converterManager)
+   IConverterManager converterManager,
+   IHttpRequestProcessor httpEndpointManager)
nit: I'd rather align naming between type and property.
I added the check in BlobTriggerAttributeBindingProvider.
@@ -99,7 +100,7 @@ public async Task<FunctionResult> ExecuteAsync(QueueMessage value, CancellationT
string possibleETag = blobProperties.ETag.ToString();

// If the blob still exists but the ETag is different, delete the message but do a fast path notification.
- if (!string.Equals(message.ETag, possibleETag, StringComparison.Ordinal))
+ if (!string.Equals(message.ETag, possibleETag, StringComparison.Ordinal) && _blobWrittenWatcher != null)
Does this new condition really mean that "UseEventGrid == true"? I.e. is this derived from the fact that _blobWrittenWatcher was not created in the layers above because the flag was true?
If that's the case, I'd suggest injecting the flag (or enum, see the other discussion) everywhere we branch logic based on the "blob change source". That makes it easier to understand why the logic branches, instead of tracing data flow each time we try to understand why we end up in a given code branch.
If that exercise yields classes that look like if(event_grid){ } else { }, then maybe they should be split and refactored into different abstractions to be more cohesive. I.e. I'd rather introduce new types or split existing types than compound logical branches in the code.
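As a rough illustration of the "split into types" idea in this comment, the sketch below chooses a behaviour once from the change source instead of re-checking a nullable field at each call site. It is only an assumption about how this could look: `BlobChangeSourceKind`, `IETagMismatchHandler`, `NotifyWatcherHandler`, `NoOpHandler` and `ETagMismatchHandlers` are hypothetical names that do not appear in the PR, and the `Action<string>` merely stands in for the real watcher notification.

```csharp
using System;

// Hypothetical enum standing in for the flag/enum discussed in this review.
public enum BlobChangeSourceKind { Scan, EventGrid }

// Hypothetical abstraction: what to do when a queued message's ETag is stale.
public interface IETagMismatchHandler
{
    void OnMismatch(string blobName);
}

// Scan-based source: forward a fast-path notification to the watcher.
public sealed class NotifyWatcherHandler : IETagMismatchHandler
{
    private readonly Action<string> _notify;

    public NotifyWatcherHandler(Action<string> notify)
    {
        _notify = notify ?? throw new ArgumentNullException(nameof(notify));
    }

    public void OnMismatch(string blobName) => _notify(blobName);
}

// Event Grid source (assumed here to have no watcher): nothing to notify.
public sealed class NoOpHandler : IETagMismatchHandler
{
    public void OnMismatch(string blobName) { }
}

public static class ETagMismatchHandlers
{
    // The branch on the change source happens once, at construction time.
    public static IETagMismatchHandler For(BlobChangeSourceKind source, Action<string> notify)
    {
        return source == BlobChangeSourceKind.EventGrid
            ? (IETagMismatchHandler)new NoOpHandler()
            : new NotifyWatcherHandler(notify);
    }
}
```

With something along these lines, the executor would hold one `IETagMismatchHandler` and call `OnMismatch` unconditionally, so the reason for the differing behaviour lives in the type that was injected rather than in a null check.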
I injected BlobTriggerKind into BlobQueueTriggerExecutor for better visibility.
namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Listeners
{
    internal class HttpRequestProcessor : IHttpRequestProcessor
sdk/storage/Azure.Storage.Webjobs.Extensions.Blobs/src/Listeners/HttpRequestProcessor.cs - it seems that this file is duplicated, i.e. one of them should be removed.
removed.
private async Task InitializeWriterAsync(CancellationToken cancellationToken)
{
    string hostId = await _hostIdProvider.GetHostIdAsync(cancellationToken).ConfigureAwait(false);
    string hostBlobTriggerQueueName = HostQueueNames.GetHostBlobTriggerQueueName(hostId);
    var hostBlobTriggerQueue = _queueServiceClientProvider.GetHost().GetQueueClient(hostBlobTriggerQueueName);

    _blobTriggerQueueWriter = new BlobTriggerQueueWriter(hostBlobTriggerQueue, _sharedQueueWatcher);
}
Can we DI this? I.e. inject a ready-to-use BlobTriggerQueueWriter into this class? We might need a factory like
Line 149 in 785ae16
new BlobTriggerQueueWriter(hostBlobTriggerQueue, messageEnqueuedWatcher), _loggerFactory.CreateLogger<BlobListener>());
Resolving the blob queue name is independent of the strategy we use to listen to blob changes, so there's an opportunity to reduce code duplication as well.
BlobTriggerQueueWriterFactory was added.
I can see some files placed in directories that have been renamed at some point. Could you please go through the changes and make sure the directory structure and namespaces are correct?
ab453f9 to 661d429
@pakrym,
Looks good to me overall.
token
};
}
return await HttpRequestProcessor.ProcessAsync(req, functionName, _logger, ProcessEventsAsync, CancellationToken.None).ConfigureAwait(false);
Is there any reason why HttpRequestProcessor isn't a singleton and DI'ed?
I'd avoid using statics to pack functionality if possible.
From another angle, _logger would normally be a dependency of HttpRequestProcessor rather than a method parameter.
good idea, I added it in DI
if (_blobTriggerQueueWriter == null)
{
    await InitializeWriterAsync(cancellationToken).ConfigureAwait(false);
In case of a race, is double init fine or should this be under a lock?
this file was removed
@@ -22,6 +22,12 @@ public sealed partial class BlobTriggerAttribute : System.Attribute, Microsoft.A
public BlobTriggerAttribute(string blobPath) { }
public string BlobPath { get { throw null; } }
public string Connection { get { throw null; } set { } }
public Microsoft.Azure.WebJobs.BlobTriggerKind Kind { get { throw null; } set { } }
}
public enum BlobTriggerKind
How about BlobTriggerSource / BlobChangeSource?
Changed to BlobTriggerSource
public enum BlobTriggerKind
{
    /// <summary>
    /// Changes detection is relied on Storage Analytics logs.
AFAIK the analytics scan is combined with a container scan (list blobs). Should we mention that? Isn't this strategy falling back to a container scan if the logs are not there? (I'm not sure about this.)
You are right, fixed enum value name and the description.
var functionName = HttpUtility.ParseQueryString(input.RequestUri.Query)["functionName"];
if (_blobTriggerQueueWriter == null)
{
    _blobTriggerQueueWriter = await _blobTriggerQueueWriterFactory.CreateAsync(cancellationToken).ConfigureAwait(false);
same here, is it fine if double init?
I added code to ensure _blobTriggerQueueWriter is initialized only once, using SemaphoreSlim. A simple lock does not work because we have an async call, and Lazy<> does not work because we need to pass the CancellationToken.
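For readers unfamiliar with the pattern described in this reply, a minimal sketch follows. It is not the PR's code: `AsyncOnceInitializer<T>` and its members are hypothetical names, and the factory delegate stands in for the writer factory. It only illustrates why `SemaphoreSlim.WaitAsync` fits here: it can be awaited and accepts the caller's `CancellationToken`, which neither `lock` nor `Lazy<T>` allows.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: creates a value once, even with concurrent callers,
// while still honoring each caller's CancellationToken.
public sealed class AsyncOnceInitializer<T> where T : class
{
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
    private readonly Func<CancellationToken, Task<T>> _factory;
    private T _value;

    public AsyncOnceInitializer(Func<CancellationToken, Task<T>> factory)
    {
        _factory = factory ?? throw new ArgumentNullException(nameof(factory));
    }

    public async Task<T> GetAsync(CancellationToken cancellationToken)
    {
        // Fast path: already initialized, no need to take the semaphore.
        T existing = Volatile.Read(ref _value);
        if (existing != null)
        {
            return existing;
        }

        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            // Re-check inside the critical section: another caller may have won the race.
            if (_value == null)
            {
                T created = await _factory(cancellationToken).ConfigureAwait(false);
                Volatile.Write(ref _value, created);
            }
            return _value;
        }
        finally
        {
            _semaphore.Release();
        }
    }
}
```

If the factory throws or the token is cancelled, the value stays unset and the semaphore is released, so a later call can retry the initialization.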
Debug.Assert(watcher != null);
_watcher = watcher;
QueueClient = queueClient;
Debug.Assert(wsharedQueueWatcher != null);
Does this Debug.Assert stay?
removed
}

public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext context)
{
    ParameterInfo parameter = context.Parameter;
    var blobTriggerAttribute = TypeUtility.GetResolvedAttribute<BlobTriggerAttribute>(context.Parameter);

    if (parameter.ParameterType == typeof(PageBlobClient) && blobTriggerAttribute.Kind == BlobTriggerKind.EventGrid)
    {
        _logger.LogError("PageBlobClient is not supported with BlobTriggerKind.EventGrid");
nit: could you use nameof or something like that for BlobTriggerKind?
fixed
private readonly BlobServiceClient _blobServiceClient;
private readonly RandomNameResolver _nameResolver;

private const string RegistrationRequest =
Is it OK to use hardcoded requests?
These tests will run in https://dev.azure.com/azure-sdk/internal/_build?definitionId=410&_a=summary with Azure resources generated on the fly via ARM.
The easiest way to check is to create a branch in the main repo (not a fork) and run that pipeline from the branch.
There is no point in creating an Event Grid subscription, as a WebJobs extension itself does not have the HTTP endpoint. We need the function host to add the HTTP endpoint.
So the best we can do is emulate HTTP requests (passing a hardcoded payload) in the tests.
Okay, I'm not exactly sure how this emulation works. Worst case we'll follow up if live tests fail.
{
    private readonly ILogger _logger;

    public HttpRequestProcessor(ILoggerFactory loggerFactory)
Suggested change:
- public HttpRequestProcessor(ILoggerFactory loggerFactory)
+ public HttpRequestProcessor(ILogger<HttpRequestProcessor> logger)
We need the public HttpRequestProcessor(ILoggerFactory loggerFactory) constructor for unit tests.
You can do NullLoggerFactory.Instance.CreateLogger<T>() in unit tests.
CreateLogger does not have a generic implementation. Is it in some extension?
@@ -38,6 +38,9 @@ public sealed class BlobTriggerAttribute : Attribute, IConnectionProvider
{
    private readonly string _blobPath;

    // AnalyticsScan is default kind as it does not require additional actions to set up a blob trigger
Suggested change:
- // AnalyticsScan is default kind as it does not require additional actions to set up a blob trigger
+ // LogsAndContainerScan is default kind as it does not require additional actions to set up a blob trigger
fixed
@@ -164,6 +171,7 @@ public async Task<IListener> CreateAsync(CancellationToken cancellationToken)
};

sharedBlobQueueListener.Register(_functionDescriptor.Id, registration);
+ sharedBlobQueueListener.Register(_functionDescriptor.ShortName, registration);
Why is this required?
This is to simplify manual creation of the Event Grid callback URI.
Is it worth making this runtime change just to simplify the testing?
It's for consistency with the EventGrid extension:
https://{functionappname}.azurewebsites.net/runtime/webhooks/eventgrid?functionName={functionname}&code={systemkey} (https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-event-grid-trigger?tabs=csharp%2Cbash#version-2x-and-higher-runtime)
where {functionname} is the short function name (without namespace).
For blobs we want the same format:
https://{functionappname}.azurewebsites.net/runtime/webhooks/blobs?functionName={functionname}&code={systemkey}
So we need the registration in order to find the function to execute by its short name.
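To make the URL format in this reply concrete, here is a small sketch that assembles the blob webhook endpoint from its three parts. `BlobWebhookUri.Build` is a hypothetical helper written for illustration only; it is not part of the extension, and the sample values used with it are placeholders.

```csharp
using System;

public static class BlobWebhookUri
{
    // Builds the Event Grid subscription endpoint for a blob-triggered function,
    // following the format quoted in the comment above.
    public static Uri Build(string functionAppName, string functionName, string systemKey)
    {
        // functionName is the short function name (without namespace).
        string query = "functionName=" + Uri.EscapeDataString(functionName)
                     + "&code=" + Uri.EscapeDataString(systemKey);
        return new Uri($"https://{functionAppName}.azurewebsites.net/runtime/webhooks/blobs?{query}");
    }
}
```

Because the query carries only the short name, the listener has to be able to look up a registration by that short name, which is what the extra `Register` call in the hunk under discussion provides.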
private readonly ILogger<BlobListener> _logger;

- public BlobQueueTriggerExecutor(IBlobWrittenWatcher blobWrittenWatcher, ILogger<BlobListener> logger)
-     : this(BlobCausalityReader.Instance, blobWrittenWatcher, logger)
+ public BlobQueueTriggerExecutor(BlobTriggerSource kind, IBlobWrittenWatcher blobWrittenWatcher, ILogger<BlobListener> logger)
Suggested change:
- public BlobQueueTriggerExecutor(BlobTriggerSource kind, IBlobWrittenWatcher blobWrittenWatcher, ILogger<BlobListener> logger)
+ public BlobQueueTriggerExecutor(BlobTriggerSource blobTriggerSource, IBlobWrittenWatcher blobWrittenWatcher, ILogger<BlobListener> logger)
fixed
@@ -32,18 +32,24 @@ internal class EventGridExtensionConfigProvider : IExtensionConfigProvider,
private ILogger _logger;
private readonly ILoggerFactory _loggerFactory;
private readonly Func<EventGridAttribute, IAsyncCollector<EventGridEvent>> _converter;
private HttpRequestProcessor _httpRequestProcessor;
nit: this can be readonly.
fixed
await _semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
    if (_blobTriggerQueueWriter == null)
    {
        _blobTriggerQueueWriter = await _blobTriggerQueueWriterFactory.CreateAsync(cancellationToken).ConfigureAwait(false);
nice.
Pending comments.
624b854 to e8bafa2
The current Blob trigger logic relies on pulling Azure Storage diagnostic logs. This approach gives us high latency, poor performance, and fragile behavior overall. Using an EventGrid subscription is a much more reliable approach and should give approximately 2-3 times better performance.
The overall flow for EventGrid Blob trigger:
Steps to test locally:
Azure.WebJobs.Extensions.Storage.Blobs.5.0.0-alpha.20201120.1
"UseEventGrid = true" to trigger definition:
public static void Run([BlobTrigger("samples-workitems/{name}", UseEventGrid = true)]Stream myBlob, string name, ILogger log)
https://docs.microsoft.com/en-us/azure/azure-functions/functions-debug-event-grid-trigger-local
https://83f2d199d6f4.ngrok.io/runtime/webhooks/blobs?functionName=Function1
Note: The EventGrid subscription fires for all containers/blobs in the storage account, so you may want to add some EventGrid subscription filters.
Note: If you deploy the function app to Azure, the EventGrid subscription endpoint will be:
https://[functionAppName].azurewebsites.net/runtime/webhooks/blobs?functionName=Function1&code=[blobExtensionKey]
BlobExtensionKey is generated automatically and is available in the Azure portal.
The plan is to provide the EventGrid Blob trigger in beta as is, write documentation, and collect customer feedback.