Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
using Elsa.Mediator.Contracts;
using Elsa.Workflows.Management;
using Elsa.Workflows.Runtime.Models;
using Elsa.Workflows.Runtime.Notifications;

namespace Elsa.Workflows.Runtime;

/// <inheritdoc />
public class DefaultRegistriesPopulator(IWorkflowDefinitionStorePopulator workflowDefinitionStorePopulator, IActivityRegistryPopulator activityRegistryPopulator) : IRegistriesPopulator
public class DefaultRegistriesPopulator(
IWorkflowDefinitionStorePopulator workflowDefinitionStorePopulator,
IActivityRegistryPopulator activityRegistryPopulator,
INotificationSender notificationSender) : IRegistriesPopulator
{
/// <inheritdoc />
public async Task PopulateAsync(CancellationToken cancellationToken = default)
Expand All @@ -21,6 +27,11 @@ public async Task PopulateAsync(CancellationToken cancellationToken = default)

// Stage 4. Re-update the workflow definition store with the current set of activities.
// Finally, we need to re-populate the workflow definition store to make sure that the workflow definitions are up-to-date.
await workflowDefinitionStorePopulator.PopulateStoreAsync(true, cancellationToken);
var workflowDefinitions = await workflowDefinitionStorePopulator.PopulateStoreAsync(true, cancellationToken);

// Stage 5: Publish a notification that the workflow definitions have been reloaded. This ensures other replicated nodes can update their activity registry.
var reloadedWorkflowDefinitions = workflowDefinitions.Select(ReloadedWorkflowDefinition.FromDefinition).ToList();
var notification = new WorkflowDefinitionsReloaded(reloadedWorkflowDefinitions);
await notificationSender.SendAsync(notification, cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
using Elsa.Mediator.Contracts;
using Elsa.Workflows.Management;
using Elsa.Workflows.Management.Entities;
using Elsa.Workflows.Runtime.Notifications;
using NSubstitute;

namespace Elsa.Workflows.Runtime.UnitTests.Services;

public class DefaultRegistriesPopulatorTests
{
private readonly IWorkflowDefinitionStorePopulator _workflowDefinitionStorePopulatorMock;
private readonly INotificationSender _notificationSenderMock;
private readonly DefaultRegistriesPopulator _populator;

public DefaultRegistriesPopulatorTests()
{
_workflowDefinitionStorePopulatorMock = Substitute.For<IWorkflowDefinitionStorePopulator>();
var activityRegistryPopulatorMock = Substitute.For<IActivityRegistryPopulator>();
_notificationSenderMock = Substitute.For<INotificationSender>();

_populator = new(
_workflowDefinitionStorePopulatorMock,
activityRegistryPopulatorMock,
_notificationSenderMock);
}

[Fact(DisplayName = "PopulateAsync publishes WorkflowDefinitionsReloaded notification")]
public async Task PopulateAsync_PublishesWorkflowDefinitionsReloadedNotification()
{
// Arrange
var workflowDefinitions = new List<WorkflowDefinition>
{
new()
{
Id = "1",
DefinitionId = "def-1",
Version = 1
},
new()
{
Id = "2",
DefinitionId = "def-2",
Version = 1
}
};

_workflowDefinitionStorePopulatorMock
.PopulateStoreAsync(true, Arg.Any<CancellationToken>())
.Returns(workflowDefinitions);

// Act
await _populator.PopulateAsync();

// Assert
await _notificationSenderMock.Received(1).SendAsync(
Arg.Is<WorkflowDefinitionsReloaded>(n => n.ReloadedWorkflowDefinitions.Count == 2),
Arg.Any<CancellationToken>());
}
}
Loading