-
Notifications
You must be signed in to change notification settings - Fork 358
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
BlobTrigger uses Event Grid subscription #2438
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
// Copyright (c) .NET Foundation. All rights reserved. | ||
// Licensed under the MIT License. See License.txt in the project root for license information. | ||
|
||
using System; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using Microsoft.Azure.WebJobs.Host.Executors; | ||
using Microsoft.Azure.WebJobs.Host.Listeners; | ||
using Microsoft.Azure.WebJobs.Host.Dispatch; | ||
using Microsoft.Azure.Storage.Blob; | ||
using Newtonsoft.Json.Linq; | ||
|
||
namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners | ||
{ | ||
internal class SharedQueueBlobListener : IListener | ||
{ | ||
public SharedQueueBlobListener(ListenerFactoryContext context, StorageAccount dataAccount) | ||
{ | ||
// register SharedQueueBlobHandler handler | ||
context.GetDispatchQueue(new SharedQueueBlobHandler(context.Executor, dataAccount)); | ||
} | ||
|
||
public Task StartAsync(CancellationToken cancellationToken) | ||
{ | ||
return Task.FromResult(true); | ||
} | ||
|
||
public Task StopAsync(CancellationToken cancellationToken) | ||
{ | ||
return Task.FromResult(true); | ||
} | ||
|
||
public void Dispose() | ||
{ | ||
} | ||
|
||
public void Cancel() | ||
{ | ||
} | ||
|
||
internal class SharedQueueBlobHandler : IMessageHandler | ||
{ | ||
private ITriggeredFunctionExecutor _executor; | ||
private CloudBlobClient _blobClient; | ||
|
||
public SharedQueueBlobHandler(ITriggeredFunctionExecutor executor, StorageAccount dataAccount) | ||
{ | ||
_executor = executor; | ||
_blobClient = dataAccount.CreateCloudBlobClient(); | ||
} | ||
public async Task<FunctionResult> TryExecuteAsync(JObject data, CancellationToken cancellationToken) | ||
{ | ||
// Both Event Grid schema and Cloud Event schema define blob uri in ["data"]["url"] | ||
ICloudBlob blob = await _blobClient.GetBlobReferenceFromServerAsync(new Uri(data["data"]["url"].ToString())); | ||
|
||
TriggeredFunctionData input = new TriggeredFunctionData | ||
{ | ||
TriggerValue = blob | ||
}; | ||
return await _executor.TryExecuteAsync(input, cancellationToken); | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -96,9 +96,12 @@ | |
<PackageReference Include="Microsoft.Azure.Cosmos.Table" Version="1.0.5" /> | ||
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.1.0" /> | ||
<PackageReference Include="Microsoft.Azure.Storage.Queue" Version="11.1.0" /> | ||
<PackageReference Include="Microsoft.Azure.WebJobs" Version="3.0.14" /> | ||
<PackageReference Include="StyleCop.Analyzers" Version="1.1.0-beta004"> | ||
<PrivateAssets>all</PrivateAssets> | ||
</PackageReference> | ||
</ItemGroup> | ||
|
||
<ItemGroup> | ||
<ProjectReference Include="..\Microsoft.Azure.WebJobs.Host\WebJobs.Host.csproj" /> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Was this an intentional change, or something you were doing while developing? We should be able to continue depending on the package only here, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
</ItemGroup> | ||
</Project> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
// Copyright (c) .NET Foundation. All rights reserved. | ||
// Licensed under the MIT License. See License.txt in the project root for license information. | ||
|
||
using System; | ||
using System.Collections.Generic; | ||
using System.Net.Http; | ||
using System.Linq; | ||
using System.Web; | ||
using Microsoft.Azure.WebJobs.Logging; | ||
using Microsoft.Extensions.Logging; | ||
using Newtonsoft.Json.Linq; | ||
using System.Net; | ||
using System.Threading.Tasks; | ||
using Newtonsoft.Json; | ||
using System.Threading; | ||
using Microsoft.Azure.WebJobs.Host.Executors; | ||
using Microsoft.Azure.WebJobs.Host.Timers; | ||
using Microsoft.Azure.WebJobs.Host.Listeners; | ||
using Microsoft.Azure.WebJobs.Host.Config; | ||
|
||
namespace Microsoft.Azure.WebJobs.Host.Dispatch | ||
{ | ||
public interface IHttpEndpointManager | ||
{ | ||
void RegisterHttpEnpoint(ExtensionConfigContext context); | ||
|
||
Task<HttpResponseMessage> ProcessHttpRequestAsync(HttpRequestMessage req, CancellationToken cancellationToken); | ||
} | ||
|
||
/// <summary> | ||
/// Hadles HttpRequests that comes to extension http endpoint. | ||
/// </summary> | ||
internal class HttpEndpointManager : IHttpEndpointManager | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this a new class? and interface? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, we use Interface is introduced for DI as implementation is used some internal stuff. |
||
{ | ||
private readonly ILogger _logger; | ||
private readonly SharedQueueHandler _sharedQueue; | ||
|
||
public HttpEndpointManager(SharedQueueHandler sharedQueue, ILoggerFactory loggerFactory) | ||
{ | ||
_sharedQueue = sharedQueue; | ||
_logger = loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("HttpRequestProcessor")); | ||
} | ||
|
||
public void RegisterHttpEnpoint(ExtensionConfigContext context) | ||
{ | ||
Uri url = context.GetWebhookHandler(); | ||
_logger.LogInformation($"registered http endpoint = {url?.GetLeftPart(UriPartial.Path)}"); | ||
} | ||
|
||
public async Task<HttpResponseMessage> ProcessHttpRequestAsync(HttpRequestMessage req, CancellationToken cancellationToken) | ||
{ | ||
var functionId = HttpUtility.ParseQueryString(req.RequestUri.Query)[SharedQueueHandler.FunctionId]; | ||
IEnumerable<string> eventTypeHeaders = null; | ||
string eventTypeHeader = null; | ||
if (req.Headers.TryGetValues("aeg-event-type", out eventTypeHeaders)) | ||
{ | ||
eventTypeHeader = eventTypeHeaders.First(); | ||
} | ||
|
||
if (String.Equals(eventTypeHeader, "SubscriptionValidation", StringComparison.OrdinalIgnoreCase)) | ||
{ | ||
string jsonArray = await req.Content.ReadAsStringAsync(); | ||
SubscriptionValidationEvent validationEvent = null; | ||
List<JObject> events = JsonConvert.DeserializeObject<List<JObject>>(jsonArray); | ||
// TODO remove unnecessary serialization | ||
validationEvent = ((JObject)events[0]["data"]).ToObject<SubscriptionValidationEvent>(); | ||
SubscriptionValidationResponse validationResponse = new SubscriptionValidationResponse { ValidationResponse = validationEvent.ValidationCode }; | ||
var returnMessage = new HttpResponseMessage(HttpStatusCode.OK); | ||
returnMessage.Content = new StringContent(JsonConvert.SerializeObject(validationResponse)); | ||
_logger.LogInformation($"perform handshake with eventGrid for function: {functionId}"); | ||
return returnMessage; | ||
} | ||
else if (String.Equals(eventTypeHeader, "Notification", StringComparison.OrdinalIgnoreCase)) | ||
{ | ||
JArray events = null; | ||
string requestContent = await req.Content.ReadAsStringAsync(); | ||
var token = JToken.Parse(requestContent); | ||
if (token.Type == JTokenType.Array) | ||
{ | ||
// eventgrid schema | ||
events = (JArray)token; | ||
} | ||
else if (token.Type == JTokenType.Object) | ||
{ | ||
// cloudevent schema | ||
events = new JArray | ||
{ | ||
token | ||
}; | ||
} | ||
|
||
foreach (JObject jo in events) | ||
{ | ||
// TODO ALROD - Azure Queues do not suport batch sending. Sending in parallel? ServiceBus? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would indeed be good to send them in parallel. What is the desired behavior if enqueuing one of the messages fails? Right now, we'd end up in a partial number of messages enqueued and an exception bubbling up, correct? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added sending in parallel.
Correct. |
||
await _sharedQueue.EnqueueAsync(jo, functionId, cancellationToken); | ||
} | ||
|
||
return new HttpResponseMessage(HttpStatusCode.Accepted); | ||
} | ||
else if (String.Equals(eventTypeHeader, "Unsubscribe", StringComparison.OrdinalIgnoreCase)) | ||
{ | ||
// TODO disable function? | ||
return new HttpResponseMessage(HttpStatusCode.Accepted); | ||
} | ||
|
||
return new HttpResponseMessage(HttpStatusCode.BadRequest); | ||
} | ||
} | ||
|
||
internal class SubscriptionValidationResponse | ||
{ | ||
[JsonProperty(PropertyName = "validationResponse")] | ||
public string ValidationResponse { get; set; } | ||
} | ||
|
||
internal class SubscriptionValidationEvent | ||
{ | ||
[JsonProperty(PropertyName = "validationCode")] | ||
public string ValidationCode { get; set; } | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,7 +14,7 @@ namespace Microsoft.Azure.WebJobs.Host.Dispatch | |
/// Defines the contract for executing a triggered function using a dispatch queue. | ||
/// <see cref="ListenerFactoryContext.GetDispatchQueue(IMessageHandler)"/> | ||
/// </summary> | ||
internal interface IMessageHandler | ||
public interface IMessageHandler | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the visibility of these interfaces changing for to allow the extension to consume them? @mathewc can you please comment here as well? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we did not use the interface before (there was only E2E test). |
||
{ | ||
/// <summary> | ||
/// Try to invoke the triggered function using the values specified. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was this a bug?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Actually we did not use
StorageLoadBalancerQueue
class before but the class was checked in in the repository.