Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
1a376cf
Make context mockable
svegiraju-microsoft Oct 10, 2024
f51def0
Fix project
svegiraju-microsoft Oct 10, 2024
43d8ee2
Consolidated version of coverlet.msbuild, coverlet.collector, xunit, …
m3nax Jun 26, 2024
3ce4685
Make context mockable
svegiraju-microsoft Oct 10, 2024
d083483
Fix project
svegiraju-microsoft Oct 10, 2024
0653902
Added unit test to prove out concern raised on Discord
WhitWaldo Sep 13, 2024
a7c37ce
Removed unused using
WhitWaldo Sep 19, 2024
2b408c6
Added unit test to validate that headers aren't being stripped off re…
WhitWaldo Oct 7, 2024
8277fc2
Fixed spelling typo
WhitWaldo Oct 7, 2024
53d7a0c
Added fix to handle null return values
WhitWaldo Oct 6, 2024
cf9f1ca
Removed unnecessary null check
WhitWaldo Oct 11, 2024
5dcc4e5
Removed deprecated methods from DaprClient and tests as well as unuse…
WhitWaldo Sep 3, 2024
11b3aec
Removed unused (and invalid) reference
WhitWaldo Sep 3, 2024
c6eb1d8
Removed E2E workflow test as it validated DaprClient and the function…
WhitWaldo Oct 11, 2024
0b89433
Adding instance-based CreateInvokableHttpClient (#1319)
WhitWaldo Oct 14, 2024
d1b6479
Fixed security advisory updates across dependencies (transitive and d…
WhitWaldo Oct 15, 2024
b148bc4
Removes floating classes and introduces Dapr.Common project (#1365)
WhitWaldo Oct 16, 2024
614a92b
Extracted Protos out to common project (#1367)
WhitWaldo Oct 16, 2024
a8e1b79
Improvement of the dotnet-contributing files (#1330)
Shubhdeep02 Oct 16, 2024
d538b26
Support case insensitive cloudevent payloads and forward cloudevent p…
iliaspoli Oct 17, 2024
453da80
Updating actor serialization documentation (#1371)
WhitWaldo Oct 18, 2024
e07a12e
Prioritize retrieval of environment variables from IConfiguration ins…
WhitWaldo Oct 18, 2024
d4f2ff1
cleanup: Removed Serilog nuget from Directory.Packages.props (#1376)
m3nax Oct 22, 2024
01eddad
Removed sample folder (#1375)
m3nax Oct 22, 2024
0b4eeda
Remove unused variables (#1314)
RafaelJCamara Oct 24, 2024
ed86404
Remove unused using statements. (#1313)
RafaelJCamara Oct 24, 2024
d5af95c
Incremental source generator for actors (#1334)
m3nax Oct 28, 2024
03995c7
Add .NET client for Dapr Jobs API (#1384)
WhitWaldo Nov 1, 2024
d1aa92b
Updated prereqs to specify .NET 6 and .NET 8 in v1.15 (#1398)
WhitWaldo Nov 1, 2024
a06605c
Refactor DaprWorkflowClientBuilderFactory and WorkflowRuntimeOptions …
neworange-ruud Nov 4, 2024
c464294
Fix for DI registration not completing as expected (#1386)
WhitWaldo Nov 5, 2024
372d428
Add .NET client for pub/sub support - streaming subscriptions (#1381)
WhitWaldo Nov 5, 2024
a3da0aa
ci: set fail-fast to false (#1405)
mikeee Nov 12, 2024
3cbc50f
Added async operations workflow sample (#1394)
WhitWaldo Nov 12, 2024
c0a5a35
Added workflow example: Fan out/fan in (#1396)
WhitWaldo Nov 13, 2024
33d94a5
Added workflow sample: Sub-workflows (#1395)
WhitWaldo Nov 14, 2024
e04af4e
Added workflow sample: Task chaining (#1387)
WhitWaldo Nov 14, 2024
a972d2f
Added workflow sample: Monitor (#1388)
WhitWaldo Nov 14, 2024
f9c9e23
Added workflow example: External interaction (#1389)
WhitWaldo Nov 18, 2024
105f798
Optional DI lifecycle change (#1408)
WhitWaldo Nov 21, 2024
7933066
Additional lifecycle registration changes (#1410)
WhitWaldo Nov 24, 2024
072c060
Preserve comparer of the original dictionary from ConfigurationProvid…
tomhreb Nov 30, 2024
4ed3480
Update all.sln
WhitWaldo Nov 30, 2024
ed68f28
Bug/476 multiple methods per interface with JSON serialization doesn´…
paule96 Dec 3, 2024
2afde01
Support .NET 9 (#1404)
WhitWaldo Dec 4, 2024
b343ecb
update .net workflow docs to stable (#1418)
hhunter-ms Dec 4, 2024
4ed55b3
FIX: Actor source generator generates invalid code for generic interf…
m3nax Dec 5, 2024
d21a686
Add .NET client for LLM Conversations support (#1382)
WhitWaldo Dec 10, 2024
2fe08c9
Updated protos to latest in dapr/dapr (#1420)
WhitWaldo Dec 11, 2024
1e77e39
Conversation builder consistency changes (#1423)
WhitWaldo Dec 11, 2024
9d36d3e
#906 -Added methods in status API supports for saving and reading bin…
divzi-p Dec 11, 2024
dd06c48
Fixes + unit tests for streaming PubSub implementation (#1415)
WhitWaldo Dec 11, 2024
3dd8019
Fix nulls
svegiraju-microsoft Dec 20, 2024
f188fd1
Merge branch 'master' into users/svegira/make-context-mockable
siri-varma Dec 20, 2024
ce7b2d8
Delete examples/Client/PublishSubscribe/StreamingSubscriptionExample/…
siri-varma Dec 20, 2024
63bad79
Delete examples/AI/ConversationalAI/Properties/launchSettings.json
siri-varma Dec 20, 2024
57c93d2
Delete daprdocs/content/en/dotnet-sdk-docs/dotnet-ai/dotnet-ai-usage.md
siri-varma Dec 20, 2024
dd6b3c3
Update dotnet-jobs-howto.md
siri-varma Dec 20, 2024
e11bf0e
Update dotnet-jobs-howto.md
siri-varma Dec 20, 2024
0684a6c
Update dotnet-workflowclient-usage.md
siri-varma Dec 20, 2024
94763b3
Update dotnet-workflowclient-usage.md
siri-varma Dec 20, 2024
b519b62
fix thing
svegiraju-microsoft Dec 20, 2024
8f27ba7
Update WorkflowActivityContext.cs
siri-varma Dec 20, 2024
096664a
Update WorkflowActivityContext.cs
siri-varma Dec 21, 2024
dbda89d
fix things
svegiraju-microsoft Dec 21, 2024
1c4cc5b
Fix version
svegiraju-microsoft Dec 21, 2024
e3e6b0a
Update Dapr.Workflow.Test.csproj
siri-varma Dec 21, 2024
f32fb8e
fix things
svegiraju-microsoft Dec 21, 2024
b55699c
Delete examples/AI/ConversationalAI/Properties/launchSettings.json
siri-varma Dec 21, 2024
ea4ac28
Delete examples/Client/PublishSubscribe/StreamingSubscriptionExample/…
siri-varma Dec 21, 2024
df7c70c
Update WorkflowActivityContext.cs
siri-varma Dec 28, 2024
3077035
Merge branch 'master' into users/svegira/make-context-mockable
WhitWaldo Dec 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fixes + unit tests for streaming PubSub implementation (#1415)
* Added null check - the proto suggests this shouldn't ever be null, but there's an issue reporting as much, so this fixes that

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>

* Removed the Task.WhenAll making the operation non-blocking

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>

* Added unit test to validate that the subscription is no longer blocking

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>

* Removed unused line from previous test, added another test

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>

* Added another test

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>

* More unit tests

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>

* Added more unit tests

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>

* Updated to make DaprPublishSubscribeClientBuilder configurable via a registered IConfiguration

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>

* Added missing copyright statements

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>

* Added missing package reference

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>

* Fixed bad reference (missed in merge)

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>

* Fixed failing unit test

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>

* Tweak to only pass along EventMessage payloads to developers as it's expected that the initial response will be null if EventMessage is populated

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>

* Was missing assignment of the Data property in the TopicMessage. Shout out to both @tommorvolloriddle and @Aimless321 for catching this!

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>

* Fix - return would be bad. Continue is the right move.

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>

* Added a simple test

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>

* Fixed unit tests

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>

* Merged in tweaks from #1422

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>

---------

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Siri Varma Vegiraju <svegiraju@microsoft.com>
  • Loading branch information
WhitWaldo authored and svegiraju-microsoft committed Dec 20, 2024
commit dd06c48c926aeb1bca762761eb60f4026fc1742e
18 changes: 18 additions & 0 deletions src/Dapr.Messaging/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("Dapr.Messaging.Test, PublicKey=0024000004800000940000000602000000240000525341310004000001000100b1f597635c44597fcecb493e2b1327033b29b1a98ac956a1a538664b68f87d45fbaada0438a15a6265e62864947cc067d8da3a7d93c5eb2fcbb850e396c8684dba74ea477d82a1bbb18932c0efb30b64ff1677f85ae833818707ac8b49ad8062ca01d2c89d8ab1843ae73e8ba9649cd28666b539444dcdee3639f95e2a099bb2")]


Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ internal sealed class DaprPublishSubscribeGrpcClient : DaprPublishSubscribeClien
/// </summary>
public DaprPublishSubscribeGrpcClient(P.DaprClient client, HttpClient httpClient, string? daprApiToken)
{
Client = client;
this.Client = client;
this.HttpClient = httpClient;
this.DaprApiToken = daprApiToken;
}
Expand All @@ -63,7 +63,7 @@ public DaprPublishSubscribeGrpcClient(P.DaprClient client, HttpClient httpClient
/// <returns></returns>
public override async Task<IAsyncDisposable> SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken = default)
{
var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, Client);
var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, this.Client);
await receiver.SubscribeAsync(cancellationToken);
return receiver;
}
Expand Down
57 changes: 45 additions & 12 deletions src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ internal sealed class PublishSubscribeReceiver : IAsyncDisposable
/// </summary>
private bool isDisposed;

// Internal property for testing purposes
internal Task TopicMessagesChannelCompletion => topicMessagesChannel.Reader.Completion;
// Internal property for testing purposes
internal Task AcknowledgementsChannelCompletion => acknowledgementsChannel.Reader.Completion;

/// <summary>
/// Constructs a new instance of a <see cref="PublishSubscribeReceiver"/> instance.
/// </summary>
Expand Down Expand Up @@ -115,20 +120,40 @@ internal async Task SubscribeAsync(CancellationToken cancellationToken = default

var stream = await GetStreamAsync(cancellationToken);

//Retrieve the messages from the sidecar and write to the messages channel
var fetchMessagesTask = FetchDataFromSidecarAsync(stream, topicMessagesChannel.Writer, cancellationToken);
//Retrieve the messages from the sidecar and write to the messages channel - start without awaiting so this isn't blocking
_ = FetchDataFromSidecarAsync(stream, topicMessagesChannel.Writer, cancellationToken)
.ContinueWith(HandleTaskCompletion, null, cancellationToken, TaskContinuationOptions.OnlyOnFaulted,
TaskScheduler.Default);

//Process the messages as they're written to either channel
var acknowledgementProcessorTask = ProcessAcknowledgementChannelMessagesAsync(stream, cancellationToken);
var topicMessageProcessorTask = ProcessTopicChannelMessagesAsync(cancellationToken);
_ = ProcessAcknowledgementChannelMessagesAsync(stream, cancellationToken).ContinueWith(HandleTaskCompletion,
null, cancellationToken, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
_ = ProcessTopicChannelMessagesAsync(cancellationToken).ContinueWith(HandleTaskCompletion, null,
cancellationToken,
TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
}

try
{
await Task.WhenAll(fetchMessagesTask, acknowledgementProcessorTask, topicMessageProcessorTask);
}
catch (OperationCanceledException)
/// <summary>
/// Exposed for testing purposes only.
/// </summary>
/// <param name="message">The test message to write.</param>
internal async Task WriteMessageToChannelAsync(TopicMessage message)
{
await topicMessagesChannel.Writer.WriteAsync(message);
}

//Exposed for testing purposes only
internal async Task WriteAcknowledgementToChannelAsync(TopicAcknowledgement acknowledgement)
{
await acknowledgementsChannel.Writer.WriteAsync(acknowledgement);
}

//Exposed for testing purposes only
internal static void HandleTaskCompletion(Task task, object? state)
{
if (task.Exception != null)
{
// Will be cleaned up during DisposeAsync
throw task.Exception;
}
}

Expand Down Expand Up @@ -251,13 +276,21 @@ await stream.RequestStream.WriteAsync(
//Each time a message is received from the stream, push it into the topic messages channel
await foreach (var response in stream.ResponseStream.ReadAllAsync(cancellationToken))
{
//https://github.com/dapr/dotnet-sdk/issues/1412 reports that this is sometimes null
//Skip the initial response - we only want to pass along TopicMessage payloads to developers
if (response?.EventMessage is null)
{
continue;
}

var message =
new TopicMessage(response.EventMessage.Id, response.EventMessage.Source, response.EventMessage.Type,
response.EventMessage.SpecVersion, response.EventMessage.DataContentType,
response.EventMessage.Topic, response.EventMessage.PubsubName)
{
Path = response.EventMessage.Path,
Extensions = response.EventMessage.Extensions.Fields.ToDictionary(f => f.Key, kvp => kvp.Value)
Extensions = response.EventMessage.Extensions.Fields.ToDictionary(f => f.Key, kvp => kvp.Value),
Data = response.EventMessage.Data.ToByteArray()
};

try
Expand Down Expand Up @@ -308,6 +341,6 @@ public async ValueTask DisposeAsync()
/// </summary>
/// <param name="MessageId">The identifier of the message.</param>
/// <param name="Action">The action to take on the message in the acknowledgement request.</param>
private sealed record TopicAcknowledgement(string MessageId, TopicEventResponse.Types.TopicEventResponseStatus Action);
internal sealed record TopicAcknowledgement(string MessageId, TopicEventResponse.Types.TopicEventResponseStatus Action);
}

1 change: 1 addition & 0 deletions test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
<PackageReference Include="Grpc.Net.Client" />
<PackageReference Include="protobuf-net.Grpc.AspNetCore" />
<PackageReference Include="Grpc.Tools" PrivateAssets="All" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,90 @@
using Dapr.Messaging.PublishSubscribe;
// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using Dapr.Messaging.PublishSubscribe;
using Dapr.Messaging.PublishSubscribe.Extensions;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Moq;

namespace Dapr.Messaging.Test.Extensions;

public sealed class PublishSubscribeServiceCollectionExtensionsTests
{
[Fact]
public void AddDaprPubSubClient_RegistersIHttpClientFactory()
public void AddDaprMessagingClient_FromIConfiguration()
{
const string apiToken = "abc123";
var configuration = new ConfigurationBuilder()
.AddInMemoryCollection(new Dictionary<string, string?>
{
{"DAPR_API_TOKEN", apiToken }
})
.Build();

var services = new ServiceCollection();

services.AddSingleton<IConfiguration>(configuration);

services.AddDaprPubSubClient();

var app = services.BuildServiceProvider();

var pubSubClient = app.GetRequiredService<DaprPublishSubscribeClient>() as DaprPublishSubscribeGrpcClient;

Assert.NotNull(pubSubClient!);
Assert.Equal(apiToken, pubSubClient.DaprApiToken);
}

[Fact]
public void AddDaprPubSubClient_RegistersIHttpClientFactory()
{
var services = new ServiceCollection();
services.AddDaprPubSubClient();

var serviceProvider = services.BuildServiceProvider();
var daprClient = serviceProvider.GetService<DaprPublishSubscribeClient>();
Assert.NotNull(daprClient);
}

[Fact]
public void AddDaprPubSubClient_CallsConfigureAction()
{
var services = new ServiceCollection();

var configureCalled = false;

services.AddDaprPubSubClient(Configure);

var serviceProvider = services.BuildServiceProvider();
var daprClient = serviceProvider.GetService<DaprPublishSubscribeClient>();
Assert.NotNull(daprClient);
Assert.True(configureCalled);
return;

void Configure(IServiceProvider sp, DaprPublishSubscribeClientBuilder builder)
{
configureCalled = true;
}
}

[Fact]
public void AddDaprPubSubClient_RegistersServicesCorrectly()
{
var services = new ServiceCollection();
services.AddDaprPubSubClient();
var serviceProvider = services.BuildServiceProvider();

var httpClientFactory = serviceProvider.GetService<IHttpClientFactory>();
Assert.NotNull(httpClientFactory);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,17 @@
using Dapr.Messaging.PublishSubscribe;
// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using Dapr.Messaging.PublishSubscribe;

namespace Dapr.Messaging.Test.PublishSubscribe
{
Expand Down
Loading