Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/SampleApp/Sample/Program.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Text.Json.Serialization;
using Azure.Messaging.EventGrid;
using Devlooped;
using Devlooped.WhatsApp;
using Microsoft.Azure.Functions.Worker.Builder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Response = Devlooped.WhatsApp.Response;

var builder = FunctionsApplication.CreateBuilder(args);
builder.ConfigureFunctionsWebApplication();
Expand Down Expand Up @@ -43,7 +45,7 @@
storage :
CloudStorageAccount.Parse(builder.Configuration["AzureWebJobsStorage"]));

builder.Services
var whatsapp = builder.Services
.AddWhatsApp<ProcessHandler>(configure: options =>
{
options.ReactOnMessage = "🌐";
Expand All @@ -57,6 +59,14 @@
.Use(EchoAndHandle)
.UseConversation(conversationWindowSeconds: 300 /* default */);

// If event grid is set up, switch to processing messages using that
if (builder.Configuration["EventGrid:Topic"] is { Length: > 0 } topic &&
builder.Configuration["EventGrid:Key"] is { Length: > 0 } key)
{
whatsapp.UseEventGridProcessor(new EventGridPublisherClient(
new Uri(topic), new Azure.AzureKeyCredential(key)));
}

builder.Build().Run();

static async IAsyncEnumerable<Response> EchoAndHandle(IEnumerable<IMessage> messages, IWhatsAppHandler inner, [EnumeratorCancellation] CancellationToken cancellation)
Expand Down
44 changes: 44 additions & 0 deletions src/Tests/EventGridTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using Azure;
using Azure.Messaging.EventGrid;
using Microsoft.Extensions.Configuration;

namespace Devlooped.WhatsApp;

public class EventGridTests
{
[SecretsFact("EventGrid:Topic", "EventGrid:Key", "SendFrom", "SendTo")]
public async Task SendEvent()
{
var configuration = new ConfigurationBuilder()
.AddUserSecrets<EventGridTests>()
.Build();

var client = new EventGridPublisherClient(
new Uri(configuration["EventGrid:Topic"]!),
new AzureKeyCredential(configuration["EventGrid:Key"]!));

var processor = new EventGridProcessor(client, new EventGridOptions());

await processor.EnqueueAsync(
$$"""
{
"Content": {
"$type": "text",
"Text": "Is it running?"
},
"Id": "wamid.HBgNNTQ5MTE1OTI3ODI4MhUCABIYIEYyQ0U5N0E0MDA5MkU4MUU5RkU1RERCMzE5Q0QzNjk3AA==",
"Service": {
"Id": "{{configuration["SendFrom"]}}",
"Number": "{{configuration["SendFromNumber"]}}"
},
"User": {
"Name": "Test",
"Number": "{{configuration["SendTo"]}}"
},
"Timestamp": 1749722446,
"notification": "539235785933710",
"Number": "{{configuration["SendTo"]}}"
}
""");
}
}
4 changes: 2 additions & 2 deletions src/WhatsApp/AzureFunctionsConsole.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ namespace Devlooped.WhatsApp;
/// Azure functions used in development environments to allow the WhatsApp CLI to connect
/// and exercise the WhatsApp API without requiring a full WhatsApp for Business account.
/// </summary>
public class AzureFunctionsConsole(
class AzureFunctionsConsole(
IWhatsAppHandler handler,
ILogger<AzureFunctions> logger,
ILogger<AzureFunctionsWebhook> logger,
IHostEnvironment environment)
{
[Function("whatsapp_console")]
Expand Down
25 changes: 25 additions & 0 deletions src/WhatsApp/AzureFunctionsProcessors.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using Azure.Messaging.EventGrid;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Functions.Worker;

namespace Devlooped.WhatsApp;

class AzureFunctionsProcessors(PipelineRunner runner)
{
[Function("whatsapp_dequeue")]
public Task DequeueAsync([QueueTrigger("whatsappwebhook", Connection = "AzureWebJobsStorage")] string json)
=> runner.ProcessAsync(json);

[Function("whatsapp_eventgrid")]
public async Task<IActionResult> HandleEventGrid(
#if CI || RELEASE
[EventGridTrigger] EventGridEvent e)
#else
[HttpTrigger(AuthorizationLevel.Anonymous, "post")]
[Microsoft.Azure.Functions.Worker.Http.FromBody] EventGridEvent e)
#endif
{
await runner.ProcessAsync(e.Data.ToString());
return new OkResult();
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
using System.Text;
using Azure.Data.Tables;
using Azure.Storage.Queues;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

Expand All @@ -18,15 +16,13 @@ namespace Devlooped.WhatsApp;
/// <param name="whatsapp">The <see cref="IWhatsAppClient"/> client to signal message processing state.</param>
/// <param name="handler">The message handler that will process incoming messages.</param>
/// <param name="logger">The logger.</param>
public class AzureFunctions(
QueueServiceClient queueClient,
class AzureFunctionsWebhook(
TableServiceClient tableClient,
IMessageProcessor messageProcessor,
IWhatsAppClient whatsapp,
IWhatsAppHandler handler,
IOptions<MetaOptions> metaOptions,
IOptions<WhatsAppOptions> functionOptions,
ILogger<AzureFunctions> logger,
IHostEnvironment environment)
ILogger<AzureFunctionsWebhook> logger)
{
readonly WhatsAppOptions functionOptions = functionOptions.Value;

Expand Down Expand Up @@ -55,10 +51,8 @@ public async Task<IActionResult> Message([HttpTrigger(AuthorizationLevel.Anonymo
if (functionOptions.ReactOnMessage != null && message.Type == MessageType.Content)
await message.React(functionOptions.ReactOnMessage).SendAsync(whatsapp).Ignore();

// Otherwise, queue the new message
var queue = queueClient.GetQueueClient("whatsappwebhook");
await queue.CreateIfNotExistsAsync();
await queue.SendMessageAsync(json);
// Otherwise, enqueue the message processing
await messageProcessor.EnqueueAsync(json);
}
else
{
Expand All @@ -68,41 +62,6 @@ public async Task<IActionResult> Message([HttpTrigger(AuthorizationLevel.Anonymo
return new OkResult();
}

[Function("whatsapp_process")]
public async Task Process([QueueTrigger("whatsappwebhook", Connection = "AzureWebJobsStorage")] string json)
{
logger.LogDebug("Processing WhatsApp message: {Message}", json);

if (await WhatsApp.Message.DeserializeAsync(json) is { } message)
{
if (functionOptions.ReadOnProcess is true && message.Type == MessageType.Content)
// Ignored since this can be an old, deleted message, for example
await whatsapp.MarkReadAsync(message.Service.Id, message.Id).Ignore();

// Ensure idempotent processing at dequeue time, since we might have been called
// multiple times for the same message by WhatsApp (Message method) while processing was still
// happening (and therefore we didn't save the entity yet).
var table = tableClient.GetTableClient("WhatsAppWebhook");
await table.CreateIfNotExistsAsync();
if (await table.GetEntityIfExistsAsync<TableEntity>(message.User.Number, message.NotificationId) is { HasValue: true } existing)
{
logger.LogInformation("Skipping already handled message {Id}", message.Id);
return;
}

// Await all responses
// No action needed, just make sure all items are processed
await handler.HandleAsync([message]).ToArrayAsync();

await table.UpsertEntityAsync(new TableEntity(message.User.Number, message.Id));
logger.LogInformation($"Completed work item: {message.Id}");
}
else
{
logger.LogWarning("Failed to deserialize message.");
}
}

[Function("whatsapp_register")]
public IActionResult Register([HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "whatsapp")] HttpRequest req)
{
Expand Down
32 changes: 32 additions & 0 deletions src/WhatsApp/EventGridProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using Azure.Messaging.EventGrid;

namespace Devlooped.WhatsApp;

/// <summary>
/// Options used to populate <see cref="EventGridEvent"/> when publishing
/// to the processor. Can be used to collect telemetry and logs.
/// </summary>
public class EventGridOptions
{
/// <summary>
/// The <see cref="EventGridEvent.Subject"/>. Defaults to <c>EventGridProcessor</c>.
/// </summary>
public string Subject { get; set; } = nameof(EventGridProcessor);
/// <summary>
/// The <see cref="EventGridEvent.EventType"/>. Defaults to <c>Devlooped.WhatsApp.MessageReceived</c>."/>
/// </summary>
public string EventType { get; set; } = "Devlooped.WhatsApp.MessageReceived";
/// <summary>
/// The <see cref="EventGridEvent.DataVersion"/>. Defaults to the assembly informational version.
/// </summary>
public string DataVersion { get; set; } = ThisAssembly.Info.InformationalVersion;
}

class EventGridProcessor(EventGridPublisherClient client, EventGridOptions options) : IMessageProcessor
{
public async Task EnqueueAsync(/* lang=json */ string json, CancellationToken cancellation = default)
{
await client.SendEventAsync(new EventGridEvent(
options.Subject, options.EventType, options.DataVersion, json), cancellation);
}
}
39 changes: 39 additions & 0 deletions src/WhatsApp/EventGridProcessorExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using Azure.Messaging.EventGrid;
using Microsoft.Extensions.DependencyInjection;

namespace Devlooped.WhatsApp;

/// <summary>
/// Provides extensions for processing WhatsApp messages asynchronusly
/// using Azure Functions queue.
/// </summary>
public static class EventGridProcessorExtensions
{
/// <summary>
/// Uses the Azure Functions queue to process WhatsApp messages asynchronously.
/// </summary>
/// <param name="builder">The builder pipeline</param>
/// <param name="configure">Optional configuration callback for the queue.></param>
public static WhatsAppHandlerBuilder UseEventGridProcessor(this WhatsAppHandlerBuilder builder,
EventGridPublisherClient? publisher = default,
Action<EventGridOptions>? configure = default)
{
Throw.IfNull(builder);

if (builder.Services.FirstOrDefault(x => x.ServiceType == typeof(IMessageProcessor)) is { } processor)
{
builder.Services.Remove(processor);
}

builder.Services.AddSingleton<IMessageProcessor>(services =>
{
var options = new EventGridOptions();
configure?.Invoke(options);
return new EventGridProcessor(publisher ??
services.GetRequiredService<EventGridPublisherClient>(),
options);
});

return builder;
}
}
17 changes: 17 additions & 0 deletions src/WhatsApp/IMessageProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace Devlooped.WhatsApp;

/// <summary>
/// Interface implemented by the async message processing used
/// to decouple the WhatsApp webhook from the actual processing.
/// </summary>
/// <remarks>
/// Unless explicitly configured otherwise, the default implementation
/// will use Azure Storage Queues to enqueue the messages for processing.
/// </remarks>
public interface IMessageProcessor
{
/// <summary>
/// Enqueues the WhatsApp for Business webhook message for async processing.
/// </summary>
Task EnqueueAsync(string json, CancellationToken cancellation = default);
}
48 changes: 48 additions & 0 deletions src/WhatsApp/PipelineRunner.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using Azure.Data.Tables;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Devlooped.WhatsApp;

class PipelineRunner(TableServiceClient tableClient,
IWhatsAppClient whatsapp,
IWhatsAppHandler handler,
IOptions<WhatsAppOptions> functionOptions,
ILogger<PipelineRunner> logger)
{
readonly WhatsAppOptions functionOptions = functionOptions.Value;

public async Task ProcessAsync(string json)
{
logger.LogDebug("Processing WhatsApp message: {Message}", json);

if (await Message.DeserializeAsync(json) is { } message)
{
if (functionOptions.ReadOnProcess is true && message.Type == MessageType.Content)
// Ignored since this can be an old, deleted message, for example
await whatsapp.MarkReadAsync(message.Service.Id, message.Id).Ignore();

// Ensure idempotent processing at dequeue time, since we might have been called
// multiple times for the same message by WhatsApp (Message method) while processing was still
// happening (and therefore we didn't save the entity yet).
var table = tableClient.GetTableClient("WhatsAppWebhook");
await table.CreateIfNotExistsAsync();
if (await table.GetEntityIfExistsAsync<TableEntity>(message.User.Number, message.NotificationId) is { HasValue: true } existing)
{
logger.LogInformation("Skipping already handled message {Id}", message.Id);
return;
}

// Await all responses
// No action needed, just make sure all items are processed
await handler.HandleAsync([message]).ToArrayAsync();

await table.UpsertEntityAsync(new TableEntity(message.User.Number, message.Id));
logger.LogInformation($"Completed work item: {message.Id}");
}
else
{
logger.LogWarning("Failed to deserialize message.");
}
}
}
13 changes: 13 additions & 0 deletions src/WhatsApp/QueueMessageProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using Azure.Storage.Queues;

namespace Devlooped.WhatsApp;

class QueueMessageProcessor(QueueServiceClient client) : IMessageProcessor
{
public async Task EnqueueAsync(string json, CancellationToken cancellation = default)
{
var queue = client.GetQueueClient("whatsappwebhook");
await queue.CreateIfNotExistsAsync(cancellationToken: cancellation);
await queue.SendMessageAsync(json, cancellation);
}
}
Loading