diff --git a/src/NATS.Client.JetStream/NatsJSTelemetryExtensions.cs b/src/NATS.Client.JetStream/NatsJSTelemetryExtensions.cs
index 0860bd036..d9b48460a 100644
--- a/src/NATS.Client.JetStream/NatsJSTelemetryExtensions.cs
+++ b/src/NATS.Client.JetStream/NatsJSTelemetryExtensions.cs
@@ -26,4 +26,24 @@ public static class NatsJSTelemetryExtensions
parentContext: msg.Headers.GetActivityContext(),
tags: tags);
}
+
+ /// Start an activity under the INatsJSMsg associated activity.
+ /// Nats message
+ /// Name of new activity
+ /// Optional tags to add to the activity
+ /// Returns an or null if no listeners.
+ public static Activity? StartActivity(
+ this INatsJSMsg msg,
+ [CallerMemberName] string name = "",
+ IEnumerable>? tags = null)
+ {
+ if (!Telemetry.HasListeners())
+ return null;
+
+ return Telemetry.NatsActivities.StartActivity(
+ name,
+ kind: ActivityKind.Internal,
+ parentContext: msg.Headers.GetActivityContext(),
+ tags: tags);
+ }
}
diff --git a/tests/NATS.Net.OpenTelemetry.Tests/OpenTelemetryTest.cs b/tests/NATS.Net.OpenTelemetry.Tests/OpenTelemetryTest.cs
index 494467f38..c7d05e924 100644
--- a/tests/NATS.Net.OpenTelemetry.Tests/OpenTelemetryTest.cs
+++ b/tests/NATS.Net.OpenTelemetry.Tests/OpenTelemetryTest.cs
@@ -1,4 +1,6 @@
using System.Diagnostics;
+using NATS.Client.JetStream;
+using NATS.Client.JetStream.Models;
using NATS.Client.Platform.Windows.Tests;
namespace NATS.Client.Core.Tests;
@@ -28,6 +30,49 @@ public async Task Publish_subscribe_activities()
AssertActivityData("foo", activities);
}
+ [Fact]
+ public async Task JetStream_consume_start_activity_with_interface()
+ {
+ var activities = new List();
+ using var activityListener = StartActivityListener(activities);
+ await using var server = await NatsServerProcess.StartAsync();
+ await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
+
+ var js = new NatsJSContext(nats);
+
+ var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+
+ await js.CreateStreamAsync(new StreamConfig { Name = "test-stream", Subjects = ["test.>"] }, cts.Token);
+ await js.CreateOrUpdateConsumerAsync("test-stream", new ConsumerConfig("test-consumer"), cts.Token);
+
+ await js.PublishAsync("test.subject", "test-message", cancellationToken: cts.Token);
+
+ var consumer = await js.GetConsumerAsync("test-stream", "test-consumer", cts.Token);
+
+ await foreach (var msg in consumer.ConsumeAsync(cancellationToken: cts.Token))
+ {
+ // Test StartActivity on INatsJSMsg interface (the fix for issue #1026)
+ using var activity = msg.StartActivity("test.consume");
+ Assert.NotNull(activity);
+ Assert.Equal("test.consume", activity.OperationName);
+
+ await msg.AckAsync(cancellationToken: cts.Token);
+ break;
+ }
+
+ // Verify the publish activity was recorded
+ var publishActivity = activities.FirstOrDefault(x => x.OperationName == "test.subject publish");
+ Assert.NotNull(publishActivity);
+
+ // Verify our custom activity was recorded
+ var consumeActivity = activities.FirstOrDefault(x => x.OperationName == "test.consume");
+ Assert.NotNull(consumeActivity);
+ Assert.Equal(ActivityKind.Internal, consumeActivity.Kind);
+
+ // Verify the parent relationship (consume activity should have publish as parent via trace context)
+ Assert.Equal(publishActivity.TraceId, consumeActivity.TraceId);
+ }
+
private static ActivityListener StartActivityListener(List activities)
{
var activityListener = new ActivityListener();