From 6411e6aa3bac5ccd620fffc28f41b648c4fb2dcc Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Mon, 5 Jan 2026 14:02:53 +0000 Subject: [PATCH] Fix OTel INatsJSMsg extension --- .../NatsJSTelemetryExtensions.cs | 20 +++++++++ .../OpenTelemetryTest.cs | 45 +++++++++++++++++++ 2 files changed, 65 insertions(+) diff --git a/src/NATS.Client.JetStream/NatsJSTelemetryExtensions.cs b/src/NATS.Client.JetStream/NatsJSTelemetryExtensions.cs index 0860bd036..d9b48460a 100644 --- a/src/NATS.Client.JetStream/NatsJSTelemetryExtensions.cs +++ b/src/NATS.Client.JetStream/NatsJSTelemetryExtensions.cs @@ -26,4 +26,24 @@ public static class NatsJSTelemetryExtensions parentContext: msg.Headers.GetActivityContext(), tags: tags); } + + /// Start an activity under the INatsJSMsg associated activity. + /// Nats message + /// Name of new activity + /// Optional tags to add to the activity + /// Returns an or null if no listeners. + public static Activity? StartActivity( + this INatsJSMsg msg, + [CallerMemberName] string name = "", + IEnumerable>? tags = null) + { + if (!Telemetry.HasListeners()) + return null; + + return Telemetry.NatsActivities.StartActivity( + name, + kind: ActivityKind.Internal, + parentContext: msg.Headers.GetActivityContext(), + tags: tags); + } } diff --git a/tests/NATS.Net.OpenTelemetry.Tests/OpenTelemetryTest.cs b/tests/NATS.Net.OpenTelemetry.Tests/OpenTelemetryTest.cs index 494467f38..c7d05e924 100644 --- a/tests/NATS.Net.OpenTelemetry.Tests/OpenTelemetryTest.cs +++ b/tests/NATS.Net.OpenTelemetry.Tests/OpenTelemetryTest.cs @@ -1,4 +1,6 @@ using System.Diagnostics; +using NATS.Client.JetStream; +using NATS.Client.JetStream.Models; using NATS.Client.Platform.Windows.Tests; namespace NATS.Client.Core.Tests; @@ -28,6 +30,49 @@ public async Task Publish_subscribe_activities() AssertActivityData("foo", activities); } + [Fact] + public async Task JetStream_consume_start_activity_with_interface() + { + var activities = new List(); + using var activityListener = StartActivityListener(activities); + await using var server = await NatsServerProcess.StartAsync(); + await using var nats = new NatsConnection(new NatsOpts { Url = server.Url }); + + var js = new NatsJSContext(nats); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + await js.CreateStreamAsync(new StreamConfig { Name = "test-stream", Subjects = ["test.>"] }, cts.Token); + await js.CreateOrUpdateConsumerAsync("test-stream", new ConsumerConfig("test-consumer"), cts.Token); + + await js.PublishAsync("test.subject", "test-message", cancellationToken: cts.Token); + + var consumer = await js.GetConsumerAsync("test-stream", "test-consumer", cts.Token); + + await foreach (var msg in consumer.ConsumeAsync(cancellationToken: cts.Token)) + { + // Test StartActivity on INatsJSMsg interface (the fix for issue #1026) + using var activity = msg.StartActivity("test.consume"); + Assert.NotNull(activity); + Assert.Equal("test.consume", activity.OperationName); + + await msg.AckAsync(cancellationToken: cts.Token); + break; + } + + // Verify the publish activity was recorded + var publishActivity = activities.FirstOrDefault(x => x.OperationName == "test.subject publish"); + Assert.NotNull(publishActivity); + + // Verify our custom activity was recorded + var consumeActivity = activities.FirstOrDefault(x => x.OperationName == "test.consume"); + Assert.NotNull(consumeActivity); + Assert.Equal(ActivityKind.Internal, consumeActivity.Kind); + + // Verify the parent relationship (consume activity should have publish as parent via trace context) + Assert.Equal(publishActivity.TraceId, consumeActivity.TraceId); + } + private static ActivityListener StartActivityListener(List activities) { var activityListener = new ActivityListener();