Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/WhatsApp/AzureFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public async Task<IActionResult> Message([HttpTrigger(AuthorizationLevel.Anonymo
if (await WhatsApp.Message.DeserializeAsync(json) is { } message)
{
// Ensure idempotent processing
var table = tableClient.GetTableClient("whatsapp");
var table = tableClient.GetTableClient("WhatsAppWebhook");
await table.CreateIfNotExistsAsync();
if (await table.GetEntityIfExistsAsync<TableEntity>(message.User.Number, message.NotificationId) is { HasValue: true } existing)
{
Expand All @@ -44,7 +44,7 @@ public async Task<IActionResult> Message([HttpTrigger(AuthorizationLevel.Anonymo
}

// Otherwise, queue the new message
var queue = queueClient.GetQueueClient("whatsapp");
var queue = queueClient.GetQueueClient("whatsappwebhook");
await queue.CreateIfNotExistsAsync();
await queue.SendMessageAsync(json);

Expand Down Expand Up @@ -73,7 +73,7 @@ public async Task<IActionResult> Message([HttpTrigger(AuthorizationLevel.Anonymo
}

[Function("whatsapp_process")]
public async Task Process([QueueTrigger("whatsapp", Connection = "AzureWebJobsStorage")] string json)
public async Task Process([QueueTrigger("whatsappwebhook", Connection = "AzureWebJobsStorage")] string json)
{
logger.LogDebug("Processing WhatsApp message: {Message}", json);

Expand All @@ -82,7 +82,7 @@ public async Task Process([QueueTrigger("whatsapp", Connection = "AzureWebJobsSt
// Ensure idempotent processing at dequeue time, since we might have been called
// multiple times for the same message by WhatsApp (Message method) while processing was still
// happening (and therefore we didn't save the entity yet).
var table = tableClient.GetTableClient("whatsapp");
var table = tableClient.GetTableClient("WhatsAppWebhook");
await table.CreateIfNotExistsAsync();
if (await table.GetEntityIfExistsAsync<TableEntity>(message.User.Number, message.NotificationId) is { HasValue: true } existing)
{
Expand Down
2 changes: 1 addition & 1 deletion src/WhatsApp/ConversationHandlerExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public static WhatsAppHandlerBuilder UseConversation(this WhatsAppHandlerBuilder
{
// By adding the conversation service, the conversation handlers will be automatically added to the pipeline
builder.Services.AddSingleton<IConversationService, ConversationService>(services
=> new ConversationService(services.GetRequiredService<IStorageService>()));
=> new ConversationService(services.GetRequiredService<IStorageService>()));
}

return builder;
Expand Down
6 changes: 3 additions & 3 deletions src/WhatsApp/ConversationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public async IAsyncEnumerable<IMessage> GetConversationAsync(IMessage message, [
if (!string.IsNullOrEmpty(message.ConversationId))
{
var conversation = storage
.GetMessagesAsync(message.Number, message.ConversationId, cancellationToken)
.GetMessagesAsync(message.UserNumber, message.ConversationId, cancellationToken)
.OrderBy(x => x.Timestamp);

await foreach (var conversationMessage in conversation)
Expand All @@ -39,7 +39,7 @@ public async Task<string> GetOrCreateConversationIdAsync(IMessage message, int s
// Even if the timeout is expired
if (!string.IsNullOrEmpty(message.Context))
{
var contextMsg = await storage.GetMessageAsync(message.Number, message.Context, cancellationToken);
var contextMsg = await storage.GetMessageAsync(message.UserNumber, message.Context, cancellationToken);

if (contextMsg?.ConversationId is string contextConversationId && !string.IsNullOrEmpty(contextConversationId))
return contextConversationId;
Expand All @@ -48,7 +48,7 @@ public async Task<string> GetOrCreateConversationIdAsync(IMessage message, int s
var timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds() - seconds;

// Use the conversation id for a message processed in the last ConversationWindowInSeconds seconds
var conversation = await storage.GetActiveConversationAsync(message.Number, cancellationToken);
var conversation = await storage.GetActiveConversationAsync(message.UserNumber, cancellationToken);
var conversationId = conversation?.Id;

if (conversationId == null || conversation?.Timestamp < timestamp)
Expand Down
11 changes: 8 additions & 3 deletions src/WhatsApp/IMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,20 @@ namespace Devlooped.WhatsApp;
[JsonDerivedType(typeof(ReactionResponse), "response/reaction")]
public interface IMessage
{
/// <summary>
/// Gets the message id.
/// </summary>
string Id { get; }

/// <summary>
/// Gets the phone number associated with the message sender.
/// </summary>
string Number { get; }
string UserNumber { get; }

/// <summary>
/// Gets the message id.
/// Gets the unique identifier for the service.
/// </summary>
string Id { get; }
string ServiceId { get; }

/// <summary>
/// Gets the timestamp representing the number of milliseconds since the Unix epoch (January 1, 1970, 00:00:00 UTC).
Expand Down
5 changes: 4 additions & 1 deletion src/WhatsApp/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -252,5 +252,8 @@ .value.statuses[0] as $status |
public abstract MessageType Type { get; }

/// <inheritdoc/>
public string Number => User.Number;
string IMessage.UserNumber => User.Number;

/// <inheritdoc/>
string IMessage.ServiceId => Service.Id;
}
20 changes: 10 additions & 10 deletions src/WhatsApp/MessageExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ public static partial class MessageExtensions
/// <summary>
/// Creates a reaction response for the user message.
/// </summary>
public static ReactionResponse React(this UserMessage message, string emoji)
=> new(message.User.Number, message.Service.Id, message.Id, message.ConversationId, emoji);
public static ReactionResponse React(this IMessage message, string emoji)
=> new(message.UserNumber, message.ServiceId, message.Id, message.ConversationId, emoji);

/// <summary>
/// Creates a simple template response for the message.
/// </summary>
public static TemplateResponse Template(this Message message, string name, string language)
=> new(message.User.Number, message.Service.Id, message.Id, message.ConversationId, name, language);
public static TemplateResponse Template(this IMessage message, string name, string language)
=> new(message.UserNumber, message.Id, message.Id, message.ConversationId, name, language);

/// <summary>
/// Creates a complex template response for the message.
Expand All @@ -27,20 +27,20 @@ public static TemplateResponse Template(this Message message, string name, strin
/// <see cref="https://developers.facebook.com/docs/whatsapp/api/messages/message-templates#supported-languages"/>
/// <see cref="https://developers.facebook.com/docs/whatsapp/cloud-api/reference/messages/#template-object"/>
/// <see cref="https://developers.facebook.com/docs/whatsapp/cloud-api/reference/messages/#components-object"/>
public static TemplateResponse Template(this Message message, object template)
=> new(message.User.Number, message.Service.Id, message.Id, message.ConversationId, template);
public static TemplateResponse Template(this IMessage message, object template)
=> new(message.UserNumber, message.ServiceId, message.Id, message.ConversationId, template);

/// <summary>
/// Creates a text response for the message.
/// </summary>
public static TextResponse Reply(this Message message, string text)
=> new(message.User.Number, message.Service.Id, message.Id, message.ConversationId, text);
public static TextResponse Reply(this IMessage message, string text)
=> new(message.UserNumber, message.ServiceId, message.Id, message.ConversationId, text);

/// <summary>
/// Creates a text response with buttons for the message.
/// </summary>
public static TextResponse Reply(this Message message, string text, Button button1, Button? button2 = default)
=> new(message.User.Number, message.Service.Id, message.Id, message.ConversationId, text, button1, button2);
public static TextResponse Reply(this IMessage message, string text, Button button1, Button? button2 = default)
=> new(message.UserNumber, message.ServiceId, message.Id, message.ConversationId, text, button1, button2);

/// <summary>
/// Attempts to retrieve a single message from the specified collection.
Expand Down
2 changes: 1 addition & 1 deletion src/WhatsApp/ReactionResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public record ReactionResponse(string Number, string Service, string Context, st
/// <inheritdoc/>
protected override async Task<string?> SendCoreAsync(IWhatsAppClient client, CancellationToken cancellationToken = default)
{
await client.ReactAsync(Service, Number, Context, Emoji);
await client.ReactAsync(ServiceId, UserNumber, Context, Emoji);

return Ulid.NewUlid().ToString();
}
Expand Down
6 changes: 3 additions & 3 deletions src/WhatsApp/Response.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
/// Represents a response message or command that can be sent using a WhatsApp client.
/// </summary>
/// <remarks>This abstract record serves as a base type for defining specific response messages or commands that
/// can be sent to a WhatsApp client. It provides common properties such as <see cref="Number"/>, <see cref="Service"/>,
/// can be sent to a WhatsApp client. It provides common properties such as <see cref="UserNumber"/>, <see cref="ServiceId"/>,
/// <see cref="Context"/>, and <see cref="ConversationId"/>, as well as methods for sending the response
/// asynchronously.</remarks>
/// <param name="Number">The phone number of the recipient in international format.</param>
/// <param name="Service">The identifier of the service handling the message.</param>
/// <param name="ServiceId">The identifier of the service handling the message.</param>
/// <param name="Context">The unique identifier of the message to which the reaction is being sent.</param>
/// <param name="ConversationId">The conversation id where this response was generated</param>
public abstract partial record Response(string Number, string Service, string Context, string? ConversationId) : IMessage
public abstract partial record Response(string UserNumber, string ServiceId, string Context, string? ConversationId) : IMessage
{
/// <inheritdoc/>
public string Id { get; init; } = string.Empty;
Expand Down
41 changes: 26 additions & 15 deletions src/WhatsApp/RestoreConversationMessagesHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,41 @@

namespace Devlooped.WhatsApp;

class RestoreConversationMessagesHandler(IWhatsAppHandler innerHandler, IConversationService conversationService) : DelegatingWhatsAppHandler(innerHandler)
/// <summary>
/// Represents configuration options for a conversation.
/// </summary>
/// <remarks>This record is used to specify settings that control the behavior of a conversation.</remarks>
/// <param name="RestoreMessages">A value indicating whether to restore previous messages in the conversation. <see langword="true"/> to restore
/// messages; otherwise, <see langword="false"/>.</param>
public record ConversationOptions(bool RestoreMessages = true);

class RestoreConversationMessagesHandler(IWhatsAppHandler innerHandler, IConversationService conversationService, ConversationOptions options) : DelegatingWhatsAppHandler(innerHandler)
{
public override async IAsyncEnumerable<Response> HandleAsync(IEnumerable<IMessage> messages, [EnumeratorCancellation] CancellationToken cancellation = default)
{
IEnumerable<IMessage> conversation;
IEnumerable<IMessage> conversation = messages;

// Optimization to avoid creating the list when there is only 1 message to be processed
if (messages.TrySingle(out var single))
{
conversation = await conversationService.GetConversationAsync(single, cancellation).ToArrayAsync();
}
else
if (options.RestoreMessages)
{
var conversationList = new List<IMessage>();

foreach (var message in messages)
// Optimization to avoid creating the list when there is only 1 message to be processed
if (messages.TrySingle(out var single))
{
await foreach (var conversationMessage in conversationService.GetConversationAsync(message, cancellation))
conversation = await conversationService.GetConversationAsync(single, cancellation).ToArrayAsync();
}
else
{
var conversationList = new List<IMessage>();

foreach (var message in messages)
{
conversationList.Add(conversationMessage);
await foreach (var conversationMessage in conversationService.GetConversationAsync(message, cancellation))
{
conversationList.Add(conversationMessage);
}
}
}

conversation = conversationList;
conversation = conversationList;
}
}

await foreach (var response in base.HandleAsync(conversation, cancellation))
Expand Down
15 changes: 8 additions & 7 deletions src/WhatsApp/StorageService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ class StorageService(CloudStorageAccount storage, IFeatureManager featureManager
{
readonly List<IMessage> EmptyList = new();

const string MessagesTableName = "messages";
const string ConversationsTableName = "conversations";
const string ActiveConversationTableName = "conversation";
const string MessagesTableName = "WhatsAppMessages";
const string ConversationsTableName = "WhatsAppConversations";

Lazy<IDocumentRepository<IMessage>> messagesRepository = new(() =>
DocumentRepository.Create<IMessage>(
storage,
MessagesTableName,
x => x.Number,
x => x.UserNumber,
x => x.Id));

Lazy<IDocumentRepository<Conversation>> conversationsRepository = new(() =>
Expand All @@ -28,18 +27,20 @@ class StorageService(CloudStorageAccount storage, IFeatureManager featureManager
Lazy<IDocumentRepository<Conversation>> activeConversationRepository = new(() =>
DocumentRepository.Create<Conversation>(
storage,
ActiveConversationTableName,
ConversationsTableName,
x => x.Number,
// We only have one active conversation by number
// NOTE: we can use the same table name since no conversation will
// ever have the ID 'active'
x => "active"));

/// <inheritdoc/>
public async Task SaveAsync(IMessage message, CancellationToken cancellationToken = default)
{
if (!string.IsNullOrEmpty(message.ConversationId) && await featureManager.IsEnabledAsync(FeatureFlags.Conversation))
{
var conversation = await conversationsRepository.Value.GetAsync(message.Number, message.ConversationId, cancellationToken) ??
new(message.Number, message.ConversationId, new(), message.Timestamp);
var conversation = await conversationsRepository.Value.GetAsync(message.UserNumber, message.ConversationId, cancellationToken) ??
new(message.UserNumber, message.ConversationId, new(), message.Timestamp);

conversation.Messages.Add(message);

Expand Down
2 changes: 1 addition & 1 deletion src/WhatsApp/TemplateResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public TemplateResponse(string Number, string Service, string Context, string? C
/// <inheritdoc/>
protected override async Task<string?> SendCoreAsync(IWhatsAppClient client, CancellationToken cancellationToken = default)
{
await client.SendTemplateAsync(Service, Number, Template, cancellationToken);
await client.SendTemplateAsync(ServiceId, UserNumber, Template, cancellationToken);

return Ulid.NewUlid().ToString();
}
Expand Down
6 changes: 3 additions & 3 deletions src/WhatsApp/TextResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ public record TextResponse(string Number, string Service, string Context, string
if (Button1 != null)
{
return (Button2 == null ?
client.ReplyAsync(Number, Service, Context, Text, Button1) :
client.ReplyAsync(Number, Service, Context, Text, Button1, Button2));
client.ReplyAsync(UserNumber, ServiceId, Context, Text, Button1) :
client.ReplyAsync(UserNumber, ServiceId, Context, Text, Button1, Button2));
}
else
{
return client.ReplyAsync(Number, Service, Context, Text);
return client.ReplyAsync(UserNumber, ServiceId, Context, Text);
}
}
}
2 changes: 1 addition & 1 deletion src/WhatsApp/WhatsAppServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public static WhatsAppHandlerBuilder AddWhatsApp(
// Check if the conversation capability was enabled by getting the conversation service
if (services.GetService<IConversationService>() is IConversationService conversationService)
{
return new RestoreConversationMessagesHandler(inner, conversationService);
return new RestoreConversationMessagesHandler(inner, conversationService, services.GetService<ConversationOptions>() ?? new());
}

return new DelegatingWhatsAppHandler(inner);
Expand Down