Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.30704.19
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.WebJobs.Extensions.EventGrid.Tests", "tests\Microsoft.Azure.WebJobs.Extensions.EventGrid.Tests.csproj", "{E58845C7-D3EF-41F8-9E02-C8F02C1DEFA5}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.WebJobs.Extensions.EventGrid", "src\Microsoft.Azure.WebJobs.Extensions.EventGrid.csproj", "{9322A9CD-ADC3-4BF3-B3AA-063A66585113}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{E58845C7-D3EF-41F8-9E02-C8F02C1DEFA5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E58845C7-D3EF-41F8-9E02-C8F02C1DEFA5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E58845C7-D3EF-41F8-9E02-C8F02C1DEFA5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E58845C7-D3EF-41F8-9E02-C8F02C1DEFA5}.Release|Any CPU.Build.0 = Release|Any CPU
{9322A9CD-ADC3-4BF3-B3AA-063A66585113}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9322A9CD-ADC3-4BF3-B3AA-063A66585113}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9322A9CD-ADC3-4BF3-B3AA-063A66585113}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9322A9CD-ADC3-4BF3-B3AA-063A66585113}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {D8FAA1CE-4C73-49FE-A59D-CCEBDAA223CE}
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,24 @@ internal class EventGridExtensionConfigProvider : IExtensionConfigProvider,
private ILogger _logger;
private readonly ILoggerFactory _loggerFactory;
private readonly Func<EventGridAttribute, IAsyncCollector<EventGridEvent>> _converter;
private readonly HttpRequestProcessor _httpRequestProcessor;

// for end to end testing
internal EventGridExtensionConfigProvider(Func<EventGridAttribute, IAsyncCollector<EventGridEvent>> converter, ILoggerFactory loggerFactory)
internal EventGridExtensionConfigProvider(
Func<EventGridAttribute, IAsyncCollector<EventGridEvent>> converter,
HttpRequestProcessor httpRequestProcessor,
ILoggerFactory loggerFactory)
{
_converter = converter;
_httpRequestProcessor = httpRequestProcessor;
_loggerFactory = loggerFactory;
}

// default constructor
public EventGridExtensionConfigProvider(ILoggerFactory loggerFactory)
public EventGridExtensionConfigProvider(HttpRequestProcessor httpRequestProcessor, ILoggerFactory loggerFactory)
{
_converter = (attr => new EventGridAsyncCollector(new EventGridPublisherClient(new Uri(attr.TopicEndpointUri), new AzureKeyCredential(attr.TopicKeySetting))));
_httpRequestProcessor = httpRequestProcessor;
_loggerFactory = loggerFactory;
}

Expand Down Expand Up @@ -122,90 +128,48 @@ private async Task<HttpResponseMessage> ProcessAsync(HttpRequestMessage req)
return new HttpResponseMessage(HttpStatusCode.NotFound) { Content = new StringContent($"cannot find function: '{functionName}'") };
}

IEnumerable<string> eventTypeHeaders = null;
string eventTypeHeader = null;
if (req.Headers.TryGetValues("aeg-event-type", out eventTypeHeaders))
{
eventTypeHeader = eventTypeHeaders.First();
}

if (String.Equals(eventTypeHeader, "SubscriptionValidation", StringComparison.OrdinalIgnoreCase))
{
string jsonArray = await req.Content.ReadAsStringAsync().ConfigureAwait(false);
SubscriptionValidationEvent validationEvent = null;
List<JObject> events = JsonConvert.DeserializeObject<List<JObject>>(jsonArray);
// TODO remove unnecessary serialization
validationEvent = ((JObject)events[0]["data"]).ToObject<SubscriptionValidationEvent>();
SubscriptionValidationResponse validationResponse = new SubscriptionValidationResponse { ValidationResponse = validationEvent.ValidationCode };
var returnMessage = new HttpResponseMessage(HttpStatusCode.OK);
returnMessage.Content = new StringContent(JsonConvert.SerializeObject(validationResponse));
_logger.LogInformation($"perform handshake with eventGrid for function: {functionName}");
return returnMessage;
}
else if (String.Equals(eventTypeHeader, "Notification", StringComparison.OrdinalIgnoreCase))
{
JArray events = null;
string requestContent = await req.Content.ReadAsStringAsync().ConfigureAwait(false);
var token = JToken.Parse(requestContent);
if (token.Type == JTokenType.Array)
{
// eventgrid schema
events = (JArray)token;
}
else if (token.Type == JTokenType.Object)
{
// cloudevent schema
events = new JArray
{
token
};
}
return await _httpRequestProcessor.ProcessAsync(req, functionName, ProcessEventsAsync, CancellationToken.None).ConfigureAwait(false);
}

List<Task<FunctionResult>> executions = new List<Task<FunctionResult>>();
private async Task<HttpResponseMessage> ProcessEventsAsync(JArray events, string functionName, CancellationToken cancellationToken)
{
List<Task<FunctionResult>> executions = new List<Task<FunctionResult>>();

// Single Dispatch
if (_listeners[functionName].SingleDispatch)
{
foreach (var ev in events)
{
// assume each event is a JObject
TriggeredFunctionData triggerData = new TriggeredFunctionData
{
TriggerValue = ev
};
executions.Add(_listeners[functionName].Executor.TryExecuteAsync(triggerData, CancellationToken.None));
}
await Task.WhenAll(executions).ConfigureAwait(false);
}
// Batch Dispatch
else
// Single Dispatch
if (_listeners[functionName].SingleDispatch)
{
foreach (var ev in events)
{
// assume each event is a JObject
TriggeredFunctionData triggerData = new TriggeredFunctionData
{
TriggerValue = events
TriggerValue = ev
};
executions.Add(_listeners[functionName].Executor.TryExecuteAsync(triggerData, CancellationToken.None));
}

// FIXME without internal queuing, we are going to process all events in parallel
// and return 500 if there's at least one failure...which will cause EventGrid to resend the entire payload
foreach (var execution in executions)
await Task.WhenAll(executions).ConfigureAwait(false);
}
// Batch Dispatch
else
{
TriggeredFunctionData triggerData = new TriggeredFunctionData
{
if (!execution.Result.Succeeded)
{
return new HttpResponseMessage(HttpStatusCode.InternalServerError) { Content = new StringContent(execution.Result.Exception.Message) };
}
}

return new HttpResponseMessage(HttpStatusCode.Accepted);
TriggerValue = events
};
executions.Add(_listeners[functionName].Executor.TryExecuteAsync(triggerData, CancellationToken.None));
}
else if (String.Equals(eventTypeHeader, "Unsubscribe", StringComparison.OrdinalIgnoreCase))

// FIXME without internal queuing, we are going to process all events in parallel
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we file an issue?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will require some kind of internal queue(like for blob storage trigger) and redesign of EventGrid extension. Added an issue: #17756

// and return 500 if there's at least one failure...which will cause EventGrid to resend the entire payload
foreach (var execution in executions)
{
// TODO disable function?
return new HttpResponseMessage(HttpStatusCode.Accepted);
if (!execution.Result.Succeeded)
{
return new HttpResponseMessage(HttpStatusCode.InternalServerError) { Content = new StringContent(execution.Result.Exception.Message) };
}
}

return new HttpResponseMessage(HttpStatusCode.BadRequest);
return new HttpResponseMessage(HttpStatusCode.Accepted);
}

private class JTokenToPocoConverter<T> : IConverter<JToken, T>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License.

using System;
using Microsoft.Extensions.DependencyInjection.Extensions;

namespace Microsoft.Azure.WebJobs.Extensions.EventGrid
{
Expand All @@ -21,6 +22,7 @@ public static IWebJobsBuilder AddEventGrid(this IWebJobsBuilder builder)
throw new ArgumentNullException(nameof(builder));
}

builder.Services.TryAddSingleton<HttpRequestProcessor>();
builder.AddExtension<EventGridExtensionConfigProvider>();
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace Microsoft.Azure.WebJobs.Extensions.EventGrid
{
internal class HttpRequestProcessor
{
private readonly ILogger _logger;

public HttpRequestProcessor(ILogger<HttpRequestProcessor> logger)
{
_logger = logger;
}

internal async Task<HttpResponseMessage> ProcessAsync(HttpRequestMessage req, string functionName, Func<JArray, string, CancellationToken, Task<HttpResponseMessage>> eventsFunc, CancellationToken cancellationToken)
{
IEnumerable<string> eventTypeHeaders = null;
string eventTypeHeader = null;
if (req.Headers.TryGetValues("aeg-event-type", out eventTypeHeaders))
{
eventTypeHeader = eventTypeHeaders.First();
}

if (String.Equals(eventTypeHeader, "SubscriptionValidation", StringComparison.OrdinalIgnoreCase))
{
string jsonArray = await req.Content.ReadAsStringAsync().ConfigureAwait(false);
SubscriptionValidationEvent validationEvent = null;
List<JObject> events = JsonConvert.DeserializeObject<List<JObject>>(jsonArray);
// TODO remove unnecessary serialization
validationEvent = ((JObject)events[0]["data"]).ToObject<SubscriptionValidationEvent>();
SubscriptionValidationResponse validationResponse = new SubscriptionValidationResponse { ValidationResponse = validationEvent.ValidationCode };
var returnMessage = new HttpResponseMessage(HttpStatusCode.OK);
returnMessage.Content = new StringContent(JsonConvert.SerializeObject(validationResponse));
_logger.LogInformation($"perform handshake with eventGrid for function: {functionName}");
return returnMessage;
}
else if (String.Equals(eventTypeHeader, "Notification", StringComparison.OrdinalIgnoreCase))
{
JArray events = null;
string requestContent = await req.Content.ReadAsStringAsync().ConfigureAwait(false);
var token = JToken.Parse(requestContent);
if (token.Type == JTokenType.Array)
{
// eventgrid schema
events = (JArray)token;
}
else if (token.Type == JTokenType.Object)
{
// cloudevent schema
events = new JArray
{
token
};
}

return await eventsFunc(events, functionName, cancellationToken).ConfigureAwait(false);
}
else if (String.Equals(eventTypeHeader, "Unsubscribe", StringComparison.OrdinalIgnoreCase))
{
// TODO disable function?
return new HttpResponseMessage(HttpStatusCode.Accepted);
}

return new HttpResponseMessage(HttpStatusCode.BadRequest);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Microsoft.Azure.WebJobs.Host.Indexers;
using Microsoft.Azure.WebJobs.Host.TestCommon;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Moq;
using Newtonsoft.Json.Linq;
using NUnit.Framework;
Expand Down Expand Up @@ -205,7 +206,7 @@ public async Task OutputBindingParamsTests(string functionName, string expectedC
ILoggerFactory loggerFactory = new LoggerFactory();
loggerFactory.AddProvider(new TestLoggerProvider());
// use moq eventgridclient for test extension
var customExtension = new EventGridExtensionConfigProvider(customConverter, loggerFactory);
var customExtension = new EventGridExtensionConfigProvider(customConverter, new HttpRequestProcessor(NullLoggerFactory.Instance.CreateLogger<HttpRequestProcessor>()), loggerFactory);

var configuration = new Dictionary<string, string>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Newtonsoft.Json.Linq;
using Microsoft.Extensions.Logging.Abstractions;
using NUnit.Framework;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Extensions.EventGrid.Tests
{
Expand All @@ -30,7 +31,7 @@ public void SetupTestListener()
[Test]
public async Task TestUnsubscribe()
{
var ext = new EventGridExtensionConfigProvider(NullLoggerFactory.Instance);
var ext = new EventGridExtensionConfigProvider(new HttpRequestProcessor(NullLoggerFactory.Instance.CreateLogger<HttpRequestProcessor>()), NullLoggerFactory.Instance);
var host = TestHelpers.NewHost<MyProg1>(ext);
await host.StartAsync(); // add listener

Expand All @@ -46,7 +47,7 @@ public async Task TestUnsubscribe()
[Test]
public async Task TestDispatch()
{
var ext = new EventGridExtensionConfigProvider(NullLoggerFactory.Instance);
var ext = new EventGridExtensionConfigProvider(new HttpRequestProcessor(NullLoggerFactory.Instance.CreateLogger<HttpRequestProcessor>()), NullLoggerFactory.Instance);
var host = TestHelpers.NewHost<MyProg1>(ext);
await host.StartAsync(); // add listener

Expand All @@ -70,7 +71,7 @@ public async Task TestDispatch()
public async Task TestCloudEvent()
{
// individual elements
var ext = new EventGridExtensionConfigProvider(NullLoggerFactory.Instance);
var ext = new EventGridExtensionConfigProvider(new HttpRequestProcessor(NullLoggerFactory.Instance.CreateLogger<HttpRequestProcessor>()), NullLoggerFactory.Instance);
var host = TestHelpers.NewHost<MyProg1>(ext);
await host.StartAsync(); // add listener

Expand All @@ -87,7 +88,7 @@ public async Task TestCloudEvent()
[Test]
public async Task WrongFunctionNameTest()
{
var ext = new EventGridExtensionConfigProvider(NullLoggerFactory.Instance);
var ext = new EventGridExtensionConfigProvider(new HttpRequestProcessor(NullLoggerFactory.Instance.CreateLogger<HttpRequestProcessor>()), NullLoggerFactory.Instance);
var host = TestHelpers.NewHost<MyProg2>(ext);
await host.StartAsync(); // add listener

Expand All @@ -103,7 +104,7 @@ public async Task WrongFunctionNameTest()
[Test]
public async Task ExecutionFailureTest()
{
var ext = new EventGridExtensionConfigProvider(NullLoggerFactory.Instance);
var ext = new EventGridExtensionConfigProvider(new HttpRequestProcessor(NullLoggerFactory.Instance.CreateLogger<HttpRequestProcessor>()), NullLoggerFactory.Instance);
var host = TestHelpers.NewHost<MyProg2>(ext);
await host.StartAsync(); // add listener

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ public sealed partial class BlobTriggerAttribute : System.Attribute, Microsoft.A
public BlobTriggerAttribute(string blobPath) { }
public string BlobPath { get { throw null; } }
public string Connection { get { throw null; } set { } }
public Microsoft.Azure.WebJobs.BlobTriggerSource Source { get { throw null; } set { } }
}
public enum BlobTriggerSource
{
LogsAndContainerScan = 0,
EventGrid = 1,
}
}
namespace Microsoft.Azure.WebJobs.Extensions.Storage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public sealed class BlobTriggerAttribute : Attribute, IConnectionProvider
{
private readonly string _blobPath;

// LogsAndContainerScan is default kind as it does not require additional actions to set up a blob trigger
private BlobTriggerSource _blobTriggerSource = BlobTriggerSource.LogsAndContainerScan;

/// <summary>
/// Initializes a new instance of the <see cref="BlobTriggerAttribute"/> class.
/// </summary>
Expand Down Expand Up @@ -65,5 +68,14 @@ public string BlobPath
{
get { return _blobPath; }
}

/// <summary>
/// Returns a bool value that indicates whether EventGrid is used.
/// </summary>
public BlobTriggerSource Source
{
get { return _blobTriggerSource; }
set { _blobTriggerSource = value; }
}
}
}
Loading