Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/NATS.Client.JetStream/NatsJSTelemetryExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,24 @@ public static class NatsJSTelemetryExtensions
parentContext: msg.Headers.GetActivityContext(),
tags: tags);
}

/// <summary>Start an activity under the INatsJSMsg associated activity.</summary>
/// <param name="msg">Nats message</param>
/// <param name="name">Name of new activity</param>
/// <param name="tags">Optional tags to add to the activity</param>
/// <returns>Returns an <see cref="Activity"/> or null if no listeners.</returns>
public static Activity? StartActivity<T>(
this INatsJSMsg<T> msg,
[CallerMemberName] string name = "",
IEnumerable<KeyValuePair<string, object?>>? tags = null)
{
if (!Telemetry.HasListeners())
return null;

return Telemetry.NatsActivities.StartActivity(
name,
kind: ActivityKind.Internal,
parentContext: msg.Headers.GetActivityContext(),
tags: tags);
}
}
45 changes: 45 additions & 0 deletions tests/NATS.Net.OpenTelemetry.Tests/OpenTelemetryTest.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.Diagnostics;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using NATS.Client.Platform.Windows.Tests;

namespace NATS.Client.Core.Tests;
Expand Down Expand Up @@ -28,6 +30,49 @@ public async Task Publish_subscribe_activities()
AssertActivityData("foo", activities);
}

[Fact]
public async Task JetStream_consume_start_activity_with_interface()
{
var activities = new List<Activity>();
using var activityListener = StartActivityListener(activities);
await using var server = await NatsServerProcess.StartAsync();
await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });

var js = new NatsJSContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

await js.CreateStreamAsync(new StreamConfig { Name = "test-stream", Subjects = ["test.>"] }, cts.Token);
await js.CreateOrUpdateConsumerAsync("test-stream", new ConsumerConfig("test-consumer"), cts.Token);

await js.PublishAsync("test.subject", "test-message", cancellationToken: cts.Token);

var consumer = await js.GetConsumerAsync("test-stream", "test-consumer", cts.Token);

await foreach (var msg in consumer.ConsumeAsync<string>(cancellationToken: cts.Token))
{
// Test StartActivity on INatsJSMsg<T> interface (the fix for issue #1026)
using var activity = msg.StartActivity("test.consume");
Assert.NotNull(activity);
Assert.Equal("test.consume", activity.OperationName);

await msg.AckAsync(cancellationToken: cts.Token);
break;
}

// Verify the publish activity was recorded
var publishActivity = activities.FirstOrDefault(x => x.OperationName == "test.subject publish");
Assert.NotNull(publishActivity);

// Verify our custom activity was recorded
var consumeActivity = activities.FirstOrDefault(x => x.OperationName == "test.consume");
Assert.NotNull(consumeActivity);
Assert.Equal(ActivityKind.Internal, consumeActivity.Kind);

// Verify the parent relationship (consume activity should have publish as parent via trace context)
Assert.Equal(publishActivity.TraceId, consumeActivity.TraceId);
}

private static ActivityListener StartActivityListener(List<Activity> activities)
{
var activityListener = new ActivityListener();
Expand Down
Loading