Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Tweaked the jobs mapping handler to accept an optional timeout parame…
…ter instead of a cancellation token. This applies the timeout, if specified, to each invocation instead of as a global timeout.

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
  • Loading branch information
WhitWaldo committed Feb 26, 2025
commit 5d558b6995a24bc34892f1db9d2a32c190000cdb
12 changes: 8 additions & 4 deletions src/Dapr.Jobs/Extensions/EndpointRouteBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ public static class EndpointRouteBuilderExtensions
/// <param name="action">The asynchronous action provided by the developer that handles any inbound requests. The first two
/// parameters must be a <see cref="string"/> for the jobName and the originally registered ReadOnlyMemory&lt;byte&gt; with the
/// payload value, but otherwise can be populated with additional services to be injected into the delegate.</param>
/// <param name="cancellationToken">Cancellation token that will be passed in as the last parameter to the delegate action.</param>
/// <param name="timeout">Optional timeout to apply to a per-request cancellation token.</param>
public static IEndpointRouteBuilder MapDaprScheduledJobHandler(this IEndpointRouteBuilder endpoints,
Delegate action, CancellationToken cancellationToken = default)
Delegate action, TimeSpan? timeout = null)
{
ArgumentNullException.ThrowIfNull(endpoints, nameof(endpoints));
ArgumentNullException.ThrowIfNull(action, nameof(action));
Expand All @@ -50,14 +50,18 @@ public static IEndpointRouteBuilder MapDaprScheduledJobHandler(this IEndpointRou
if (context.Request.ContentLength is > 0)
{
using var streamContent = new StreamContent(context.Request.Body);
payload = await streamContent.ReadAsByteArrayAsync(cancellationToken);
payload = await streamContent.ReadAsByteArrayAsync(CancellationToken.None);
}

using var cts = timeout.HasValue
? new CancellationTokenSource(timeout.Value)
: new CancellationTokenSource();

var parameters = new Dictionary<Type, object>
{
{ typeof(string), jobName },
{ typeof(ReadOnlyMemory<byte>), payload },
{ typeof(CancellationToken), CancellationToken.None }
{ typeof(CancellationToken), cts.Token }
};

var actionParameters = action.Method.GetParameters();
Expand Down